[
https://issues.apache.org/jira/browse/HUDI-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
sivabalan narayanan updated HUDI-2800:
--------------------------------------
Fix Version/s: 0.10.0
> Clustering execution triggers HoodieCreateHandle twice during
> validateWriteResult isEmpty call
> ----------------------------------------------------------------------------------------------
>
> Key: HUDI-2800
> URL: https://issues.apache.org/jira/browse/HUDI-2800
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: sivabalan narayanan
> Priority: Major
> Fix For: 0.10.0
>
> Attachments: Screen Shot 2021-11-19 at 10.08.48 AM.png
>
>
> desc: when a failed clustering is retried, we see that HoodieCreateHandle is
> triggered twice because of the isEmpty() call on the JavaRDD<WriteStatus>.
> As a result, the markers show one file more than the actual replace commit
> metadata.
> Scenario:
> Setup: induced an intentional failure in the clustering code path, i.e. the
> JVM is crashed right after applying the changes to the metadata table.
> Use-case:
> Deltastreamer in continuous mode with async clustering enabled. At the end of
> the first clustering, we crash the JVM.
> We then bring up the deltastreamer again. This triggers a rollback of the
> pending clustering, which then gets retried.
> Again, at the end of clustering, we crash it.
> Here, the marker files show 3 files in total, whereas the commit metadata
> shows only 2 files. Incidentally, two of the files share the same fileId and
> differ only in the write token, and one of them is the missing file.
> // excerpts from the logs showing the creation of the two files of interest.
>
> {code:java}
> 21/11/19 07:02:46 WARN SparkRDDWriteClient: Clustering commit metadata just
> before updating metadata table ::
> 21/11/19 07:02:46 WARN SparkRDDWriteClient: file path
> PushEvent/355c9651-395e-4b17-9600-79f4dfb077f0-0_0-42-79_20211119065822448.parquet,
> size 741905
> 21/11/19 07:02:46 WARN SparkRDDWriteClient: file path
> CreateEvent/a7f7a48b-10fc-41aa-a4c6-c4963a83aed5-0_1-42-80_20211119065822448.parquet,
> size 566209
> 21/11/19 07:02:46 WARN HoodieTableMetadataUtil: MDT. applying commit 20211119065822448
> 21/11/19 07:02:46 WARN HoodieTableMetadataUtil: for partition PushEvent
> 21/11/19 07:02:46 WARN HoodieTableMetadataUtil: new file
> 355c9651-395e-4b17-9600-79f4dfb077f0-0_0-42-79_20211119065822448.parquet,
> size 741905
> 21/11/19 07:02:46 WARN HoodieTableMetadataUtil: for partition CreateEvent
> 21/11/19 07:02:46 WARN HoodieTableMetadataUtil: new file
> a7f7a48b-10fc-41aa-a4c6-c4963a83aed5-0_1-42-80_20211119065822448.parquet,
> size 566209
> {code}
>
> // marker files
> {code:java}
> ls -ltr /tmp/hudi-deltastreamer-gh-mw/PushEvent/ | grep 355c9651-395e
> -rw-r--r-- 1 nsb wheel 741905 Nov 19 07:02
> 355c9651-395e-4b17-9600-79f4dfb077f0-0_0-42-79_20211119065822448.parquet
> -rw-r--r-- 1 nsb wheel 741905 Nov 19 07:02
> 355c9651-395e-4b17-9600-79f4dfb077f0-0_0-46-81_20211119065822448.parquet
> nsb$ ls -ltr /tmp/hudi-deltastreamer-gh-mw/CreateEvent/ | grep a7f7a48b-10fc
> -rw-r--r-- 1 nsb wheel 566209 Nov 19 07:02
> a7f7a48b-10fc-41aa-a4c6-c4963a83aed5-0_1-42-80_20211119065822448.parquet
> {code}
>
> I inspected the Spark UI and could not spot any failures. I then triaged the
> issue down to the isEmpty() at SparkExecuteClusteringCommitActionExecutor L103.
> We have a collect() followed by an isEmpty(). The collect() triggers the
> actual execution of all writes, but the subsequent isEmpty() launches one
> more Spark task, which again creates a file with the same fileId but a
> different write token. Confirmed this from the logs.
> !Screen Shot 2021-11-19 at 10.08.48 AM.png!
>
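> This behavior matches Spark's lazy evaluation: every action on an uncached
> RDD re-runs (part of) its lineage, including side-effecting stages. collect()
> runs the full write, and isEmpty() only needs the first element, so it
> re-runs exactly one task, producing one extra file. As a plain-Java analogy
> (not Hudi code, just a minimal sketch of the re-execution), a lazy pipeline
> hit by a second short-circuiting "action" repeats the side effect once more:
>
> ```java
> import java.util.List;
> import java.util.concurrent.atomic.AtomicInteger;
> import java.util.function.Supplier;
> import java.util.stream.Collectors;
> import java.util.stream.Stream;
>
> public class LazyRecompute {
>     // Returns how many times the side-effecting "write" stage ran.
>     static int run() {
>         AtomicInteger fileCreations = new AtomicInteger();
>         // Like an uncached RDD: a lazy pipeline that is rebuilt per action.
>         Supplier<Stream<String>> writeStatuses = () ->
>             Stream.of("fileA", "fileB")
>                   .peek(f -> fileCreations.incrementAndGet()); // "creates a file"
>         // First action (like collect()): executes both writes.
>         List<String> collected = writeStatuses.get().collect(Collectors.toList());
>         // Second action (like isEmpty()): short-circuits after pulling one
>         // element, re-running the side effect for just that element.
>         boolean empty = !writeStatuses.get().findAny().isPresent();
>         return fileCreations.get(); // 2 from collect + 1 from findAny = 3
>     }
>
>     public static void main(String[] args) {
>         System.out.println(LazyRecompute.run());
>     }
> }
> ```
>
> Caching/persisting the pipeline's result before the second action would avoid
> the recomputation, which is the same reasoning as persisting the RDD below.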
> Code path of interest in SparkExecuteClusteringCommitActionExecutor:
>
> {code:java}
> HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata = (
>     (ClusteringExecutionStrategy<T, JavaRDD<HoodieRecord<? extends HoodieRecordPayload>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>>)
>         ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(),
>             new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class},
>             table, context, config))
>     .performClustering(clusteringPlan, schema, instantTime);
> JavaRDD<WriteStatus> writeStatusRDD = writeMetadata.getWriteStatuses();
> JavaRDD<WriteStatus> statuses = updateIndex(writeStatusRDD, writeMetadata);
> writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collect());
> writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(writeMetadata));
> validateWriteResult(writeMetadata);
> commitOnAutoCommit(writeMetadata);
> {code}
>
> {code:java}
> private void validateWriteResult(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
>   if (writeMetadata.getWriteStatuses().isEmpty()) {
>     throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + instantTime
>         + " #groups: " + clusteringPlan.getInputGroups().size()
>         + " expected at least "
>         + clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
>         + " write statuses");
>   }
> } {code}
>
> According to the Spark logs, when writeMetadata.getWriteStatuses().isEmpty()
> is invoked as part of validateWriteResult, HoodieCreateHandle is triggered
> again, which results in a new file (same fileId, but a different write token).
>
> It is strange that this does not happen the first time clustering is
> triggered, only on the retry of a failed clustering. But I confirmed that if
> we remove the validate call, things are in good shape: the marker files and
> the replace commit metadata are consistent.
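>
> Assuming the accessors shown in the snippets above, one possible mitigation
> (a sketch only, not a committed fix) is to validate against the write stats
> that were already materialized by the earlier collect(), instead of issuing a
> second Spark action on the uncached RDD:
>
> ```java
> // Sketch only: setWriteStats(...) was populated via collect() above, so
> // checking those stats needs no extra Spark action (and so no extra
> // HoodieCreateHandle). Alternatively, persisting writeStatusRDD before the
> // first action would keep isEmpty() from recomputing the write stage.
> private void validateWriteResult(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
>   if (writeMetadata.getWriteStats().get().isEmpty()) {
>     throw new HoodieClusteringException("Clustering plan produced 0 write stats for " + instantTime);
>   }
> }
> ```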
>
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)