[
https://issues.apache.org/jira/browse/HUDI-2275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17425741#comment-17425741
]
Dave Hagman commented on HUDI-2275:
-----------------------------------
I did not try that patch. I will do that and report back. FYI, with Hudi 0.9 I'm
receiving a new error (below). Looking at the code, that validation throws an
exception because the commit file it expects to be present is not there.
{code:java}
21/10/07 17:10:05 INFO TransactionManager: Transaction ending with transaction owner Option{val=[==>20211007170717__commit__INFLIGHT]}
21/10/07 17:10:05 INFO ZookeeperBasedLockProvider: RELEASING lock atZkBasePath = /events/test/mwc/v1, lock key = events_mwc_test_v1
21/10/07 17:10:05 INFO ZookeeperBasedLockProvider: RELEASED lock atZkBasePath = /events/test/mwc/v1, lock key = events_mwc_test_v1
21/10/07 17:10:05 INFO TransactionManager: Transaction ended
Exception in thread "main" java.lang.IllegalArgumentException
	at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:31)
	at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:414)
	at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:395)
	at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.saveAsComplete(HoodieActiveTimeline.java:153)
	at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:218)
	at org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:190)
	at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:124)
	at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:617)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:274)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
{code}
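
The failing check can be illustrated without any Hudi classes. The names below are a toy model, not Hudi's actual API: the essential shape is that transitionState asserts the INFLIGHT instant file still exists before moving it to COMPLETED, and ValidationUtils.checkArgument throws a message-less IllegalArgumentException when it doesn't (hence the bare exception in the trace), e.g. if a concurrent writer or a rollback removed the file:
{code:java}
import java.util.HashSet;
import java.util.Set;

public class TransitionSketch {
    // Hypothetical stand-in for the instant files on the timeline
    static final Set<String> instantFiles = new HashSet<>();

    // Mirrors ValidationUtils.checkArgument: no message, so callers see
    // a bare java.lang.IllegalArgumentException
    static void checkArgument(boolean condition) {
        if (!condition) {
            throw new IllegalArgumentException();
        }
    }

    // Toy transitionState: the inflight file must still be present
    // before it is replaced by the completed one
    static void transitionState(String inflight, String completed) {
        checkArgument(instantFiles.contains(inflight));
        instantFiles.remove(inflight);
        instantFiles.add(completed);
    }

    public static void main(String[] args) {
        instantFiles.add("20211007170717.inflight");
        // Simulate another writer (or its rollback) deleting the file:
        instantFiles.remove("20211007170717.inflight");
        try {
            transitionState("20211007170717.inflight", "20211007170717.commit");
        } catch (IllegalArgumentException e) {
            System.out.println("transition failed: inflight file missing");
        }
    }
}
{code}
Treat this purely as a sketch of the failure mode; the real cause of the missing inflight file in my setup is exactly what I'm trying to pin down.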
> HoodieDeltaStreamerException when using OCC and a second concurrent writer
> --------------------------------------------------------------------------
>
> Key: HUDI-2275
> URL: https://issues.apache.org/jira/browse/HUDI-2275
> Project: Apache Hudi
> Issue Type: Bug
> Components: DeltaStreamer, Spark Integration, Writer Core
> Affects Versions: 0.9.0
> Reporter: Dave Hagman
> Assignee: Sagar Sumit
> Priority: Critical
> Labels: sev:critical
> Fix For: 0.10.0
>
>
> I am trying to utilize [Optimistic Concurrency
> Control|https://hudi.apache.org/docs/concurrency_control] in order to allow
> two writers to update a single table simultaneously. The two writers are:
> * Writer A: Deltastreamer job consuming continuously from Kafka
> * Writer B: A spark datasource-based writer that is consuming parquet files
> out of S3
> * Table Type: Copy on Write
>
> After a few commits from each writer the deltastreamer will fail with the
> following exception:
>
> {code:java}
> org.apache.hudi.exception.HoodieDeltaStreamerException: Unable to find previous checkpoint. Please double check if this table was indeed built via delta streamer. Last Commit :Option{val=[20210803165741__commit__COMPLETED]}, Instants :[[20210803165741__commit__COMPLETED]], CommitMetadata={
> "partitionToWriteStats" : {
> ...{code}
>
> What appears to be happening is a lack of commit isolation between the two
> writers.
> Writer B (the spark datasource writer) lands commits which are eventually
> picked up by Writer A (the Delta Streamer). This is an issue because the
> Delta Streamer needs checkpoint information, which the spark datasource
> writer of course does not include in its commits. My understanding was that
> OCC was built for this very purpose (among others).
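> The gap can be sketched without any Hudi classes. If only the metadata of
> the latest completed commit is inspected, a commit landed by the datasource
> writer (which carries no checkpoint key) hides the deltastreamer's
> checkpoint, even though an older commit still has it. All names below are
> illustrative, not Hudi's actual API:
> {code:java}
> import java.util.Arrays;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
>
> public class CheckpointSketch {
>     static final String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
>
>     // Looking only at the newest commit reproduces the failure once a
>     // datasource commit (with no checkpoint metadata) is the latest
>     static String latestOnly(List<Map<String, String>> commits) {
>         return commits.get(commits.size() - 1).get(CHECKPOINT_KEY);
>     }
>
>     // Scanning backwards still recovers the most recent checkpoint
>     static String scanBackwards(List<Map<String, String>> commits) {
>         for (int i = commits.size() - 1; i >= 0; i--) {
>             String cp = commits.get(i).get(CHECKPOINT_KEY);
>             if (cp != null) {
>                 return cp;
>             }
>         }
>         return null;
>     }
>
>     public static void main(String[] args) {
>         Map<String, String> deltastreamerCommit = new HashMap<>();
>         deltastreamerCommit.put(CHECKPOINT_KEY, "topic,0:42");
>         Map<String, String> datasourceCommit = new HashMap<>(); // no key
>
>         List<Map<String, String>> timeline =
>             Arrays.asList(deltastreamerCommit, datasourceCommit);
>         System.out.println(latestOnly(timeline));    // null -> exception
>         System.out.println(scanBackwards(timeline)); // topic,0:42
>     }
> }
> {code}
> (Skipping past non-deltastreamer commits is only my guess at the shape of a
> fix; treat the sketch as an illustration of the gap, not as Hudi code.)
>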
> OCC config for Delta Streamer:
> {code:java}
> hoodie.write.concurrency.mode=optimistic_concurrency_control
> hoodie.cleaner.policy.failed.writes=LAZY
> hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
> hoodie.write.lock.zookeeper.url=<zk_host>
> hoodie.write.lock.zookeeper.port=2181
> hoodie.write.lock.zookeeper.lock_key=writer_lock
> hoodie.write.lock.zookeeper.base_path=/hudi-write-locks{code}
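>
> For completeness, both writers use the same lock key and base path; lock
> acquisition retries can also be tuned. The option names below are taken
> from the Hudi concurrency control docs as I read them for 0.9, so they are
> worth double-checking against your release before relying on them:
> {code:java}
> hoodie.write.lock.wait_time_ms_between_retry=1000
> hoodie.write.lock.num_retries=15
> hoodie.write.lock.zookeeper.connection_timeout_ms=15000
> {code}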
>
> OCC config for spark datasource:
> {code:java}
> // Multi-writer concurrency
> .option("hoodie.cleaner.policy.failed.writes", "LAZY")
> .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
> .option(
>   "hoodie.write.lock.provider",
>   org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider.class.getCanonicalName()
> )
> .option("hoodie.write.lock.zookeeper.url", jobArgs.zookeeperHost)
> .option("hoodie.write.lock.zookeeper.port", jobArgs.zookeeperPort)
> .option("hoodie.write.lock.zookeeper.lock_key", "writer_lock")
> .option("hoodie.write.lock.zookeeper.base_path", "/hudi-write-locks"){code}
> h3. Steps to Reproduce:
> * Start a deltastreamer job against some table Foo
> * In parallel, start writing to the same table Foo using the spark
> datasource writer
> * Note that after a few commits from each writer, the deltastreamer is
> likely to fail with the above exception when the datasource writer creates
> non-isolated inflight commits
> NOTE: I have not tested this with two writers of the same type (e.g. two
> deltastreamer jobs).
> NOTE 2: Another detail that may be relevant is that the two writers run on
> completely different spark clusters, but I assumed this shouldn't be an
> issue since we're locking via Zookeeper.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)