[hotfix] [streaming api] Remove obsolete and unused InputTypeSerializer from WindowOperator


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1cd8d4f4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1cd8d4f4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1cd8d4f4

Branch: refs/heads/master
Commit: 1cd8d4f418a707790c091fed2428627eae9da423
Parents: 47e4977
Author: Stephan Ewen <[email protected]>
Authored: Tue Oct 4 23:49:54 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Wed Oct 5 20:04:34 2016 +0200

----------------------------------------------------------------------
 .../operators/windowing/WindowOperator.java     | 20 +------
 .../windowing/EvictingWindowOperatorTest.java   |  6 ---
 .../operators/windowing/WindowOperatorTest.java | 56 +-------------------
 .../streaming/util/WindowingTestHarness.java    |  2 -
 4 files changed, 3 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1cd8d4f4/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index de316e7..c5f1ca2 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -20,7 +20,6 @@ package 
org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.AppendingState;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -34,7 +33,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -98,7 +96,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public class WindowOperator<K, IN, ACC, OUT, W extends Window>
        extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, 
K, W>>
-       implements OneInputStreamOperator<IN, OUT>, Triggerable, 
InputTypeConfigurable {
+       implements OneInputStreamOperator<IN, OUT>, Triggerable {
 
        private static final long serialVersionUID = 1L;
 
@@ -115,12 +113,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
        protected final StateDescriptor<? extends AppendingState<IN, ACC>, ?> 
windowStateDescriptor;
 
        /**
-        * This is used to copy the incoming element because it can be put into 
several window
-        * buffers.
-        */
-       protected TypeSerializer<IN> inputSerializer;
-
-       /**
         * For serializing the key in checkpoints.
         */
        protected final TypeSerializer<K> keySerializer;
@@ -211,21 +203,11 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
        }
 
        @Override
-       @SuppressWarnings("unchecked")
-       public final void setInputType(TypeInformation<?> type, ExecutionConfig 
executionConfig) {
-               inputSerializer = (TypeSerializer<IN>) 
type.createSerializer(executionConfig);
-       }
-
-       @Override
        public final void open() throws Exception {
                super.open();
 
                timestampedCollector = new TimestampedCollector<>(output);
 
-               if (inputSerializer == null) {
-                       throw new IllegalStateException("Input serializer was 
not set.");
-               }
-
                // these could already be initialized from restoreState()
                if (watermarkTimers == null) {
                        watermarkTimers = new HashSet<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/1cd8d4f4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
index 681a334..8f3af15 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -82,8 +82,6 @@ public class EvictingWindowOperatorTest {
                                CountEvictor.of(WINDOW_SIZE),
                                0);
 
-               operator.setInputType(inputType, new ExecutionConfig());
-
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                                new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
@@ -152,8 +150,6 @@ public class EvictingWindowOperatorTest {
                        CountEvictor.of(WINDOW_SIZE),
                        0);
 
-               operator.setInputType(inputType, new ExecutionConfig());
-
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                                new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
@@ -221,8 +217,6 @@ public class EvictingWindowOperatorTest {
                        CountEvictor.of(WINDOW_SIZE),
                        0);
 
-               operator.setInputType(inputType, new ExecutionConfig());
-
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                                new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1cd8d4f4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index e98bc91..cda6524 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -180,8 +180,6 @@ public class WindowOperatorTest extends TestLogger {
                                EventTimeTrigger.create(),
                                0);
 
-               operator.setInputType(inputType, new ExecutionConfig());
-
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                                new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -216,8 +214,6 @@ public class WindowOperatorTest extends TestLogger {
                                EventTimeTrigger.create(),
                                0);
 
-               operator.setInputType(inputType, new ExecutionConfig());
-
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                                new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -317,8 +313,6 @@ public class WindowOperatorTest extends TestLogger {
                                EventTimeTrigger.create(),
                                0);
 
-               operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                                new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -351,8 +345,6 @@ public class WindowOperatorTest extends TestLogger {
                                EventTimeTrigger.create(),
                                0);
 
-               operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                                new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -388,8 +380,6 @@ public class WindowOperatorTest extends TestLogger {
                                EventTimeTrigger.create(),
                                0);
 
-               operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
                                new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -462,8 +452,6 @@ public class WindowOperatorTest extends TestLogger {
                                EventTimeTrigger.create(),
                                0);
 
-               operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
                                new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -538,8 +526,6 @@ public class WindowOperatorTest extends TestLogger {
                                PurgingTrigger.of(CountTrigger.of(4)),
                                0);
 
-               operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
                                new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
                
@@ -643,8 +629,6 @@ public class WindowOperatorTest extends TestLogger {
                                EventTimeTrigger.create(),
                                0);
 
-               operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
                                new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
                
@@ -703,8 +687,6 @@ public class WindowOperatorTest extends TestLogger {
                                
ContinuousEventTimeTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
                                0);
 
-               operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                                new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO); ;
 
@@ -793,9 +775,6 @@ public class WindowOperatorTest extends TestLogger {
                                PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)),
                                0);
 
-               operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse(
-                               "Tuple2<String, Integer>"), new 
ExecutionConfig());
-
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                                new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO); ;
 
