pnowojski commented on a change in pull request #12406:
URL: https://github.com/apache/flink/pull/12406#discussion_r432413567
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -164,33 +158,32 @@ public void processBarrier(
hasInflightBuffers[channelIndex] = false;
numBarrierConsumed++;
}
- // processBarrier is called from task thread and can actually happen before notifyBarrierReceived on empty
- // buffer queues
- // to avoid replicating any logic, we simply call notifyBarrierReceived here as well
Review comment:
nit: isn't this comment still valid and worth keeping?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -284,21 +280,18 @@ private int getFlattenedChannelIndex(InputChannelInfo channelInfo) {
*/
private long currentReceivedCheckpointId = -1L;
- /** The number of opened channels. */
+ /** The number of open channels. */
Review comment:
nit: just drop the comment, as it only adds the words "the" and "of" to the variable name
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java
##########
@@ -639,4 +673,79 @@ public long getLastCanceledCheckpointId() {
return lastCanceledCheckpointId;
}
}
+
+ /**
+ * Specific {@link AbstractInvokable} implementation to record and validate which checkpoint
+ * id is executed and how many checkpoints are executed.
+ */
+ private static final class ValidatingCheckpointInvokable extends AbstractInvokable {
+
+ private long expectedCheckpointId;
+
+ private int totalNumCheckpoints;
+
+ ValidatingCheckpointInvokable() {
+ super(new DummyEnvironment("test", 1, 0));
+ }
+
+ public void invoke() {
+ throw new UnsupportedOperationException();
+ }
+
+ public void triggerCheckpointOnBarrier(
+ CheckpointMetaData checkpointMetaData,
+ CheckpointOptions checkpointOptions,
+ CheckpointMetrics checkpointMetrics) {
+ expectedCheckpointId = checkpointMetaData.getCheckpointId();
+ totalNumCheckpoints++;
+ }
+
+ @Override
+ public <E extends Exception> void executeInTaskThread(
+ ThrowingRunnable<E> runnable,
+ String descriptionFormat,
+ Object... descriptionArgs) throws E {
+ runnable.run();
+ }
+
+ long getTriggeredCheckpointId() {
+ return expectedCheckpointId;
+ }
+
+ int getTotalTriggeredCheckpoints() {
+ return totalNumCheckpoints;
+ }
+ }
+
+ /**
+ * Specific {@link CheckpointBarrierUnaligner} implementation to mock the scenario that the later triggered
+ * checkpoint executes before the preceding triggered checkpoint.
+ */
+ private static final class ValidatingCheckpointBarrierUnaligner extends CheckpointBarrierUnaligner {
+
+ private ThrowingRunnable waitingRunnable;
+ private boolean firstRunnable = true;
+
+ ValidatingCheckpointBarrierUnaligner(AbstractInvokable invokable) {
+ super(
+ new int[]{1},
+ new ChannelStateWriter.NoOpChannelStateWriter(),
+ "test",
+ invokable);
+ }
+
+ @Override
+ protected <E extends Exception> void executeInTaskThread(
+ ThrowingRunnable<E> runnable,
+ String descriptionFormat,
+ Object... descriptionArgs) throws E {
+ if (firstRunnable) {
+ waitingRunnable = runnable;
+ firstRunnable = false;
+ } else {
+ super.executeInTaskThread(runnable, "checkpoint");
+ super.executeInTaskThread(waitingRunnable, "checkpoint");
+ }
+ }
+ }
Review comment:
I think this is not the best way to test this race condition. It's dubious to override the very class that we want to test here (`CheckpointBarrierUnaligner`, via `ValidatingCheckpointBarrierUnaligner`). Also, this is a very tight coupling that depends on private implementation details and assumes `executeInTaskThread` will be called only once. Moreover, it breaks the assumption that mails are executed in order.
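To make the ordering concern concrete, here is a minimal, self-contained model of the override's behavior. `DeferringExecutorDemo` and `DeferringExecutor` are hypothetical names, not Flink classes; the executor only mirrors the `firstRunnable`/`waitingRunnable` logic of `ValidatingCheckpointBarrierUnaligner#executeInTaskThread`, using plain `Runnable`s instead of `ThrowingRunnable`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of the override under review. Not Flink code. */
public class DeferringExecutorDemo {

    /** Mimics the override: park the first runnable, then run the second one before it. */
    static final class DeferringExecutor {
        private Runnable waiting;
        private boolean first = true;

        void execute(Runnable runnable) {
            if (first) {
                waiting = runnable; // parked; never runs unless a second call arrives
                first = false;
            } else {
                runnable.run();     // the later mail runs first...
                waiting.run();      // ...and the earlier mail runs after it
            }
        }
    }

    /** Submits two mails in order and records the order in which they actually ran. */
    static List<String> runDemo() {
        List<String> executed = new ArrayList<>();
        DeferringExecutor executor = new DeferringExecutor();
        executor.execute(() -> executed.add("mail-1")); // submitted first
        executor.execute(() -> executed.add("mail-2")); // submitted second
        return executed;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints [mail-2, mail-1]
    }
}
```

A FIFO mailbox would run `mail-1` before `mail-2`; the override reverses them, and if only one mail is ever submitted, it never runs at all.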
I think it would be much better to do it in the following manner:
1. straighten up the threading model a bit and do not enqueue any mails in the `CheckpointBarrierUnaligner#processBarrier` call, as this is already happening inside the mailbox thread. Enqueueing there introduces an unnecessary possibility of race conditions and makes our lives much more difficult in this test. Currently, by going through the mailbox in `processBarrier`, we avoid a bit of code duplication and slightly simplify the `notifyBarrierReceived` method, in exchange for a bad threading model and, IMO, actually more complicated code. We should fix this either way.
2. pass an `AbstractInvokable` instance that would be using
`SteppingMailboxProcessor` to implement
`org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable#executeInTaskThread`
3. inside the `testConcurrentProcessBarrierAndNotifyBarrierReceived` test do
the following sequence:
```java
handler.getThreadSafeUnaligner().notifyBarrierReceived(buildCheckpointBarrier(0), channelInfo); // (a)
handler.processBarrier(buildCheckpointBarrier(1), 0); // (b)
steppingMailboxExecutor.runMailboxStep(); // (c)
```
(c) would execute the mailbox action enqueued by (a).
This would test the race condition without breaking any contracts (such as in-order mail execution) and without overriding `CheckpointBarrierUnaligner`.
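A rough sketch of the proposed (a)/(b)/(c) sequence, under heavy simplifying assumptions: `SteppingMailbox` is a hypothetical stand-in for a stepping mailbox (it does not reproduce the API of Flink's `SteppingMailboxProcessor`), and `ToyUnaligner` models only the barrier-ordering logic, assuming that a barrier older than the current checkpoint should be ignored. `processBarrier` handles the barrier directly, as suggested in point 1, while `notifyBarrierReceived` goes through the mailbox.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class SteppingMailboxSketch {

    /** Hypothetical stepping mailbox: mails queue up until the test runs a step. */
    static final class SteppingMailbox {
        private final Deque<Runnable> mails = new ArrayDeque<>();

        void enqueue(Runnable mail) {
            mails.addLast(mail); // FIFO, like a real mailbox
        }

        /** Runs the oldest pending mail; returns false if the mailbox was empty. */
        boolean runMailboxStep() {
            Runnable mail = mails.pollFirst();
            if (mail == null) {
                return false;
            }
            mail.run();
            return true;
        }
    }

    /** Hypothetical, heavily simplified unaligner; only barrier ordering is modelled. */
    static final class ToyUnaligner {
        private final SteppingMailbox mailbox;
        private final List<Long> triggeredCheckpoints = new ArrayList<>();
        private long currentCheckpointId = -1L;

        ToyUnaligner(SteppingMailbox mailbox) {
            this.mailbox = mailbox;
        }

        /** Called from a non-task thread: hands the work to the task thread via the mailbox. */
        void notifyBarrierReceived(long checkpointId) {
            mailbox.enqueue(() -> handleBarrier(checkpointId));
        }

        /** Already runs in the task thread, so it handles the barrier directly. */
        void processBarrier(long checkpointId) {
            handleBarrier(checkpointId);
        }

        private void handleBarrier(long checkpointId) {
            if (checkpointId > currentCheckpointId) {
                currentCheckpointId = checkpointId;
                triggeredCheckpoints.add(checkpointId);
            } // otherwise the barrier belongs to an older checkpoint and is ignored
        }

        List<Long> getTriggeredCheckpoints() {
            return triggeredCheckpoints;
        }
    }

    static List<Long> runRaceScenario() {
        SteppingMailbox mailbox = new SteppingMailbox();
        ToyUnaligner unaligner = new ToyUnaligner(mailbox);
        unaligner.notifyBarrierReceived(0); // (a) enqueues a mail for checkpoint 0
        unaligner.processBarrier(1);        // (b) checkpoint 1 is handled immediately
        mailbox.runMailboxStep();           // (c) the stale mail for checkpoint 0 runs last
        return unaligner.getTriggeredCheckpoints();
    }

    public static void main(String[] args) {
        System.out.println(runRaceScenario()); // prints [1]
    }
}
```

Because the mailbox stays FIFO and nothing under test is overridden, the test controls the interleaving only by choosing when to call `runMailboxStep()`.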
----------------------------------------------------------------
This is an automated message from the Apache Git Service.