[hotfix] [streaming api] Remove obsolete and unused InputTypeSerializer from WindowOperator
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1cd8d4f4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1cd8d4f4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1cd8d4f4 Branch: refs/heads/master Commit: 1cd8d4f418a707790c091fed2428627eae9da423 Parents: 47e4977 Author: Stephan Ewen <[email protected]> Authored: Tue Oct 4 23:49:54 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Wed Oct 5 20:04:34 2016 +0200 ---------------------------------------------------------------------- .../operators/windowing/WindowOperator.java | 20 +------ .../windowing/EvictingWindowOperatorTest.java | 6 --- .../operators/windowing/WindowOperatorTest.java | 56 +------------------- .../streaming/util/WindowingTestHarness.java | 2 - 4 files changed, 3 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1cd8d4f4/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index de316e7..c5f1ca2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -20,7 +20,6 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.AppendingState; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; @@ -34,7 +33,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.InputTypeConfigurable; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; import org.apache.flink.core.fs.FSDataInputStream; @@ -98,7 +96,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; @Internal public class WindowOperator<K, IN, ACC, OUT, W extends Window> extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>> - implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable { + implements OneInputStreamOperator<IN, OUT>, Triggerable { private static final long serialVersionUID = 1L; @@ -115,12 +113,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> protected final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor; /** - * This is used to copy the incoming element because it can be put into several window - * buffers. - */ - protected TypeSerializer<IN> inputSerializer; - - /** * For serializing the key in checkpoints. */ protected final TypeSerializer<K> keySerializer; @@ -211,21 +203,11 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> } @Override - @SuppressWarnings("unchecked") - public final void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) { - inputSerializer = (TypeSerializer<IN>) type.createSerializer(executionConfig); - } - - @Override public final void open() throws Exception { super.open(); timestampedCollector = new TimestampedCollector<>(output); - if (inputSerializer == null) { - throw new IllegalStateException("Input serializer was not set."); - } - // these could already be initialized from restoreState() if (watermarkTimers == null) { watermarkTimers = new HashSet<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/1cd8d4f4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java index 681a334..8f3af15 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java @@ -82,8 +82,6 @@ public class EvictingWindowOperatorTest { CountEvictor.of(WINDOW_SIZE), 0); - operator.setInputType(inputType, new ExecutionConfig()); - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); @@ -152,8 +150,6 @@ public class EvictingWindowOperatorTest { CountEvictor.of(WINDOW_SIZE), 0); - operator.setInputType(inputType, new ExecutionConfig()); - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); @@ -221,8 +217,6 @@ public class EvictingWindowOperatorTest { CountEvictor.of(WINDOW_SIZE), 0); - operator.setInputType(inputType, new ExecutionConfig()); - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); http://git-wip-us.apache.org/repos/asf/flink/blob/1cd8d4f4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java index e98bc91..cda6524 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java @@ -180,8 +180,6 @@ public class WindowOperatorTest extends TestLogger { EventTimeTrigger.create(), 0); - operator.setInputType(inputType, new ExecutionConfig()); - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); @@ -216,8 +214,6 @@ public class WindowOperatorTest extends TestLogger { EventTimeTrigger.create(), 0); - operator.setInputType(inputType, new ExecutionConfig()); - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); @@ -317,8 +313,6 @@ public class WindowOperatorTest extends TestLogger { EventTimeTrigger.create(), 0); - operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); @@ -351,8 +345,6 @@ public class WindowOperatorTest extends TestLogger { EventTimeTrigger.create(), 0); - operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); @@ -388,8 +380,6 @@ public class WindowOperatorTest extends TestLogger { EventTimeTrigger.create(), 0); - operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); @@ -462,8 +452,6 @@ public class WindowOperatorTest extends TestLogger { EventTimeTrigger.create(), 0); - operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); @@ -538,8 +526,6 @@ public class WindowOperatorTest extends TestLogger { PurgingTrigger.of(CountTrigger.of(4)), 0); - operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); @@ -643,8 +629,6 @@ public class WindowOperatorTest extends TestLogger { EventTimeTrigger.create(), 0); - operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); @@ -703,8 +687,6 @@ public class WindowOperatorTest extends TestLogger { ContinuousEventTimeTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), 0); - operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; @@ -793,9 +775,6 @@ public class WindowOperatorTest extends TestLogger { PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)), 0); - operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse( - "Tuple2<String, Integer>"), new ExecutionConfig()); - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; @@ -839,9 +818,6 @@ public class WindowOperatorTest extends TestLogger { testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); - operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse( - "Tuple2<String, Integer>"), new ExecutionConfig()); - testHarness.setup(); testHarness.restore(snapshot); testHarness.open(); @@ -876,7 +852,6 @@ public class WindowOperatorTest extends TestLogger { final int WINDOW_SLIDE = 1; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); - TestTimeServiceProvider timer = new TestTimeServiceProvider(); TestTimeServiceProvider timer = new TestTimeServiceProvider(); @@ -957,7 +932,6 @@ public class WindowOperatorTest extends TestLogger { ProcessingTimeTrigger.create(), 0); TestTimeServiceProvider testTimeProvider = new TestTimeServiceProvider(); - operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); @@ -1017,7 +991,6 @@ public class WindowOperatorTest extends TestLogger { ProcessingTimeTrigger.create(), 0); TestTimeServiceProvider testTimeProvider = new TestTimeServiceProvider(); - operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); @@ -1090,7 +1063,6 @@ public class WindowOperatorTest extends TestLogger { ProcessingTimeTrigger.create(), 0); TestTimeServiceProvider testTimeProvider = new TestTimeServiceProvider(); - operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); @@ -1159,8 +1131,7 @@ public class WindowOperatorTest extends TestLogger { OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; - - operator.setInputType(inputType, new ExecutionConfig()); + testHarness.open(); ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>(); @@ -1220,8 +1191,7 @@ public class WindowOperatorTest extends TestLogger { OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; - - operator.setInputType(inputType, new ExecutionConfig()); + testHarness.open(); ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>(); @@ -1288,7 +1258,6 @@ public class WindowOperatorTest extends TestLogger { OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; - operator.setInputType(inputType, new ExecutionConfig()); testHarness.open(); ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>(); @@ -1349,7 +1318,6 @@ public class WindowOperatorTest extends TestLogger { OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; - operator.setInputType(inputType, new ExecutionConfig()); testHarness.open(); ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>(); @@ -1422,8 +1390,6 @@ public class WindowOperatorTest extends TestLogger { PurgingTrigger.of(EventTimeTrigger.create()), LATENESS); - operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; @@ -1514,8 +1480,6 @@ public class WindowOperatorTest extends TestLogger { EventTimeTrigger.create(), LATENESS); - operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; @@ -1600,8 +1564,6 @@ public class WindowOperatorTest extends TestLogger { PurgingTrigger.of(EventTimeTrigger.create()), LATENESS); - operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; @@ -1686,8 +1648,6 @@ public class WindowOperatorTest extends TestLogger { EventTimeTrigger.create(), LATENESS); - operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; @@ -1781,8 +1741,6 @@ public class WindowOperatorTest extends TestLogger { PurgingTrigger.of(EventTimeTrigger.create()), LATENESS); - operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; @@ -1868,8 +1826,6 @@ public class WindowOperatorTest extends TestLogger { EventTimeTrigger.create(), LATENESS); - operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; @@ -1958,7 +1914,6 @@ public class WindowOperatorTest extends TestLogger { OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, String> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; - operator.setInputType(inputType, new ExecutionConfig()); testHarness.open(); ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>(); @@ -2013,7 +1968,6 @@ public class WindowOperatorTest extends TestLogger { OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; - operator.setInputType(inputType, new ExecutionConfig()); testHarness.open(); ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>(); @@ -2060,7 +2014,6 @@ public class WindowOperatorTest extends TestLogger { OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; - operator.setInputType(inputType, new ExecutionConfig()); testHarness.open(); ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>(); @@ -2116,7 +2069,6 @@ public class WindowOperatorTest extends TestLogger { OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; - operator.setInputType(inputType, new ExecutionConfig()); testHarness.open(); ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>(); @@ -2162,7 +2114,6 @@ public class WindowOperatorTest extends TestLogger { OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; - operator.setInputType(inputType, new ExecutionConfig()); testHarness.open(); ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>(); @@ -2204,8 +2155,6 @@ public class WindowOperatorTest extends TestLogger { EventTimeTrigger.create(), LATENESS); - operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; @@ -2261,7 +2210,6 @@ public class WindowOperatorTest extends TestLogger { OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; - operator.setInputType(inputType, new ExecutionConfig()); testHarness.open(); ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/1cd8d4f4/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java index d47136c..ab8b70f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java @@ -80,8 +80,6 @@ public class WindowingTestHarness<K, IN, W extends Window> { trigger, allowedLateness); - operator.setInputType(inputType, executionConfig); - timeServiceProvider = new TestTimeServiceProvider(); testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, executionConfig, timeServiceProvider, keySelector, keyType); }
