Dave Hagman created HUDI-2275:
---------------------------------
Summary: HoodieDeltaStreamerException when using OCC and a second
concurrent writer
Key: HUDI-2275
URL: https://issues.apache.org/jira/browse/HUDI-2275
Project: Apache Hudi
Issue Type: Bug
Components: DeltaStreamer, Spark Integration, Writer Core
Affects Versions: 0.9.0
Reporter: Dave Hagman
I am trying to use [Optimistic Concurrency Control|https://hudi.apache.org/docs/concurrency_control]
to allow two writers to update a single table simultaneously. The two writers are:
* Writer A: a DeltaStreamer job consuming continuously from Kafka
* Writer B: a Spark datasource-based writer consuming Parquet files from S3
* Table type: Copy on Write
After a few commits from each writer, the DeltaStreamer fails with the
following exception:
org.apache.hudi.exception.HoodieDeltaStreamerException: Unable to find previous
checkpoint. Please double check if this table was indeed built via delta
streamer. Last Commit :Option\{val=[20210803165741__commit__COMPLETED]},
Instants :[[20210803165741__commit__COMPLETED]], CommitMetadata={
"partitionToWriteStats" : {
...
What appears to be happening is a lack of commit isolation between the two
writers. Commits landed by Writer B (the Spark datasource writer) are eventually
picked up by Writer A (the DeltaStreamer). This is a problem because the
DeltaStreamer resumes from checkpoint information stored in the metadata of the
last completed commit (the "Last Commit" shown in the exception above), and
commits written by the Spark datasource writer do not include that checkpoint.
My understanding was that OCC was built for exactly this scenario (among others).
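To make the suspected failure mode concrete, here is a minimal, hypothetical Java model of a commit timeline. It is not Hudi's API: `Commit`, `checkpointFromLatest` and `checkpointFromLatestWithKey` are invented for illustration, and the metadata key `deltastreamer.checkpoint.key` is an assumption about where the DeltaStreamer keeps its checkpoint. The model assumes the checkpoint is read only from the latest completed commit; once Writer B's commit is the latest one, that lookup comes back empty, which corresponds to the "Unable to find previous checkpoint" error. The walk-back lookup is shown only as a contrast, not as Hudi's behavior or fix.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified model of a Hudi commit timeline; not Hudi's actual API.
public class CheckpointLookup {

    // Assumption: mirrors the metadata key the DeltaStreamer stores its checkpoint under.
    static final String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";

    // A completed commit: its instant time plus the extra metadata stored with it.
    record Commit(String instant, Map<String, String> extraMetadata) {}

    // Reads the checkpoint from the latest completed commit only,
    // which is the behavior the exception message suggests.
    static Optional<String> checkpointFromLatest(List<Commit> timeline) {
        if (timeline.isEmpty()) {
            return Optional.empty();
        }
        Commit last = timeline.get(timeline.size() - 1);
        return Optional.ofNullable(last.extraMetadata().get(CHECKPOINT_KEY));
    }

    // Contrast only: walks backwards to the newest commit that carries a checkpoint.
    static Optional<String> checkpointFromLatestWithKey(List<Commit> timeline) {
        for (int i = timeline.size() - 1; i >= 0; i--) {
            String checkpoint = timeline.get(i).extraMetadata().get(CHECKPOINT_KEY);
            if (checkpoint != null) {
                return Optional.of(checkpoint);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<Commit> timeline = new ArrayList<>();
        // Writer A (DeltaStreamer) stores a Kafka checkpoint with each commit (hypothetical values).
        timeline.add(new Commit("20210803165001", Map.of(CHECKPOINT_KEY, "topic,0:100")));
        timeline.add(new Commit("20210803165402", Map.of(CHECKPOINT_KEY, "topic,0:250")));
        // Writer B (Spark datasource) commits with no checkpoint metadata.
        timeline.add(new Commit("20210803165741", Map.of()));

        System.out.println("latest only: " + checkpointFromLatest(timeline));
        System.out.println("walk back:   " + checkpointFromLatestWithKey(timeline));
    }
}
```

Running `main` (Java 16+ for `record`) prints `latest only: Optional.empty` followed by `walk back:   Optional[topic,0:250]`.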
OCC config for the DeltaStreamer:
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
hoodie.write.lock.zookeeper.url=<zk_host>
hoodie.write.lock.zookeeper.port=2181
hoodie.write.lock.zookeeper.lock_key=writer_lock
hoodie.write.lock.zookeeper.base_path=/hudi-write-locks
OCC config for the Spark datasource writer:
// Multi-writer concurrency
.option("hoodie.cleaner.policy.failed.writes", "LAZY")
.option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
.option(
"hoodie.write.lock.provider",
org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider.class.getCanonicalName()
)
.option("hoodie.write.lock.zookeeper.url", jobArgs.zookeeperHost)
.option("hoodie.write.lock.zookeeper.port", jobArgs.zookeeperPort)
.option("hoodie.write.lock.zookeeper.lock_key", "writer_lock")
.option("hoodie.write.lock.zookeeper.base_path", "/hudi-write-locks")
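For OCC to serialize commits, both writers have to contend on the same lock, so their lock settings must agree. The hypothetical pre-flight check below (not part of Hudi; `LockConfigCheck`, `SHARED_KEYS` and `mismatches` are invented names) compares two writers' OCC settings and lists any key that is missing or differs. The literal `lock_key` and `base_path` values in the report already match, so this only rules out one class of misconfiguration rather than explaining the checkpoint error.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical pre-flight check (not part of Hudi): compares the OCC settings of two writers.
public class LockConfigCheck {

    // Keys this sketch expects to be identical so both writers contend on the same ZooKeeper lock.
    static final List<String> SHARED_KEYS = List.of(
            "hoodie.write.concurrency.mode",
            "hoodie.cleaner.policy.failed.writes",
            "hoodie.write.lock.provider",
            "hoodie.write.lock.zookeeper.url",
            "hoodie.write.lock.zookeeper.port",
            "hoodie.write.lock.zookeeper.lock_key",
            "hoodie.write.lock.zookeeper.base_path");

    // Returns the shared keys that are missing on either side or whose values differ.
    static List<String> mismatches(Map<String, String> a, Map<String, String> b) {
        List<String> out = new ArrayList<>();
        for (String key : SHARED_KEYS) {
            String left = a.get(key);
            if (left == null || !Objects.equals(left, b.get(key))) {
                out.add(key);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Values from the report; "zk-host" is a hypothetical placeholder for the ZooKeeper host.
        Map<String, String> deltaStreamer = Map.of(
                "hoodie.write.concurrency.mode", "optimistic_concurrency_control",
                "hoodie.cleaner.policy.failed.writes", "LAZY",
                "hoodie.write.lock.provider",
                "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider",
                "hoodie.write.lock.zookeeper.url", "zk-host",
                "hoodie.write.lock.zookeeper.port", "2181",
                "hoodie.write.lock.zookeeper.lock_key", "writer_lock",
                "hoodie.write.lock.zookeeper.base_path", "/hudi-write-locks");
        Map<String, String> datasource = new HashMap<>(deltaStreamer);
        System.out.println(mismatches(deltaStreamer, datasource));

        // Simulate a misconfigured second writer that would take a different lock.
        datasource.put("hoodie.write.lock.zookeeper.lock_key", "other_lock");
        System.out.println(mismatches(deltaStreamer, datasource));
    }
}
```

Running `main` prints `[]` and then `[hoodie.write.lock.zookeeper.lock_key]`.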
h3. Steps to Reproduce:
* Start a DeltaStreamer job against a table Foo
* In parallel, start writing to the same table Foo using the Spark datasource
writer
* After a few commits from each writer, the DeltaStreamer is likely to fail
with the above exception when the datasource writer creates non-isolated
inflight commits
NOTE: I have not tested this with two writers of the same type (e.g., two
DeltaStreamer jobs).