[
https://issues.apache.org/jira/browse/HUDI-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17405578#comment-17405578
]
ASF GitHub Bot commented on HUDI-2354:
--------------------------------------
zhangyue19921010 commented on a change in pull request #3536:
URL: https://github.com/apache/hudi/pull/3536#discussion_r697141153
##########
File path:
hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
##########
@@ -141,6 +141,45 @@ public void
testViewForFileSlicesWithNoBaseFileNonPartitioned() throws Exception
testViewForFileSlicesWithNoBaseFile(1, 0, "");
}
+ @Test
+ public void testCloseHoodieTableFileSystemView() throws Exception {
+ String instantTime1 = "1";
+ String instantTime2 = "2";
+ String clusteringInstantTime3 = "3";
+ String clusteringInstantTime4 = "4";
+
+ // prepare metadata
+ HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();
+ Map<String, List<String>> partitionToReplaceFileIds = new HashMap<>();
+ List<String> replacedFileIds = new ArrayList<>();
+ replacedFileIds.add("fake_file_id");
+ partitionToReplaceFileIds.put("fake_partition_path", replacedFileIds);
+
+ // prepare Instants
+ HoodieInstant instant1 = new HoodieInstant(true,
HoodieTimeline.COMMIT_ACTION, instantTime1);
+ HoodieInstant instant2 = new HoodieInstant(true,
HoodieTimeline.COMMIT_ACTION, instantTime2);
+ HoodieInstant clusteringInstant3 = new HoodieInstant(true,
HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringInstantTime3);
+ HoodieInstant clusteringInstant4 = new HoodieInstant(true,
HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringInstantTime4);
+ HoodieCommitMetadata commitMetadata =
+ CommitUtils.buildMetadata(Collections.emptyList(),
partitionToReplaceFileIds, Option.empty(), WriteOperationType.CLUSTER, "",
HoodieTimeline.REPLACE_COMMIT_ACTION);
+
+ saveAsComplete(commitTimeline, instant1, Option.empty());
+ saveAsComplete(commitTimeline, instant2, Option.empty());
+ saveAsComplete(commitTimeline, clusteringInstant3, Option.empty());
+ saveAsComplete(commitTimeline, clusteringInstant4,
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+
+ refreshFsView();
+
+ // Now create a scenario where archiving deleted replace commits
(requested,inflight and replacecommit)
+ boolean deleteReplaceCommit = new File(this.basePath + "/.hoodie/" +
clusteringInstantTime3 + ".replacecommit").delete();
+ boolean deleteReplaceCommitRequested = new File(this.basePath +
"/.hoodie/" + clusteringInstantTime3 + ".replacecommit.requested").delete();
+ boolean deleteReplaceCommitInflight = new File(this.basePath + "/.hoodie/"
+ clusteringInstantTime3 + ".replacecommit.inflight").delete();
+
+ // confirm deleted
+ assertTrue(deleteReplaceCommit && deleteReplaceCommitInflight &&
deleteReplaceCommitRequested);
+ fsView.close();
Review comment:
Okay, changed. Thanks for your review.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
> archive delete replacecommit, but stop timeline server meet file not found
> --------------------------------------------------------------------------
>
> Key: HUDI-2354
> URL: https://issues.apache.org/jira/browse/HUDI-2354
> Project: Apache Hudi
> Issue Type: Sub-task
> Reporter: liwei
> Assignee: liwei
> Priority: Major
> Labels: pull-request-available
>
> 1、in spark writeclient postcommit will archive replacecommit which meet the
> archive Requirement
> 21/08/23 14:57:12 INFO HoodieTimelineArchiveLog: Archived and deleted instant
> file .hoodie/20210823114552.commit
> 21/08/23 14:57:13 INFO HoodieTimelineArchiveLog: Archived and deleted instant
> file .hoodie/20210823114553.replacecommit.requested
> 21/08/23 14:57:13 INFO HoodieTimelineArchiveLog: Archived and deleted instant
> file .hoodie/20210823114553.replacecommit.inflight
> 21/08/23 14:57:13 INFO HoodieTimelineArchiveLog: Archived and deleted instant
> file .hoodie/20210823114553.replacecommit
>
> 2、if you start timelineservice, after sparksqlwrite post commit it will stop
> . In HoodieTableFileSystemView.init(HoodieTableFileSystemView.java:106) need
> to read replace instant metadata , but the replace instant file is delete ,
> but the timeline not update
>
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:297)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
> Caused by: org.apache.hudi.exception.HoodieIOException: Could not read commit
> details from .hoodie/20210823114553.replacecommit
> at
> org.apache.hudi.common.table.timeline.HoodieActiveTimeline.readDataFromPath(HoodieActiveTimeline.java:555)
> at
> org.apache.hudi.common.table.timeline.HoodieActiveTimeline.getInstantDetails(HoodieActiveTimeline.java:219)
> at
> org.apache.hudi.common.table.view.AbstractTableFileSystemView.lambda$resetFileGroupsReplaced$8(AbstractTableFileSystemView.java:217)
> at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267)
> at
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> at
> org.apache.hudi.common.table.view.AbstractTableFileSystemView.resetFileGroupsReplaced(AbstractTableFileSystemView.java:228)
> at
> org.apache.hudi.common.table.view.AbstractTableFileSystemView.init(AbstractTableFileSystemView.java:106)
> at
> org.apache.hudi.common.table.view.HoodieTableFileSystemView.init(HoodieTableFileSystemView.java:106)
> at
> org.apache.hudi.common.table.view.AbstractTableFileSystemView.reset(AbstractTableFileSystemView.java:248)
> at
> org.apache.hudi.common.table.view.HoodieTableFileSystemView.close(HoodieTableFileSystemView.java:353)
> at
> java.util.concurrent.ConcurrentHashMap$ValuesView.forEach(ConcurrentHashMap.java:4707)
> at
> org.apache.hudi.common.table.view.FileSystemViewManager.close(FileSystemViewManager.java:118)
> at
> org.apache.hudi.timeline.service.TimelineService.close(TimelineService.java:207)
> at
> org.apache.hudi.client.embedded.EmbeddedTimelineService.stop(EmbeddedTimelineService.java:121)
> at
> org.apache.hudi.client.AbstractHoodieClient.stopEmbeddedServerView(AbstractHoodieClient.java:94)
> at
> org.apache.hudi.client.AbstractHoodieClient.close(AbstractHoodieClient.java:86)
> at
> org.apache.hudi.client.AbstractHoodieWriteClient.close(AbstractHoodieWriteClient.java:1094)
> at
> org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:509)
> at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:226)
> at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145)
> at
> org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
> at
> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
> at
> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
> at
> org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
> at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
> at
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
> at
> com.alibaba.cloud.dla.lakehouse.spark.sink.SingleTableStreamingSinkHudiFormat.lambda$getDatasetProcessFunction$8867d9e3$1(SingleTableStreamingSinkHudiFormat.java:134)
> at
> org.apache.spark.sql.streaming.DataStreamWriter$$anonfun$foreachBatch$1.apply(DataStreamWriter.scala:390)
> at
> org.apache.spark.sql.streaming.DataStreamWriter$$anonfun$foreachBatch$1.apply(DataStreamWriter.scala:390)
> at
> org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:537)
> at
> org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
> at
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
> ... 1 more
> Caused by:
> java.io.FileNotFoundException:.hoodie/20210823114553.replacecommit: No such
> file or directory!
> at
> org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.getFileStatus(AliyunOSSFileSystem.java:281)
> at
> org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.open(AliyunOSSFileSystem.java:587)
> at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
> at
> org.apache.hudi.common.fs.HoodieWrapperFileSystem.open(HoodieWrapperFileSystem.java:459)
> at
> org.apache.hudi.common.table.timeline.HoodieActiveTimeline.readDataFromPath(HoodieActiveTimeline.java:552)
> ... 67 more
> org.apache.spark.sql.streaming.StreamingQueryException: Could not read commit
> details from
> oss://zhongze-test2/zhongze-lakehouse/zhongze_test_schema91/zhongze_clustering_test2/.hoodie/20210823114553.replacecommit
> === Streaming Query ===
> Identifier: [id = d027f673-4905-4ef3-b8b3-7d1714a2217e, runId =
> 9aaf6275-bed4-4380-84da-47319b97da8e]
>
> resolution:
> we need review
> HoodieTableFileSystemView.init(HoodieTableFileSystemView.java:106) code . how
> to set the replace data clean with archive
--
This message was sent by Atlassian Jira
(v8.3.4#803005)