[ https://issues.apache.org/jira/browse/HUDI-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404765#comment-17404765 ]

ASF GitHub Bot commented on HUDI-2354:
--------------------------------------

satishkotha commented on a change in pull request #3536:
URL: https://github.com/apache/hudi/pull/3536#discussion_r696171192



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -212,7 +212,15 @@ private void resetFileGroupsReplaced(HoodieTimeline timeline) {
     hoodieTimer.startTimer();
     // for each REPLACE instant, get map of (partitionPath -> deleteFileGroup)
     HoodieTimeline replacedTimeline = timeline.getCompletedReplaceTimeline();
-    Stream<Map.Entry<HoodieFileGroupId, HoodieInstant>> resultStream = replacedTimeline.getInstants().flatMap(instant -> {
+    Stream<Map.Entry<HoodieFileGroupId, HoodieInstant>> resultStream = replacedTimeline.getInstants().filter(instant -> {
+      try {
+        // A replace instant could have been deleted from the timeline by archival,
+        // so we need to check whether the replace commit file still exists.
+        return metaClient.getFs().exists(new Path(metaClient.getMetaPath(), instant.getFileName()));

Review comment:
       There is still a race condition, right?
   1) we check that the file exists
   2) archival deletes the file
   3) the filesystem view tries to read the file and throws an error.

   Instead of this, maybe we can surround the line below with a try/catch:

   try {
     HoodieReplaceCommitMetadata.fromBytes(metaClient.getActiveTimeline().getInstantDetails(instant).get(),
         HoodieReplaceCommitMetadata.class);
   } catch (FileNotFoundException e) {
     // ignore because the replacecommit may have been deleted by archival?
   }
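
   A minimal, self-contained sketch of that idea (the helper name
   readReplaceMetadataIfPresent and the exact wiring are assumptions, not
   the actual patch). Note that, per the stack trace below, readDataFromPath
   wraps the FileNotFoundException in a HoodieIOException, so the catch
   likely has to inspect the cause rather than catch FileNotFoundException
   directly:

   import java.io.FileNotFoundException;
   import java.io.IOException;
   import java.util.stream.Stream;

   import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
   import org.apache.hudi.common.table.HoodieTableMetaClient;
   import org.apache.hudi.common.table.timeline.HoodieInstant;
   import org.apache.hudi.exception.HoodieIOException;

   // Sketch: read replacecommit metadata, treating a concurrently archived
   // instant as "no replaced file groups" instead of failing view init.
   private static Stream<HoodieReplaceCommitMetadata> readReplaceMetadataIfPresent(
       HoodieTableMetaClient metaClient, HoodieInstant instant) {
     try {
       byte[] details = metaClient.getActiveTimeline().getInstantDetails(instant).get();
       return Stream.of(
           HoodieReplaceCommitMetadata.fromBytes(details, HoodieReplaceCommitMetadata.class));
     } catch (HoodieIOException e) {
       // getInstantDetails/readDataFromPath wrap the FileNotFoundException;
       // ignore only that case -- it means archival won the race.
       if (e.getCause() instanceof FileNotFoundException) {
         return Stream.empty();
       }
       throw e;
     } catch (IOException e) {
       throw new HoodieIOException("Failed to parse replacecommit metadata for " + instant, e);
     }
   }

   In resetFileGroupsReplaced this would stand in for the bare fromBytes call
   inside the flatMap, so an instant deleted mid-iteration simply contributes
   no replaced file groups.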





> archival deletes the replacecommit, but stopping the timeline server hits FileNotFoundException
> -----------------------------------------------------------------------------------------------
>
>                 Key: HUDI-2354
>                 URL: https://issues.apache.org/jira/browse/HUDI-2354
>             Project: Apache Hudi
>          Issue Type: Sub-task
>            Reporter: liwei
>            Assignee: liwei
>            Priority: Major
>              Labels: pull-request-available
>
> 1. In the Spark write client, postCommit archives any replacecommit that
> meets the archival requirement (reproduction configs are sketched after the
> log lines below):
> 21/08/23 14:57:12 INFO HoodieTimelineArchiveLog: Archived and deleted instant file .hoodie/20210823114552.commit
> 21/08/23 14:57:13 INFO HoodieTimelineArchiveLog: Archived and deleted instant file .hoodie/20210823114553.replacecommit.requested
> 21/08/23 14:57:13 INFO HoodieTimelineArchiveLog: Archived and deleted instant file .hoodie/20210823114553.replacecommit.inflight
> 21/08/23 14:57:13 INFO HoodieTimelineArchiveLog: Archived and deleted instant file .hoodie/20210823114553.replacecommit
>  
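> For context, archival eligibility is driven by the timeline-retention
> configs. A hypothetical reproduction setup (the config keys are standard
> Hudi settings; the values and the df/basePath variables are assumptions for
> illustration) that keeps the active timeline short enough for archival to
> fire right after a replacecommit:
>
>     // Java sketch: aggressive retention so postCommit archival triggers quickly
>     df.write().format("hudi")
>         .option("hoodie.table.name", "zhongze_clustering_test2")
>         .option("hoodie.cleaner.commits.retained", "1")   // clean early
>         .option("hoodie.keep.min.commits", "2")           // archive down to 2 instants
>         .option("hoodie.keep.max.commits", "3")           // archive once more than 3
>         .mode("append")
>         .save(basePath);
>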
> 2. If the timeline service is started, it is stopped after the Spark SQL
> write post-commit. During that shutdown,
> HoodieTableFileSystemView.init(HoodieTableFileSystemView.java:106) needs to
> read the replace instant metadata, but the replace instant file has already
> been deleted by archival and the cached timeline was never updated:
>  
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:297)
> at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
> Caused by: org.apache.hudi.exception.HoodieIOException: Could not read commit details from .hoodie/20210823114553.replacecommit
> at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.readDataFromPath(HoodieActiveTimeline.java:555)
> at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.getInstantDetails(HoodieActiveTimeline.java:219)
> at org.apache.hudi.common.table.view.AbstractTableFileSystemView.lambda$resetFileGroupsReplaced$8(AbstractTableFileSystemView.java:217)
> at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267)
> at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> at org.apache.hudi.common.table.view.AbstractTableFileSystemView.resetFileGroupsReplaced(AbstractTableFileSystemView.java:228)
> at org.apache.hudi.common.table.view.AbstractTableFileSystemView.init(AbstractTableFileSystemView.java:106)
> at org.apache.hudi.common.table.view.HoodieTableFileSystemView.init(HoodieTableFileSystemView.java:106)
> at org.apache.hudi.common.table.view.AbstractTableFileSystemView.reset(AbstractTableFileSystemView.java:248)
> at org.apache.hudi.common.table.view.HoodieTableFileSystemView.close(HoodieTableFileSystemView.java:353)
> at java.util.concurrent.ConcurrentHashMap$ValuesView.forEach(ConcurrentHashMap.java:4707)
> at org.apache.hudi.common.table.view.FileSystemViewManager.close(FileSystemViewManager.java:118)
> at org.apache.hudi.timeline.service.TimelineService.close(TimelineService.java:207)
> at org.apache.hudi.client.embedded.EmbeddedTimelineService.stop(EmbeddedTimelineService.java:121)
> at org.apache.hudi.client.AbstractHoodieClient.stopEmbeddedServerView(AbstractHoodieClient.java:94)
> at org.apache.hudi.client.AbstractHoodieClient.close(AbstractHoodieClient.java:86)
> at org.apache.hudi.client.AbstractHoodieWriteClient.close(AbstractHoodieWriteClient.java:1094)
> at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:509)
> at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:226)
> at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145)
> at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
> at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
> at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
> at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
> at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
> at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
> at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
> at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
> at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
> at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
> at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
> at com.alibaba.cloud.dla.lakehouse.spark.sink.SingleTableStreamingSinkHudiFormat.lambda$getDatasetProcessFunction$8867d9e3$1(SingleTableStreamingSinkHudiFormat.java:134)
> at org.apache.spark.sql.streaming.DataStreamWriter$$anonfun$foreachBatch$1.apply(DataStreamWriter.scala:390)
> at org.apache.spark.sql.streaming.DataStreamWriter$$anonfun$foreachBatch$1.apply(DataStreamWriter.scala:390)
> at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:537)
> at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
> at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535)
> at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
> at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
> at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
> at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
> at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
> at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
> ... 1 more
> Caused by: java.io.FileNotFoundException: .hoodie/20210823114553.replacecommit: No such file or directory!
> at org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.getFileStatus(AliyunOSSFileSystem.java:281)
> at org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.open(AliyunOSSFileSystem.java:587)
> at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
> at org.apache.hudi.common.fs.HoodieWrapperFileSystem.open(HoodieWrapperFileSystem.java:459)
> at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.readDataFromPath(HoodieActiveTimeline.java:552)
> ... 67 more
> org.apache.spark.sql.streaming.StreamingQueryException: Could not read commit details from oss://zhongze-test2/zhongze-lakehouse/zhongze_test_schema91/zhongze_clustering_test2/.hoodie/20210823114553.replacecommit
> === Streaming Query ===
> Identifier: [id = d027f673-4905-4ef3-b8b3-7d1714a2217e, runId = 9aaf6275-bed4-4380-84da-47319b97da8e]
>  
> resolution:
> we need to review the
> HoodieTableFileSystemView.init(HoodieTableFileSystemView.java:106) code and
> decide how cleanup of replace metadata should be coordinated with archival.
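>
> One possible direction, sketched under the assumption that the reset() ->
> init() hop inside close() (visible in the stack trace above) is what forces
> the re-read during shutdown: tear down cached state directly on close
> instead of rebuilding the view. The method and lock names below are
> hypothetical, not actual Hudi code:
>
>     // Hypothetical sketch: clear cached state on close without replaying
>     // the timeline, so shutdown never re-reads instants that archival
>     // may have deleted after the view was first built.
>     @Override
>     public void close() {
>       try {
>         writeLock.lock();
>         // unlike reset() -> init(), this does not call
>         // resetFileGroupsReplaced() and so never touches .hoodie files
>         clearCachedFileGroups();
>         clearCachedReplacedFileGroups();
>       } finally {
>         writeLock.unlock();
>       }
>     }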


