[FLINK-4957] Remove Key Serializer Parameter getInternalTimerService() It's not needed because we can get the key serializer from the keyed state backend.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0b873ac3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0b873ac3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0b873ac3 Branch: refs/heads/master Commit: 0b873ac343343fd1d7716d075b54e66324374f47 Parents: 06fb9f1 Author: Aljoscha Krettek <[email protected]> Authored: Fri Oct 28 14:34:47 2016 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Mon Nov 7 16:25:57 2016 +0100 ---------------------------------------------------------------------- .../streaming/api/datastream/KeyedStream.java | 2 +- .../api/operators/AbstractStreamOperator.java | 13 ++++++------- .../api/operators/StreamTimelyFlatMap.java | 9 ++------- .../api/operators/co/CoStreamTimelyFlatMap.java | 11 ++--------- .../operators/windowing/WindowOperator.java | 2 +- .../api/operators/AbstractStreamOperatorTest.java | 1 - .../api/operators/TimelyFlatMapTest.java | 18 +++++++++--------- .../api/operators/co/TimelyCoFlatMapTest.java | 16 ++++++++-------- 8 files changed, 29 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java index c938f5b..4063b60 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java @@ -231,7 +231,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> { TypeInformation<R> outputType) { StreamTimelyFlatMap<KEY, T, R> operator = - new StreamTimelyFlatMap<>(keyType.createSerializer(getExecutionConfig()), clean(flatMapper)); + new StreamTimelyFlatMap<>(clean(flatMapper)); return transform("Flat Map", outputType, operator); } http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 7b555b7..839abf8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -772,24 +772,21 @@ public abstract class AbstractStreamOperator<OUT> * * @param name The name of the requested timer service. If no service exists under the given * name a new one will be created and returned. - * @param keySerializer {@code TypeSerializer} for the keys of the timers. * @param namespaceSerializer {@code TypeSerializer} for the timer namespace. * @param triggerable The {@link Triggerable} that should be invoked when timers fire * - * @param <K> The type of the timer keys. * @param <N> The type of the timer namespace. */ - public <K, N> InternalTimerService<N> getInternalTimerService( + public <N> InternalTimerService<N> getInternalTimerService( String name, - TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, - Triggerable<K, N> triggerable) { + Triggerable<?, N> triggerable) { if (getKeyedStateBackend() == null) { throw new UnsupportedOperationException("Timers can only be used on keyed operators."); } @SuppressWarnings("unchecked") - HeapInternalTimerService<K, N> timerService = (HeapInternalTimerService<K, N>) timerServices.get(name); + HeapInternalTimerService<Object, N> timerService = (HeapInternalTimerService<Object, N>) timerServices.get(name); if (timerService == null) { timerService = new HeapInternalTimerService<>( @@ -799,7 +796,9 @@ public abstract class AbstractStreamOperator<OUT> getRuntimeContext().getProcessingTimeService()); timerServices.put(name, timerService); } - timerService.startTimerService(keySerializer, namespaceSerializer, triggerable); + @SuppressWarnings({"unchecked", "rawtypes"}) + Triggerable rawTriggerable = (Triggerable) triggerable; + timerService.startTimerService(getKeyedStateBackend().getKeySerializer(), namespaceSerializer, rawTriggerable); return timerService; } http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java index 962f264..d507ba6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.streaming.api.SimpleTimerService; @@ -34,17 +33,13 @@ public class StreamTimelyFlatMap<K, IN, OUT> private static final long serialVersionUID = 1L; - private final TypeSerializer<K> keySerializer; - private transient TimestampedCollector<OUT> collector; private transient TimerService timerService; - public StreamTimelyFlatMap(TypeSerializer<K> keySerializer, TimelyFlatMapFunction<IN, OUT> flatMapper) { + public StreamTimelyFlatMap(TimelyFlatMapFunction<IN, OUT> flatMapper) { super(flatMapper); - this.keySerializer = keySerializer; - chainingStrategy = ChainingStrategy.ALWAYS; } @@ -54,7 +49,7 @@ public class StreamTimelyFlatMap<K, IN, OUT> collector = new TimestampedCollector<>(output); InternalTimerService<VoidNamespace> internalTimerService = - getInternalTimerService("user-timers", keySerializer, VoidNamespaceSerializer.INSTANCE, this); + getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this); this.timerService = new SimpleTimerService(internalTimerService); } http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java index df2320f..212aafd 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.api.operators.co; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.streaming.api.SimpleTimerService; @@ -40,18 +39,12 @@ public class CoStreamTimelyFlatMap<K, IN1, IN2, OUT> private static final long serialVersionUID = 1L; - private final TypeSerializer<K> keySerializer; - private transient TimestampedCollector<OUT> collector; private transient TimerService timerService; - public CoStreamTimelyFlatMap( - TypeSerializer<K> keySerializer, - TimelyCoFlatMapFunction<IN1, IN2, OUT> flatMapper) { + public CoStreamTimelyFlatMap(TimelyCoFlatMapFunction<IN1, IN2, OUT> flatMapper) { super(flatMapper); - - this.keySerializer = keySerializer; } @Override @@ -60,7 +53,7 @@ public class CoStreamTimelyFlatMap<K, IN1, IN2, OUT> collector = new TimestampedCollector<>(output); InternalTimerService<VoidNamespace> internalTimerService = - getInternalTimerService("user-timers", keySerializer, VoidNamespaceSerializer.INSTANCE, this); + getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this); this.timerService = new SimpleTimerService(internalTimerService); } http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index c465767..229d97d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -187,7 +187,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> timestampedCollector = new TimestampedCollector<>(output); internalTimerService = - getInternalTimerService("window-timers", keySerializer, windowSerializer, this); + getInternalTimerService("window-timers", windowSerializer, this); context = new Context(null, null); http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java index fd05353..2fb0089 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java @@ -448,7 +448,6 @@ public class AbstractStreamOperatorTest { this.timerService = getInternalTimerService( "test-timers", - IntSerializer.INSTANCE, VoidNamespaceSerializer.INSTANCE, this); } http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java index f3b09eb..46b52ee 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java @@ -49,7 +49,7 @@ public class TimelyFlatMapTest extends TestLogger { public void testCurrentEventTime() throws Exception { StreamTimelyFlatMap<Integer, Integer, String> operator = - new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new QueryingFlatMapFunction(TimeDomain.EVENT_TIME)); + new StreamTimelyFlatMap<>(new QueryingFlatMapFunction(TimeDomain.EVENT_TIME)); OneInputStreamOperatorTestHarness<Integer, String> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); @@ -79,7 +79,7 @@ public class TimelyFlatMapTest extends TestLogger { public void testCurrentProcessingTime() throws Exception { StreamTimelyFlatMap<Integer, Integer, String> operator = - new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME)); + new StreamTimelyFlatMap<>(new QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME)); OneInputStreamOperatorTestHarness<Integer, String> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); @@ -107,7 +107,7 @@ public class TimelyFlatMapTest extends TestLogger { public void testEventTimeTimers() throws Exception { StreamTimelyFlatMap<Integer, Integer, Integer> operator = - new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new TriggeringFlatMapFunction(TimeDomain.EVENT_TIME)); + new StreamTimelyFlatMap<>(new TriggeringFlatMapFunction(TimeDomain.EVENT_TIME)); OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); @@ -137,7 +137,7 @@ public class TimelyFlatMapTest extends TestLogger { public void testProcessingTimeTimers() throws Exception { StreamTimelyFlatMap<Integer, Integer, Integer> operator = - new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME)); + new StreamTimelyFlatMap<>(new TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME)); OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); @@ -166,7 +166,7 @@ public class TimelyFlatMapTest extends TestLogger { public void testEventTimeTimerWithState() throws Exception { StreamTimelyFlatMap<Integer, Integer, String> operator = - new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME)); + new StreamTimelyFlatMap<>(new TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME)); OneInputStreamOperatorTestHarness<Integer, String> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); @@ -206,7 +206,7 @@ public class TimelyFlatMapTest extends TestLogger { public void testProcessingTimeTimerWithState() throws Exception { StreamTimelyFlatMap<Integer, Integer, String> operator = - new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME)); + new StreamTimelyFlatMap<>(new TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME)); OneInputStreamOperatorTestHarness<Integer, String> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); @@ -239,7 +239,7 @@ public class TimelyFlatMapTest extends TestLogger { public void testSnapshotAndRestore() throws Exception { StreamTimelyFlatMap<Integer, Integer, String> operator = - new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new BothTriggeringFlatMapFunction()); + new StreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction()); OneInputStreamOperatorTestHarness<Integer, String> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); @@ -254,7 +254,7 @@ public class TimelyFlatMapTest extends TestLogger { testHarness.close(); - operator = new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new BothTriggeringFlatMapFunction()); + operator = new StreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction()); testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); @@ -352,7 +352,7 @@ public class TimelyFlatMapTest extends TestLogger { private static final long serialVersionUID = 1L; private final ValueStateDescriptor<Integer> state = - new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE, null); + new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE, null); private final TimeDomain timeDomain; http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java index 25808f4..cb5d6c2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java @@ -49,7 +49,7 @@ public class TimelyCoFlatMapTest extends TestLogger { public void testCurrentEventTime() throws Exception { CoStreamTimelyFlatMap<String, Integer, String, String> operator = - new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new WatermarkQueryingFlatMapFunction()); + new CoStreamTimelyFlatMap<>(new WatermarkQueryingFlatMapFunction()); TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness = new KeyedTwoInputStreamOperatorTestHarness<>( @@ -85,7 +85,7 @@ public class TimelyCoFlatMapTest extends TestLogger { public void testCurrentProcessingTime() throws Exception { CoStreamTimelyFlatMap<String, Integer, String, String> operator = - new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new ProcessingTimeQueryingFlatMapFunction()); + new CoStreamTimelyFlatMap<>(new ProcessingTimeQueryingFlatMapFunction()); TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness = new KeyedTwoInputStreamOperatorTestHarness<>( @@ -117,7 +117,7 @@ public class TimelyCoFlatMapTest extends TestLogger { public void testEventTimeTimers() throws Exception { CoStreamTimelyFlatMap<String, Integer, String, String> operator = - new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new EventTimeTriggeringFlatMapFunction()); + new CoStreamTimelyFlatMap<>(new EventTimeTriggeringFlatMapFunction()); TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness = new KeyedTwoInputStreamOperatorTestHarness<>( @@ -156,7 +156,7 @@ public class TimelyCoFlatMapTest extends TestLogger { public void testProcessingTimeTimers() throws Exception { CoStreamTimelyFlatMap<String, Integer, String, String> operator = - new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new ProcessingTimeTriggeringFlatMapFunction()); + new CoStreamTimelyFlatMap<>(new ProcessingTimeTriggeringFlatMapFunction()); TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness = new KeyedTwoInputStreamOperatorTestHarness<>( @@ -193,7 +193,7 @@ public class TimelyCoFlatMapTest extends TestLogger { public void testEventTimeTimerWithState() throws Exception { CoStreamTimelyFlatMap<String, Integer, String, String> operator = - new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new EventTimeTriggeringStatefulFlatMapFunction()); + new CoStreamTimelyFlatMap<>(new EventTimeTriggeringStatefulFlatMapFunction()); TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness = new KeyedTwoInputStreamOperatorTestHarness<>( @@ -242,7 +242,7 @@ public class TimelyCoFlatMapTest extends TestLogger { public void testProcessingTimeTimerWithState() throws Exception { CoStreamTimelyFlatMap<String, Integer, String, String> operator = - new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new ProcessingTimeTriggeringStatefulFlatMapFunction()); + new CoStreamTimelyFlatMap<>(new ProcessingTimeTriggeringStatefulFlatMapFunction()); TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness = new KeyedTwoInputStreamOperatorTestHarness<>( @@ -279,7 +279,7 @@ public class TimelyCoFlatMapTest extends TestLogger { public void testSnapshotAndRestore() throws Exception { CoStreamTimelyFlatMap<String, Integer, String, String> operator = - new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new BothTriggeringFlatMapFunction()); + new CoStreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction()); TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness = new KeyedTwoInputStreamOperatorTestHarness<>( @@ -299,7 +299,7 @@ public class TimelyCoFlatMapTest extends TestLogger { testHarness.close(); - operator = new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new BothTriggeringFlatMapFunction()); + operator = new CoStreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction()); testHarness = new KeyedTwoInputStreamOperatorTestHarness<>( operator,
