yihua commented on code in PR #11947:
URL: https://github.com/apache/hudi/pull/11947#discussion_r1797785933


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala:
##########
@@ -94,9 +94,10 @@ case class MergeOnReadIncrementalRelation(override val 
sqlContext: SQLContext,
       tableState = tableState,
       mergeType = mergeType,
       fileSplits = fileSplits,
-      includeStartTime = includeStartTime,
+      includeStartTime = false,
       startTimestamp = startTs,
-      endTimestamp = endTs)
+      endTimestamp = endTs,
+      includedTimestamps = includedCommits.map(_.getTimestamp).toSet)

Review Comment:
   Similar here, clearly define what timestamps are passed in here.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala:
##########
@@ -185,13 +186,11 @@ trait HoodieIncrementalRelationTrait extends 
HoodieBaseRelation {
   protected def startTimestamp: String = 
optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
 
   protected def endTimestamp: String = optParams.getOrElse(
-    DataSourceReadOptions.END_INSTANTTIME.key,
-    if (hollowCommitHandling == USE_TRANSITION_TIME) 
super.timeline.lastInstant().get.getCompletionTime
-    else super.timeline.lastInstant().get.getTimestamp)
+    DataSourceReadOptions.END_INSTANTTIME.key, 
super.timeline.lastInstant().get.getCompletionTime)

Review Comment:
   `lastInstant()` might not be the max completion time on the timeline.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala:
##########
@@ -185,13 +186,11 @@ trait HoodieIncrementalRelationTrait extends 
HoodieBaseRelation {
   protected def startTimestamp: String = 
optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
 
   protected def endTimestamp: String = optParams.getOrElse(
-    DataSourceReadOptions.END_INSTANTTIME.key,
-    if (hollowCommitHandling == USE_TRANSITION_TIME) 
super.timeline.lastInstant().get.getCompletionTime
-    else super.timeline.lastInstant().get.getTimestamp)
+    DataSourceReadOptions.END_INSTANTTIME.key, 
super.timeline.lastInstant().get.getCompletionTime)
 
-  protected def startInstantArchived: Boolean = 
super.timeline.isBeforeTimelineStarts(startTimestamp)
+  protected def startInstantArchived: Boolean = 
super.timeline.isBeforeTimelineStartsByCompletionTime(startTimestamp)
 
-  protected def endInstantArchived: Boolean = 
super.timeline.isBeforeTimelineStarts(endTimestamp)
+  protected def endInstantArchived: Boolean = 
super.timeline.isBeforeTimelineStartsByCompletionTime(endTimestamp)

Review Comment:
   Same here.  These are no longer needed if query context splits the instants 
into active and archived.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala:
##########
@@ -94,9 +94,10 @@ case class MergeOnReadIncrementalRelation(override val 
sqlContext: SQLContext,
       tableState = tableState,
       mergeType = mergeType,
       fileSplits = fileSplits,
-      includeStartTime = includeStartTime,
+      includeStartTime = false,

Review Comment:
   If `includeStartTime` is not used, should we get rid of it?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala:
##########
@@ -205,51 +204,46 @@ trait HoodieIncrementalRelationTrait extends 
HoodieBaseRelation {
       || affectedFilesInCommits.asScala.exists(fileStatus => 
!metaClient.getStorage.exists(fileStatus.getPath)))
   }
 
-  protected lazy val includedCommits: immutable.Seq[HoodieInstant] = {
-    if (!startInstantArchived || !endInstantArchived) {
-      // If endTimestamp commit is not archived, will filter instants
-      // before endTimestamp.
-      if (hollowCommitHandling == USE_TRANSITION_TIME) {
-        super.timeline.findInstantsInRangeByCompletionTime(startTimestamp, 
endTimestamp).getInstants.asScala.toList
-      } else {
-        super.timeline.findInstantsInRange(startTimestamp, 
endTimestamp).getInstants.asScala.toList
-      }
-    } else {
-      super.timeline.getInstants.asScala.toList
-    }
-  }
+  protected lazy val queryContext: IncrementalQueryAnalyzer.QueryContext =
+    IncrementalQueryAnalyzer.builder()
+      .metaClient(metaClient)
+      .startTime(optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key))
+      .endTime(optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key, 
null))
+      .rangeType(InstantRange.RangeType.OPEN_CLOSED)
+      .limit(optParams.getOrElse(
+        DataSourceReadOptions.INCREMENTAL_LIMIT.key,
+        DataSourceReadOptions.INCREMENTAL_LIMIT.defaultValue).toInt)
+      .build()
+      .analyze()
+
+  protected lazy val includedCommits: immutable.Seq[HoodieInstant] = 
List.concat(
+    queryContext.getArchivedInstants.asScala,
+    queryContext.getActiveInstants.asScala)
 
   protected lazy val commitsMetadata = 
includedCommits.map(getCommitMetadata(_, super.timeline)).asJava
 
   protected lazy val affectedFilesInCommits: java.util.List[StoragePathInfo] = 
{
     listAffectedFilesForCommits(conf, metaClient.getBasePath, commitsMetadata)
   }
 
-  protected lazy val (includeStartTime, startTs) = if (startInstantArchived) {
-    (false, startTimestamp)

Review Comment:
   Do you know why `startInstantArchived` determines `includeStartTime`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to