This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 663b40d320baf94a5f81a30082240afab66ca085
Author: Zhijiang <wangzhijiang...@aliyun.com>
AuthorDate: Wed May 29 22:33:56 2019 +0800

    [FLINK-12603][network] Remove getOwningTaskName method from InputGate
    
    In order to keep the abstract InputGate simple to extend with new
    implementations in the shuffle service architecture, we can remove
    unnecessary methods from it.
    InputGate#getOwningTaskName is only used for debug logging in BarrierBuffer
    and StreamInputProcessor. The task name can instead be generated in StreamTask
    via Environment#getTaskInfo and Environment#getExecutionId, and then be
    passed into the constructors of BarrierBuffer/StreamInputProcessor for use.
    
    This closes #8529.
---
 .../io/network/partition/consumer/InputGate.java   |  2 -
 .../partition/consumer/SingleInputGate.java        |  5 --
 .../network/partition/consumer/UnionInputGate.java |  6 ---
 .../runtime/taskmanager/InputGateWithMetrics.java  |  5 --
 .../flink/streaming/runtime/io/BarrierBuffer.java  | 54 +++++++++-------------
 .../streaming/runtime/io/InputProcessorUtil.java   | 15 ++++--
 .../streaming/runtime/io/StreamInputProcessor.java | 10 +++-
 .../runtime/io/StreamTwoInputProcessor.java        | 10 +++-
 .../runtime/tasks/OneInputStreamTask.java          |  3 +-
 .../flink/streaming/runtime/tasks/StreamTask.java  | 10 ++++
 .../runtime/tasks/TwoInputStreamTask.java          |  3 +-
 .../io/BarrierBufferAlignmentLimitTest.java        |  4 +-
 .../runtime/io/BarrierBufferMassiveRandomTest.java | 12 -----
 .../flink/streaming/runtime/io/MockInputGate.java  | 12 -----
 14 files changed, 67 insertions(+), 84 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index 7b87d32..1c8300c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -76,8 +76,6 @@ public abstract class InputGate implements AutoCloseable {
 
        public abstract int getNumberOfInputChannels();
 
-       public abstract String getOwningTaskName();
-
        public abstract boolean isFinished();
 
        public abstract void requestPartitions() throws IOException, 
InterruptedException;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 4e718d7..8360e5e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -273,11 +273,6 @@ public class SingleInputGate extends InputGate {
                return 0;
        }
 
-       @Override
-       public String getOwningTaskName() {
-               return owningTaskName;
-       }
-
        public CompletableFuture<Void> getCloseFuture() {
                return closeFuture;
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 9777f78..8b4da35 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -137,12 +137,6 @@ public class UnionInputGate extends InputGate {
        }
 
        @Override
-       public String getOwningTaskName() {
-               // all input gates have the same owning task
-               return inputGates[0].getOwningTaskName();
-       }
-
-       @Override
        public boolean isFinished() {
                for (InputGate inputGate : inputGates) {
                        if (!inputGate.isFinished()) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
index e9e3038..631583b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
@@ -56,11 +56,6 @@ public class InputGateWithMetrics extends InputGate {
        }
 
        @Override
-       public String getOwningTaskName() {
-               return inputGate.getOwningTaskName();
-       }
-
-       @Override
        public boolean isFinished() {
                return inputGate.isFinished();
        }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index cbeb4ba..41a6eb5 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import 
org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
@@ -79,6 +80,8 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
         */
        private final long maxBufferedBytes;
 
+       private final String taskName;
+
        /**
         * The sequence of buffers/events that has been unblocked and must now 
be consumed before
         * requesting further data from the input gate.
@@ -119,11 +122,10 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
         *
         * @param inputGate The input gate to draw the buffers and events from.
         * @param bufferBlocker The buffer blocker to hold the buffers and 
events for channels with barrier.
-        *
-        * @throws IOException Thrown, when the spilling to temp files cannot 
be initialized.
         */
-       public BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker) 
throws IOException {
-               this (inputGate, bufferBlocker, -1);
+       @VisibleForTesting
+       BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker) {
+               this (inputGate, bufferBlocker, -1, "Testing: No task 
associated");
        }
 
        /**
@@ -136,11 +138,9 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
         * @param inputGate The input gate to draw the buffers and events from.
         * @param bufferBlocker The buffer blocker to hold the buffers and 
events for channels with barrier.
         * @param maxBufferedBytes The maximum bytes to be buffered before the 
checkpoint aborts.
-        *
-        * @throws IOException Thrown, when the spilling to temp files cannot 
be initialized.
+        * @param taskName The task name for logging.
         */
-       public BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker, 
long maxBufferedBytes)
-                       throws IOException {
+       BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker, long 
maxBufferedBytes, String taskName) {
                checkArgument(maxBufferedBytes == -1 || maxBufferedBytes > 0);
 
                this.inputGate = inputGate;
@@ -150,6 +150,8 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
 
                this.bufferBlocker = checkNotNull(bufferBlocker);
                this.queuedBuffered = new ArrayDeque<BufferOrEventSequence>();
+
+               this.taskName = taskName;
        }
 
        // 
------------------------------------------------------------------------
@@ -213,7 +215,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
        }
 
        private void completeBufferedSequence() throws IOException {
-               LOG.debug("{}: Finished feeding back buffered data.", 
inputGate.getOwningTaskName());
+               LOG.debug("{}: Finished feeding back buffered data.", taskName);
 
                currentBuffered.cleanup();
                currentBuffered = queuedBuffered.pollFirst();
@@ -249,7 +251,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                                // we did not complete the current checkpoint, 
another started before
                                LOG.warn("{}: Received checkpoint barrier for 
checkpoint {} before completing current checkpoint {}. " +
                                                "Skipping current checkpoint.",
-                                       inputGate.getOwningTaskName(),
+                                       taskName,
                                        barrierId,
                                        currentCheckpointId);
 
@@ -283,7 +285,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                        // actually trigger checkpoint
                        if (LOG.isDebugEnabled()) {
                                LOG.debug("{}: Received all barriers, 
triggering checkpoint {} at {}.",
-                                       inputGate.getOwningTaskName(),
+                                       taskName,
                                        receivedBarrier.getId(),
                                        receivedBarrier.getTimestamp());
                        }
@@ -314,9 +316,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                        if (barrierId == currentCheckpointId) {
                                // cancel this alignment
                                if (LOG.isDebugEnabled()) {
-                                       LOG.debug("{}: Checkpoint {} canceled, 
aborting alignment.",
-                                               inputGate.getOwningTaskName(),
-                                               barrierId);
+                                       LOG.debug("{}: Checkpoint {} canceled, 
aborting alignment.", taskName, barrierId);
                                }
 
                                releaseBlocksAndResetBarriers();
@@ -326,7 +326,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                                // we canceled the next which also cancels the 
current
                                LOG.warn("{}: Received cancellation barrier for 
checkpoint {} before completing current checkpoint {}. " +
                                                "Skipping current checkpoint.",
-                                       inputGate.getOwningTaskName(),
+                                       taskName,
                                        barrierId,
                                        currentCheckpointId);
 
@@ -357,9 +357,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                        latestAlignmentDurationNanos = 0L;
 
                        if (LOG.isDebugEnabled()) {
-                               LOG.debug("{}: Checkpoint {} canceled, skipping 
alignment.",
-                                       inputGate.getOwningTaskName(),
-                                       barrierId);
+                               LOG.debug("{}: Checkpoint {} canceled, skipping 
alignment.", taskName, barrierId);
                        }
 
                        notifyAbortOnCancellationBarrier(barrierId);
@@ -414,7 +412,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                if (maxBufferedBytes > 0 && (numQueuedBytes + 
bufferBlocker.getBytesBlocked()) > maxBufferedBytes) {
                        // exceeded our limit - abort this checkpoint
                        LOG.info("{}: Checkpoint {} aborted because alignment 
volume limit ({} bytes) exceeded.",
-                               inputGate.getOwningTaskName(),
+                               taskName,
                                currentCheckpointId,
                                maxBufferedBytes);
 
@@ -458,9 +456,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                startOfAlignmentTimestamp = System.nanoTime();
 
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("{}: Starting stream alignment for checkpoint 
{}.",
-                               inputGate.getOwningTaskName(),
-                               checkpointId);
+                       LOG.debug("{}: Starting stream alignment for checkpoint 
{}.", taskName, checkpointId);
                }
        }
 
@@ -486,9 +482,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                        numBarriersReceived++;
 
                        if (LOG.isDebugEnabled()) {
-                               LOG.debug("{}: Received barrier from channel 
{}.",
-                                       inputGate.getOwningTaskName(),
-                                       channelIndex);
+                               LOG.debug("{}: Received barrier from channel 
{}.", taskName, channelIndex);
                        }
                }
                else {
@@ -501,8 +495,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
         * Makes sure the just written data is the next to be consumed.
         */
        private void releaseBlocksAndResetBarriers() throws IOException {
-               LOG.debug("{}: End of stream alignment, feeding buffered data 
back.",
-                       inputGate.getOwningTaskName());
+               LOG.debug("{}: End of stream alignment, feeding buffered data 
back.", taskName);
 
                for (int i = 0; i < blockedChannels.length; i++) {
                        blockedChannels[i] = false;
@@ -519,8 +512,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                        // uncommon case: buffered data pending
                        // push back the pending data, if we have any
                        LOG.debug("{}: Checkpoint skipped via buffered data:" +
-                                       "Pushing back current alignment buffers 
and feeding back new alignment data first.",
-                               inputGate.getOwningTaskName());
+                                       "Pushing back current alignment buffers 
and feeding back new alignment data first.", taskName);
 
                        // since we did not fully drain the previous sequence, 
we need to allocate a new buffer for this one
                        BufferOrEventSequence bufferedNow = 
bufferBlocker.rollOverWithoutReusingResources();
@@ -534,7 +526,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
 
                if (LOG.isDebugEnabled()) {
                        LOG.debug("{}: Size of buffered data: {} bytes",
-                               inputGate.getOwningTaskName(),
+                               taskName,
                                currentBuffered == null ? 0L : 
currentBuffered.size());
                }
 
@@ -577,7 +569,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
        @Override
        public String toString() {
                return String.format("%s: last checkpoint: %d, current 
barriers: %d, closed channels: %d",
-                       inputGate.getOwningTaskName(),
+                       taskName,
                        currentCheckpointId,
                        numBarriersReceived,
                        numClosedChannels);
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index b420781..da401a7 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -41,7 +41,8 @@ public class InputProcessorUtil {
                        CheckpointingMode checkpointMode,
                        IOManager ioManager,
                        InputGate inputGate,
-                       Configuration taskManagerConfig) throws IOException {
+                       Configuration taskManagerConfig,
+                       String taskName) throws IOException {
 
                CheckpointBarrierHandler barrierHandler;
                if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
@@ -53,9 +54,17 @@ public class InputProcessorUtil {
                        }
 
                        if 
(taskManagerConfig.getBoolean(NetworkEnvironmentOptions.NETWORK_CREDIT_MODEL)) {
-                               barrierHandler = new BarrierBuffer(inputGate, 
new CachedBufferBlocker(inputGate.getPageSize()), maxAlign);
+                               barrierHandler = new BarrierBuffer(
+                                       inputGate,
+                                       new 
CachedBufferBlocker(inputGate.getPageSize()),
+                                       maxAlign,
+                                       taskName);
                        } else {
-                               barrierHandler = new BarrierBuffer(inputGate, 
new BufferSpiller(ioManager, inputGate.getPageSize()), maxAlign);
+                               barrierHandler = new BarrierBuffer(
+                                       inputGate,
+                                       new BufferSpiller(ioManager, 
inputGate.getPageSize()),
+                                       maxAlign,
+                                       taskName);
                        }
                } else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
                        barrierHandler = new BarrierTracker(inputGate);
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index a9c64b5..8420ae0 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -121,12 +121,18 @@ public class StreamInputProcessor<IN> {
                        StreamStatusMaintainer streamStatusMaintainer,
                        OneInputStreamOperator<IN, ?> streamOperator,
                        TaskIOMetricGroup metrics,
-                       WatermarkGauge watermarkGauge) throws IOException {
+                       WatermarkGauge watermarkGauge,
+                       String taskName) throws IOException {
 
                InputGate inputGate = InputGateUtil.createInputGate(inputGates);
 
                this.barrierHandler = 
InputProcessorUtil.createCheckpointBarrierHandler(
-                       checkpointedTask, checkpointMode, ioManager, inputGate, 
taskManagerConfig);
+                       checkpointedTask,
+                       checkpointMode,
+                       ioManager,
+                       inputGate,
+                       taskManagerConfig,
+                       taskName);
 
                this.lock = checkNotNull(lock);
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index ab4f90d..e8c9c2a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -140,12 +140,18 @@ public class StreamTwoInputProcessor<IN1, IN2> {
                        TwoInputStreamOperator<IN1, IN2, ?> streamOperator,
                        TaskIOMetricGroup metrics,
                        WatermarkGauge input1WatermarkGauge,
-                       WatermarkGauge input2WatermarkGauge) throws IOException 
{
+                       WatermarkGauge input2WatermarkGauge,
+                       String taskName) throws IOException {
 
                final InputGate inputGate = 
InputGateUtil.createInputGate(inputGates1, inputGates2);
 
                this.barrierHandler = 
InputProcessorUtil.createCheckpointBarrierHandler(
-                       checkpointedTask, checkpointMode, ioManager, inputGate, 
taskManagerConfig);
+                       checkpointedTask,
+                       checkpointMode,
+                       ioManager,
+                       inputGate,
+                       taskManagerConfig,
+                       taskName);
 
                this.lock = checkNotNull(lock);
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 7b82d8f..76091ff 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -88,7 +88,8 @@ public class OneInputStreamTask<IN, OUT> extends 
StreamTask<OUT, OneInputStreamO
                                        getStreamStatusMaintainer(),
                                        this.headOperator,
                                        
getEnvironment().getMetricGroup().getIOMetricGroup(),
-                                       inputWatermarkGauge);
+                                       inputWatermarkGauge,
+                                       getTaskNameWithSubtaskAndId());
                }
                
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, 
this.inputWatermarkGauge);
                // wrap watermark gauge since registered metrics must be unique
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 2df565d..3927e46 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -600,6 +600,16 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
        }
 
        /**
+        * Gets the name of the task, appended with the subtask indicator and 
execution id.
+        *
+        * @return The name of the task, with subtask indicator and execution 
id.
+        */
+       String getTaskNameWithSubtaskAndId() {
+               return getEnvironment().getTaskInfo().getTaskNameWithSubtasks() 
+
+                       " (" + getEnvironment().getExecutionId() + ')';
+       }
+
+       /**
         * Gets the lock object on which all operations that involve data and 
state mutation have to lock.
         * @return The checkpoint lock object.
         */
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 934f2cb..2092c45 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -98,7 +98,8 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends 
StreamTask<OUT, TwoInputS
                                this.headOperator,
                                
getEnvironment().getMetricGroup().getIOMetricGroup(),
                                input1WatermarkGauge,
-                               input2WatermarkGauge);
+                               input2WatermarkGauge,
+                               getTaskNameWithSubtaskAndId());
 
                
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, 
minInputWatermarkGauge);
                
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_1_WATERMARK, 
input1WatermarkGauge);
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
index 1c7ff35..91f2be4 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
@@ -115,7 +115,7 @@ public class BarrierBufferAlignmentLimitTest {
 
                // the barrier buffer has a limit that only 1000 bytes may be 
spilled in alignment
                MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
-               BarrierBuffer buffer = new BarrierBuffer(gate, new 
BufferSpiller(ioManager, gate.getPageSize()), 1000);
+               BarrierBuffer buffer = new BarrierBuffer(gate, new 
BufferSpiller(ioManager, gate.getPageSize()), 1000, "Testing");
 
                AbstractInvokable toNotify = mock(AbstractInvokable.class);
                buffer.registerCheckpointEventHandler(toNotify);
@@ -209,7 +209,7 @@ public class BarrierBufferAlignmentLimitTest {
 
                // the barrier buffer has a limit that only 1000 bytes may be 
spilled in alignment
                MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
-               BarrierBuffer buffer = new BarrierBuffer(gate, new 
BufferSpiller(ioManager, gate.getPageSize()), 500);
+               BarrierBuffer buffer = new BarrierBuffer(gate, new 
BufferSpiller(ioManager, gate.getPageSize()), 500, "Testing");
 
                AbstractInvokable toNotify = mock(AbstractInvokable.class);
                buffer.registerCheckpointEventHandler(toNotify);
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index 4284074..9d0d705 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -138,18 +138,11 @@ public class BarrierBufferMassiveRandomTest {
                private int currentChannel = 0;
                private long c = 0;
 
-               private final String owningTaskName;
-
                public RandomGeneratingInputGate(BufferPool[] bufferPools, 
BarrierGenerator[] barrierGens) {
-                       this(bufferPools, barrierGens, "TestTask");
-               }
-
-               public RandomGeneratingInputGate(BufferPool[] bufferPools, 
BarrierGenerator[] barrierGens, String owningTaskName) {
                        this.numberOfChannels = bufferPools.length;
                        this.currentBarriers = new int[numberOfChannels];
                        this.bufferPools = bufferPools;
                        this.barrierGens = barrierGens;
-                       this.owningTaskName = owningTaskName;
                        this.isAvailable = AVAILABLE;
                }
 
@@ -159,11 +152,6 @@ public class BarrierBufferMassiveRandomTest {
                }
 
                @Override
-               public String getOwningTaskName() {
-                       return owningTaskName;
-               }
-
-               @Override
                public boolean isFinished() {
                        return false;
                }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
index b37dee4..1a4c5b7 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -43,18 +43,11 @@ public class MockInputGate extends InputGate {
 
        private int closedChannels;
 
-       private final String owningTaskName;
-
        public MockInputGate(int pageSize, int numberOfChannels, 
List<BufferOrEvent> bufferOrEvents) {
-               this(pageSize, numberOfChannels, bufferOrEvents, "MockTask");
-       }
-
-       public MockInputGate(int pageSize, int numberOfChannels, 
List<BufferOrEvent> bufferOrEvents, String owningTaskName) {
                this.pageSize = pageSize;
                this.numberOfChannels = numberOfChannels;
                this.bufferOrEvents = new 
ArrayDeque<BufferOrEvent>(bufferOrEvents);
                this.closed = new boolean[numberOfChannels];
-               this.owningTaskName = owningTaskName;
 
                isAvailable = AVAILABLE;
        }
@@ -74,11 +67,6 @@ public class MockInputGate extends InputGate {
        }
 
        @Override
-       public String getOwningTaskName() {
-               return owningTaskName;
-       }
-
-       @Override
        public boolean isFinished() {
                return bufferOrEvents.isEmpty();
        }

Reply via email to