pnowojski commented on a change in pull request #12406:
URL: https://github.com/apache/flink/pull/12406#discussion_r432815265



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java
##########
@@ -639,4 +673,79 @@ public long getLastCanceledCheckpointId() {
                        return lastCanceledCheckpointId;
                }
        }
+
+       /**
+        * Specific {@link AbstractInvokable} implementation to record and validate which checkpoint
+        * id is executed and how many checkpoints are executed.
+        */
+       private static final class ValidatingCheckpointInvokable extends AbstractInvokable {
+
+               private long expectedCheckpointId;
+
+               private int totalNumCheckpoints;
+
+               ValidatingCheckpointInvokable() {
+                       super(new DummyEnvironment("test", 1, 0));
+               }
+
+               public void invoke() {
+                       throw new UnsupportedOperationException();
+               }
+
+               public void triggerCheckpointOnBarrier(
+                               CheckpointMetaData checkpointMetaData,
+                               CheckpointOptions checkpointOptions,
+                               CheckpointMetrics checkpointMetrics) {
+                       expectedCheckpointId = checkpointMetaData.getCheckpointId();
+                       totalNumCheckpoints++;
+               }
+
+               @Override
+               public <E extends Exception> void executeInTaskThread(
+                               ThrowingRunnable<E> runnable,
+                               String descriptionFormat,
+                               Object... descriptionArgs) throws E {
+                       runnable.run();
+               }
+
+               long getTriggeredCheckpointId() {
+                       return expectedCheckpointId;
+               }
+
+               int getTotalTriggeredCheckpoints() {
+                       return totalNumCheckpoints;
+               }
+       }
+
+       /**
+        * Specific {@link CheckpointBarrierUnaligner} implementation to mock the scenario that the later triggered
+        * checkpoint executes before the preceding triggered checkpoint.
+        */
+       private static final class ValidatingCheckpointBarrierUnaligner extends CheckpointBarrierUnaligner {
+
+               private ThrowingRunnable waitingRunnable;
+               private boolean firstRunnable = true;
+
+               ValidatingCheckpointBarrierUnaligner(AbstractInvokable invokable) {
+                       super(
+                               new int[]{1},
+                               new ChannelStateWriter.NoOpChannelStateWriter(),
+                               "test",
+                               invokable);
+               }
+
+               @Override
+               protected <E extends Exception> void executeInTaskThread(
+                               ThrowingRunnable<E> runnable,
+                               String descriptionFormat,
+                               Object... descriptionArgs) throws E {
+                       if (firstRunnable) {
+                               waitingRunnable = runnable;
+                               firstRunnable = false;
+                       } else {
+                               super.executeInTaskThread(runnable, "checkpoint");
+                               super.executeInTaskThread(waitingRunnable, "checkpoint");
+                       }
+               }
+       }

Review comment:
       I didn't mean passing a real `AbstractInvokable`, but an equivalent 
of `DummyInvokable` that has a `SteppingMailboxProcessor` 
(`SteppingMailboxProcessor` already exists in the code base, and I'm not even 
sure whether you could just use a plain `MailboxProcessor` instead).
   
   > For this case, actually the CheckpointBarrierUnaligner component will 
interact with AbstractInvokable with internal TaskMailbox model. 
SteppingMailboxProcessor is also a re-implemented model to replace the real 
component inside AbstractInvokable, so it somehow still relies on the private 
implementation inside SteppingMailboxProcessor.
   
   It's not the same. 
   
   1. In your case, you are modifying the class that you are testing, which 
partly invalidates the test. To some extent you are testing your test 
implementation, and you have to convince yourself by reading the code that the 
override doesn't defeat the purpose of the test.
   
   2. The idea of mocking (for example passing `DummyInvokable` with 
`SteppingMailboxProcessor`) is that you provide mock implementations of some 
interfaces that adhere to the contracts of those interfaces (maybe in a 
limited/restricted scope, but they shouldn't be breaking them), and you pass 
them to the real production code that you intend to test.
   
   For example, imagine that `CheckpointBarrierUnaligner` changes and enqueues 
one more mail in one of the calls, or that it starts relying on the fact that 
the order in which mails are enqueued guarantees their execution order. Both 
refactorings would be valid, but neither would work with your current 
implementation, producing a falsely failing test.
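
A minimal sketch of that kind of test, not taken from the Flink code base: `MailExecutor`, `BarrierNotifier`, and `SteppingMailExecutor` are hypothetical stand-ins for the mailbox contract, the production class, and a stepping test double. The production class is used unmodified; only its collaborator is replaced, and the replacement still runs mails in the order they were enqueued.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical mailbox contract: mails are executed later, in the order they were enqueued. */
interface MailExecutor {
    void execute(Runnable mail, String description);
}

/** Hypothetical production class under test. The test never subclasses or overrides it. */
final class BarrierNotifier {
    private final MailExecutor executor;
    private final List<Long> triggered;

    BarrierNotifier(MailExecutor executor, List<Long> triggered) {
        this.executor = executor;
        this.triggered = triggered;
    }

    void onBarrier(long checkpointId) {
        // The checkpoint is not triggered inline; it is enqueued as a mail.
        executor.execute(() -> triggered.add(checkpointId), "checkpoint " + checkpointId);
    }
}

/** Test double that keeps the FIFO contract but lets the test decide when each mail runs. */
final class SteppingMailExecutor implements MailExecutor {
    private final Queue<Runnable> mails = new ArrayDeque<>();

    @Override
    public void execute(Runnable mail, String description) {
        mails.add(mail);
    }

    /** Runs the oldest queued mail; returns false if the mailbox was empty. */
    boolean runMailboxStep() {
        Runnable mail = mails.poll();
        if (mail == null) {
            return false;
        }
        mail.run();
        return true;
    }

    int pending() {
        return mails.size();
    }
}

final class MailboxMockSketch {
    /** Enqueues two barriers, then drains the mailbox and returns the trigger order. */
    static List<Long> runScenario() {
        List<Long> triggered = new ArrayList<>();
        SteppingMailExecutor mailbox = new SteppingMailExecutor();
        BarrierNotifier notifier = new BarrierNotifier(mailbox, triggered);

        notifier.onBarrier(1L);
        notifier.onBarrier(2L);

        // Drain the mailbox one step at a time, in enqueue order.
        while (mailbox.runMailboxStep()) {
            // each step runs exactly one mail
        }
        return triggered;
    }

    public static void main(String[] args) {
        System.out.println(runScenario());
    }
}
```

Because the double keeps the enqueue-order contract, a test written this way should keep passing if the class under test later enqueues an extra mail or starts depending on execution order.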





