[
https://issues.apache.org/jira/browse/HUDI-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
sivabalan narayanan updated HUDI-2800:
--------------------------------------
Description:
When a failed clustering is retried, we see that HoodieCreateHandle
is triggered a second time by the isEmpty() call on the JavaRDD<WriteStatus>. As a
result, the markers show one extra file compared to the actual replace commit metadata.
Scenario:
Setup: an intentional failure was induced in the clustering code path, i.e. the
JVM is crashed right after applying changes to the metadata table.
Use case:
Deltastreamer in continuous mode with async clustering enabled. At the end of the
first clustering, we crash the JVM.
We then bring up the deltastreamer again. This triggers a rollback of the pending
clustering, which then gets retried.
Again, at the end of clustering, we crash the JVM.
At this point, the marker files show 3 files in total, whereas the commit
metadata shows only 2 files. Incidentally, two files share the same fileId and
differ only in the write token, and one of them is the missing file.
// Excerpts from the logs showing the creation of the two files of interest:
{code:java}
21/11/19 07:02:46 WARN SparkRDDWriteClient: Clustering commit metadata just
before updating metadata table ::
21/11/19 07:02:46 WARN SparkRDDWriteClient: file path
PushEvent/355c9651-395e-4b17-9600-79f4dfb077f0-0_0-42-79_20211119065822448.parquet,
size 741905
21/11/19 07:02:46 WARN SparkRDDWriteClient: file path
CreateEvent/a7f7a48b-10fc-41aa-a4c6-c4963a83aed5-0_1-42-80_20211119065822448.parquet,
size 566209
21/11/19 07:02:46 WARN HoodieTableMetadataUtil: MDT. applying
commit 20211119065822448
21/11/19 07:02:46 WARN HoodieTableMetadataUtil: for partition PushEvent
21/11/19 07:02:46 WARN HoodieTableMetadataUtil: new file
355c9651-395e-4b17-9600-79f4dfb077f0-0_0-42-79_20211119065822448.parquet, size
741905
21/11/19 07:02:46 WARN HoodieTableMetadataUtil: for partition CreateEvent
21/11/19 07:02:46 WARN HoodieTableMetadataUtil: new file
a7f7a48b-10fc-41aa-a4c6-c4963a83aed5-0_1-42-80_20211119065822448.parquet, size
566209
{code}
// Marker files:
{code:java}
ls -ltr /tmp/hudi-deltastreamer-gh-mw/PushEvent/ | grep 355c9651-395e
-rw-r--r-- 1 nsb wheel 741905 Nov 19 07:02
355c9651-395e-4b17-9600-79f4dfb077f0-0_0-42-79_20211119065822448.parquet
-rw-r--r-- 1 nsb wheel 741905 Nov 19 07:02
355c9651-395e-4b17-9600-79f4dfb077f0-0_0-46-81_20211119065822448.parquet
nsb$ ls -ltr /tmp/hudi-deltastreamer-gh-mw/CreateEvent/ | grep a7f7a48b-10fc
-rw-r--r-- 1 nsb wheel 566209 Nov 19 07:02
a7f7a48b-10fc-41aa-a4c6-c4963a83aed5-0_1-42-80_20211119065822448.parquet {code}
I inspected the Spark UI and could not spot any failures, and then triaged the
issue to the isEmpty() call at SparkExecuteClusteringCommitActionExecutor L103.
We have a collect() followed by an isEmpty(). The collect() triggers the actual
execution of all the writes, but the subsequent isEmpty() launches one more Spark
task, which creates a file again with the same fileId but a different write token.
Confirmed this from the logs.
!Screen Shot 2021-11-19 at 10.08.48 AM.png!
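The double execution can be illustrated outside Hudi with a plain-Java stand-in for an uncached RDD: every action replays the lazy transformation chain from scratch, just as Spark recomputes an RDD that was never persisted. All names below (LazyWrites, writeStatuses, handleInvocations) are illustrative, not Hudi or Spark APIs.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Stand-in for an uncached RDD: each action rebuilds the lazy pipeline,
// so the write side effect re-runs, like HoodieCreateHandle being
// invoked again when isEmpty() follows collect().
class LazyWrites {
    static final AtomicInteger handleInvocations = new AtomicInteger();

    static Supplier<Stream<String>> writeStatuses(List<String> inputs) {
        return () -> inputs.stream().map(file -> {
            handleInvocations.incrementAndGet(); // stands in for the create handle writing a file
            return "written:" + file;
        });
    }

    public static void main(String[] args) {
        Supplier<Stream<String>> statuses = writeStatuses(List.of("f1", "f2"));

        // "collect()": runs the writes once, one invocation per input file.
        List<String> collected = statuses.get().collect(Collectors.toList());

        // "isEmpty()": a second action on the uncached pipeline re-runs the
        // transformation for at least one element, invoking the handle again.
        boolean empty = statuses.get().findFirst().isEmpty();

        System.out.println("collected=" + collected.size()
            + " empty=" + empty
            + " handleInvocations=" + handleInvocations.get());
    }
}
```

With two input files, collect() accounts for two handle invocations and the emptiness probe adds a third, matching the extra marker file seen in the repro.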
Code path of interest in SparkExecuteClusteringCommitActionExecutor:
{code:java}
HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata =
    ((ClusteringExecutionStrategy<T, JavaRDD<HoodieRecord<? extends HoodieRecordPayload>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>>)
        ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(),
            new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class},
            table, context, config))
        .performClustering(clusteringPlan, schema, instantTime);
JavaRDD<WriteStatus> writeStatusRDD = writeMetadata.getWriteStatuses();
JavaRDD<WriteStatus> statuses = updateIndex(writeStatusRDD, writeMetadata);
writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collect());
writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(writeMetadata));
validateWriteResult(writeMetadata);
commitOnAutoCommit(writeMetadata); {code}
{code:java}
private void validateWriteResult(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
  if (writeMetadata.getWriteStatuses().isEmpty()) {
    throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + instantTime
        + " #groups: " + clusteringPlan.getInputGroups().size() + " expected at least "
        + clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
        + " write statuses");
  }
} {code}
According to the Spark logs, when writeMetadata.getWriteStatuses().isEmpty() is
invoked as part of validateWriteResult, HoodieCreateHandle is triggered again,
which results in a new file (same fileId, but a different write token).
It is strange that this does not happen the first time clustering is triggered
and happens only on the retry of a failed clustering. But I confirmed that if we
remove the validate call, things are in good shape: the marker files and
replace commit metadata are intact.
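One possible direction for a fix (a hedged sketch, not the actual Hudi change): since setWriteStats already collects the statuses to the driver, the emptiness check could run against that in-memory list instead of issuing a second Spark action on the RDD. The class and exception below are simplified stand-ins for the real Hudi types.

```java
import java.util.List;

// Hypothetical sketch: validate against the driver-side collected stats,
// so no second Spark action (and no second HoodieCreateHandle) is needed.
class ValidateFromCollected {
    static void validateWriteResult(List<String> collectedStats, String instantTime) {
        if (collectedStats.isEmpty()) { // in-memory check, no RDD recomputation
            throw new IllegalStateException(
                "Clustering plan produced 0 WriteStatus for " + instantTime);
        }
    }

    public static void main(String[] args) {
        validateWriteResult(List.of("stat1", "stat2"), "20211119065822448");
        System.out.println("validation passed");
        try {
            validateWriteResult(List.of(), "20211119065822448");
        } catch (IllegalStateException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

Alternatively, persisting the RDD before running multiple actions on it would avoid the recomputation, at the cost of caching the write statuses.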
> Clustering execution triggers HoodieCreateHandle twice during
> validateWriteResult isEmpty call
> ----------------------------------------------------------------------------------------------
>
> Key: HUDI-2800
> URL: https://issues.apache.org/jira/browse/HUDI-2800
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: sivabalan narayanan
> Priority: Major
> Attachments: Screen Shot 2021-11-19 at 10.08.48 AM.png
>
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)