This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 663b40d320baf94a5f81a30082240afab66ca085 Author: Zhijiang <wangzhijiang...@aliyun.com> AuthorDate: Wed May 29 22:33:56 2019 +0800 [FLINK-12603][network] Remove getOwningTaskName method from InputGate In order to make the abstract InputGate simpler to extend with new implementations in the shuffle service architecture, we could remove unnecessary methods from it. InputGate#getOwningTaskName is only used for debug logging in BarrierBuffer and StreamInputProcessor. This task name could also be generated in StreamTask via Environment#getTaskInfo and Environment#getExecutionId. Then it could be passed into the constructors of BarrierBuffer/StreamInputProcessor for use. This closes #8529. --- .../io/network/partition/consumer/InputGate.java | 2 - .../partition/consumer/SingleInputGate.java | 5 -- .../network/partition/consumer/UnionInputGate.java | 6 --- .../runtime/taskmanager/InputGateWithMetrics.java | 5 -- .../flink/streaming/runtime/io/BarrierBuffer.java | 54 +++++++++------------- .../streaming/runtime/io/InputProcessorUtil.java | 15 ++++-- .../streaming/runtime/io/StreamInputProcessor.java | 10 +++- .../runtime/io/StreamTwoInputProcessor.java | 10 +++- .../runtime/tasks/OneInputStreamTask.java | 3 +- .../flink/streaming/runtime/tasks/StreamTask.java | 10 ++++ .../runtime/tasks/TwoInputStreamTask.java | 3 +- .../io/BarrierBufferAlignmentLimitTest.java | 4 +- .../runtime/io/BarrierBufferMassiveRandomTest.java | 12 ----- .../flink/streaming/runtime/io/MockInputGate.java | 12 ----- 14 files changed, 67 insertions(+), 84 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java index 7b87d32..1c8300c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java @@ -76,8 +76,6 @@ public abstract class InputGate implements AutoCloseable { public abstract int getNumberOfInputChannels(); - public abstract String getOwningTaskName(); - public abstract boolean isFinished(); public abstract void requestPartitions() throws IOException, InterruptedException; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index 4e718d7..8360e5e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -273,11 +273,6 @@ public class SingleInputGate extends InputGate { return 0; } - @Override - public String getOwningTaskName() { - return owningTaskName; - } - public CompletableFuture<Void> getCloseFuture() { return closeFuture; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java index 9777f78..8b4da35 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java @@ -137,12 +137,6 @@ public class UnionInputGate extends InputGate { } @Override - public String getOwningTaskName() { - // all input gates have the same owning task - return inputGates[0].getOwningTaskName(); - } - - @Override public boolean isFinished() { for (InputGate inputGate : inputGates) { if (!inputGate.isFinished()) { diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java index e9e3038..631583b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java @@ -56,11 +56,6 @@ public class InputGateWithMetrics extends InputGate { } @Override - public String getOwningTaskName() { - return inputGate.getOwningTaskName(); - } - - @Override public boolean isFinished() { return inputGate.isFinished(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java index cbeb4ba..41a6eb5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException; @@ -79,6 +80,8 @@ public class BarrierBuffer implements CheckpointBarrierHandler { */ private final long maxBufferedBytes; + private final String taskName; + /** * The sequence of buffers/events that has been unblocked and must now be consumed before * requesting further data from the input gate. @@ -119,11 +122,10 @@ public class BarrierBuffer implements CheckpointBarrierHandler { * * @param inputGate The input gate to draw the buffers and events from. 
* @param bufferBlocker The buffer blocker to hold the buffers and events for channels with barrier. - * - * @throws IOException Thrown, when the spilling to temp files cannot be initialized. */ - public BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker) throws IOException { - this (inputGate, bufferBlocker, -1); + @VisibleForTesting + BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker) { + this (inputGate, bufferBlocker, -1, "Testing: No task associated"); } /** @@ -136,11 +138,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler { * @param inputGate The input gate to draw the buffers and events from. * @param bufferBlocker The buffer blocker to hold the buffers and events for channels with barrier. * @param maxBufferedBytes The maximum bytes to be buffered before the checkpoint aborts. - * - * @throws IOException Thrown, when the spilling to temp files cannot be initialized. + * @param taskName The task name for logging. */ - public BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker, long maxBufferedBytes) - throws IOException { + BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker, long maxBufferedBytes, String taskName) { checkArgument(maxBufferedBytes == -1 || maxBufferedBytes > 0); this.inputGate = inputGate; @@ -150,6 +150,8 @@ public class BarrierBuffer implements CheckpointBarrierHandler { this.bufferBlocker = checkNotNull(bufferBlocker); this.queuedBuffered = new ArrayDeque<BufferOrEventSequence>(); + + this.taskName = taskName; } // ------------------------------------------------------------------------ @@ -213,7 +215,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { } private void completeBufferedSequence() throws IOException { - LOG.debug("{}: Finished feeding back buffered data.", inputGate.getOwningTaskName()); + LOG.debug("{}: Finished feeding back buffered data.", taskName); currentBuffered.cleanup(); currentBuffered = queuedBuffered.pollFirst(); @@ -249,7 +251,7 @@ 
public class BarrierBuffer implements CheckpointBarrierHandler { // we did not complete the current checkpoint, another started before LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " + "Skipping current checkpoint.", - inputGate.getOwningTaskName(), + taskName, barrierId, currentCheckpointId); @@ -283,7 +285,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { // actually trigger checkpoint if (LOG.isDebugEnabled()) { LOG.debug("{}: Received all barriers, triggering checkpoint {} at {}.", - inputGate.getOwningTaskName(), + taskName, receivedBarrier.getId(), receivedBarrier.getTimestamp()); } @@ -314,9 +316,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { if (barrierId == currentCheckpointId) { // cancel this alignment if (LOG.isDebugEnabled()) { - LOG.debug("{}: Checkpoint {} canceled, aborting alignment.", - inputGate.getOwningTaskName(), - barrierId); + LOG.debug("{}: Checkpoint {} canceled, aborting alignment.", taskName, barrierId); } releaseBlocksAndResetBarriers(); @@ -326,7 +326,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { // we canceled the next which also cancels the current LOG.warn("{}: Received cancellation barrier for checkpoint {} before completing current checkpoint {}. 
" + "Skipping current checkpoint.", - inputGate.getOwningTaskName(), + taskName, barrierId, currentCheckpointId); @@ -357,9 +357,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { latestAlignmentDurationNanos = 0L; if (LOG.isDebugEnabled()) { - LOG.debug("{}: Checkpoint {} canceled, skipping alignment.", - inputGate.getOwningTaskName(), - barrierId); + LOG.debug("{}: Checkpoint {} canceled, skipping alignment.", taskName, barrierId); } notifyAbortOnCancellationBarrier(barrierId); @@ -414,7 +412,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { if (maxBufferedBytes > 0 && (numQueuedBytes + bufferBlocker.getBytesBlocked()) > maxBufferedBytes) { // exceeded our limit - abort this checkpoint LOG.info("{}: Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded.", - inputGate.getOwningTaskName(), + taskName, currentCheckpointId, maxBufferedBytes); @@ -458,9 +456,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { startOfAlignmentTimestamp = System.nanoTime(); if (LOG.isDebugEnabled()) { - LOG.debug("{}: Starting stream alignment for checkpoint {}.", - inputGate.getOwningTaskName(), - checkpointId); + LOG.debug("{}: Starting stream alignment for checkpoint {}.", taskName, checkpointId); } } @@ -486,9 +482,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { numBarriersReceived++; if (LOG.isDebugEnabled()) { - LOG.debug("{}: Received barrier from channel {}.", - inputGate.getOwningTaskName(), - channelIndex); + LOG.debug("{}: Received barrier from channel {}.", taskName, channelIndex); } } else { @@ -501,8 +495,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { * Makes sure the just written data is the next to be consumed. 
*/ private void releaseBlocksAndResetBarriers() throws IOException { - LOG.debug("{}: End of stream alignment, feeding buffered data back.", - inputGate.getOwningTaskName()); + LOG.debug("{}: End of stream alignment, feeding buffered data back.", taskName); for (int i = 0; i < blockedChannels.length; i++) { blockedChannels[i] = false; @@ -519,8 +512,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { // uncommon case: buffered data pending // push back the pending data, if we have any LOG.debug("{}: Checkpoint skipped via buffered data:" + - "Pushing back current alignment buffers and feeding back new alignment data first.", - inputGate.getOwningTaskName()); + "Pushing back current alignment buffers and feeding back new alignment data first.", taskName); // since we did not fully drain the previous sequence, we need to allocate a new buffer for this one BufferOrEventSequence bufferedNow = bufferBlocker.rollOverWithoutReusingResources(); @@ -534,7 +526,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { if (LOG.isDebugEnabled()) { LOG.debug("{}: Size of buffered data: {} bytes", - inputGate.getOwningTaskName(), + taskName, currentBuffered == null ? 
0L : currentBuffered.size()); } @@ -577,7 +569,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { @Override public String toString() { return String.format("%s: last checkpoint: %d, current barriers: %d, closed channels: %d", - inputGate.getOwningTaskName(), + taskName, currentCheckpointId, numBarriersReceived, numClosedChannels); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java index b420781..da401a7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java @@ -41,7 +41,8 @@ public class InputProcessorUtil { CheckpointingMode checkpointMode, IOManager ioManager, InputGate inputGate, - Configuration taskManagerConfig) throws IOException { + Configuration taskManagerConfig, + String taskName) throws IOException { CheckpointBarrierHandler barrierHandler; if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) { @@ -53,9 +54,17 @@ public class InputProcessorUtil { } if (taskManagerConfig.getBoolean(NetworkEnvironmentOptions.NETWORK_CREDIT_MODEL)) { - barrierHandler = new BarrierBuffer(inputGate, new CachedBufferBlocker(inputGate.getPageSize()), maxAlign); + barrierHandler = new BarrierBuffer( + inputGate, + new CachedBufferBlocker(inputGate.getPageSize()), + maxAlign, + taskName); } else { - barrierHandler = new BarrierBuffer(inputGate, new BufferSpiller(ioManager, inputGate.getPageSize()), maxAlign); + barrierHandler = new BarrierBuffer( + inputGate, + new BufferSpiller(ioManager, inputGate.getPageSize()), + maxAlign, + taskName); } } else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) { barrierHandler = new BarrierTracker(inputGate); diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index a9c64b5..8420ae0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -121,12 +121,18 @@ public class StreamInputProcessor<IN> { StreamStatusMaintainer streamStatusMaintainer, OneInputStreamOperator<IN, ?> streamOperator, TaskIOMetricGroup metrics, - WatermarkGauge watermarkGauge) throws IOException { + WatermarkGauge watermarkGauge, + String taskName) throws IOException { InputGate inputGate = InputGateUtil.createInputGate(inputGates); this.barrierHandler = InputProcessorUtil.createCheckpointBarrierHandler( - checkpointedTask, checkpointMode, ioManager, inputGate, taskManagerConfig); + checkpointedTask, + checkpointMode, + ioManager, + inputGate, + taskManagerConfig, + taskName); this.lock = checkNotNull(lock); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java index ab4f90d..e8c9c2a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java @@ -140,12 +140,18 @@ public class StreamTwoInputProcessor<IN1, IN2> { TwoInputStreamOperator<IN1, IN2, ?> streamOperator, TaskIOMetricGroup metrics, WatermarkGauge input1WatermarkGauge, - WatermarkGauge input2WatermarkGauge) throws IOException { + WatermarkGauge input2WatermarkGauge, + String taskName) throws IOException { final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2); this.barrierHandler = 
InputProcessorUtil.createCheckpointBarrierHandler( - checkpointedTask, checkpointMode, ioManager, inputGate, taskManagerConfig); + checkpointedTask, + checkpointMode, + ioManager, + inputGate, + taskManagerConfig, + taskName); this.lock = checkNotNull(lock); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java index 7b82d8f..76091ff 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java @@ -88,7 +88,8 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO getStreamStatusMaintainer(), this.headOperator, getEnvironment().getMetricGroup().getIOMetricGroup(), - inputWatermarkGauge); + inputWatermarkGauge, + getTaskNameWithSubtaskAndId()); } headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge); // wrap watermark gauge since registered metrics must be unique diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 2df565d..3927e46 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -600,6 +600,16 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> } /** + * Gets the name of the task, appended with the subtask indicator and execution id. + * + * @return The name of the task, with subtask indicator and execution id. 
+ */ + String getTaskNameWithSubtaskAndId() { + return getEnvironment().getTaskInfo().getTaskNameWithSubtasks() + + " (" + getEnvironment().getExecutionId() + ')'; + } + + /** * Gets the lock object on which all operations that involve data and state mutation have to lock. * @return The checkpoint lock object. */ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java index 934f2cb..2092c45 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java @@ -98,7 +98,8 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS this.headOperator, getEnvironment().getMetricGroup().getIOMetricGroup(), input1WatermarkGauge, - input2WatermarkGauge); + input2WatermarkGauge, + getTaskNameWithSubtaskAndId()); headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, minInputWatermarkGauge); headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_1_WATERMARK, input1WatermarkGauge); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java index 1c7ff35..91f2be4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java @@ -115,7 +115,7 @@ public class BarrierBufferAlignmentLimitTest { // the barrier buffer has a limit that only 1000 bytes may be spilled in alignment MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); - BarrierBuffer buffer 
= new BarrierBuffer(gate, new BufferSpiller(ioManager, gate.getPageSize()), 1000); + BarrierBuffer buffer = new BarrierBuffer(gate, new BufferSpiller(ioManager, gate.getPageSize()), 1000, "Testing"); AbstractInvokable toNotify = mock(AbstractInvokable.class); buffer.registerCheckpointEventHandler(toNotify); @@ -209,7 +209,7 @@ public class BarrierBufferAlignmentLimitTest { // the barrier buffer has a limit that only 1000 bytes may be spilled in alignment MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); - BarrierBuffer buffer = new BarrierBuffer(gate, new BufferSpiller(ioManager, gate.getPageSize()), 500); + BarrierBuffer buffer = new BarrierBuffer(gate, new BufferSpiller(ioManager, gate.getPageSize()), 500, "Testing"); AbstractInvokable toNotify = mock(AbstractInvokable.class); buffer.registerCheckpointEventHandler(toNotify); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java index 4284074..9d0d705 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java @@ -138,18 +138,11 @@ public class BarrierBufferMassiveRandomTest { private int currentChannel = 0; private long c = 0; - private final String owningTaskName; - public RandomGeneratingInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens) { - this(bufferPools, barrierGens, "TestTask"); - } - - public RandomGeneratingInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens, String owningTaskName) { this.numberOfChannels = bufferPools.length; this.currentBarriers = new int[numberOfChannels]; this.bufferPools = bufferPools; this.barrierGens = barrierGens; - this.owningTaskName = owningTaskName; this.isAvailable 
= AVAILABLE; } @@ -159,11 +152,6 @@ public class BarrierBufferMassiveRandomTest { } @Override - public String getOwningTaskName() { - return owningTaskName; - } - - @Override public boolean isFinished() { return false; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java index b37dee4..1a4c5b7 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java @@ -43,18 +43,11 @@ public class MockInputGate extends InputGate { private int closedChannels; - private final String owningTaskName; - public MockInputGate(int pageSize, int numberOfChannels, List<BufferOrEvent> bufferOrEvents) { - this(pageSize, numberOfChannels, bufferOrEvents, "MockTask"); - } - - public MockInputGate(int pageSize, int numberOfChannels, List<BufferOrEvent> bufferOrEvents, String owningTaskName) { this.pageSize = pageSize; this.numberOfChannels = numberOfChannels; this.bufferOrEvents = new ArrayDeque<BufferOrEvent>(bufferOrEvents); this.closed = new boolean[numberOfChannels]; - this.owningTaskName = owningTaskName; isAvailable = AVAILABLE; } @@ -74,11 +67,6 @@ public class MockInputGate extends InputGate { } @Override - public String getOwningTaskName() { - return owningTaskName; - } - - @Override public boolean isFinished() { return bufferOrEvents.isEmpty(); }