[FLINK-4957] Remove Key Serializer Parameter from getInternalTimerService()

The key serializer parameter is not needed because the key serializer can
be obtained from the keyed state backend.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0b873ac3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0b873ac3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0b873ac3

Branch: refs/heads/master
Commit: 0b873ac343343fd1d7716d075b54e66324374f47
Parents: 06fb9f1
Author: Aljoscha Krettek <[email protected]>
Authored: Fri Oct 28 14:34:47 2016 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Mon Nov 7 16:25:57 2016 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/KeyedStream.java     |  2 +-
 .../api/operators/AbstractStreamOperator.java     | 13 ++++++-------
 .../api/operators/StreamTimelyFlatMap.java        |  9 ++-------
 .../api/operators/co/CoStreamTimelyFlatMap.java   | 11 ++---------
 .../operators/windowing/WindowOperator.java       |  2 +-
 .../api/operators/AbstractStreamOperatorTest.java |  1 -
 .../api/operators/TimelyFlatMapTest.java          | 18 +++++++++---------
 .../api/operators/co/TimelyCoFlatMapTest.java     | 16 ++++++++--------
 8 files changed, 29 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index c938f5b..4063b60 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -231,7 +231,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
                        TypeInformation<R> outputType) {
 
                StreamTimelyFlatMap<KEY, T, R> operator =
-                               new 
StreamTimelyFlatMap<>(keyType.createSerializer(getExecutionConfig()), 
clean(flatMapper));
+                               new StreamTimelyFlatMap<>(clean(flatMapper));
 
                return transform("Flat Map", outputType, operator);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 7b555b7..839abf8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -772,24 +772,21 @@ public abstract class AbstractStreamOperator<OUT>
         *
         * @param name The name of the requested timer service. If no service 
exists under the given
         *             name a new one will be created and returned.
-        * @param keySerializer {@code TypeSerializer} for the keys of the 
timers.
         * @param namespaceSerializer {@code TypeSerializer} for the timer 
namespace.
         * @param triggerable The {@link Triggerable} that should be invoked 
when timers fire
         *
-        * @param <K> The type of the timer keys.
         * @param <N> The type of the timer namespace.
         */
-       public <K, N> InternalTimerService<N> getInternalTimerService(
+       public <N> InternalTimerService<N> getInternalTimerService(
                        String name,
-                       TypeSerializer<K> keySerializer,
                        TypeSerializer<N> namespaceSerializer,
-                       Triggerable<K, N> triggerable) {
+                       Triggerable<?, N> triggerable) {
                if (getKeyedStateBackend() == null) {
                        throw new UnsupportedOperationException("Timers can 
only be used on keyed operators.");
                }
 
                @SuppressWarnings("unchecked")
-               HeapInternalTimerService<K, N> timerService = 
(HeapInternalTimerService<K, N>) timerServices.get(name);
+               HeapInternalTimerService<Object, N> timerService = 
(HeapInternalTimerService<Object, N>) timerServices.get(name);
 
                if (timerService == null) {
                        timerService = new HeapInternalTimerService<>(
@@ -799,7 +796,9 @@ public abstract class AbstractStreamOperator<OUT>
                                getRuntimeContext().getProcessingTimeService());
                        timerServices.put(name, timerService);
                }
-               timerService.startTimerService(keySerializer, 
namespaceSerializer, triggerable);
+               @SuppressWarnings({"unchecked", "rawtypes"})
+               Triggerable rawTriggerable = (Triggerable) triggerable;
+               
timerService.startTimerService(getKeyedStateBackend().getKeySerializer(), 
namespaceSerializer, rawTriggerable);
                return timerService;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
index 962f264..d507ba6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.SimpleTimerService;
@@ -34,17 +33,13 @@ public class StreamTimelyFlatMap<K, IN, OUT>
 
        private static final long serialVersionUID = 1L;
 
-       private final TypeSerializer<K> keySerializer;
-
        private transient TimestampedCollector<OUT> collector;
 
        private transient TimerService timerService;
 
-       public StreamTimelyFlatMap(TypeSerializer<K> keySerializer, 
TimelyFlatMapFunction<IN, OUT> flatMapper) {
+       public StreamTimelyFlatMap(TimelyFlatMapFunction<IN, OUT> flatMapper) {
                super(flatMapper);
 
-               this.keySerializer = keySerializer;
-
                chainingStrategy = ChainingStrategy.ALWAYS;
        }
 
@@ -54,7 +49,7 @@ public class StreamTimelyFlatMap<K, IN, OUT>
                collector = new TimestampedCollector<>(output);
 
                InternalTimerService<VoidNamespace> internalTimerService =
-                               getInternalTimerService("user-timers", 
keySerializer, VoidNamespaceSerializer.INSTANCE, this);
+                               getInternalTimerService("user-timers", 
VoidNamespaceSerializer.INSTANCE, this);
 
                this.timerService = new 
SimpleTimerService(internalTimerService);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
index df2320f..212aafd 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.api.operators.co;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.SimpleTimerService;
@@ -40,18 +39,12 @@ public class CoStreamTimelyFlatMap<K, IN1, IN2, OUT>
 
        private static final long serialVersionUID = 1L;
 
-       private final TypeSerializer<K> keySerializer;
-
        private transient TimestampedCollector<OUT> collector;
 
        private transient TimerService timerService;
 
-       public CoStreamTimelyFlatMap(
-                       TypeSerializer<K> keySerializer,
-                       TimelyCoFlatMapFunction<IN1, IN2, OUT> flatMapper) {
+       public CoStreamTimelyFlatMap(TimelyCoFlatMapFunction<IN1, IN2, OUT> 
flatMapper) {
                super(flatMapper);
-
-               this.keySerializer = keySerializer;
        }
 
        @Override
@@ -60,7 +53,7 @@ public class CoStreamTimelyFlatMap<K, IN1, IN2, OUT>
                collector = new TimestampedCollector<>(output);
 
                InternalTimerService<VoidNamespace> internalTimerService =
-                               getInternalTimerService("user-timers", 
keySerializer, VoidNamespaceSerializer.INSTANCE, this);
+                               getInternalTimerService("user-timers", 
VoidNamespaceSerializer.INSTANCE, this);
 
                this.timerService = new 
SimpleTimerService(internalTimerService);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index c465767..229d97d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -187,7 +187,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                timestampedCollector = new TimestampedCollector<>(output);
 
                internalTimerService =
-                               getInternalTimerService("window-timers", 
keySerializer, windowSerializer, this);
+                               getInternalTimerService("window-timers", 
windowSerializer, this);
 
                context = new Context(null, null);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index fd05353..2fb0089 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -448,7 +448,6 @@ public class AbstractStreamOperatorTest {
 
                        this.timerService = getInternalTimerService(
                                        "test-timers",
-                                       IntSerializer.INSTANCE,
                                        VoidNamespaceSerializer.INSTANCE,
                                        this);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
index f3b09eb..46b52ee 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
@@ -49,7 +49,7 @@ public class TimelyFlatMapTest extends TestLogger {
        public void testCurrentEventTime() throws Exception {
 
                StreamTimelyFlatMap<Integer, Integer, String> operator =
-                               new 
StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new 
QueryingFlatMapFunction(TimeDomain.EVENT_TIME));
+                               new StreamTimelyFlatMap<>(new 
QueryingFlatMapFunction(TimeDomain.EVENT_TIME));
 
                OneInputStreamOperatorTestHarness<Integer, String> testHarness =
                                new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -79,7 +79,7 @@ public class TimelyFlatMapTest extends TestLogger {
        public void testCurrentProcessingTime() throws Exception {
 
                StreamTimelyFlatMap<Integer, Integer, String> operator =
-                               new 
StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new 
QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME));
+                               new StreamTimelyFlatMap<>(new 
QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME));
 
                OneInputStreamOperatorTestHarness<Integer, String> testHarness =
                                new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -107,7 +107,7 @@ public class TimelyFlatMapTest extends TestLogger {
        public void testEventTimeTimers() throws Exception {
 
                StreamTimelyFlatMap<Integer, Integer, Integer> operator =
-                               new 
StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new 
TriggeringFlatMapFunction(TimeDomain.EVENT_TIME));
+                               new StreamTimelyFlatMap<>(new 
TriggeringFlatMapFunction(TimeDomain.EVENT_TIME));
 
                OneInputStreamOperatorTestHarness<Integer, Integer> testHarness 
=
                                new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -137,7 +137,7 @@ public class TimelyFlatMapTest extends TestLogger {
        public void testProcessingTimeTimers() throws Exception {
 
                StreamTimelyFlatMap<Integer, Integer, Integer> operator =
-                               new 
StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new 
TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME));
+                               new StreamTimelyFlatMap<>(new 
TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME));
 
                OneInputStreamOperatorTestHarness<Integer, Integer> testHarness 
=
                                new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -166,7 +166,7 @@ public class TimelyFlatMapTest extends TestLogger {
        public void testEventTimeTimerWithState() throws Exception {
 
                StreamTimelyFlatMap<Integer, Integer, String> operator =
-                               new 
StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new 
TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME));
+                               new StreamTimelyFlatMap<>(new 
TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME));
 
                OneInputStreamOperatorTestHarness<Integer, String> testHarness =
                                new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -206,7 +206,7 @@ public class TimelyFlatMapTest extends TestLogger {
        public void testProcessingTimeTimerWithState() throws Exception {
 
                StreamTimelyFlatMap<Integer, Integer, String> operator =
-                               new 
StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new 
TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME));
+                               new StreamTimelyFlatMap<>(new 
TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME));
 
                OneInputStreamOperatorTestHarness<Integer, String> testHarness =
                                new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -239,7 +239,7 @@ public class TimelyFlatMapTest extends TestLogger {
        public void testSnapshotAndRestore() throws Exception {
 
                StreamTimelyFlatMap<Integer, Integer, String> operator =
-                               new 
StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new 
BothTriggeringFlatMapFunction());
+                               new StreamTimelyFlatMap<>(new 
BothTriggeringFlatMapFunction());
 
                OneInputStreamOperatorTestHarness<Integer, String> testHarness =
                                new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -254,7 +254,7 @@ public class TimelyFlatMapTest extends TestLogger {
 
                testHarness.close();
 
-               operator = new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, 
new BothTriggeringFlatMapFunction());
+               operator = new StreamTimelyFlatMap<>(new 
BothTriggeringFlatMapFunction());
 
                testHarness = new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
 
@@ -352,7 +352,7 @@ public class TimelyFlatMapTest extends TestLogger {
                private static final long serialVersionUID = 1L;
 
                private final ValueStateDescriptor<Integer> state =
-                               new ValueStateDescriptor<>("seen-element", 
IntSerializer.INSTANCE, null);
+                               new ValueStateDescriptor<>("seen-element", 
IntSerializer.INSTANCE,  null);
 
                private final TimeDomain timeDomain;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
index 25808f4..cb5d6c2 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
@@ -49,7 +49,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
        public void testCurrentEventTime() throws Exception {
 
                CoStreamTimelyFlatMap<String, Integer, String, String> operator 
=
-                               new 
CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new 
WatermarkQueryingFlatMapFunction());
+                               new CoStreamTimelyFlatMap<>(new 
WatermarkQueryingFlatMapFunction());
 
                TwoInputStreamOperatorTestHarness<Integer, String, String> 
testHarness =
                                new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -85,7 +85,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
        public void testCurrentProcessingTime() throws Exception {
 
                CoStreamTimelyFlatMap<String, Integer, String, String> operator 
=
-                               new 
CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new 
ProcessingTimeQueryingFlatMapFunction());
+                               new CoStreamTimelyFlatMap<>(new 
ProcessingTimeQueryingFlatMapFunction());
 
                TwoInputStreamOperatorTestHarness<Integer, String, String> 
testHarness =
                                new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -117,7 +117,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
        public void testEventTimeTimers() throws Exception {
 
                CoStreamTimelyFlatMap<String, Integer, String, String> operator 
=
-                               new 
CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new 
EventTimeTriggeringFlatMapFunction());
+                               new CoStreamTimelyFlatMap<>(new 
EventTimeTriggeringFlatMapFunction());
 
                TwoInputStreamOperatorTestHarness<Integer, String, String> 
testHarness =
                                new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -156,7 +156,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
        public void testProcessingTimeTimers() throws Exception {
 
                CoStreamTimelyFlatMap<String, Integer, String, String> operator 
=
-                               new 
CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new 
ProcessingTimeTriggeringFlatMapFunction());
+                               new CoStreamTimelyFlatMap<>(new 
ProcessingTimeTriggeringFlatMapFunction());
 
                TwoInputStreamOperatorTestHarness<Integer, String, String> 
testHarness =
                                new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -193,7 +193,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
        public void testEventTimeTimerWithState() throws Exception {
 
                CoStreamTimelyFlatMap<String, Integer, String, String> operator 
=
-                               new 
CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new 
EventTimeTriggeringStatefulFlatMapFunction());
+                               new CoStreamTimelyFlatMap<>(new 
EventTimeTriggeringStatefulFlatMapFunction());
 
                TwoInputStreamOperatorTestHarness<Integer, String, String> 
testHarness =
                                new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -242,7 +242,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
        public void testProcessingTimeTimerWithState() throws Exception {
 
                CoStreamTimelyFlatMap<String, Integer, String, String> operator 
=
-                               new 
CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new 
ProcessingTimeTriggeringStatefulFlatMapFunction());
+                               new CoStreamTimelyFlatMap<>(new 
ProcessingTimeTriggeringStatefulFlatMapFunction());
 
                TwoInputStreamOperatorTestHarness<Integer, String, String> 
testHarness =
                                new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -279,7 +279,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
        public void testSnapshotAndRestore() throws Exception {
 
                CoStreamTimelyFlatMap<String, Integer, String, String> operator 
=
-                               new 
CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new 
BothTriggeringFlatMapFunction());
+                               new CoStreamTimelyFlatMap<>(new 
BothTriggeringFlatMapFunction());
 
                TwoInputStreamOperatorTestHarness<Integer, String, String> 
testHarness =
                                new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -299,7 +299,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
 
                testHarness.close();
 
-               operator = new 
CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new 
BothTriggeringFlatMapFunction());
+               operator = new CoStreamTimelyFlatMap<>(new 
BothTriggeringFlatMapFunction());
 
                testHarness = new KeyedTwoInputStreamOperatorTestHarness<>(
                                operator,

Reply via email to