yihua commented on code in PR #11947:
URL: https://github.com/apache/hudi/pull/11947#discussion_r1797486335
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java:
##########
@@ -175,33 +181,72 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy =
(containsConfigProperty(props, HoodieIncrSourceConfig.MISSING_CHECKPOINT_STRATEGY))
? IncrSourceHelper.MissingCheckpointStrategy.valueOf(
- getStringWithAltKeys(props, HoodieIncrSourceConfig.MISSING_CHECKPOINT_STRATEGY))
+ getStringWithAltKeys(props, HoodieIncrSourceConfig.MISSING_CHECKPOINT_STRATEGY))
: null;
if (readLatestOnMissingCkpt) {
missingCheckpointStrategy = IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST;
}
- // Use begin Instant if set and non-empty
- Option<String> beginInstant =
- lastCkptStr.isPresent() ? lastCkptStr.get().isEmpty() ? Option.empty() : lastCkptStr : Option.empty();
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+ .setConf(HadoopFSUtils.getStorageConfWithCopy(sparkContext.hadoopConfiguration()))
+ .setBasePath(srcPath)
+ .setLoadActiveTimelineOnLoad(true)
+ .build();
+ int numInstantsFromConfig = getIntWithAltKeys(props, HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH);
+
+ String startTime;
+ if (lastCkptStr.isPresent() && !lastCkptStr.get().isEmpty()) {
+ startTime = lastCkptStr.get();
+ } else if (missingCheckpointStrategy != null) {
+ switch (missingCheckpointStrategy) {
+ case READ_UPTO_LATEST_COMMIT:
+ startTime = DEFAULT_BEGIN_TIMESTAMP;
Review Comment:
After reading more, I see that `IncrSourceHelper#generateQueryInfo` contains
the following logic, introduced by #9336 and #9473. Should we also account for
it in the new completion-time-based logic? In particular, for the events
incremental sources the last checkpoint can take the form `completionTime#key`.
```java
// When `beginInstantTime` is present, `previousInstantTime` is set to the completed commit before `beginInstantTime` if that exists.
// If there is no completed commit before `beginInstantTime`, e.g., `beginInstantTime` is the first commit in the active timeline,
// `previousInstantTime` is set to `DEFAULT_BEGIN_TIMESTAMP`.
String previousInstantTime = DEFAULT_BEGIN_TIMESTAMP;
if (!beginInstantTime.equals(DEFAULT_BEGIN_TIMESTAMP)) {
  Option<HoodieInstant> previousInstant = activeCommitTimeline.findInstantBefore(beginInstantTime);
  if (previousInstant.isPresent()) {
    previousInstantTime = previousInstant.get().getTimestamp();
  } else {
    // if begin instant time matches first entry in active timeline, we can set previous = beginInstantTime - 1
    if (activeCommitTimeline.filterCompletedInstants().firstInstant().isPresent()
        && activeCommitTimeline.filterCompletedInstants().firstInstant().get().getTimestamp().equals(beginInstantTime)) {
      previousInstantTime = String.valueOf(Long.parseLong(beginInstantTime) - 1);
    }
  }
}
if (missingCheckpointStrategy == MissingCheckpointStrategy.READ_LATEST
    || !activeCommitTimeline.isBeforeTimelineStarts(beginInstantTime)) {
  Option<HoodieInstant> nthInstant;
  // When we are in the upgrade code path from non-sourcelimit-based batching to sourcelimit-based batching, we need to avoid fetching the commit
  // that is read already. Else we will have duplicates in append-only use case if we use "findInstantsAfterOrEquals".
  // As soon as we have a new format of checkpoint and a key we will move to the new code of fetching the current commit as well.
  if (sourceLimitBasedBatching && lastCheckpointKey.isPresent()) {
    nthInstant = Option.fromJavaOptional(activeCommitTimeline
        .findInstantsAfterOrEquals(beginInstantTime, numInstantsPerFetch).getInstantsAsStream().reduce((x, y) -> y));
  } else {
    nthInstant = Option.fromJavaOptional(activeCommitTimeline
        .findInstantsAfter(beginInstantTime, numInstantsPerFetch).getInstantsAsStream().reduce((x, y) -> y));
  }
  return new QueryInfo(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL(),
      previousInstantTime,
      beginInstantTime,
      nthInstant.map(HoodieInstant::getTimestamp).orElse(beginInstantTime),
      orderColumn, keyColumn, limitColumn);
} else {
  // when MissingCheckpointStrategy is set to read everything until latest, trigger snapshot query.
  Option<HoodieInstant> lastInstant = activeCommitTimeline.lastInstant();
  return new QueryInfo(DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL(),
      previousInstantTime, beginInstantTime, lastInstant.get().getTimestamp(),
      orderColumn, keyColumn, limitColumn);
}
```
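To make the question concrete, here is a minimal, self-contained sketch of how the three rules in the quoted logic (previous-instant fallback, exclusive vs. inclusive start depending on whether the checkpoint carries a key, and snapshot fallback when the checkpoint predates the active timeline) might look if applied to completion times. It is not Hudi code and is only an illustration under stated assumptions: `CompletionTimeQuerySketch`, `Checkpoint`, `QueryRange`, `parseCheckpoint`, `previousTime` and `resolve` are hypothetical names; the timeline is modeled as a sorted `List<String>` of fixed-width completed completion times; `"000"` is an assumed stand-in for `DEFAULT_BEGIN_TIMESTAMP`; and "before the timeline starts" is simplified to "earlier than the first completed entry".

```java
import java.util.List;
import java.util.Optional;

// Hypothetical sketch, not Hudi code: the timeline is a sorted list of completed completion times.
final class CompletionTimeQuerySketch {
  // Assumed stand-in for DEFAULT_BEGIN_TIMESTAMP.
  static final String BEGIN = "000";

  record Checkpoint(String time, Optional<String> key) {}

  record QueryRange(String queryType, String previousTime, String beginTime, String endTime) {}

  // Splits "completionTime#key"; a checkpoint without '#' has no key.
  static Checkpoint parseCheckpoint(String checkpoint) {
    int idx = checkpoint.indexOf('#');
    if (idx < 0) {
      return new Checkpoint(checkpoint, Optional.empty());
    }
    return new Checkpoint(checkpoint.substring(0, idx), Optional.of(checkpoint.substring(idx + 1)));
  }

  // Mirrors the previousInstantTime rules of the quoted snippet.
  static String previousTime(List<String> completed, String begin) {
    if (begin.equals(BEGIN)) {
      return BEGIN;
    }
    String previous = null;
    for (String t : completed) {
      if (t.compareTo(begin) < 0) {
        previous = t; // ascending order, so the last match is the closest earlier time
      }
    }
    if (previous != null) {
      return previous;
    }
    // begin is the first completed entry: use begin - 1 so the range still includes it
    if (!completed.isEmpty() && completed.get(0).equals(begin)) {
      return String.valueOf(Long.parseLong(begin) - 1);
    }
    return BEGIN;
  }

  static QueryRange resolve(List<String> completed, Checkpoint ckpt, int numPerFetch,
                            boolean sourceLimitBasedBatching, boolean readLatest) {
    String begin = ckpt.time();
    String previous = previousTime(completed, begin);
    // Simplification of isBeforeTimelineStarts: earlier than the first completed entry.
    boolean beforeTimelineStarts = !completed.isEmpty() && begin.compareTo(completed.get(0)) < 0;
    if (readLatest || !beforeTimelineStarts) {
      // With a key, the checkpointed time is included again (rows after the key remain to be read);
      // without one it is skipped, avoiding duplicates for append-only use cases.
      boolean inclusive = sourceLimitBasedBatching && ckpt.key().isPresent();
      String end = begin;
      int taken = 0;
      for (String t : completed) {
        int cmp = t.compareTo(begin);
        if ((inclusive ? cmp >= 0 : cmp > 0) && taken < numPerFetch) {
          end = t; // the last selected time becomes the end of the range
          taken++;
        }
      }
      return new QueryRange("incremental", previous, begin, end);
    }
    // Checkpoint predates the active timeline: snapshot up to the latest completed time.
    return new QueryRange("snapshot", previous, begin, completed.get(completed.size() - 1));
  }
}
```

For example, with completed times `100, 200, 300, 400` and one instant per fetch, checkpoint `200` resolves to an incremental range ending at `300`, while `200#k` under source-limit-based batching ends at `200` itself, so the rows after key `k` in that commit can still be picked up.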