http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java index ce85b6a..71706d6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java @@ -17,8 +17,6 @@ package org.apache.flink.streaming.api.operators; -import java.util.concurrent.ConcurrentLinkedQueue; - import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.configuration.Configuration; @@ -26,9 +24,12 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestHarnessUtil; + import org.junit.Assert; import org.junit.Test; +import java.util.concurrent.ConcurrentLinkedQueue; + /** * Tests for {@link StreamMap}. These test that: *
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java index 8a5c997..50dc4d4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java @@ -39,12 +39,16 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; + import org.junit.Assert; import org.junit.Test; import java.io.InputStream; import java.util.BitSet; +/** + * Tests for {@link StreamOperator} snapshot restoration. + */ public class StreamOperatorSnapshotRestoreTest { private static final int MAX_PARALLELISM = 10; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java index 28cdf9d..2bae429 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java @@ -17,8 +17,6 @@ package org.apache.flink.streaming.api.operators; -import java.util.concurrent.ConcurrentLinkedQueue; - import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple3; @@ -34,6 +32,8 @@ import org.apache.flink.streaming.util.TestHarnessUtil; import org.junit.Test; +import java.util.concurrent.ConcurrentLinkedQueue; + /** * Tests for {@link StreamProject}. These test that: * http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.java index 3695120..d2cf2e6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.java @@ -27,6 +27,7 @@ import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.streaming.util.CollectorOutput; + import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -39,19 +40,22 @@ import java.util.List; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +/** + * Tests for {@link StreamSource} awareness of source idleness. + */ @RunWith(Parameterized.class) public class StreamSourceContextIdleDetectionTests { /** The tests in this class will be parameterized with these enumerations.*/ private enum TestMethod { - /** test idleness detection using the {@link SourceFunction.SourceContext#collect(Object)} method */ + // test idleness detection using the {@link SourceFunction.SourceContext#collect(Object)} method COLLECT, - /** test idleness detection using the {@link SourceFunction.SourceContext#collectWithTimestamp(Object, long)} method */ + // test idleness detection using the {@link SourceFunction.SourceContext#collectWithTimestamp(Object, long)} method COLLECT_WITH_TIMESTAMP, - /** test idleness detection using the {@link SourceFunction.SourceContext#emitWatermark(Watermark)} method */ + // test idleness detection using the {@link SourceFunction.SourceContext#emitWatermark(Watermark)} method EMIT_WATERMARK } @@ -73,7 +77,7 @@ public class StreamSourceContextIdleDetectionTests { * (7) Advance time to 510 and trigger idleness detection. Since no records were collected in-between the two * idleness detections, status should have been toggle back to IDLE. * - * Inline comments will refer to the corresponding tested steps in the scenario. + * <p>Inline comments will refer to the corresponding tested steps in the scenario. */ @Test public void testManualWatermarkContext() throws Exception { @@ -103,12 +107,12 @@ public class StreamSourceContextIdleDetectionTests { assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle()); // corresponds to step (3) of scenario (please see method-level Javadoc comment) - processingTimeService.setCurrentTime(initialTime + 2*idleTimeout); - processingTimeService.setCurrentTime(initialTime + 3*idleTimeout); + processingTimeService.setCurrentTime(initialTime + 2 * idleTimeout); + processingTimeService.setCurrentTime(initialTime + 3 * idleTimeout); assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle()); // corresponds to step (4) of scenario (please see method-level Javadoc comment) - processingTimeService.setCurrentTime(initialTime + 3*idleTimeout + idleTimeout/10); + processingTimeService.setCurrentTime(initialTime + 3 * idleTimeout + idleTimeout / 10); switch (testMethod) { case COLLECT: context.collect("msg"); @@ -123,7 +127,7 @@ public class StreamSourceContextIdleDetectionTests { assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive()); // corresponds to step (5) of scenario (please see method-level Javadoc comment) - processingTimeService.setCurrentTime(initialTime + 3*idleTimeout + 2*idleTimeout/10); + processingTimeService.setCurrentTime(initialTime + 3 * idleTimeout + 2 * idleTimeout / 10); switch (testMethod) { case COLLECT: context.collect("msg"); @@ -138,11 +142,11 @@ public class StreamSourceContextIdleDetectionTests { assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive()); // corresponds to step (6) of scenario (please see method-level Javadoc comment) - processingTimeService.setCurrentTime(initialTime + 4*idleTimeout + idleTimeout/10); + processingTimeService.setCurrentTime(initialTime + 4 * idleTimeout + idleTimeout / 10); assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive()); // corresponds to step (7) of scenario (please see method-level Javadoc comment) - processingTimeService.setCurrentTime(initialTime + 5*idleTimeout + idleTimeout/10); + processingTimeService.setCurrentTime(initialTime + 5 * idleTimeout + idleTimeout / 10); assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle()); } @@ -159,7 +163,7 @@ public class StreamSourceContextIdleDetectionTests { * should have been "piggy-backed" in the task, allowing the status to be toggled to IDLE before the next * actual idle detection task at 530. * - * Inline comments will refer to the corresponding tested steps in the scenario. + * <p>Inline comments will refer to the corresponding tested steps in the scenario. */ @Test public void testAutomaticWatermarkContext() throws Exception { @@ -189,24 +193,24 @@ public class StreamSourceContextIdleDetectionTests { // corresponds to step (2) of scenario (please see method-level Javadoc comment) processingTimeService.setCurrentTime(initialTime + watermarkInterval); expectedOutput.add(new Watermark(processingTimeService.getCurrentProcessingTime() - (processingTimeService.getCurrentProcessingTime() % watermarkInterval))); - processingTimeService.setCurrentTime(initialTime + 2*watermarkInterval); + processingTimeService.setCurrentTime(initialTime + 2 * watermarkInterval); expectedOutput.add(new Watermark(processingTimeService.getCurrentProcessingTime() - (processingTimeService.getCurrentProcessingTime() % watermarkInterval))); processingTimeService.setCurrentTime(initialTime + idleTimeout); assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle()); assertEquals(expectedOutput, output); // corresponds to step (3) of scenario (please see method-level Javadoc comment) - processingTimeService.setCurrentTime(initialTime + 3*watermarkInterval); - processingTimeService.setCurrentTime(initialTime + 4*watermarkInterval); - processingTimeService.setCurrentTime(initialTime + 2*idleTimeout); - processingTimeService.setCurrentTime(initialTime + 6*watermarkInterval); - processingTimeService.setCurrentTime(initialTime + 7*watermarkInterval); - processingTimeService.setCurrentTime(initialTime + 3*idleTimeout); + processingTimeService.setCurrentTime(initialTime + 3 * watermarkInterval); + processingTimeService.setCurrentTime(initialTime + 4 * watermarkInterval); + processingTimeService.setCurrentTime(initialTime + 2 * idleTimeout); + processingTimeService.setCurrentTime(initialTime + 6 * watermarkInterval); + processingTimeService.setCurrentTime(initialTime + 7 * watermarkInterval); + processingTimeService.setCurrentTime(initialTime + 3 * idleTimeout); assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle()); assertEquals(expectedOutput, output); // corresponds to step (4) of scenario (please see method-level Javadoc comment) - processingTimeService.setCurrentTime(initialTime + 3*idleTimeout + idleTimeout/10); + processingTimeService.setCurrentTime(initialTime + 3 * idleTimeout + idleTimeout / 10); switch (testMethod) { case COLLECT: context.collect("msg"); @@ -232,8 +236,8 @@ public class StreamSourceContextIdleDetectionTests { } // corresponds to step (5) of scenario (please see method-level Javadoc comment) - processingTimeService.setCurrentTime(initialTime + 8*watermarkInterval); - processingTimeService.setCurrentTime(initialTime + 3*idleTimeout + 3*idleTimeout/10); + processingTimeService.setCurrentTime(initialTime + 8 * watermarkInterval); + processingTimeService.setCurrentTime(initialTime + 3 * idleTimeout + 3 * idleTimeout / 10); switch (testMethod) { case COLLECT: context.collect("msg"); @@ -266,7 +270,7 @@ public class StreamSourceContextIdleDetectionTests { assertEquals(expectedOutput, output); } - processingTimeService.setCurrentTime(initialTime + 10*watermarkInterval); + processingTimeService.setCurrentTime(initialTime + 10 * watermarkInterval); switch (testMethod) { case COLLECT: case COLLECT_WITH_TIMESTAMP: @@ -280,7 +284,7 @@ public class StreamSourceContextIdleDetectionTests { } // corresponds to step (6) of scenario (please see method-level Javadoc comment) - processingTimeService.setCurrentTime(initialTime + 4*idleTimeout + idleTimeout/10); + processingTimeService.setCurrentTime(initialTime + 4 * idleTimeout + idleTimeout / 10); switch (testMethod) { case COLLECT: case COLLECT_WITH_TIMESTAMP: @@ -293,7 +297,7 @@ public class StreamSourceContextIdleDetectionTests { } // corresponds to step (7) of scenario (please see method-level Javadoc comment) - processingTimeService.setCurrentTime(initialTime + 11*watermarkInterval); + processingTimeService.setCurrentTime(initialTime + 11 * watermarkInterval); assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle()); assertEquals(expectedOutput, output); } http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java index 5ca9cb4..2e24f4c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java @@ -33,8 +33,8 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.ListSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.ListSerializer; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.execution.Environment; @@ -48,6 +48,7 @@ import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.memory.MemoryStateBackend; + import org.junit.Test; import org.mockito.Matchers; import org.mockito.invocation.InvocationOnMock; @@ -66,6 +67,9 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +/** + * Tests for {@link StreamingRuntimeContext}. + */ public class StreamingRuntimeContextTest { @Test @@ -245,7 +249,7 @@ public class StreamingRuntimeContextTest { AbstractStreamOperator<?> operatorMock = mock(AbstractStreamOperator.class); - KeyedStateBackend keyedStateBackend= mock(KeyedStateBackend.class); + KeyedStateBackend keyedStateBackend = mock(KeyedStateBackend.class); DefaultKeyedStateStore keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, config); @@ -271,7 +275,7 @@ public class StreamingRuntimeContextTest { AbstractStreamOperator<?> operatorMock = mock(AbstractStreamOperator.class); ExecutionConfig config = new ExecutionConfig(); - KeyedStateBackend keyedStateBackend= mock(KeyedStateBackend.class); + KeyedStateBackend keyedStateBackend = mock(KeyedStateBackend.class); DefaultKeyedStateStore keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, config); @@ -307,7 +311,7 @@ public class StreamingRuntimeContextTest { AbstractStreamOperator<?> operatorMock = mock(AbstractStreamOperator.class); ExecutionConfig config = new ExecutionConfig(); - KeyedStateBackend keyedStateBackend= mock(KeyedStateBackend.class); + KeyedStateBackend keyedStateBackend = mock(KeyedStateBackend.class); DefaultKeyedStateStore keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, config); http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java index a03a4c5..f8b095c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.operators; import org.apache.flink.annotation.Internal; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java index b675cc5..6c93894 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java @@ -31,6 +31,7 @@ import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + import org.junit.Assert; import org.junit.Test; @@ -38,9 +39,11 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +/** + * Test snapshot state with {@link WrappingFunction}. + */ public class WrappingFunctionSnapshotRestoreTest { - @Test public void testSnapshotAndRestoreWrappedCheckpointedFunction() throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java index 1d83229..f9a1cd0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java @@ -65,6 +65,7 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; + import org.junit.Assert; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -72,6 +73,7 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import javax.annotation.Nonnull; + import java.util.ArrayDeque; import java.util.Collections; import java.util.Comparator; @@ -91,7 +93,14 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Tests for {@link AsyncWaitOperator}. These test that: @@ -361,7 +370,7 @@ public class AsyncWaitOperatorTest extends TestLogger { } /** - * Tests that the AsyncWaitOperator works together with chaining + * Tests that the AsyncWaitOperator works together with chaining. */ @Test public void testOperatorChainWithProcessingTime() throws Exception { @@ -601,7 +610,6 @@ public class AsyncWaitOperatorTest extends TestLogger { super(jobConfig, taskConfig, executionConfig, memorySize, inputSplitProvider, bufferSize); } - @Override public void acknowledgeCheckpoint( long checkpointId, @@ -702,7 +710,7 @@ public class AsyncWaitOperatorTest extends TestLogger { * emitter is currently waiting on the checkpoint lock (e.g. in the case of two chained async * wait operators where the latter operator's queue is currently full). * - * Note that this test does not enforce the exact strict ordering because with the fix it is no + * <p>Note that this test does not enforce the exact strict ordering because with the fix it is no * longer possible. However, it provokes the described situation without the fix. */ @Test(timeout = 10000L) @@ -880,7 +888,7 @@ public class AsyncWaitOperatorTest extends TestLogger { /** * FLINK-6435 * - * Tests that a user exception triggers the completion of a StreamElementQueueEntry and does not wait to until + * <p>Tests that a user exception triggers the completion of a StreamElementQueueEntry and does not wait to until * another StreamElementQueueEntry is properly completed before it is collected. */ @Test(timeout = 2000) @@ -891,7 +899,7 @@ public class AsyncWaitOperatorTest extends TestLogger { /** * FLINK-6435 * - * Tests that a user exception triggers the completion of a StreamElementQueueEntry and does not wait to until + * <p>Tests that a user exception triggers the completion of a StreamElementQueueEntry and does not wait to until * another StreamElementQueueEntry is properly completed before it is collected. */ @Test(timeout = 2000) @@ -945,7 +953,7 @@ public class AsyncWaitOperatorTest extends TestLogger { /** * FLINK-6435 * - * Tests that timeout exceptions are properly handled in ordered output mode. The proper handling means that + * <p>Tests that timeout exceptions are properly handled in ordered output mode. The proper handling means that * a StreamElementQueueEntry is completed in case of a timeout exception. */ @Test @@ -956,7 +964,7 @@ public class AsyncWaitOperatorTest extends TestLogger { /** * FLINK-6435 * - * Tests that timeout exceptions are properly handled in ordered output mode. The proper handling means that + * <p>Tests that timeout exceptions are properly handled in ordered output mode. The proper handling means that * a StreamElementQueueEntry is completed in case of a timeout exception. */ @Test http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/EmitterTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/EmitterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/EmitterTest.java index c3a47aa..da2d795 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/EmitterTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/EmitterTest.java @@ -29,6 +29,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.CollectorOutput; import org.apache.flink.util.TestLogger; + import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -46,6 +47,9 @@ import java.util.concurrent.TimeUnit; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +/** + * Tests for {@link Emitter}. + */ public class EmitterTest extends TestLogger { private static final long timeout = 10000L; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java index 0380512..5832b89 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java @@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.operators.async.OperatorActions; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.TestLogger; + import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -44,7 +45,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; /** - * {@link OrderedStreamElementQueue} specific tests + * {@link OrderedStreamElementQueue} specific tests. */ public class OrderedStreamElementQueueTest extends TestLogger { http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java index c9e59c7..fe9db95 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java @@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; + import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java index 0a57f92..ba6ce42 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java @@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.operators.async.OperatorActions; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.TestLogger; + import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -45,7 +46,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; /** - * {@link UnorderedStreamElementQueue} specific tests + * {@link UnorderedStreamElementQueue} specific tests. */ public class UnorderedStreamElementQueueTest extends TestLogger { private static final long timeout = 10000L; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java index c19eb37..beb5bf5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java @@ -15,23 +15,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.streaming.api.operators.co; +package org.apache.flink.streaming.api.operators.co; -import org.apache.flink.streaming.api.TimeDomain; import org.apache.flink.streaming.api.functions.co.CoProcessFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; + import org.junit.Test; import java.util.concurrent.ConcurrentLinkedQueue; -import static org.junit.Assert.assertEquals; - /** * Tests {@link CoProcessOperator}. */ @@ -97,7 +95,6 @@ public class CoProcessOperatorTest extends TestLogger { testHarness.close(); } - private static class WatermarkQueryingProcessFunction extends CoProcessFunction<Integer, String, String> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java index 4dbf7b8..6f7d097 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java @@ -17,11 +17,6 @@ package org.apache.flink.streaming.api.operators.co; -import static org.junit.Assert.fail; - -import java.io.Serializable; -import java.util.concurrent.ConcurrentLinkedQueue; - import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction; import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction; @@ -30,9 +25,13 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness; import org.apache.flink.util.Collector; + import org.junit.Assert; import org.junit.Test; +import java.io.Serializable; +import java.util.concurrent.ConcurrentLinkedQueue; + /** * Tests for {@link CoStreamFlatMap}. These test that: * @@ -45,7 +44,7 @@ import org.junit.Test; public class CoStreamFlatMapTest implements Serializable { private static final long serialVersionUID = 1L; - private final static class MyCoFlatMap implements CoFlatMapFunction<String, Integer, String> { + private static final class MyCoFlatMap implements CoFlatMapFunction<String, Integer, String> { private static final long serialVersionUID = 1L; @Override http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java index 28ae664..6826c96 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java @@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness; + import org.junit.Assert; import org.junit.Test; @@ -42,7 +43,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; public class CoStreamMapTest implements Serializable { private static final long serialVersionUID = 1L; - private final static class MyCoMap implements CoMapFunction<Double, Integer, String> { + private static final class MyCoMap implements CoMapFunction<Double, Integer, String> { private static final long serialVersionUID = 1L; @Override @@ -56,7 +57,6 @@ public class CoStreamMapTest implements Serializable { } } - @Test public void testCoMap() throws Exception { CoStreamMap<Double, Integer, String> operator = new CoStreamMap<Double, Integer, String>(new MyCoMap()); http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java index d8c9a61..3f590ff 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java @@ -15,8 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.streaming.api.operators.co; +package org.apache.flink.streaming.api.operators.co; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; @@ -32,6 +32,7 @@ import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; + import org.junit.Test; import java.util.concurrent.ConcurrentLinkedQueue; @@ -324,7 +325,6 @@ public class KeyedCoProcessOperatorTest extends TestLogger { testHarness.close(); } - private static class IntToStringKeySelector<T> implements KeySelector<Integer, String> { private static final long serialVersionUID = 1L; @@ -413,7 +413,6 @@ public class KeyedCoProcessOperatorTest extends TestLogger { ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5); } - @Override public void onTimer( long timestamp, @@ -494,7 +493,6 @@ public class KeyedCoProcessOperatorTest extends TestLogger { ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5); } - @Override public void onTimer( long timestamp, @@ -519,7 +517,6 @@ public class KeyedCoProcessOperatorTest extends TestLogger { ctx.timerService().registerProcessingTimeTimer(5); } - @Override public void onTimer( long timestamp, http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java index 4b0f5ab..7657ce7 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java @@ -26,7 +26,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; -import org.apache.flink.streaming.util.functions.StreamingFunctionUtils; import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction; import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; @@ -42,7 +41,9 @@ import org.apache.flink.streaming.runtime.operators.windowing.functions.Internal import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction; +import org.apache.flink.streaming.util.functions.StreamingFunctionUtils; import org.apache.flink.util.Collector; + import org.hamcrest.collection.IsIterableContainingInOrder; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; @@ -66,6 +67,9 @@ import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +/** + * Tests for {@link InternalWindowFunction}. + */ public class InternalWindowFunctionTest { @SuppressWarnings("unchecked") @@ -98,12 +102,12 @@ public class InternalWindowFunctionTest { // check apply TimeWindow w = mock(TimeWindow.class); - Iterable<Long> i = (Iterable<Long>)mock(Iterable.class); + Iterable<Long> i = (Iterable<Long>) mock(Iterable.class); Collector<String> c = (Collector<String>) mock(Collector.class); InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class); - windowFunction.process(((byte)0), w, ctx, i, c); + windowFunction.process(((byte) 0), w, ctx, i, c); verify(mock).apply(w, i, c); // check close @@ -141,11 +145,11 @@ public class InternalWindowFunctionTest { // check apply TimeWindow w = mock(TimeWindow.class); - Iterable<Long> i = (Iterable<Long>)mock(Iterable.class); + Iterable<Long> i = (Iterable<Long>) mock(Iterable.class); Collector<String> c = (Collector<String>) mock(Collector.class); InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class); - windowFunction.process(((byte)0), w, ctx, i, c); + windowFunction.process(((byte) 0), w, ctx, i, c); verify(mock).process((ProcessAllWindowFunctionMock.Context) anyObject(), eq(i), eq(c)); // check close @@ -183,7 +187,7 @@ public class InternalWindowFunctionTest { // check apply TimeWindow w = mock(TimeWindow.class); - Iterable<Long> i = (Iterable<Long>)mock(Iterable.class); + Iterable<Long> i = (Iterable<Long>) mock(Iterable.class); Collector<String> c = (Collector<String>) mock(Collector.class); InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class); @@ -225,7 +229,7 @@ public class InternalWindowFunctionTest { // check apply TimeWindow w = mock(TimeWindow.class); - Iterable<Long> i = (Iterable<Long>)mock(Iterable.class); + Iterable<Long> i = (Iterable<Long>) mock(Iterable.class); Collector<String> c = (Collector<String>) mock(Collector.class); InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class); @@ -288,7 +292,7 @@ public class InternalWindowFunctionTest { InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class); windowFunction.process(42L, w, ctx, 23L, c); - verify(mock).apply(eq(42L), eq(w), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c)); + verify(mock).apply(eq(42L), eq(w), (Iterable<Long>) argThat(IsIterableContainingInOrder.contains(23L)), eq(c)); // check close windowFunction.close(); @@ -329,8 +333,8 @@ public class InternalWindowFunctionTest { Collector<String> c = (Collector<String>) mock(Collector.class); InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class); - windowFunction.process(((byte)0), w, ctx, 23L, c); - verify(mock).apply(eq(w), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c)); + windowFunction.process(((byte) 0), w, ctx, 23L, c); + verify(mock).apply(eq(w), (Iterable<Long>) argThat(IsIterableContainingInOrder.contains(23L)), eq(c)); // check close windowFunction.close(); @@ -371,8 +375,8 @@ public class InternalWindowFunctionTest { Collector<String> c = (Collector<String>) mock(Collector.class); InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class); - windowFunction.process(((byte)0), w, ctx, 23L, c); - verify(mock).process((ProcessAllWindowFunctionMock.Context) anyObject(), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c)); + windowFunction.process(((byte) 0), w, ctx, 23L, c); + verify(mock).process((ProcessAllWindowFunctionMock.Context) anyObject(), (Iterable<Long>) argThat(IsIterableContainingInOrder.contains(23L)), eq(c)); // check close windowFunction.close(); @@ -424,9 +428,9 @@ public class InternalWindowFunctionTest { return null; } }).when(mock).process(eq(42L), (ProcessWindowFunctionMock.Context) anyObject(), - (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c)); + (Iterable<Long>) argThat(IsIterableContainingInOrder.contains(23L)), eq(c)); - windowFunction.process(42L, w, ctx,23L, c); + windowFunction.process(42L, w, ctx, 23L, c); verify(ctx).currentProcessingTime(); verify(ctx).currentWatermark(); verify(ctx).windowState(); @@ -597,7 +601,7 @@ public class InternalWindowFunctionTest { args.add(24L); InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class); - windowFunction.process(((byte)0), w, ctx, args, c); + windowFunction.process(((byte) 0), w, ctx, args, c); verify(mock).process( (AggregateProcessAllWindowFunctionMock.Context) anyObject(), (Iterable) argThat(containsInAnyOrder(allOf( @@ -610,7 +614,7 @@ public class InternalWindowFunctionTest { verify(mock).close(); } - public static class ProcessWindowFunctionMock + private static class ProcessWindowFunctionMock extends ProcessWindowFunction<Long, String, Long, TimeWindow> implements OutputTypeConfigurable<String> { @@ -624,7 +628,7 @@ public class InternalWindowFunctionTest { } } - public static class AggregateProcessWindowFunctionMock + private static class AggregateProcessWindowFunctionMock extends ProcessWindowFunction<Map<Long, Long>, String, Long, TimeWindow> implements OutputTypeConfigurable<String> { @@ -638,7 +642,7 @@ public class InternalWindowFunctionTest { } } - public static class AggregateProcessAllWindowFunctionMock + private static class AggregateProcessAllWindowFunctionMock extends ProcessAllWindowFunction<Map<Long, Long>, String, TimeWindow> implements OutputTypeConfigurable<String> { @@ -651,7 +655,7 @@ public class InternalWindowFunctionTest { public void process(Context context, Iterable<Map<Long, Long>> input, Collector<String> out) throws Exception { } } - public static class WindowFunctionMock + private static class WindowFunctionMock extends RichWindowFunction<Long, String, Long, TimeWindow> implements OutputTypeConfigurable<String> { @@ -664,7 +668,7 @@ public class InternalWindowFunctionTest { public void apply(Long aLong, TimeWindow w, Iterable<Long> input, Collector<String> out) throws Exception { } } - public static class AllWindowFunctionMock + private static class AllWindowFunctionMock extends RichAllWindowFunction<Long, String, TimeWindow> implements OutputTypeConfigurable<String> { @@ -677,7 +681,7 @@ public class InternalWindowFunctionTest { public void apply(TimeWindow window, Iterable<Long> values, Collector<String> out) throws Exception { } } - public static class ProcessAllWindowFunctionMock + private static class ProcessAllWindowFunctionMock extends ProcessAllWindowFunction<Long, String, TimeWindow> implements OutputTypeConfigurable<String> { http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java index 6ee7d38..781a216 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java @@ -17,14 +17,17 @@ package org.apache.flink.streaming.api.streamtask; -import java.util.ArrayList; - -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.operators.DataSourceTask; import org.apache.flink.runtime.plugable.SerializationDelegate; -import org.apache.flink.runtime.io.network.api.writer.RecordWriter; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.util.ArrayList; +/** + * Mock {@link RecordWriter}. + */ public class MockRecordWriter extends RecordWriter<SerializationDelegate<StreamRecord<Tuple1<Integer>>>> { public ArrayList<Integer> emittedRecords; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java index dafdeed..4eda5ad 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java @@ -24,8 +24,11 @@ import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness; import org.junit.Test; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +/** + * Tests for {@link StreamIterationHead}. + */ public class StreamIterationHeadTest { @Test http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java index b640d6f..bedab97 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java @@ -17,11 +17,15 @@ package org.apache.flink.streaming.api.windowing.deltafunction; -import static org.junit.Assert.*; - import org.apache.flink.streaming.api.functions.windowing.delta.CosineDistance; + import org.junit.Test; +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link CosineDistance}. + */ public class CosineDistanceTest { @SuppressWarnings({ "rawtypes", "unchecked" }) @@ -29,18 +33,18 @@ public class CosineDistanceTest { public void testCosineDistance() { //Reference calculated using wolfram alpha - double[][][] testdata={ - {{0,0,0},{0,0,0}}, - {{0,0,0},{1,2,3}}, - {{1,2,3},{0,0,0}}, - {{1,2,3},{4,5,6}}, - {{1,2,3},{-4,-5,-6}}, - {{1,2,-3},{-4,5,-6}}, - {{1,2,3,4},{5,6,7,8}}, - {{1,2},{3,4}}, - {{1},{2}}, + double[][][] testdata = { + {{0, 0, 0}, {0, 0, 0}}, + {{0, 0, 0}, {1, 2, 3}}, + {{1, 2, 3}, {0, 0, 0}}, + {{1, 2, 3}, {4, 5, 6}}, + {{1, 2, 3}, {-4, -5, -6}}, + {{1, 2, -3}, {-4, 5, -6}}, + {{1, 2, 3, 4}, {5, 6, 7, 8}}, + {{1, 2}, {3, 4}}, + {{1}, {2}}, }; - double[] referenceSolutions={ + double[] referenceSolutions = { 0, 0, 0, @@ -60,12 +64,15 @@ public class CosineDistanceTest { } private String arrayToString(double[] in){ - if (in.length==0) return "{}"; - String result="{"; - for (double d:in){ - result+=d+","; + if (in.length == 0) { + return "{}"; + } + + String result = "{"; + for (double d:in) { + result += d + ","; } - return result.substring(0, result.length()-1)+"}"; + return result.substring(0, result.length() - 1) + "}"; } } http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java index 1ba5f84..c534bf8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java @@ -17,11 +17,15 @@ package org.apache.flink.streaming.api.windowing.deltafunction; -import static org.junit.Assert.*; - import org.apache.flink.streaming.api.functions.windowing.delta.EuclideanDistance; + import org.junit.Test; +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link EuclideanDistance}. + */ public class EuclideanDistanceTest { @SuppressWarnings({ "rawtypes", "unchecked" }) @@ -29,18 +33,18 @@ public class EuclideanDistanceTest { public void testEuclideanDistance() { //Reference calculated using wolfram alpha - double[][][] testdata={ - {{0,0,0},{0,0,0}}, - {{0,0,0},{1,2,3}}, - {{1,2,3},{0,0,0}}, - {{1,2,3},{4,5,6}}, - {{1,2,3},{-4,-5,-6}}, - {{1,2,-3},{-4,5,-6}}, - {{1,2,3,4},{5,6,7,8}}, - {{1,2},{3,4}}, - {{1},{2}}, + double[][][] testdata = { + {{0, 0, 0}, {0, 0, 0}}, + {{0, 0, 0}, {1, 2, 3}}, + {{1, 2, 3}, {0, 0, 0}}, + {{1, 2, 3}, {4, 5, 6}}, + {{1, 2, 3}, {-4, -5, -6}}, + {{1, 2, -3}, {-4, 5, -6}}, + {{1, 2, 3, 4}, {5, 6, 7, 8}}, + {{1, 2}, {3, 4}}, + {{1}, {2}}, }; - double[] referenceSolutions={ + double[] referenceSolutions = { 0, 3.741657, 3.741657, @@ -61,12 +65,15 @@ public class EuclideanDistanceTest { } private String arrayToString(double[] in){ - if (in.length==0) return "{}"; - String result="{"; + if (in.length == 0) { + return "{}"; + } + + String result = "{"; for (double d:in){ - result+=d+","; + result += d + ","; } - return result.substring(0, result.length()-1)+"}"; + return result.substring(0, result.length() - 1) + "}"; } } http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java index a897674..58898d8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java @@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.graph.StreamNode; import org.apache.flink.util.TestLogger; + import org.junit.Assert; import org.junit.Test; @@ -166,7 +167,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger { * B (unchained): [ (src0) ] -> [ (map) -> (filter) -> (sink) ] * </pre> * - * The hashes for the single vertex in A and the source vertex in B need to be different. + * <p>The hashes for the single vertex in A and the source vertex in B need to be different. */ @Test public void testNodeHashAfterSourceUnchaining() throws Exception { @@ -208,7 +209,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger { * B (unchained): [ (src0) ] -> [ (map) -> (filter) -> (sink) ] * </pre> * - * The hashes for the single vertex in A and the source vertex in B need to be different. + * <p>The hashes for the single vertex in A and the source vertex in B need to be different. */ @Test public void testNodeHashAfterIntermediateUnchaining() throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java index 4175f18..e02ccdd 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java @@ -26,8 +26,12 @@ import org.apache.flink.streaming.api.graph.StreamConfig; import org.junit.Test; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +/** + * Test translation of {@link CheckpointingMode}. + */ @SuppressWarnings("serial") public class TranslationTest { http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java index 8065cf1..5d606ee 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java @@ -28,17 +28,20 @@ import org.apache.flink.streaming.api.checkpoint.WithMasterCheckpointHook; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.functions.source.SourceFunction; - import org.apache.flink.util.SerializedValue; + import org.junit.Test; import javax.annotation.Nullable; + import java.util.HashSet; import java.util.Set; import java.util.concurrent.Executor; import static java.util.Arrays.asList; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; /** * Tests that when sources implement {@link WithMasterCheckpointHook} the hooks are properly http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java index 4322748..b5ea866 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java @@ -20,10 +20,10 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException; -import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; @@ -34,7 +34,6 @@ import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; import org.hamcrest.BaseMatcher; import org.hamcrest.Description; - import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -48,7 +47,6 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; - import static org.mockito.Mockito.any; import static org.mockito.Mockito.argThat; import static org.mockito.Mockito.eq; @@ -57,7 +55,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; /** - * Tests for the barrier buffer's maximum limit of buffered/spilled bytes + * Tests for the barrier buffer's maximum limit of buffered/spilled bytes. */ public class BarrierBufferAlignmentLimitTest { @@ -65,7 +63,7 @@ public class BarrierBufferAlignmentLimitTest { private static final Random RND = new Random(); - private static IOManager IO_MANAGER; + private static IOManager ioManager; // ------------------------------------------------------------------------ // Setup @@ -73,12 +71,12 @@ public class BarrierBufferAlignmentLimitTest { @BeforeClass public static void setup() { - IO_MANAGER = new IOManagerAsync(); + ioManager = new IOManagerAsync(); } @AfterClass public static void shutdownIOManager() { - IO_MANAGER.shutdown(); + ioManager.shutdown(); } // ------------------------------------------------------------------------ @@ -86,7 +84,7 @@ public class BarrierBufferAlignmentLimitTest { // ------------------------------------------------------------------------ /** - * This tests that a single alignment that buffers too much data cancels + * This tests that a single alignment that buffers too much data cancels. */ @Test public void testBreakCheckpointAtAlignmentLimit() throws Exception { @@ -116,7 +114,7 @@ public class BarrierBufferAlignmentLimitTest { // the barrier buffer has a limit that only 1000 bytes may be spilled in alignment MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); - BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER, 1000); + BarrierBuffer buffer = new BarrierBuffer(gate, ioManager, 1000); StatefulTask toNotify = mock(StatefulTask.class); buffer.registerCheckpointEventHandler(toNotify); @@ -173,7 +171,7 @@ public class BarrierBufferAlignmentLimitTest { * - an alignment starts * - barriers from a second checkpoint queue before the first completes * - together they are larger than the threshold - * - after the first checkpoint (with second checkpoint data queued) aborts, the second completes + * - after the first checkpoint (with second checkpoint data queued) aborts, the second completes. */ @Test public void testAlignmentLimitWithQueuedAlignments() throws Exception { @@ -210,7 +208,7 @@ public class BarrierBufferAlignmentLimitTest { // the barrier buffer has a limit that only 1000 bytes may be spilled in alignment MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); - BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER, 500); + BarrierBuffer buffer = new BarrierBuffer(gate, ioManager, 500); StatefulTask toNotify = mock(StatefulTask.class); buffer.registerCheckpointEventHandler(toNotify); @@ -315,7 +313,7 @@ public class BarrierBufferAlignmentLimitTest { private static void checkNoTempFilesRemain() { // validate that all temp files have been removed - for (File dir : IO_MANAGER.getSpillingDirectories()) { + for (File dir : ioManager.getSpillingDirectories()) { for (String file : dir.list()) { if (file != null && !(file.equals(".") || file.equals(".."))) { fail("barrier buffer did not clean up temp files. remaining file: " + file); @@ -325,7 +323,7 @@ public class BarrierBufferAlignmentLimitTest { } /** - * A validation matcher for checkpoint metadata against checkpoint IDs + * A validation matcher for checkpoint metadata against checkpoint IDs. */ private static class CheckpointMatcher extends BaseMatcher<CheckpointMetaData> { http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java index 3514f56..49d07b1 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.io.network.partition.consumer.InputGateListener; + import org.junit.Test; import java.io.IOException; @@ -82,11 +83,11 @@ public class BarrierBufferMassiveRandomTest { // Mocks and Generators // ------------------------------------------------------------------------ - protected interface BarrierGenerator { - public boolean isNextBarrier(); + private interface BarrierGenerator { + boolean isNextBarrier(); } - protected static class RandomBarrier implements BarrierGenerator { + private static class RandomBarrier implements BarrierGenerator { private static final Random rnd = new Random(); @@ -117,7 +118,7 @@ public class BarrierBufferMassiveRandomTest { } } - protected static class RandomGeneratingInputGate implements InputGate { + private static class RandomGeneratingInputGate implements InputGate { private final int numChannels; private final BufferPool[] bufferPools; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java index fedf4fc..c2cf7f3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; import org.apache.flink.runtime.state.TaskStateHandles; + import org.hamcrest.BaseMatcher; import org.hamcrest.Description; import org.junit.AfterClass; @@ -67,19 +68,19 @@ public class BarrierBufferTest { private static final int PAGE_SIZE = 512; - private static int SIZE_COUNTER = 0; + private static int sizeCounter = 0; - private static IOManager IO_MANAGER; + private static IOManager ioManager; @BeforeClass public static void setup() { - IO_MANAGER = new IOManagerAsync(); - SIZE_COUNTER = 1; + ioManager = new IOManagerAsync(); + sizeCounter = 1; } @AfterClass public static void shutdownIOManager() { - IO_MANAGER.shutdown(); + ioManager.shutdown(); } // ------------------------------------------------------------------------ @@ -99,7 +100,7 @@ public class BarrierBufferTest { }; MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence)); - BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); + BarrierBuffer buffer = new BarrierBuffer(gate, ioManager); for (BufferOrEvent boe : sequence) { assertEquals(boe, buffer.getNextNonBlocked()); @@ -134,7 +135,7 @@ public class BarrierBufferTest { }; MockInputGate gate = new MockInputGate(PAGE_SIZE, 4, Arrays.asList(sequence)); - BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); + BarrierBuffer buffer = new BarrierBuffer(gate, ioManager); for (BufferOrEvent boe : sequence) { assertEquals(boe, buffer.getNextNonBlocked()); @@ -173,7 +174,7 @@ public class BarrierBufferTest { }; MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence)); - BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); + BarrierBuffer buffer = new BarrierBuffer(gate, ioManager); ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(); buffer.registerCheckpointEventHandler(handler); @@ -239,7 +240,7 @@ public class BarrierBufferTest { }; MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); - BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); + BarrierBuffer buffer = new BarrierBuffer(gate, ioManager); ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(); buffer.registerCheckpointEventHandler(handler); @@ -348,7 +349,7 @@ public class BarrierBufferTest { }; MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); - BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); + BarrierBuffer buffer = new BarrierBuffer(gate, ioManager); ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(); buffer.registerCheckpointEventHandler(handler); @@ -439,7 +440,7 @@ public class BarrierBufferTest { }; MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); - BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); + BarrierBuffer buffer = new BarrierBuffer(gate, ioManager); ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(); buffer.registerCheckpointEventHandler(handler); @@ -547,7 +548,7 @@ public class BarrierBufferTest { }; MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); - BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); + BarrierBuffer buffer = new BarrierBuffer(gate, ioManager); StatefulTask toNotify = mock(StatefulTask.class); buffer.registerCheckpointEventHandler(toNotify); @@ -647,7 +648,7 @@ public class BarrierBufferTest { }; MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); - BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); + BarrierBuffer buffer = new BarrierBuffer(gate, ioManager); ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(); buffer.registerCheckpointEventHandler(handler); @@ -752,7 +753,7 @@ public class BarrierBufferTest { }; MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); - BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); + BarrierBuffer buffer = new BarrierBuffer(gate, ioManager); // checkpoint 1 check(sequence[0], buffer.getNextNonBlocked()); @@ -823,7 +824,7 @@ public class BarrierBufferTest { }; MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); - BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); + BarrierBuffer buffer = new BarrierBuffer(gate, ioManager); ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(); buffer.registerCheckpointEventHandler(handler); @@ -892,7 +893,7 @@ public class BarrierBufferTest { MockInputGate gate = new MockInputGate(PAGE_SIZE, 4, Arrays.asList(sequence)); - BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); + BarrierBuffer buffer = new BarrierBuffer(gate, ioManager); // pre checkpoint 2 check(sequence[0], buffer.getNextNonBlocked()); @@ -958,7 +959,7 @@ public class BarrierBufferTest { }; MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); - BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); + BarrierBuffer buffer = new BarrierBuffer(gate, ioManager); // data after first checkpoint check(sequence[3], buffer.getNextNonBlocked()); @@ -1002,7 +1003,7 @@ public class BarrierBufferTest { }; MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence)); - BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); + BarrierBuffer buffer = new BarrierBuffer(gate, ioManager); StatefulTask toNotify = mock(StatefulTask.class); buffer.registerCheckpointEventHandler(toNotify); @@ -1066,7 +1067,7 @@ public class BarrierBufferTest { }; MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); - BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); + BarrierBuffer buffer = new BarrierBuffer(gate, ioManager); StatefulTask toNotify = mock(StatefulTask.class); buffer.registerCheckpointEventHandler(toNotify); @@ -1160,7 +1161,7 @@ public class BarrierBufferTest { }; MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); - BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); + BarrierBuffer buffer = new BarrierBuffer(gate, ioManager); StatefulTask toNotify = mock(StatefulTask.class); buffer.registerCheckpointEventHandler(toNotify); @@ -1214,7 +1215,7 @@ public class BarrierBufferTest { * This tests the where a replay of queued checkpoint barriers meets * a canceled checkpoint. * - * The replayed newer checkpoint barrier must not try to cancel the + * <p>The replayed newer checkpoint barrier must not try to cancel the * already canceled checkpoint. */ @Test @@ -1251,7 +1252,7 @@ public class BarrierBufferTest { }; MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); - BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); + BarrierBuffer buffer = new BarrierBuffer(gate, ioManager); StatefulTask toNotify = mock(StatefulTask.class); buffer.registerCheckpointEventHandler(toNotify); @@ -1336,7 +1337,7 @@ public class BarrierBufferTest { }; MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); - BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); + BarrierBuffer buffer = new BarrierBuffer(gate, ioManager); StatefulTask toNotify = mock(StatefulTask.class); buffer.registerCheckpointEventHandler(toNotify); @@ -1398,7 +1399,7 @@ public class BarrierBufferTest { } private static BufferOrEvent createBuffer(int channel) { - final int size = SIZE_COUNTER++; + final int size = sizeCounter++; byte[] bytes = new byte[size]; RND.nextBytes(bytes); @@ -1436,7 +1437,7 @@ public class BarrierBufferTest { private static void checkNoTempFilesRemain() { // validate that all temp files have been removed - for (File dir : IO_MANAGER.getSpillingDirectories()) { + for (File dir : ioManager.getSpillingDirectories()) { for (String file : dir.list()) { if (file != null && !(file.equals(".") || file.equals(".."))) { fail("barrier buffer did not clean up temp files. remaining file: " + file); http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java index 8c66205..847db5c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java @@ -263,7 +263,7 @@ public class BarrierTrackerTest { * discard a pending checkpoint as soon as it sees a barrier from a * later checkpoint from some channel. * - * This behavior is crucial, otherwise topologies where different inputs + * <p>This behavior is crucial, otherwise topologies where different inputs * have different latency (and that latency is close to or higher than the * checkpoint interval) may skip many checkpoints, or fail to complete a * checkpoint all together. http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java index 905bc59..4edb665 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java @@ -31,7 +31,6 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,16 +39,23 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Random; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; -import static org.junit.Assert.*; - +/** + * Tests for {@link BufferSpiller}. + */ public class BufferSpillerTest { private static final Logger LOG = LoggerFactory.getLogger(BufferSpillerTest.class); private static final int PAGE_SIZE = 4096; - private static IOManager IO_MANAGER; + private static IOManager ioManager; private BufferSpiller spiller; @@ -60,18 +66,18 @@ public class BufferSpillerTest { @BeforeClass public static void setupIOManager() { - IO_MANAGER = new IOManagerAsync(); + ioManager = new IOManagerAsync(); } @AfterClass public static void shutdownIOManager() { - IO_MANAGER.shutdown(); + ioManager.shutdown(); } @Before public void createSpiller() { try { - spiller = new BufferSpiller(IO_MANAGER, PAGE_SIZE); + spiller = new BufferSpiller(ioManager, PAGE_SIZE); } catch (Exception e) { e.printStackTrace(); @@ -205,7 +211,7 @@ public class BufferSpillerTest { int currentNumRecordAndEvents = 0; // do multiple spilling / rolling over rounds - for (int round = 0; round < 2*sequences; round++) { + for (int round = 0; round < 2 * sequences; round++) { if (round % 2 == 1) { // make this an empty sequence @@ -392,7 +398,7 @@ public class BufferSpillerTest { private static void checkNoTempFilesRemain() { // validate that all temp files have been removed - for (File dir : IO_MANAGER.getSpillingDirectories()) { + for (File dir : ioManager.getSpillingDirectories()) { for (String file : dir.list()) { if (file != null && !(file.equals(".") || file.equals(".."))) { fail("barrier buffer did not clean up temp files. remaining file: " + file); http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java index 3e2a75a..77c938a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java @@ -28,6 +28,9 @@ import java.util.ArrayDeque; import java.util.List; import java.util.Queue; +/** + * Mock {@link InputGate}. + */ public class MockInputGate implements InputGate { private final int pageSize; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java index a6e834c..adbe240 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java @@ -37,7 +37,11 @@ import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.Random; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Tests that validate the behavior of the {@link SpilledBufferOrEventSequence} in isolation, @@ -46,12 +50,11 @@ import static org.junit.Assert.*; public class SpilledBufferOrEventSequenceTest { private final ByteBuffer buffer = ByteBuffer.allocateDirect(128 * 1024).order(ByteOrder.LITTLE_ENDIAN); - private final int pageSize = 32*1024; + private final int pageSize = 32 * 1024; private File tempFile; private FileChannel fileChannel; - @Before public void initTempChannel() { try { http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java index 54cd186..d114139 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java @@ -29,14 +29,16 @@ import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; import org.apache.flink.types.LongValue; import org.junit.Test; - import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import java.io.IOException; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * This test uses the PowerMockRunner runner to work around the fact that the @@ -99,7 +101,6 @@ public class StreamRecordWriterTest { when(mockWriter.getBufferProvider()).thenReturn(mockProvider); when(mockWriter.getNumberOfOutputChannels()).thenReturn(numPartitions); - return mockWriter; } http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java index 466ca65..2d0855a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java @@ -34,6 +34,7 @@ import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.util.Preconditions; + import org.junit.Assert; import org.junit.Test; @@ -41,6 +42,9 @@ import java.io.IOException; import java.util.ArrayDeque; import java.util.Queue; +/** + * Test processing files during rescaling. + */ public class ContinuousFileProcessingRescalingTest { @Test @@ -83,7 +87,6 @@ public class ContinuousFileProcessingRescalingTest { testHarness1.getOutput().clear(); testHarness2.getOutput().clear(); - // 2) and take the snapshots from the previous instances and merge them // into a new one which will be then used to initialize a third instance OperatorStateHandles mergedState = AbstractStreamOperatorTestHarness. http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java index 9bcd2e6..5085eb4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java @@ -33,6 +33,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +/** + * Tests for {@link GenericWriteAheadSink}. + */ public class GenericWriteAheadSinkTest extends WriteAheadSinkTestBase<Tuple1<Integer>, GenericWriteAheadSinkTest.ListSink> { @Override @@ -50,7 +53,6 @@ public class GenericWriteAheadSinkTest extends WriteAheadSinkTestBase<Tuple1<Int return new Tuple1<>(counter); } - @Override protected void verifyResultsIdealCircumstances(ListSink sink) { @@ -182,7 +184,7 @@ public class GenericWriteAheadSinkTest extends WriteAheadSinkTestBase<Tuple1<Int } } - public static class SimpleCommitter extends CheckpointCommitter { + private static class SimpleCommitter extends CheckpointCommitter { private static final long serialVersionUID = 1L; private List<Tuple2<Long, Integer>> checkpoints; @@ -232,7 +234,7 @@ public class GenericWriteAheadSinkTest extends WriteAheadSinkTestBase<Tuple1<Int } } - public static class FailingCommitter extends CheckpointCommitter { + private static class FailingCommitter extends CheckpointCommitter { private static final long serialVersionUID = 1L; private List<Tuple2<Long, Integer>> checkpoints; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java index c95a85e..8d99acd 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.runtime.operators; import org.apache.flink.api.common.ExecutionConfig; @@ -36,6 +37,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.OperatorChain; import org.apache.flink.streaming.runtime.tasks.StreamTask; + import org.junit.Assert; import org.junit.Test; @@ -80,7 +82,7 @@ public class StreamOperatorChainingTest { private void testMultiChaining(StreamExecutionEnvironment env) throws Exception { // the actual elements will not be used - DataStream<Integer> input = env.fromElements(1,2,3); + DataStream<Integer> input = env.fromElements(1, 2, 3); sink1Results = new ArrayList<>(); sink2Results = new ArrayList<>(); @@ -188,7 +190,7 @@ public class StreamOperatorChainingTest { private void testMultiChainingWithSplit(StreamExecutionEnvironment env) throws Exception { // the actual elements will not be used - DataStream<Integer> input = env.fromElements(1,2,3); + DataStream<Integer> input = env.fromElements(1, 2, 3); sink1Results = new ArrayList<>(); sink2Results = new ArrayList<>();
