[FLINK-3747] Consolidate TimestampAssigner Methods in Kafka Consumer

This closes #1877


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e40e29da
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e40e29da
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e40e29da

Branch: refs/heads/master
Commit: e40e29da9b68ec6da59f7b5372cb1483283c0530
Parents: 8570b6d
Author: Aljoscha Krettek <[email protected]>
Authored: Wed Apr 13 11:41:39 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Wed Apr 13 20:50:49 2016 +0200

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaConsumerBase.java        |  4 ++--
 .../connectors/kafka/FlinkKafkaConsumerBaseTest.java    | 12 ++++++------
 .../connectors/kafka/KafkaConsumerTestBase.java         |  2 +-
 3 files changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e40e29da/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 0ca8fd5..ed5c72f 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -148,7 +148,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
         * @param assigner The timestamp assigner / watermark generator to use.
         * @return The consumer object, to allow function chaining.   
         */
-       public FlinkKafkaConsumerBase<T> 
setPunctuatedWatermarkEmitter(AssignerWithPunctuatedWatermarks<T> assigner) {
+       public FlinkKafkaConsumerBase<T> 
assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner) {
                checkNotNull(assigner);
                
                if (this.periodicWatermarkAssigner != null) {
@@ -182,7 +182,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
         * @param assigner The timestamp assigner / watermark generator to use.
         * @return The consumer object, to allow function chaining.   
         */
-       public FlinkKafkaConsumerBase<T> 
setPeriodicWatermarkEmitter(AssignerWithPeriodicWatermarks<T> assigner) {
+       public FlinkKafkaConsumerBase<T> 
assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner) {
                checkNotNull(assigner);
                
                if (this.punctuatedWatermarkAssigner != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e40e29da/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index f4ef995..9b517df 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -44,12 +44,12 @@ public class FlinkKafkaConsumerBaseTest {
        @Test
        public void testEitherWatermarkExtractor() {
                try {
-                       new 
DummyFlinkKafkaConsumer<>().setPeriodicWatermarkEmitter(null);
+                       new 
DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks<Object>)
 null);
                        fail();
                } catch (NullPointerException ignored) {}
 
                try {
-                       new 
DummyFlinkKafkaConsumer<>().setPunctuatedWatermarkEmitter(null);
+                       new 
DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks<Object>)
 null);
                        fail();
                } catch (NullPointerException ignored) {}
                
@@ -59,16 +59,16 @@ public class FlinkKafkaConsumerBaseTest {
                final AssignerWithPunctuatedWatermarks<String> 
punctuatedAssigner = mock(AssignerWithPunctuatedWatermarks.class);
                
                DummyFlinkKafkaConsumer<String> c1 = new 
DummyFlinkKafkaConsumer<>();
-               c1.setPeriodicWatermarkEmitter(periodicAssigner);
+               c1.assignTimestampsAndWatermarks(periodicAssigner);
                try {
-                       c1.setPunctuatedWatermarkEmitter(punctuatedAssigner);
+                       c1.assignTimestampsAndWatermarks(punctuatedAssigner);
                        fail();
                } catch (IllegalStateException ignored) {}
 
                DummyFlinkKafkaConsumer<String> c2 = new 
DummyFlinkKafkaConsumer<>();
-               c2.setPunctuatedWatermarkEmitter(punctuatedAssigner);
+               c2.assignTimestampsAndWatermarks(punctuatedAssigner);
                try {
-                       c2.setPeriodicWatermarkEmitter(periodicAssigner);
+                       c2.assignTimestampsAndWatermarks(periodicAssigner);
                        fail();
                } catch (IllegalStateException ignored) {}
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e40e29da/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index a65a411..cc9205c 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -1662,7 +1662,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
 
                FlinkKafkaConsumerBase<Tuple2<Long, Integer>> source = 
kafkaServer
                        .getConsumer(topics, sourceSchema, standardProps)
-                       .setPunctuatedWatermarkEmitter(new 
TestPunctuatedTSExtractor());
+                       .assignTimestampsAndWatermarks(new 
TestPunctuatedTSExtractor());
 
                DataStreamSource<Tuple2<Long, Integer>> consuming = 
env.setParallelism(1).addSource(source);
 

Reply via email to