[hotfix] [Kafka Consumer] Clean up some code confusion and style in the Fetchers for Kafka 0.9/0.10


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fa1864c7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fa1864c7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fa1864c7

Branch: refs/heads/master
Commit: fa1864c7a6eadea55eb2d7e8fd2b72e043841671
Parents: 611412c
Author: Stephan Ewen <[email protected]>
Authored: Wed Nov 9 17:58:54 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Wed Nov 16 19:08:07 2016 +0100

----------------------------------------------------------------------
 .../flink-connector-kafka-0.10/pom.xml          |  6 ++
 .../kafka/internal/Kafka010Fetcher.java         | 39 +++++--------
 .../connectors/kafka/Kafka010FetcherTest.java   |  1 -
 .../kafka/internals/SimpleConsumerThread.java   |  2 +-
 .../kafka/internal/Kafka09Fetcher.java          | 25 +++++---
 .../kafka/internals/AbstractFetcher.java        | 60 ++++++++++++++------
 .../AbstractFetcherTimestampsTest.java          | 53 +++++++++--------
 7 files changed, 107 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml 
b/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
index 8108afc..04019f8 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
@@ -48,6 +48,12 @@ under the License.
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
                        <version>${project.version}</version>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>org.apache.kafka</groupId>
+                                       <artifactId>kafka_${scala.binary.version}</artifactId>
+                               </exclusion>
+                       </exclusions>
                </dependency>
 
                <!-- Add Kafka 0.10.x as a dependency -->

http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
index 4a1f5f6..024cd38 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
@@ -38,6 +38,9 @@ import java.util.Properties;
 /**
  * A fetcher that fetches data from Kafka brokers via the Kafka 0.10 consumer API.
  * 
+ * <p>This fetcher re-uses basically all functionality of the 0.9 fetcher. It only additionally
+ * takes the KafkaRecord-attached timestamp and attaches it to the Flink records.
+ * 
  * @param <T> The type of elements produced by the fetcher.
  */
 public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
@@ -76,37 +79,23 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
        }
 
        @Override
-       protected void assignPartitionsToConsumer(KafkaConsumer<byte[], byte[]> 
consumer, List<TopicPartition> topicPartitions) {
-               consumer.assign(topicPartitions);
-       }
+       protected void emitRecord(
+                       T record,
+                       KafkaTopicPartitionState<TopicPartition> partition,
+                       long offset,
+                       ConsumerRecord<?, ?> consumerRecord) throws Exception {
 
-       @Override
-       protected void emitRecord(T record, 
KafkaTopicPartitionState<TopicPartition> partition, long offset, ConsumerRecord 
consumerRecord) throws Exception {
-               // get timestamp from provided ConsumerRecord (only possible 
with kafka 0.10.x)
-               super.emitRecord(record, partition, offset, 
consumerRecord.timestamp());
+               // we attach the Kafka 0.10 timestamp here
+               emitRecordWithTimestamp(record, partition, offset, 
consumerRecord.timestamp());
        }
 
        /**
-        * Emit record Kafka-timestamp aware.
+        * This method needs to be overridden because Kafka broke binary compatibility between 0.9 and 0.10,
+        * changing the List in the signature to a Collection.
         */
        @Override
