sivabalan narayanan created HUDI-2800:
-----------------------------------------

             Summary: Clustering execution triggers HoodieCreateHandle twice
                 Key: HUDI-2800
                 URL: https://issues.apache.org/jira/browse/HUDI-2800
             Project: Apache Hudi
          Issue Type: Bug
            Reporter: sivabalan narayanan
         Attachments: Screen Shot 2021-11-19 at 10.08.48 AM.png

Scenario:

setup: induced an intentional failure in the clustering code path, i.e. the JVM is crashed right after applying changes to the metadata table.

 

use-case:

deltastreamer in continuous mode with async clustering enabled. At the end of the first clustering, we crash the JVM.

We then bring up the deltastreamer again. This triggers a rollback of the pending clustering, which is then retried.

Again, at the end of clustering, we crash the JVM.

Here, the marker files show 3 files in total, whereas the commit metadata shows only 2. Incidentally, two of the files share the same fileId and differ only in write token, and one of them is the missing file.

// excerpts from the logs showing the creation of the two files of interest

 
{code:java}
21/11/19 07:02:46 WARN SparkRDDWriteClient: Clustering commit metadata just 
before updating metadata table :: 
21/11/19 07:02:46 WARN SparkRDDWriteClient:  file path 
PushEvent/355c9651-395e-4b17-9600-79f4dfb077f0-0_0-42-79_20211119065822448.parquet,
 size 741905
21/11/19 07:02:46 WARN SparkRDDWriteClient:  file path 
CreateEvent/a7f7a48b-10fc-41aa-a4c6-c4963a83aed5-0_1-42-80_20211119065822448.parquet,
 size 566209
21/11/19 07:02:46 WARN HoodieTableMetadataUtil:  MDT. applying commit 20211119065822448
21/11/19 07:02:46 WARN HoodieTableMetadataUtil:   for partition PushEvent
21/11/19 07:02:46 WARN HoodieTableMetadataUtil:    new file 
355c9651-395e-4b17-9600-79f4dfb077f0-0_0-42-79_20211119065822448.parquet, size 
741905
21/11/19 07:02:46 WARN HoodieTableMetadataUtil:   for partition CreateEvent
21/11/19 07:02:46 WARN HoodieTableMetadataUtil:    new file 
a7f7a48b-10fc-41aa-a4c6-c4963a83aed5-0_1-42-80_20211119065822448.parquet, size 
566209
{code}
 

 

// marker files
{code:java}
ls -ltr /tmp/hudi-deltastreamer-gh-mw/PushEvent/ | grep 355c9651-395e
-rw-r--r--  1 nsb  wheel  741905 Nov 19 07:02 
355c9651-395e-4b17-9600-79f4dfb077f0-0_0-42-79_20211119065822448.parquet
-rw-r--r--  1 nsb  wheel  741905 Nov 19 07:02 
355c9651-395e-4b17-9600-79f4dfb077f0-0_0-46-81_20211119065822448.parquet
nsb$ ls -ltr /tmp/hudi-deltastreamer-gh-mw/CreateEvent/ | grep a7f7a48b-10fc
-rw-r--r--  1 nsb  wheel  566209 Nov 19 07:02 
a7f7a48b-10fc-41aa-a4c6-c4963a83aed5-0_1-42-80_20211119065822448.parquet {code}
 

I inspected the Spark UI and could not spot any failures. I then triaged the issue to the isEmpty() call at SparkExecuteClusteringCommitActionExecutor L103.

We have a collect() followed by an isEmpty(). collect() triggers the actual execution of all writes, but the subsequent isEmpty() launches one more Spark task, which again creates a file with the same fileId but a different write token. Confirmed this from the logs.

!Screen Shot 2021-11-19 at 10.08.48 AM.png!
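The double write is consistent with Spark's lazy evaluation: every action on an uncached RDD re-runs the tasks that produce it, so collect() and the later isEmpty() each execute the write path. A minimal stand-alone sketch of the same pattern (plain Java, no Spark; LazyDataset is a hypothetical stand-in for an uncached RDD, not a Hudi or Spark class):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for an uncached RDD: every "action" re-runs the
// computation, just as isEmpty() re-launches a task after collect().
class LazyDataset<T> {
    private final Supplier<List<T>> compute;
    LazyDataset(Supplier<List<T>> compute) { this.compute = compute; }
    List<T> collect() { return compute.get(); }           // action 1: full execution
    boolean isEmpty() { return compute.get().isEmpty(); } // action 2: recomputes!
}

public class DoubleWriteDemo {
    static final AtomicInteger filesCreated = new AtomicInteger();

    static LazyDataset<String> clusteringWrites() {
        return new LazyDataset<>(() -> {
            // Side effect per execution, analogous to HoodieCreateHandle
            // creating a new parquet file with a fresh write token.
            filesCreated.incrementAndGet();
            return List.of("writeStatus-1");
        });
    }

    public static void main(String[] args) {
        LazyDataset<String> statuses = clusteringWrites();
        statuses.collect();  // first execution: one file
        statuses.isEmpty();  // validate step: second execution, second file
        System.out.println("executions = " + filesCreated.get()); // prints 2
    }
}
```

Running it shows two executions: the second action repeated the side effect, just as the second parquet file appeared with a different write token.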

 

code path of interest in SparkExecuteClusteringCommitActionExecutor:

 
{code:java}
HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata =
    ((ClusteringExecutionStrategy<T, JavaRDD<HoodieRecord<? extends HoodieRecordPayload>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>>)
        ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(),
            new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class},
            table, context, config))
    .performClustering(clusteringPlan, schema, instantTime);
JavaRDD<WriteStatus> writeStatusRDD = writeMetadata.getWriteStatuses();
JavaRDD<WriteStatus> statuses = updateIndex(writeStatusRDD, writeMetadata);
writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collect());
writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(writeMetadata));
validateWriteResult(writeMetadata);
commitOnAutoCommit(writeMetadata);
{code}

{code:java}
private void validateWriteResult(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
  if (writeMetadata.getWriteStatuses().isEmpty()) {
    throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + instantTime
        + " #groups: " + clusteringPlan.getInputGroups().size() + " expected at least "
        + clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
        + " write statuses");
  }
}
{code}

According to the Spark logs, when writeMetadata.getWriteStatuses().isEmpty() is invoked as part of validateWriteResult, HoodieCreateHandle is triggered again, which results in a new file (same fileId, but a different write token).

 

I still could not answer why this does not happen the first time clustering is triggered, and only happens on retry of a failed clustering. But I confirmed that if we remove the validate call, things are in good shape: the marker files and the replacecommit metadata are intact.
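Beyond removing the validate call, another option (an assumption on my part, not verified against the Hudi codebase) would be to materialize the write statuses exactly once before any further action, e.g. by persisting writeStatusRDD or by validating against the already collected write stats, so that isEmpty()-style checks are served from a cached result. The idea, sketched with a memoizing supplier in plain Java (no Spark):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Sketch: memoize the result so repeated "actions" do not re-run the write
// path. In Spark terms this corresponds to calling persist()/cache() on the
// RDD (or reusing the collect()-ed write stats) before validateWriteResult.
public class CachedValidateDemo {
    static final AtomicInteger executions = new AtomicInteger();

    static <T> Supplier<T> memoize(Supplier<T> inner) {
        return new Supplier<T>() {
            private T value;
            private boolean done;
            @Override public synchronized T get() {
                if (!done) { value = inner.get(); done = true; }
                return value;
            }
        };
    }

    public static void main(String[] args) {
        Supplier<List<String>> writes = memoize(() -> {
            executions.incrementAndGet();      // one "file" per execution
            return List.of("writeStatus-1");
        });
        writes.get();                          // collect(): runs once
        boolean empty = writes.get().isEmpty(); // validate: served from cache
        System.out.println("executions = " + executions.get() + ", empty = " + empty);
    }
}
```

With the result memoized, the validate step no longer re-runs the write tasks, so no duplicate file (and no orphan marker) is produced.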
--
This message was sent by Atlassian Jira
(v8.20.1#820001)
