This is an automated email from the ASF dual-hosted git repository. nkruber pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a5e1d407c6f8694f0b61febc651a603b175c0fe7 Author: Nico Kruber <[email protected]> AuthorDate: Thu Sep 13 11:29:15 2018 +0200 [hotfix][checkstyle] Remove suppression for runtime/network.partition --- .../partition/PartitionNotFoundException.java | 3 +++ .../network/partition/PipelinedSubpartition.java | 6 ++--- .../io/network/partition/ResultPartition.java | 26 +++++++++++----------- .../ResultPartitionConsumableNotifier.java | 4 +++- .../io/network/partition/ResultPartitionID.java | 2 +- .../network/partition/ResultPartitionProvider.java | 3 +++ .../io/network/partition/ResultPartitionType.java | 7 ++++-- .../io/network/partition/ResultSubpartition.java | 21 +++++++++-------- .../network/partition/ResultSubpartitionView.java | 1 + .../network/partition/consumer/InputChannel.java | 18 +++++++-------- .../network/partition/consumer/InputChannelID.java | 3 +++ .../io/network/partition/consumer/InputGate.java | 8 +++---- .../partition/consumer/InputGateMetrics.java | 2 +- .../partition/consumer/LocalInputChannel.java | 4 ++-- .../partition/consumer/UnknownInputChannel.java | 6 ++--- .../network/partition/InputChannelTestUtils.java | 4 ++-- .../network/partition/InputGateConcurrentTest.java | 7 ++++-- .../network/partition/InputGateFairnessTest.java | 16 +++++++------ .../LegacyPartialConsumePipelinedResultTest.java | 13 ++++++----- .../PartialConsumePipelinedResultTest.java | 7 ++++-- .../partition/PipelinedSubpartitionTest.java | 7 ++++-- .../partition/ProducerFailedExceptionTest.java | 3 +++ .../partition/consumer/InputChannelTest.java | 4 ++++ .../IteratorWrappingTestSingleInputGate.java | 5 +++++ .../partition/consumer/LocalInputChannelTest.java | 19 ++++++++++------ .../partition/consumer/RemoteInputChannelTest.java | 8 ++++--- .../partition/consumer/SingleInputGateTest.java | 2 +- .../partition/consumer/TestSingleInputGate.java | 1 + .../partition/consumer/UnionInputGateTest.java | 5 ++++- tools/maven/suppressions-runtime.xml | 4 ++-- 30 files changed, 135 insertions(+), 84 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java index 7479862..2f78816 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java @@ -20,6 +20,9 @@ package org.apache.flink.runtime.io.network.partition; import java.io.IOException; +/** + * Exception for failed partition requests due to non-existing partitions. + */ public class PartitionNotFoundException extends IOException { private static final long serialVersionUID = 0L; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java index c6f3e15..91e0d4b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java @@ -188,17 +188,17 @@ class PipelinedSubpartition extends ResultSubpartition { buffer, isAvailableUnsafe(), getBuffersInBacklog(), - _nextBufferIsEvent()); + nextBufferIsEventUnsafe()); } } boolean nextBufferIsEvent() { synchronized (buffers) { - return _nextBufferIsEvent(); + return nextBufferIsEventUnsafe(); } } - private boolean _nextBufferIsEvent() { + private boolean nextBufferIsEventUnsafe() { assert Thread.holdsLock(buffers); return !buffers.isEmpty() && !buffers.peekFirst().isBuffer(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java index 93e5ba1..b32f73f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java @@ -48,17 +48,17 @@ import static org.apache.flink.util.Preconditions.checkState; /** * A result partition for data produced by a single task. * - * <p> This class is the runtime part of a logical {@link IntermediateResultPartition}. Essentially, + * <p>This class is the runtime part of a logical {@link IntermediateResultPartition}. Essentially, * a result partition is a collection of {@link Buffer} instances. The buffers are organized in one * or more {@link ResultSubpartition} instances, which further partition the data depending on the * number of consuming tasks and the data {@link DistributionPattern}. * - * <p> Tasks, which consume a result partition have to request one of its subpartitions. The request + * <p>Tasks, which consume a result partition have to request one of its subpartitions. The request * happens either remotely (see {@link RemoteInputChannel}) or locally (see {@link LocalInputChannel}) * * <h2>Life-cycle</h2> * - * The life-cycle of each result partition has three (possibly overlapping) phases: + * <p>The life-cycle of each result partition has three (possibly overlapping) phases: * <ol> * <li><strong>Produce</strong>: </li> * <li><strong>Consume</strong>: </li> @@ -67,7 +67,7 @@ import static org.apache.flink.util.Preconditions.checkState; * * <h2>Lazy deployment and updates of consuming tasks</h2> * - * Before a consuming task can request the result, it has to be deployed. The time of deployment + * <p>Before a consuming task can request the result, it has to be deployed. The time of deployment * depends on the PIPELINED vs. BLOCKING characteristic of the result partition. With pipelined * results, receivers are deployed as soon as the first buffer is added to the result partition. * With blocking results on the other hand, receivers are deployed after the partition is finished. @@ -79,7 +79,7 @@ import static org.apache.flink.util.Preconditions.checkState; public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner { private static final Logger LOG = LoggerFactory.getLogger(ResultPartition.class); - + private final String owningTaskName; private final TaskActions taskActions; @@ -174,10 +174,10 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner { /** * Registers a buffer pool with this result partition. - * <p> - * There is one pool for each result partition, which is shared by all its sub partitions. - * <p> - * The pool is registered with the partition *after* it as been constructed in order to conform + * + * <p>There is one pool for each result partition, which is shared by all its sub partitions. + * + * <p>The pool is registered with the partition *after* it as been constructed in order to conform * to the life-cycle of task registrations in the {@link TaskManager}. */ public void registerBufferPool(BufferPool bufferPool) { @@ -276,9 +276,9 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner { /** * Finishes the result partition. * - * <p> After this operation, it is not possible to add further data to the result partition. + * <p>After this operation, it is not possible to add further data to the result partition. * - * <p> For BLOCKING results, this will trigger the deployment of consuming tasks. + * <p>For BLOCKING results, this will trigger the deployment of consuming tasks. */ public void finish() throws IOException { boolean success = false; @@ -366,7 +366,7 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner { /** * Releases buffers held by this result partition. * - * <p> This is a callback from the buffer pool, which is registered for result partitions, which + * <p>This is a callback from the buffer pool, which is registered for result partitions, which * are back pressure-free. */ @Override @@ -395,7 +395,7 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner { /** * Pins the result partition. * - * <p> The partition can only be released after each subpartition has been consumed once per pin + * <p>The partition can only be released after each subpartition has been consumed once per pin * operation. */ void pin() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java index 02212ce..10eb086 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java @@ -18,10 +18,12 @@ package org.apache.flink.runtime.io.network.partition; - import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.taskmanager.TaskActions; +/** + * Interface for notifications about consumable partitions. + */ public interface ResultPartitionConsumableNotifier { void notifyPartitionConsumable(JobID jobId, ResultPartitionID partitionId, TaskActions taskActions); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java index b84c33b..cee79a0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java @@ -27,7 +27,7 @@ import java.io.Serializable; /** * Runtime identifier of a produced {@link IntermediateResultPartition}. * - * <p> In failure cases the {@link IntermediateResultPartitionID} is not enough to uniquely + * <p>In failure cases the {@link IntermediateResultPartitionID} is not enough to uniquely * identify a result partition. It needs to be associated with the producing task as well to ensure * correct tracking of failed/restarted tasks. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java index db72d63..faeaaf2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java @@ -20,6 +20,9 @@ package org.apache.flink.runtime.io.network.partition; import java.io.IOException; +/** + * Interface for creating result partitions. + */ public interface ResultPartitionProvider { /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java index 256387c..f62dbeeb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java @@ -18,6 +18,9 @@ package org.apache.flink.runtime.io.network.partition; +/** + * Type of a result partition. + */ public enum ResultPartitionType { BLOCKING(false, false, false), @@ -27,12 +30,12 @@ public enum ResultPartitionType { /** * Pipelined partitions with a bounded (local) buffer pool. * - * For streaming jobs, a fixed limit on the buffer pool size should help avoid that too much + * <p>For streaming jobs, a fixed limit on the buffer pool size should help avoid that too much * data is being buffered and checkpoint barriers are delayed. In contrast to limiting the * overall network buffer pool size, this, however, still allows to be flexible with regards * to the total number of partitions by selecting an appropriately big network buffer pool size. * - * For batch jobs, it will be best to keep this unlimited ({@link #PIPELINED}) since there are + * <p>For batch jobs, it will be best to keep this unlimited ({@link #PIPELINED}) since there are * no checkpoint barriers. */ PIPELINED_BOUNDED(true, true, true); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java index adc0ed3..58a1402 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java @@ -43,16 +43,16 @@ public abstract class ResultSubpartition { /** All buffers of this subpartition. Access to the buffers is synchronized on this object. */ protected final ArrayDeque<BufferConsumer> buffers = new ArrayDeque<>(); - /** The number of non-event buffers currently in this subpartition */ + /** The number of non-event buffers currently in this subpartition. */ @GuardedBy("buffers") private int buffersInBacklog; // - Statistics ---------------------------------------------------------- - /** The total number of buffers (both data and event buffers) */ + /** The total number of buffers (both data and event buffers). */ private long totalNumberOfBuffers; - /** The total number of bytes (both data and event buffers) */ + /** The total number of bytes (both data and event buffers). */ private long totalNumberOfBytes; public ResultSubpartition(int index, ResultPartition parent) { @@ -102,19 +102,19 @@ public abstract class ResultSubpartition { * @throws IOException * thrown in case of errors while adding the buffer */ - abstract public boolean add(BufferConsumer bufferConsumer) throws IOException; + public abstract boolean add(BufferConsumer bufferConsumer) throws IOException; - abstract public void flush(); + public abstract void flush(); - abstract public void finish() throws IOException; + public abstract void finish() throws IOException; - abstract public void release() throws IOException; + public abstract void release() throws IOException; - abstract public ResultSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException; + public abstract ResultSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException; abstract int releaseMemory() throws IOException; - abstract public boolean isReleased(); + public abstract boolean isReleased(); /** * Gets the number of non-event buffers in this subpartition. @@ -132,7 +132,7 @@ public abstract class ResultSubpartition { * This method must not acquire locks or interfere with the task and network threads in * any way. */ - abstract public int unsynchronizedGetNumberOfQueuedBuffers(); + public abstract int unsynchronizedGetNumberOfQueuedBuffers(); /** * Decreases the number of non-event buffers by one after fetching a non-event @@ -198,7 +198,6 @@ public abstract class ResultSubpartition { return buffersInBacklog; } - public boolean nextBufferIsEvent() { return nextBufferIsEvent; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java index b1ccd63..a755955 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import javax.annotation.Nullable; + import java.io.IOException; /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java index 2a7cedf..a08ecc2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java @@ -34,8 +34,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** * An input channel consumes a single {@link ResultSubpartitionView}. - * <p> - * For each channel, the consumption life cycle is as follows: + * + * <p>For each channel, the consumption life cycle is as follows: * <ol> * <li>{@link #requestSubpartition(int)}</li> * <li>{@link #getNextBuffer()}</li> @@ -66,7 +66,7 @@ public abstract class InputChannel { protected final Counter numBuffersIn; - /** The current backoff (in ms) */ + /** The current backoff (in ms). */ private int currentBackoff; protected InputChannel( @@ -111,12 +111,12 @@ public abstract class InputChannel { /** * Notifies the owning {@link SingleInputGate} that this channel became non-empty. - * + * * <p>This is guaranteed to be called only when a Buffer was added to a previously * empty input channel. The notion of empty is atomically consistent with the flag * {@link BufferAndAvailability#moreAvailable()} when polling the next buffer * from this channel. - * + * * <p><b>Note:</b> When the input channel observes an exception, this * method is called regardless of whether the channel was empty before. That ensures * that the parent InputGate will always be notified about the exception. @@ -132,8 +132,8 @@ public abstract class InputChannel { /** * Requests the queue with the specified index of the source intermediate * result partition. - * <p> - * The queue index to request depends on which sub task the channel belongs + * + * <p>The queue index to request depends on which sub task the channel belongs * to and is specified by the consumer of this channel. */ abstract void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException; @@ -149,8 +149,8 @@ public abstract class InputChannel { /** * Sends a {@link TaskEvent} back to the task producing the consumed result partition. - * <p> - * <strong>Important</strong>: The producing task has to be running to receive backwards events. + * + * <p><strong>Important</strong>: The producing task has to be running to receive backwards events. * This means that the result type needs to be pipelined and the task logic has to ensure that * the producer will wait for all backwards events. Otherwise, this will lead to an Exception * at runtime. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelID.java index ceeb83d..c1886de 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelID.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelID.java @@ -22,6 +22,9 @@ import org.apache.flink.util.AbstractID; import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +/** + * Identifier for input channels. + */ public class InputChannelID extends AbstractID { private static final long serialVersionUID = 1L; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java index c78abb5..6e59f91 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java @@ -26,10 +26,10 @@ import java.util.Optional; /** * An input gate consumes one or more partitions of a single produced intermediate result. * - * <p> Each intermediate result is partitioned over its producing parallel subtasks; each of these + * <p>Each intermediate result is partitioned over its producing parallel subtasks; each of these * partitions is furthermore partitioned into one or more subpartitions. * - * <p> As an example, consider a map-reduce program, where the map operator produces data and the + * <p>As an example, consider a map-reduce program, where the map operator produces data and the * reduce operator consumes the produced data. * * <pre>{@code @@ -38,7 +38,7 @@ import java.util.Optional; * +-----+ +---------------------+ +--------+ * }</pre> * - * <p> When deploying such a program in parallel, the intermediate result will be partitioned over its + * <p>When deploying such a program in parallel, the intermediate result will be partitioned over its * producing parallel subtasks; each of these partitions is furthermore partitioned into one or more * subpartitions. * @@ -59,7 +59,7 @@ import java.util.Optional; * +-----------------------------------------+ * }</pre> * - * <p> In the above example, two map subtasks produce the intermediate result in parallel, resulting + * <p>In the above example, two map subtasks produce the intermediate result in parallel, resulting * in two partitions (Partition 1 and 2). Each of these partitions is further partitioned into two * subpartitions -- one for each parallel reduce subtask. As shown in the Figure, each reduce task * will have an input gate attached to it. This will provide its input, which will consist of one diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java index 69af455..ebb8b9d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java @@ -40,7 +40,7 @@ public class InputGateMetrics { // ------------------------------------------------------------------------ - // these methods are package private to make access from the nested classes faster + // these methods are package private to make access from the nested classes faster /** * Iterates over all input channels and collects the total number of queued buffers in a diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java index 4b3a8ff..e7986bb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java @@ -57,7 +57,7 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit /** Task event dispatcher for backwards events. */ private final TaskEventDispatcher taskEventDispatcher; - /** The consumed subpartition */ + /** The consumed subpartition. */ private volatile ResultSubpartitionView subpartitionView; private volatile boolean isReleased; @@ -245,7 +245,7 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit } /** - * Releases the partition reader + * Releases the partition reader. */ @Override void releaseAllResources() throws IOException { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java index 20a7aed..c0e9177 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java @@ -18,13 +18,13 @@ package org.apache.flink.runtime.io.network.partition.consumer; -import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import java.io.IOException; import java.util.Optional; @@ -89,8 +89,8 @@ class UnknownInputChannel extends InputChannel { /** * Returns <code>false</code>. - * <p> - * <strong>Important</strong>: It is important that the method correctly + * + * <p><strong>Important</strong>: It is important that the method correctly * always <code>false</code> for unknown input channels in order to not * finish the consumption of an intermediate result partition early. */ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java index f73ede7..f7db40b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java @@ -59,7 +59,7 @@ public class InputChannelTestUtils { return manager; } - + public static ConnectionManager createDummyConnectionManager() throws Exception { final PartitionRequestClient mockClient = mock(PartitionRequestClient.class); @@ -71,6 +71,6 @@ public class InputChannelTestUtils { // ------------------------------------------------------------------------ - /** This class is not meant to be instantiated */ + /** This class is not meant to be instantiated. */ private InputChannelTestUtils() {} } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java index 73f3cfb..5f5728d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java @@ -46,6 +46,9 @@ import static org.apache.flink.util.Preconditions.checkState; import static org.junit.Assert.assertNotNull; import static org.mockito.Mockito.mock; +/** + * Concurrency tests for input gates. + */ public class InputGateConcurrentTest { @Test @@ -192,8 +195,8 @@ public class InputGateConcurrentTest { // testing threads // ------------------------------------------------------------------------ - private static abstract class Source { - + private abstract static class Source { + abstract void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java index 82a27cc..6691875 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java @@ -58,6 +58,9 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; +/** + * Tests verifying fairness in input gates. + */ public class InputGateFairnessTest { @Test @@ -115,7 +118,7 @@ public class InputGateFairnessTest { max = Math.max(max, size); } - assertTrue(max == min || max == min+1); + assertTrue(max == min || max == (min + 1)); } assertFalse(gate.getNextBufferOrEvent().isPresent()); @@ -207,11 +210,11 @@ public class InputGateFairnessTest { for (int i = 0; i < numChannels; i++) { RemoteInputChannel channel = new RemoteInputChannel( - gate, i, new ResultPartitionID(), mock(ConnectionID.class), + gate, i, new ResultPartitionID(), mock(ConnectionID.class), connManager, 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); channels[i] = channel; - + for (int p = 0; p < buffersPerChannel; p++) { channel.onBuffer(mockBuffer, p, -1); } @@ -233,7 +236,7 @@ public class InputGateFairnessTest { max = Math.max(max, size); } - assertTrue(max == min || max == min+1); + assertTrue(max == min || max == (min + 1)); } assertFalse(gate.getNextBufferOrEvent().isPresent()); @@ -287,7 +290,7 @@ public class InputGateFairnessTest { max = Math.max(max, size); } - assertTrue(max == min || max == min+1); + assertTrue(max == min || max == (min + 1)); if (i % (2 * numChannels) == 0) { // add three buffers to each channel, in random order @@ -336,7 +339,7 @@ public class InputGateFairnessTest { partitions[i].onBuffer(buffer, sequenceNumbers[i]++, -1); } } - + // ------------------------------------------------------------------------ private static class FairnessVerifyingInputGate extends SingleInputGate { @@ -372,7 +375,6 @@ public class InputGateFairnessTest { this.uniquenessChecker = new HashSet<>(); } - @Override public Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException { synchronized (channelsWithData) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java index aecab75..b83067c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java @@ -40,14 +40,17 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +/** + * Test for consuming a pipelined result only partially. + */ public class LegacyPartialConsumePipelinedResultTest extends TestLogger { // Test configuration - private final static int NUMBER_OF_TMS = 1; - private final static int NUMBER_OF_SLOTS_PER_TM = 1; - private final static int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM; + private static final int NUMBER_OF_TMS = 1; + private static final int NUMBER_OF_SLOTS_PER_TM = 1; + private static final int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM; - private final static int NUMBER_OF_NETWORK_BUFFERS = 128; + private static final int NUMBER_OF_NETWORK_BUFFERS = 128; private static TestingCluster flink; @@ -72,7 +75,7 @@ public class LegacyPartialConsumePipelinedResultTest extends TestLogger { /** * Tests a fix for FLINK-1930. * - * <p> When consuming a pipelined result only partially, is is possible that local channels + * <p>When consuming a pipelined result only partially, is is possible that local channels * release the buffer pool, which is associated with the result partition, too early. If the * producer is still producing data when this happens, it runs into an IllegalStateException, * because of the destroyed buffer pool. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java index 2eec34c..f6689fe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java @@ -40,6 +40,9 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +/** + * Test for consuming a pipelined result only partially. + */ public class PartialConsumePipelinedResultTest extends TestLogger { // Test configuration @@ -78,8 +81,8 @@ public class PartialConsumePipelinedResultTest extends TestLogger { /** * Tests a fix for FLINK-1930. * - * <p> When consuming a pipelined result only partially, is is possible that local channels - * release the buffer pool, which is associated with the result partition, too early. If the + * <p>When consuming a pipelined result only partially, is is possible that local channels + * release the buffer pool, which is associated with the result partition, too early. If the * producer is still producing data when this happens, it runs into an IllegalStateException, * because of the destroyed buffer pool. * diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java index bc66c9d..fc9a643 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java @@ -59,10 +59,13 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +/** + * Tests for {@link PipelinedSubpartition}. + */ public class PipelinedSubpartitionTest extends SubpartitionTestBase { - /** Executor service for concurrent produce/consume tests */ - private final static ExecutorService executorService = Executors.newCachedThreadPool(); + /** Executor service for concurrent produce/consume tests. */ + private static final ExecutorService executorService = Executors.newCachedThreadPool(); @AfterClass public static void shutdownExecutorService() throws Exception { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java index 6bff0f6..d182f11 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java @@ -26,6 +26,9 @@ import org.junit.Test; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +/** + * Tests for {@link ProducerFailedException}. + */ public class ProducerFailedExceptionTest { @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java index d757aa9..2f5a013 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; + import org.junit.Test; import java.io.IOException; @@ -31,6 +32,9 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; +/** + * Tests for {@link InputChannel}. + */ public class InputChannelTest { @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java index a914733..a67df0b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java @@ -36,6 +36,11 @@ import java.util.Optional; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder; +/** + * Input gate helper for unit tests. + * + * @param <T> type of the value to handle + */ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> extends TestSingleInputGate { private final TestInputChannel inputChannel = new TestInputChannel(inputGate, 0); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index 1ecb67f..2afd6d4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -75,12 +75,15 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +/** + * Tests for the {@link LocalInputChannel}. + */ public class LocalInputChannelTest { /** * Tests the consumption of multiple subpartitions via local input channels. * - * <p> Multiple producer tasks produce pipelined partitions, which are consumed by multiple + * <p>Multiple producer tasks produce pipelined partitions, which are consumed by multiple * tasks via local input channels. */ @Test @@ -266,20 +269,22 @@ public class LocalInputChannelTest { * Verifies that concurrent release via the SingleInputGate and re-triggering * of a partition request works smoothly. * - * - SingleInputGate acquires its request lock and tries to release all + * <ul> + * <li>SingleInputGate acquires its request lock and tries to release all * registered channels. When releasing a channel, it needs to acquire - * the channel's shared request-release lock. - * - If a LocalInputChannel concurrently retriggers a partition request via + * the channel's shared request-release lock.</li> + * <li>If a LocalInputChannel concurrently retriggers a partition request via * a Timer Thread it acquires the channel's request-release lock and calls * the retrigger callback on the SingleInputGate, which again tries to - * acquire the gate's request lock. + * acquire the gate's request lock.</li> + * </ul> * - * For certain timings this obviously leads to a deadlock. This test reliably + * <p>For certain timings this obviously leads to a deadlock. This test reliably * reproduced such a timing (reported in FLINK-5228). This test is pretty much * testing the buggy implementation and has not much more general value. If it * becomes obsolete at some point (future greatness ;)), feel free to remove it. * - * The fix in the end was to to not acquire the channels lock when releasing it + * <p>The fix in the end was to to not acquire the channels lock when releasing it * and/or not doing any input gate callbacks while holding the channel's lock. * I decided to do both. */ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java index 9141b36..ec80459 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java @@ -336,9 +336,11 @@ public class RemoteInputChannelTest { * Tests to verify the behaviours of three different processes if the number of available * buffers is less than required buffers. * - * 1. Recycle the floating buffer - * 2. Recycle the exclusive buffer - * 3. Decrease the sender's backlog + * <ol> + * <li>Recycle the floating buffer</li> + * <li>Recycle the exclusive buffer</li> + * <li>Decrease the sender's backlog</li> + * </ol> */ @Test public void testAvailableBuffersLessThanRequiredBuffers() throws Exception { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index 7120327..4bf5b22 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -164,7 +164,7 @@ public class SingleInputGateTest { final ResultSubpartitionView iterator = mock(ResultSubpartitionView.class); when(iterator.getNextBuffer()).thenReturn( - new BufferAndBacklog(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(1024), FreeingBufferRecycler.INSTANCE), false,0, false)); + new BufferAndBacklog(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(1024), FreeingBufferRecycler.INSTANCE), false, 0, false)); final ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); when(partitionManager.createSubpartitionView( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java index b0bafd5..33f5709 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.taskmanager.TaskActions; + import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java index 2e01225..96f01fd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java @@ -32,13 +32,16 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; +/** + * Tests for {@link UnionInputGate}. + */ public class UnionInputGateTest { /** * Tests basic correctness of buffer-or-event interleaving and correct <code>null</code> return * value after receiving all end-of-partition events. * - * <p> For buffer-or-event instances, it is important to verify that they have been set off to + * <p>For buffer-or-event instances, it is important to verify that they have been set off to * the correct logical index. */ @Test(timeout = 120 * 1000) diff --git a/tools/maven/suppressions-runtime.xml b/tools/maven/suppressions-runtime.xml index 5efc974..cf985a9 100644 --- a/tools/maven/suppressions-runtime.xml +++ b/tools/maven/suppressions-runtime.xml @@ -80,11 +80,11 @@ under the License. files="(.*)test[/\\](.*)runtime[/\\]io[/\\]disk[/\\](.*)" checks="AvoidStarImport|UnusedImports"/> <suppress - files="(.*)runtime[/\\]io[/\\]network[/\\](netty|partition|util)[/\\](.*)" + files="(.*)runtime[/\\]io[/\\]network[/\\](netty|util)[/\\](.*)" checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhite [...] <!--Only additional checks for test sources. Those checks were present in the "pre-strict" checkstyle but were not applied to test sources. We do not want to suppress them for sources directory--> <suppress - files="(.*)test[/\\](.*)runtime[/\\]io[/\\]network[/\\](netty|partition|util)[/\\](.*)" + files="(.*)test[/\\](.*)runtime[/\\]io[/\\]network[/\\](netty|util)[/\\](.*)" checks="AvoidStarImport|UnusedImports"/> <!--Test class copied from the netty project--> <suppress
