hbgstc123 opened a new issue, #6711:
URL: https://github.com/apache/hudi/issues/6711
I have a Hudi table that needs two writers: one is a Flink job writing new
data, and the other deletes old data.
The Flink job uses optimistic concurrency control (OCC):
```
'hoodie.write.concurrency.mode'='optimistic_concurrency_control',
'hoodie.cleaner.policy.failed.writes'='LAZY',
'hoodie.write.lock.provider'='org.apache.hudi.hive.HiveMetastoreBasedLockProvider',
'hoodie.write.lock.hivemetastore.database'='db1',
'hoodie.write.lock.hivemetastore.table'='table1'
```
With 4 upsert writers, the job runs normally.
But when the number of upsert writers is raised to 64, checkpoints start to fail occasionally with
`Caused by: org.apache.hudi.exception.HoodieLockException: Unable to acquire lock, lock object null`
Steps to reproduce the behavior:
1. Write to a Hudi table with Flink.
2. Set the concurrency mode to OCC and use the HMS lock provider.
3. Set the number of writers to 64 or higher.
4. Some checkpoints then fail with `HoodieLockException`.
**Expected behavior**
Checkpoints complete without a `HoodieLockException`.
**Environment Description**
* Hudi version : 0.11.1
* Flink version : 1.13
**Stacktrace**
```
java.io.IOException: Could not perform checkpoint 2 for operator bucket_write: table1 (32/32)#0.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1048)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:249)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:435)
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:226)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:782)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 2 for operator bucket_write: table1 (32/32)#0. Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:264)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:169)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:706)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:627)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:590)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:312)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1092)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1076)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1032)
    ... 19 more
Caused by: org.apache.hudi.exception.HoodieLockException: Unable to acquire lock, lock object null
    at org.apache.hudi.client.transaction.lock.LockManager.lock(LockManager.java:82)
    at org.apache.hudi.client.transaction.TransactionManager.beginTransaction(TransactionManager.java:53)
    at org.apache.hudi.client.BaseHoodieWriteClient.initTable(BaseHoodieWriteClient.java:1458)
    at org.apache.hudi.client.BaseHoodieWriteClient.initTable(BaseHoodieWriteClient.java:1493)
    at org.apache.hudi.client.HoodieFlinkWriteClient.upsert(HoodieFlinkWriteClient.java:138)
    at org.apache.hudi.sink.StreamWriteFunction.lambda$initWriteFunction$1(StreamWriteFunction.java:187)
    at org.apache.hudi.sink.StreamWriteFunction.lambda$flushRemaining$7(StreamWriteFunction.java:466)
    at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608)
    at org.apache.hudi.sink.StreamWriteFunction.flushRemaining(StreamWriteFunction.java:458)
    at org.apache.hudi.sink.StreamWriteFunction.snapshotState(StreamWriteFunction.java:134)
    at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.snapshotState(BucketStreamWriteFunction.java:100)
    at org.apache.hudi.sink.common.AbstractStreamWriteFunction.snapshotState(AbstractStreamWriteFunction.java:157)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:218)
    ... 29 more
```
I added some logging and found that the exception is thrown because
`StreamWriteFunction` fails to acquire the lock within `maxRetries` attempts,
which is 10 as configured by `hoodie.write.lock.client.num_retries`.
Is there a good solution for this case, such as not locking the table
when a writer flushes data?
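
To illustrate the failure mode, here is a minimal sketch of bounded-retry lock acquisition. It is not Hudi's `LockManager` code: the class `BoundedRetryLockSketch`, the method `lockWithRetries`, and the exception `LockAcquireException` are hypothetical names, and a plain `ReentrantLock` stands in for the HMS lock. It only shows how a writer gives up once its retry budget is spent while another writer still holds the lock.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedRetryLockSketch {

    /** Hypothetical stand-in for a lock failure; not Hudi's HoodieLockException. */
    static class LockAcquireException extends RuntimeException {
        LockAcquireException(String message) {
            super(message);
        }
    }

    /**
     * Tries to acquire the lock at most maxRetries times, waiting up to
     * waitMillis per attempt. Returns the 1-based attempt that succeeded.
     */
    static int lockWithRetries(ReentrantLock lock, int maxRetries, long waitMillis) {
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                if (lock.tryLock(waitMillis, TimeUnit.MILLISECONDS)) {
                    return attempt;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new LockAcquireException("Interrupted while waiting for lock");
            }
        }
        throw new LockAcquireException(
            "Unable to acquire lock after " + maxRetries + " attempts");
    }

    public static void main(String[] args) throws Exception {
        ReentrantLock tableLock = new ReentrantLock();

        // Uncontended case: the first attempt succeeds.
        int attempt = lockWithRetries(tableLock, 10, 10);
        System.out.println("acquired on attempt " + attempt);
        tableLock.unlock();

        // Contended case: another thread holds the lock for the whole retry budget.
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            tableLock.lock();
            held.countDown();
            try {
                release.await();
            } catch (InterruptedException ignored) {
                // Fall through and release the lock.
            } finally {
                tableLock.unlock();
            }
        });
        holder.start();
        held.await();
        try {
            lockWithRetries(tableLock, 10, 10);
        } catch (LockAcquireException e) {
            System.out.println(e.getMessage());
        } finally {
            release.countDown();
            holder.join();
        }
    }
}
```

In this sketch a writer tolerates at most `maxRetries * waitMillis` of contention before failing, so the more writers queue for the same lock, the more likely one of them exhausts its budget.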