This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3adc3eb2a6ece13f3befddd41f508d01bde067d7
Author: Piotr Nowojski <[email protected]>
AuthorDate: Thu Jun 13 15:07:01 2019 +0200

    [hotfix][network] Make toNotifyOnCheckpoint field final in 
ChekpointBarrierHandlers
---
 .../flink/streaming/runtime/io/BarrierBuffer.java  | 27 ++++----
 .../streaming/runtime/io/BarrierDiscarder.java     | 15 -----
 .../flink/streaming/runtime/io/BarrierTracker.java | 19 +++---
 .../runtime/io/CheckpointBarrierHandler.java       |  9 ---
 .../streaming/runtime/io/InputProcessorUtil.java   | 10 ++-
 .../io/BarrierBufferAlignmentLimitTest.java        | 18 ++++--
 .../runtime/io/BarrierBufferTestBase.java          | 73 ++++++++++------------
 .../streaming/runtime/io/BarrierTrackerTest.java   | 59 ++++++++---------
 .../runtime/io/CreditBasedBarrierBufferTest.java   |  7 ++-
 .../runtime/io/SpillingBarrierBufferTest.java      |  7 ++-
 10 files changed, 105 insertions(+), 139 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index ad62360..b2e6ea1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -33,6 +33,8 @@ import 
org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Optional;
@@ -86,8 +88,8 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
         */
        private BufferOrEventSequence currentBuffered;
 
-       /** Handler that receives the checkpoint notifications. */
-       private AbstractInvokable toNotifyOnCheckpoint;
+       @Nullable
+       private final AbstractInvokable toNotifyOnCheckpoint;
 
        /** The ID of the checkpoint for which we expect barriers. */
        private long currentCheckpointId = -1L;
