yihua commented on code in PR #11947:
URL: https://github.com/apache/hudi/pull/11947#discussion_r1806020254
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java:
##########
@@ -175,33 +177,27 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
     IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy =
         (containsConfigProperty(props, HoodieIncrSourceConfig.MISSING_CHECKPOINT_STRATEGY))
             ? IncrSourceHelper.MissingCheckpointStrategy.valueOf(
-               getStringWithAltKeys(props, HoodieIncrSourceConfig.MISSING_CHECKPOINT_STRATEGY))
+               getStringWithAltKeys(props, HoodieIncrSourceConfig.MISSING_CHECKPOINT_STRATEGY))
             : null;
     if (readLatestOnMissingCkpt) {
       missingCheckpointStrategy = IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST;
     }
-    // Use begin Instant if set and non-empty
-    Option<String> beginInstant =
-        lastCkptStr.isPresent() ? lastCkptStr.get().isEmpty() ? Option.empty() : lastCkptStr : Option.empty();
+    IncrementalQueryAnalyzer analyzer = IncrSourceHelper.getIncrementalQueryAnalyzer(
+        sparkContext, srcPath, lastCkptStr, missingCheckpointStrategy,
+        getIntWithAltKeys(props, HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH),
+        getLatestSourceProfile());
+    QueryContext queryContext = analyzer.analyze();
+    Option<InstantRange> instantRange = queryContext.getInstantRange();
-    // If source profile exists, use the numInstants from source profile.
-    final int numInstantsFromConfig = getIntWithAltKeys(props, HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH);
-    int numInstantsPerFetch = getLatestSourceProfile().map(sourceProfile -> {
-      int numInstantsFromSourceProfile = sourceProfile.getSourceSpecificContext();
-      LOG.info("Overriding numInstantsPerFetch from source profile numInstantsFromSourceProfile {} , numInstantsFromConfig {}", numInstantsFromSourceProfile, numInstantsFromConfig);
-      return numInstantsFromSourceProfile;
-    }).orElse(numInstantsFromConfig);
-
-    HollowCommitHandling handlingMode = getHollowCommitHandleMode(props);
-    QueryInfo queryInfo = generateQueryInfo(sparkContext, srcPath,
-        numInstantsPerFetch, beginInstant, missingCheckpointStrategy, handlingMode,
-        HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.RECORD_KEY_METADATA_FIELD,
-        null, false, Option.empty());
-
-    if (queryInfo.areStartAndEndInstantsEqual()) {
+    String endCompletionTime;
+    // analyzer.getBeginCompletionTime() is empty only when reading the latest instant in the first batch
+    if (queryContext.isEmpty()
+        || (endCompletionTime = queryContext.getMaxCompletionTime()).equals(analyzer.getStartCompletionTime().orElseGet(() -> null))) {
       LOG.info("Already caught up. No new data to process");
-      return Pair.of(Option.empty(), queryInfo.getEndInstant());
+      return Pair.of(Option.empty(), analyzer.getStartCompletionTime().get());
Review Comment:
In this case, either `queryContext` is empty or `analyzer.getStartCompletionTime().get()` and `queryContext.getMaxCompletionTime()` are the same. I changed it to `lastCkptStr.orElse(null)` for readability.
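
To illustrate the readability point, here is a minimal, hypothetical sketch using `java.util.Optional` as a stand-in for Hudi's own `Option` type (which exposes analogous `orElse`/`get` methods). It is not the PR's code, only a sketch of the equivalence: in the caught-up branch the end completion time equals the start completion time derived from the last checkpoint, so returning the caller's checkpoint directly avoids an unguarded `get()`:

```java
import java.util.Optional;

public class CaughtUpCheckpointSketch {
    // Hypothetical helper mirroring the "already caught up" return value:
    // instead of re-deriving the checkpoint from the analyzer, echo back
    // the checkpoint the caller passed in, or null if none exists yet.
    static String checkpointWhenCaughtUp(Optional<String> lastCkptStr) {
        return lastCkptStr.orElse(null);
    }

    public static void main(String[] args) {
        // A previous checkpoint exists: it is returned unchanged.
        if (!"ckpt-1".equals(checkpointWhenCaughtUp(Optional.of("ckpt-1")))) {
            throw new AssertionError("expected the checkpoint to be echoed back");
        }
        // First batch with no checkpoint: null, with no risk of the
        // NoSuchElementException an unguarded get() could throw.
        if (checkpointWhenCaughtUp(Optional.empty()) != null) {
            throw new AssertionError("expected null for a missing checkpoint");
        }
        System.out.println("ok");
    }
}
```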
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]