Github user bjlovegithub commented on a diff in the pull request:
https://github.com/apache/flink/pull/2629#discussion_r86517940
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
---
@@ -540,15 +540,12 @@ private boolean performCheckpoint(CheckpointMetaData
checkpointMetaData) throws
synchronized (lock) {
if (isRunning) {
+ checkpointState(checkpointMetaData);
- // Since both state checkpointing and
downstream barrier emission occurs in this
- // lock scope, they are an atomic operation
regardless of the order in which they occur.
- // Given this, we immediately emit the
checkpoint barriers, so the downstream operators
- // can start their checkpoint work as soon as
possible
+ // broadcast barriers after snapshot operators'
states.
operatorChain.broadcastCheckpointBarrier(
-
checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp());
-
- checkpointState(checkpointMetaData);
+
checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp()
+ );
--- End diff --
Yes. We have discussed this problem, and the solution will make sure that
we will not change the order of broadcasting barriers and then snapshotting.
The inelegant way is to stop all `Emitter` Thread first before broadcasting
barrier. But it is a little tricky.
Another way is to change checkpoint lock into ReentrantReadWriteLock. For
main thread and emitter thread, they have to acquire read lock. But for
checkpoint thread or checkpoint procedure, write lock should be taken first, so
that all emitter threads also stop working. In this way, main thread and all
emitter thread will not block each other.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---