hudi-agent commented on code in PR #18802:
URL: https://github.com/apache/hudi/pull/18802#discussion_r3278705438
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java:
##########
@@ -439,11 +466,69 @@ private Pair<Option<Dataset<Row>>, Checkpoint> fetchNextBatchBasedOnRequestedTim
       metricsOption.ifPresent(metrics -> metrics.updateStreamerSourceParallelism(sourceProfile.getSourcePartitions()));
       return coalesceOrRepartition(sourceWithMetaColumnsDropped, sourceProfile.getSourcePartitions());
     }).orElse(sourceWithMetaColumnsDropped);
+
+    maybeExtractUpstreamWatermarksRequestTime(srcPath, queryInfo.getStartInstant(), queryInfo.getEndInstant());
+
     return Pair.of(Option.of(src), new StreamerCheckpointV1(queryInfo.getEndInstant()));
   }
   // Try to fetch the latestSourceProfile, this ensures the profile is refreshed if it's no longer valid.
   private Option<SourceProfile<Integer>> getLatestSourceProfile() {
     return sourceProfileSupplier.map(SourceProfileSupplier::getSourceProfile);
   }
+
+  private void maybeExtractUpstreamWatermarksCompletionTime(QueryContext queryContext, String endCompletionTime) {
+    if (!ConfigUtils.isPropagatingEventTimeFromUpstream(props)) {
+      return;
+    }
+    if (StringUtils.isNullOrEmpty(endCompletionTime)) {
+      return;
+    }
+    List<HoodieInstant> instants = queryContext.getInstants().stream()
+        .filter(i -> compareTimestamps(i.getCompletionTime(), LESSER_THAN_OR_EQUALS, endCompletionTime))
+        .collect(Collectors.toList());
+    if (instants.isEmpty()) {
+      return;
+    }
+    this.upstreamEventTimeWatermarks = UpstreamEventTimeWatermarkExtractor
+        .extractPerPartitionWatermarks(queryContext.getActiveTimeline(), instants);
+    if (!this.upstreamEventTimeWatermarks.isEmpty()) {
+      log.info("Propagating upstream event-time watermarks for {} partition(s) from {} commit(s)",
+          this.upstreamEventTimeWatermarks.size(), instants.size());
+    }
+  }
+
+  private void maybeExtractUpstreamWatermarksRequestTime(String srcPath, String startInstant, String endInstant) {
+    if (!ConfigUtils.isPropagatingEventTimeFromUpstream(props)) {
+      return;
+    }
+    if (StringUtils.isNullOrEmpty(endInstant)) {
+      return;
+    }
+    String startExclusive = StringUtils.isNullOrEmpty(startInstant) ? "" : startInstant;
+    try {
+      HoodieTableMetaClient srcMetaClient = HoodieTableMetaClient.builder()
+          .setConf(HadoopFSUtils.getStorageConfWithCopy(sparkContext.hadoopConfiguration()))
+          .setBasePath(srcPath)
+          .setLoadActiveTimelineOnLoad(true)
Review Comment:
🤖 This assumes `getCommitsTimeline()` includes only `COMMIT` + `DELTA_COMMIT`
and excludes `REPLACE_COMMIT` (clustering / insert_overwrite); please confirm
against the timeline implementation on this branch. If that holds and the
upstream table has clustering instants in this range that carry event-time
stats, the request-time path will silently skip them, while the completion-time
path above can include them via `queryContext.getInstants()`. Should this use
`getCommitsAndCompactionTimeline()` (or `getCommitAndReplaceTimeline()`) to
keep the two paths symmetric?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
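To make the concern concrete, here is a minimal, self-contained sketch of how an action filter can drop instants. It does not call Hudi: the class name, the `NARROW` and `WIDE` action sets, and the sample instants are all hypothetical, and which actions `getCommitsTimeline()` really returns should be checked against the Hudi version in use.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class TimelineActionSymmetry {
  // ASSUMPTION: illustrative action sets, not read from Hudi. Verify the real ones.
  static final Set<String> NARROW = new HashSet<>(Arrays.asList("commit", "deltacommit"));
  static final Set<String> WIDE =
      new HashSet<>(Arrays.asList("commit", "deltacommit", "replacecommit"));

  // Made-up upstream instants as {action, timestamp} pairs.
  static final List<String[]> INSTANTS = Arrays.asList(
      new String[] {"commit", "001"},
      new String[] {"replacecommit", "002"},
      new String[] {"deltacommit", "003"});

  // Returns the timestamps of instants whose action is in the allowed set.
  static List<String> keptTimestamps(Set<String> allowedActions) {
    return INSTANTS.stream()
        .filter(i -> allowedActions.contains(i[0]))
        .map(i -> i[1])
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // With the narrow set, the replacecommit at "002" is silently dropped.
    System.out.println("narrow: " + keptTimestamps(NARROW));
    System.out.println("wide:   " + keptTimestamps(WIDE));
  }
}
```

If the two paths in the PR turn out to filter on different action sets, a unit test shaped like this (against the real timelines) would catch the asymmetry.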
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java:
##########
@@ -439,11 +466,69 @@ private Pair<Option<Dataset<Row>>, Checkpoint> fetchNextBatchBasedOnRequestedTim
       metricsOption.ifPresent(metrics -> metrics.updateStreamerSourceParallelism(sourceProfile.getSourcePartitions()));
       return coalesceOrRepartition(sourceWithMetaColumnsDropped, sourceProfile.getSourcePartitions());
     }).orElse(sourceWithMetaColumnsDropped);
+
+    maybeExtractUpstreamWatermarksRequestTime(srcPath, queryInfo.getStartInstant(), queryInfo.getEndInstant());
+
     return Pair.of(Option.of(src), new StreamerCheckpointV1(queryInfo.getEndInstant()));
   }
   // Try to fetch the latestSourceProfile, this ensures the profile is refreshed if it's no longer valid.
   private Option<SourceProfile<Integer>> getLatestSourceProfile() {
     return sourceProfileSupplier.map(SourceProfileSupplier::getSourceProfile);
   }
+
+  private void maybeExtractUpstreamWatermarksCompletionTime(QueryContext queryContext, String endCompletionTime) {
+    if (!ConfigUtils.isPropagatingEventTimeFromUpstream(props)) {
+      return;
+    }
+    if (StringUtils.isNullOrEmpty(endCompletionTime)) {
+      return;
Review Comment:
🤖 Is there a reason the completion-time path doesn't wrap the extraction in
a try/catch the way the request-time path below does? Today any unexpected
exception here (e.g. from `compareTimestamps` or `getActiveTimeline()`) will
fail the source, while the same failure on the request-time path would only log
a warning. Worth keeping the best-effort posture symmetric across the two paths.
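One way to keep the posture symmetric is to route both paths through a shared best-effort wrapper. The following is a minimal sketch, not the PR's code: the class and method names are hypothetical, `Map<String, Long>` is an assumed stand-in for the watermark type, and `java.util.logging` is used only to keep the example dependency-free.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;
import java.util.logging.Logger;

class BestEffortWatermarks {
  private static final Logger LOG = Logger.getLogger(BestEffortWatermarks.class.getName());

  // Runs the extraction and falls back to an empty map on any runtime failure,
  // so a watermark problem cannot fail the surrounding batch fetch.
  // ASSUMPTION: Map<String, Long> stands in for the real watermark type.
  static Map<String, Long> extractBestEffort(String pathLabel,
                                             Supplier<Map<String, Long>> extraction) {
    try {
      return extraction.get();
    } catch (RuntimeException e) {
      LOG.warning("Failed to extract upstream event-time watermarks (" + pathLabel
          + "); continuing without them: " + e);
      return Collections.emptyMap();
    }
  }

  public static void main(String[] args) {
    Map<String, Long> ok = extractBestEffort("completion-time path",
        () -> Collections.singletonMap("p1", 42L));
    Map<String, Long> failed = extractBestEffort("completion-time path",
        () -> { throw new IllegalStateException("timeline unavailable"); });
    System.out.println(ok.size() + " watermark(s), then " + failed.size() + " after a failure");
  }
}
```

Whether swallowing the exception is the right call on the completion-time path is a design decision for the author; the sketch only shows that both paths can share one policy.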
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java:
##########
@@ -439,11 +466,69 @@ private Pair<Option<Dataset<Row>>, Checkpoint> fetchNextBatchBasedOnRequestedTim
       metricsOption.ifPresent(metrics -> metrics.updateStreamerSourceParallelism(sourceProfile.getSourcePartitions()));
       return coalesceOrRepartition(sourceWithMetaColumnsDropped, sourceProfile.getSourcePartitions());
     }).orElse(sourceWithMetaColumnsDropped);
+
+    maybeExtractUpstreamWatermarksRequestTime(srcPath, queryInfo.getStartInstant(), queryInfo.getEndInstant());
+
     return Pair.of(Option.of(src), new StreamerCheckpointV1(queryInfo.getEndInstant()));
   }
   // Try to fetch the latestSourceProfile, this ensures the profile is refreshed if it's no longer valid.
   private Option<SourceProfile<Integer>> getLatestSourceProfile() {
     return sourceProfileSupplier.map(SourceProfileSupplier::getSourceProfile);
   }
+
+  private void maybeExtractUpstreamWatermarksCompletionTime(QueryContext queryContext, String endCompletionTime) {
+    if (!ConfigUtils.isPropagatingEventTimeFromUpstream(props)) {
+      return;
+    }
+    if (StringUtils.isNullOrEmpty(endCompletionTime)) {
+      return;
+    }
+    List<HoodieInstant> instants = queryContext.getInstants().stream()
+        .filter(i -> compareTimestamps(i.getCompletionTime(), LESSER_THAN_OR_EQUALS, endCompletionTime))
+        .collect(Collectors.toList());
+    if (instants.isEmpty()) {
+      return;
+    }
+    this.upstreamEventTimeWatermarks = UpstreamEventTimeWatermarkExtractor
+        .extractPerPartitionWatermarks(queryContext.getActiveTimeline(), instants);
+    if (!this.upstreamEventTimeWatermarks.isEmpty()) {
+      log.info("Propagating upstream event-time watermarks for {} partition(s) from {} commit(s)",
Review Comment:
🤖 nit: both `maybeExtractUpstreamWatermarksCompletionTime` and
`maybeExtractUpstreamWatermarksRequestTime` emit the identical info message
here and at line 525, so in production logs there's no way to tell which code
path fired. The warn at line 530 already sets a good example with
"(request-time path)" — could you add a similar discriminator to both info
logs, e.g. `"...from {} commit(s) (completion-time path)"`?
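A small shared formatter is one possible way to guarantee both info logs carry a discriminator. This is only a sketch: the class name, the method name, and the "request-time path" / "completion-time path" labels passed in are illustrative, and in the real code the SLF4J `{}` placeholders would likely be kept instead of `String.format`.

```java
class WatermarkLogMessages {
  // Builds the propagation message with a trailing label naming the code path,
  // so the two call sites can no longer emit identical lines.
  static String propagationMessage(int partitions, int commits, String pathLabel) {
    return String.format(
        "Propagating upstream event-time watermarks for %d partition(s) from %d commit(s) (%s)",
        partitions, commits, pathLabel);
  }

  public static void main(String[] args) {
    System.out.println(propagationMessage(3, 2, "completion-time path"));
    System.out.println(propagationMessage(3, 2, "request-time path"));
  }
}
```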