@@ -839,9 +818,6 @@ public class WindowOperatorTest extends TestLogger {
 
                testHarness = new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
 
-               operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse(
-                               "Tuple2<String, Integer>"), new 
ExecutionConfig());
-
                testHarness.setup();
                testHarness.restore(snapshot);
                testHarness.open();
@@ -876,7 +852,6 @@ public class WindowOperatorTest extends TestLogger {
                final int WINDOW_SLIDE = 1;
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
-               TestTimeServiceProvider timer = new TestTimeServiceProvider();
 
                TestTimeServiceProvider timer = new TestTimeServiceProvider();
 
@@ -957,7 +932,6 @@ public class WindowOperatorTest extends TestLogger {
                                ProcessingTimeTrigger.create(), 0);
 
                TestTimeServiceProvider testTimeProvider = new 
TestTimeServiceProvider();
-               operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                                new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), 
testTimeProvider, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1017,7 +991,6 @@ public class WindowOperatorTest extends TestLogger {
                                ProcessingTimeTrigger.create(), 0);
 
                TestTimeServiceProvider testTimeProvider = new 
TestTimeServiceProvider();
-               operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                                new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), 
testTimeProvider, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1090,7 +1063,6 @@ public class WindowOperatorTest extends TestLogger {
                                ProcessingTimeTrigger.create(), 0);
 
                TestTimeServiceProvider testTimeProvider = new 
