Repository: flink
Updated Branches:
  refs/heads/master 7efa8ad34 -> 12b4185c6


http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index a9d2ddf..5b995c6 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -27,7 +27,6 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.event.AbstractEvent;
@@ -73,6 +72,9 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+/**
+ * Mock {@link Environment}.
+ */
 public class StreamMockEnvironment implements Environment {
 
        private final TaskInfo taskInfo;
@@ -106,7 +108,7 @@ public class StreamMockEnvironment implements Environment {
        private volatile boolean wasFailedExternally = false;
 
        public StreamMockEnvironment(Configuration jobConfig, Configuration 
taskConfig, ExecutionConfig executionConfig,
-                                                                long 
memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
+                                                               long 
memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
                this.taskInfo = new TaskInfo(
                        "", /* task name */
                        1, /* num key groups / max parallelism */
@@ -131,7 +133,7 @@ public class StreamMockEnvironment implements Environment {
        }
 
        public StreamMockEnvironment(Configuration jobConfig, Configuration 
taskConfig, long memorySize,
-                                                                
MockInputSplitProvider inputSplitProvider, int bufferSize) {
+                                                               
MockInputSplitProvider inputSplitProvider, int bufferSize) {
                this(jobConfig, taskConfig, new ExecutionConfig(), memorySize, 
inputSplitProvider, bufferSize);
        }
 
@@ -183,7 +185,6 @@ public class StreamMockEnvironment implements Environment {
                                }
                        
}).when(mockWriter).writeBufferToAllChannels(any(Buffer.class));
 
-
                        outputs.add(mockWriter);
                }
                catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
index f8d5393..6e3c299 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
@@ -20,15 +20,16 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.api.operators.co.CoStreamMap;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -41,6 +42,9 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+/**
+ * Test checkpoint cancellation barrier.
+ */
 public class StreamTaskCancellationBarrierTest {
 
        /**
@@ -71,9 +75,8 @@ public class StreamTaskCancellationBarrierTest {
        /**
         * This test verifies (for one input tasks) that the Stream tasks react 
the following way to
         * receiving a checkpoint cancellation barrier:
-        *
         *   - send a "decline checkpoint" notification out (to the JobManager)
-        *   - emit a cancellation barrier downstream
+        *   - emit a cancellation barrier downstream.
         */
        @Test
        public void testDeclineCallOnCancelBarrierOneInput() throws Exception {
@@ -115,11 +118,10 @@ public class StreamTaskCancellationBarrierTest {
        }
 
        /**
-        * This test verifies (for onw input tasks) that the Stream tasks react 
the following way to
+        * This test verifies (for two input tasks) that the Stream tasks react 
the following way to
         * receiving a checkpoint cancellation barrier:
-        *
         *   - send a "decline checkpoint" notification out (to the JobManager)
-        *   - emit a cancellation barrier downstream
+        *   - emit a cancellation barrier downstream.
         */
        @Test
        public void testDeclineCallOnCancelBarrierTwoInputs() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 546188e..8957255 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import akka.dispatch.Futures;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
@@ -88,6 +87,8 @@ import 
org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
+
+import akka.dispatch.Futures;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -137,13 +138,16 @@ import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.withSettings;
 import static org.powermock.api.mockito.PowerMockito.whenNew;
 
+/**
+ * Tests for {@link StreamTask}.
+ */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(StreamTask.class)
 @PowerMockIgnore("org.apache.log4j.*")
 @SuppressWarnings("deprecation")
 public class StreamTaskTest extends TestLogger {
 
-       private static OneShotLatch SYNC_LATCH;
+       private static OneShotLatch syncLatch;
 
        /**
         * This test checks that cancel calls that are issued before the 
operator is
@@ -242,7 +246,7 @@ public class StreamTaskTest extends TestLogger {
 
        @Test
        public void testCancellationNotBlockedOnLock() throws Exception {
-               SYNC_LATCH = new OneShotLatch();
+               syncLatch = new OneShotLatch();
 
                StreamConfig cfg = new StreamConfig(new Configuration());
                Task task = createTask(CancelLockingTask.class, cfg, new 
Configuration());
@@ -251,7 +255,7 @@ public class StreamTaskTest extends TestLogger {
                // execution state RUNNING is not enough, we need to wait until 
the stream task's run() method
                // is entered
                task.startTaskThread();
-               SYNC_LATCH.await();
+               syncLatch.await();
 
                // cancel the execution - this should lead to smooth shutdown
                task.cancelExecution();
@@ -262,7 +266,7 @@ public class StreamTaskTest extends TestLogger {
 
        @Test
        public void testCancellationFailsWithBlockingLock() throws Exception {
-               SYNC_LATCH = new OneShotLatch();
+               syncLatch = new OneShotLatch();
 
                StreamConfig cfg = new StreamConfig(new Configuration());
                Task task = createTask(CancelFailingTask.class, cfg, new 
Configuration());
@@ -271,7 +275,7 @@ public class StreamTaskTest extends TestLogger {
                // execution state RUNNING is not enough, we need to wait until 
the stream task's run() method
                // is entered
                task.startTaskThread();
-               SYNC_LATCH.await();
+               syncLatch.await();
 
                // cancel the execution - this should lead to smooth shutdown
                task.cancelExecution();
@@ -422,7 +426,7 @@ public class StreamTaskTest extends TestLogger {
        /**
         * FLINK-5667
         *
-        * Tests that a concurrent cancel operation does not discard the state 
handles of an
+        * <p>Tests that a concurrent cancel operation does not discard the 
state handles of an
         * acknowledged checkpoint. The situation can only happen if the cancel 
call is executed
         * after Environment.acknowledgeCheckpoint() and before the
         * CloseableRegistry.unregisterClosable() call.
@@ -534,7 +538,7 @@ public class StreamTaskTest extends TestLogger {
        /**
         * FLINK-5667
         *
-        * Tests that a concurrent cancel operation discards the state handles 
of a not yet
+        * <p>Tests that a concurrent cancel operation discards the state 
handles of a not yet
         * acknowledged checkpoint and prevents sending an acknowledge message 
to the
         * CheckpointCoordinator. The situation can only happen if the cancel 
call is executed
         * before Environment.acknowledgeCheckpoint().
@@ -560,11 +564,11 @@ public class StreamTaskTest extends TestLogger {
                                completeSubtask.await();
 
                                return new SubtaskState(
-                                       
(ChainedStateHandle<StreamStateHandle>)invocation.getArguments()[0],
-                                       
(ChainedStateHandle<OperatorStateHandle>)invocation.getArguments()[1],
-                                       
(ChainedStateHandle<OperatorStateHandle>)invocation.getArguments()[2],
-                                       
(KeyedStateHandle)invocation.getArguments()[3],
-                                       
(KeyedStateHandle)invocation.getArguments()[4]);
+                                       (ChainedStateHandle<StreamStateHandle>) 
invocation.getArguments()[0],
+                                       
(ChainedStateHandle<OperatorStateHandle>) invocation.getArguments()[1],
+                                       
(ChainedStateHandle<OperatorStateHandle>) invocation.getArguments()[2],
+                                       (KeyedStateHandle) 
invocation.getArguments()[3],
+                                       (KeyedStateHandle) 
invocation.getArguments()[4]);
                        }
                });
 
@@ -643,7 +647,7 @@ public class StreamTaskTest extends TestLogger {
        /**
         * FLINK-5985
         *
-        * This test ensures that empty snapshots (no op/keyed stated 
whatsoever) will be reported as stateless tasks. This
+        * <p>This test ensures that empty snapshots (no op/keyed state 
whatsoever) will be reported as stateless tasks. This
         * happens by translating an empty {@link SubtaskState} into reporting 
'null' to #acknowledgeCheckpoint.
         */
        @Test
@@ -819,7 +823,7 @@ public class StreamTaskTest extends TestLogger {
        //  Test operators
        // 
------------------------------------------------------------------------
 
-       public static class SlowlyDeserializingOperator extends 
StreamSource<Long, SourceFunction<Long>> {
+       private static class SlowlyDeserializingOperator extends 
StreamSource<Long, SourceFunction<Long>> {
                private static final long serialVersionUID = 1L;
 
                private volatile boolean canceled = false;
@@ -955,7 +959,7 @@ public class StreamTaskTest extends TestLogger {
        }
 
        /**
-        * A task that locks if cancellation attempts to cleanly shut down
+        * A task that locks if cancellation attempts to cleanly shut down.
         */
        public static class CancelLockingTask extends StreamTask<String, 
AbstractStreamOperator<String>> {
 
@@ -973,7 +977,7 @@ public class StreamTaskTest extends TestLogger {
                        latch.await();
 
                        // we are at the point where cancelling can happen
-                       SYNC_LATCH.trigger();
+                       syncLatch.trigger();
 
                        // just put this to sleep until it is interrupted
                        try {
@@ -999,7 +1003,7 @@ public class StreamTaskTest extends TestLogger {
        }
 
        /**
-        * A task that locks if cancellation attempts to cleanly shut down
+        * A task that locks if cancellation attempts to cleanly shut down.
         */
        public static class CancelFailingTask extends StreamTask<String, 
AbstractStreamOperator<String>> {
 
@@ -1021,7 +1025,7 @@ public class StreamTaskTest extends TestLogger {
                                latch.await();
 
                                // we are at the point where cancelling can 
happen
-                               SYNC_LATCH.trigger();
+                               syncLatch.trigger();
 
                                // try to acquire the lock - this is not 
possible as long as the lock holder
                                // thread lives
@@ -1050,7 +1054,7 @@ public class StreamTaskTest extends TestLogger {
        // 
------------------------------------------------------------------------
 
        /**
-        * A thread that holds a lock as long as it lives
+        * A thread that holds a lock as long as it lives.
         */
        private static final class LockHolder extends Thread implements 
Closeable {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 0be85b1..a02fe4e 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -35,8 +35,8 @@ import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 
 import org.junit.Assert;
 
@@ -44,24 +44,20 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * Test harness for testing a {@link StreamTask}.
  *
- * <p>
- * This mock Invokable provides the task with a basic runtime context and 
allows pushing elements
+ * <p>This mock Invokable provides the task with a basic runtime context and 
allows pushing elements
  * and watermarks into the task. {@link #getOutput()} can be used to get the 
emitted elements
  * and events. You are free to modify the retrieved list.
  *
- * <p>
- * After setting up everything the Task can be invoked using {@link 
#invoke()}. This will start
+ * <p>After setting up everything the Task can be invoked using {@link 
#invoke()}. This will start
  * a new Thread to execute the Task. Use {@link #waitForTaskCompletion()} to 
wait for the Task
  * thread to finish.
  *
- * <p>
- * When using this you need to add the following line to your test class to 
setup Powermock:
+ * <p>When using this you need to add the following line to your test class to 
setup Powermock:
  * {@code {@literal @}PrepareForTest({ResultPartitionWriter.class})}
  */
 public class StreamTaskTestHarness<OUT> {
@@ -135,7 +131,7 @@ public class StreamTaskTestHarness<OUT> {
         * if there will only be a single operator to be tested. The method 
will setup the
         * outgoing network connection for the operator.
         *
-        * For more advanced test cases such as testing chains of multiple 
operators with the harness,
+        * <p>For more advanced test cases such as testing chains of multiple 
operators with the harness,
         * please manually configure the stream config.
         */
        public void setupOutputForSingletonOperatorChain() {

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
index 1f8638e..890fc23 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
@@ -37,6 +37,9 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+/**
+ * Tests for {@link SystemProcessingTimeService}.
+ */
 public class SystemProcessingTimeServiceTest extends TestLogger {
 
        @Test
@@ -74,7 +77,7 @@ public class SystemProcessingTimeServiceTest extends 
TestLogger {
        }
 
        /**
-        * Tests that the schedule at fixed rate callback is called under the 
given lock
+        * Tests that the schedule at fixed rate callback is called under the 
given lock.
         */
        @Test
        public void testScheduleAtFixedRateHoldsLock() throws Exception {
@@ -123,7 +126,7 @@ public class SystemProcessingTimeServiceTest extends 
TestLogger {
         * Tests that SystemProcessingTimeService#scheduleAtFixedRate is 
actually triggered multiple
         * times.
         */
-       @Test(timeout=10000)
+       @Test(timeout = 10000)
        public void testScheduleAtFixedRate() throws Exception {
                final Object lock = new Object();
                final AtomicReference<Throwable> errorRef = new 
AtomicReference<>();
@@ -432,7 +435,7 @@ public class SystemProcessingTimeServiceTest extends 
TestLogger {
                        }
                },
                        0L,
-                       100L    );
+                       100L);
 
                latch.await();
                assertTrue(exceptionWasThrown.get());

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index d465619..66531ac 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -42,8 +42,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
  * Tests for {@link 
org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask}. These tests
  * implicitly also test the {@link 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor}.
  *
- * <p>
- * Note:<br>
+ * <p>Note:<br>
  * We only use a {@link CoStreamMap} operator here. We also test the 
individual operators but Map is
  * used as a representative to test TwoInputStreamTask, since 
TwoInputStreamTask is used for all
  * TwoInputStreamOperators.
@@ -123,7 +122,6 @@ public class TwoInputStreamTaskTest {
 
                testHarness.processElement(new Watermark(initialTime), 1, 0);
 
-
                // now the output should still be empty
                testHarness.waitForInputProcessing();
                TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
@@ -155,7 +153,6 @@ public class TwoInputStreamTaskTest {
                testHarness.waitForInputProcessing();
                TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
 
-
                // advance watermark from one of the inputs, now we should get 
a new one since the
                // minimum increases
                testHarness.processElement(new Watermark(initialTime + 4), 1, 
1);
@@ -279,7 +276,6 @@ public class TwoInputStreamTaskTest {
                                expectedOutput,
                                testHarness.getOutput());
 
-
                List<String> resultElements = 
TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
                Assert.assertEquals(4, resultElements.size());
        }
@@ -346,7 +342,6 @@ public class TwoInputStreamTaskTest {
                                expectedOutput,
                                testHarness.getOutput());
 
-
                // Then give the earlier barrier, these should be ignored
                testHarness.processEvent(new CheckpointBarrier(0, 0, 
CheckpointOptions.forFullCheckpoint()), 0, 1);
                testHarness.processEvent(new CheckpointBarrier(0, 0, 
CheckpointOptions.forFullCheckpoint()), 1, 0);
@@ -354,7 +349,6 @@ public class TwoInputStreamTaskTest {
 
                testHarness.waitForInputProcessing();
 
-
                testHarness.endInput();
 
                testHarness.waitForTaskCompletion();

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
index 7ce4ab7..9b9038f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -35,13 +36,11 @@ import java.util.List;
 /**
  * Test harness for testing a {@link TwoInputStreamTask}.
  *
- * <p>
- * This mock Invokable provides the task with a basic runtime context and 
allows pushing elements
+ * <p>This mock Invokable provides the task with a basic runtime context and 
allows pushing elements
  * and watermarks into the task. {@link #getOutput()} can be used to get the 
emitted elements
  * and events. You are free to modify the retrieved list.
  *
- * <p>
- * After setting up everything the Task can be invoked using {@link 
#invoke()}. This will start
+ * <p>After setting up everything the Task can be invoked using {@link 
#invoke()}. This will start
  * a new Thread to execute the Task. Use {@link #waitForTaskCompletion()} to 
wait for the Task
  * thread to finish. Use {@link #processElement}
  * to send elements to the task. Use
@@ -49,8 +48,7 @@ import java.util.List;
  * Before waiting for the task to finish you must call {@link #endInput()} to 
signal to the task
  * that data entry is finished.
  *
- * <p>
- * When Elements or Events are offered to the Task they are put into a queue. 
The input gates
+ * <p>When Elements or Events are offered to the Task they are put into a 
queue. The input gates
  * of the Task read from this queue. Use {@link #waitForInputProcessing()} to 
wait until all
  * queues are empty. This must be used after entering some elements before 
checking the
  * desired output.

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java
index 77178d7..b14492e 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java
@@ -18,19 +18,23 @@
 
 package org.apache.flink.streaming.util;
 
-import com.fasterxml.jackson.databind.util.JSONPObject;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import 
org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema;
 
+import com.fasterxml.jackson.databind.util.JSONPObject;
 import org.junit.Test;
 
 import java.io.IOException;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
+/**
+ * Tests for {@link AbstractDeserializationSchema}.
+ */
 @SuppressWarnings("serial")
 public class AbstractDeserializationSchemaTest {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 7a8488f..0a517f0 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -15,12 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.util.OutputTag;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
@@ -48,9 +48,9 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperatorTest;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -64,7 +64,9 @@ import 
org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.util.FutureUtil;
+import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.Preconditions;
+
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -88,19 +90,19 @@ import static org.mockito.Mockito.when;
  */
 public class AbstractStreamOperatorTestHarness<OUT> {
 
-       final protected StreamOperator<OUT> operator;
+       protected final StreamOperator<OUT> operator;
 
-       final protected ConcurrentLinkedQueue<Object> outputList;
+       protected final ConcurrentLinkedQueue<Object> outputList;
 
-       final protected Map<OutputTag<?>, ConcurrentLinkedQueue<Object>> 
sideOutputLists;
+       protected final Map<OutputTag<?>, ConcurrentLinkedQueue<Object>> 
sideOutputLists;
 
-       final protected StreamConfig config;
+       protected final StreamConfig config;
 
-       final protected ExecutionConfig executionConfig;
+       protected final ExecutionConfig executionConfig;
 
-       final protected TestProcessingTimeService processingTimeService;
+       protected final TestProcessingTimeService processingTimeService;
 
-       final protected StreamTask<?, ?> mockTask;
+       protected final StreamTask<?, ?> mockTask;
 
        final Environment environment;
 
@@ -291,16 +293,14 @@ public class AbstractStreamOperatorTestHarness<OUT> {
        }
 
        /**
-        * Calls
-        * {@link StreamOperator#setup(StreamTask, StreamConfig, Output)} ()}
+        * Calls {@link StreamOperator#setup(StreamTask, StreamConfig, Output)} 
()}.
         */
        public void setup() {
                setup(null);
        }
 
        /**
-        * Calls
-        * {@link StreamOperator#setup(StreamTask, StreamConfig, Output)} ()}
+        * Calls {@link StreamOperator#setup(StreamTask, StreamConfig, Output)} 
()}.
         */
        public void setup(TypeSerializer<OUT> outputSerializer) {
                operator.setup(mockTask, config, new 
MockOutput(outputSerializer));
@@ -416,17 +416,14 @@ public class AbstractStreamOperatorTestHarness<OUT> {
         * and repacks them into a single {@link OperatorStateHandles} so that 
the parallelism of the test
         * can change arbitrarily (i.e. be able to scale both up and down).
         *
-        * <p>
-        * After repacking the partial states, use {@link 
#initializeState(OperatorStateHandles)} to initialize
+        * <p>After repacking the partial states, use {@link 
#initializeState(OperatorStateHandles)} to initialize
         * a new instance with the resulting state. Bear in mind that for 
parallelism greater than one, you
         * have to use the constructor {@link 
#AbstractStreamOperatorTestHarness(StreamOperator, int, int, int)}.
         *
-        * <p>
-        * <b>NOTE: </b> each of the {@code handles} in the argument list is 
assumed to be from a single task of a single
+        * <p><b>NOTE: </b> each of the {@code handles} in the argument list is 
assumed to be from a single task of a single
         * operator (i.e. chain length of one).
         *
-        * <p>
-        * For an example of how to use it, have a look at
+        * <p>For an example of how to use it, have a look at
         * {@link 
AbstractStreamOperatorTest#testStateAndTimerStateShufflingScalingDown()}.
         *
         * @param handles the different states to be merged.
@@ -540,7 +537,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
                CheckpointStreamFactory.CheckpointStateOutputStream outStream = 
stateBackend.createStreamFactory(
                                new JobID(),
                                
"test_op").createCheckpointStateOutputStream(checkpointId, timestamp);
-               if(operator instanceof StreamCheckpointedOperator) {
+               if (operator instanceof StreamCheckpointedOperator) {
                        ((StreamCheckpointedOperator) 
operator).snapshotState(outStream, checkpointId, timestamp);
                        return outStream.closeAndGetHandle();
                } else {
@@ -549,7 +546,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
        }
 
        /**
-        * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#notifyOfCompletedCheckpoint(long)}
 ()}
+        * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#notifyOfCompletedCheckpoint(long)}.
         */
        public void notifyOfCompletedCheckpoint(long checkpointId) throws 
Exception {
                operator.notifyOfCompletedCheckpoint(checkpointId);
@@ -562,7 +559,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
        @Deprecated
        @SuppressWarnings("deprecation")
        public void restore(StreamStateHandle snapshot) throws Exception {
-               if(operator instanceof StreamCheckpointedOperator) {
+               if (operator instanceof StreamCheckpointedOperator) {
                        try (FSDataInputStream in = snapshot.openInputStream()) 
{
                                ((StreamCheckpointedOperator) 
operator).restoreState(in);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java
index fa68082..bd731b8 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java
@@ -26,6 +26,9 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collection;
 
+/**
+ * Collecting {@link SourceFunction.SourceContext}.
+ */
 public class CollectingSourceContext<T extends Serializable> implements 
SourceFunction.SourceContext<T> {
 
        private final Object lock;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
index bd929da..de84860 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
@@ -18,17 +18,20 @@
 
 package org.apache.flink.streaming.util;
 
-import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.OutputTag;
 
-import java.io.Serializable;
+import java.io.IOException;
 import java.util.List;
 
+/**
+ * Collecting {@link Output} for {@link StreamRecord}.
+ */
 public class CollectorOutput<T> implements Output<StreamRecord<T>> {
 
        private final List<StreamElement> list;
@@ -49,8 +52,13 @@ public class CollectorOutput<T> implements 
Output<StreamRecord<T>> {
 
        @Override
        public void collect(StreamRecord<T> record) {
-               T copied = 
SerializationUtils.deserialize(SerializationUtils.serialize((Serializable) 
record.getValue()));
-               list.add(record.copy(copied));
+               try {
+                       ClassLoader cl = record.getClass().getClassLoader();
+                       T copied = 
InstantiationUtil.deserializeObject(InstantiationUtil.serializeObject(record.getValue()),
 cl);
+                       list.add(record.copy(copied));
+               } catch (IOException | ClassNotFoundException ex) {
+                       throw new RuntimeException("Unable to deserialize 
record: " + record, ex);
+               }
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/EvenOddOutputSelector.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/EvenOddOutputSelector.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/EvenOddOutputSelector.java
index 1745c46..26da5d3 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/EvenOddOutputSelector.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/EvenOddOutputSelector.java
@@ -15,12 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 
 import java.util.Arrays;
 
+/**
+ * An {@link OutputSelector} used in tests that routes integers to "even" or "odd" outputs.
+ */
 public class EvenOddOutputSelector implements OutputSelector<Integer> {
        private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/HDFSCopyUtilitiesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/HDFSCopyUtilitiesTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/HDFSCopyUtilitiesTest.java
index f16750d..ca21c0c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/HDFSCopyUtilitiesTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/HDFSCopyUtilitiesTest.java
@@ -15,10 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.OperatingSystem;
+
 import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Rule;
@@ -33,6 +35,9 @@ import java.io.FileOutputStream;
 
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for {@link HDFSCopyFromLocal} and {@link HDFSCopyToLocal}.
+ */
 public class HDFSCopyUtilitiesTest {
 
        @Rule

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index c6d0bce..8f4908a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.JobID;
@@ -28,16 +29,16 @@ import 
org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.util.Migration;
+
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -84,7 +85,6 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, 
OUT>
                setupMockTaskCreateKeyedBackend();
        }
 
-
        public KeyedOneInputStreamOperatorTestHarness(
                        OneInputStreamOperator<IN, OUT> operator,
                        final KeySelector<IN, K> keySelector,
@@ -148,7 +148,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, 
OUT>
                                        timestamp,
                                        streamFactory,
                                        CheckpointOptions.forFullCheckpoint());
-                       if(!keyedSnapshotRunnable.isDone()) {
+                       if (!keyedSnapshotRunnable.isDone()) {
                                Thread runner = new 
Thread(keyedSnapshotRunnable);
                                runner.start();
                        }
@@ -181,7 +181,6 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, 
OUT>
                }
        }
 
-
        private static boolean hasMigrationHandles(Collection<KeyedStateHandle> 
allKeyGroupsHandles) {
                for (KeyedStateHandle handle : allKeyGroupsHandles) {
                        if (handle instanceof Migration) {

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
index 41a083a..10c79d0 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.JobID;
@@ -24,11 +25,11 @@ import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -91,7 +92,7 @@ public class KeyedTwoInputStreamOperatorTestHarness<K, IN1, 
IN2, OUT>
                                        final int numberOfKeyGroups = (Integer) 
invocationOnMock.getArguments()[1];
                                        final KeyGroupRange keyGroupRange = 
(KeyGroupRange) invocationOnMock.getArguments()[2];
 
-                                       if(keyedStateBackend != null) {
+                                       if (keyedStateBackend != null) {
                                                keyedStateBackend.close();
                                        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
index 5d73015..db4fe1c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -17,17 +17,20 @@
 
 package org.apache.flink.streaming.util;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Simple test context for stream operators.
+ */
 public class MockContext<IN, OUT> {
 
        private List<OUT> outputs;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
index 8c3226b..f19946c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
@@ -17,16 +17,19 @@
 
 package org.apache.flink.streaming.util;
 
-import java.io.Serializable;
-import java.util.Collection;
-
-import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.OutputTag;
 
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Mock {@link Output} for {@link StreamRecord}.
+ */
 public class MockOutput<T> implements Output<StreamRecord<T>> {
        private Collection<T> outputs;
 
@@ -36,8 +39,13 @@ public class MockOutput<T> implements 
Output<StreamRecord<T>> {
 
        @Override
        public void collect(StreamRecord<T> record) {
-               T copied = 
SerializationUtils.deserialize(SerializationUtils.serialize((Serializable) 
record.getValue()));
-               outputs.add(copied);
+               try {
+                       ClassLoader cl = record.getClass().getClassLoader();
+                       T copied = 
InstantiationUtil.deserializeObject(InstantiationUtil.serializeObject(record.getValue()),
 cl);
+                       outputs.add(copied);
+               } catch (IOException | ClassNotFoundException ex) {
+                       throw new RuntimeException("Unable to deserialize 
record: " + record, ex);
+               }
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpIntMap.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpIntMap.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpIntMap.java
index 05de3d0..c82ec7c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpIntMap.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpIntMap.java
@@ -15,10 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.functions.MapFunction;
 
+/**
+ * Identity mapper for {@code Integer}.
+ */
 public class NoOpIntMap implements MapFunction<Integer, Integer> {
        private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index ced8cca..652d016 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -86,7 +87,6 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
                processElement(new StreamRecord<>(value, timestamp));
        }
 
-
        public void processElement(StreamRecord<IN> element) throws Exception {
                operator.setKeyContextElement1(element);
                oneInputOperator.processElement(element);

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
index 8011279..7e32723 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
@@ -15,8 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.util;
 
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.FileInputStream;
@@ -26,11 +33,6 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer;
-import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 
 /**
  * Util for writing/reading {@link 
org.apache.flink.streaming.runtime.tasks.OperatorStateHandles},
@@ -46,7 +48,7 @@ public class OperatorSnapshotUtil {
 
        public static void writeStateHandle(OperatorStateHandles state, String 
path) throws IOException {
                FileOutputStream out = new FileOutputStream(path);
-               
+
                try (DataOutputStream dos = new DataOutputStream(out)) {
 
                        dos.writeInt(state.getOperatorChainIndex());

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
index 36ade8c..5f17467 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
@@ -17,11 +17,6 @@
 
 package org.apache.flink.streaming.util;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.RichFunction;
@@ -33,8 +28,17 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
-import static org.mockito.Mockito.*;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Utilities for {@link SourceFunction}.
+ */
 public class SourceFunctionUtil {
 
        public static <T extends Serializable> List<T> 
runSourceFunction(SourceFunction<T> sourceFunction) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
index b46ea66..6489448 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
@@ -15,11 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.util;
 
-import com.google.common.collect.Iterables;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import com.google.common.collect.Iterables;
 import org.junit.Assert;
 
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
index 5fbe371..d0bbf8f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
@@ -25,8 +25,7 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 /**
  * A test harness for testing a {@link TwoInputStreamOperator}.
  *
- * <p>
- * This mock task provides the operator with a basic runtime context and 
allows pushing elements
+ * <p>This mock task provides the operator with a basic runtime context and 
allows pushing elements
  * and watermarks into the operator. {@link java.util.Deque}s containing the 
emitted elements
  * and watermarks can be retrieved. you are free to modify these.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
index a14d113..317f2e3 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
@@ -31,8 +31,12 @@ import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
+/**
+ * Tests for {@link TypeInformationSerializationSchema}.
+ */
 public class TypeInformationSerializationSchemaTest {
 
        @Test
@@ -82,7 +86,7 @@ public class TypeInformationSerializationSchemaTest {
        //  Test data types
        // 
------------------------------------------------------------------------
 
-       public static class MyPOJO {
+       private static class MyPOJO {
 
                public int aField;
                public List<Date> aList;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java
index 5e363e9..637c4ba 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java
@@ -22,10 +22,15 @@ import 
org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
+
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
+/**
+ * Tests key selectors on arrays.
+ */
 public class ArrayKeySelectorTest {
 
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java
index 74b1d18..6081ed1 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java
@@ -19,12 +19,14 @@
 package org.apache.flink.streaming.util.serialization;
 
 import org.apache.flink.core.testutils.CommonTestUtils;
+
 import org.junit.Test;
 
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 
 /**
  * Tests for the {@link SimpleStringSchema}.

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/typeutils/FieldAccessorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/typeutils/FieldAccessorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/typeutils/FieldAccessorTest.java
index 5e7dd35..2fb7964 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/typeutils/FieldAccessorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/typeutils/FieldAccessorTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.streaming.util.typeutils;
 
-import static org.junit.Assert.*;
-
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
@@ -30,8 +28,14 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for field accessors.
+ */
 public class FieldAccessorTest {
 
        // Note, that AggregationFunctionTest indirectly also tests 
FieldAccessors.
@@ -59,7 +63,6 @@ public class FieldAccessorTest {
                assertEquals("b", f0.get(t));
                assertEquals("b", t.f0);
 
-
                FieldAccessor<Tuple2<String, Integer>, Integer> f1n = 
FieldAccessorFactory.getAccessor(tpeInfo, 1, null);
                assertEquals(7, (int) f1n.get(t));
                assertEquals(7, (int) t.f1);
@@ -81,11 +84,11 @@ public class FieldAccessorTest {
                assertEquals("b", t.f0);
 
                // This is technically valid (the ".0" is selecting the 0th 
field of a basic type).
-               FieldAccessor<Tuple2<String, Integer>, String> f0_0 = 
FieldAccessorFactory.getAccessor(tpeInfo, "f0.0", null);
-               assertEquals("b", f0_0.get(t));
+               FieldAccessor<Tuple2<String, Integer>, String> f0f0 = 
FieldAccessorFactory.getAccessor(tpeInfo, "f0.0", null);
+               assertEquals("b", f0f0.get(t));
                assertEquals("b", t.f0);
-               t = f0_0.set(t, "cc");
-               assertEquals("cc", f0_0.get(t));
+               t = f0f0.set(t, "cc");
+               assertEquals("cc", f0f0.get(t));
                assertEquals("cc", t.f0);
 
        }
@@ -103,7 +106,7 @@ public class FieldAccessorTest {
        public void testTupleInTuple() {
                Tuple2<String, Tuple3<Integer, Long, Double>> t = 
Tuple2.of("aa", Tuple3.of(5, 9L, 2.0));
                TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>> 
tpeInfo =
-                               (TupleTypeInfo<Tuple2<String, Tuple3<Integer, 
Long, Double>>>)TypeExtractor.getForObject(t);
+                               (TupleTypeInfo<Tuple2<String, Tuple3<Integer, 
Long, Double>>>) TypeExtractor.getForObject(t);
 
                FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, 
String> f0 = FieldAccessorFactory
                        .getAccessor(tpeInfo, "f0", null);
@@ -148,6 +151,9 @@ public class FieldAccessorTest {
                
FieldAccessorFactory.getAccessor(TupleTypeInfo.getBasicTupleTypeInfo(Integer.class,
 Integer.class), 2, null);
        }
 
+       /**
+        * POJO.
+        */
        public static class Foo {
                public int x;
                public Tuple2<String, Long> t;
@@ -203,6 +209,9 @@ public class FieldAccessorTest {
                FieldAccessorFactory.getAccessor(tpeInfo, 
"illegal.illegal.illegal", null);
        }
 
+       /**
+        * POJO for testing field access.
+        */
        public static class Inner {
                public long x;
                public boolean b;
@@ -220,16 +229,19 @@ public class FieldAccessorTest {
 
                @Override
                public String toString() {
-                       return ((Long)x).toString() + ", " + b;
+                       return ((Long) x).toString() + ", " + b;
                }
        }
 
+       /**
+        * POJO containing POJO.
+        */
        public static class Outer {
                public int a;
                public Inner i;
                public short b;
 
-               public Outer(){}
+               public Outer() {}
 
                public Outer(int a, Inner i, short b) {
                        this.a = a;
@@ -239,13 +251,13 @@ public class FieldAccessorTest {
 
                @Override
                public String toString() {
-                       return a+", "+i.toString()+", "+b;
+                       return a + ", " + i.toString() + ", " + b;
                }
        }
 
        @Test
        public void testPojoInPojo() {
-               Outer o = new Outer(10, new Inner(4L), (short)12);
+               Outer o = new Outer(10, new Inner(4L), (short) 12);
                PojoTypeInfo<Outer> tpeInfo = (PojoTypeInfo<Outer>) 
TypeInformation.of(Outer.class);
 
                FieldAccessor<Outer, Long> fix = 
FieldAccessorFactory.getAccessor(tpeInfo, "i.x", null);
@@ -268,21 +280,19 @@ public class FieldAccessorTest {
        @Test
        @SuppressWarnings("unchecked")
        public void testArray() {
-               int[] a = new int[]{3,5};
+               int[] a = new int[]{3, 5};
                FieldAccessor<int[], Integer> fieldAccessor =
                                (FieldAccessor<int[], Integer>) (Object)
                                                
FieldAccessorFactory.getAccessor(PrimitiveArrayTypeInfo.getInfoFor(a.getClass()),
 1, null);
 
                assertEquals(Integer.class, 
fieldAccessor.getFieldType().getTypeClass());
 
-               assertEquals((Integer)a[1], fieldAccessor.get(a));
+               assertEquals((Integer) a[1], fieldAccessor.get(a));
 
                a = fieldAccessor.set(a, 6);
-               assertEquals((Integer)a[1], fieldAccessor.get(a));
-
-
+               assertEquals((Integer) a[1], fieldAccessor.get(a));
 
-               Integer[] b = new Integer[]{3,5};
+               Integer[] b = new Integer[]{3, 5};
                FieldAccessor<Integer[], Integer> fieldAccessor2 =
                                (FieldAccessor<Integer[], Integer>) (Object)
                                                
FieldAccessorFactory.getAccessor(BasicArrayTypeInfo.getInfoFor(b.getClass()), 
1, null);
@@ -295,6 +305,9 @@ public class FieldAccessorTest {
                assertEquals(b[1], fieldAccessor2.get(b));
        }
 
+       /**
+        * POJO with array.
+        */
        public static class ArrayInPojo {
                public long x;
                public int[] arr;
@@ -311,8 +324,8 @@ public class FieldAccessorTest {
 
        @Test
        public void testArrayInPojo() {
-               ArrayInPojo o = new ArrayInPojo(10L, new int[]{3,4,5}, 12);
-               PojoTypeInfo<ArrayInPojo> tpeInfo = 
(PojoTypeInfo<ArrayInPojo>)TypeInformation.of(ArrayInPojo.class);
+               ArrayInPojo o = new ArrayInPojo(10L, new int[]{3, 4, 5}, 12);
+               PojoTypeInfo<ArrayInPojo> tpeInfo = (PojoTypeInfo<ArrayInPojo>) 
TypeInformation.of(ArrayInPojo.class);
 
                FieldAccessor<ArrayInPojo, Integer> fix = 
FieldAccessorFactory.getAccessor(tpeInfo, "arr.1", null);
                assertEquals(4, (int) fix.get(o));

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/tools/maven/strict-checkstyle.xml
----------------------------------------------------------------------
diff --git a/tools/maven/strict-checkstyle.xml 
b/tools/maven/strict-checkstyle.xml
index 9c58917..0c11aa9 100644
--- a/tools/maven/strict-checkstyle.xml
+++ b/tools/maven/strict-checkstyle.xml
@@ -36,7 +36,7 @@ This file is based on the checkstyle file of Apache Beam.
   <!--</module>-->
 
   <module name="NewlineAtEndOfFile">
-    <!-- windows can use \n\r vs \n, so enforce the most used one ie UNIx 
style -->
+    <!-- windows can use \r\n vs \n, so enforce the most used one ie UNIx 
style -->
     <property name="lineSeparator" value="lf" />
   </module>
 
@@ -84,7 +84,7 @@ This file is based on the checkstyle file of Apache Beam.
   -->
 
   <module name="FileLength">
-    <property name="max" value="2500"/>
+    <property name="max" value="3000"/>
   </module>
 
   <!-- All Java AST specific tests live under TreeWalker module. -->
@@ -197,9 +197,11 @@ This file is based on the checkstyle file of Apache Beam.
     <module name="ImportOrder">
       <!-- Checks for out of order import statements. -->
       <property name="severity" value="error"/>
-      <!-- This ensures that static imports go first. -->
-      <property name="option" value="top"/>
+      <!-- Flink imports first, then other imports, then javax and java 
imports, then static imports. -->
+      <property name="groups" value="org.apache.flink,*,javax,java"/>
+      <property name="separated" value="true"/>
       <property name="sortStaticImportsAlphabetically" value="true"/>
+      <property name="option" value="bottom"/>
       <property name="tokens" value="STATIC_IMPORT, IMPORT"/>
       <message key="import.ordering"
                value="Import {0} appears after other imports that it should 
precede"/>
@@ -478,6 +480,18 @@ This file is based on the checkstyle file of Apache Beam.
 
     -->
 
+    <module name="EmptyLineSeparator">
+         <!-- Checks for empty line separator between tokens. The only
+           excluded token is VARIABLE_DEF, allowing class fields to
+           be declared on consecutive lines.
+      -->
+         <property name="allowMultipleEmptyLines" value="false"/>
+         <property name="allowMultipleEmptyLinesInsideClassMembers" 
value="false"/>
+         <property name="tokens" value="PACKAGE_DEF, IMPORT, CLASS_DEF,
+           INTERFACE_DEF, ENUM_DEF, STATIC_INIT, INSTANCE_INIT, METHOD_DEF,
+           CTOR_DEF"/>
+    </module>
+
     <module name="WhitespaceAround">
       <!-- Checks that various tokens are surrounded by whitespace.
            This includes most binary operators and keywords followed

Reply via email to