-       protected void emitRecord(T record, 
KafkaTopicPartitionState<TopicPartition> partitionState, long offset, long 
timestamp) throws Exception {
-               if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
-                       // fast path logic, in case there are no watermarks
-
-                       // emit the record, using the checkpoint lock to 
guarantee
-                       // atomicity of record emission and offset state update
-                       synchronized (checkpointLock) {
-                               sourceContext.collectWithTimestamp(record, 
timestamp);
-                               partitionState.setOffset(offset);
-                       }
-               }
-               else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
-                       emitRecordWithTimestampAndPeriodicWatermark(record, 
partitionState, offset, timestamp);
-               }
-               else {
-                       emitRecordWithTimestampAndPunctuatedWatermark(record, 
partitionState, offset, timestamp);
-               }
+       protected void assignPartitionsToConsumer(KafkaConsumer<?, ?> consumer, 
List<TopicPartition> topicPartitions) {
+               consumer.assign(topicPartitions);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
index 718db48..037d25b 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
@@ -114,7 +114,6 @@ public class Kafka010FetcherTest {
         SourceContext<String> sourceContext = mock(SourceContext.class);
         List<KafkaTopicPartition> topics = Collections.singletonList(new 
KafkaTopicPartition("test", 42));
         KeyedDeserializationSchema<String> schema = new 
KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-        StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
 
         final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
                 sourceContext,

http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
index 1302348..35e491a 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
@@ -376,7 +376,7 @@ class SimpleConsumerThread<T> extends Thread {
                                                                continue 
partitionsLoop;
                                                        }
                                                        
-                                                       owner.emitRecord(value, 
currentPartition, offset, Long.MIN_VALUE);
+                                                       owner.emitRecord(value, 
currentPartition, offset);
                                                }
                                                else {
                                                        // no longer running

http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index a8c0397..acdcb61 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -201,7 +201,6 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, 
TopicPartition> implem
                try {
                        assignPartitionsToConsumer(consumer, 
convertKafkaPartitions(subscribedPartitions()));
 
-
                        if (useMetrics) {
                                final MetricGroup kafkaMetricGroup = 
metricGroup.addGroup("KafkaConsumer");
                                addOffsetStateGauge(kafkaMetricGroup);
@@ -306,14 +305,22 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, 
TopicPartition> implem
                }
        }
 
-       // Kafka09Fetcher ignores the timestamp, Kafka010Fetcher is extracting 
the timestamp and passing it to the emitRecord() method.
-       protected void emitRecord(T record, 
KafkaTopicPartitionState<TopicPartition> partition, long offset, ConsumerRecord 
consumerRecord) throws Exception {
-               emitRecord(record, partition, offset, Long.MIN_VALUE);
+       // 
------------------------------------------------------------------------
+       //  The below methods are overridden in the 0.10 fetcher, which 
otherwise
+       //   reuses most of the 0.9 fetcher behavior
+       // 
------------------------------------------------------------------------
+
+       protected void emitRecord(
+                       T record,
+                       KafkaTopicPartitionState<TopicPartition> partition,
+                       long offset,
+                       @SuppressWarnings("UnusedParameters") ConsumerRecord<?, 
?> consumerRecord) throws Exception {
+
+               // the 0.9 Fetcher does not try to extract a timestamp
+               emitRecord(record, partition, offset);
        }
-       /**
-        * Protected method to make the partition assignment pluggable, for 
different Kafka versions.
-        */
-       protected void assignPartitionsToConsumer(KafkaConsumer<byte[], byte[]> 
consumer, List<TopicPartition> topicPartitions) {
+
+       protected void assignPartitionsToConsumer(KafkaConsumer<?, ?> consumer, 
List<TopicPartition> topicPartitions) {
                consumer.assign(topicPartitions);
        }
 
@@ -322,7 +329,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, 
TopicPartition> implem
        }
 
        // 
------------------------------------------------------------------------
-       //  Kafka 0.9 specific fetcher behavior
+       //  Implement Methods of the AbstractFetcher
        // 
------------------------------------------------------------------------
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 3350b06..cf39606 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -205,32 +205,60 @@ public abstract class AbstractFetcher<T, KPH> {
                        }
                }
        }
-       
+
        // 
------------------------------------------------------------------------
        //  emitting records
        // 
------------------------------------------------------------------------
 
        /**
+        * Emits a record without attaching an existing timestamp to it.
+        * 
         * <p>Implementation Note: This method is kept brief to be JIT inlining 
friendly.
         * That makes the fast path efficient, the extended paths are called as 
separate methods.
+        * 
         * @param record The record to emit
         * @param partitionState The state of the Kafka partition from which 
the record was fetched
         * @param offset The offset of the record
-        * @param timestamp The record's event-timestamp
         */
-       protected void emitRecord(T record, KafkaTopicPartitionState<KPH> 
partitionState, long offset, long timestamp) throws Exception {
+       protected void emitRecord(T record, KafkaTopicPartitionState<KPH> 
partitionState, long offset) throws Exception {
                if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
                        // fast path logic, in case there are no watermarks
 
                        // emit the record, using the checkpoint lock to 
guarantee
                        // atomicity of record emission and offset state update
                        synchronized (checkpointLock) {
-                               if(timestamp != Long.MIN_VALUE) {
-                                       // this case is true for Kafka 0.10
-                                       
sourceContext.collectWithTimestamp(record, timestamp);
-                               } else {
-                                       sourceContext.collect(record);
-                               }
+                               sourceContext.collect(record);
+                               partitionState.setOffset(offset);
+                       }
+               }
+               else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
+                       emitRecordWithTimestampAndPeriodicWatermark(record, 
partitionState, offset, Long.MIN_VALUE);
+               }
+               else {
+                       emitRecordWithTimestampAndPunctuatedWatermark(record, 
partitionState, offset, Long.MIN_VALUE);
+               }
+       }
+
+       /**
+        * Emits a record attaching a timestamp to it.
+        *
+        * <p>Implementation Note: This method is kept brief to be JIT inlining 
friendly.
+        * That makes the fast path efficient, the extended paths are called as 
separate methods.
+        *
+        * @param record The record to emit
+        * @param partitionState The state of the Kafka partition from which 
the record was fetched
+        * @param offset The offset of the record
+        */
+       protected void emitRecordWithTimestamp(
+                       T record, KafkaTopicPartitionState<KPH> partitionState, 
long offset, long timestamp) throws Exception {
+
+               if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
+                       // fast path logic, in case there are no watermarks 
generated in the fetcher
+
+                       // emit the record, using the checkpoint lock to 
guarantee
+                       // atomicity of record emission and offset state update
+                       synchronized (checkpointLock) {
+                               sourceContext.collectWithTimestamp(record, 
timestamp);
                                partitionState.setOffset(offset);
                        }
                }
