hudi-agent commented on code in PR #18802:
URL: https://github.com/apache/hudi/pull/18802#discussion_r3278705438


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java:
##########
@@ -439,11 +466,69 @@ private Pair<Option<Dataset<Row>>, Checkpoint> fetchNextBatchBasedOnRequestedTim
       metricsOption.ifPresent(metrics -> metrics.updateStreamerSourceParallelism(sourceProfile.getSourcePartitions()));
       return coalesceOrRepartition(sourceWithMetaColumnsDropped, sourceProfile.getSourcePartitions());
     }).orElse(sourceWithMetaColumnsDropped);
+
+    maybeExtractUpstreamWatermarksRequestTime(srcPath, queryInfo.getStartInstant(), queryInfo.getEndInstant());
+
     return Pair.of(Option.of(src), new StreamerCheckpointV1(queryInfo.getEndInstant()));
   }
 
   // Try to fetch the latestSourceProfile, this ensures the profile is refreshed if it's no longer valid.
   private Option<SourceProfile<Integer>> getLatestSourceProfile() {
     return sourceProfileSupplier.map(SourceProfileSupplier::getSourceProfile);
   }
+
+  private void maybeExtractUpstreamWatermarksCompletionTime(QueryContext queryContext, String endCompletionTime) {
+    if (!ConfigUtils.isPropagatingEventTimeFromUpstream(props)) {
+      return;
+    }
+    if (StringUtils.isNullOrEmpty(endCompletionTime)) {
+      return;
+    }
+    List<HoodieInstant> instants = queryContext.getInstants().stream()
+        .filter(i -> compareTimestamps(i.getCompletionTime(), LESSER_THAN_OR_EQUALS, endCompletionTime))
+        .collect(Collectors.toList());
+    if (instants.isEmpty()) {
+      return;
+    }
+    this.upstreamEventTimeWatermarks = UpstreamEventTimeWatermarkExtractor
+        .extractPerPartitionWatermarks(queryContext.getActiveTimeline(), instants);
+    if (!this.upstreamEventTimeWatermarks.isEmpty()) {
+      log.info("Propagating upstream event-time watermarks for {} partition(s) from {} commit(s)",
+          this.upstreamEventTimeWatermarks.size(), instants.size());
+    }
+  }
+
+  private void maybeExtractUpstreamWatermarksRequestTime(String srcPath, String startInstant, String endInstant) {
+    if (!ConfigUtils.isPropagatingEventTimeFromUpstream(props)) {
+      return;
+    }
+    if (StringUtils.isNullOrEmpty(endInstant)) {
+      return;
+    }
+    String startExclusive = StringUtils.isNullOrEmpty(startInstant) ? "" : startInstant;
+    try {
+      HoodieTableMetaClient srcMetaClient = HoodieTableMetaClient.builder()
+          .setConf(HadoopFSUtils.getStorageConfWithCopy(sparkContext.hadoopConfiguration()))
+          .setBasePath(srcPath)
+          .setLoadActiveTimelineOnLoad(true)

Review Comment:
   🤖 `getCommitsTimeline()` only includes `COMMIT` + `DELTA_COMMIT` and 
excludes `REPLACE_COMMIT` (clustering / insert_overwrite). If the upstream 
table had clustering instants in this range that carried event-time stats, the 
request-time path will silently skip them, while the completion-time path 
above can include them via `queryContext.getInstants()`. Should this use 
`getCommitsAndCompactionTimeline()` (or `getCommitAndReplaceTimeline()`) to 
keep the two paths symmetric?
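   To make the asymmetry concrete, here is a minimal, Hudi-independent sketch, assuming each path selects instants by a set of action names. `TimelineInstant`, `ActionFilterSketch` and both action sets are hypothetical and are not claimed to match what any Hudi timeline method returns; the point is only that a path whose set lacks `replacecommit` drops those instants without raising any error.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for a timeline instant: just an id and an action name.
final class TimelineInstant {
  final String id;
  final String action;

  TimelineInstant(String id, String action) {
    this.id = id;
    this.action = action;
  }
}

final class ActionFilterSketch {
  // Illustrative action sets only; NOT claimed to match any Hudi timeline method.
  static final Set<String> COMMIT_AND_DELTA = Set.of("commit", "deltacommit");
  static final Set<String> COMMIT_DELTA_AND_REPLACE =
      Set.of("commit", "deltacommit", "replacecommit");

  // Keeps the ids of instants whose action is in the given set, preserving input order.
  // Instants with any other action are dropped silently.
  static List<String> filterByActions(List<TimelineInstant> instants, Set<String> actions) {
    return instants.stream()
        .filter(i -> actions.contains(i.action))
        .map(i -> i.id)
        .collect(Collectors.toList());
  }
}
```

   Driving both paths from one shared action-set constant (or one shared timeline accessor) is one way to keep them from drifting apart.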
   
   - AI-generated; verify before applying.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java:
##########
@@ -439,11 +466,69 @@ private Pair<Option<Dataset<Row>>, Checkpoint> fetchNextBatchBasedOnRequestedTim
       metricsOption.ifPresent(metrics -> metrics.updateStreamerSourceParallelism(sourceProfile.getSourcePartitions()));
       return coalesceOrRepartition(sourceWithMetaColumnsDropped, sourceProfile.getSourcePartitions());
     }).orElse(sourceWithMetaColumnsDropped);
+
+    maybeExtractUpstreamWatermarksRequestTime(srcPath, queryInfo.getStartInstant(), queryInfo.getEndInstant());
+
     return Pair.of(Option.of(src), new StreamerCheckpointV1(queryInfo.getEndInstant()));
   }
 
   // Try to fetch the latestSourceProfile, this ensures the profile is refreshed if it's no longer valid.
   private Option<SourceProfile<Integer>> getLatestSourceProfile() {
     return sourceProfileSupplier.map(SourceProfileSupplier::getSourceProfile);
   }
+
+  private void maybeExtractUpstreamWatermarksCompletionTime(QueryContext queryContext, String endCompletionTime) {
+    if (!ConfigUtils.isPropagatingEventTimeFromUpstream(props)) {
+      return;
+    }
+    if (StringUtils.isNullOrEmpty(endCompletionTime)) {
+      return;

Review Comment:
   🤖 Is there a reason the completion-time path doesn't wrap the extraction in 
a try/catch the way the request-time path below does? Today any unexpected 
exception here (e.g. from `compareTimestamps` or `getActiveTimeline()`) will 
fail the source, while the same failure on the request-time path would only log 
a warning. Worth keeping the best-effort posture symmetric across the two paths.
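   A minimal sketch of a shared best-effort wrapper that both paths could call, assuming watermarks are modeled as a map from partition path to epoch millis (an assumption; the PR's actual type for `upstreamEventTimeWatermarks` is not shown here). `BestEffortWatermarks.extractBestEffort` is a hypothetical helper, and `java.util.logging` stands in for the class's real logger.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;
import java.util.logging.Logger;

final class BestEffortWatermarks {
  private static final Logger LOG = Logger.getLogger(BestEffortWatermarks.class.getName());

  // Runs the extraction. If it throws a RuntimeException, logs a warning tagged with the
  // path label and returns an empty map instead of failing the source.
  // A null result is also normalized to an empty map.
  static Map<String, Long> extractBestEffort(String pathLabel, Supplier<Map<String, Long>> extraction) {
    try {
      Map<String, Long> result = extraction.get();
      return result == null ? Collections.emptyMap() : result;
    } catch (RuntimeException e) {
      LOG.warning("Failed to extract upstream event-time watermarks (" + pathLabel + " path): " + e);
      return Collections.emptyMap();
    }
  }
}
```

   Catching `RuntimeException` rather than `Throwable` keeps JVM `Error`s fatal while degrading ordinary extraction failures to a warning and an empty result, the same posture on both paths.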
   
   - AI-generated; verify before applying.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java:
##########
@@ -439,11 +466,69 @@ private Pair<Option<Dataset<Row>>, Checkpoint> fetchNextBatchBasedOnRequestedTim
       metricsOption.ifPresent(metrics -> metrics.updateStreamerSourceParallelism(sourceProfile.getSourcePartitions()));
       return coalesceOrRepartition(sourceWithMetaColumnsDropped, sourceProfile.getSourcePartitions());
     }).orElse(sourceWithMetaColumnsDropped);
+
+    maybeExtractUpstreamWatermarksRequestTime(srcPath, queryInfo.getStartInstant(), queryInfo.getEndInstant());
+
     return Pair.of(Option.of(src), new StreamerCheckpointV1(queryInfo.getEndInstant()));
   }
 
   // Try to fetch the latestSourceProfile, this ensures the profile is refreshed if it's no longer valid.
   private Option<SourceProfile<Integer>> getLatestSourceProfile() {
     return sourceProfileSupplier.map(SourceProfileSupplier::getSourceProfile);
   }
+
+  private void maybeExtractUpstreamWatermarksCompletionTime(QueryContext queryContext, String endCompletionTime) {
+    if (!ConfigUtils.isPropagatingEventTimeFromUpstream(props)) {
+      return;
+    }
+    if (StringUtils.isNullOrEmpty(endCompletionTime)) {
+      return;
+    }
+    List<HoodieInstant> instants = queryContext.getInstants().stream()
+        .filter(i -> compareTimestamps(i.getCompletionTime(), LESSER_THAN_OR_EQUALS, endCompletionTime))
+        .collect(Collectors.toList());
+    if (instants.isEmpty()) {
+      return;
+    }
+    this.upstreamEventTimeWatermarks = UpstreamEventTimeWatermarkExtractor
+        .extractPerPartitionWatermarks(queryContext.getActiveTimeline(), instants);
+    if (!this.upstreamEventTimeWatermarks.isEmpty()) {
+      log.info("Propagating upstream event-time watermarks for {} partition(s) from {} commit(s)",

Review Comment:
   🤖 nit: both `maybeExtractUpstreamWatermarksCompletionTime` and 
`maybeExtractUpstreamWatermarksRequestTime` emit the identical info message 
here and at line 525, so in production logs there's no way to tell which code 
path fired. The warn at line 530 already sets a good example with 
"(request-time path)" — could you add a similar discriminator to both info 
logs, e.g. `"...from {} commit(s) (completion-time path)"`?
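   A sketch of one way to add the discriminator without duplicating the format string, assuming both paths share a small helper. `WatermarkLogMessages` and its label constants are hypothetical, and `String.format` is used only so the example runs standalone; in the class itself the `{}`-placeholder form already used by `log.info` would stay.

```java
final class WatermarkLogMessages {
  // Hypothetical labels, one per extraction path.
  static final String COMPLETION_TIME_PATH = "completion-time path";
  static final String REQUEST_TIME_PATH = "request-time path";

  // Same wording as the existing info log, plus a trailing "(<path>)" discriminator.
  static String propagating(String pathLabel, int partitionCount, int commitCount) {
    return String.format(
        "Propagating upstream event-time watermarks for %d partition(s) from %d commit(s) (%s)",
        partitionCount, commitCount, pathLabel);
  }
}
```

   For example, `propagating(COMPLETION_TIME_PATH, 3, 2)` yields `Propagating upstream event-time watermarks for 3 partition(s) from 2 commit(s) (completion-time path)`.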
   
   - AI-generated; verify before applying.



-- 
This is an automated message from the Apache Git Service.