[ 
https://issues.apache.org/jira/browse/HUDI-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan updated HUDI-2800:
--------------------------------------
    Description: 
desc: When a failed clustering is retried, HoodieCreateHandle is triggered twice 
for the isEmpty() call on the JavaRDD<WriteStatus>. As a result, markers show one 
extra file compared to the actual replace commit metadata.


Scenario:

setup: induced an intentional failure in the clustering code path, i.e. the JVM 
is crashed right after applying changes to the metadata table.

use-case: 

deltastreamer in continuous mode with async clustering enabled. At the end of 
the first clustering, we crash the JVM.

We then bring up the deltastreamer again. This triggers a rollback of the 
pending clustering, which is then retried.

Again, at the end of clustering, we crash the JVM.

Here, marker files show that there are 3 files in total, whereas the commit 
metadata shows only 2 files. Incidentally, two of the files share the same 
fileId and differ only in the write token, and one of them is the missing file.

// excerpts from the logs showing the creation of the two files of interest:

 
{code:java}
21/11/19 07:02:46 WARN SparkRDDWriteClient: Clustering commit metadata just before updating metadata table :: 
21/11/19 07:02:46 WARN SparkRDDWriteClient:  file path PushEvent/355c9651-395e-4b17-9600-79f4dfb077f0-0_0-42-79_20211119065822448.parquet, size 741905
21/11/19 07:02:46 WARN SparkRDDWriteClient:  file path CreateEvent/a7f7a48b-10fc-41aa-a4c6-c4963a83aed5-0_1-42-80_20211119065822448.parquet, size 566209
21/11/19 07:02:46 WARN HoodieTableMetadataUtil:  MDT. applying commit 20211119065822448
21/11/19 07:02:46 WARN HoodieTableMetadataUtil:   for partition PushEvent
21/11/19 07:02:46 WARN HoodieTableMetadataUtil:    new file 355c9651-395e-4b17-9600-79f4dfb077f0-0_0-42-79_20211119065822448.parquet, size 741905
21/11/19 07:02:46 WARN HoodieTableMetadataUtil:   for partition CreateEvent
21/11/19 07:02:46 WARN HoodieTableMetadataUtil:    new file a7f7a48b-10fc-41aa-a4c6-c4963a83aed5-0_1-42-80_20211119065822448.parquet, size 566209
{code}
 

// marker files
{code:java}
ls -ltr /tmp/hudi-deltastreamer-gh-mw/PushEvent/ | grep 355c9651-395e
-rw-r--r--  1 nsb  wheel  741905 Nov 19 07:02 355c9651-395e-4b17-9600-79f4dfb077f0-0_0-42-79_20211119065822448.parquet
-rw-r--r--  1 nsb  wheel  741905 Nov 19 07:02 355c9651-395e-4b17-9600-79f4dfb077f0-0_0-46-81_20211119065822448.parquet
nsb$ ls -ltr /tmp/hudi-deltastreamer-gh-mw/CreateEvent/ | grep a7f7a48b-10fc
-rw-r--r--  1 nsb  wheel  566209 Nov 19 07:02 a7f7a48b-10fc-41aa-a4c6-c4963a83aed5-0_1-42-80_20211119065822448.parquet {code}
 

I inspected the Spark UI and could not spot any failures. I then triaged the 
issue to the isEmpty() call at SparkExecuteClusteringCommitActionExecutor L103.

We have a collect() followed by an isEmpty(). collect() triggers the actual 
execution of all writes. But the subsequent isEmpty() triggers one more Spark 
task, which again creates a file with the same fileId but a different write 
token. Confirmed this from the logs.
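The recomputation can be illustrated without Spark by a plain-Java sketch of a lazy, side-effecting pipeline (the names below are illustrative stand-ins, not Hudi or Spark code): each terminal operation rebuilds the pipeline and re-runs the side effect, just as each Spark action re-runs the write on an uncached RDD.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyRecompute {
    public static void main(String[] args) {
        AtomicInteger filesCreated = new AtomicInteger();

        // A lazy pipeline, analogous to an uncached JavaRDD<WriteStatus>:
        // every terminal operation rebuilds the stream and re-runs the
        // side-effecting "write" for each element it pulls.
        Supplier<Stream<String>> writeStatuses = () -> Stream.of("fileId-1", "fileId-2")
            .peek(f -> filesCreated.incrementAndGet()); // side effect: "creates a file"

        // 1st action, like collect(): runs the full write (2 "files").
        List<String> collected = writeStatuses.get().collect(Collectors.toList());

        // 2nd action, like isEmpty(): short-circuits after one element,
        // but still re-runs the write for that element -> a 3rd "file".
        boolean empty = !writeStatuses.get().findAny().isPresent();

        System.out.println("results: " + collected.size()
            + ", empty: " + empty
            + ", files created: " + filesCreated.get()); // 2 results, 3 files
    }
}
```

In Spark terms, persisting the RDD before issuing the two actions, or validating against the already-collected stats, would avoid the second execution.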

!Screen Shot 2021-11-19 at 10.08.48 AM.png!

 

Code path of interest in SparkExecuteClusteringCommitActionExecutor:

 
{code:java}
HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata =
    ((ClusteringExecutionStrategy<T, JavaRDD<HoodieRecord<? extends HoodieRecordPayload>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>>)
        ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(),
            new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, table, context, config))
        .performClustering(clusteringPlan, schema, instantTime);
JavaRDD<WriteStatus> writeStatusRDD = writeMetadata.getWriteStatuses();
JavaRDD<WriteStatus> statuses = updateIndex(writeStatusRDD, writeMetadata);
writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collect());
writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(writeMetadata));
validateWriteResult(writeMetadata);
commitOnAutoCommit(writeMetadata); {code}
 
{code:java}
private void validateWriteResult(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
  if (writeMetadata.getWriteStatuses().isEmpty()) {
    throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + instantTime
        + " #groups: " + clusteringPlan.getInputGroups().size() + " expected at least "
        + clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
        + " write statuses");
  }
} {code}
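One possible mitigation, sketched here as an assumption rather than the committed fix, is to validate against the write-stat list already materialized by the earlier collect(), so that validation never schedules a second Spark job. The class, method, and stat types below are illustrative stand-ins, not Hudi APIs:

```java
import java.util.Collections;
import java.util.List;

public class ValidateFromCollectedStats {
    // Validates using the list produced by the earlier collect() call,
    // so no new Spark job (and no second HoodieCreateHandle) is triggered.
    static void validateWriteResult(List<String> collectedWriteStats, String instantTime) {
        if (collectedWriteStats.isEmpty()) {
            throw new IllegalStateException(
                "Clustering produced 0 write stats for " + instantTime);
        }
    }

    public static void main(String[] args) {
        // Non-empty result: validation passes without re-running any writes.
        validateWriteResult(List.of("stat-1", "stat-2"), "20211119065822448");
        System.out.println("non-empty result: validation passed");

        // Empty result: validation still fails as intended.
        try {
            validateWriteResult(Collections.emptyList(), "20211119065822448");
        } catch (IllegalStateException e) {
            System.out.println("empty result: " + e.getMessage());
        }
    }
}
```

Alternatively, persisting the write-status RDD before the first action would also prevent the isEmpty() call from recomputing the writes.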
 

According to the Spark logs, when writeMetadata.getWriteStatuses().isEmpty() is 
invoked as part of validateWriteResult, HoodieCreateHandle is triggered again, 
which results in a new file (same fileId, but a different write token).

 

It is strange that this does not happen the first time clustering is triggered 
and happens only on the retry of a failed clustering. But I confirmed that if we 
remove the validate call, things are in good shape: marker files and the replace 
commit metadata are intact.

 

 

  was: (previous revision of the description, superseded by the text above)

> Clustering execution triggers HoodieCreateHandle twice during 
> validateWriteResult isEmpty call
> ----------------------------------------------------------------------------------------------
>
>                 Key: HUDI-2800
>                 URL: https://issues.apache.org/jira/browse/HUDI-2800
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: sivabalan narayanan
>            Priority: Major
>         Attachments: Screen Shot 2021-11-19 at 10.08.48 AM.png
>
>


--
This message was sent by Atlassian Jira
(v8.20.1#820001)
