[
https://issues.apache.org/jira/browse/HUDI-8602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sagar Sumit closed HUDI-8602.
-----------------------------
Resolution: Fixed
> Incremental query failed if the start completion time is in the history of
> the timeline
> ---------------------------------------------------------------------------------------
>
> Key: HUDI-8602
> URL: https://issues.apache.org/jira/browse/HUDI-8602
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: Y Ethan Guo
> Assignee: Lin Liu
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.0.0
>
>
> When the start completion time falls within the archived (history) portion
> of the timeline, the incremental query (including CDC mode) fails.
> {code:java}
> val actual =
> spark.read.format("hudi").option("hoodie.datasource.query.type",
> "incremental").option("hoodie.datasource.read.begin.instanttime",
> "20241118011455593").option("hoodie.datasource.read.end.instanttime",
> "20241118014520931").load(basePath).select("key", "partition", "ts",
> "textField", "decimalField", "longField", "round")
> actual.count
> org.apache.hudi.exception.HoodieIOException: Could not read commit details
> from
> file:/Users/ethan/Work/tmp/hudi-1.0.0-testing/2024-11-17-23-52-06-streamer-mor-upsert-inline-no-compaction-2/test_table/.hoodie/timeline/20241118011307959_20241118011455593.deltacommit
> at
> org.apache.hudi.common.table.timeline.versioning.v2.ActiveTimelineV2.readDataFromPath(ActiveTimelineV2.java:755)
> at
> org.apache.hudi.common.table.timeline.versioning.v2.ActiveTimelineV2.getInstantDetails(ActiveTimelineV2.java:264)
> at
> org.apache.hudi.common.table.timeline.BaseHoodieTimeline.getInstantDetails(BaseHoodieTimeline.java:559)
> at
> org.apache.hudi.common.table.timeline.TimelineUtils.getCommitMetadata(TimelineUtils.java:336)
> at
> org.apache.hudi.HoodieIncrementalRelationTrait.$anonfun$commitsMetadata$1(MergeOnReadIncrementalRelation.scala:212)
> at scala.collection.immutable.List.map(List.scala:293)
> at
> org.apache.hudi.HoodieIncrementalRelationTrait.commitsMetadata(MergeOnReadIncrementalRelation.scala:212)
> at
> org.apache.hudi.HoodieIncrementalRelationTrait.commitsMetadata$(MergeOnReadIncrementalRelation.scala:212)
> at
> org.apache.hudi.MergeOnReadIncrementalRelation.commitsMetadata$lzycompute(MergeOnReadIncrementalRelation.scala:48)
> at
> org.apache.hudi.MergeOnReadIncrementalRelation.commitsMetadata(MergeOnReadIncrementalRelation.scala:48)
> at
> org.apache.hudi.HoodieIncrementalRelationTrait.affectedFilesInCommits(MergeOnReadIncrementalRelation.scala:215)
> at
> org.apache.hudi.HoodieIncrementalRelationTrait.affectedFilesInCommits$(MergeOnReadIncrementalRelation.scala:214)
> at
> org.apache.hudi.MergeOnReadIncrementalRelation.affectedFilesInCommits$lzycompute(MergeOnReadIncrementalRelation.scala:48)
> at
> org.apache.hudi.MergeOnReadIncrementalRelation.affectedFilesInCommits(MergeOnReadIncrementalRelation.scala:48)
> at
> org.apache.hudi.MergeOnReadIncrementalRelation.listFileSplits(MergeOnReadIncrementalRelation.scala:130)
> at
> org.apache.hudi.HoodieIncrementalFileIndex.listFiles(HoodieIncrementalFileIndex.scala:53)
> at
> org.apache.spark.sql.execution.FileSourceScanLike.selectedPartitions(DataSourceScanExec.scala:256)
> at
> org.apache.spark.sql.execution.FileSourceScanLike.selectedPartitions$(DataSourceScanExec.scala:251)
> at
> org.apache.spark.sql.execution.FileSourceScanExec.selectedPartitions$lzycompute(DataSourceScanExec.scala:506)
> at
> org.apache.spark.sql.execution.FileSourceScanExec.selectedPartitions(DataSourceScanExec.scala:506)
> at
> org.apache.spark.sql.execution.FileSourceScanLike.dynamicallySelectedPartitions(DataSourceScanExec.scala:286)
> at
> org.apache.spark.sql.execution.FileSourceScanLike.dynamicallySelectedPartitions$(DataSourceScanExec.scala:267)
> at
> org.apache.spark.sql.execution.FileSourceScanExec.dynamicallySelectedPartitions$lzycompute(DataSourceScanExec.scala:506)
> at
> org.apache.spark.sql.execution.FileSourceScanExec.dynamicallySelectedPartitions(DataSourceScanExec.scala:506)
> at
> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:553)
> at
> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:537)
> at
> org.apache.spark.sql.execution.FileSourceScanExec.doExecute(DataSourceScanExec.scala:566)
> at
> org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195)
> at
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:191)
> at
> org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:527)
> at
> org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:455)
> at
> org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:454)
> at
> org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:498)
> at
> org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:242)
> at
> org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:51)
> at
> org.apache.spark.sql.execution.aggregate.AggregateCodegenSupport.inputRDDs(AggregateCodegenSupport.scala:89)
> at
> org.apache.spark.sql.execution.aggregate.AggregateCodegenSupport.inputRDDs$(AggregateCodegenSupport.scala:88)
> at
> org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:49)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:751)
> at
> org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195)
> at
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:191)
> at
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:141)
> at
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:141)
> at
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:146)
> at
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:145)
> at
> org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.$anonfun$submitShuffleJob$1(ShuffleExchangeExec.scala:73)
> at
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
> at
> org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob(ShuffleExchangeExec.scala:73)
> at
> org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob$(ShuffleExchangeExec.scala:72)
> at
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.submitShuffleJob(ShuffleExchangeExec.scala:120)
> at
> org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture$lzycompute(QueryStageExec.scala:187)
> at
> org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture(QueryStageExec.scala:187)
> at
> org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.doMaterialize(QueryStageExec.scala:189)
> at
> org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:61)
> at
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5(AdaptiveSparkPlanExec.scala:302)
> at
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5$adapted(AdaptiveSparkPlanExec.scala:300)
> at scala.collection.Iterator.foreach(Iterator.scala:943)
> at scala.collection.Iterator.foreach$(Iterator.scala:943)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> at
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:300)
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
> at
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:272)
> at
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:417)
> at
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:390)
> at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3616)
> at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3615)
> at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4323)
> at
> org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
> at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4321)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
> at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4321)
> at org.apache.spark.sql.Dataset.count(Dataset.scala:3615)
> ... 51 elided
> Caused by: java.io.FileNotFoundException: File
> file:/Users/ethan/Work/tmp/hudi-1.0.0-testing/2024-11-17-23-52-06-streamer-mor-upsert-inline-no-compaction-2/test_table/.hoodie/timeline/20241118011307959_20241118011455593.deltacommit
> does not exist
> at
> org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:779)
> at
> org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1100)
> at
> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:769)
> at
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
> at
> org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:160)
> at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:372)
> at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:976)
> at
> org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem.open(HoodieWrapperFileSystem.java:481)
> at
> org.apache.hudi.storage.hadoop.HoodieHadoopStorage.open(HoodieHadoopStorage.java:149)
> at
> org.apache.hudi.common.table.timeline.versioning.v2.ActiveTimelineV2.readDataFromPath(ActiveTimelineV2.java:752)
> ... 137 more {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)