@@ -127,7 +129,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
         */
        @VisibleForTesting
        BarrierBuffer(InputGate inputGate, BufferStorage bufferStorage) {
-               this (inputGate, bufferStorage, -1, "Testing: No task 
associated");
+               this (inputGate, bufferStorage, -1, "Testing: No task 
associated", null);
        }
 
        /**
@@ -141,8 +143,14 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
         * @param bufferStorage The storage to hold the buffers and events for 
blocked channels.
         * @param maxBufferedBytes The maximum bytes to be buffered before the 
checkpoint aborts.
         * @param taskName The task name for logging.
+        * @param toNotifyOnCheckpoint optional Handler that receives the 
checkpoint notifications.
         */
-       BarrierBuffer(InputGate inputGate, BufferStorage bufferStorage, long 
maxBufferedBytes, String taskName) {
+       BarrierBuffer(
+                       InputGate inputGate,
+                       BufferStorage bufferStorage,
+                       long maxBufferedBytes,
+                       String taskName,
+                       @Nullable AbstractInvokable toNotifyOnCheckpoint) {
                checkArgument(maxBufferedBytes == -1 || maxBufferedBytes > 0);
 
                this.inputGate = inputGate;
@@ -154,6 +162,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                this.queuedBuffered = new ArrayDeque<BufferOrEventSequence>();
 
                this.taskName = taskName;
+               this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
        }
 
        @Override
@@ -452,16 +461,6 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
        }
 
        @Override
-       public void registerCheckpointEventHandler(AbstractInvokable 
toNotifyOnCheckpoint) {
-               if (this.toNotifyOnCheckpoint == null) {
-                       this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
-               }
-               else {
-                       throw new IllegalStateException("BarrierBuffer already 
has a registered checkpoint notifyee");
-               }
-       }
-
-       @Override
        public boolean isEmpty() {
                return currentBuffered == null;
        }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierDiscarder.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierDiscarder.java
index e8d9f34..c33c940 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierDiscarder.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierDiscarder.java
@@ -23,7 +23,6 @@ import 
org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -45,10 +44,6 @@ public class BarrierDiscarder implements 
CheckpointBarrierHandler {
         */
        private final int totalNumberOfInputChannels;
 
-
-       /** The listener to be notified on complete checkpoints. */
-       private AbstractInvokable toNotifyOnCheckpoint;
-
        // 
------------------------------------------------------------------------
 
        public BarrierDiscarder(InputGate inputGate) {
@@ -88,16 +83,6 @@ public class BarrierDiscarder implements 
CheckpointBarrierHandler {
        }
 
        @Override
-       public void registerCheckpointEventHandler(AbstractInvokable 
toNotifyOnCheckpoint) {
-               if (this.toNotifyOnCheckpoint == null) {
-                       this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
-               }
-               else {
-                       throw new IllegalStateException("BarrierDiscarder 
already has a registered checkpoint notifyee");
-               }
-       }
-
-       @Override
        public void cleanup() {
 
        }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 49d2991..f7629bb 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -33,6 +33,8 @@ import 
org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayDeque;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -77,7 +79,7 @@ public class BarrierTracker implements 
CheckpointBarrierHandler {
        private final ArrayDeque<CheckpointBarrierCount> pendingCheckpoints;
 
        /** The listener to be notified on complete checkpoints. */
-       private AbstractInvokable toNotifyOnCheckpoint;
+       private final AbstractInvokable toNotifyOnCheckpoint;
 
        /** The highest checkpoint ID encountered so far. */
        private long latestPendingCheckpointID = -1;
@@ -85,9 +87,14 @@ public class BarrierTracker implements 
CheckpointBarrierHandler {
        // 
------------------------------------------------------------------------
 
        public BarrierTracker(InputGate inputGate) {
+               this(inputGate, null);
+       }
+
+       public BarrierTracker(InputGate inputGate, @Nullable AbstractInvokable 
toNotifyOnCheckpoint) {
                this.inputGate = inputGate;
                this.totalNumberOfInputChannels = 
inputGate.getNumberOfInputChannels();
                this.pendingCheckpoints = new ArrayDeque<>();
+               this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
        }
 
        @Override
@@ -127,16 +134,6 @@ public class BarrierTracker implements 
CheckpointBarrierHandler {
        }
 
        @Override
-       public void registerCheckpointEventHandler(AbstractInvokable 
toNotifyOnCheckpoint) {
-               if (this.toNotifyOnCheckpoint == null) {
-                       this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
-               }
-               else {
-                       throw new IllegalStateException("BarrierTracker already 
has a registered checkpoint notifyee");
-               }
-       }
-
-       @Override
        public void cleanup() {
                pendingCheckpoints.clear();
        }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
index faffd44..2ee1a97 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.io.AsyncDataInput;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 
 import java.io.IOException;
 
@@ -32,14 +31,6 @@ import java.io.IOException;
  */
 @Internal
 public interface CheckpointBarrierHandler extends 
AsyncDataInput<BufferOrEvent> {
-
-       /**
-        * Registers the task be notified once all checkpoint barriers have 
been received for a checkpoint.
-        *
-        * @param task The task to notify
-        */
-       void registerCheckpointEventHandler(AbstractInvokable task);
-
        /**
         * Cleans up all internally held resources.
         *
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index 289dd1a..ebef48f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -58,13 +58,15 @@ public class InputProcessorUtil {
                                        inputGate,
                                        new 
CachedBufferStorage(inputGate.getPageSize()),
                                        maxAlign,
-                                       taskName);
+                                       taskName,
+                                       checkpointedTask);
                        } else {
                                barrierHandler = new BarrierBuffer(
                                        inputGate,
                                        new BufferSpiller(ioManager, 
inputGate.getPageSize()),
                                        maxAlign,
-                                       taskName);
+                                       taskName,
+                                       checkpointedTask);
                        }
                } else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
                        barrierHandler = new BarrierTracker(inputGate);
@@ -72,10 +74,6 @@ public class InputProcessorUtil {
                        throw new IllegalArgumentException("Unrecognized 
Checkpointing Mode: " + checkpointMode);
                }
 
-               if (checkpointedTask != null) {
-                       
barrierHandler.registerCheckpointEventHandler(checkpointedTask);
-               }
-
                return barrierHandler;
        }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
index 0a284e1..8c97938 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
@@ -115,10 +115,13 @@ public class BarrierBufferAlignmentLimitTest {
 
                // the barrier buffer has a limit that only 1000 bytes may be 
spilled in alignment
                MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
-               BarrierBuffer buffer = new BarrierBuffer(gate, new 
BufferSpiller(ioManager, gate.getPageSize()), 1000, "Testing");
-
                AbstractInvokable toNotify = mock(AbstractInvokable.class);
-               buffer.registerCheckpointEventHandler(toNotify);
+               BarrierBuffer buffer = new BarrierBuffer(
+                       gate,
+                       new BufferSpiller(ioManager, gate.getPageSize()),
+                       1000,
+                       "Testing",
+                       toNotify);
 
                // validating the sequence of buffers
 
@@ -210,10 +213,13 @@ public class BarrierBufferAlignmentLimitTest {
 
                // the barrier buffer has a limit that only 1000 bytes may be 
spilled in alignment
                MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
-               BarrierBuffer buffer = new BarrierBuffer(gate, new 
BufferSpiller(ioManager, gate.getPageSize()), 500, "Testing");
-
                AbstractInvokable toNotify = mock(AbstractInvokable.class);
-               buffer.registerCheckpointEventHandler(toNotify);
+               BarrierBuffer buffer = new BarrierBuffer(
+                       gate,
+                       new BufferSpiller(ioManager, gate.getPageSize()),
+                       500,
+                       "Testing",
+                       toNotify);
 
                // validating the sequence of buffers
                long startTs;
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
index 4bc05ff..908a199 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
@@ -41,6 +41,8 @@ import org.hamcrest.Description;
 import org.junit.After;
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Random;
@@ -70,12 +72,23 @@ public abstract class BarrierBufferTestBase {
 
        BarrierBuffer buffer;
 
-       protected BarrierBuffer createBarrierBuffer(int numberOfChannels, 
BufferOrEvent[] sequence) throws IOException {
+       protected BarrierBuffer createBarrierBuffer(
+               int numberOfChannels,
+               BufferOrEvent[] sequence,
+               @Nullable AbstractInvokable toNotify) throws IOException {
                MockInputGate gate = new MockInputGate(PAGE_SIZE, 
numberOfChannels, Arrays.asList(sequence));
-               return createBarrierBuffer(gate);
+               return createBarrierBuffer(gate, toNotify);
+       }
+
+       protected BarrierBuffer createBarrierBuffer(int numberOfChannels, 
BufferOrEvent[] sequence) throws IOException {
+               return createBarrierBuffer(numberOfChannels, sequence, null);
        }
 
-       abstract BarrierBuffer createBarrierBuffer(InputGate gate) throws 
IOException;
+       protected BarrierBuffer createBarrierBuffer(InputGate gate) throws 
IOException {
+               return createBarrierBuffer(gate, null);
+       }
+
+       abstract BarrierBuffer createBarrierBuffer(InputGate gate, @Nullable 
AbstractInvokable toNotify) throws IOException;
 
        abstract void validateAlignmentBuffered(long actualBytesBuffered, 
BufferOrEvent... sequence);
 
@@ -147,10 +160,9 @@ public abstract class BarrierBufferTestBase {
                        createBarrier(4, 0), createBarrier(5, 0), 
createBarrier(6, 0),
                        createBuffer(0, PAGE_SIZE), createEndOfPartition(0)
                };
-               buffer = createBarrierBuffer(1, sequence);
-
                ValidatingCheckpointHandler handler = new 
ValidatingCheckpointHandler();
-               buffer.registerCheckpointEventHandler(handler);
+               buffer = createBarrierBuffer(1, sequence, handler);
+
                handler.setNextExpectedCheckpointId(1L);
 
                for (BufferOrEvent boe : sequence) {
@@ -198,10 +210,9 @@ public abstract class BarrierBufferTestBase {
                        createBuffer(0, PAGE_SIZE),
                        createEndOfPartition(0), createEndOfPartition(1), 
createEndOfPartition(2)
                };
-               buffer = createBarrierBuffer(3, sequence);
-
                ValidatingCheckpointHandler handler = new 
ValidatingCheckpointHandler();
-               buffer.registerCheckpointEventHandler(handler);
+               buffer = createBarrierBuffer(3, sequence, handler);
+
                handler.setNextExpectedCheckpointId(1L);
 
                // pre checkpoint 1
@@ -292,10 +303,9 @@ public abstract class BarrierBufferTestBase {
                        createBarrier(2, 2),
                        createBuffer(2, PAGE_SIZE), createEndOfPartition(2), 
createBuffer(0, PAGE_SIZE), createEndOfPartition(0)
                };
-               buffer = createBarrierBuffer(3, sequence);
-
                ValidatingCheckpointHandler handler = new 
ValidatingCheckpointHandler();
-               buffer.registerCheckpointEventHandler(handler);
+               buffer = createBarrierBuffer(3, sequence, handler);
+
                handler.setNextExpectedCheckpointId(1L);
 
                // pre-checkpoint 1
@@ -368,10 +378,9 @@ public abstract class BarrierBufferTestBase {
                        createBuffer(2, PAGE_SIZE), createEndOfPartition(2),
                        createBuffer(0, PAGE_SIZE), createEndOfPartition(0)
                };
-               buffer = createBarrierBuffer(3, sequence);
-
                ValidatingCheckpointHandler handler = new 
ValidatingCheckpointHandler();
-               buffer.registerCheckpointEventHandler(handler);
+               buffer = createBarrierBuffer(3, sequence, handler);
+
                handler.setNextExpectedCheckpointId(1L);
 
                // around checkpoint 1
@@ -461,10 +470,8 @@ public abstract class BarrierBufferTestBase {
                        createBuffer(2, PAGE_SIZE), createEndOfPartition(2),
                        createBuffer(0, PAGE_SIZE), createEndOfPartition(0)
                };
-               buffer = createBarrierBuffer(3, sequence);
-
                AbstractInvokable toNotify = mock(AbstractInvokable.class);
-               buffer.registerCheckpointEventHandler(toNotify);
+               buffer = createBarrierBuffer(3, sequence, toNotify);
 
                long startTs;
 
@@ -548,10 +555,9 @@ public abstract class BarrierBufferTestBase {
                        createBuffer(2, PAGE_SIZE), createEndOfPartition(2),
                        createBuffer(0, PAGE_SIZE), createEndOfPartition(0)
                };
-               buffer = createBarrierBuffer(3, sequence);
-
                ValidatingCheckpointHandler handler = new 
ValidatingCheckpointHandler();
-               buffer.registerCheckpointEventHandler(handler);
+               buffer = createBarrierBuffer(3, sequence, handler);
+
                handler.setNextExpectedCheckpointId(1L);
 
                // checkpoint 1
@@ -694,10 +700,9 @@ public abstract class BarrierBufferTestBase {
                        createBarrier(2, 2),
                        createBuffer(2, PAGE_SIZE), createEndOfPartition(2), 
createBuffer(0, PAGE_SIZE), createEndOfPartition(0)
                };
-               buffer = createBarrierBuffer(3, sequence);
-
                ValidatingCheckpointHandler handler = new 
ValidatingCheckpointHandler();
-               buffer.registerCheckpointEventHandler(handler);
+               buffer = createBarrierBuffer(3, sequence, handler);
+
                handler.setNextExpectedCheckpointId(1L);
 
                // pre-checkpoint 1
@@ -841,10 +846,8 @@ public abstract class BarrierBufferTestBase {
                        createCancellationBarrier(6, 0),
                        createBuffer(0, PAGE_SIZE)
                };
-               buffer = createBarrierBuffer(1, sequence);
-
                AbstractInvokable toNotify = mock(AbstractInvokable.class);
-               buffer.registerCheckpointEventHandler(toNotify);
+               buffer = createBarrierBuffer(1, sequence, toNotify);
 
                check(sequence[0], buffer.pollNext().get(), PAGE_SIZE);
                check(sequence[2], buffer.pollNext().get(), PAGE_SIZE);
@@ -902,10 +905,8 @@ public abstract class BarrierBufferTestBase {
 
                        /* 37 */ createBuffer(0, PAGE_SIZE)
                };
-               buffer = createBarrierBuffer(3, sequence);
-
                AbstractInvokable toNotify = mock(AbstractInvokable.class);
-               buffer.registerCheckpointEventHandler(toNotify);
+               buffer = createBarrierBuffer(3, sequence, toNotify);
 
                long startTs;
 
@@ -990,10 +991,8 @@ public abstract class BarrierBufferTestBase {
                                // some more buffers
                        /* 16 */ createBuffer(0, PAGE_SIZE), createBuffer(1, 
PAGE_SIZE), createBuffer(2, PAGE_SIZE)
                };
-               buffer = createBarrierBuffer(3, sequence);
-
                AbstractInvokable toNotify = mock(AbstractInvokable.class);
-               buffer.registerCheckpointEventHandler(toNotify);
+               buffer = createBarrierBuffer(3, sequence, toNotify);
 
                long startTs;
 
@@ -1074,10 +1073,8 @@ public abstract class BarrierBufferTestBase {
                                // some more buffers
                        /* 18 */ createBuffer(0, PAGE_SIZE), createBuffer(1, 
PAGE_SIZE), createBuffer(2, PAGE_SIZE)
                };
-               buffer = createBarrierBuffer(3, sequence);
-
                AbstractInvokable toNotify = mock(AbstractInvokable.class);
-               buffer.registerCheckpointEventHandler(toNotify);
+               buffer = createBarrierBuffer(3, sequence, toNotify);
 
                long startTs;
 
@@ -1151,10 +1148,8 @@ public abstract class BarrierBufferTestBase {
                                // some more buffers
                        /* 16 */ createBuffer(0, PAGE_SIZE), createBuffer(1, 
PAGE_SIZE), createBuffer(2, PAGE_SIZE)
                };
-               buffer = createBarrierBuffer(3, sequence);
-
                AbstractInvokable toNotify = mock(AbstractInvokable.class);
-               buffer.registerCheckpointEventHandler(toNotify);
+               buffer = createBarrierBuffer(3, sequence, toNotify);
 
                long startTs;
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index 398a95a..cb58837 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -33,6 +33,8 @@ import 
org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.junit.After;
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+
 import java.util.Arrays;
 
 import static org.junit.Assert.assertEquals;
@@ -95,11 +97,9 @@ public class BarrierTrackerTest {
                                createBarrier(4, 0), createBarrier(5, 0), 
createBarrier(6, 0),
                                createBuffer(0)
                };
-               tracker = createBarrierTracker(1, sequence);
-
                CheckpointSequenceValidator validator =
-                               new CheckpointSequenceValidator(1, 2, 3, 4, 5, 
6);
-               tracker.registerCheckpointEventHandler(validator);
+                       new CheckpointSequenceValidator(1, 2, 3, 4, 5, 6);
+               tracker = createBarrierTracker(1, sequence, validator);
 
                for (BufferOrEvent boe : sequence) {
                        if (boe.isBuffer() || boe.getEvent().getClass() != 
CheckpointBarrier.class) {
@@ -119,11 +119,9 @@ public class BarrierTrackerTest {
                                createBarrier(7, 0), createBuffer(0), 
createBarrier(10, 0),
                                createBuffer(0)
                };
-               tracker = createBarrierTracker(1, sequence);
-
                CheckpointSequenceValidator validator =
-                               new CheckpointSequenceValidator(1, 3, 4, 6, 7, 
10);
-               tracker.registerCheckpointEventHandler(validator);
+                       new CheckpointSequenceValidator(1, 3, 4, 6, 7, 10);
+               tracker = createBarrierTracker(1, sequence, validator);
 
                for (BufferOrEvent boe : sequence) {
                        if (boe.isBuffer() || boe.getEvent().getClass() != 
CheckpointBarrier.class) {
@@ -152,11 +150,9 @@ public class BarrierTrackerTest {
 
                                createBuffer(0)
                };
-               tracker = createBarrierTracker(3, sequence);
-
                CheckpointSequenceValidator validator =
-                               new CheckpointSequenceValidator(1, 2, 3, 4);
-               tracker.registerCheckpointEventHandler(validator);
+                       new CheckpointSequenceValidator(1, 2, 3, 4);
+               tracker = createBarrierTracker(3, sequence, validator);
 
                for (BufferOrEvent boe : sequence) {
                        if (boe.isBuffer() || boe.getEvent().getClass() != 
CheckpointBarrier.class) {
@@ -189,11 +185,9 @@ public class BarrierTrackerTest {
 
                                createBuffer(0)
                };
-               tracker = createBarrierTracker(3, sequence);
-
                CheckpointSequenceValidator validator =
-                               new CheckpointSequenceValidator(1, 2, 4);
-               tracker.registerCheckpointEventHandler(validator);
+                       new CheckpointSequenceValidator(1, 2, 4);
+               tracker = createBarrierTracker(3, sequence, validator);
 
                for (BufferOrEvent boe : sequence) {
                        if (boe.isBuffer() || boe.getEvent().getClass() != 
CheckpointBarrier.class) {
@@ -265,11 +259,9 @@ public class BarrierTrackerTest {
                                // complete checkpoint 10
                                createBarrier(10, 0), createBarrier(10, 1),
                };
-               tracker = createBarrierTracker(3, sequence);
-
                CheckpointSequenceValidator validator =
-                               new CheckpointSequenceValidator(2, 3, 4, 5, 7, 
8, 9, 10);
-               tracker.registerCheckpointEventHandler(validator);
+                       new CheckpointSequenceValidator(2, 3, 4, 5, 7, 8, 9, 
10);
+               tracker = createBarrierTracker(3, sequence, validator);
 
                for (BufferOrEvent boe : sequence) {
                        if (boe.isBuffer() || boe.getEvent().getClass() != 
CheckpointBarrier.class) {
@@ -291,12 +283,10 @@ public class BarrierTrackerTest {
                                createCancellationBarrier(6, 0),
                                createBuffer(0)
                };
-               tracker = createBarrierTracker(1, sequence);
-
                // negative values mean an expected cancellation call!
                CheckpointSequenceValidator validator =
-                               new CheckpointSequenceValidator(1, 2, -4, 5, 
-6);
-               tracker.registerCheckpointEventHandler(validator);
+                       new CheckpointSequenceValidator(1, 2, -4, 5, -6);
+               tracker = createBarrierTracker(1, sequence, validator);
 
                for (BufferOrEvent boe : sequence) {
                        if (boe.isBuffer()) {
@@ -342,12 +332,10 @@ public class BarrierTrackerTest {
 
                                createBuffer(0)
                };
-               tracker = createBarrierTracker(3, sequence);
-
                // negative values mean an expected cancellation call!
                CheckpointSequenceValidator validator =
-                               new CheckpointSequenceValidator(1, -2, 3, -4, 
5, -6);
-               tracker.registerCheckpointEventHandler(validator);
+                       new CheckpointSequenceValidator(1, -2, 3, -4, 5, -6);
+               tracker = createBarrierTracker(3, sequence, validator);
 
                for (BufferOrEvent boe : sequence) {
                        if (boe.isBuffer()) {
@@ -371,11 +359,8 @@ public class BarrierTrackerTest {
                        createCancellationBarrier(2L, 2),
                        createBuffer(0)
                };
-               tracker = createBarrierTracker(3, sequence);
-
                AbstractInvokable statefulTask = mock(AbstractInvokable.class);
-
-               tracker.registerCheckpointEventHandler(statefulTask);
+               tracker = createBarrierTracker(3, sequence, statefulTask);
 
                for (BufferOrEvent boe : sequence) {
                        if (boe.isBuffer() || (boe.getEvent().getClass() != 
CheckpointBarrier.class && boe.getEvent().getClass() != 
CancelCheckpointMarker.class)) {
@@ -390,10 +375,16 @@ public class BarrierTrackerTest {
        // 
------------------------------------------------------------------------
        //  Utils
        // 
------------------------------------------------------------------------
-
        private static BarrierTracker createBarrierTracker(int 
numberOfChannels, BufferOrEvent[] sequence) {
+               return createBarrierTracker(numberOfChannels, sequence, null);
+       }
+
+       private static BarrierTracker createBarrierTracker(
+                       int numberOfChannels,
+                       BufferOrEvent[] sequence,
+                       @Nullable AbstractInvokable toNotifyOnCheckpoint) {
                MockInputGate gate = new MockInputGate(PAGE_SIZE, 
numberOfChannels, Arrays.asList(sequence));
-               return new BarrierTracker(gate);
+               return new BarrierTracker(gate, toNotifyOnCheckpoint);
        }
 
        private static BufferOrEvent createBarrier(long id, int channel) {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java
index da88ffb..bbfe8b6 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java
@@ -20,8 +20,9 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 
-import java.io.IOException;
+import javax.annotation.Nullable;
 
 import static org.junit.Assert.assertEquals;
 
@@ -31,8 +32,8 @@ import static org.junit.Assert.assertEquals;
 public class CreditBasedBarrierBufferTest extends BarrierBufferTestBase {
 
        @Override
-       public BarrierBuffer createBarrierBuffer(InputGate gate) throws 
IOException {
-               return new BarrierBuffer(gate, new 
CachedBufferStorage(PAGE_SIZE));
+       BarrierBuffer createBarrierBuffer(InputGate gate, @Nullable 
AbstractInvokable toNotify) {
+               return new BarrierBuffer(gate, new 
CachedBufferStorage(PAGE_SIZE), -1, "Testing", toNotify);
        }
 
        @Override
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBarrierBufferTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBarrierBufferTest.java
index 546fb62..2101f40 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBarrierBufferTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBarrierBufferTest.java
@@ -22,10 +22,13 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
+import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.IOException;
 
@@ -64,8 +67,8 @@ public class SpillingBarrierBufferTest extends 
BarrierBufferTestBase {
        }
 
        @Override
-       public BarrierBuffer createBarrierBuffer(InputGate gate) throws 
IOException{
-               return new BarrierBuffer(gate, new BufferSpiller(ioManager, 
PAGE_SIZE));
+       BarrierBuffer createBarrierBuffer(InputGate gate, @Nullable 
AbstractInvokable toNotify) throws IOException {
+               return new BarrierBuffer(gate, new BufferSpiller(ioManager, 
PAGE_SIZE), -1, "Testing", toNotify);
        }
 
        @Override

Reply via email to