yihua commented on code in PR #11947:
URL: https://github.com/apache/hudi/pull/11947#discussion_r1797780051


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala:
##########
@@ -90,24 +78,25 @@ class IncrementalRelation(val sqlContext: SQLContext,
     throw new HoodieException("Incremental queries are not supported when meta 
fields are disabled")
   }
 
+  private val queryContext: IncrementalQueryAnalyzer.QueryContext =
+    IncrementalQueryAnalyzer.builder()
+      .metaClient(metaClient)
+      .startTime(optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key))
+      .endTime(optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key, 
null))
+      .rangeType(InstantRange.RangeType.OPEN_CLOSED)
+      .limit(optParams.getOrElse(
+        DataSourceReadOptions.INCREMENTAL_LIMIT.key,
+        DataSourceReadOptions.INCREMENTAL_LIMIT.defaultValue).toInt)
+      .build()
+      .analyze()
+
+  private val commitsToReturn = List.concat(

Review Comment:
   Are these commits sorted by completion time?  There are logic in deriving 
the schema that relies on the ordering to get the latest table schema.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala:
##########
@@ -90,24 +78,25 @@ class IncrementalRelation(val sqlContext: SQLContext,
     throw new HoodieException("Incremental queries are not supported when meta 
fields are disabled")
   }
 
+  private val queryContext: IncrementalQueryAnalyzer.QueryContext =
+    IncrementalQueryAnalyzer.builder()
+      .metaClient(metaClient)
+      .startTime(optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key))
+      .endTime(optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key, 
null))

Review Comment:
   So for incremental query in Spark, these begin and end instant time 
represent completion time.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala:
##########
@@ -220,15 +209,12 @@ class IncrementalRelation(val sqlContext: SQLContext,
 
       val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))
 
-      val startInstantTime = 
optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
+      val startInstantTime = queryContext.getStartInstant.get()
       val startInstantArchived = 
commitTimeline.isBeforeTimelineStarts(startInstantTime)
-      val endInstantTime = 
optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key(), 
lastInstant.getTimestamp)
+      val endInstantTime = queryContext.getEndInstant.get()

Review Comment:
   `startInstantArchived` and `endInstantArchived` can be avoided as 
`queryContext` already indicates whether there are instants in the archived 
timeline to pull.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala:
##########
@@ -90,24 +78,25 @@ class IncrementalRelation(val sqlContext: SQLContext,
     throw new HoodieException("Incremental queries are not supported when meta 
fields are disabled")
   }
 
+  private val queryContext: IncrementalQueryAnalyzer.QueryContext =
+    IncrementalQueryAnalyzer.builder()
+      .metaClient(metaClient)
+      .startTime(optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key))
+      .endTime(optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key, 
null))
+      .rangeType(InstantRange.RangeType.OPEN_CLOSED)
+      .limit(optParams.getOrElse(
+        DataSourceReadOptions.INCREMENTAL_LIMIT.key,
+        DataSourceReadOptions.INCREMENTAL_LIMIT.defaultValue).toInt)
+      .build()
+      .analyze()
+
+  private val commitsToReturn = List.concat(
+    queryContext.getArchivedInstants.asScala,
+    queryContext.getActiveInstants.asScala)
+
   private val useEndInstantSchema = 
optParams.getOrElse(INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME.key,
     INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME.defaultValue).toBoolean
 
-  private val lastInstant = commitTimeline.lastInstant().get()
-
-  private val commitsTimelineToReturn = {
-    if (hollowCommitHandling == USE_TRANSITION_TIME) {
-      commitTimeline.findInstantsInRangeByCompletionTime(
-        optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key),
-        optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key(), 
lastInstant.getCompletionTime))
-    } else {
-      commitTimeline.findInstantsInRange(
-        optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key),
-        optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key(), 
lastInstant.getTimestamp))
-    }
-  }
-  private val commitsToReturn = 
commitsTimelineToReturn.getInstantsAsStream.iterator().asScala.toList

Review Comment:
   Let's file a Jira to track how to differentiate the completion time for 0.x 
vs 1.0 tables.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to