yihua commented on code in PR #11947:
URL: https://github.com/apache/hudi/pull/11947#discussion_r1797777212


##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java:
##########
@@ -308,6 +309,38 @@ public static HoodieTimeline getCommitsTimelineAfter(
     return timelineSinceLastSync;
   }
 
+  public static HoodieTimeline getCommitsTimeLineAfterByCompletionTimeRange(
+      HoodieTableMetaClient metaClient,
+      String exclusiveStartCompletionTime,
+      String inclusiveEndCompletionTime) {
+    HoodieDefaultTimeline writeTimeline = metaClient.getActiveTimeline().getWriteTimeline();
+
+    HoodieDefaultTimeline timeline;
+    if (writeTimeline.isBeforeTimelineStartsByCompletionTime(exclusiveStartCompletionTime)) {
+      // need to load archived timeline as well
+      try (CompletionTimeQueryView view = new CompletionTimeQueryView(metaClient)) {
+        List<String> instants = view.getStartTimes(
+            exclusiveStartCompletionTime,
+            inclusiveEndCompletionTime,
+            RangeType.OPEN_CLOSED);
+        if (instants.isEmpty()) {
+          LOG.warn("Did not find any instant when getting timeline, using write timeline for incremental read");
+          timeline = writeTimeline;
+        } else {
+          timeline = metaClient.getArchivedTimeline(instants.get(0))
+              .mergeTimeline(writeTimeline)
+              .mergeTimeline(writeTimeline);

Review Comment:
   If this is still needed, a single `mergeTimeline(writeTimeline)` call is enough here; the second merge is redundant.
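
   To illustrate why the extra call matters, here is a toy model, not Hudi code. `TimelineSketch` is a hypothetical stand-in, and it assumes a merge concatenates the two instant lists and sorts them without de-duplicating; that assumption about `mergeTimeline` is not confirmed by this diff. Under it, the second merge would repeat every write-timeline instant.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a timeline: just a sorted list of instant times.
final class TimelineSketch {
  final List<String> instants;

  TimelineSketch(List<String> instants) {
    List<String> sorted = new ArrayList<>(instants);
    Collections.sort(sorted);
    this.instants = Collections.unmodifiableList(sorted);
  }

  // ASSUMPTION: merging concatenates both instant lists and re-sorts,
  // with no de-duplication. The real implementation may differ.
  TimelineSketch mergeTimeline(TimelineSketch other) {
    List<String> merged = new ArrayList<>(instants);
    merged.addAll(other.instants);
    return new TimelineSketch(merged);
  }
}
```

   With an archived timeline of `001, 002` and a write timeline of `003, 004`, one merge in this model gives four instants, while two merges give six because `003` and `004` each appear twice.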



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala:
##########
@@ -82,7 +82,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
                            @transient fileSplits: Seq[HoodieMergeOnReadFileSplit],
                            includeStartTime: Boolean = false,
                            startTimestamp: String = null,
-                           endTimestamp: String = null)
+                           endTimestamp: String = null,
+                           includedTimestamps: Set[String] = null)

Review Comment:
   Do all of these timestamps still represent instant times rather than 
completion times?  The Scaladoc should also be updated.



##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java:
##########
@@ -681,6 +681,10 @@ public List<HoodieRecord> generateInserts(String instantTime, Integer n) {
     return generateInserts(instantTime, n, false);
   }
 
+  public List<HoodieRecord> generateInserts(Long instantTime, Integer n) {
+    return generateInserts(String.valueOf(instantTime), n, false);
+  }

Review Comment:
   Could the caller just pass a String instead of adding a redundant test 
method here?
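
   A minimal sketch of what the call site could look like. `GeneratorSketch` and `CallerSketch` are hypothetical stand-ins for the test data generator and a test that holds a `long` instant time; only the existing `(String, Integer)` shape is assumed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the test data generator; only the String overload exists.
final class GeneratorSketch {
  // Returns n placeholder record keys tagged with the instant time.
  List<String> generateInserts(String instantTime, Integer n) {
    List<String> records = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      records.add(instantTime + "_" + i);
    }
    return records;
  }
}

// Hypothetical caller that holds the instant time as a long.
final class CallerSketch {
  // The conversion happens at the call site, so no Long overload is needed.
  static List<String> insertsFor(long instantTime, int n) {
    return new GeneratorSketch().generateInserts(String.valueOf(instantTime), n);
  }
}
```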



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -125,6 +125,15 @@ object DataSourceReadOptions {
       + HollowCommitHandling.USE_TRANSITION_TIME + "`, will use instant's "
       + "`stateTransitionTime` to perform comparison.")
 
+  /* TODO: this is a replacement of HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH
+      to make incremental instant limit available in spark incremental relations
+      Need to link HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH to this correctly and add doc*/
+  val INCREMENTAL_LIMIT: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.streamer.source.hoodieincr.num_instants")

Review Comment:
   Spark should not be aware of this config.  We can pass down the correct 
instant range without using the `num_instants` config.
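
   One possible shape for that, as a rough sketch only: the streamer side applies the instant limit itself and hands Spark just the resulting start and end. `InstantRangeSketch` and `boundedRange` are hypothetical names, not existing Hudi APIs, and the sketch assumes the caller already has the completion times as a sorted list of comparable strings.

```java
import java.util.List;

// Hypothetical helper on the streamer side (not a Hudi API).
final class InstantRangeSketch {
  // Returns {exclusiveStart, inclusiveEnd} covering at most numInstants
  // completion times after exclusiveStart, or null when nothing is new.
  static String[] boundedRange(List<String> sortedCompletionTimes,
                               String exclusiveStart,
                               int numInstants) {
    String end = null;
    int taken = 0;
    for (String t : sortedCompletionTimes) {
      if (t.compareTo(exclusiveStart) <= 0) {
        continue; // at or before the start, which is exclusive
      }
      if (taken == numInstants) {
        break; // limit reached
      }
      end = t;
      taken++;
    }
    return end == null ? null : new String[] {exclusiveStart, end};
  }
}
```

   The Spark relation would then only see the two range bounds, so it would not need to read any streamer-specific key.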



