[
https://issues.apache.org/jira/browse/HUDI-2549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dave Hagman updated HUDI-2549:
------------------------------
Description:
When running the DeltaStreamer along with a second spark datasource writer
(with [ZK-based OCC
enabled|https://hudi.apache.org/docs/concurrency_control#enabling-multi-writing]
we receive the following exception (which haults the spark datasource writer).
This occurs following warnings of timeline inconsistencies:
{code:java}
21/10/07 17:10:05 INFO TransactionManager: Transaction ending with transaction
owner Option{val=[==>20211007170717__commit__INFLIGHT]}
21/10/07 17:10:05 INFO ZookeeperBasedLockProvider: RELEASING lock atZkBasePath
= /events/test/mwc/v1, lock key = events_mwc_test_v1
21/10/07 17:10:05 INFO ZookeeperBasedLockProvider: RELEASED lock atZkBasePath =
/events/test/mwc/v1, lock key = events_mwc_test_v1
21/10/07 17:10:05 INFO TransactionManager: Transaction ended
Exception in thread "main" java.lang.IllegalArgumentException
at
org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:31)
at
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:414)
at
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:395)
at
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.saveAsComplete(HoodieActiveTimeline.java:153)
at
org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:218)
at
org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:190)
at
org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:124)
at
org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:617)
at
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:274)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)
at
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
at
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
at
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
at
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
at
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
at
org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
at
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
at
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
at
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
at
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
{code}
The validation at _*ValidationUtils.checkArgument*_ fails because the expected
commit file was not present on DFS.
The test setup is this:
* Deltastreamer, continuous mode. Small batch sizes and very fast commit times
(high commit rate, every 10-30 seconds)
* A spark datasource writer moving flat parquet files from a source bucket
into the table maintained by the deltastreamer
* The spark datasource is much slower than the deltastreamer so
time-to-first-commit is about 2-8 minutes
What I see happen is the deltastreamer finalizing many commits while the spark
datasource is performing its write. It appears that the timeline changes so
much, so fast that the spark datasource writer becomes "out of sync" in ways
that it cannot recover. I see the {{Exception in thread "main"
java.lang.IllegalArgumentException}} error on the first commit of the
*datasource writer* every time. This appears to be a race condition when the
rate-of-change of the hudi timeline is very high (due to a fast deltastreamer
process). The spark datasource does not properly sync those changes in a
multi-writer configuration which causes enough inconsistency to crash the job.
was:
When running the DeltaStreamer along with a second spark datasource writer
(with [ZK-based OCC
enabled|https://hudi.apache.org/docs/concurrency_control#enabling-multi-writing]
we receive the following exception (which haults the spark datasource writer).
This occurs following warnings of timeline inconsistencies:
{code:java}
21/10/07 17:10:05 INFO TransactionManager: Transaction ending with transaction
owner Option{val=[==>20211007170717__commit__INFLIGHT]}
21/10/07 17:10:05 INFO ZookeeperBasedLockProvider: RELEASING lock atZkBasePath
= /events/test/mwc/v1, lock key = events_mwc_test_v1
21/10/07 17:10:05 INFO ZookeeperBasedLockProvider: RELEASED lock atZkBasePath =
/events/test/mwc/v1, lock key = events_mwc_test_v1
21/10/07 17:10:05 INFO TransactionManager: Transaction ended
Exception in thread "main" java.lang.IllegalArgumentException
at
org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:31)
at
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:414)
at
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:395)
at
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.saveAsComplete(HoodieActiveTimeline.java:153)
at
org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:218)
at
org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:190)
at
org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:124)
at
org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:617)
at
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:274)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)
at
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
at
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
at
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
at
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
at
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
at
org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
at
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
at
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
at
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
at
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
{code}
The validation at _*ValidationUtils.checkArgument*_ fails because the expected
commit file was not present on DFS.
The test setup is this: * Deltastreamer, continuous mode. Small batch sizes and
very fast commit times (high commit rate, every 10-30 seconds)
* A spark datasource writer moving flat parquet files from a source bucket
into the table maintained by the deltastreamer
* The spark datasource is much slower than the deltastreamer so
time-to-first-commit is about 2-8 minutes
What I see happen is the deltastreamer finalizing many commits while the spark
datasource is performing its write. It appears that the timeline changes so
much, so fast that the spark datasource writer becomes "out of sync" in ways
that it cannot recover. I see the {{Exception in thread "main"
java.lang.IllegalArgumentException}} error on the first commit of the
*datasource writer* every time. This appears to be a race condition when the
rate-of-change of the hudi timeline is very high (due to a fast deltastreamer
process). The spark datasource does not properly sync those changes in a
multi-writer configuration which causes enough inconsistency to crash the job.
> Exceptions when using second writer into Hudi table managed by DeltaStreamer
> ----------------------------------------------------------------------------
>
> Key: HUDI-2549
> URL: https://issues.apache.org/jira/browse/HUDI-2549
> Project: Apache Hudi
> Issue Type: Bug
> Components: DeltaStreamer, Spark Integration, Writer Core
> Reporter: Dave Hagman
> Assignee: Dave Hagman
> Priority: Critical
> Labels: multi-writer
> Fix For: 0.10.0
>
>
> When running the DeltaStreamer along with a second spark datasource writer
> (with [ZK-based OCC
> enabled|https://hudi.apache.org/docs/concurrency_control#enabling-multi-writing]
> we receive the following exception (which haults the spark datasource
> writer). This occurs following warnings of timeline inconsistencies:
>
> {code:java}
> 21/10/07 17:10:05 INFO TransactionManager: Transaction ending with
> transaction owner Option{val=[==>20211007170717__commit__INFLIGHT]}
> 21/10/07 17:10:05 INFO ZookeeperBasedLockProvider: RELEASING lock
> atZkBasePath = /events/test/mwc/v1, lock key = events_mwc_test_v1
> 21/10/07 17:10:05 INFO ZookeeperBasedLockProvider: RELEASED lock atZkBasePath
> = /events/test/mwc/v1, lock key = events_mwc_test_v1
> 21/10/07 17:10:05 INFO TransactionManager: Transaction ended
> Exception in thread "main" java.lang.IllegalArgumentException
> at
> org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:31)
> at
> org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:414)
> at
> org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:395)
> at
> org.apache.hudi.common.table.timeline.HoodieActiveTimeline.saveAsComplete(HoodieActiveTimeline.java:153)
> at
> org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:218)
> at
> org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:190)
> at
> org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:124)
> at
> org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:617)
> at
> org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:274)
> at
> org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)
> at
> org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
> at
> org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
> at
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
> at
> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
> at
> org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
> at
> org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
> at
> org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
> at
> org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
> at
> org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
> at
> org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
> at
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
> at
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
> at
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
> at
> org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
> at
> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
> {code}
> The validation at _*ValidationUtils.checkArgument*_ fails because the
> expected commit file was not present on DFS.
>
> The test setup is this:
> * Deltastreamer, continuous mode. Small batch sizes and very fast commit
> times (high commit rate, every 10-30 seconds)
> * A spark datasource writer moving flat parquet files from a source bucket
> into the table maintained by the deltastreamer
> * The spark datasource is much slower than the deltastreamer so
> time-to-first-commit is about 2-8 minutes
> What I see happen is the deltastreamer finalizing many commits while the
> spark datasource is performing its write. It appears that the timeline
> changes so much, so fast that the spark datasource writer becomes "out of
> sync" in ways that it cannot recover. I see the {{Exception in thread "main"
> java.lang.IllegalArgumentException}} error on the first commit of the
> *datasource writer* every time. This appears to be a race condition when the
> rate-of-change of the hudi timeline is very high (due to a fast deltastreamer
> process). The spark datasource does not properly sync those changes in a
> multi-writer configuration which causes enough inconsistency to crash the
> job.
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)