This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3adc3eb2a6ece13f3befddd41f508d01bde067d7 Author: Piotr Nowojski <[email protected]> AuthorDate: Thu Jun 13 15:07:01 2019 +0200 [hotfix][network] Make toNotifyOnCheckpoint field final in ChekpointBarrierHandlers --- .../flink/streaming/runtime/io/BarrierBuffer.java | 27 ++++---- .../streaming/runtime/io/BarrierDiscarder.java | 15 ----- .../flink/streaming/runtime/io/BarrierTracker.java | 19 +++--- .../runtime/io/CheckpointBarrierHandler.java | 9 --- .../streaming/runtime/io/InputProcessorUtil.java | 10 ++- .../io/BarrierBufferAlignmentLimitTest.java | 18 ++++-- .../runtime/io/BarrierBufferTestBase.java | 73 ++++++++++------------ .../streaming/runtime/io/BarrierTrackerTest.java | 59 ++++++++--------- .../runtime/io/CreditBasedBarrierBufferTest.java | 7 ++- .../runtime/io/SpillingBarrierBufferTest.java | 7 ++- 10 files changed, 105 insertions(+), 139 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java index ad62360..b2e6ea1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java @@ -33,6 +33,8 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.ArrayDeque; import java.util.Optional; @@ -86,8 +88,8 @@ public class BarrierBuffer implements CheckpointBarrierHandler { */ private BufferOrEventSequence currentBuffered; - /** Handler that receives the checkpoint notifications. */ - private AbstractInvokable toNotifyOnCheckpoint; + @Nullable + private final AbstractInvokable toNotifyOnCheckpoint; /** The ID of the checkpoint for which we expect barriers. */ private long currentCheckpointId = -1L; @@ -127,7 +129,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { */ @VisibleForTesting BarrierBuffer(InputGate inputGate, BufferStorage bufferStorage) { - this (inputGate, bufferStorage, -1, "Testing: No task associated"); + this (inputGate, bufferStorage, -1, "Testing: No task associated", null); } /** @@ -141,8 +143,14 @@ public class BarrierBuffer implements CheckpointBarrierHandler { * @param bufferStorage The storage to hold the buffers and events for blocked channels. * @param maxBufferedBytes The maximum bytes to be buffered before the checkpoint aborts. * @param taskName The task name for logging. + * @param toNotifyOnCheckpoint optional Handler that receives the checkpoint notifications. */ - BarrierBuffer(InputGate inputGate, BufferStorage bufferStorage, long maxBufferedBytes, String taskName) { + BarrierBuffer( + InputGate inputGate, + BufferStorage bufferStorage, + long maxBufferedBytes, + String taskName, + @Nullable AbstractInvokable toNotifyOnCheckpoint) { checkArgument(maxBufferedBytes == -1 || maxBufferedBytes > 0); this.inputGate = inputGate; @@ -154,6 +162,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { this.queuedBuffered = new ArrayDeque<BufferOrEventSequence>(); this.taskName = taskName; + this.toNotifyOnCheckpoint = toNotifyOnCheckpoint; } @Override @@ -452,16 +461,6 @@ public class BarrierBuffer implements CheckpointBarrierHandler { } @Override - public void registerCheckpointEventHandler(AbstractInvokable toNotifyOnCheckpoint) { - if (this.toNotifyOnCheckpoint == null) { - this.toNotifyOnCheckpoint = toNotifyOnCheckpoint; - } - else { - throw new IllegalStateException("BarrierBuffer already has a registered checkpoint notifyee"); - } - } - - @Override public boolean isEmpty() { return currentBuffered == null; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierDiscarder.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierDiscarder.java index e8d9f34..c33c940 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierDiscarder.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierDiscarder.java @@ -23,7 +23,6 @@ import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -45,10 +44,6 @@ public class BarrierDiscarder implements CheckpointBarrierHandler { */ private final int totalNumberOfInputChannels; - - /** The listener to be notified on complete checkpoints. */ - private AbstractInvokable toNotifyOnCheckpoint; - // ------------------------------------------------------------------------ public BarrierDiscarder(InputGate inputGate) { @@ -88,16 +83,6 @@ public class BarrierDiscarder implements CheckpointBarrierHandler { } @Override - public void registerCheckpointEventHandler(AbstractInvokable toNotifyOnCheckpoint) { - if (this.toNotifyOnCheckpoint == null) { - this.toNotifyOnCheckpoint = toNotifyOnCheckpoint; - } - else { - throw new IllegalStateException("BarrierDiscarder already has a registered checkpoint notifyee"); - } - } - - @Override public void cleanup() { } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java index 49d2991..f7629bb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java @@ -33,6 +33,8 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.util.ArrayDeque; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -77,7 +79,7 @@ public class BarrierTracker implements CheckpointBarrierHandler { private final ArrayDeque<CheckpointBarrierCount> pendingCheckpoints; /** The listener to be notified on complete checkpoints. */ - private AbstractInvokable toNotifyOnCheckpoint; + private final AbstractInvokable toNotifyOnCheckpoint; /** The highest checkpoint ID encountered so far. */ private long latestPendingCheckpointID = -1; @@ -85,9 +87,14 @@ public class BarrierTracker implements CheckpointBarrierHandler { // ------------------------------------------------------------------------ public BarrierTracker(InputGate inputGate) { + this(inputGate, null); + } + + public BarrierTracker(InputGate inputGate, @Nullable AbstractInvokable toNotifyOnCheckpoint) { this.inputGate = inputGate; this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels(); this.pendingCheckpoints = new ArrayDeque<>(); + this.toNotifyOnCheckpoint = toNotifyOnCheckpoint; } @Override @@ -127,16 +134,6 @@ public class BarrierTracker implements CheckpointBarrierHandler { } @Override - public void registerCheckpointEventHandler(AbstractInvokable toNotifyOnCheckpoint) { - if (this.toNotifyOnCheckpoint == null) { - this.toNotifyOnCheckpoint = toNotifyOnCheckpoint; - } - else { - throw new IllegalStateException("BarrierTracker already has a registered checkpoint notifyee"); - } - } - - @Override public void cleanup() { pendingCheckpoints.clear(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java index faffd44..2ee1a97 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java @@ -21,7 +21,6 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; import org.apache.flink.runtime.io.AsyncDataInput; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import java.io.IOException; @@ -32,14 +31,6 @@ import java.io.IOException; */ @Internal public interface CheckpointBarrierHandler extends AsyncDataInput<BufferOrEvent> { - - /** - * Registers the task be notified once all checkpoint barriers have been received for a checkpoint. - * - * @param task The task to notify - */ - void registerCheckpointEventHandler(AbstractInvokable task); - /** * Cleans up all internally held resources. * diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java index 289dd1a..ebef48f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java @@ -58,13 +58,15 @@ public class InputProcessorUtil { inputGate, new CachedBufferStorage(inputGate.getPageSize()), maxAlign, - taskName); + taskName, + checkpointedTask); } else { barrierHandler = new BarrierBuffer( inputGate, new BufferSpiller(ioManager, inputGate.getPageSize()), maxAlign, - taskName); + taskName, + checkpointedTask); } } else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) { barrierHandler = new BarrierTracker(inputGate); @@ -72,10 +74,6 @@ public class InputProcessorUtil { throw new IllegalArgumentException("Unrecognized Checkpointing Mode: " + checkpointMode); } - if (checkpointedTask != null) { - barrierHandler.registerCheckpointEventHandler(checkpointedTask); - } - return barrierHandler; } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java index 0a284e1..8c97938 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java @@ -115,10 +115,13 @@ public class BarrierBufferAlignmentLimitTest { // the barrier buffer has a limit that only 1000 bytes may be spilled in alignment MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); - BarrierBuffer buffer = new BarrierBuffer(gate, new BufferSpiller(ioManager, gate.getPageSize()), 1000, "Testing"); - AbstractInvokable toNotify = mock(AbstractInvokable.class); - buffer.registerCheckpointEventHandler(toNotify); + BarrierBuffer buffer = new BarrierBuffer( + gate, + new BufferSpiller(ioManager, gate.getPageSize()), + 1000, + "Testing", + toNotify); // validating the sequence of buffers @@ -210,10 +213,13 @@ public class BarrierBufferAlignmentLimitTest { // the barrier buffer has a limit that only 1000 bytes may be spilled in alignment MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); - BarrierBuffer buffer = new BarrierBuffer(gate, new BufferSpiller(ioManager, gate.getPageSize()), 500, "Testing"); - AbstractInvokable toNotify = mock(AbstractInvokable.class); - buffer.registerCheckpointEventHandler(toNotify); + BarrierBuffer buffer = new BarrierBuffer( + gate, + new BufferSpiller(ioManager, gate.getPageSize()), + 500, + "Testing", + toNotify); // validating the sequence of buffers long startTs; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java index 4bc05ff..908a199 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java @@ -41,6 +41,8 @@ import org.hamcrest.Description; import org.junit.After; import org.junit.Test; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.Arrays; import java.util.Random; @@ -70,12 +72,23 @@ public abstract class BarrierBufferTestBase { BarrierBuffer buffer; - protected BarrierBuffer createBarrierBuffer(int numberOfChannels, BufferOrEvent[] sequence) throws IOException { + protected BarrierBuffer createBarrierBuffer( + int numberOfChannels, + BufferOrEvent[] sequence, + @Nullable AbstractInvokable toNotify) throws IOException { MockInputGate gate = new MockInputGate(PAGE_SIZE, numberOfChannels, Arrays.asList(sequence)); - return createBarrierBuffer(gate); + return createBarrierBuffer(gate, toNotify); + } + + protected BarrierBuffer createBarrierBuffer(int numberOfChannels, BufferOrEvent[] sequence) throws IOException { + return createBarrierBuffer(numberOfChannels, sequence, null); } - abstract BarrierBuffer createBarrierBuffer(InputGate gate) throws IOException; + protected BarrierBuffer createBarrierBuffer(InputGate gate) throws IOException { + return createBarrierBuffer(gate, null); + } + + abstract BarrierBuffer createBarrierBuffer(InputGate gate, @Nullable AbstractInvokable toNotify) throws IOException; abstract void validateAlignmentBuffered(long actualBytesBuffered, BufferOrEvent... sequence); @@ -147,10 +160,9 @@ public abstract class BarrierBufferTestBase { createBarrier(4, 0), createBarrier(5, 0), createBarrier(6, 0), createBuffer(0, PAGE_SIZE), createEndOfPartition(0) }; - buffer = createBarrierBuffer(1, sequence); - ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(); - buffer.registerCheckpointEventHandler(handler); + buffer = createBarrierBuffer(1, sequence, handler); + handler.setNextExpectedCheckpointId(1L); for (BufferOrEvent boe : sequence) { @@ -198,10 +210,9 @@ public abstract class BarrierBufferTestBase { createBuffer(0, PAGE_SIZE), createEndOfPartition(0), createEndOfPartition(1), createEndOfPartition(2) }; - buffer = createBarrierBuffer(3, sequence); - ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(); - buffer.registerCheckpointEventHandler(handler); + buffer = createBarrierBuffer(3, sequence, handler); + handler.setNextExpectedCheckpointId(1L); // pre checkpoint 1 @@ -292,10 +303,9 @@ public abstract class BarrierBufferTestBase { createBarrier(2, 2), createBuffer(2, PAGE_SIZE), createEndOfPartition(2), createBuffer(0, PAGE_SIZE), createEndOfPartition(0) }; - buffer = createBarrierBuffer(3, sequence); - ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(); - buffer.registerCheckpointEventHandler(handler); + buffer = createBarrierBuffer(3, sequence, handler); + handler.setNextExpectedCheckpointId(1L); // pre-checkpoint 1 @@ -368,10 +378,9 @@ public abstract class BarrierBufferTestBase { createBuffer(2, PAGE_SIZE), createEndOfPartition(2), createBuffer(0, PAGE_SIZE), createEndOfPartition(0) }; - buffer = createBarrierBuffer(3, sequence); - ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(); - buffer.registerCheckpointEventHandler(handler); + buffer = createBarrierBuffer(3, sequence, handler); + handler.setNextExpectedCheckpointId(1L); // around checkpoint 1 @@ -461,10 +470,8 @@ public abstract class BarrierBufferTestBase { createBuffer(2, PAGE_SIZE), createEndOfPartition(2), createBuffer(0, PAGE_SIZE), createEndOfPartition(0) }; - buffer = createBarrierBuffer(3, sequence); - AbstractInvokable toNotify = mock(AbstractInvokable.class); - buffer.registerCheckpointEventHandler(toNotify); + buffer = createBarrierBuffer(3, sequence, toNotify); long startTs; @@ -548,10 +555,9 @@ public abstract class BarrierBufferTestBase { createBuffer(2, PAGE_SIZE), createEndOfPartition(2), createBuffer(0, PAGE_SIZE), createEndOfPartition(0) }; - buffer = createBarrierBuffer(3, sequence); - ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(); - buffer.registerCheckpointEventHandler(handler); + buffer = createBarrierBuffer(3, sequence, handler); + handler.setNextExpectedCheckpointId(1L); // checkpoint 1 @@ -694,10 +700,9 @@ public abstract class BarrierBufferTestBase { createBarrier(2, 2), createBuffer(2, PAGE_SIZE), createEndOfPartition(2), createBuffer(0, PAGE_SIZE), createEndOfPartition(0) }; - buffer = createBarrierBuffer(3, sequence); - ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(); - buffer.registerCheckpointEventHandler(handler); + buffer = createBarrierBuffer(3, sequence, handler); + handler.setNextExpectedCheckpointId(1L); // pre-checkpoint 1 @@ -841,10 +846,8 @@ public abstract class BarrierBufferTestBase { createCancellationBarrier(6, 0), createBuffer(0, PAGE_SIZE) }; - buffer = createBarrierBuffer(1, sequence); - AbstractInvokable toNotify = mock(AbstractInvokable.class); - buffer.registerCheckpointEventHandler(toNotify); + buffer = createBarrierBuffer(1, sequence, toNotify); check(sequence[0], buffer.pollNext().get(), PAGE_SIZE); check(sequence[2], buffer.pollNext().get(), PAGE_SIZE); @@ -902,10 +905,8 @@ public abstract class BarrierBufferTestBase { /* 37 */ createBuffer(0, PAGE_SIZE) }; - buffer = createBarrierBuffer(3, sequence); - AbstractInvokable toNotify = mock(AbstractInvokable.class); - buffer.registerCheckpointEventHandler(toNotify); + buffer = createBarrierBuffer(3, sequence, toNotify); long startTs; @@ -990,10 +991,8 @@ public abstract class BarrierBufferTestBase { // some more buffers /* 16 */ createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE) }; - buffer = createBarrierBuffer(3, sequence); - AbstractInvokable toNotify = mock(AbstractInvokable.class); - buffer.registerCheckpointEventHandler(toNotify); + buffer = createBarrierBuffer(3, sequence, toNotify); long startTs; @@ -1074,10 +1073,8 @@ public abstract class BarrierBufferTestBase { // some more buffers /* 18 */ createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE) }; - buffer = createBarrierBuffer(3, sequence); - AbstractInvokable toNotify = mock(AbstractInvokable.class); - buffer.registerCheckpointEventHandler(toNotify); + buffer = createBarrierBuffer(3, sequence, toNotify); long startTs; @@ -1151,10 +1148,8 @@ public abstract class BarrierBufferTestBase { // some more buffers /* 16 */ createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE) }; - buffer = createBarrierBuffer(3, sequence); - AbstractInvokable toNotify = mock(AbstractInvokable.class); - buffer.registerCheckpointEventHandler(toNotify); + buffer = createBarrierBuffer(3, sequence, toNotify); long startTs; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java index 398a95a..cb58837 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java @@ -33,6 +33,8 @@ import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.junit.After; import org.junit.Test; +import javax.annotation.Nullable; + import java.util.Arrays; import static org.junit.Assert.assertEquals; @@ -95,11 +97,9 @@ public class BarrierTrackerTest { createBarrier(4, 0), createBarrier(5, 0), createBarrier(6, 0), createBuffer(0) }; - tracker = createBarrierTracker(1, sequence); - CheckpointSequenceValidator validator = - new CheckpointSequenceValidator(1, 2, 3, 4, 5, 6); - tracker.registerCheckpointEventHandler(validator); + new CheckpointSequenceValidator(1, 2, 3, 4, 5, 6); + tracker = createBarrierTracker(1, sequence, validator); for (BufferOrEvent boe : sequence) { if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) { @@ -119,11 +119,9 @@ public class BarrierTrackerTest { createBarrier(7, 0), createBuffer(0), createBarrier(10, 0), createBuffer(0) }; - tracker = createBarrierTracker(1, sequence); - CheckpointSequenceValidator validator = - new CheckpointSequenceValidator(1, 3, 4, 6, 7, 10); - tracker.registerCheckpointEventHandler(validator); + new CheckpointSequenceValidator(1, 3, 4, 6, 7, 10); + tracker = createBarrierTracker(1, sequence, validator); for (BufferOrEvent boe : sequence) { if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) { @@ -152,11 +150,9 @@ public class BarrierTrackerTest { createBuffer(0) }; - tracker = createBarrierTracker(3, sequence); - CheckpointSequenceValidator validator = - new CheckpointSequenceValidator(1, 2, 3, 4); - tracker.registerCheckpointEventHandler(validator); + new CheckpointSequenceValidator(1, 2, 3, 4); + tracker = createBarrierTracker(3, sequence, validator); for (BufferOrEvent boe : sequence) { if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) { @@ -189,11 +185,9 @@ public class BarrierTrackerTest { createBuffer(0) }; - tracker = createBarrierTracker(3, sequence); - CheckpointSequenceValidator validator = - new CheckpointSequenceValidator(1, 2, 4); - tracker.registerCheckpointEventHandler(validator); + new CheckpointSequenceValidator(1, 2, 4); + tracker = createBarrierTracker(3, sequence, validator); for (BufferOrEvent boe : sequence) { if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) { @@ -265,11 +259,9 @@ public class BarrierTrackerTest { // complete checkpoint 10 createBarrier(10, 0), createBarrier(10, 1), }; - tracker = createBarrierTracker(3, sequence); - CheckpointSequenceValidator validator = - new CheckpointSequenceValidator(2, 3, 4, 5, 7, 8, 9, 10); - tracker.registerCheckpointEventHandler(validator); + new CheckpointSequenceValidator(2, 3, 4, 5, 7, 8, 9, 10); + tracker = createBarrierTracker(3, sequence, validator); for (BufferOrEvent boe : sequence) { if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) { @@ -291,12 +283,10 @@ public class BarrierTrackerTest { createCancellationBarrier(6, 0), createBuffer(0) }; - tracker = createBarrierTracker(1, sequence); - // negative values mean an expected cancellation call! CheckpointSequenceValidator validator = - new CheckpointSequenceValidator(1, 2, -4, 5, -6); - tracker.registerCheckpointEventHandler(validator); + new CheckpointSequenceValidator(1, 2, -4, 5, -6); + tracker = createBarrierTracker(1, sequence, validator); for (BufferOrEvent boe : sequence) { if (boe.isBuffer()) { @@ -342,12 +332,10 @@ public class BarrierTrackerTest { createBuffer(0) }; - tracker = createBarrierTracker(3, sequence); - // negative values mean an expected cancellation call! CheckpointSequenceValidator validator = - new CheckpointSequenceValidator(1, -2, 3, -4, 5, -6); - tracker.registerCheckpointEventHandler(validator); + new CheckpointSequenceValidator(1, -2, 3, -4, 5, -6); + tracker = createBarrierTracker(3, sequence, validator); for (BufferOrEvent boe : sequence) { if (boe.isBuffer()) { @@ -371,11 +359,8 @@ public class BarrierTrackerTest { createCancellationBarrier(2L, 2), createBuffer(0) }; - tracker = createBarrierTracker(3, sequence); - AbstractInvokable statefulTask = mock(AbstractInvokable.class); - - tracker.registerCheckpointEventHandler(statefulTask); + tracker = createBarrierTracker(3, sequence, statefulTask); for (BufferOrEvent boe : sequence) { if (boe.isBuffer() || (boe.getEvent().getClass() != CheckpointBarrier.class && boe.getEvent().getClass() != CancelCheckpointMarker.class)) { @@ -390,10 +375,16 @@ public class BarrierTrackerTest { // ------------------------------------------------------------------------ // Utils // ------------------------------------------------------------------------ - private static BarrierTracker createBarrierTracker(int numberOfChannels, BufferOrEvent[] sequence) { + return createBarrierTracker(numberOfChannels, sequence, null); + } + + private static BarrierTracker createBarrierTracker( + int numberOfChannels, + BufferOrEvent[] sequence, + @Nullable AbstractInvokable toNotifyOnCheckpoint) { MockInputGate gate = new MockInputGate(PAGE_SIZE, numberOfChannels, Arrays.asList(sequence)); - return new BarrierTracker(gate); + return new BarrierTracker(gate, toNotifyOnCheckpoint); } private static BufferOrEvent createBarrier(long id, int channel) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java index da88ffb..bbfe8b6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java @@ -20,8 +20,9 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import java.io.IOException; +import javax.annotation.Nullable; import static org.junit.Assert.assertEquals; @@ -31,8 +32,8 @@ import static org.junit.Assert.assertEquals; public class CreditBasedBarrierBufferTest extends BarrierBufferTestBase { @Override - public BarrierBuffer createBarrierBuffer(InputGate gate) throws IOException { - return new BarrierBuffer(gate, new CachedBufferStorage(PAGE_SIZE)); + BarrierBuffer createBarrierBuffer(InputGate gate, @Nullable AbstractInvokable toNotify) { + return new BarrierBuffer(gate, new CachedBufferStorage(PAGE_SIZE), -1, "Testing", toNotify); } @Override diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBarrierBufferTest.java index 546fb62..2101f40 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBarrierBufferTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBarrierBufferTest.java @@ -22,10 +22,13 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.junit.AfterClass; import org.junit.BeforeClass; +import javax.annotation.Nullable; + import java.io.File; import java.io.IOException; @@ -64,8 +67,8 @@ public class SpillingBarrierBufferTest extends BarrierBufferTestBase { } @Override - public BarrierBuffer createBarrierBuffer(InputGate gate) throws IOException{ - return new BarrierBuffer(gate, new BufferSpiller(ioManager, PAGE_SIZE)); + BarrierBuffer createBarrierBuffer(InputGate gate, @Nullable AbstractInvokable toNotify) throws IOException { + return new BarrierBuffer(gate, new BufferSpiller(ioManager, PAGE_SIZE), -1, "Testing", toNotify); } @Override
