zhijiangW commented on a change in pull request #12406:
URL: https://github.com/apache/flink/pull/12406#discussion_r432559532



##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java
##########
@@ -639,4 +673,79 @@ public long getLastCanceledCheckpointId() {
                        return lastCanceledCheckpointId;
                }
        }
+
+       /**
+        * Specific {@link AbstractInvokable} implementation to record and 
validate which checkpoint
+        * id is executed and how many checkpoints are executed.
+        */
+       private static final class ValidatingCheckpointInvokable extends 
AbstractInvokable {
+
+               private long expectedCheckpointId;
+
+               private int totalNumCheckpoints;
+
+               ValidatingCheckpointInvokable() {
+                       super(new DummyEnvironment("test", 1, 0));
+               }
+
+               public void invoke() {
+                       throw new UnsupportedOperationException();
+               }
+
+               public void triggerCheckpointOnBarrier(
+                               CheckpointMetaData checkpointMetaData,
+                               CheckpointOptions checkpointOptions,
+                               CheckpointMetrics checkpointMetrics) {
+                       expectedCheckpointId = 
checkpointMetaData.getCheckpointId();
+                       totalNumCheckpoints++;
+               }
+
+               @Override
+               public <E extends Exception> void executeInTaskThread(
+                               ThrowingRunnable<E> runnable,
+                               String descriptionFormat,
+                               Object... descriptionArgs) throws E {
+                       runnable.run();
+               }
+
+               long getTriggeredCheckpointId() {
+                       return expectedCheckpointId;
+               }
+
+               int getTotalTriggeredCheckpoints() {
+                       return totalNumCheckpoints;
+               }
+       }
+
+       /**
+        * Specific {@link CheckpointBarrierUnaligner} implementation to mock 
the scenario that the later triggered
+        * checkpoint executes before the preceding triggered checkpoint.
+        */
+       private static final class ValidatingCheckpointBarrierUnaligner extends 
CheckpointBarrierUnaligner {
+
+               private ThrowingRunnable waitingRunnable;
+               private boolean firstRunnable = true;
+
+               ValidatingCheckpointBarrierUnaligner(AbstractInvokable 
invokable) {
+                       super(
+                               new int[]{1},
+                               new ChannelStateWriter.NoOpChannelStateWriter(),
+                               "test",
+                               invokable);
+               }
+
+               @Override
+               protected <E extends Exception> void executeInTaskThread(
+                               ThrowingRunnable<E> runnable,
+                               String descriptionFormat,
+                               Object... descriptionArgs) throws E {
+                       if (firstRunnable) {
+                               waitingRunnable = runnable;
+                               firstRunnable = false;
+                       } else {
+                               super.executeInTaskThread(runnable, 
"checkpoint");
+                               super.executeInTaskThread(waitingRunnable, 
"checkpoint");
+                       }
+               }
+       }

Review comment:
       I indeed considered the way of verifying the race condition via somehow 
real `AbstractInvokable` with `TaskMailbox`, but also thought that these two 
components are a bit far away from `CheckpointBarrierHandler` and they are also 
a bit heavy-weight components from themselves. 
   
   From the aspect of touching less external components in unit tests, i chose 
the current way. Actually I bypassed the mailbox implementation and simulate 
the race condition via executing the runnable in mis-order way. The propose for 
introducing `ValidatingCheckpointInvokable` and 
`ValidatingCheckpointBarrierUnaligner` is just for avoiding relying on external 
components of `AbstractInvokable` and `TaskMailbox` in unit tests.
   
   And this test is for verifying the processes of 
`CheckpointBarrierUnaligner#processBarrier` and `#notifyBarrierReceived`, to 
confirm the new introduced method `CheckpointBarrierUnaligner#notifyCheckpoint` 
really effect in these interactions. All these three methods would be really 
touched in this test.
   
   From another aspect, for the interaction between two components it is better 
to verify the real interactions using two real components without 
re-implementing either sides. Then any internal core changes in either 
component will be reflected in the test. For this case, actually the 
`CheckpointBarrierUnaligner` component will interact with `AbstractInvokable` 
with internal `TaskMailbox` model.  `SteppingMailboxProcessor` is also a 
re-implemented model to replace the real component inside `AbstractInvokable`, 
so it somehow still relies on the private implementation inside 
`SteppingMailboxProcessor`.
   
   All in all, it might be better than my current way, and i can try out to use 
the real model AMAP in this test.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to