CTTY commented on code in PR #11947:
URL: https://github.com/apache/hudi/pull/11947#discussion_r1797956169
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala:
##########
@@ -205,51 +204,46 @@ trait HoodieIncrementalRelationTrait extends
HoodieBaseRelation {
|| affectedFilesInCommits.asScala.exists(fileStatus =>
!metaClient.getStorage.exists(fileStatus.getPath)))
}
- protected lazy val includedCommits: immutable.Seq[HoodieInstant] = {
- if (!startInstantArchived || !endInstantArchived) {
- // If endTimestamp commit is not archived, will filter instants
- // before endTimestamp.
- if (hollowCommitHandling == USE_TRANSITION_TIME) {
- super.timeline.findInstantsInRangeByCompletionTime(startTimestamp,
endTimestamp).getInstants.asScala.toList
- } else {
- super.timeline.findInstantsInRange(startTimestamp,
endTimestamp).getInstants.asScala.toList
- }
- } else {
- super.timeline.getInstants.asScala.toList
- }
- }
+ protected lazy val queryContext: IncrementalQueryAnalyzer.QueryContext =
+ IncrementalQueryAnalyzer.builder()
+ .metaClient(metaClient)
+ .startTime(optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key))
+ .endTime(optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key,
null))
+ .rangeType(InstantRange.RangeType.OPEN_CLOSED)
+ .limit(optParams.getOrElse(
+ DataSourceReadOptions.INCREMENTAL_LIMIT.key,
+ DataSourceReadOptions.INCREMENTAL_LIMIT.defaultValue).toInt)
+ .build()
+ .analyze()
+
+ protected lazy val includedCommits: immutable.Seq[HoodieInstant] =
List.concat(
+ queryContext.getArchivedInstants.asScala,
+ queryContext.getActiveInstants.asScala)
protected lazy val commitsMetadata =
includedCommits.map(getCommitMetadata(_, super.timeline)).asJava
protected lazy val affectedFilesInCommits: java.util.List[StoragePathInfo] =
{
listAffectedFilesForCommits(conf, metaClient.getBasePath, commitsMetadata)
}
- protected lazy val (includeStartTime, startTs) = if (startInstantArchived) {
- (false, startTimestamp)
Review Comment:
I think the variable name `startInstantArchived` is not very accurate. It is
actually checking whether the `startTimestamp` passed in refers to an archived
instant. So in this case, if the `startTimestamp` is archived, Hudi will use
the first commit in `includedCommits` as the range start; since that commit
still falls within the original time range, it should be included.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]