pnowojski commented on a change in pull request #12406:
URL: https://github.com/apache/flink/pull/12406#discussion_r432815265
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java
##########
@@ -639,4 +673,79 @@ public long getLastCanceledCheckpointId() {
return lastCanceledCheckpointId;
}
}
+
+ /**
+ * Specific {@link AbstractInvokable} implementation to record and validate which checkpoint
+ * id is executed and how many checkpoints are executed.
+ */
+ private static final class ValidatingCheckpointInvokable extends AbstractInvokable {
+
+ private long expectedCheckpointId;
+
+ private int totalNumCheckpoints;
+
+ ValidatingCheckpointInvokable() {
+ super(new DummyEnvironment("test", 1, 0));
+ }
+
+ public void invoke() {
+ throw new UnsupportedOperationException();
+ }
+
+ public void triggerCheckpointOnBarrier(
+ CheckpointMetaData checkpointMetaData,
+ CheckpointOptions checkpointOptions,
+ CheckpointMetrics checkpointMetrics) {
+ expectedCheckpointId = checkpointMetaData.getCheckpointId();
+ totalNumCheckpoints++;
+ }
+
+ @Override
+ public <E extends Exception> void executeInTaskThread(
+ ThrowingRunnable<E> runnable,
+ String descriptionFormat,
+ Object... descriptionArgs) throws E {
+ runnable.run();
+ }
+
+ long getTriggeredCheckpointId() {
+ return expectedCheckpointId;
+ }
+
+ int getTotalTriggeredCheckpoints() {
+ return totalNumCheckpoints;
+ }
+ }
+
+ /**
+ * Specific {@link CheckpointBarrierUnaligner} implementation to mock the scenario that the later triggered
+ * checkpoint executes before the preceding triggered checkpoint.
+ */
+ private static final class ValidatingCheckpointBarrierUnaligner extends CheckpointBarrierUnaligner {
+
+ private ThrowingRunnable waitingRunnable;
+ private boolean firstRunnable = true;
+
+ ValidatingCheckpointBarrierUnaligner(AbstractInvokable invokable) {
+ super(
+ new int[]{1},
+ new ChannelStateWriter.NoOpChannelStateWriter(),
+ "test",
+ invokable);
+ }
+
+ @Override
+ protected <E extends Exception> void executeInTaskThread(
+ ThrowingRunnable<E> runnable,
+ String descriptionFormat,
+ Object... descriptionArgs) throws E {
+ if (firstRunnable) {
+ waitingRunnable = runnable;
+ firstRunnable = false;
+ } else {
+ super.executeInTaskThread(runnable, "checkpoint");
+ super.executeInTaskThread(waitingRunnable, "checkpoint");
+ }
+ }
+ }
Review comment:
I didn't mean passing a real `AbstractInvokable`, but an equivalent
of `DummyInvokable` that has a `SteppingMailboxProcessor`
(`SteppingMailboxProcessor` already exists in the code base, and I'm not even
sure whether you could just use a plain `MailboxProcessor`).

> For this case, actually the CheckpointBarrierUnaligner component will
> interact with AbstractInvokable with internal TaskMailbox model.
> SteppingMailboxProcessor is also a re-implemented model to replace the real
> component inside AbstractInvokable, so it somehow still relies on the private
> implementation inside SteppingMailboxProcessor.

It's not the same.
1. In your case, you are modifying the class that you are testing, which
invalidates the test to some extent. You are partly testing your own test
implementation, and you have to assume, by reading the code, that the
override doesn't affect the purpose of the test.
2. The idea of mocking (for example, passing a `DummyInvokable` with a
`SteppingMailboxProcessor`) is that you provide a mock implementation of some
interfaces that adheres to the contracts of those interfaces (maybe in a
limited/restricted scope, but it shouldn't be breaking them), and you pass it
to the real production code that you intend to test.
For example, imagine that `CheckpointBarrierUnaligner` changes and
enqueues one more mail in one of the calls, or that it starts relying on the
fact that the order of enqueued mails guarantees their execution order. Both of
those refactorings would be valid, but neither would work with your current
implementation, so the test would fail falsely.
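
To illustrate point 2, here is a minimal, self-contained sketch of the pattern in plain Java. `MailExecutor`, `SteppingExecutor` and `BarrierHandler` are hypothetical stand-ins made up for this illustration, not Flink classes, and the real `SteppingMailboxProcessor` API may well look different. The point is only that the class under test stays unmodified, while the test double keeps the FIFO contract and lets the test decide when each mail runs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class SteppingMailboxSketch {

    /** Hypothetical contract: enqueued mails are executed in FIFO order. */
    interface MailExecutor {
        void execute(Runnable mail, String description);
    }

    /** Test double: queues mails and lets the test run them one at a time. */
    static final class SteppingExecutor implements MailExecutor {
        private final Queue<Runnable> mails = new ArrayDeque<>();

        @Override
        public void execute(Runnable mail, String description) {
            mails.add(mail);
        }

        /** Runs the oldest pending mail; returns false if nothing was pending. */
        boolean runNextMail() {
            Runnable mail = mails.poll();
            if (mail == null) {
                return false;
            }
            mail.run();
            return true;
        }
    }

    /** Hypothetical class under test: used as-is, it only sees the interface. */
    static final class BarrierHandler {
        private final MailExecutor executor;
        private final List<Long> triggered = new ArrayList<>();

        BarrierHandler(MailExecutor executor) {
            this.executor = executor;
        }

        void onBarrier(long checkpointId) {
            executor.execute(() -> triggered.add(checkpointId), "checkpoint " + checkpointId);
        }

        List<Long> getTriggered() {
            return triggered;
        }
    }

    /** Enqueues two checkpoints, then steps through all pending mails. */
    static List<Long> demo() {
        SteppingExecutor executor = new SteppingExecutor();
        BarrierHandler handler = new BarrierHandler(executor);
        handler.onBarrier(1L);
        handler.onBarrier(2L);
        while (executor.runNextMail()) {
            // Step until the queue is empty; a real test could assert between steps.
        }
        return handler.getTriggered();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 2]
    }
}
```

If `BarrierHandler` later enqueued an extra mail per barrier, or started relying on FIFO ordering, this kind of test double would keep working, because it never overrides anything inside the class under test.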