[ 
https://issues.apache.org/jira/browse/HUDI-8602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Sumit closed HUDI-8602.
-----------------------------
    Resolution: Fixed

> Incremental query failed if the start completion time is in the history of 
> the timeline
> ---------------------------------------------------------------------------------------
>
>                 Key: HUDI-8602
>                 URL: https://issues.apache.org/jira/browse/HUDI-8602
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Y Ethan Guo
>            Assignee: Lin Liu
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.0.0
>
>
> The start completion time is in the history (older portion) of the timeline.  
> The incremental query (including CDC mode) fails in this case.
> {code:java}
> val actual =  
> spark.read.format("hudi").option("hoodie.datasource.query.type", 
> "incremental").option("hoodie.datasource.read.begin.instanttime", 
> "20241118011455593").option("hoodie.datasource.read.end.instanttime", 
> "20241118014520931").load(basePath).select("key", "partition", "ts", 
> "textField", "decimalField", "longField", "round")
> actual.count
> org.apache.hudi.exception.HoodieIOException: Could not read commit details 
> from 
> file:/Users/ethan/Work/tmp/hudi-1.0.0-testing/2024-11-17-23-52-06-streamer-mor-upsert-inline-no-compaction-2/test_table/.hoodie/timeline/20241118011307959_20241118011455593.deltacommit
>   at 
> org.apache.hudi.common.table.timeline.versioning.v2.ActiveTimelineV2.readDataFromPath(ActiveTimelineV2.java:755)
>   at 
> org.apache.hudi.common.table.timeline.versioning.v2.ActiveTimelineV2.getInstantDetails(ActiveTimelineV2.java:264)
>   at 
> org.apache.hudi.common.table.timeline.BaseHoodieTimeline.getInstantDetails(BaseHoodieTimeline.java:559)
>   at 
> org.apache.hudi.common.table.timeline.TimelineUtils.getCommitMetadata(TimelineUtils.java:336)
>   at 
> org.apache.hudi.HoodieIncrementalRelationTrait.$anonfun$commitsMetadata$1(MergeOnReadIncrementalRelation.scala:212)
>   at scala.collection.immutable.List.map(List.scala:293)
>   at 
> org.apache.hudi.HoodieIncrementalRelationTrait.commitsMetadata(MergeOnReadIncrementalRelation.scala:212)
>   at 
> org.apache.hudi.HoodieIncrementalRelationTrait.commitsMetadata$(MergeOnReadIncrementalRelation.scala:212)
>   at 
> org.apache.hudi.MergeOnReadIncrementalRelation.commitsMetadata$lzycompute(MergeOnReadIncrementalRelation.scala:48)
>   at 
> org.apache.hudi.MergeOnReadIncrementalRelation.commitsMetadata(MergeOnReadIncrementalRelation.scala:48)
>   at 
> org.apache.hudi.HoodieIncrementalRelationTrait.affectedFilesInCommits(MergeOnReadIncrementalRelation.scala:215)
>   at 
> org.apache.hudi.HoodieIncrementalRelationTrait.affectedFilesInCommits$(MergeOnReadIncrementalRelation.scala:214)
>   at 
> org.apache.hudi.MergeOnReadIncrementalRelation.affectedFilesInCommits$lzycompute(MergeOnReadIncrementalRelation.scala:48)
>   at 
> org.apache.hudi.MergeOnReadIncrementalRelation.affectedFilesInCommits(MergeOnReadIncrementalRelation.scala:48)
>   at 
> org.apache.hudi.MergeOnReadIncrementalRelation.listFileSplits(MergeOnReadIncrementalRelation.scala:130)
>   at 
> org.apache.hudi.HoodieIncrementalFileIndex.listFiles(HoodieIncrementalFileIndex.scala:53)
>   at 
> org.apache.spark.sql.execution.FileSourceScanLike.selectedPartitions(DataSourceScanExec.scala:256)
>   at 
> org.apache.spark.sql.execution.FileSourceScanLike.selectedPartitions$(DataSourceScanExec.scala:251)
>   at 
> org.apache.spark.sql.execution.FileSourceScanExec.selectedPartitions$lzycompute(DataSourceScanExec.scala:506)
>   at 
> org.apache.spark.sql.execution.FileSourceScanExec.selectedPartitions(DataSourceScanExec.scala:506)
>   at 
> org.apache.spark.sql.execution.FileSourceScanLike.dynamicallySelectedPartitions(DataSourceScanExec.scala:286)
>   at 
> org.apache.spark.sql.execution.FileSourceScanLike.dynamicallySelectedPartitions$(DataSourceScanExec.scala:267)
>   at 
> org.apache.spark.sql.execution.FileSourceScanExec.dynamicallySelectedPartitions$lzycompute(DataSourceScanExec.scala:506)
>   at 
> org.apache.spark.sql.execution.FileSourceScanExec.dynamicallySelectedPartitions(DataSourceScanExec.scala:506)
>   at 
> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:553)
>   at 
> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:537)
>   at 
> org.apache.spark.sql.execution.FileSourceScanExec.doExecute(DataSourceScanExec.scala:566)
>   at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195)
>   at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:191)
>   at 
> org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:527)
>   at 
> org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:455)
>   at 
> org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:454)
>   at 
> org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:498)
>   at 
> org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:242)
>   at 
> org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:51)
>   at 
> org.apache.spark.sql.execution.aggregate.AggregateCodegenSupport.inputRDDs(AggregateCodegenSupport.scala:89)
>   at 
> org.apache.spark.sql.execution.aggregate.AggregateCodegenSupport.inputRDDs$(AggregateCodegenSupport.scala:88)
>   at 
> org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:49)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:751)
>   at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195)
>   at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:191)
>   at 
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:141)
>   at 
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:141)
>   at 
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:146)
>   at 
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:145)
>   at 
> org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.$anonfun$submitShuffleJob$1(ShuffleExchangeExec.scala:73)
>   at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
>   at 
> org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob(ShuffleExchangeExec.scala:73)
>   at 
> org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob$(ShuffleExchangeExec.scala:72)
>   at 
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.submitShuffleJob(ShuffleExchangeExec.scala:120)
>   at 
> org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture$lzycompute(QueryStageExec.scala:187)
>   at 
> org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture(QueryStageExec.scala:187)
>   at 
> org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.doMaterialize(QueryStageExec.scala:189)
>   at 
> org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:61)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5(AdaptiveSparkPlanExec.scala:302)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5$adapted(AdaptiveSparkPlanExec.scala:300)
>   at scala.collection.Iterator.foreach(Iterator.scala:943)
>   at scala.collection.Iterator.foreach$(Iterator.scala:943)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>   at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>   at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:300)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:272)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:417)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:390)
>   at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3616)
>   at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3615)
>   at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4323)
>   at 
> org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
>   at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4321)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
>   at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4321)
>   at org.apache.spark.sql.Dataset.count(Dataset.scala:3615)
>   ... 51 elided
> Caused by: java.io.FileNotFoundException: File 
> file:/Users/ethan/Work/tmp/hudi-1.0.0-testing/2024-11-17-23-52-06-streamer-mor-upsert-inline-no-compaction-2/test_table/.hoodie/timeline/20241118011307959_20241118011455593.deltacommit
>  does not exist
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:779)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1100)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:769)
>   at 
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
>   at 
> org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:160)
>   at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:372)
>   at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:976)
>   at 
> org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem.open(HoodieWrapperFileSystem.java:481)
>   at 
> org.apache.hudi.storage.hadoop.HoodieHadoopStorage.open(HoodieHadoopStorage.java:149)
>   at 
> org.apache.hudi.common.table.timeline.versioning.v2.ActiveTimelineV2.readDataFromPath(ActiveTimelineV2.java:752)
>   ... 137 more {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to