[ 
https://issues.apache.org/jira/browse/HUDI-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Sumit updated HUDI-2800:
------------------------------
    Status: Resolved  (was: Patch Available)

> Clustering execution triggers HoodieCreateHandle twice during 
> validateWriteResult isEmpty call
> ----------------------------------------------------------------------------------------------
>
>                 Key: HUDI-2800
>                 URL: https://issues.apache.org/jira/browse/HUDI-2800
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: sivabalan narayanan
>            Assignee: Sagar Sumit
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 0.10.0
>
>         Attachments: Screen Shot 2021-11-19 at 10.08.48 AM.png
>
>
> desc: When a failed clustering is retried, we see that HoodieCreateHandle is 
> triggered twice for the isEmpty() call on JavaRDD<WriteStatus>. As a result, 
> the markers show one file more than the actual replace commit metadata. 
> Scenario:
> Setup: induced an intentional failure in the clustering code path, i.e. the 
> JVM is crashed right after applying changes to the metadata table. 
> Use-case: 
> deltastreamer in continuous mode with async clustering enabled. At the end of 
> the first clustering, we crash the JVM. 
> We then bring up the deltastreamer again. This triggers a rollback of the 
> pending clustering, which then gets retried. 
> Again, at the end of clustering, we crash it. 
> Here, the marker files show 3 files in total, whereas the commit metadata 
> shows only 2. Incidentally, two of the files share the same fileId and differ 
> only in the write token, and one of them is the missing file.
> // excerpts from logs showing the creation of the two files of interest. 
>  
> {code:java}
> 21/11/19 07:02:46 WARN SparkRDDWriteClient: Clustering commit metadata just before updating metadata table :: 
> 21/11/19 07:02:46 WARN SparkRDDWriteClient:  file path PushEvent/355c9651-395e-4b17-9600-79f4dfb077f0-0_0-42-79_20211119065822448.parquet, size 741905
> 21/11/19 07:02:46 WARN SparkRDDWriteClient:  file path CreateEvent/a7f7a48b-10fc-41aa-a4c6-c4963a83aed5-0_1-42-80_20211119065822448.parquet, size 566209
> 21/11/19 07:02:46 WARN HoodieTableMetadataUtil:  MDT. applying commit 20211119065822448
> 21/11/19 07:02:46 WARN HoodieTableMetadataUtil:   for partition PushEvent
> 21/11/19 07:02:46 WARN HoodieTableMetadataUtil:    new file 355c9651-395e-4b17-9600-79f4dfb077f0-0_0-42-79_20211119065822448.parquet, size 741905
> 21/11/19 07:02:46 WARN HoodieTableMetadataUtil:   for partition CreateEvent
> 21/11/19 07:02:46 WARN HoodieTableMetadataUtil:    new file a7f7a48b-10fc-41aa-a4c6-c4963a83aed5-0_1-42-80_20211119065822448.parquet, size 566209
> {code}
>  
> // marker files
> {code:java}
> ls -ltr /tmp/hudi-deltastreamer-gh-mw/PushEvent/ | grep 355c9651-395e
> -rw-r--r--  1 nsb  wheel  741905 Nov 19 07:02 
> 355c9651-395e-4b17-9600-79f4dfb077f0-0_0-42-79_20211119065822448.parquet
> -rw-r--r--  1 nsb  wheel  741905 Nov 19 07:02 
> 355c9651-395e-4b17-9600-79f4dfb077f0-0_0-46-81_20211119065822448.parquet
> nsb$ ls -ltr /tmp/hudi-deltastreamer-gh-mw/CreateEvent/ | grep a7f7a48b-10fc
> -rw-r--r--  1 nsb  wheel  566209 Nov 19 07:02 
> a7f7a48b-10fc-41aa-a4c6-c4963a83aed5-0_1-42-80_20211119065822448.parquet 
> {code}
>  
> I inspected the Spark UI and could not spot any failures. I then triaged the 
> issue to the isEmpty() call at SparkExecuteClusteringCommitActionExecutor L103. 
> We have a collect() followed by an isEmpty(). collect() triggers the actual 
> execution of all writes, but the subsequent isEmpty() triggers one more Spark 
> task, which again creates a file with the same fileId but a different write 
> token. Confirmed this from the logs. 
> !Screen Shot 2021-11-19 at 10.08.48 AM.png!
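The re-execution described above is plain lazy-evaluation behaviour: each Spark action replays the uncached RDD lineage, including the HoodieCreateHandle side effect. A minimal sketch of the same pattern in plain Java (a hypothetical stand-in for the RDD, not Hudi or Spark code), where a second short-circuiting action re-runs the pipeline and performs one extra "write":

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical model of an uncached JavaRDD<WriteStatus>: every terminal
// operation rebuilds and re-runs the pipeline from scratch, just as every
// Spark action replays an uncached RDD lineage.
public class LazyRecompute {
    static int run() {
        AtomicInteger filesWritten = new AtomicInteger();
        // The peek() side effect stands in for HoodieCreateHandle writing a file.
        Supplier<Stream<String>> writeStatuses = () ->
                Stream.of("fileGroup-0", "fileGroup-1")
                      .peek(f -> filesWritten.incrementAndGet());

        // First action: collect() materializes all write statuses (2 "files").
        writeStatuses.get().collect(Collectors.toList());
        // Second action: an isEmpty()-style check re-runs the pipeline;
        // findAny() short-circuits after one element (1 extra "file").
        boolean empty = writeStatuses.get().findAny().isEmpty();

        // 3 writes for 2 file groups: one file written twice, like the extra marker.
        return filesWritten.get();
    }

    public static void main(String[] args) {
        System.out.println("files written: " + LazyRecompute.run());
    }
}
```

Persisting/caching the RDD before the first action (or validating against already-collected driver-side stats) would let the second action reuse the first result instead of re-executing the write path.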
>  
> Code path of interest in SparkExecuteClusteringCommitActionExecutor: 
>  
> {code:java}
> HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata =
>     ((ClusteringExecutionStrategy<T, JavaRDD<HoodieRecord<? extends HoodieRecordPayload>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>>)
>         ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(),
>             new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, table, context, config))
>     .performClustering(clusteringPlan, schema, instantTime);
> JavaRDD<WriteStatus> writeStatusRDD = writeMetadata.getWriteStatuses();
> JavaRDD<WriteStatus> statuses = updateIndex(writeStatusRDD, writeMetadata);
> writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collect());
> writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(writeMetadata));
> validateWriteResult(writeMetadata);
> commitOnAutoCommit(writeMetadata);
> {code}
>  
> {code:java}
> private void validateWriteResult(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
>   if (writeMetadata.getWriteStatuses().isEmpty()) {
>     throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + instantTime
>         + " #groups: " + clusteringPlan.getInputGroups().size()
>         + " expected at least "
>         + clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
>         + " write statuses");
>   }
> }
> {code}
>  
> According to the Spark logs, when writeMetadata.getWriteStatuses().isEmpty() is 
> invoked as part of validateWriteResult, HoodieCreateHandle is triggered again, 
> which results in a new file (same fileId, but a different write token). 
>  
> // excerpts from logs showing the different stageIds with which the create 
> handle is called. 
>  
> {code:java}
> 21/11/19 08:35:45 WARN HoodieCreateHandle: new Create Handle task info. StageId 40, PartitionId 0, Attempt ID 79
> 21/11/19 08:35:45 INFO HoodieCreateHandle: New CreateHandle for partition :PushEvent with fileId 53bd9257-f833-4e04-a7df-423891d69779-0
> 21/11/19 08:35:45 INFO DirectWriteMarkers: Creating Marker Path=file:/tmp/hudi-deltastreamer-gh-mw/.hoodie/.temp/20211119083434372/PushEvent/53bd9257-f833-4e04-a7df-423891d69779-0_0-40-79_20211119083434372.parquet.marker.CREATE
> 21/11/19 08:35:45 DEBUG TaskMemoryManager: Task 79 release 0.0 B from org.apache.spark.util.collection.ExternalSorter@52fd6797
> 21/11/19 08:35:45 WARN HoodieCreateHandle: new Create Handle task info. StageId 46, PartitionId 0, Attempt ID 83
> 21/11/19 08:35:45 INFO DirectWriteMarkers: Creating Marker Path=file:/tmp/hudi-deltastreamer-gh-mw/.hoodie/.temp/20211119083434372/PushEvent/53bd9257-f833-4e04-a7df-423891d69779-0_0-46-83_20211119083434372.parquet.marker.CREATE
> 21/11/19 08:35:45 INFO HoodieCreateHandle: New CreateHandle for partition :PushEvent with fileId 53bd9257-f833-4e04-a7df-423891d69779-0
> 21/11/19 08:35:45 DEBUG TaskMemoryManager: Task 83 release 0.0 B from org.apache.spark.util.collection.ExternalSorter@149ed8ce
> {code}
>  
> The first creation refers to stage 40 and the second to stage 46. // Do not 
> correlate these with the above screenshots; they were from different runs for 
> which I did not collect the stage and attempt ID info. 
>  
> It is strange that this does not happen the first time clustering is 
> triggered, but only on retry of a failed clustering. But I confirmed that if 
> we remove the validate call, things are in good shape: marker files and 
> replace commit metadata are intact. 
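Continuing the plain-Java analogy from above (a hypothetical sketch, not the actual Hudi fix): materializing the lazy pipeline exactly once and running the emptiness validation against the in-memory result avoids the second execution entirely, which is why dropping the RDD-based validate call leaves markers and commit metadata consistent.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch: materialize the lazy pipeline once (analogous to
// persisting the RDD or reusing the collected write stats), then validate
// on driver-side data so no second execution of the write path is triggered.
public class MaterializeOnce {
    static int run() {
        AtomicInteger filesWritten = new AtomicInteger();
        // The peek() side effect stands in for HoodieCreateHandle writing a file.
        Supplier<Stream<String>> writeStatuses = () ->
                Stream.of("fileGroup-0", "fileGroup-1")
                      .peek(f -> filesWritten.incrementAndGet());

        // Single action: materialize the write statuses exactly once.
        List<String> collected = writeStatuses.get().collect(Collectors.toList());
        // Validation inspects the already-collected result; nothing re-executes.
        if (collected.isEmpty()) {
            throw new IllegalStateException("Clustering produced 0 write statuses");
        }
        // 2 writes for 2 file groups: no duplicate file, no extra marker.
        return filesWritten.get();
    }

    public static void main(String[] args) {
        System.out.println("files written: " + MaterializeOnce.run());
    }
}
```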
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
