[FLINK-3747] Consolidate TimestampAssigner Methods in Kafka Consumer

This closes #1877
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e40e29da Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e40e29da Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e40e29da Branch: refs/heads/master Commit: e40e29da9b68ec6da59f7b5372cb1483283c0530 Parents: 8570b6d Author: Aljoscha Krettek <[email protected]> Authored: Wed Apr 13 11:41:39 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Wed Apr 13 20:50:49 2016 +0200 ---------------------------------------------------------------------- .../connectors/kafka/FlinkKafkaConsumerBase.java | 4 ++-- .../connectors/kafka/FlinkKafkaConsumerBaseTest.java | 12 ++++++------ .../connectors/kafka/KafkaConsumerTestBase.java | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e40e29da/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java index 0ca8fd5..ed5c72f 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java @@ -148,7 +148,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti * @param assigner The timestamp assigner / watermark generator to use. 
* @return The consumer object, to allow function chaining. */ - public FlinkKafkaConsumerBase<T> setPunctuatedWatermarkEmitter(AssignerWithPunctuatedWatermarks<T> assigner) { + public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner) { checkNotNull(assigner); if (this.periodicWatermarkAssigner != null) { @@ -182,7 +182,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti * @param assigner The timestamp assigner / watermark generator to use. * @return The consumer object, to allow function chaining. */ - public FlinkKafkaConsumerBase<T> setPeriodicWatermarkEmitter(AssignerWithPeriodicWatermarks<T> assigner) { + public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner) { checkNotNull(assigner); if (this.punctuatedWatermarkAssigner != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/e40e29da/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java index f4ef995..9b517df 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java @@ -44,12 +44,12 @@ public class FlinkKafkaConsumerBaseTest { @Test public void testEitherWatermarkExtractor() { try { - new DummyFlinkKafkaConsumer<>().setPeriodicWatermarkEmitter(null); + new 
DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks<Object>) null); fail(); } catch (NullPointerException ignored) {} try { - new DummyFlinkKafkaConsumer<>().setPunctuatedWatermarkEmitter(null); + new DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks<Object>) null); fail(); } catch (NullPointerException ignored) {} @@ -59,16 +59,16 @@ public class FlinkKafkaConsumerBaseTest { final AssignerWithPunctuatedWatermarks<String> punctuatedAssigner = mock(AssignerWithPunctuatedWatermarks.class); DummyFlinkKafkaConsumer<String> c1 = new DummyFlinkKafkaConsumer<>(); - c1.setPeriodicWatermarkEmitter(periodicAssigner); + c1.assignTimestampsAndWatermarks(periodicAssigner); try { - c1.setPunctuatedWatermarkEmitter(punctuatedAssigner); + c1.assignTimestampsAndWatermarks(punctuatedAssigner); fail(); } catch (IllegalStateException ignored) {} DummyFlinkKafkaConsumer<String> c2 = new DummyFlinkKafkaConsumer<>(); - c2.setPunctuatedWatermarkEmitter(punctuatedAssigner); + c2.assignTimestampsAndWatermarks(punctuatedAssigner); try { - c2.setPeriodicWatermarkEmitter(periodicAssigner); + c2.assignTimestampsAndWatermarks(periodicAssigner); fail(); } catch (IllegalStateException ignored) {} } http://git-wip-us.apache.org/repos/asf/flink/blob/e40e29da/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index a65a411..cc9205c 100644 --- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -1662,7 +1662,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { FlinkKafkaConsumerBase<Tuple2<Long, Integer>> source = kafkaServer .getConsumer(topics, sourceSchema, standardProps) - .setPunctuatedWatermarkEmitter(new TestPunctuatedTSExtractor()); + .assignTimestampsAndWatermarks(new TestPunctuatedTSExtractor()); DataStreamSource<Tuple2<Long, Integer>> consuming = env.setParallelism(1).addSource(source);
