[ 
https://issues.apache.org/jira/browse/HUDI-2275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17425741#comment-17425741
 ] 

Dave Hagman commented on HUDI-2275:
-----------------------------------

I did not try that patch. I will do that and report back. FYI with Hudi 0.9 I'm 
receiving a new error (below). Looking at the code, that validation is throwing 
an exception because the commit file that it expects to be present is not 
there. 

 
{code:java}
21/10/07 17:10:05 INFO TransactionManager: Transaction ending with transaction 
owner Option{val=[==>20211007170717__commit__INFLIGHT]}
21/10/07 17:10:05 INFO ZookeeperBasedLockProvider: RELEASING lock atZkBasePath 
= /events/test/mwc/v1, lock key = events_mwc_test_v1
21/10/07 17:10:05 INFO ZookeeperBasedLockProvider: RELEASED lock atZkBasePath = 
/events/test/mwc/v1, lock key = events_mwc_test_v1
21/10/07 17:10:05 INFO TransactionManager: Transaction ended
Exception in thread "main" java.lang.IllegalArgumentException
        at 
org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:31)
        at 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:414)
        at 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:395)
        at 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.saveAsComplete(HoodieActiveTimeline.java:153)
        at 
org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:218)
        at 
org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:190)
        at 
org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:124)
        at 
org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:617)
        at 
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:274)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)
        at 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
        at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
        at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
        at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
        at 
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
        at 
org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
        at 
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
        at 
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
        at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
        at 
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
{code}

> HoodieDeltaStreamerException when using OCC and a second concurrent writer
> --------------------------------------------------------------------------
>
>                 Key: HUDI-2275
>                 URL: https://issues.apache.org/jira/browse/HUDI-2275
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: DeltaStreamer, Spark Integration, Writer Core
>    Affects Versions: 0.9.0
>            Reporter: Dave Hagman
>            Assignee: Sagar Sumit
>            Priority: Critical
>              Labels: sev:critical
>             Fix For: 0.10.0
>
>
>  I am trying to utilize [Optimistic Concurrency 
> Control|https://hudi.apache.org/docs/concurrency_control] in order to allow 
> two writers to update a single table simultaneously. The two writers are:
>  * Writer A: Deltastreamer job consuming continuously from Kafka
>  * Writer B: A spark datasource-based writer that is consuming parquet files 
> out of S3
>  * Table Type: Copy on Write
>  
> After a few commits from each writer the deltastreamer will fail with the 
> following exception:
>  
> {code:java}
> org.apache.hudi.exception.HoodieDeltaStreamerException: Unable to find 
> previous checkpoint. Please double check if this table was indeed built via 
> delta streamer. Last Commit :Option{val=[20210803165741__commit__COMPLETED]}, 
> Instants :[[20210803165741__commit__COMPLETED]], CommitMetadata={
>  "partitionToWriteStats" : {
>  ...{code}
>  
> What appears to be happening is a lack of commit isolation between the two 
> writers
>  Writer B (spark datasource writer) will land commits which are eventually 
> picked up by Writer A (Delta Streamer). This is an issue because the Delta 
> Streamer needs checkpoint information which the spark datasource of course 
> does not include in its commits. My understanding was that OCC was built for 
> this very purpose (among others). 
> OCC config for Delta Streamer:
> {code:java}
> hoodie.write.concurrency.mode=optimistic_concurrency_control
>  hoodie.cleaner.policy.failed.writes=LAZY
>  
> hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
>  hoodie.write.lock.zookeeper.url=<zk_host>
>  hoodie.write.lock.zookeeper.port=2181
>  hoodie.write.lock.zookeeper.lock_key=writer_lock
>  hoodie.write.lock.zookeeper.base_path=/hudi-write-locks{code}
>  
> OCC config for spark datasource:
> {code:java}
> // Multi-writer concurrency
>  .option("hoodie.cleaner.policy.failed.writes", "LAZY")
>  .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
>  .option(
>  "hoodie.write.lock.provider",
>  
> org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider.class.getCanonicalName()
>  )
>  .option("hoodie.write.lock.zookeeper.url", jobArgs.zookeeperHost)
>  .option("hoodie.write.lock.zookeeper.port", jobArgs.zookeeperPort)
>  .option("hoodie.write.lock.zookeeper.lock_key", "writer_lock")
>  .option("hoodie.write.lock.zookeeper.base_path", "/hudi-write-locks"){code}
> h3. Steps to Reproduce:
>  * Start a deltastreamer job against some table Foo
>  * In parallel, start writing to the same table Foo using spark datasource 
> writer
>  * Note that after a few commits from each the deltastreamer is likely to 
> fail with the above exception when the datasource writer creates non-isolated 
> inflight commits
> NOTE: I have not tested this with two of the same datasources (ex. two 
> deltastreamer jobs)
> NOTE 2: Another detail that may be relevant is that the two writers are on 
> completely different spark clusters but I assumed this shouldn't be an issue 
> since we're locking using Zookeeper



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to