[
https://issues.apache.org/jira/browse/HUDI-2549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dave Hagman closed HUDI-2549.
-----------------------------
Resolution: Duplicate
> Exceptions when using second writer into Hudi table managed by DeltaStreamer
> ----------------------------------------------------------------------------
>
> Key: HUDI-2549
> URL: https://issues.apache.org/jira/browse/HUDI-2549
> Project: Apache Hudi
> Issue Type: Bug
> Components: DeltaStreamer, Spark Integration, Writer Core
> Reporter: Dave Hagman
> Assignee: Dave Hagman
> Priority: Critical
> Labels: multi-writer, sev:critical
> Fix For: 0.10.0
>
>
> When running the DeltaStreamer along with a second Spark datasource writer
> (with [ZK-based OCC
> enabled|https://hudi.apache.org/docs/concurrency_control#enabling-multi-writing]),
> we receive the following exception, which halts the Spark datasource writer.
> It occurs after warnings about timeline inconsistencies:
>
> {code:java}
> 21/10/07 17:10:05 INFO TransactionManager: Transaction ending with transaction owner Option{val=[==>20211007170717__commit__INFLIGHT]}
> 21/10/07 17:10:05 INFO ZookeeperBasedLockProvider: RELEASING lock atZkBasePath = /events/test/mwc/v1, lock key = events_mwc_test_v1
> 21/10/07 17:10:05 INFO ZookeeperBasedLockProvider: RELEASED lock atZkBasePath = /events/test/mwc/v1, lock key = events_mwc_test_v1
> 21/10/07 17:10:05 INFO TransactionManager: Transaction ended
> Exception in thread "main" java.lang.IllegalArgumentException
>     at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:31)
>     at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:414)
>     at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:395)
>     at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.saveAsComplete(HoodieActiveTimeline.java:153)
>     at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:218)
>     at org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:190)
>     at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:124)
>     at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:617)
>     at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:274)
>     at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)
>     at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
>     at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
>     at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
>     at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
>     at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
>     at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
>     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
>     at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
>     at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
>     at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
>     at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
>     at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
>     at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
>     at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
>     at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
>     at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
>     at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
>     at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
>     at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
>     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
>     at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
>     at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
>     at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
>     at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
>     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
> {code}
> The validation at _*ValidationUtils.checkArgument*_ fails because the
> expected commit file is not present on DFS.
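> For context, a message-less {{IllegalArgumentException}} is exactly what a bare argument check produces when its condition is false. A minimal Scala sketch of the pattern (illustrative only, not the actual Hudi source; the {{inflightInstantExistsOnDfs}} flag is a hypothetical stand-in for the missing-commit-file condition described above):
> {code:scala}
> // Illustrative only: a bare argument check that throws a message-less
> // IllegalArgumentException when its condition does not hold.
> def checkArgument(expression: Boolean): Unit =
>   if (!expression) throw new IllegalArgumentException()
>
> // Hypothetical usage mirroring the failure described above: the state
> // transition requires the inflight commit file to still exist on DFS.
> val inflightInstantExistsOnDfs: Boolean = false
> checkArgument(inflightInstantExistsOnDfs) // throws IllegalArgumentException
> {code}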
>
> The test setup is as follows:
> * DeltaStreamer in continuous mode, with small batch sizes and very fast commit
> times (a high commit rate, roughly every 10-30 seconds)
> * A Spark datasource writer moving flat parquet files from a source bucket
> into the table maintained by the DeltaStreamer (a rough sketch of this writer
> follows the list)
> * The Spark datasource writer is much slower than the DeltaStreamer, so its
> time-to-first-commit is about 2-8 minutes
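> For reference, the second writer is a plain Spark datasource write with ZK-based OCC enabled, roughly as sketched below. The option names come from the concurrency-control docs linked above; the ZooKeeper base path and lock key are the values visible in the log, while the source/target paths, record key, precombine field, and ZooKeeper host are placeholders:
> {code:scala}
> // Hypothetical second writer: reads flat parquet files from a source bucket
> // and upserts them into the DeltaStreamer-managed table with ZK-based OCC.
> // Assumes an existing SparkSession named `spark`.
> val df = spark.read.parquet("s3://source-bucket/events/")          // placeholder path
>
> df.write.format("hudi").
>   option("hoodie.table.name", "events_mwc_test_v1").
>   option("hoodie.datasource.write.operation", "upsert").
>   option("hoodie.datasource.write.recordkey.field", "event_id").   // placeholder field
>   option("hoodie.datasource.write.precombine.field", "event_ts").  // placeholder field
>   // multi-writer / optimistic concurrency control settings
>   option("hoodie.write.concurrency.mode", "optimistic_concurrency_control").
>   option("hoodie.cleaner.policy.failed.writes", "LAZY").
>   option("hoodie.write.lock.provider",
>     "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider").
>   option("hoodie.write.lock.zookeeper.url", "zk-host").            // placeholder host
>   option("hoodie.write.lock.zookeeper.port", "2181").
>   option("hoodie.write.lock.zookeeper.base_path", "/events/test/mwc/v1").
>   option("hoodie.write.lock.zookeeper.lock_key", "events_mwc_test_v1").
>   mode("append").
>   save("s3://target-bucket/events_mwc_test_v1")                    // placeholder path
> {code}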
> What I see happening is the DeltaStreamer finalizing many commits while the
> Spark datasource writer is still performing its write. It appears that the
> timeline changes so much, so fast, that the Spark datasource writer becomes
> "out of sync" in a way it cannot recover from. I see the {{Exception in thread
> "main" java.lang.IllegalArgumentException}} error on the first commit of the
> *datasource writer* every time. This appears to be a race condition that
> surfaces when the rate of change of the Hudi timeline is very high (due to a
> fast DeltaStreamer process). The Spark datasource writer does not properly
> sync those timeline changes in a multi-writer configuration, which causes
> enough inconsistency to crash the job.
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)