[
https://issues.apache.org/jira/browse/HUDI-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
sivabalan narayanan updated HUDI-2800:
--------------------------------------
Description:
desc: when a failed clustering is retried, we see that HoodieCreateHandle is
triggered twice: once for the collect() and once more for the isEmpty() call on
the JavaRDD<WriteStatus>. As a result, the markers show one extra file compared
to the actual replace commit metadata.
Scenario:
setup: induced an intentional failure in the clustering code path, i.e. the JVM
is crashed after applying changes to the metadata table.
use-case:
deltastreamer in continuous mode with async clustering enabled. At the end of
the first clustering, we crash the JVM.
We then bring up the deltastreamer again. This triggers a rollback of the
pending clustering, which then gets retried.
Again, at the end of clustering, we crash it.
Here, the marker files show 3 files in total, whereas the commit metadata shows
only 2. Incidentally, two of the files share the same fileId and differ only in
write token, and one of them is the missing file.
// excerpts from logs showing the creation of the two files of interest
{code:java}
21/11/19 07:02:46 WARN SparkRDDWriteClient: Clustering commit metadata just
before updating metadata table ::
21/11/19 07:02:46 WARN SparkRDDWriteClient: file path
PushEvent/355c9651-395e-4b17-9600-79f4dfb077f0-0_0-42-79_20211119065822448.parquet,
size 741905
21/11/19 07:02:46 WARN SparkRDDWriteClient: file path
CreateEvent/a7f7a48b-10fc-41aa-a4c6-c4963a83aed5-0_1-42-80_20211119065822448.parquet,
size 566209
21/11/19 07:02:46 WARN HoodieTableMetadataUtil: MDT. applying
commit 20211119065822448
21/11/19 07:02:46 WARN HoodieTableMetadataUtil: for partition PushEvent
21/11/19 07:02:46 WARN HoodieTableMetadataUtil: new file
355c9651-395e-4b17-9600-79f4dfb077f0-0_0-42-79_20211119065822448.parquet, size
741905
21/11/19 07:02:46 WARN HoodieTableMetadataUtil: for partition CreateEvent
21/11/19 07:02:46 WARN HoodieTableMetadataUtil: new file
a7f7a48b-10fc-41aa-a4c6-c4963a83aed5-0_1-42-80_20211119065822448.parquet, size
566209
{code}
// marker files
{code:java}
ls -ltr /tmp/hudi-deltastreamer-gh-mw/PushEvent/ | grep 355c9651-395e
-rw-r--r-- 1 nsb wheel 741905 Nov 19 07:02
355c9651-395e-4b17-9600-79f4dfb077f0-0_0-42-79_20211119065822448.parquet
-rw-r--r-- 1 nsb wheel 741905 Nov 19 07:02
355c9651-395e-4b17-9600-79f4dfb077f0-0_0-46-81_20211119065822448.parquet
nsb$ ls -ltr /tmp/hudi-deltastreamer-gh-mw/CreateEvent/ | grep a7f7a48b-10fc
-rw-r--r-- 1 nsb wheel 566209 Nov 19 07:02
a7f7a48b-10fc-41aa-a4c6-c4963a83aed5-0_1-42-80_20211119065822448.parquet {code}
I inspected the Spark UI and could not spot any failures. I then traced the
issue to the isEmpty() at SparkExecuteClusteringCommitActionExecutor L103.
We have a collect() followed by an isEmpty(). The collect() triggers the actual
execution of all the writes, but the subsequent isEmpty() launches one more
Spark task, which again creates a file with the same fileId but a different
write token. Confirmed this from the logs.
!Screen Shot 2021-11-19 at 10.08.48 AM.png!
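The re-execution can be illustrated outside of Spark with a small self-contained Java sketch (the LazyWrites class and all names in it are hypothetical, for illustration only; this is not Hudi or Spark API). Each "action" on a lazily built pipeline re-runs the side-effecting write, just as each Spark action re-computes an unpersisted RDD lineage, while materializing the result once avoids the second pass:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Models Spark's lazy-evaluation semantics with a Supplier: every call to
// get() rebuilds and re-runs the whole pipeline, side effects included.
public class LazyWrites {
    static final AtomicInteger writeCount = new AtomicInteger();

    static Supplier<List<String>> lazyStatuses() {
        // The map step stands in for the file-writing work done per record.
        return () -> Stream.of("file-a", "file-b")
                .map(f -> { writeCount.incrementAndGet(); return f + "_written"; })
                .collect(Collectors.toList());
    }

    // Analogous to collect() followed by isEmpty() on an unpersisted RDD:
    // the writes happen twice.
    public static int runTwoActions() {
        writeCount.set(0);
        Supplier<List<String>> statuses = lazyStatuses();
        statuses.get();            // "collect()": 2 writes happen
        statuses.get().isEmpty();  // "isEmpty()": 2 writes happen AGAIN
        return writeCount.get();   // 4, not 2
    }

    // Materializing once (analogous to persisting/caching the RDD) means the
    // later emptiness check triggers no recomputation.
    public static int runCachedActions() {
        writeCount.set(0);
        List<String> cached = lazyStatuses().get();  // writes happen once
        cached.isEmpty();                            // no recomputation
        return writeCount.get();                     // 2
    }
}
```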
Code path of interest in SparkExecuteClusteringCommitActionExecutor:
{code:java}
HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata =
    ((ClusteringExecutionStrategy<T, JavaRDD<HoodieRecord<? extends HoodieRecordPayload>>,
        JavaRDD<HoodieKey>, JavaRDD<WriteStatus>>)
        ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(),
            new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class},
            table, context, config))
        .performClustering(clusteringPlan, schema, instantTime);
JavaRDD<WriteStatus> writeStatusRDD = writeMetadata.getWriteStatuses();
JavaRDD<WriteStatus> statuses = updateIndex(writeStatusRDD, writeMetadata);
writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collect());
writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(writeMetadata));
validateWriteResult(writeMetadata);
commitOnAutoCommit(writeMetadata); {code}
{code:java}
private void validateWriteResult(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
  if (writeMetadata.getWriteStatuses().isEmpty()) {
    throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + instantTime
        + " #groups: " + clusteringPlan.getInputGroups().size()
        + " expected at least "
        + clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
        + " write statuses");
  }
} {code}
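Since setWriteStats(...) in the executor snippet above already collects the write stats to the driver, one possible mitigation (a sketch using hypothetical stub types, not the actual Hudi fix) is to validate that driver-side list instead of calling isEmpty() on the RDD, so no extra Spark job is launched:

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-ins for the Hudi types, for illustration only.
public class ValidateSketch {
    static class WriteStat { }

    static class WriteMetadataStub {
        private Optional<List<WriteStat>> writeStats = Optional.empty();
        void setWriteStats(List<WriteStat> stats) { writeStats = Optional.of(stats); }
        Optional<List<WriteStat>> getWriteStats() { return writeStats; }
    }

    // Validates the already-collected driver-side list; unlike isEmpty() on a
    // JavaRDD, this triggers no distributed action (and hence no re-write).
    static void validateWriteResult(WriteMetadataStub writeMetadata, int expected) {
        List<WriteStat> stats = writeMetadata.getWriteStats().orElse(Collections.emptyList());
        if (stats.isEmpty()) {
            throw new IllegalStateException(
                "Clustering produced 0 write stats, expected at least " + expected);
        }
    }

    public static boolean passesWithStats() {
        WriteMetadataStub md = new WriteMetadataStub();
        md.setWriteStats(List.of(new WriteStat()));
        try { validateWriteResult(md, 1); return true; }
        catch (IllegalStateException e) { return false; }
    }

    public static boolean failsOnEmpty() {
        WriteMetadataStub md = new WriteMetadataStub();
        md.setWriteStats(Collections.emptyList());
        try { validateWriteResult(md, 1); return false; }
        catch (IllegalStateException e) { return true; }
    }
}
```

Another common approach in Spark code generally is to persist the write-status RDD before the first action so later actions reuse the computed partitions.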
According to the Spark logs, when writeMetadata.getWriteStatuses().isEmpty() is
invoked as part of validateWriteResult, HoodieCreateHandle is triggered again,
which results in a new file (same fileId, but a different write token).
// excerpts from logs showing the different stage IDs with which the create
handle is called
{code:java}
21/11/19 08:35:45 WARN HoodieCreateHandle: new Create Handle task info. StageId 40, Partition Id0, Attempt ID 79
21/11/19 08:35:45 INFO HoodieCreateHandle: New CreateHandle for partition :PushEvent with fileId 53bd9257-f833-4e04-a7df-423891d69779-0
21/11/19 08:35:45 INFO DirectWriteMarkers: Creating Marker Path=file:/tmp/hudi-deltastreamer-gh-mw/.hoodie/.temp/20211119083434372/PushEvent/53bd9257-f833-4e04-a7df-423891d69779-0_0-40-79_20211119083434372.parquet.marker.CREATE
21/11/19 08:35:45 DEBUG TaskMemoryManager: Task 79 release 0.0 B from org.apache.spark.util.collection.ExternalSorter@52fd6797
21/11/19 08:35:45 WARN HoodieCreateHandle: new Create Handle task info. StageId 46, Partition Id0, Attempt ID 83
21/11/19 08:35:45 INFO DirectWriteMarkers: Creating Marker Path=file:/tmp/hudi-deltastreamer-gh-mw/.hoodie/.temp/20211119083434372/PushEvent/53bd9257-f833-4e04-a7df-423891d69779-0_0-46-83_20211119083434372.parquet.marker.CREATE
21/11/19 08:35:45 INFO HoodieCreateHandle: New CreateHandle for partition :PushEvent with fileId 53bd9257-f833-4e04-a7df-423891d69779-0
21/11/19 08:35:45 DEBUG TaskMemoryManager: Task 83 release 0.0 B from org.apache.spark.util.collection.ExternalSorter@149ed8ce
{code}
The first creation refers to stage 40 and the second to stage 46. (Do not
correlate these with the screenshot above; it was from a different run for
which I did not collect the stage and attempt ID info.)
It is strange that this does not happen the first time clustering is triggered,
only on retry of a failed clustering. But I confirmed that if we remove the
validate call, things are in good shape: the marker files and the replace
commit metadata are consistent.
> Clustering execution triggers HoodieCreateHandle twice during
> validateWriteResult isEmpty call
> ----------------------------------------------------------------------------------------------
>
> Key: HUDI-2800
> URL: https://issues.apache.org/jira/browse/HUDI-2800
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: sivabalan narayanan
> Priority: Major
> Fix For: 0.10.0
>
> Attachments: Screen Shot 2021-11-19 at 10.08.48 AM.png
>
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)