[ https://issues.apache.org/jira/browse/HUDI-2275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dave Hagman updated HUDI-2275:
------------------------------
    Description: 
 I am trying to use [Optimistic Concurrency Control|https://hudi.apache.org/docs/concurrency_control] to allow two writers to update a single table simultaneously. The setup is:
 * Writer A: a DeltaStreamer job consuming continuously from Kafka
 * Writer B: a Spark datasource-based writer consuming Parquet files from S3
 * Table type: Copy on Write
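
For context, a minimal Java sketch of optimistic concurrency control between two writers, assuming conflicts are detected at file-group granularity as the concurrency control docs describe. {{Write}} and {{conflicts()}} are hypothetical names, not Hudi APIs, and the instant times and file-group IDs are illustrative. In this simplified model a writer aborts only when it touched the same file group as a write that completed after it started; the model says nothing about hiding one writer's completed commits from the other.

{code:java}
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Minimal sketch of optimistic concurrency control (OCC) between two writers.
 * ASSUMPTION: conflicts are detected at file-group granularity. Write and
 * conflicts() are hypothetical names, not Hudi APIs.
 */
public class OccSketch {

    /** A hypothetical write: the instant time it started at and the file groups it touched. */
    public static final class Write {
        final String instantTime;
        final Set<String> fileGroupIds;

        public Write(String instantTime, Set<String> fileGroupIds) {
            this.instantTime = instantTime;
            this.fileGroupIds = fileGroupIds;
        }
    }

    /**
     * Returns true if the pending write touched a file group that a later write also touched.
     * In a real system this check would run while holding the shared lock, right before committing.
     */
    public static boolean conflicts(Write pending, List<Write> completed) {
        for (Write other : completed) {
            // Instant times are fixed-width timestamps, so string order is time order.
            if (other.instantTime.compareTo(pending.instantTime) <= 0) {
                continue; // not later than the pending write's start, so not treated as concurrent
            }
            Set<String> overlap = new HashSet<>(pending.fileGroupIds);
            overlap.retainAll(other.fileGroupIds);
            if (!overlap.isEmpty()) {
                return true; // both writers modified the same file group
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Illustrative values: one commit landed after both pending writes started.
        List<Write> completed = List.of(new Write("20210803165741", Set.of("fg-1", "fg-2")));
        Write disjoint = new Write("20210803165700", Set.of("fg-3"));
        Write overlapping = new Write("20210803165700", Set.of("fg-2"));
        System.out.println(conflicts(disjoint, completed));    // false: commit can proceed
        System.out.println(conflicts(overlapping, completed)); // true: this writer must abort
    }
}
{code}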

 

After a few commits from each writer, the DeltaStreamer fails with the following exception:

 
{code:java}
org.apache.hudi.exception.HoodieDeltaStreamerException: Unable to find previous checkpoint. Please double check if this table was indeed built via delta streamer. Last Commit :Option{val=[20210803165741__commit__COMPLETED]}, Instants :[[20210803165741__commit__COMPLETED]], CommitMetadata={
 "partitionToWriteStats" : {
 ...{code}
 

What appears to be happening is a lack of commit isolation between the two writers: Writer B (the Spark datasource writer) lands commits that are later picked up by Writer A (DeltaStreamer). This is a problem because DeltaStreamer needs checkpoint information from the last commit, which the Spark datasource writer does not include in its commits. My understanding was that OCC was built for exactly this purpose (among others).
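
A minimal sketch of the failure mode described above, assuming the DeltaStreamer checkpoint is stored in each commit's extra metadata under the key {{deltastreamer.checkpoint.key}} (an assumption here, not confirmed by this report). {{Commit}}, {{fromLastCommit}} and {{fromLatestCommitWithCheckpoint}} are hypothetical; the first instant time and the checkpoint value are made up. Reading only the newest commit returns nothing once Writer B's commit lands on top; walking back to the newest commit that carries a checkpoint is one possible alternative, not necessarily how Hudi resolves it.

{code:java}
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
 * Sketch of why reading the checkpoint from only the LAST completed commit breaks
 * when a second writer shares the timeline.
 * ASSUMPTIONS: the checkpoint lives in commit extra metadata under
 * "deltastreamer.checkpoint.key"; Commit and both lookup methods are hypothetical.
 */
public class CheckpointLookupSketch {

    public static final String CHECKPOINT_KEY = "deltastreamer.checkpoint.key"; // assumed key name

    /** A completed commit on the shared timeline together with its extra metadata. */
    public static final class Commit {
        final String instantTime;
        final Map<String, String> extraMetadata;

        public Commit(String instantTime, Map<String, String> extraMetadata) {
            this.instantTime = instantTime;
            this.extraMetadata = extraMetadata;
        }
    }

    /** Naive lookup: inspects only the newest commit, which may belong to the other writer. */
    public static Optional<String> fromLastCommit(List<Commit> timeline) {
        if (timeline.isEmpty()) {
            return Optional.empty();
        }
        Commit last = timeline.get(timeline.size() - 1);
        return Optional.ofNullable(last.extraMetadata.get(CHECKPOINT_KEY));
    }

    /** Alternative: walk back from the newest commit to the first one that carries a checkpoint. */
    public static Optional<String> fromLatestCommitWithCheckpoint(List<Commit> timeline) {
        for (int i = timeline.size() - 1; i >= 0; i--) {
            String checkpoint = timeline.get(i).extraMetadata.get(CHECKPOINT_KEY);
            if (checkpoint != null) {
                return Optional.of(checkpoint);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Timeline ordered oldest to newest; values are illustrative.
        List<Commit> timeline = List.of(
            new Commit("20210803165500", Map.of(CHECKPOINT_KEY, "topic,0:1200")), // Writer A (DeltaStreamer)
            new Commit("20210803165741", Map.of()));                                // Writer B (datasource), no checkpoint
        System.out.println(fromLastCommit(timeline));                 // Optional.empty
        System.out.println(fromLatestCommitWithCheckpoint(timeline)); // Optional[topic,0:1200]
    }
}
{code}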

OCC config for DeltaStreamer:
{code:java}
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
hoodie.write.lock.zookeeper.url=<zk_host>
hoodie.write.lock.zookeeper.port=2181
hoodie.write.lock.zookeeper.lock_key=writer_lock
hoodie.write.lock.zookeeper.base_path=/hudi-write-locks{code}

  

OCC config for the Spark datasource writer:
{code:java}
// Multi-writer concurrency
.option("hoodie.cleaner.policy.failed.writes", "LAZY")
.option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
.option(
    "hoodie.write.lock.provider",
    org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider.class.getCanonicalName()
)
.option("hoodie.write.lock.zookeeper.url", jobArgs.zookeeperHost)
.option("hoodie.write.lock.zookeeper.port", jobArgs.zookeeperPort)
.option("hoodie.write.lock.zookeeper.lock_key", "writer_lock")
.option("hoodie.write.lock.zookeeper.base_path", "/hudi-write-locks"){code}
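
Since OCC only helps if both writers contend for the same lock, a quick sanity check is that the two configs above agree on every lock-related key. A minimal sketch, assuming that with the ZooKeeper provider writers share a lock only when url, port, lock_key and base_path all match; {{LockConfigCheck}} is a hypothetical helper, {{zk-host}} stands in for the redacted host, and {{jobArgs}} is assumed to resolve to the same host and port.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Sketch: report OCC/lock settings that differ between two writers.
 * ASSUMPTION: with the ZooKeeper lock provider, writers contend for the same lock
 * only if url, port, lock_key and base_path all match. Hypothetical helper, not part of Hudi.
 */
public class LockConfigCheck {

    /** Keys that every concurrent writer is assumed to need with identical values. */
    public static final List<String> SHARED_KEYS = List.of(
        "hoodie.write.concurrency.mode",
        "hoodie.cleaner.policy.failed.writes",
        "hoodie.write.lock.provider",
        "hoodie.write.lock.zookeeper.url",
        "hoodie.write.lock.zookeeper.port",
        "hoodie.write.lock.zookeeper.lock_key",
        "hoodie.write.lock.zookeeper.base_path");

    /** Returns the keys that are missing from either writer or whose values differ. */
    public static List<String> mismatches(Map<String, String> a, Map<String, String> b) {
        List<String> out = new ArrayList<>();
        for (String key : SHARED_KEYS) {
            String va = a.get(key);
            String vb = b.get(key);
            if (va == null || vb == null || !va.equals(vb)) {
                out.add(key);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Writer A (DeltaStreamer) settings from the properties above; "zk-host" is a placeholder.
        Map<String, String> deltaStreamer = Map.of(
            "hoodie.write.concurrency.mode", "optimistic_concurrency_control",
            "hoodie.cleaner.policy.failed.writes", "LAZY",
            "hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider",
            "hoodie.write.lock.zookeeper.url", "zk-host",
            "hoodie.write.lock.zookeeper.port", "2181",
            "hoodie.write.lock.zookeeper.lock_key", "writer_lock",
            "hoodie.write.lock.zookeeper.base_path", "/hudi-write-locks");
        // Writer B (datasource), assuming jobArgs resolves to the same host and port.
        Map<String, String> datasource = new HashMap<>(deltaStreamer);
        System.out.println(mismatches(deltaStreamer, datasource)); // []

        // Under the assumption above, a different lock_key means the writers no longer share a lock.
        datasource.put("hoodie.write.lock.zookeeper.lock_key", "writer-lock");
        System.out.println(mismatches(deltaStreamer, datasource)); // [hoodie.write.lock.zookeeper.lock_key]
    }
}
{code}
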
h3. Steps to Reproduce:
 * Start a DeltaStreamer job against some table Foo
 * In parallel, start writing to the same table Foo using the Spark datasource writer
 * After a few commits from each writer, the DeltaStreamer is likely to fail with the above exception once the datasource writer creates non-isolated inflight commits

NOTE: I have not tested this with two writers of the same type (e.g. two DeltaStreamer jobs).



> HoodieDeltaStreamerException when using OCC and a second concurrent writer
> --------------------------------------------------------------------------
>
>                 Key: HUDI-2275
>                 URL: https://issues.apache.org/jira/browse/HUDI-2275
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: DeltaStreamer, Spark Integration, Writer Core
>    Affects Versions: 0.9.0
>            Reporter: Dave Hagman
>            Priority: Critical



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