TestTimeServiceProvider();
-               operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                                new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), 
testTimeProvider, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1159,8 +1131,7 @@ public class WindowOperatorTest extends TestLogger {
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                        new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
-
-               operator.setInputType(inputType, new ExecutionConfig());
+               
                testHarness.open();
                
                ConcurrentLinkedQueue<Object> expected = new 
ConcurrentLinkedQueue<>();
@@ -1220,8 +1191,7 @@ public class WindowOperatorTest extends TestLogger {
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                        new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
-
-               operator.setInputType(inputType, new ExecutionConfig());
+               
                testHarness.open();
 
                ConcurrentLinkedQueue<Object> expected = new 
ConcurrentLinkedQueue<>();
@@ -1288,7 +1258,6 @@ public class WindowOperatorTest extends TestLogger {
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                        new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
-               operator.setInputType(inputType, new ExecutionConfig());
                testHarness.open();
 
                ConcurrentLinkedQueue<Object> expected = new 
ConcurrentLinkedQueue<>();
@@ -1349,7 +1318,6 @@ public class WindowOperatorTest extends TestLogger {
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                        new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
-               operator.setInputType(inputType, new ExecutionConfig());
                testHarness.open();
                
                ConcurrentLinkedQueue<Object> expected = new 
ConcurrentLinkedQueue<>();
@@ -1422,8 +1390,6 @@ public class WindowOperatorTest extends TestLogger {
                                PurgingTrigger.of(EventTimeTrigger.create()),
                                LATENESS);
 
-               operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
                        new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
@@ -1514,8 +1480,6 @@ public class WindowOperatorTest extends TestLogger {
                                EventTimeTrigger.create(),
                                LATENESS);
 
-               operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
                        new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
@@ -1600,8 +1564,6 @@ public class WindowOperatorTest extends TestLogger {
                                PurgingTrigger.of(EventTimeTrigger.create()),
                                LATENESS);
 
-               operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
                        new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
@@ -1686,8 +1648,6 @@ public class WindowOperatorTest extends TestLogger {
                                EventTimeTrigger.create(),
                                LATENESS);
 
-               operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
                        new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
@@ -1781,8 +1741,6 @@ public class WindowOperatorTest extends TestLogger {
                                PurgingTrigger.of(EventTimeTrigger.create()),
                                LATENESS);
 
-               operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
                        new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
@@ -1868,8 +1826,6 @@ public class WindowOperatorTest extends TestLogger {
                                EventTimeTrigger.create(),
                                LATENESS);
 
-               operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
                        new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
@@ -1958,7 +1914,6 @@ public class WindowOperatorTest extends TestLogger {
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
String> testHarness =
                        new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
-               operator.setInputType(inputType, new ExecutionConfig());
                testHarness.open();
 
                ConcurrentLinkedQueue<Object> expected = new 
ConcurrentLinkedQueue<>();
@@ -2013,7 +1968,6 @@ public class WindowOperatorTest extends TestLogger {
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                        new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
-               operator.setInputType(inputType, new ExecutionConfig());
                testHarness.open();
 
                ConcurrentLinkedQueue<Object> expected = new 
ConcurrentLinkedQueue<>();
@@ -2060,7 +2014,6 @@ public class WindowOperatorTest extends TestLogger {
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                        new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
-               operator.setInputType(inputType, new ExecutionConfig());
                testHarness.open();
 
                ConcurrentLinkedQueue<Object> expected = new 
ConcurrentLinkedQueue<>();
@@ -2116,7 +2069,6 @@ public class WindowOperatorTest extends TestLogger {
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                        new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
-               operator.setInputType(inputType, new ExecutionConfig());
                testHarness.open();
 
                ConcurrentLinkedQueue<Object> expected = new 
ConcurrentLinkedQueue<>();
@@ -2162,7 +2114,6 @@ public class WindowOperatorTest extends TestLogger {
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                        new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
-               operator.setInputType(inputType, new ExecutionConfig());
                testHarness.open();
 
                ConcurrentLinkedQueue<Object> expected = new 
ConcurrentLinkedQueue<>();
@@ -2204,8 +2155,6 @@ public class WindowOperatorTest extends TestLogger {
                                EventTimeTrigger.create(),
                                LATENESS);
 
-               operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
                        new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
@@ -2261,7 +2210,6 @@ public class WindowOperatorTest extends TestLogger {
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                        new KeyedOneInputStreamOperatorTestHarness<>(operator, 
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
 
-               operator.setInputType(inputType, new ExecutionConfig());
                testHarness.open();
 
                ConcurrentLinkedQueue<Object> expected = new 
ConcurrentLinkedQueue<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/1cd8d4f4/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
index d47136c..ab8b70f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
@@ -80,8 +80,6 @@ public class WindowingTestHarness<K, IN, W extends Window> {
                                trigger,
                                allowedLateness);
 
-               operator.setInputType(inputType, executionConfig);
-
                timeServiceProvider = new TestTimeServiceProvider();
                testHarness = new 
KeyedOneInputStreamOperatorTestHarness<>(operator, executionConfig, 
timeServiceProvider, keySelector, keyType);
        }

Reply via email to