zhijiangW commented on a change in pull request #12406:
URL: https://github.com/apache/flink/pull/12406#discussion_r432559532
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java
##########
@@ -639,4 +673,79 @@ public long getLastCanceledCheckpointId() {
return lastCanceledCheckpointId;
}
}
+
+ /**
+ * Specific {@link AbstractInvokable} implementation to record and validate which checkpoint
+ * id is executed and how many checkpoints are executed.
+ */
+ private static final class ValidatingCheckpointInvokable extends AbstractInvokable {
+
+ private long expectedCheckpointId;
+
+ private int totalNumCheckpoints;
+
+ ValidatingCheckpointInvokable() {
+ super(new DummyEnvironment("test", 1, 0));
+ }
+
+ public void invoke() {
+ throw new UnsupportedOperationException();
+ }
+
+ public void triggerCheckpointOnBarrier(
+ CheckpointMetaData checkpointMetaData,
+ CheckpointOptions checkpointOptions,
+ CheckpointMetrics checkpointMetrics) {
+ expectedCheckpointId = checkpointMetaData.getCheckpointId();
+ totalNumCheckpoints++;
+ }
+
+ @Override
+ public <E extends Exception> void executeInTaskThread(
+ ThrowingRunnable<E> runnable,
+ String descriptionFormat,
+ Object... descriptionArgs) throws E {
+ runnable.run();
+ }
+
+ long getTriggeredCheckpointId() {
+ return expectedCheckpointId;
+ }
+
+ int getTotalTriggeredCheckpoints() {
+ return totalNumCheckpoints;
+ }
+ }
+
+ /**
+ * Specific {@link CheckpointBarrierUnaligner} implementation to mock the scenario that the later triggered
+ * checkpoint executes before the preceding triggered checkpoint.
+ */
+ private static final class ValidatingCheckpointBarrierUnaligner extends CheckpointBarrierUnaligner {
+
+ private ThrowingRunnable waitingRunnable;
+ private boolean firstRunnable = true;
+
+ ValidatingCheckpointBarrierUnaligner(AbstractInvokable invokable) {
+ super(
+ new int[]{1},
+ new ChannelStateWriter.NoOpChannelStateWriter(),
+ "test",
+ invokable);
+ }
+
+ @Override
+ protected <E extends Exception> void executeInTaskThread(
+ ThrowingRunnable<E> runnable,
+ String descriptionFormat,
+ Object... descriptionArgs) throws E {
+ if (firstRunnable) {
+ waitingRunnable = runnable;
+ firstRunnable = false;
+ } else {
+ super.executeInTaskThread(runnable, "checkpoint");
+ super.executeInTaskThread(waitingRunnable, "checkpoint");
+ }
+ }
+ }
Review comment:
I did consider verifying the race condition with a more or less real
`AbstractInvokable` and `TaskMailbox`, but these two components are somewhat
far removed from `CheckpointBarrierHandler` and are fairly heavyweight in
themselves.
To touch as few external components as possible in unit tests, I chose the
current approach: I bypass the mailbox implementation and simulate the race
condition by executing the runnables out of order. The only purpose of
introducing `ValidatingCheckpointInvokable` and
`ValidatingCheckpointBarrierUnaligner` is to avoid relying on the external
components `AbstractInvokable` and `TaskMailbox` in unit tests.
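To make the out-of-order idea concrete, here is a minimal standalone sketch of the same trick, stripped of all Flink types. The names `ReorderingExecutor` and `runScenario` are hypothetical and exist only for this illustration; unlike the test class in the diff, this sketch releases the deferred action only once.

```java
import java.util.ArrayList;
import java.util.List;

public class OutOfOrderExecutorSketch {

    /**
     * Hypothetical stand-in for the overridden executeInTaskThread:
     * holds back the first submitted action and runs it only after
     * the second one, so the later action appears to win the race.
     */
    static final class ReorderingExecutor {
        private Runnable deferred;
        private boolean first = true;

        void execute(Runnable action) {
            if (first) {
                // Remember the first action instead of running it.
                deferred = action;
                first = false;
            } else if (deferred != null) {
                // Run the later action first, then the deferred one.
                action.run();
                Runnable pending = deferred;
                deferred = null;
                pending.run();
            } else {
                // Nothing is pending any more, so run directly.
                action.run();
            }
        }
    }

    /** Submits "checkpoint 1" then "checkpoint 2" and returns the execution order. */
    static List<Long> runScenario() {
        List<Long> executed = new ArrayList<>();
        ReorderingExecutor executor = new ReorderingExecutor();
        executor.execute(() -> executed.add(1L)); // deferred
        executor.execute(() -> executed.add(2L)); // runs first, then 1 follows
        return executed;
    }

    public static void main(String[] args) {
        System.out.println(runScenario()); // prints [2, 1]
    }
}
```

A test built this way can then assert that the component under test ends up with the later checkpoint id even though the earlier action ran last.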
This test verifies the `CheckpointBarrierUnaligner#processBarrier` and
`#notifyBarrierReceived` code paths, to confirm that the newly introduced
method `CheckpointBarrierUnaligner#notifyCheckpoint` really takes effect in
these interactions. All three methods are actually exercised in this test.
On the other hand, when testing the interaction between two components, it is
better to use the two real components without re-implementing either side, so
that any internal core change in either component is reflected in the test. In
this case, `CheckpointBarrierUnaligner` actually interacts with
`AbstractInvokable` and its internal `TaskMailbox` model.
`SteppingMailboxProcessor` is also a re-implemented model that replaces the
real component inside `AbstractInvokable`, so such a test would still rely to
some degree on the private implementation inside `SteppingMailboxProcessor`.
All in all, it might be better than my current approach, and I can try to use
the real model as much as possible in this test.