OK, so the InstantRange would be an exact-match one, containing only the compaction instants?
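To make the "exact match" semantics concrete, here is a toy sketch (illustrative only, not Hudi's actual InstantRange class): the range holds an explicit set of compaction instant times, and only membership in that set counts, rather than a start/end window.

    // Toy sketch of "exact match" range semantics (illustrative only, not
    // Hudi's actual InstantRange class): membership in an explicit set of
    // instant times, rather than a start/end window.
    import java.util.Set;

    public class ExactMatchRangeSketch {

        // The explicit instant times allowed through, e.g. only compaction commits.
        private final Set<String> explicitInstants;

        ExactMatchRangeSketch(Set<String> explicitInstants) {
            this.explicitInstants = explicitInstants;
        }

        // Exact-match check: an instant passes only if it is one of the
        // enumerated instants, regardless of where it falls chronologically.
        boolean isInRange(String instantTime) {
            return explicitInstants.contains(instantTime);
        }

        public static void main(String[] args) {
            ExactMatchRangeSketch range =
                    new ExactMatchRangeSketch(Set.of("20240820102000000", "20240821090000000"));
            System.out.println(range.isInRange("20240820102000000")); // true: a listed compaction instant
            System.out.println(range.isInRange("20240820103000000")); // false: between them, but not listed
        }
    }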
On Mon, Aug 26, 2024, 02:46 Danny Chan <danny0...@apache.org> wrote:

> > So if it works as you say, then there would have to be some other
> > filtering going on somewhere.
>
> It is set up that way intentionally, because for MOR tables, a commit
> that is not a compaction (insert overwrite etc.) should also be read.
>
> > it calls getCdcInputSplits(metaClient, instantRange) which does not use
> > the configured IncrementalQueryAnalyzer
>
> It utilizes the InstantRange, which holds the instants already filtered
> by the IncrementalQueryAnalyzer. Maybe you can add a test case and file
> a fix for it.
>
> Jack Vanlightly <vanligh...@apache.org> wrote on Wed, Aug 21, 2024 at 16:32:
> >
> > Thanks Danny. I think there may be a problem; having read the code and
> > written a test to check it, it seems this is not happening.
> >
> > The CDCExtractor itself is fed an instant range and a boolean
> > (consumeChangesFromCompaction) for whether to consume from compaction
> > instants. When consumeChangesFromCompaction is true it reads from
> > COMPACT *and* the other instant types like COMMIT etc. (
> > https://github.com/apache/hudi/blob/db5c2d97dc94122ebd63e6200858eabc4b119178/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java#L234
> > ). So if it works as you say, then there would have to be some other
> > filtering going on somewhere.
> >
> > I went to look for the code that would further filter down the instants
> > in some way. In the Flink code, the IncrementalInputSplits class uses
> > the CDCExtractor. The interesting thing in this class is that the
> > inputSplits method (
> > https://github.com/apache/hudi/blob/4b5c22339c7e412828b61469d8a037a6d73d2aa2/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L132
> > ) uses the IncrementalQueryAnalyzer, which is configured to read only
> > from the CDC changelog when the Flink option is enabled (
> > https://github.com/apache/hudi/blob/4b5c22339c7e412828b61469d8a037a6d73d2aa2/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L144C10-L144C30
> > ). However, importantly, when CDC is enabled (
> > https://github.com/apache/hudi/blob/4b5c22339c7e412828b61469d8a037a6d73d2aa2/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L315
> > ), it calls getCdcInputSplits(metaClient, instantRange), which does not
> > use the configured IncrementalQueryAnalyzer to source the timeline. It
> > just uses the HoodieTableMetaClient, so there is no additional timeline
> > filtering, only what there is within HoodieCDCExtractor.
> >
> > I wrote a MOR test case in TestIncrementalSplits and it does indeed
> > return inference-based splits (LOG_FILE) even when told to only read
> > from the CDC change log. This looks like a bug to me.
> >
> > Thanks
> > Jack
> >
> > On Tue, Aug 20, 2024 at 6:29 AM Danny Chan <danny0...@apache.org> wrote:
> >
> > > Yeah, you are right. For MOR tables, when the CDC log is enabled
> > > (these logs are generated during compaction), there are two choices
> > > for the reader:
> > >
> > > 1. read the changes from the change log, which has a high TTL delay
> > > because these logs are only generated during compaction;
> > > 2. or it can infer the changes by itself, which has a much shorter TTL
> > > delay but more resource cost on the reader side.
> > >
> > > 1 and 2 are mutually exclusive; you can only enable one of them.
> > >
> > > The reason we add some tricky logic in HoodieCDCExtractor is that when
> > > we choose 1, the timeline needs to be filtered to only include
> > > compaction commits; while when 2 is enabled, the timeline needs to be
> > > filtered to exclude compaction commits.
> > >
> > > Best,
> > > Danny
> > >
> > > Jack Vanlightly <vanligh...@apache.org> wrote on Mon, Aug 19, 2024 at 18:25:
> > > >
> > > > Hi all,
> > > >
> > > > I'm trying to understand the CDC read process that Flink uses in
> > > > Hudi. According to the Flink option READ_CDC_FROM_CHANGELOG (
> > > > https://github.com/apache/hudi/blob/db5c2d97dc94122ebd63e6200858eabc4b119178/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java#L365
> > > > ), when true, Flink should only read from CDC files; when false, it
> > > > must infer the deltas from the base and log files. But reading the
> > > > code, the HoodieCDCExtractor creates file splits both for CDC files
> > > > and for the inference cases (
> > > > https://github.com/apache/hudi/blob/db5c2d97dc94122ebd63e6200858eabc4b119178/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java#L234
> > > > ) when READ_CDC_FROM_CHANGELOG=true.
> > > >
> > > > This is confusing, as it seems you would do one or the other but not
> > > > both. Why go through all the work of inferring deltas from base and
> > > > log files when perhaps a couple of commits ahead there is a compact
> > > > instant that has it all precomputed?
> > > >
> > > > Thanks
> > > > Jack
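To make the filtering Danny describes above concrete, here is a minimal, hypothetical sketch (not Hudi's actual timeline API): given the completed instants and the chosen read mode, keep only compaction instants when consuming the precomputed CDC change logs, and exclude them when inferring changes from base/log files. The Instant record, the "compaction" action string and the helper below are illustrative assumptions.

    // Hypothetical sketch of the mutually exclusive timeline filtering
    // described in the thread (not Hudi's actual code): in mode 1 only
    // compaction instants survive, because only they carry precomputed CDC
    // change logs; in mode 2 compaction instants are dropped, because the
    // reader infers changes from base/log files instead.
    import java.util.List;
    import java.util.stream.Collectors;

    public class CdcTimelineFilterSketch {

        static final String COMPACTION = "compaction";

        // Minimal stand-in for a completed instant on the timeline.
        record Instant(String timestamp, String action) {}

        static List<Instant> filterForCdcRead(List<Instant> completedInstants,
                                              boolean readCdcFromChangelog) {
            return completedInstants.stream()
                    .filter(i -> readCdcFromChangelog
                            ? COMPACTION.equals(i.action())
                            : !COMPACTION.equals(i.action()))
                    .collect(Collectors.toList());
        }

        public static void main(String[] args) {
            List<Instant> timeline = List.of(
                    new Instant("20240820101500000", "deltacommit"),
                    new Instant("20240820102000000", "compaction"),
                    new Instant("20240820103000000", "deltacommit"));

            // The two modes are mutually exclusive views over the same timeline.
            System.out.println(filterForCdcRead(timeline, true));   // only the compaction instant
            System.out.println(filterForCdcRead(timeline, false));  // only the deltacommit instants
        }
    }

The point of the sketch is just that the two filters are complements of each other over the same timeline, which is why enabling both read paths at once does not make sense.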