[
https://issues.apache.org/jira/browse/HUDI-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sagar Sumit updated HUDI-2800:
------------------------------
Status: Resolved (was: Patch Available)
> Clustering execution triggers HoodieCreateHandle twice during
> validateWriteResult isEmpty call
> ----------------------------------------------------------------------------------------------
>
> Key: HUDI-2800
> URL: https://issues.apache.org/jira/browse/HUDI-2800
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: sivabalan narayanan
> Assignee: Sagar Sumit
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 0.10.0
>
> Attachments: Screen Shot 2021-11-19 at 10.08.48 AM.png
>
>
> Description: when a failed clustering is retried, HoodieCreateHandle is triggered a second time by the isEmpty() call on the
> JavaRDD&lt;WriteStatus&gt;. As a result, the markers show one file more than the
> actual replace commit metadata.
> Scenario:
> Setup: induced an intentional failure in the clustering code path, i.e. the
> JVM is crashed right after applying changes to the metadata table.
> Use-case:
> deltastreamer in continuous mode with async clustering enabled. At the end of
> the first clustering, we crash the JVM.
> We then bring up the deltastreamer again. This triggers a rollback of the
> pending clustering, which then gets retried.
> Again, at the end of clustering, we crash the JVM.
> Here, the marker files show 3 files in total, whereas the commit
> metadata shows only 2 files. Two of the files have the same fileId and
> differ only in the write token, and one of those two is the missing file.
> // Excerpts from logs showing the creation of the two files of interest.
>
> {code:java}
> 21/11/19 07:02:46 WARN SparkRDDWriteClient: Clustering commit metadata just before updating metadata table ::
> 21/11/19 07:02:46 WARN SparkRDDWriteClient: file path PushEvent/355c9651-395e-4b17-9600-79f4dfb077f0-0_0-42-79_20211119065822448.parquet, size 741905
> 21/11/19 07:02:46 WARN SparkRDDWriteClient: file path CreateEvent/a7f7a48b-10fc-41aa-a4c6-c4963a83aed5-0_1-42-80_20211119065822448.parquet, size 566209
> 21/11/19 07:02:46 WARN HoodieTableMetadataUtil: MDT. applying commit 20211119065822448
> 21/11/19 07:02:46 WARN HoodieTableMetadataUtil: for partition PushEvent
> 21/11/19 07:02:46 WARN HoodieTableMetadataUtil: new file 355c9651-395e-4b17-9600-79f4dfb077f0-0_0-42-79_20211119065822448.parquet, size 741905
> 21/11/19 07:02:46 WARN HoodieTableMetadataUtil: for partition CreateEvent
> 21/11/19 07:02:46 WARN HoodieTableMetadataUtil: new file a7f7a48b-10fc-41aa-a4c6-c4963a83aed5-0_1-42-80_20211119065822448.parquet, size 566209
> {code}
>
> // marker files
> {code:java}
> ls -ltr /tmp/hudi-deltastreamer-gh-mw/PushEvent/ | grep 355c9651-395e
> -rw-r--r--  1 nsb  wheel  741905 Nov 19 07:02 355c9651-395e-4b17-9600-79f4dfb077f0-0_0-42-79_20211119065822448.parquet
> -rw-r--r--  1 nsb  wheel  741905 Nov 19 07:02 355c9651-395e-4b17-9600-79f4dfb077f0-0_0-46-81_20211119065822448.parquet
> nsb$ ls -ltr /tmp/hudi-deltastreamer-gh-mw/CreateEvent/ | grep a7f7a48b-10fc
> -rw-r--r--  1 nsb  wheel  566209 Nov 19 07:02 a7f7a48b-10fc-41aa-a4c6-c4963a83aed5-0_1-42-80_20211119065822448.parquet
> {code}
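The two PushEvent files above differ only in the middle underscore-separated token. As an illustration only (this is not Hudi code; the three-part `<fileId>_<writeToken>_<instantTime>.parquet` split is inferred from the file names in this report), a small parser makes that comparison explicit:

```java
import java.util.Arrays;
import java.util.List;

public class MarkerCompare {
    // Splits a base-file name of the assumed form
    // <fileId>_<writeToken>_<instantTime>.parquet into its three parts.
    // The fileId contains hyphens but no underscores, so split("_") is enough.
    static String[] parse(String fileName) {
        String stem = fileName.substring(0, fileName.length() - ".parquet".length());
        return stem.split("_");
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList(
            "355c9651-395e-4b17-9600-79f4dfb077f0-0_0-42-79_20211119065822448.parquet",
            "355c9651-395e-4b17-9600-79f4dfb077f0-0_0-46-81_20211119065822448.parquet");
        String[] a = parse(files.get(0));
        String[] b = parse(files.get(1));
        System.out.println("same fileId:     " + a[0].equals(b[0])); // true
        System.out.println("same writeToken: " + a[1].equals(b[1])); // false
        System.out.println("same instant:    " + a[2].equals(b[2])); // true
    }
}
```

The write token itself (`0-42-79` vs `0-46-81`) encodes the partition/stage/attempt of the Spark task that wrote the file, which is why two executions of the same write produce distinct tokens.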
>
> I inspected the Spark UI and could not spot any failures. I then traced the
> issue to the isEmpty() call at SparkExecuteClusteringCommitActionExecutor L103.
> We have a collect() followed by an isEmpty(). collect() triggers the actual
> execution of all writes, but the subsequent isEmpty() triggers one more Spark
> task, which again creates a file with the same fileId but a different write
> token. Confirmed this from the logs.
> !Screen Shot 2021-11-19 at 10.08.48 AM.png!
>
> Code path of interest in SparkExecuteClusteringCommitActionExecutor:
>
> {code:java}
> HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata =
>     ((ClusteringExecutionStrategy<T, JavaRDD<HoodieRecord<? extends HoodieRecordPayload>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>>)
>         ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(),
>             new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, table, context, config))
>         .performClustering(clusteringPlan, schema, instantTime);
> JavaRDD<WriteStatus> writeStatusRDD = writeMetadata.getWriteStatuses();
> JavaRDD<WriteStatus> statuses = updateIndex(writeStatusRDD, writeMetadata);
> writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collect());
> writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(writeMetadata));
> validateWriteResult(writeMetadata);
> commitOnAutoCommit(writeMetadata);
> {code}
>
> {code:java}
> private void validateWriteResult(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
>   if (writeMetadata.getWriteStatuses().isEmpty()) {
>     throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + instantTime
>         + " #groups: " + clusteringPlan.getInputGroups().size() + " expected at least "
>         + clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
>         + " write statuses");
>   }
> }
> {code}
>
> According to the Spark logs, when writeMetadata.getWriteStatuses().isEmpty() is
> invoked as part of validateWriteResult, HoodieCreateHandle is triggered
> again, which results in a new file (same fileId, but a different write token).
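The double-execution mechanism can be reproduced outside Spark with a toy lazy pipeline. This is a hypothetical sketch, not Spark or Hudi code: `LazyPipeline` and its side-effecting `stage` only model an uncached JavaRDD, where collect() and a later isEmpty() each re-run the lineage (Spark's isEmpty() launches a task on the first partition rather than reusing earlier results):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Toy stand-in for an uncached RDD: every action re-evaluates the lineage.
class LazyPipeline<T, R> {
    private final List<T> input;
    private final Function<T, R> stage; // side-effecting, like a create handle writing a file

    LazyPipeline(List<T> input, Function<T, R> stage) {
        this.input = input;
        this.stage = stage;
    }

    // Action 1: materialize every element (runs the stage per element).
    List<R> collect() {
        List<R> out = new ArrayList<>();
        for (T t : input) out.add(stage.apply(t));
        return out;
    }

    // Action 2: checks only the first element, but still re-runs the stage
    // for it because nothing was cached from the earlier collect().
    boolean isEmpty() {
        if (input.isEmpty()) return true;
        stage.apply(input.get(0)); // one extra invocation => one extra file
        return false;
    }
}

public class DoubleWriteDemo {
    static int runScenario() {
        AtomicInteger filesCreated = new AtomicInteger();
        LazyPipeline<Integer, String> writes = new LazyPipeline<>(
            List.of(1, 2),
            rec -> "file-" + rec + "_token-" + filesCreated.incrementAndGet());
        writes.collect();  // first action: 2 files
        writes.isEmpty();  // second action: a 3rd file with a new "write token"
        return filesCreated.get();
    }

    public static void main(String[] args) {
        System.out.println("files created: " + runScenario()); // 3, not 2
    }
}
```

Three files for two records mirrors the 3-markers-vs-2-commit-entries mismatch described above.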
>
> // Excerpts from logs showing the different stage IDs with which the create
> handle is called.
>
> {code:java}
> 21/11/19 08:35:45 WARN HoodieCreateHandle: new Create Handle task info. StageId 40, Partition Id 0, Attempt ID 79
> 21/11/19 08:35:45 INFO HoodieCreateHandle: New CreateHandle for partition :PushEvent with fileId 53bd9257-f833-4e04-a7df-423891d69779-0
> 21/11/19 08:35:45 INFO DirectWriteMarkers: Creating Marker Path=file:/tmp/hudi-deltastreamer-gh-mw/.hoodie/.temp/20211119083434372/PushEvent/53bd9257-f833-4e04-a7df-423891d69779-0_0-40-79_20211119083434372.parquet.marker.CREATE
> 21/11/19 08:35:45 DEBUG TaskMemoryManager: Task 79 release 0.0 B from org.apache.spark.util.collection.ExternalSorter@52fd6797
> 21/11/19 08:35:45 WARN HoodieCreateHandle: new Create Handle task info. StageId 46, Partition Id 0, Attempt ID 83
> 21/11/19 08:35:45 INFO DirectWriteMarkers: Creating Marker Path=file:/tmp/hudi-deltastreamer-gh-mw/.hoodie/.temp/20211119083434372/PushEvent/53bd9257-f833-4e04-a7df-423891d69779-0_0-46-83_20211119083434372.parquet.marker.CREATE
> 21/11/19 08:35:45 INFO HoodieCreateHandle: New CreateHandle for partition :PushEvent with fileId 53bd9257-f833-4e04-a7df-423891d69779-0
> 21/11/19 08:35:45 DEBUG TaskMemoryManager: Task 83 release 0.0 B from org.apache.spark.util.collection.ExternalSorter@149ed8ce
> {code}
>
> The first creation refers to stage 40 and the second creation refers to stage
> 46. (Do not correlate these with the screenshot above; it is from a different
> run for which I did not collect the stage and attempt ID info.)
>
> It is strange that this does not happen the first time clustering is triggered,
> and happens only on the retry of a failed clustering. But I confirmed
> that if we remove the validate call, things are in good shape: the marker files
> and the replace commit metadata are intact.
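One way to avoid the extra task, sketched here on a hypothetical cached pipeline (this illustrates the idea, not the actual Hudi patch), is to ensure the emptiness check is served from already-materialized results, e.g. by persisting the RDD before the first action or by validating against the collected write stats instead of re-triggering an action on the RDD:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Toy lazy pipeline whose first materialization is cached, so a later
// emptiness check no longer re-runs the side-effecting stage. The cache
// plays the role of RDD.persist() (or of the collected write-stats list).
class CachedPipeline<T, R> {
    private final List<T> input;
    private final Function<T, R> stage; // side-effecting, like a create handle
    private List<R> cached;             // filled on the first action

    CachedPipeline(List<T> input, Function<T, R> stage) {
        this.input = input;
        this.stage = stage;
    }

    List<R> collect() {
        if (cached == null) {
            cached = new ArrayList<>();
            for (T t : input) cached.add(stage.apply(t));
        }
        return cached;
    }

    boolean isEmpty() {
        return collect().isEmpty(); // served from cache; no extra stage run
    }
}

public class FixDemo {
    static int runScenario() {
        AtomicInteger filesCreated = new AtomicInteger();
        CachedPipeline<Integer, String> writes = new CachedPipeline<>(
            List.of(1, 2),
            rec -> "file-" + rec + "_token-" + filesCreated.incrementAndGet());
        writes.collect();  // first action: 2 files
        writes.isEmpty();  // emptiness check: no additional file
        return filesCreated.get();
    }

    public static void main(String[] args) {
        System.out.println("files created: " + runScenario()); // 2: markers match commit metadata
    }
}
```

With the results cached, the file count matches the commit metadata, which is consistent with the observation above that dropping the validate call keeps the markers intact.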
>
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)