@@ -285,14 +313,14 @@ public abstract class AbstractFetcher<T, KPH> {
                // from the punctuated extractor
                final long timestamp = 
withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
                final Watermark newWatermark = 
withWatermarksState.checkAndGetNewWatermark(record, timestamp);
-                       
+
                // emit the record with timestamp, using the usual checkpoint 
lock to guarantee
                // atomicity of record emission and offset state update 
                synchronized (checkpointLock) {
                        sourceContext.collectWithTimestamp(record, timestamp);
                        partitionState.setOffset(offset);
                }
-               
+
                // if we also have a new per-partition watermark, check if that 
is also a
                // new cross-partition watermark
                if (newWatermark != null) {
@@ -306,7 +334,7 @@ public abstract class AbstractFetcher<T, KPH> {
        private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
                if (nextWatermark.getTimestamp() > maxWatermarkSoFar) {
                        long newMin = Long.MAX_VALUE;
-                       
+
                        for (KafkaTopicPartitionState<?> state : allPartitions) 
{
                                @SuppressWarnings("unchecked")
                                final 
KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
@@ -314,7 +342,7 @@ public abstract class AbstractFetcher<T, KPH> {
                                
                                newMin = Math.min(newMin, 
withWatermarksState.getCurrentPartitionWatermark());
                        }
-                       
+
                        // double-check locking pattern
                        if (newMin > maxWatermarkSoFar) {
                                synchronized (checkpointLock) {
@@ -416,7 +444,7 @@ public abstract class AbstractFetcher<T, KPH> {
                // add current offsets to gage
                MetricGroup currentOffsets = 
metricGroup.addGroup("current-offsets");
                MetricGroup committedOffsets = 
metricGroup.addGroup("committed-offsets");
-               for(KafkaTopicPartitionState ktp: subscribedPartitions()){
+               for (KafkaTopicPartitionState<?> ktp: subscribedPartitions()) {
                        currentOffsets.gauge(ktp.getTopic() + "-" + 
ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.CURRENT_OFFSET));
                        committedOffsets.gauge(ktp.getTopic() + "-" + 
ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.COMMITTED_OFFSET));
                }
@@ -435,10 +463,10 @@ public abstract class AbstractFetcher<T, KPH> {
         */
        private static class OffsetGauge implements Gauge<Long> {
 
-               private final KafkaTopicPartitionState ktp;
+               private final KafkaTopicPartitionState<?> ktp;
                private final OffsetGaugeType gaugeType;
 
-               public OffsetGauge(KafkaTopicPartitionState ktp, 
OffsetGaugeType gaugeType) {
+               public OffsetGauge(KafkaTopicPartitionState<?> ktp, 
OffsetGaugeType gaugeType) {
                        this.ktp = ktp;
                        this.gaugeType = gaugeType;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
index 5801c24..0b3507a 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -33,7 +33,6 @@ import javax.annotation.Nullable;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.*;
 
@@ -67,22 +66,22 @@ public class AbstractFetcherTimestampsTest {
                // elements generate a watermark if the timestamp is a multiple 
of three
                
                // elements for partition 1
-               fetcher.emitRecord(1L, part1, 1L, Long.MIN_VALUE);
-               fetcher.emitRecord(2L, part1, 2L, Long.MIN_VALUE);
-               fetcher.emitRecord(3L, part1, 3L, Long.MIN_VALUE);
+               fetcher.emitRecord(1L, part1, 1L);
+               fetcher.emitRecord(2L, part1, 2L);
+               fetcher.emitRecord(3L, part1, 3L);
                assertEquals(3L, 
sourceContext.getLatestElement().getValue().longValue());
                assertEquals(3L, 
sourceContext.getLatestElement().getTimestamp());
                assertFalse(sourceContext.hasWatermark());
 
                // elements for partition 2
-               fetcher.emitRecord(12L, part2, 1L, Long.MIN_VALUE);
+               fetcher.emitRecord(12L, part2, 1L);
                assertEquals(12L, 
sourceContext.getLatestElement().getValue().longValue());
                assertEquals(12L, 
sourceContext.getLatestElement().getTimestamp());
                assertFalse(sourceContext.hasWatermark());
 
                // elements for partition 3
-               fetcher.emitRecord(101L, part3, 1L, Long.MIN_VALUE);
-               fetcher.emitRecord(102L, part3, 2L, Long.MIN_VALUE);
+               fetcher.emitRecord(101L, part3, 1L);
+               fetcher.emitRecord(102L, part3, 2L);
                assertEquals(102L, 
sourceContext.getLatestElement().getValue().longValue());
                assertEquals(102L, 
sourceContext.getLatestElement().getTimestamp());
                
@@ -91,25 +90,25 @@ public class AbstractFetcherTimestampsTest {
                assertEquals(3L, 
sourceContext.getLatestWatermark().getTimestamp());
                
                // advance partition 3
-               fetcher.emitRecord(1003L, part3, 3L, Long.MIN_VALUE);
-               fetcher.emitRecord(1004L, part3, 4L, Long.MIN_VALUE);
-               fetcher.emitRecord(1005L, part3, 5L, Long.MIN_VALUE);
+               fetcher.emitRecord(1003L, part3, 3L);
+               fetcher.emitRecord(1004L, part3, 4L);
+               fetcher.emitRecord(1005L, part3, 5L);
                assertEquals(1005L, 
sourceContext.getLatestElement().getValue().longValue());
                assertEquals(1005L, 
sourceContext.getLatestElement().getTimestamp());
 
                // advance partition 1 beyond partition 2 - this bumps the 
watermark
-               fetcher.emitRecord(30L, part1, 4L, Long.MIN_VALUE);
+               fetcher.emitRecord(30L, part1, 4L);
                assertEquals(30L, 
sourceContext.getLatestElement().getValue().longValue());
                assertEquals(30L, 
sourceContext.getLatestElement().getTimestamp());
                assertTrue(sourceContext.hasWatermark());
                assertEquals(12L, 
sourceContext.getLatestWatermark().getTimestamp());
 
                // advance partition 2 again - this bumps the watermark
-               fetcher.emitRecord(13L, part2, 2L, Long.MIN_VALUE);
+               fetcher.emitRecord(13L, part2, 2L);
                assertFalse(sourceContext.hasWatermark());
-               fetcher.emitRecord(14L, part2, 3L, Long.MIN_VALUE);
+               fetcher.emitRecord(14L, part2, 3L);
                assertFalse(sourceContext.hasWatermark());
-               fetcher.emitRecord(15L, part2, 3L, Long.MIN_VALUE);
+               fetcher.emitRecord(15L, part2, 3L);
                assertTrue(sourceContext.hasWatermark());
                assertEquals(15L, 
sourceContext.getLatestWatermark().getTimestamp());
        }
@@ -141,20 +140,20 @@ public class AbstractFetcherTimestampsTest {
                // elements generate a watermark if the timestamp is a multiple 
of three
 
                // elements for partition 1
-               fetcher.emitRecord(1L, part1, 1L, Long.MIN_VALUE);
-               fetcher.emitRecord(2L, part1, 2L, Long.MIN_VALUE);
-               fetcher.emitRecord(3L, part1, 3L, Long.MIN_VALUE);
+               fetcher.emitRecord(1L, part1, 1L);
+               fetcher.emitRecord(2L, part1, 2L);
+               fetcher.emitRecord(3L, part1, 3L);
                assertEquals(3L, 
sourceContext.getLatestElement().getValue().longValue());
                assertEquals(3L, 
sourceContext.getLatestElement().getTimestamp());
 
                // elements for partition 2
-               fetcher.emitRecord(12L, part2, 1L, Long.MIN_VALUE);
+               fetcher.emitRecord(12L, part2, 1L);
                assertEquals(12L, 
sourceContext.getLatestElement().getValue().longValue());
                assertEquals(12L, 
sourceContext.getLatestElement().getTimestamp());
 
                // elements for partition 3
-               fetcher.emitRecord(101L, part3, 1L, Long.MIN_VALUE);
-               fetcher.emitRecord(102L, part3, 2L, Long.MIN_VALUE);
+               fetcher.emitRecord(101L, part3, 1L);
+               fetcher.emitRecord(102L, part3, 2L);
                assertEquals(102L, 
sourceContext.getLatestElement().getValue().longValue());
                assertEquals(102L, 
sourceContext.getLatestElement().getTimestamp());
 
@@ -164,14 +163,14 @@ public class AbstractFetcherTimestampsTest {
                assertEquals(3L, 
sourceContext.getLatestWatermark().getTimestamp());
 
                // advance partition 3
-               fetcher.emitRecord(1003L, part3, 3L, Long.MIN_VALUE);
-               fetcher.emitRecord(1004L, part3, 4L, Long.MIN_VALUE);
-               fetcher.emitRecord(1005L, part3, 5L, Long.MIN_VALUE);
+               fetcher.emitRecord(1003L, part3, 3L);
+               fetcher.emitRecord(1004L, part3, 4L);
+               fetcher.emitRecord(1005L, part3, 5L);
                assertEquals(1005L, 
sourceContext.getLatestElement().getValue().longValue());
                assertEquals(1005L, 
sourceContext.getLatestElement().getTimestamp());
 
                // advance partition 1 beyond partition 2 - this bumps the 
watermark
-               fetcher.emitRecord(30L, part1, 4L, Long.MIN_VALUE);
+               fetcher.emitRecord(30L, part1, 4L);
                assertEquals(30L, 
sourceContext.getLatestElement().getValue().longValue());
                assertEquals(30L, 
sourceContext.getLatestElement().getTimestamp());
 
@@ -181,9 +180,9 @@ public class AbstractFetcherTimestampsTest {
                assertEquals(12L, 
sourceContext.getLatestWatermark().getTimestamp());
 
                // advance partition 2 again - this bumps the watermark
-               fetcher.emitRecord(13L, part2, 2L, Long.MIN_VALUE);
-               fetcher.emitRecord(14L, part2, 3L, Long.MIN_VALUE);
-               fetcher.emitRecord(15L, part2, 3L, Long.MIN_VALUE);
+               fetcher.emitRecord(13L, part2, 2L);
+               fetcher.emitRecord(14L, part2, 3L);
+               fetcher.emitRecord(15L, part2, 3L);
 
                processingTimeService.setCurrentTime(30);
                // this blocks until the periodic thread emitted the watermark

Reply via email to