Repository: flink Updated Branches: refs/heads/master 7efa8ad34 -> 12b4185c6
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java index a9d2ddf..5b995c6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java @@ -27,7 +27,6 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; -import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.event.AbstractEvent; @@ -73,6 +72,9 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +/** + * Mock {@link Environment}. + */ public class StreamMockEnvironment implements Environment { private final TaskInfo taskInfo; @@ -106,7 +108,7 @@ public class StreamMockEnvironment implements Environment { private volatile boolean wasFailedExternally = false; public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, ExecutionConfig executionConfig, - long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) { + long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) { this.taskInfo = new TaskInfo( "", /* task name */ 1, /* num key groups / max parallelism */ @@ -131,7 +133,7 @@ public class StreamMockEnvironment implements Environment { } public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, long memorySize, - MockInputSplitProvider inputSplitProvider, int bufferSize) { + MockInputSplitProvider inputSplitProvider, int bufferSize) { this(jobConfig, taskConfig, new ExecutionConfig(), memorySize, inputSplitProvider, bufferSize); } @@ -183,7 +185,6 @@ public class StreamMockEnvironment implements Environment { } }).when(mockWriter).writeBufferToAllChannels(any(Buffer.class)); - outputs.add(mockWriter); } catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java index f8d5393..6e3c299 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java @@ -20,15 +20,16 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException; -import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.streaming.api.functions.co.CoMapFunction; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.StreamMap; import org.apache.flink.streaming.api.operators.co.CoStreamMap; + import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -41,6 +42,9 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +/** + * Test checkpoint cancellation barrier. + */ public class StreamTaskCancellationBarrierTest { /** @@ -71,9 +75,8 @@ public class StreamTaskCancellationBarrierTest { /** * This test verifies (for onw input tasks) that the Stream tasks react the following way to * receiving a checkpoint cancellation barrier: - * * - send a "decline checkpoint" notification out (to the JobManager) - * - emit a cancellation barrier downstream + * - emit a cancellation barrier downstream. */ @Test public void testDeclineCallOnCancelBarrierOneInput() throws Exception { @@ -115,11 +118,10 @@ public class StreamTaskCancellationBarrierTest { } /** - * This test verifies (for onw input tasks) that the Stream tasks react the following way to + * This test verifies (for one input tasks) that the Stream tasks react the following way to * receiving a checkpoint cancellation barrier: - * * - send a "decline checkpoint" notification out (to the JobManager) - * - emit a cancellation barrier downstream + * - emit a cancellation barrier downstream. */ @Test public void testDeclineCallOnCancelBarrierTwoInputs() throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 546188e..8957255 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.runtime.tasks; -import akka.dispatch.Futures; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; @@ -88,6 +87,8 @@ import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; + +import akka.dispatch.Futures; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -137,13 +138,16 @@ import static org.mockito.Mockito.when; import static org.mockito.Mockito.withSettings; import static org.powermock.api.mockito.PowerMockito.whenNew; +/** + * Tests for {@link StreamTask}. + */ @RunWith(PowerMockRunner.class) @PrepareForTest(StreamTask.class) @PowerMockIgnore("org.apache.log4j.*") @SuppressWarnings("deprecation") public class StreamTaskTest extends TestLogger { - private static OneShotLatch SYNC_LATCH; + private static OneShotLatch syncLatch; /** * This test checks that cancel calls that are issued before the operator is @@ -242,7 +246,7 @@ public class StreamTaskTest extends TestLogger { @Test public void testCancellationNotBlockedOnLock() throws Exception { - SYNC_LATCH = new OneShotLatch(); + syncLatch = new OneShotLatch(); StreamConfig cfg = new StreamConfig(new Configuration()); Task task = createTask(CancelLockingTask.class, cfg, new Configuration()); @@ -251,7 +255,7 @@ public class StreamTaskTest extends TestLogger { // execution state RUNNING is not enough, we need to wait until the stream task's run() method // is entered task.startTaskThread(); - SYNC_LATCH.await(); + syncLatch.await(); // cancel the execution - this should lead to smooth shutdown task.cancelExecution(); @@ -262,7 +266,7 @@ public class StreamTaskTest extends TestLogger { @Test public void testCancellationFailsWithBlockingLock() throws Exception { - SYNC_LATCH = new OneShotLatch(); + syncLatch = new OneShotLatch(); StreamConfig cfg = new StreamConfig(new Configuration()); Task task = createTask(CancelFailingTask.class, cfg, new Configuration()); @@ -271,7 +275,7 @@ public class StreamTaskTest extends TestLogger { // execution state RUNNING is not enough, we need to wait until the stream task's run() method // is entered task.startTaskThread(); - SYNC_LATCH.await(); + syncLatch.await(); // cancel the execution - this should lead to smooth shutdown task.cancelExecution(); @@ -422,7 +426,7 @@ public class StreamTaskTest extends TestLogger { /** * FLINK-5667 * - * Tests that a concurrent cancel operation does not discard the state handles of an + * <p>Tests that a concurrent cancel operation does not discard the state handles of an * acknowledged checkpoint. The situation can only happen if the cancel call is executed * after Environment.acknowledgeCheckpoint() and before the * CloseableRegistry.unregisterClosable() call. @@ -534,7 +538,7 @@ public class StreamTaskTest extends TestLogger { /** * FLINK-5667 * - * Tests that a concurrent cancel operation discards the state handles of a not yet + * <p>Tests that a concurrent cancel operation discards the state handles of a not yet * acknowledged checkpoint and prevents sending an acknowledge message to the * CheckpointCoordinator. The situation can only happen if the cancel call is executed * before Environment.acknowledgeCheckpoint(). @@ -560,11 +564,11 @@ public class StreamTaskTest extends TestLogger { completeSubtask.await(); return new SubtaskState( - (ChainedStateHandle<StreamStateHandle>)invocation.getArguments()[0], - (ChainedStateHandle<OperatorStateHandle>)invocation.getArguments()[1], - (ChainedStateHandle<OperatorStateHandle>)invocation.getArguments()[2], - (KeyedStateHandle)invocation.getArguments()[3], - (KeyedStateHandle)invocation.getArguments()[4]); + (ChainedStateHandle<StreamStateHandle>) invocation.getArguments()[0], + (ChainedStateHandle<OperatorStateHandle>) invocation.getArguments()[1], + (ChainedStateHandle<OperatorStateHandle>) invocation.getArguments()[2], + (KeyedStateHandle) invocation.getArguments()[3], + (KeyedStateHandle) invocation.getArguments()[4]); } }); @@ -643,7 +647,7 @@ public class StreamTaskTest extends TestLogger { /** * FLINK-5985 * - * This test ensures that empty snapshots (no op/keyed stated whatsoever) will be reported as stateless tasks. This + * <p>This test ensures that empty snapshots (no op/keyed stated whatsoever) will be reported as stateless tasks. This * happens by translating an empty {@link SubtaskState} into reporting 'null' to #acknowledgeCheckpoint. */ @Test @@ -819,7 +823,7 @@ public class StreamTaskTest extends TestLogger { // Test operators // ------------------------------------------------------------------------ - public static class SlowlyDeserializingOperator extends StreamSource<Long, SourceFunction<Long>> { + private static class SlowlyDeserializingOperator extends StreamSource<Long, SourceFunction<Long>> { private static final long serialVersionUID = 1L; private volatile boolean canceled = false; @@ -955,7 +959,7 @@ public class StreamTaskTest extends TestLogger { } /** - * A task that locks if cancellation attempts to cleanly shut down + * A task that locks if cancellation attempts to cleanly shut down. */ public static class CancelLockingTask extends StreamTask<String, AbstractStreamOperator<String>> { @@ -973,7 +977,7 @@ public class StreamTaskTest extends TestLogger { latch.await(); // we are at the point where cancelling can happen - SYNC_LATCH.trigger(); + syncLatch.trigger(); // just put this to sleep until it is interrupted try { @@ -999,7 +1003,7 @@ public class StreamTaskTest extends TestLogger { } /** - * A task that locks if cancellation attempts to cleanly shut down + * A task that locks if cancellation attempts to cleanly shut down. */ public static class CancelFailingTask extends StreamTask<String, AbstractStreamOperator<String>> { @@ -1021,7 +1025,7 @@ public class StreamTaskTest extends TestLogger { latch.await(); // we are at the point where cancelling can happen - SYNC_LATCH.trigger(); + syncLatch.trigger(); // try to acquire the lock - this is not possible as long as the lock holder // thread lives @@ -1050,7 +1054,7 @@ public class StreamTaskTest extends TestLogger { // ------------------------------------------------------------------------ /** - * A thread that holds a lock as long as it lives + * A thread that holds a lock as long as it lives. */ private static final class LockHolder extends Thread implements Closeable { http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java index 0be85b1..a02fe4e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java @@ -35,8 +35,8 @@ import org.apache.flink.streaming.api.graph.StreamNode; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; -import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.junit.Assert; @@ -44,24 +44,20 @@ import java.io.IOException; import java.util.Collections; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; /** * Test harness for testing a {@link StreamTask}. * - * <p> - * This mock Invokable provides the task with a basic runtime context and allows pushing elements + * <p>This mock Invokable provides the task with a basic runtime context and allows pushing elements * and watermarks into the task. {@link #getOutput()} can be used to get the emitted elements * and events. You are free to modify the retrieved list. * - * <p> - * After setting up everything the Task can be invoked using {@link #invoke()}. This will start + * <p>After setting up everything the Task can be invoked using {@link #invoke()}. This will start * a new Thread to execute the Task. Use {@link #waitForTaskCompletion()} to wait for the Task * thread to finish. * - * <p> - * When using this you need to add the following line to your test class to setup Powermock: + * <p>When using this you need to add the following line to your test class to setup Powermock: * {@code {@literal @}PrepareForTest({ResultPartitionWriter.class})} */ public class StreamTaskTestHarness<OUT> { @@ -135,7 +131,7 @@ public class StreamTaskTestHarness<OUT> { * if there will only be a single operator to be tested. The method will setup the * outgoing network connection for the operator. * - * For more advanced test cases such as testing chains of multiple operators with the harness, + * <p>For more advanced test cases such as testing chains of multiple operators with the harness, * please manually configure the stream config. */ public void setupOutputForSingletonOperatorChain() { http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java index 1f8638e..890fc23 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java @@ -37,6 +37,9 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +/** + * Tests for {@link SystemProcessingTimeService}. + */ public class SystemProcessingTimeServiceTest extends TestLogger { @Test @@ -74,7 +77,7 @@ public class SystemProcessingTimeServiceTest extends TestLogger { } /** - * Tests that the schedule at fixed rate callback is called under the given lock + * Tests that the schedule at fixed rate callback is called under the given lock. */ @Test public void testScheduleAtFixedRateHoldsLock() throws Exception { @@ -123,7 +126,7 @@ public class SystemProcessingTimeServiceTest extends TestLogger { * Tests that SystemProcessingTimeService#scheduleAtFixedRate is actually triggered multiple * times. */ - @Test(timeout=10000) + @Test(timeout = 10000) public void testScheduleAtFixedRate() throws Exception { final Object lock = new Object(); final AtomicReference<Throwable> errorRef = new AtomicReference<>(); @@ -432,7 +435,7 @@ public class SystemProcessingTimeServiceTest extends TestLogger { } }, 0L, - 100L ); + 100L); latch.await(); assertTrue(exceptionWasThrown.get()); http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java index d465619..66531ac 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java @@ -42,8 +42,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; * Tests for {@link org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask}. Theses tests * implicitly also test the {@link org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor}. * - * <p> - * Note:<br> + * <p>Note:<br> * We only use a {@link CoStreamMap} operator here. We also test the individual operators but Map is * used as a representative to test TwoInputStreamTask, since TwoInputStreamTask is used for all * TwoInputStreamOperators. @@ -123,7 +122,6 @@ public class TwoInputStreamTaskTest { testHarness.processElement(new Watermark(initialTime), 1, 0); - // now the output should still be empty testHarness.waitForInputProcessing(); TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); @@ -155,7 +153,6 @@ public class TwoInputStreamTaskTest { testHarness.waitForInputProcessing(); TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); - // advance watermark from one of the inputs, now we should get a new one since the // minimum increases testHarness.processElement(new Watermark(initialTime + 4), 1, 1); @@ -279,7 +276,6 @@ public class TwoInputStreamTaskTest { expectedOutput, testHarness.getOutput()); - List<String> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput()); Assert.assertEquals(4, resultElements.size()); } @@ -346,7 +342,6 @@ public class TwoInputStreamTaskTest { expectedOutput, testHarness.getOutput()); - // Then give the earlier barrier, these should be ignored testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 1); testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 0); @@ -354,7 +349,6 @@ public class TwoInputStreamTaskTest { testHarness.waitForInputProcessing(); - testHarness.endInput(); testHarness.waitForTaskCompletion(); http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java index 7ce4ab7..9b9038f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -35,13 +36,11 @@ import java.util.List; /** * Test harness for testing a {@link TwoInputStreamTask}. * - * <p> - * This mock Invokable provides the task with a basic runtime context and allows pushing elements + * <p>This mock Invokable provides the task with a basic runtime context and allows pushing elements * and watermarks into the task. {@link #getOutput()} can be used to get the emitted elements * and events. You are free to modify the retrieved list. * - * <p> - * After setting up everything the Task can be invoked using {@link #invoke()}. This will start + * <p>After setting up everything the Task can be invoked using {@link #invoke()}. This will start * a new Thread to execute the Task. Use {@link #waitForTaskCompletion()} to wait for the Task * thread to finish. Use {@link #processElement} * to send elements to the task. Use @@ -49,8 +48,7 @@ import java.util.List; * Before waiting for the task to finish you must call {@link #endInput()} to signal to the task * that data entry is finished. * - * <p> - * When Elements or Events are offered to the Task they are put into a queue. The input gates + * <p>When Elements or Events are offered to the Task they are put into a queue. The input gates * of the Task read from this queue. Use {@link #waitForInputProcessing()} to wait until all * queues are empty. This must be used after entering some elements before checking the * desired output. http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java index 77178d7..b14492e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java @@ -18,19 +18,23 @@ package org.apache.flink.streaming.util; -import com.fasterxml.jackson.databind.util.JSONPObject; import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema; +import com.fasterxml.jackson.databind.util.JSONPObject; import org.junit.Test; import java.io.IOException; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +/** + * Tests for {@link AbstractDeserializationSchema}. + */ @SuppressWarnings("serial") public class AbstractDeserializationSchemaTest { http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index 7a8488f..0a517f0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -15,12 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.util; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; -import org.apache.flink.util.OutputTag; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.configuration.Configuration; @@ -48,9 +48,9 @@ import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.AbstractStreamOperatorTest; import org.apache.flink.streaming.api.operators.OperatorSnapshotResult; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; import org.apache.flink.streaming.api.operators.StreamOperator; @@ -64,7 +64,9 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.util.FutureUtil; +import org.apache.flink.util.OutputTag; import org.apache.flink.util.Preconditions; + import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -88,19 +90,19 @@ import static org.mockito.Mockito.when; */ public class AbstractStreamOperatorTestHarness<OUT> { - final protected StreamOperator<OUT> operator; + protected final StreamOperator<OUT> operator; - final protected ConcurrentLinkedQueue<Object> outputList; + protected final ConcurrentLinkedQueue<Object> outputList; - final protected Map<OutputTag<?>, ConcurrentLinkedQueue<Object>> sideOutputLists; + protected final Map<OutputTag<?>, ConcurrentLinkedQueue<Object>> sideOutputLists; - final protected StreamConfig config; + protected final StreamConfig config; - final protected ExecutionConfig executionConfig; + protected final ExecutionConfig executionConfig; - final protected TestProcessingTimeService processingTimeService; + protected final TestProcessingTimeService processingTimeService; - final protected StreamTask<?, ?> mockTask; + protected final StreamTask<?, ?> mockTask; final Environment environment; @@ -291,16 +293,14 @@ public class AbstractStreamOperatorTestHarness<OUT> { } /** - * Calls - * {@link StreamOperator#setup(StreamTask, StreamConfig, Output)} ()} + * Calls {@link StreamOperator#setup(StreamTask, StreamConfig, Output)} ()}. */ public void setup() { setup(null); } /** - * Calls - * {@link StreamOperator#setup(StreamTask, StreamConfig, Output)} ()} + * Calls {@link StreamOperator#setup(StreamTask, StreamConfig, Output)} ()}. */ public void setup(TypeSerializer<OUT> outputSerializer) { operator.setup(mockTask, config, new MockOutput(outputSerializer)); @@ -416,17 +416,14 @@ public class AbstractStreamOperatorTestHarness<OUT> { * and repacks them into a single {@link OperatorStateHandles} so that the parallelism of the test * can change arbitrarily (i.e. be able to scale both up and down). * - * <p> - * After repacking the partial states, use {@link #initializeState(OperatorStateHandles)} to initialize + * <p>After repacking the partial states, use {@link #initializeState(OperatorStateHandles)} to initialize * a new instance with the resulting state. Bear in mind that for parallelism greater than one, you * have to use the constructor {@link #AbstractStreamOperatorTestHarness(StreamOperator, int, int, int)}. * - * <p> - * <b>NOTE: </b> each of the {@code handles} in the argument list is assumed to be from a single task of a single + * <p><b>NOTE: </b> each of the {@code handles} in the argument list is assumed to be from a single task of a single * operator (i.e. chain length of one). * - * <p> - * For an example of how to use it, have a look at + * <p>For an example of how to use it, have a look at * {@link AbstractStreamOperatorTest#testStateAndTimerStateShufflingScalingDown()}. * * @param handles the different states to be merged. @@ -540,7 +537,7 @@ public class AbstractStreamOperatorTestHarness<OUT> { CheckpointStreamFactory.CheckpointStateOutputStream outStream = stateBackend.createStreamFactory( new JobID(), "test_op").createCheckpointStateOutputStream(checkpointId, timestamp); - if(operator instanceof StreamCheckpointedOperator) { + if (operator instanceof StreamCheckpointedOperator) { ((StreamCheckpointedOperator) operator).snapshotState(outStream, checkpointId, timestamp); return outStream.closeAndGetHandle(); } else { @@ -549,7 +546,7 @@ public class AbstractStreamOperatorTestHarness<OUT> { } /** - * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#notifyOfCompletedCheckpoint(long)} ()} + * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#notifyOfCompletedCheckpoint(long)} ()}. */ public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception { operator.notifyOfCompletedCheckpoint(checkpointId); @@ -562,7 +559,7 @@ public class AbstractStreamOperatorTestHarness<OUT> { @Deprecated @SuppressWarnings("deprecation") public void restore(StreamStateHandle snapshot) throws Exception { - if(operator instanceof StreamCheckpointedOperator) { + if (operator instanceof StreamCheckpointedOperator) { try (FSDataInputStream in = snapshot.openInputStream()) { ((StreamCheckpointedOperator) operator).restoreState(in); } http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java index fa68082..bd731b8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java @@ -26,6 +26,9 @@ import java.io.IOException; import java.io.Serializable; import java.util.Collection; +/** + * Collecting {@link SourceFunction.SourceContext}. + */ public class CollectingSourceContext<T extends Serializable> implements SourceFunction.SourceContext<T> { private final Object lock; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java index bd929da..de84860 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java @@ -18,17 +18,20 @@ package org.apache.flink.streaming.util; -import org.apache.commons.lang3.SerializationUtils; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.OutputTag; -import java.io.Serializable; +import java.io.IOException; import java.util.List; +/** + * Collecting {@link Output} for {@link StreamRecord}. + */ public class CollectorOutput<T> implements Output<StreamRecord<T>> { private final List<StreamElement> list; @@ -49,8 +52,13 @@ public class CollectorOutput<T> implements Output<StreamRecord<T>> { @Override public void collect(StreamRecord<T> record) { - T copied = SerializationUtils.deserialize(SerializationUtils.serialize((Serializable) record.getValue())); - list.add(record.copy(copied)); + try { + ClassLoader cl = record.getClass().getClassLoader(); + T copied = InstantiationUtil.deserializeObject(InstantiationUtil.serializeObject(record.getValue()), cl); + list.add(record.copy(copied)); + } catch (IOException | ClassNotFoundException ex) { + throw new RuntimeException("Unable to deserialize record: " + record, ex); + } } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/EvenOddOutputSelector.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/EvenOddOutputSelector.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/EvenOddOutputSelector.java index 1745c46..26da5d3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/EvenOddOutputSelector.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/EvenOddOutputSelector.java @@ -15,12 +15,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.util; import org.apache.flink.streaming.api.collector.selector.OutputSelector; import java.util.Arrays; +/** + * Tests for {@link OutputSelector}. + */ public class EvenOddOutputSelector implements OutputSelector<Integer> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/HDFSCopyUtilitiesTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/HDFSCopyUtilitiesTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/HDFSCopyUtilitiesTest.java index f16750d..ca21c0c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/HDFSCopyUtilitiesTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/HDFSCopyUtilitiesTest.java @@ -15,10 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.util; import org.apache.flink.core.fs.Path; import org.apache.flink.util.OperatingSystem; + import org.junit.Assume; import org.junit.Before; import org.junit.Rule; @@ -33,6 +35,9 @@ import java.io.FileOutputStream; import static org.junit.Assert.assertTrue; +/** + * Tests for {@link HDFSCopyFromLocal} and {@link HDFSCopyToLocal}. + */ public class HDFSCopyUtilitiesTest { @Rule http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java index c6d0bce..8f4908a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.util; import org.apache.flink.api.common.JobID; @@ -28,16 +29,16 @@ import org.apache.flink.runtime.checkpoint.StateAssignmentOperation; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.KeyedStateHandle; -import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.util.Migration; + import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -84,7 +85,6 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> setupMockTaskCreateKeyedBackend(); } - public KeyedOneInputStreamOperatorTestHarness( OneInputStreamOperator<IN, OUT> operator, final KeySelector<IN, K> keySelector, @@ -148,7 +148,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> timestamp, streamFactory, CheckpointOptions.forFullCheckpoint()); - if(!keyedSnapshotRunnable.isDone()) { + if (!keyedSnapshotRunnable.isDone()) { Thread runner = new Thread(keyedSnapshotRunnable); runner.start(); } @@ -181,7 +181,6 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> } } - private static boolean hasMigrationHandles(Collection<KeyedStateHandle> allKeyGroupsHandles) { for (KeyedStateHandle handle : allKeyGroupsHandles) { if (handle instanceof Migration) { http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java index 41a083a..10c79d0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.util; import org.apache.flink.api.common.JobID; @@ -24,11 +25,11 @@ import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; + import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -91,7 +92,7 @@ public class KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> final int numberOfKeyGroups = (Integer) invocationOnMock.getArguments()[1]; final KeyGroupRange keyGroupRange = (KeyGroupRange) invocationOnMock.getArguments()[2]; - if(keyedStateBackend != null) { + if (keyedStateBackend != null) { keyedStateBackend.close(); } http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java index 5d73015..db4fe1c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java @@ -17,17 +17,20 @@ package org.apache.flink.streaming.util; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; - import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * Simple test context for stream operators. + */ public class MockContext<IN, OUT> { private List<OUT> outputs; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java index 8c3226b..f19946c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java @@ -17,16 +17,19 @@ package org.apache.flink.streaming.util; -import java.io.Serializable; -import java.util.Collection; - -import org.apache.commons.lang3.SerializationUtils; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.OutputTag; +import java.io.IOException; +import java.util.Collection; + +/** + * Mock {@link Output} for {@link StreamRecord}. + */ public class MockOutput<T> implements Output<StreamRecord<T>> { private Collection<T> outputs; @@ -36,8 +39,13 @@ public class MockOutput<T> implements Output<StreamRecord<T>> { @Override public void collect(StreamRecord<T> record) { - T copied = SerializationUtils.deserialize(SerializationUtils.serialize((Serializable) record.getValue())); - outputs.add(copied); + try { + ClassLoader cl = record.getClass().getClassLoader(); + T copied = InstantiationUtil.deserializeObject(InstantiationUtil.serializeObject(record.getValue()), cl); + outputs.add(copied); + } catch (IOException | ClassNotFoundException ex) { + throw new RuntimeException("Unable to deserialize record: " + record, ex); + } } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpIntMap.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpIntMap.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpIntMap.java index 05de3d0..c82ec7c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpIntMap.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpIntMap.java @@ -15,10 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.util; import org.apache.flink.api.common.functions.MapFunction; +/** + * Identity mapper for {@code Integer}. + */ public class NoOpIntMap implements MapFunction<Integer, Integer> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java index ced8cca..652d016 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.util; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -86,7 +87,6 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> processElement(new StreamRecord<>(value, timestamp)); } - public void processElement(StreamRecord<IN> element) throws Exception { operator.setKeyContextElement1(element); oneInputOperator.processElement(element); http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java index 8011279..7e32723 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java @@ -15,8 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.util; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; + import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.FileInputStream; @@ -26,11 +33,6 @@ import java.net.URL; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer; -import org.apache.flink.runtime.state.KeyedStateHandle; -import org.apache.flink.runtime.state.OperatorStateHandle; -import org.apache.flink.runtime.state.StreamStateHandle; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; /** * Util for writing/reading {@link org.apache.flink.streaming.runtime.tasks.OperatorStateHandles}, @@ -46,7 +48,7 @@ public class OperatorSnapshotUtil { public static void writeStateHandle(OperatorStateHandles state, String path) throws IOException { FileOutputStream out = new FileOutputStream(path); - + try (DataOutputStream dos = new DataOutputStream(out)) { dos.writeInt(state.getOperatorChainIndex()); http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java index 36ade8c..5f17467 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java @@ -17,11 +17,6 @@ package org.apache.flink.streaming.util; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; - import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.RichFunction; @@ -33,8 +28,17 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import static org.mockito.Mockito.*; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Utilities for {@link SourceFunction}. + */ public class SourceFunctionUtil { public static <T extends Serializable> List<T> runSourceFunction(SourceFunction<T> sourceFunction) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java index b46ea66..6489448 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java @@ -15,11 +15,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.util; -import com.google.common.collect.Iterables; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import com.google.common.collect.Iterables; import org.junit.Assert; import java.util.ArrayList; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java index 5fbe371..d0bbf8f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java @@ -25,8 +25,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; /** * A test harness for testing a {@link TwoInputStreamOperator}. * - * <p> - * This mock task provides the operator with a basic runtime context and allows pushing elements + * <p>This mock task provides the operator with a basic runtime context and allows pushing elements * and watermarks into the operator. {@link java.util.Deque}s containing the emitted elements * and watermarks can be retrieved. you are free to modify these. */ http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java index a14d113..317f2e3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java @@ -31,8 +31,12 @@ import java.util.Arrays; import java.util.Date; import java.util.List; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +/** + * Tests for {@link TypeInformationSerializationSchema}. + */ public class TypeInformationSerializationSchemaTest { @Test @@ -82,7 +86,7 @@ public class TypeInformationSerializationSchemaTest { // Test data types // ------------------------------------------------------------------------ - public static class MyPOJO { + private static class MyPOJO { public int aField; public List<Date> aList; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java index 5e363e9..637c4ba 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java @@ -22,10 +22,15 @@ import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; + import org.junit.Test; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +/** + * Tests key selectors on arrays. + */ public class ArrayKeySelectorTest { @Test http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java index 74b1d18..6081ed1 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java @@ -19,12 +19,14 @@ package org.apache.flink.streaming.util.serialization; import org.apache.flink.core.testutils.CommonTestUtils; + import org.junit.Test; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; -import static org.junit.Assert.*; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; /** * Tests for the {@link SimpleStringSchema}. http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/typeutils/FieldAccessorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/typeutils/FieldAccessorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/typeutils/FieldAccessorTest.java index 5e7dd35..2fb7964 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/typeutils/FieldAccessorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/typeutils/FieldAccessorTest.java @@ -18,8 +18,6 @@ package org.apache.flink.streaming.util.typeutils; -import static org.junit.Assert.*; - import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; @@ -30,8 +28,14 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; + import org.junit.Test; +import static org.junit.Assert.assertEquals; + +/** + * Tests for field accessors. + */ public class FieldAccessorTest { // Note, that AggregationFunctionTest indirectly also tests FieldAccessors. @@ -59,7 +63,6 @@ public class FieldAccessorTest { assertEquals("b", f0.get(t)); assertEquals("b", t.f0); - FieldAccessor<Tuple2<String, Integer>, Integer> f1n = FieldAccessorFactory.getAccessor(tpeInfo, 1, null); assertEquals(7, (int) f1n.get(t)); assertEquals(7, (int) t.f1); @@ -81,11 +84,11 @@ public class FieldAccessorTest { assertEquals("b", t.f0); // This is technically valid (the ".0" is selecting the 0th field of a basic type). - FieldAccessor<Tuple2<String, Integer>, String> f0_0 = FieldAccessorFactory.getAccessor(tpeInfo, "f0.0", null); - assertEquals("b", f0_0.get(t)); + FieldAccessor<Tuple2<String, Integer>, String> f0f0 = FieldAccessorFactory.getAccessor(tpeInfo, "f0.0", null); + assertEquals("b", f0f0.get(t)); assertEquals("b", t.f0); - t = f0_0.set(t, "cc"); - assertEquals("cc", f0_0.get(t)); + t = f0f0.set(t, "cc"); + assertEquals("cc", f0f0.get(t)); assertEquals("cc", t.f0); } @@ -103,7 +106,7 @@ public class FieldAccessorTest { public void testTupleInTuple() { Tuple2<String, Tuple3<Integer, Long, Double>> t = Tuple2.of("aa", Tuple3.of(5, 9L, 2.0)); TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>> tpeInfo = - (TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>>)TypeExtractor.getForObject(t); + (TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>>) TypeExtractor.getForObject(t); FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, String> f0 = FieldAccessorFactory .getAccessor(tpeInfo, "f0", null); @@ -148,6 +151,9 @@ public class FieldAccessorTest { FieldAccessorFactory.getAccessor(TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class), 2, null); } + /** + * POJO. + */ public static class Foo { public int x; public Tuple2<String, Long> t; @@ -203,6 +209,9 @@ public class FieldAccessorTest { FieldAccessorFactory.getAccessor(tpeInfo, "illegal.illegal.illegal", null); } + /** + * POJO for testing field access. + */ public static class Inner { public long x; public boolean b; @@ -220,16 +229,19 @@ public class FieldAccessorTest { @Override public String toString() { - return ((Long)x).toString() + ", " + b; + return ((Long) x).toString() + ", " + b; } } + /** + * POJO containing POJO. + */ public static class Outer { public int a; public Inner i; public short b; - public Outer(){} + public Outer() {} public Outer(int a, Inner i, short b) { this.a = a; @@ -239,13 +251,13 @@ public class FieldAccessorTest { @Override public String toString() { - return a+", "+i.toString()+", "+b; + return a + ", " + i.toString() + ", " + b; } } @Test public void testPojoInPojo() { - Outer o = new Outer(10, new Inner(4L), (short)12); + Outer o = new Outer(10, new Inner(4L), (short) 12); PojoTypeInfo<Outer> tpeInfo = (PojoTypeInfo<Outer>) TypeInformation.of(Outer.class); FieldAccessor<Outer, Long> fix = FieldAccessorFactory.getAccessor(tpeInfo, "i.x", null); @@ -268,21 +280,19 @@ public class FieldAccessorTest { @Test @SuppressWarnings("unchecked") public void testArray() { - int[] a = new int[]{3,5}; + int[] a = new int[]{3, 5}; FieldAccessor<int[], Integer> fieldAccessor = (FieldAccessor<int[], Integer>) (Object) FieldAccessorFactory.getAccessor(PrimitiveArrayTypeInfo.getInfoFor(a.getClass()), 1, null); assertEquals(Integer.class, fieldAccessor.getFieldType().getTypeClass()); - assertEquals((Integer)a[1], fieldAccessor.get(a)); + assertEquals((Integer) a[1], fieldAccessor.get(a)); a = fieldAccessor.set(a, 6); - assertEquals((Integer)a[1], fieldAccessor.get(a)); - - + assertEquals((Integer) a[1], fieldAccessor.get(a)); - Integer[] b = new Integer[]{3,5}; + Integer[] b = new Integer[]{3, 5}; FieldAccessor<Integer[], Integer> fieldAccessor2 = (FieldAccessor<Integer[], Integer>) (Object) FieldAccessorFactory.getAccessor(BasicArrayTypeInfo.getInfoFor(b.getClass()), 1, null); @@ -295,6 +305,9 @@ public class FieldAccessorTest { assertEquals(b[1], fieldAccessor2.get(b)); } + /** + * POJO with array. + */ public static class ArrayInPojo { public long x; public int[] arr; @@ -311,8 +324,8 @@ public class FieldAccessorTest { @Test public void testArrayInPojo() { - ArrayInPojo o = new ArrayInPojo(10L, new int[]{3,4,5}, 12); - PojoTypeInfo<ArrayInPojo> tpeInfo = (PojoTypeInfo<ArrayInPojo>)TypeInformation.of(ArrayInPojo.class); + ArrayInPojo o = new ArrayInPojo(10L, new int[]{3, 4, 5}, 12); + PojoTypeInfo<ArrayInPojo> tpeInfo = (PojoTypeInfo<ArrayInPojo>) TypeInformation.of(ArrayInPojo.class); FieldAccessor<ArrayInPojo, Integer> fix = FieldAccessorFactory.getAccessor(tpeInfo, "arr.1", null); assertEquals(4, (int) fix.get(o)); http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/tools/maven/strict-checkstyle.xml ---------------------------------------------------------------------- diff --git a/tools/maven/strict-checkstyle.xml b/tools/maven/strict-checkstyle.xml index 9c58917..0c11aa9 100644 --- a/tools/maven/strict-checkstyle.xml +++ b/tools/maven/strict-checkstyle.xml @@ -36,7 +36,7 @@ This file is based on the checkstyle file of Apache Beam. <!--</module>--> <module name="NewlineAtEndOfFile"> - <!-- windows can use \n\r vs \n, so enforce the most used one ie UNIx style --> + <!-- windows can use \r\n vs \n, so enforce the most used one ie UNIx style --> <property name="lineSeparator" value="lf" /> </module> @@ -84,7 +84,7 @@ This file is based on the checkstyle file of Apache Beam. --> <module name="FileLength"> - <property name="max" value="2500"/> + <property name="max" value="3000"/> </module> <!-- All Java AST specific tests live under TreeWalker module. --> @@ -197,9 +197,11 @@ This file is based on the checkstyle file of Apache Beam. <module name="ImportOrder"> <!-- Checks for out of order import statements. --> <property name="severity" value="error"/> - <!-- This ensures that static imports go first. --> - <property name="option" value="top"/> + <!-- Flink imports first, then other imports, then javax and java imports, then static imports. --> + <property name="groups" value="org.apache.flink,*,javax,java"/> + <property name="separated" value="true"/> <property name="sortStaticImportsAlphabetically" value="true"/> + <property name="option" value="bottom"/> <property name="tokens" value="STATIC_IMPORT, IMPORT"/> <message key="import.ordering" value="Import {0} appears after other imports that it should precede"/> @@ -478,6 +480,18 @@ This file is based on the checkstyle file of Apache Beam. --> + <module name="EmptyLineSeparator"> + <!-- Checks for empty line separator between tokens. The only + excluded token is VARIABLE_DEF, allowing class fields to + be declared on consecutive lines. + --> + <property name="allowMultipleEmptyLines" value="false"/> + <property name="allowMultipleEmptyLinesInsideClassMembers" value="false"/> + <property name="tokens" value="PACKAGE_DEF, IMPORT, CLASS_DEF, + INTERFACE_DEF, ENUM_DEF, STATIC_INIT, INSTANCE_INIT, METHOD_DEF, + CTOR_DEF"/> + </module> + <module name="WhitespaceAround"> <!-- Checks that various tokens are surrounded by whitespace. This includes most binary operators and keywords followed
