http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java index 6dcb56b..e14430e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java @@ -17,6 +17,7 @@ */ // We have it in this package because we could not mock the methods otherwise + package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -33,6 +34,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; + import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -92,7 +94,6 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate { inputQueues[channelIndex] = new ConcurrentLinkedQueue<InputValue<Object>>(); inputChannels[channelIndex] = new TestInputChannel(inputGate, i); - final Answer<BufferAndAvailability> answer = new Answer<BufferAndAvailability>() { @Override public BufferAndAvailability answer(InvocationOnMock invocationOnMock) throws Throwable { @@ -178,7 +179,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate { return true; } - public static class InputValue<T> { + private static class InputValue<T> { private Object elementOrEvent; private boolean isStreamEnd; private boolean isStreamRecord;
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java index 7dc889c..acb531d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java @@ -17,23 +17,16 @@ package org.apache.flink.streaming.api; -import static org.junit.Assert.assertEquals; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; - -import com.google.common.collect.ImmutableList; - import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction; import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType; import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator; import org.apache.flink.streaming.api.functions.aggregation.SumAggregator; @@ -41,8 +34,18 @@ import org.apache.flink.streaming.api.operators.StreamGroupedReduce; import org.apache.flink.streaming.util.MockContext; import org.apache.flink.streaming.util.keys.KeySelectorUtil; +import com.google.common.collect.ImmutableList; import org.junit.Test; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link AggregationFunction}. + */ public class AggregationFunctionTest { @Test @@ -188,9 +191,9 @@ public class AggregationFunctionTest { // preparing expected outputs List<Tuple3<Integer, Integer, Integer>> maxByFirstExpected = ImmutableList.of( - Tuple3.of(0,0,0), Tuple3.of(0,1,1), Tuple3.of(0,2,2), - Tuple3.of(0,2,2), Tuple3.of(0,2,2), Tuple3.of(0,2,2), - Tuple3.of(0,2,2), Tuple3.of(0,2,2), Tuple3.of(0,2,2)); + Tuple3.of(0, 0, 0), Tuple3.of(0, 1, 1), Tuple3.of(0, 2, 2), + Tuple3.of(0, 2, 2), Tuple3.of(0, 2, 2), Tuple3.of(0, 2, 2), + Tuple3.of(0, 2, 2), Tuple3.of(0, 2, 2), Tuple3.of(0, 2, 2)); List<Tuple3<Integer, Integer, Integer>> maxByLastExpected = ImmutableList.of( Tuple3.of(0, 0, 0), Tuple3.of(0, 1, 1), Tuple3.of(0, 2, 2), @@ -198,9 +201,9 @@ public class AggregationFunctionTest { Tuple3.of(0, 2, 5), Tuple3.of(0, 2, 5), Tuple3.of(0, 2, 8)); List<Tuple3<Integer, Integer, Integer>> minByFirstExpected = ImmutableList.of( - Tuple3.of(0,0,0), Tuple3.of(0,0,0), Tuple3.of(0,0,0), - Tuple3.of(0,0,0), Tuple3.of(0,0,0), Tuple3.of(0,0,0), - Tuple3.of(0,0,0), Tuple3.of(0,0,0), Tuple3.of(0,0,0)); + Tuple3.of(0, 0, 0), Tuple3.of(0, 0, 0), Tuple3.of(0, 0, 0), + Tuple3.of(0, 0, 0), Tuple3.of(0, 0, 0), Tuple3.of(0, 0, 0), + Tuple3.of(0, 0, 0), Tuple3.of(0, 0, 0), Tuple3.of(0, 0, 0)); List<Tuple3<Integer, Integer, Integer>> minByLastExpected = ImmutableList.of( Tuple3.of(0, 0, 0), Tuple3.of(0, 0, 0), Tuple3.of(0, 0, 0), @@ -209,7 +212,7 @@ public class AggregationFunctionTest { // some necessary boiler plate TypeInformation<Tuple3<Integer, Integer, Integer>> typeInfo = TypeExtractor - .getForObject(Tuple3.of(0,0,0)); + .getForObject(Tuple3.of(0, 0, 0)); ExecutionConfig config = new ExecutionConfig(); @@ -351,6 +354,9 @@ public class AggregationFunctionTest { return inputList; } + /** + * POJO. + */ public static class MyPojo implements Serializable { private static final long serialVersionUID = 1L; @@ -380,6 +386,9 @@ public class AggregationFunctionTest { } } + /** + * POJO. + */ public static class MyPojo3 implements Serializable { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java index e5fbfda..ea0e139 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java @@ -17,9 +17,6 @@ package org.apache.flink.streaming.api; -import java.lang.reflect.Method; -import java.util.List; - import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; @@ -57,18 +54,18 @@ import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; import org.apache.flink.streaming.api.graph.StreamEdge; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.KeyedProcessOperator; import org.apache.flink.streaming.api.operators.ProcessOperator; import org.apache.flink.streaming.api.operators.StreamOperator; -import org.apache.flink.streaming.api.operators.KeyedProcessOperator; import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper; -import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner; import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner; +import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner; import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; @@ -80,8 +77,18 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; -import static org.junit.Assert.*; +import java.lang.reflect.Method; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +/** + * Tests for {@link DataStream}. + */ @SuppressWarnings("serial") public class DataStreamTest { @@ -142,7 +149,7 @@ public class DataStreamTest { } }).setParallelism(2); - DataStream<Long> unionDifferingParallelism= input2.union(input3).map(new MapFunction<Long, Long>() { + DataStream<Long> unionDifferingParallelism = input2.union(input3).map(new MapFunction<Long, Long>() { @Override public Long map(Long value) throws Exception { return null; @@ -578,6 +585,7 @@ public class DataStreamTest { @Override public void flatMap1(Long value, Collector<Long> out) throws Exception { } + @Override public void flatMap2(Long value, Collector<Long> out) throws Exception { } @@ -744,7 +752,6 @@ public class DataStreamTest { assertTrue(getOperatorForDataStream(processed) instanceof ProcessOperator); } - @Test public void operatorTest() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -761,7 +768,6 @@ public class DataStreamTest { map.addSink(new DiscardingSink<Integer>()); assertEquals(mapFunction, getFunctionForDataStream(map)); - FlatMapFunction<Long, Integer> flatMapFunction = new FlatMapFunction<Long, Integer>() { private static final long serialVersionUID = 1L; @@ -1090,7 +1096,7 @@ public class DataStreamTest { expectedException.expect(InvalidProgramException.class); expectedException.expectMessage(new StringStartsWith("Type " + expectedTypeInfo + " cannot be used as key.")); - input.keyBy(new KeySelector<Tuple2<Integer[],String>, Tuple2<Integer[],String>>() { + input.keyBy(new KeySelector<Tuple2<Integer[], String>, Tuple2<Integer[], String>>() { @Override public Tuple2<Integer[], String> getKey(Tuple2<Integer[], String> value) throws Exception { return value; @@ -1121,6 +1127,9 @@ public class DataStreamTest { }); } + /** + * POJO without hashCode. + */ public static class POJOWithoutHashCode { private int[] id; @@ -1140,6 +1149,9 @@ public class DataStreamTest { } } + /** + * POJO with hashCode. + */ public static class POJOWithHashCode extends POJOWithoutHashCode { public POJOWithHashCode() { @@ -1244,7 +1256,7 @@ public class DataStreamTest { } } - public static class CustomPOJO { + private static class CustomPOJO { private String s; private int i; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java index d6fcd61..b231bea 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java @@ -23,15 +23,19 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.util.TestLogger; + import org.junit.Assert; import org.junit.Test; +/** + * Tests for {@link RestartStrategies}. + */ public class RestartStrategyTest extends TestLogger { /** * Tests that in a streaming use case where checkpointing is enabled, a * fixed delay with Integer.MAX_VALUE retries is instantiated if no other restart - * strategy has been specified + * strategy has been specified. */ @Test public void testAutomaticRestartingWhenCheckpointing() throws Exception { @@ -53,7 +57,7 @@ public class RestartStrategyTest extends TestLogger { /** * Checks that in a streaming use case where checkpointing is enabled and the number - * of execution retries is set to 0, restarting is deactivated + * of execution retries is set to 0, restarting is deactivated. */ @Test public void testNoRestartingWhenCheckpointingAndExplicitExecutionRetriesZero() throws Exception { @@ -94,7 +98,7 @@ public class RestartStrategyTest extends TestLogger { Assert.assertNotNull(restartStrategy); Assert.assertTrue(restartStrategy instanceof RestartStrategies.FixedDelayRestartStrategyConfiguration); - Assert.assertEquals(42, ((RestartStrategies.FixedDelayRestartStrategyConfiguration)restartStrategy).getRestartAttempts()); - Assert.assertEquals(1337, ((RestartStrategies.FixedDelayRestartStrategyConfiguration)restartStrategy).getDelayBetweenAttemptsInterval().toMilliseconds()); + Assert.assertEquals(42, ((RestartStrategies.FixedDelayRestartStrategyConfiguration) restartStrategy).getRestartAttempts()); + Assert.assertEquals(1337, ((RestartStrategies.FixedDelayRestartStrategyConfiguration) restartStrategy).getDelayBetweenAttemptsInterval().toMilliseconds()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java index dd4ff33..cabc7a1 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java @@ -17,18 +17,22 @@ package org.apache.flink.streaming.api; -import static org.junit.Assert.assertEquals; - -import java.util.Arrays; -import java.util.List; - import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.streaming.api.functions.source.FromElementsFunction; -import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource; +import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.util.SourceFunctionUtil; + import org.junit.Test; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link SourceFunction}. + */ public class SourceFunctionTest { @Test http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java index ff1eaaa..91cbe13 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java @@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.util.Collector; import org.apache.flink.util.SplittableIterator; + import org.junit.Assert; import org.junit.Test; @@ -45,6 +46,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +/** + * Tests for {@link StreamExecutionEnvironment}. + */ public class StreamExecutionEnvironmentTest { @Test @@ -143,7 +147,6 @@ public class StreamExecutionEnvironmentTest { } }; - SingleOutputStreamOperator<Object> operator = env.addSource(srcFun).flatMap(new FlatMapFunction<Integer, Object>() { @@ -227,7 +230,6 @@ public class StreamExecutionEnvironmentTest { // Utilities ///////////////////////////////////////////////////////////// - private static StreamOperator<?> getOperatorFromDataStream(DataStream<?> dataStream) { StreamExecutionEnvironment env = dataStream.getExecutionEnvironment(); StreamGraph streamGraph = env.getStreamGraph(); @@ -242,7 +244,7 @@ public class StreamExecutionEnvironmentTest { return (SourceFunction<T>) operator.getUserFunction(); } - public static class DummySplittableIterator<T> extends SplittableIterator<T> { + private static class DummySplittableIterator<T> extends SplittableIterator<T> { private static final long serialVersionUID = 1312752876092210499L; @SuppressWarnings("unchecked") @@ -272,7 +274,7 @@ public class StreamExecutionEnvironmentTest { } } - public static class ParentClass { + private static class ParentClass { int num; String string; public ParentClass(int num, String string) { @@ -281,7 +283,7 @@ public class StreamExecutionEnvironmentTest { } } - public static class SubClass extends ParentClass{ + private static class SubClass extends ParentClass{ public SubClass(int num, String string) { super(num, string); } http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java index 83fb2f7..5baa980 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java @@ -17,10 +17,6 @@ package org.apache.flink.streaming.api; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - - import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; @@ -35,6 +31,12 @@ import org.apache.flink.util.Collector; import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Tests for {@link TypeFill}. + */ @SuppressWarnings("serial") public class TypeFillTest { @@ -47,7 +49,6 @@ public class TypeFillTest { fail(); } catch (Exception ignored) {} - DataStream<Long> source = env.generateSequence(1, 10); try { http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java index 51b9d9a..4d201f4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.streaming.api.operators.StreamMap; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; + import org.junit.Assert; import org.junit.Test; @@ -29,6 +30,9 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +/** + * Tests for {@link ListCheckpointed}. + */ public class ListCheckpointedTest { @Test http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java index 3194f9e..a2f8ed6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java @@ -17,17 +17,20 @@ package org.apache.flink.streaming.api.collector; -import static org.junit.Assert.assertEquals; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.streaming.api.collector.selector.OutputSelector; + +import org.junit.Test; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.streaming.api.collector.selector.OutputSelector; - -import org.junit.Test; +import static org.junit.Assert.assertEquals; +/** + * Tests for {@link OutputSelector}. + */ public class OutputSelectorTest { static final class MyOutputSelector implements OutputSelector<Tuple1<Integer>> { http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java index d346fdc..c053598 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java @@ -20,12 +20,15 @@ package org.apache.flink.streaming.api.environment; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.streaming.api.datastream.DataStream; - import org.apache.flink.util.TestLogger; + import org.junit.Test; import static org.junit.Assert.assertEquals; +/** + * Tests for {@link Flip6LocalStreamEnvironment}. + */ @SuppressWarnings("serial") public class LocalStreamEnvironmentITCase extends TestLogger { http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractorTest.java index 2e92807..16f87ea 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractorTest.java @@ -19,11 +19,15 @@ package org.apache.flink.streaming.api.functions; import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; + import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +/** + * Tests for {@link AscendingTimestampExtractor}. + */ public class AscendingTimestampExtractorTest { @Test @@ -87,8 +91,6 @@ public class AscendingTimestampExtractorTest { assertEquals(500L, extractor.extractTimestamp(500L, 0L)); assertEquals(Long.MAX_VALUE - 1, extractor.extractTimestamp(Long.MAX_VALUE - 1, 99999L)); - - } private void runInvalidTest(AscendingTimestampExtractor<Long> extractor) { http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/BoundedOutOfOrdernessTimestampExtractorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/BoundedOutOfOrdernessTimestampExtractorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/BoundedOutOfOrdernessTimestampExtractorTest.java index d9a3812..91931d4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/BoundedOutOfOrdernessTimestampExtractorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/BoundedOutOfOrdernessTimestampExtractorTest.java @@ -15,18 +15,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.functions; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.time.Time; + import org.junit.Test; import static org.junit.Assert.assertEquals; +/** + * Tests for {@link BoundedOutOfOrdernessTimestampExtractor}. + */ public class BoundedOutOfOrdernessTimestampExtractorTest { - @Test public void testInitializationAndRuntime() { Time maxAllowedLateness = Time.milliseconds(10L); http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java index 3744eb9..9268ef7 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java @@ -41,7 +41,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Tests for the {@link org.apache.flink.streaming.api.functions.source.FromElementsFunction}. @@ -134,12 +136,12 @@ public class FromElementsFunctionTest { @Test public void testCheckpointAndRestore() { try { - final int NUM_ELEMENTS = 10000; + final int numElements = 10000; - List<Integer> data = new ArrayList<Integer>(NUM_ELEMENTS); - List<Integer> result = new ArrayList<Integer>(NUM_ELEMENTS); + List<Integer> data = new ArrayList<Integer>(numElements); + List<Integer> result = new ArrayList<Integer>(numElements); - for (int i = 0; i < NUM_ELEMENTS; i++) { + for (int i = 0; i < numElements; i++) { data.add(i); } @@ -171,7 +173,7 @@ public class FromElementsFunctionTest { Thread.sleep(1000); // make a checkpoint - List<Integer> checkpointData = new ArrayList<>(NUM_ELEMENTS); + List<Integer> checkpointData = new ArrayList<>(numElements); OperatorStateHandles handles = null; synchronized (ctx.getCheckpointLock()) { handles = testHarness.snapshot(566, System.currentTimeMillis()); @@ -215,7 +217,7 @@ public class FromElementsFunctionTest { // Test Types // ------------------------------------------------------------------------ - public static class MyPojo { + private static class MyPojo { public long val1; public int val2; @@ -244,7 +246,7 @@ public class FromElementsFunctionTest { } } - public static class SerializationErrorType implements Value { + private static class SerializationErrorType implements Value { private static final long serialVersionUID = -6037206294939421807L; @@ -259,7 +261,7 @@ public class FromElementsFunctionTest { } } - public static class DeserializeTooMuchType implements Value { + private static class DeserializeTooMuchType implements Value { private static final long serialVersionUID = -6037206294939421807L; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/IngestionTimeExtractorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/IngestionTimeExtractorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/IngestionTimeExtractorTest.java index a976453..c1ca9a1 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/IngestionTimeExtractorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/IngestionTimeExtractorTest.java @@ -19,10 +19,15 @@ package org.apache.flink.streaming.api.functions; import org.apache.flink.streaming.api.watermark.Watermark; + import org.junit.Test; -import static org.junit.Assert.*; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +/** + * Tests for {@link IngestionTimeExtractor}. + */ public class IngestionTimeExtractorTest { @Test http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java index 55a0e7f..ee666df 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java @@ -36,7 +36,6 @@ public class ListSourceContext<T> implements SourceFunction.SourceContext<T> { private final long delay; - public ListSourceContext(List<T> target) { this(target, 0L); } http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java index 9030e9d..de9f1c7 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java @@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; + import org.junit.Assert; import org.junit.Test; @@ -35,6 +36,9 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +/** + * Tests for {@link StatefulSequenceSource}. + */ public class StatefulSequenceSourceTest { @Test @@ -190,7 +194,7 @@ public class StatefulSequenceSourceTest { private final List<Long> localOutput; public BlockingSourceContext(String name, OneShotLatch latchToTrigger, OneShotLatch latchToWait, - ConcurrentHashMap<String, List<Long>> output, int elemToFire) { + ConcurrentHashMap<String, List<Long>> output, int elemToFire) { this.name = name; this.lock = new Object(); this.latchToTrigger = latchToTrigger; @@ -225,7 +229,6 @@ public class StatefulSequenceSourceTest { } } - @Override public void emitWatermark(Watermark mark) { throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java index 562883d..224b376 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java @@ -32,8 +32,8 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector; -import org.junit.Test; +import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -41,7 +41,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; /** - * Test cases for {@link RichAsyncFunction} + * Test cases for {@link RichAsyncFunction}. */ public class RichAsyncFunctionTest { http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunctionTest.java index d3a9d3d..afd1101 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunctionTest.java @@ -21,9 +21,13 @@ package org.apache.flink.streaming.api.functions.sink; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.api.common.io.RichOutputFormat; + import org.junit.Test; import org.mockito.Mockito; +/** + * Tests for {@link OutputFormatSinkFunction}. + */ public class OutputFormatSinkFunctionTest { @Test http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java index 877e707..63e83d2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java @@ -18,11 +18,12 @@ package org.apache.flink.streaming.api.functions.sink; -import org.apache.commons.io.IOUtils; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.util.serialization.SerializationSchema; import org.apache.flink.util.TestLogger; + +import org.apache.commons.io.IOUtils; import org.junit.Test; import java.io.BufferedReader; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java index d81b440..08985bc 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java @@ -30,8 +30,8 @@ public class FileMonitoringFunctionTest { @Test public void testForEmptyLocation() throws Exception { - final FileMonitoringFunction fileMonitoringFunction - = new FileMonitoringFunction("?non-existing-path", 1L, FileMonitoringFunction.WatchType.ONLY_NEW_FILES); + final FileMonitoringFunction fileMonitoringFunction = + new FileMonitoringFunction("?non-existing-path", 1L, FileMonitoringFunction.WatchType.ONLY_NEW_FILES); new Thread() { @Override @@ -61,7 +61,9 @@ public class FileMonitoringFunctionTest { public void markAsTemporarilyIdle() {} @Override - public Object getCheckpointLock() { return null; } + public Object getCheckpointLock() { + return null; + } @Override public void close() {} http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java index bb80228..b99119e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java @@ -34,12 +34,16 @@ import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.watermark.Watermark; + import org.junit.Assert; import org.junit.Test; import java.io.IOException; import java.util.Collections; +/** + * Tests for {@link InputFormatSourceFunction}. + */ public class InputFormatSourceFunctionTest { @Test @@ -81,8 +85,7 @@ public class InputFormatSourceFunctionTest { Assert.assertTrue(!format.isInputFormatOpen); } - - private static class LifeCycleTestInputFormat extends RichInputFormat<Integer,InputSplit> { + private static class LifeCycleTestInputFormat extends RichInputFormat<Integer, InputSplit> { private static final long serialVersionUID = 7408902249499583273L; private boolean isConfigured = false; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java index 85fa30b..3d14544 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java @@ -18,10 +18,9 @@ package org.apache.flink.streaming.api.functions.source; -import org.apache.commons.io.IOUtils; - import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.commons.io.IOUtils; import org.junit.Test; import java.io.EOFException; @@ -29,7 +28,8 @@ import java.io.OutputStreamWriter; import java.net.ServerSocket; import java.net.Socket; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}. @@ -38,7 +38,6 @@ public class SocketTextStreamFunctionTest { private static final String LOCALHOST = "127.0.0.1"; - @Test public void testSocketSourceSimpleOutput() throws Exception { ServerSocket server = new ServerSocket(0); http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTupleTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTupleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTupleTest.java index c98a659..a110af0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTupleTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTupleTest.java @@ -17,8 +17,6 @@ package org.apache.flink.streaming.api.functions.windowing.delta.extractor; -import static org.junit.Assert.assertEquals; - import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple10; @@ -45,10 +43,15 @@ import org.apache.flink.api.java.tuple.Tuple6; import org.apache.flink.api.java.tuple.Tuple7; import org.apache.flink.api.java.tuple.Tuple8; import org.apache.flink.api.java.tuple.Tuple9; -import org.apache.flink.streaming.api.functions.windowing.delta.extractor.ArrayFromTuple; + import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link Tuple} to {@code Array}. + */ public class ArrayFromTupleTest { private String[] testStrings; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtractTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtractTest.java index 3b098c3..bb2a3c3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtractTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtractTest.java @@ -17,12 +17,16 @@ package org.apache.flink.streaming.api.functions.windowing.delta.extractor; -import static org.junit.Assert.*; - import org.apache.flink.api.java.tuple.Tuple2; + import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link ConcatenatedExtract}. + */ public class ConcatenatedExtractTest { private String[] testStringArray1 = { "1", "2", "3" }; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArrayTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArrayTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArrayTest.java index d274f4e..a71c0b0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArrayTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArrayTest.java @@ -21,6 +21,9 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; +/** + * Tests for {@link FieldFromArray}. + */ public class FieldFromArrayTest { String[] testStringArray = { "0", "1", "2", "3", "4" }; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTupleTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTupleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTupleTest.java index c05f281..89ccc45 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTupleTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTupleTest.java @@ -43,11 +43,15 @@ import org.apache.flink.api.java.tuple.Tuple6; import org.apache.flink.api.java.tuple.Tuple7; import org.apache.flink.api.java.tuple.Tuple8; import org.apache.flink.api.java.tuple.Tuple9; + import org.junit.Before; import org.junit.Test; import static org.junit.Assert.assertEquals; +/** + * Tests for {@link FieldFromTuple}. + */ public class FieldFromTupleTest { private String[] testStrings; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArrayTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArrayTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArrayTest.java index 7a9a716..4404cd8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArrayTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArrayTest.java @@ -17,11 +17,13 @@ package org.apache.flink.streaming.api.functions.windowing.delta.extractor; -import static org.junit.Assert.assertEquals; - -import org.apache.flink.streaming.api.functions.windowing.delta.extractor.FieldsFromArray; import org.junit.Test; +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link FieldsFromArray}. + */ public class FieldsFromArrayTest { String[] testStringArray = { "0", "1", "2", "3", "4" }; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTupleTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTupleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTupleTest.java index 025ed8a..f46d5eb 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTupleTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTupleTest.java @@ -17,8 +17,6 @@ package org.apache.flink.streaming.api.functions.windowing.delta.extractor; -import static org.junit.Assert.*; - import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple10; @@ -45,10 +43,15 @@ import org.apache.flink.api.java.tuple.Tuple6; import org.apache.flink.api.java.tuple.Tuple7; import org.apache.flink.api.java.tuple.Tuple8; import org.apache.flink.api.java.tuple.Tuple9; -import org.apache.flink.streaming.api.functions.windowing.delta.extractor.FieldsFromTuple; + import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link FieldsFromTuple}. + */ public class FieldsFromTupleTest { private double[] testDouble; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java index cac59ae..6c8d5d2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java @@ -17,20 +17,21 @@ package org.apache.flink.streaming.api.graph; -import static org.junit.Assert.*; - -import java.util.List; - import org.apache.flink.api.common.functions.FilterFunction; -import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; - import org.apache.flink.streaming.api.functions.co.CoMapFunction; import org.apache.flink.util.TestLogger; + import org.junit.Test; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + /** * This verifies that slot sharing groups are correctly forwarded from user job to JobGraph. * @@ -46,7 +47,9 @@ public class SlotAllocationTest extends TestLogger { FilterFunction<Long> dummyFilter = new FilterFunction<Long>() { @Override - public boolean filter(Long value) { return false; } + public boolean filter(Long value) { + return false; + } }; env.generateSequence(1, 10) @@ -89,7 +92,9 @@ public class SlotAllocationTest extends TestLogger { FilterFunction<Long> dummyFilter = new FilterFunction<Long>() { @Override - public boolean filter(Long value) { return false; } + public boolean filter(Long value) { + return false; + } }; DataStream<Long> src1 = env.generateSequence(1, 10); @@ -127,7 +132,9 @@ public class SlotAllocationTest extends TestLogger { FilterFunction<Long> dummyFilter = new FilterFunction<Long>() { @Override - public boolean filter(Long value) { return false; } + public boolean filter(Long value) { + return false; + } }; DataStream<Long> src1 = env.generateSequence(1, 10).slotSharingGroup("group-1"); @@ -144,7 +151,6 @@ public class SlotAllocationTest extends TestLogger { assertNotEquals(vertices.get(1).getSlotSharingGroup(), vertices.get(2).getSlotSharingGroup()); } - @Test public void testCoOperation() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java index 5fdacd4..8149d24 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java @@ -44,6 +44,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.util.EvenOddOutputSelector; import org.apache.flink.streaming.util.NoOpIntMap; + import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -59,8 +60,7 @@ public class StreamGraphGeneratorTest { /** * This tests whether virtual Transformations behave correctly. * - * <p> - * Verifies that partitioning, output selector, selected names are correctly set in the + * <p>Verifies that partitioning, output selector, selected names are correctly set in the * StreamGraph when they are intermixed. */ @Test @@ -111,7 +111,6 @@ public class StreamGraphGeneratorTest { .select("even") .shuffle(); - SingleOutputStreamOperator<Integer> unionedMap = map1.union(map2).union(map3) .map(new NoOpIntMap()); @@ -143,7 +142,7 @@ public class StreamGraphGeneratorTest { /** * This tests whether virtual Transformations behave correctly. * - * Checks whether output selector, partitioning works correctly when applied on a union. + * <p>Checks whether output selector, partitioning works correctly when applied on a union. */ @Test public void testVirtualTransformations2() throws Exception { @@ -270,7 +269,7 @@ public class StreamGraphGeneratorTest { } /** - * Tests that the global and operator-wide max parallelism setting is respected + * Tests that the global and operator-wide max parallelism setting is respected. */ @Test public void testMaxParallelismForwarding() { @@ -387,13 +386,13 @@ public class StreamGraphGeneratorTest { env.getConfig().setMaxParallelism(maxParallelism); DataStream<Integer> keyedResult = input1.connect(input2).keyBy( - new KeySelector<Integer, Integer>() { - private static final long serialVersionUID = -6908614081449363419L; + new KeySelector<Integer, Integer>() { + private static final long serialVersionUID = -6908614081449363419L; - @Override - public Integer getKey(Integer value) throws Exception { - return value; - } + @Override + public Integer getKey(Integer value) throws Exception { + return value; + } }, new KeySelector<Integer, Integer>() { private static final long serialVersionUID = 3195683453223164931L; @@ -501,6 +500,5 @@ public class StreamGraphGeneratorTest { return value; } - }; - + } } http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index 6c5baca..6dd7de7 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -47,6 +47,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +/** + * Tests for {@link StreamingJobGraphGenerator}. + */ @SuppressWarnings("serial") public class StreamingJobGraphGeneratorTest extends TestLogger { @@ -158,7 +161,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { /** * Verifies that the resources are merged correctly for chained operators (covers source and sink cases) - * when generating job graph + * when generating job graph. */ @Test public void testResourcesForChainedSourceSink() throws Exception { @@ -230,7 +233,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { /** * Verifies that the resources are merged correctly for chained operators (covers middle chaining and iteration cases) - * when generating job graph + * when generating job graph. */ @Test public void testResourcesForIteration() throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java index 67004ea..798c81f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java @@ -15,11 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.operators; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; @@ -35,21 +35,19 @@ import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; -import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.junit.Assert; + import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.internal.util.reflection.Whitebox; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; -import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Random; @@ -261,18 +259,18 @@ public class AbstractStreamOperatorTest { */ @Test public void testStateAndTimerStateShufflingScalingUp() throws Exception { - final int MAX_PARALLELISM = 10; + final int maxParallelism = 10; // first get two keys that will fall into different key-group ranges that go // to different operator subtasks when we restore // get two sub key-ranges so that we can restore two ranges separately - KeyGroupRange subKeyGroupRange1 = new KeyGroupRange(0, (MAX_PARALLELISM / 2) - 1); - KeyGroupRange subKeyGroupRange2 = new KeyGroupRange(subKeyGroupRange1.getEndKeyGroup() + 1, MAX_PARALLELISM - 1); + KeyGroupRange subKeyGroupRange1 = new KeyGroupRange(0, (maxParallelism / 2) - 1); + KeyGroupRange subKeyGroupRange2 = new KeyGroupRange(subKeyGroupRange1.getEndKeyGroup() + 1, maxParallelism - 1); // get two different keys, one per sub range - int key1 = getKeyInKeyGroupRange(subKeyGroupRange1, MAX_PARALLELISM); - int key2 = getKeyInKeyGroupRange(subKeyGroupRange2, MAX_PARALLELISM); + int key1 = getKeyInKeyGroupRange(subKeyGroupRange1, maxParallelism); + int key2 = getKeyInKeyGroupRange(subKeyGroupRange2, maxParallelism); TestOperator testOperator = new TestOperator(); @@ -281,7 +279,7 @@ public class AbstractStreamOperatorTest { testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO, - MAX_PARALLELISM, + maxParallelism, 1, /* num subtasks */ 0 /* subtask index */); @@ -312,7 +310,7 @@ public class AbstractStreamOperatorTest { testOperator1, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO, - MAX_PARALLELISM, + maxParallelism, 2, /* num subtasks */ 0 /* subtask index */); @@ -352,7 +350,7 @@ public class AbstractStreamOperatorTest { testOperator2, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO, - MAX_PARALLELISM, + maxParallelism, 2, /* num subtasks */ 1 /* subtask index */); @@ -383,18 +381,18 @@ public class AbstractStreamOperatorTest { @Test public void testStateAndTimerStateShufflingScalingDown() throws Exception { - final int MAX_PARALLELISM = 10; + final int maxParallelism = 10; // first get two keys that will fall into different key-group ranges that go // to different operator subtasks when we restore // get two sub key-ranges so that we can restore two ranges separately - KeyGroupRange subKeyGroupRange1 = new KeyGroupRange(0, (MAX_PARALLELISM / 2) - 1); - KeyGroupRange subKeyGroupRange2 = new KeyGroupRange(subKeyGroupRange1.getEndKeyGroup() + 1, MAX_PARALLELISM - 1); + KeyGroupRange subKeyGroupRange1 = new KeyGroupRange(0, (maxParallelism / 2) - 1); + KeyGroupRange subKeyGroupRange2 = new KeyGroupRange(subKeyGroupRange1.getEndKeyGroup() + 1, maxParallelism - 1); // get two different keys, one per sub range - int key1 = getKeyInKeyGroupRange(subKeyGroupRange1, MAX_PARALLELISM); - int key2 = getKeyInKeyGroupRange(subKeyGroupRange2, MAX_PARALLELISM); + int key1 = getKeyInKeyGroupRange(subKeyGroupRange1, maxParallelism); + int key2 = getKeyInKeyGroupRange(subKeyGroupRange2, maxParallelism); TestOperator testOperator1 = new TestOperator(); @@ -403,7 +401,7 @@ public class AbstractStreamOperatorTest { testOperator1, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO, - MAX_PARALLELISM, + maxParallelism, 2, /* num subtasks */ 0 /* subtask index */); @@ -420,11 +418,10 @@ public class AbstractStreamOperatorTest { testOperator2, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO, - MAX_PARALLELISM, + maxParallelism, 2, /* num subtasks */ 1 /* subtask index */); - testHarness2.setup(); testHarness2.open(); @@ -458,7 +455,7 @@ public class AbstractStreamOperatorTest { testOperator3, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO, - MAX_PARALLELISM, + maxParallelism, 1, /* num subtasks */ 0 /* subtask index */); http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java index d331171..e8b4c9e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java @@ -38,6 +38,7 @@ import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; import org.apache.flink.streaming.runtime.tasks.SourceStreamTask; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.StreamTaskTest; + import org.junit.Assert; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java index 90470ac..7dba4af 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java @@ -53,12 +53,16 @@ import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProces import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessAllWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction; import org.apache.flink.util.Collector; + import org.junit.Assert; import org.junit.Test; import java.util.ArrayList; import java.util.List; +/** + * Tests for {@link FoldApplyProcessWindowFunction}. + */ public class FoldApplyProcessWindowFunctionTest { /** @@ -290,7 +294,7 @@ public class FoldApplyProcessWindowFunctionTest { Assert.assertEquals(expected, result); } - public static class DummyKeyedStateStore implements KeyedStateStore { + private static class DummyKeyedStateStore implements KeyedStateStore { @Override public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) { @@ -318,7 +322,7 @@ public class FoldApplyProcessWindowFunctionTest { } } - public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment { + private static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment { @Override public JobExecutionResult execute(String jobName) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java index fecd440..7cf18dd 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java @@ -37,12 +37,16 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction; import org.apache.flink.util.Collector; -import org.junit.Test; + import org.junit.Assert; +import org.junit.Test; import java.util.ArrayList; import java.util.List; +/** + * Tests for {@link FoldApplyWindowFunction}. + */ public class FoldApplyWindowFunctionTest { /** @@ -138,7 +142,7 @@ public class FoldApplyWindowFunctionTest { Assert.assertEquals(expected, result); } - public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment { + private static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment { @Override public JobExecutionResult execute(String jobName) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java index 680f2ac..5a4f1c4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.operators; import org.apache.flink.api.common.typeutils.base.IntSerializer; @@ -26,6 +27,7 @@ import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.KeyGroupsList; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; + import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -46,7 +48,13 @@ import java.util.Set; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; /** * Tests for {@link HeapInternalTimerService}. @@ -111,7 +119,7 @@ public class HeapInternalTimerServiceTest { for (int i = 0; i < totalNoOfTimers; i++) { // create the timer to be registered - InternalTimer<Integer, String> timer = new InternalTimer<>(10 + i, i, "hello_world_"+ i); + InternalTimer<Integer, String> timer = new InternalTimer<>(10 + i, i, "hello_world_" + i); int keyGroupIdx = KeyGroupRangeAssignment.assignToKeyGroup(timer.getKey(), totalNoOfKeyGroups); // add it in the adequate expected set of timers per keygroup @@ -298,7 +306,6 @@ public class HeapInternalTimerServiceTest { assertThat(processingTimeService.getActiveTimerTimestamps(), containsInAnyOrder(30L)); } - @Test public void testCurrentProcessingTime() throws Exception { @@ -673,7 +680,6 @@ public class HeapInternalTimerServiceTest { @SuppressWarnings("unchecked") Triggerable<Integer, String> mockTriggerable2 = mock(Triggerable.class); - TestKeyContext keyContext1 = new TestKeyContext(); TestKeyContext keyContext2 = new TestKeyContext(); @@ -696,7 +702,6 @@ public class HeapInternalTimerServiceTest { subKeyGroupRange2, maxParallelism); - processingTimeService1.setCurrentTime(10); timerService1.advanceWatermark(10); http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java index 32953fc..696acfa 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java @@ -15,8 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.streaming.api.operators; +package org.apache.flink.streaming.api.operators; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; @@ -32,6 +32,7 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; + import org.junit.Test; import java.util.concurrent.ConcurrentLinkedQueue; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java index f57eed1..5a7e69e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java @@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.util.TestLogger; + import org.junit.Test; import java.util.concurrent.RunnableFuture; @@ -29,6 +30,9 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.powermock.api.mockito.PowerMockito.when; +/** + * Tests for {@link OperatorSnapshotResult}. + */ public class OperatorSnapshotResultTest extends TestLogger { /** http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java index c37fe48..35ab00c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java @@ -15,8 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.streaming.api.operators; +package org.apache.flink.streaming.api.operators; import org.apache.flink.streaming.api.TimeDomain; import org.apache.flink.streaming.api.functions.ProcessFunction; @@ -26,6 +26,7 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; + import org.junit.Test; import java.util.concurrent.ConcurrentLinkedQueue; @@ -92,6 +93,7 @@ public class ProcessOperatorTest extends TestLogger { testHarness.close(); } + private static class QueryingProcessFunction extends ProcessFunction<Integer, String> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java index 9b78b08..1e9b942 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java @@ -18,9 +18,6 @@ package org.apache.flink.streaming.api.operators; -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.serializers.JavaSerializer; - import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.state.ListStateDescriptor; @@ -43,17 +40,19 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator; import org.apache.flink.util.Collector; +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.serializers.JavaSerializer; import org.junit.Test; import java.io.File; -import static org.junit.Assert.*; +import static org.junit.Assert.assertTrue; /** * Various tests around the proper passing of state descriptors to the operators * and their serialization. * - * The tests use an arbitrary generic type to validate the behavior. + * <p>The tests use an arbitrary generic type to validate the behavior. */ @SuppressWarnings("serial") public class StateDescriptorPassingTest { @@ -266,7 +265,7 @@ public class StateDescriptorPassingTest { assertTrue(descr instanceof ListStateDescriptor); - ListStateDescriptor<?> listDescr = (ListStateDescriptor<?>)descr; + ListStateDescriptor<?> listDescr = (ListStateDescriptor<?>) descr; // this would be the first statement to fail if state descriptors were not properly initialized TypeSerializer<?> serializer = listDescr.getSerializer(); http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java index 3745031..1ba2e77 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.state.StatePartitionStreamProvider; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; import org.apache.flink.runtime.util.LongArrayList; import org.apache.flink.util.Preconditions; + import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -54,6 +55,9 @@ import java.util.Set; import static org.mockito.Mockito.mock; +/** + * Tests for {@link StateInitializationContextImpl}. + */ public class StateInitializationContextImplTest { static final int NUM_HANDLES = 10; @@ -67,7 +71,6 @@ public class StateInitializationContextImplTest { @Before public void setUp() throws Exception { - this.writtenKeyGroups = 0; this.writtenOperatorStates = new HashSet<>(); @@ -204,7 +207,6 @@ public class StateInitializationContextImplTest { int stopCount = NUM_HANDLES / 2; boolean isClosed = false; - try { for (KeyGroupStatePartitionStreamProvider stateStreamProvider : initializationContext.getRawKeyedStateInputs()) { http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java index 277ced5..099f1f9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream; import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; import org.apache.flink.util.TestLogger; + import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -40,6 +41,9 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.powermock.api.mockito.PowerMockito.when; +/** + * Tests for {@link StateSnapshotContextSynchronousImpl}. + */ public class StateSnapshotContextSynchronousImplTest extends TestLogger { private StateSnapshotContextSynchronousImpl snapshotContext; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java index 047aad8..8add242 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java @@ -17,8 +17,6 @@ package org.apache.flink.streaming.api.operators; -import java.util.concurrent.ConcurrentLinkedQueue; - import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.RichFilterFunction; import org.apache.flink.configuration.Configuration; @@ -26,9 +24,12 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestHarnessUtil; + import org.junit.Assert; import org.junit.Test; +import java.util.concurrent.ConcurrentLinkedQueue; + /** * Tests for {@link StreamFilter}. These test that: * http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamFlatMapTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamFlatMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamFlatMapTest.java index e4e29c1..8ea1e12 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamFlatMapTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamFlatMapTest.java @@ -17,8 +17,6 @@ package org.apache.flink.streaming.api.operators; -import java.util.concurrent.ConcurrentLinkedQueue; - import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.configuration.Configuration; @@ -27,9 +25,12 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.util.Collector; + import org.junit.Assert; import org.junit.Test; +import java.util.concurrent.ConcurrentLinkedQueue; + /** * Tests for {@link StreamMap}. These test that: * @@ -41,7 +42,7 @@ import org.junit.Test; */ public class StreamFlatMapTest { - public static final class MyFlatMap implements FlatMapFunction<Integer, Integer> { + private static final class MyFlatMap implements FlatMapFunction<Integer, Integer> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java index 05a910f..3ebb9ce 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java @@ -17,8 +17,6 @@ package org.apache.flink.streaming.api.operators; -import java.util.concurrent.ConcurrentLinkedQueue; - import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.RichFoldFunction; @@ -33,6 +31,8 @@ import org.apache.flink.streaming.util.TestHarnessUtil; import org.junit.Test; +import java.util.concurrent.ConcurrentLinkedQueue; + import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java index b61c760..f1c9bca 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java @@ -17,8 +17,6 @@ package org.apache.flink.streaming.api.operators; -import java.util.concurrent.ConcurrentLinkedQueue; - import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichReduceFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; @@ -31,9 +29,12 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestHarnessUtil; + import org.junit.Assert; import org.junit.Test; +import java.util.concurrent.ConcurrentLinkedQueue; + /** * Tests for {@link StreamGroupedReduce}. These test that: *
