pnowojski commented on a change in pull request #12406:
URL: https://github.com/apache/flink/pull/12406#discussion_r432413567



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -164,33 +158,32 @@ public void processBarrier(
                        hasInflightBuffers[channelIndex] = false;
                        numBarrierConsumed++;
                }
-               // processBarrier is called from task thread and can actually happen before notifyBarrierReceived on empty
-               // buffer queues
-               // to avoid replicating any logic, we simply call notifyBarrierReceived here as well

Review comment:
       nit: isn't this comment still valid and worth keeping?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -284,21 +280,18 @@ private int getFlattenedChannelIndex(InputChannelInfo channelInfo) {
                 */
                private long currentReceivedCheckpointId = -1L;
 
-               /** The number of opened channels. */
+               /** The number of open channels. */

Review comment:
       nit: just drop the comment, as it only adds the words "the" and "of" to the variable name

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java
##########
@@ -639,4 +673,79 @@ public long getLastCanceledCheckpointId() {
                        return lastCanceledCheckpointId;
                }
        }
+
+       /**
+        * Specific {@link AbstractInvokable} implementation to record and validate which checkpoint
+        * id is executed and how many checkpoints are executed.
+        */
+       private static final class ValidatingCheckpointInvokable extends AbstractInvokable {
+
+               private long expectedCheckpointId;
+
+               private int totalNumCheckpoints;
+
+               ValidatingCheckpointInvokable() {
+                       super(new DummyEnvironment("test", 1, 0));
+               }
+
+               public void invoke() {
+                       throw new UnsupportedOperationException();
+               }
+
+               public void triggerCheckpointOnBarrier(
+                               CheckpointMetaData checkpointMetaData,
+                               CheckpointOptions checkpointOptions,
+                               CheckpointMetrics checkpointMetrics) {
+                       expectedCheckpointId = checkpointMetaData.getCheckpointId();
+                       totalNumCheckpoints++;
+               }
+
+               @Override
+               public <E extends Exception> void executeInTaskThread(
+                               ThrowingRunnable<E> runnable,
+                               String descriptionFormat,
+                               Object... descriptionArgs) throws E {
+                       runnable.run();
+               }
+
+               long getTriggeredCheckpointId() {
+                       return expectedCheckpointId;
+               }
+
+               int getTotalTriggeredCheckpoints() {
+                       return totalNumCheckpoints;
+               }
+       }
+
+       /**
+        * Specific {@link CheckpointBarrierUnaligner} implementation to mock the scenario that the later triggered
+        * checkpoint executes before the preceding triggered checkpoint.
+        */
+       private static final class ValidatingCheckpointBarrierUnaligner extends CheckpointBarrierUnaligner {
+
+               private ThrowingRunnable waitingRunnable;
+               private boolean firstRunnable = true;
+
+               ValidatingCheckpointBarrierUnaligner(AbstractInvokable invokable) {
+                       super(
+                               new int[]{1},
+                               new ChannelStateWriter.NoOpChannelStateWriter(),
+                               "test",
+                               invokable);
+               }
+
+               @Override
+               protected <E extends Exception> void executeInTaskThread(
+                               ThrowingRunnable<E> runnable,
+                               String descriptionFormat,
+                               Object... descriptionArgs) throws E {
+                       if (firstRunnable) {
+                               waitingRunnable = runnable;
+                               firstRunnable = false;
+                       } else {
+                               super.executeInTaskThread(runnable, "checkpoint");
+                               super.executeInTaskThread(waitingRunnable, "checkpoint");
+                       }
+               }
+       }

Review comment:
       I think this is not the best way to test this race condition. It's dubious to override the class that we want to test here (`ValidatingCheckpointBarrierUnaligner`). It is also very tightly coupled to private implementation details, since it assumes that `executeInTaskThread` will be called only once. Moreover, it breaks the assumption that mails should be executed in order.
   
   I think it would be much better to do it in the following manner:
   
   1. straighten up the threading model a bit and do not enqueue any mails in the `CheckpointBarrierUnaligner#processBarrier` call, as this is already happening inside the mailbox thread. Enqueuing there introduces an unnecessary possibility for race conditions and makes our life so much more difficult in this test. Currently, by going through the mailbox in `processBarrier`, we avoid a bit of code duplication and simplify the `notifyBarrierReceived` method a bit, in exchange for a bad threading model and, IMO, actually more complicated code. We should fix this either way.
   
   2. pass an `AbstractInvokable` instance that would be using `SteppingMailboxProcessor` to implement `org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable#executeInTaskThread`
   
   3. inside the `testConcurrentProcessBarrierAndNotifyBarrierReceived` test do the following sequence:
   ```
   handler.getThreadSafeUnaligner().notifyBarrierReceived(buildCheckpointBarrier(0), channelInfo); // (a)
   handler.processBarrier(buildCheckpointBarrier(1), 0); // (b)
   steppingMailboxExecutor.runMailboxStep(); // (c)
   ```
   (c) would execute the mailbox action enqueued by (a).
   
   This would test the race condition without breaking any contracts (like in-order mail execution) and without overriding `CheckpointBarrierUnaligner`.
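
To make the (a), (b), (c) sequence concrete outside of Flink, here is a minimal, self-contained sketch. All class names in it (`SteppingMailboxSketch`, `UnalignerSketch`, `SteppingRaceDemo`) are hypothetical stand-ins and not Flink APIs, barriers are reduced to plain checkpoint ids, and the rule that a barrier with an older checkpoint id is ignored is an assumption about the desired behaviour, not something taken from this PR.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical stepping mailbox: mails only run when the test asks for a step. */
final class SteppingMailboxSketch {
    private final Queue<Runnable> mails = new ArrayDeque<>();

    /** Enqueues a mail; nothing is executed yet. */
    void execute(Runnable mail) {
        mails.add(mail);
    }

    /** Runs the oldest pending mail, if any, so mails stay in FIFO order. */
    boolean runMailboxStep() {
        Runnable mail = mails.poll();
        if (mail == null) {
            return false;
        }
        mail.run();
        return true;
    }
}

/** Hypothetical, heavily simplified model of the unaligner (not Flink's class). */
final class UnalignerSketch {
    private final SteppingMailboxSketch mailbox;
    private final List<Long> triggeredCheckpoints = new ArrayList<>();
    private long currentCheckpointId = -1L;

    UnalignerSketch(SteppingMailboxSketch mailbox) {
        this.mailbox = mailbox;
    }

    /** Models a call from another thread: the work is deferred to the mailbox. */
    void notifyBarrierReceived(long checkpointId) {
        mailbox.execute(() -> handleBarrier(checkpointId));
    }

    /** Models a call from the task thread: handled directly, no mail enqueued. */
    void processBarrier(long checkpointId) {
        handleBarrier(checkpointId);
    }

    /** Assumption: only a barrier newer than the current checkpoint triggers one. */
    private void handleBarrier(long checkpointId) {
        if (checkpointId > currentCheckpointId) {
            currentCheckpointId = checkpointId;
            triggeredCheckpoints.add(checkpointId);
        }
    }

    List<Long> getTriggeredCheckpoints() {
        return triggeredCheckpoints;
    }
}

final class SteppingRaceDemo {
    static List<Long> run() {
        SteppingMailboxSketch mailbox = new SteppingMailboxSketch();
        UnalignerSketch unaligner = new UnalignerSketch(mailbox);
        unaligner.notifyBarrierReceived(0L); // (a) enqueues a mail for checkpoint 0
        unaligner.processBarrier(1L);        // (b) handles checkpoint 1 directly
        mailbox.runMailboxStep();            // (c) runs the stale mail from (a)
        return unaligner.getTriggeredCheckpoints();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this model the race is reproduced deterministically: the mail from (a) only runs at (c), after (b) has already moved on to checkpoint 1, and no mail is executed out of order.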



