[FLINK-3375] [Kafka Connector] Add tests for per-kafka-partition watermarks.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2dcd27f4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2dcd27f4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2dcd27f4

Branch: refs/heads/master
Commit: 2dcd27f403c2a7f10791bfe21c45e2a326aa46a1
Parents: e40e29d
Author: Stephan Ewen <[email protected]>
Authored: Wed Apr 13 15:45:51 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Wed Apr 13 20:50:49 2016 +0200

----------------------------------------------------------------------
 .../kafka/internals/AbstractFetcher.java        |  77 +++--
 .../kafka/internals/ExceptionProxy.java         |  60 +++-
 .../connectors/kafka/util/KafkaUtils.java       |  33 +-
 .../AbstractFetcherTimestampsTest.java          | 306 +++++++++++++++++++
 .../kafka/testutils/MockRuntimeContext.java     |  46 ++-
 5 files changed, 478 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2dcd27f4/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 594aa66..8183575 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -328,45 +328,66 @@ public abstract class AbstractFetcher<T, KPH> {
                        ClassLoader userCodeClassLoader)
                throws IOException, ClassNotFoundException
        {
-               @SuppressWarnings("unchecked")
-               KafkaTopicPartitionState<KPH>[] partitions =
-                               (KafkaTopicPartitionState<KPH>[]) new 
KafkaTopicPartitionState<?>[assignedPartitions.size()];
-
-               int pos = 0;
-               for (KafkaTopicPartition partition : assignedPartitions) {
-                       // create the kafka version specific partition handle
-                       KPH kafkaHandle = createKafkaPartitionHandle(partition);
+               switch (timestampWatermarkMode) {
                        
-                       // create the partition state
-                       KafkaTopicPartitionState<KPH> partitionState;
-                       switch (timestampWatermarkMode) {
-                               case NO_TIMESTAMPS_WATERMARKS:
-                                       partitionState = new 
KafkaTopicPartitionState<>(partition, kafkaHandle);
-                                       break;
-                               case PERIODIC_WATERMARKS: {
+                       case NO_TIMESTAMPS_WATERMARKS: {
+                               @SuppressWarnings("unchecked")
+                               KafkaTopicPartitionState<KPH>[] partitions =
+                                               
(KafkaTopicPartitionState<KPH>[]) new 
KafkaTopicPartitionState<?>[assignedPartitions.size()];
+
+                               int pos = 0;
+                               for (KafkaTopicPartition partition : 
assignedPartitions) {
+                                       // create the kafka version specific 
partition handle
+                                       KPH kafkaHandle = 
createKafkaPartitionHandle(partition);
+                                       partitions[pos++] = new 
KafkaTopicPartitionState<>(partition, kafkaHandle);
+                               }
+
+                               return partitions;
+                       }
+
+                       case PERIODIC_WATERMARKS: {
+                               @SuppressWarnings("unchecked")
+                               
KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions =
+                                               
(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[])
+                                                               new 
KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[assignedPartitions.size()];
+
+                               int pos = 0;
+                               for (KafkaTopicPartition partition : 
assignedPartitions) {
+                                       KPH kafkaHandle = 
createKafkaPartitionHandle(partition);
+
                                        AssignerWithPeriodicWatermarks<T> 
assignerInstance =
                                                        
watermarksPeriodic.deserializeValue(userCodeClassLoader);
-                                       partitionState = new 
KafkaTopicPartitionStateWithPeriodicWatermarks<>(
+                                       
+                                       partitions[pos++] = new 
KafkaTopicPartitionStateWithPeriodicWatermarks<>(
                                                        partition, kafkaHandle, 
assignerInstance);
-                                       break;
                                }
-                                       
-                               case PUNCTUATED_WATERMARKS: {
+
+                               return partitions;
+                       }
+
+                       case PUNCTUATED_WATERMARKS: {
+                               @SuppressWarnings("unchecked")
+                               
KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[] partitions =
+                                               
(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[])
+                                                               new 
KafkaTopicPartitionStateWithPunctuatedWatermarks<?, 
?>[assignedPartitions.size()];
+
+                               int pos = 0;
+                               for (KafkaTopicPartition partition : 
assignedPartitions) {
+                                       KPH kafkaHandle = 
createKafkaPartitionHandle(partition);
+
                                        AssignerWithPunctuatedWatermarks<T> 
assignerInstance =
                                                        
watermarksPunctuated.deserializeValue(userCodeClassLoader);
-                                       partitionState = new 
KafkaTopicPartitionStateWithPunctuatedWatermarks<>(
+
+                                       partitions[pos++] = new 
KafkaTopicPartitionStateWithPunctuatedWatermarks<>(
                                                        partition, kafkaHandle, 
assignerInstance);
-                                       break;
                                }
-                               default:
-                                       // cannot happen, add this as a guard 
for the future
-                                       throw new RuntimeException();
-                       }
 
-                       partitions[pos++] = partitionState;
+                               return partitions;
+                       }
+                       default:
+                               // cannot happen, add this as a guard for the 
future
+                               throw new RuntimeException();
                }
-               
-               return partitions;
        }
        
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/2dcd27f4/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
index 9a0e4e3..c736493 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
@@ -22,7 +22,48 @@ import javax.annotation.Nullable;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
+ * A proxy that communicates exceptions between threads. Typically used if an 
exception
+ * from a spawned thread needs to be recognized by the "parent" (spawner) 
thread.
  * 
+ * <p>The spawned thread would set the exception via {@link 
#reportError(Throwable)}.
+ * The parent would check (at certain points) for exceptions via {@link 
#checkAndThrowException()}.
+ * Optionally, the parent can pass itself in the constructor to be interrupted 
as soon as
+ * an exception occurs.
+ * 
+ * <pre>
+ * {@code
+ * 
+ * final ExceptionProxy errorProxy = new 
ExceptionProxy(Thread.currentThread());
+ * 
+ * Thread subThread = new Thread() {
+ * 
+ *     public void run() {
+ *         try {
+ *             doSomething();
+ *         } catch (Throwable t) {
+ *             errorProxy.reportError(
+ *         } finally {
+ *             doSomeCleanup();
+ *         }
+ *     }
+ * };
+ * subThread.start();
+ * 
+ * doSomethingElse();
+ * errorProxy.checkAndThrowException();
+ * 
+ * doSomethingMore();
+ * errorProxy.checkAndThrowException();
+ * 
+ * try {
+ *     subThread.join();
+ * } catch (InterruptedException e) {
+ *     errorProxy.checkAndThrowException();
+ *     // restore interrupted status, if not caused by an exception
+ *     Thread.currentThread().interrupt();
+ * }
+ * }
+ * </pre>
  */
 public class ExceptionProxy {
        
@@ -33,6 +74,8 @@ public class ExceptionProxy {
        private final AtomicReference<Throwable> exception;
 
        /**
+        * Creates an exception proxy that interrupts the given thread upon
+        * report of an exception. The thread to interrupt may be null.
         * 
         * @param toInterrupt The thread to interrupt upon an exception. May be 
null.
         */
@@ -44,18 +87,27 @@ public class ExceptionProxy {
        // 
------------------------------------------------------------------------
        
        /**
-        * Sets the exception occurred and interrupts the target thread,
+        * Sets the exception and interrupts the target thread,
         * if no other exception has occurred so far.
         * 
+        * <p>The exception is only set (and the interruption is only 
triggered),
+        * if no other exception was set before.
+        * 
         * @param t The exception that occurred
         */
        public void reportError(Throwable t) {
-               // set the exception, if it is the first
-               if (exception.compareAndSet(null, t) && toInterrupt != null) {
+               // set the exception, if it is the first (and the exception is 
non null)
+               if (t != null && exception.compareAndSet(null, t) && 
toInterrupt != null) {
                        toInterrupt.interrupt();
                }
        }
-       
+
+       /**
+        * Checks whether an exception has been set via {@link 
#reportError(Throwable)}.
+        * If yes, that exception if re-thrown by this method.
+        * 
+        * @throws Exception This method re-throws the exception, if set.
+        */
        public void checkAndThrowException() throws Exception {
                Throwable t = exception.get();
                if (t != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2dcd27f4/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java
index bda90bd..fc07247 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java
@@ -19,23 +19,36 @@ package org.apache.flink.streaming.connectors.kafka.util;
 
 import java.util.Properties;
 
+/**
+ * Simple utilities, used by the Flink Kafka Consumers.
+ */
 public class KafkaUtils {
 
        public static int getIntFromConfig(Properties config, String key, int 
defaultValue) {
-               try {
-                       return Integer.parseInt(config.getProperty(key, 
Integer.toString(defaultValue)));
-               } catch(NumberFormatException nfe) {
-                       throw new IllegalArgumentException("Value for 
configuration key='" + key + "' is not set correctly. " +
-                                       "Entered value='" + 
config.getProperty(key) + "'. Default value='" + defaultValue + "'");
+               String val = config.getProperty(key);
+               if (val == null) {
+                       return defaultValue;
+               } else {
+                       try {
+                               return Integer.parseInt(val);
+                       } catch (NumberFormatException nfe) {
+                               throw new IllegalArgumentException("Value for 
configuration key='" + key + "' is not set correctly. " +
+                                               "Entered value='" + val + "'. 
Default value='" + defaultValue + "'");
+                       }
                }
        }
 
        public static long getLongFromConfig(Properties config, String key, 
long defaultValue) {
-               try {
-                       return Long.parseLong(config.getProperty(key, 
Long.toString(defaultValue)));
-               } catch(NumberFormatException nfe) {
-                       throw new IllegalArgumentException("Value for 
configuration key='" + key + "' is not set correctly. " +
-                                       "Entered value='" + 
config.getProperty(key) + "'. Default value='" + defaultValue + "'");
+               String val = config.getProperty(key);
+               if (val == null) {
+                       return defaultValue;
+               } else {
+                       try {
+                               return Long.parseLong(val);
+                       } catch (NumberFormatException nfe) {
+                               throw new IllegalArgumentException("Value for 
configuration key='" + key + "' is not set correctly. " +
+                                               "Entered value='" + val + "'. 
Default value='" + defaultValue + "'");
+                       }
                }
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/2dcd27f4/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
new file mode 100644
index 0000000..c073a04
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
+public class AbstractFetcherTimestampsTest {
+       
+       @Test
+       public void testPunctuatedWatermarks() throws Exception {
+               List<KafkaTopicPartition> originalPartitions = Arrays.asList(
+                               new KafkaTopicPartition("test topic name", 7),
+                               new KafkaTopicPartition("test topic name", 13),
+                               new KafkaTopicPartition("test topic name", 21));
+
+               TestSourceContext<Long> sourceContext = new 
TestSourceContext<>();
+
+               TestFetcher<Long> fetcher = new TestFetcher<>(
+                               sourceContext, originalPartitions, null,
+                               new 
SerializedValue<AssignerWithPunctuatedWatermarks<Long>>(new 
PunctuatedTestExtractor()),
+                               new MockRuntimeContext(17, 3));
+
+               final KafkaTopicPartitionState<Object> part1 = 
fetcher.subscribedPartitions()[0];
+               final KafkaTopicPartitionState<Object> part2 = 
fetcher.subscribedPartitions()[1];
+               final KafkaTopicPartitionState<Object> part3 = 
fetcher.subscribedPartitions()[2];
+
+               // elements generate a watermark if the timestamp is a multiple 
of three
+               
+               // elements for partition 1
+               fetcher.emitRecord(1L, part1, 1L);
+               fetcher.emitRecord(2L, part1, 2L);
+               fetcher.emitRecord(3L, part1, 3L);
+               assertEquals(3L, 
sourceContext.getLatestElement().getValue().longValue());
+               assertEquals(3L, 
sourceContext.getLatestElement().getTimestamp());
+               assertFalse(sourceContext.hasWatermark());
+
+               // elements for partition 2
+               fetcher.emitRecord(12L, part2, 1L);
+               assertEquals(12L, 
sourceContext.getLatestElement().getValue().longValue());
+               assertEquals(12L, 
sourceContext.getLatestElement().getTimestamp());
+               assertFalse(sourceContext.hasWatermark());
+
+               // elements for partition 3
+               fetcher.emitRecord(101L, part3, 1L);
+               fetcher.emitRecord(102L, part3, 2L);
+               assertEquals(102L, 
sourceContext.getLatestElement().getValue().longValue());
+               assertEquals(102L, 
sourceContext.getLatestElement().getTimestamp());
+               
+               // now, we should have a watermark
+               assertTrue(sourceContext.hasWatermark());
+               assertEquals(3L, 
sourceContext.getLatestWatermark().getTimestamp());
+               
+               // advance partition 3
+               fetcher.emitRecord(1003L, part3, 3L);
+               fetcher.emitRecord(1004L, part3, 4L);
+               fetcher.emitRecord(1005L, part3, 5L);
+               assertEquals(1005L, 
sourceContext.getLatestElement().getValue().longValue());
+               assertEquals(1005L, 
sourceContext.getLatestElement().getTimestamp());
+
+               // advance partition 1 beyond partition 2 - this bumps the 
watermark
+               fetcher.emitRecord(30L, part1, 4L);
+               assertEquals(30L, 
sourceContext.getLatestElement().getValue().longValue());
+               assertEquals(30L, 
sourceContext.getLatestElement().getTimestamp());
+               assertTrue(sourceContext.hasWatermark());
+               assertEquals(12L, 
sourceContext.getLatestWatermark().getTimestamp());
+
+               // advance partition 2 again - this bumps the watermark
+               fetcher.emitRecord(13L, part2, 2L);
+               assertFalse(sourceContext.hasWatermark());
+               fetcher.emitRecord(14L, part2, 3L);
+               assertFalse(sourceContext.hasWatermark());
+               fetcher.emitRecord(15L, part2, 3L);
+               assertTrue(sourceContext.hasWatermark());
+               assertEquals(15L, 
sourceContext.getLatestWatermark().getTimestamp());
+       }
+       
+       @Test
+       public void testPeriodicWatermarks() throws Exception {
+               ExecutionConfig config = new ExecutionConfig();
+               config.setAutoWatermarkInterval(10);
+               
+               List<KafkaTopicPartition> originalPartitions = Arrays.asList(
+                               new KafkaTopicPartition("test topic name", 7),
+                               new KafkaTopicPartition("test topic name", 13),
+                               new KafkaTopicPartition("test topic name", 21));
+
+               TestSourceContext<Long> sourceContext = new 
TestSourceContext<>();
+
+               TestFetcher<Long> fetcher = new TestFetcher<>(
+                               sourceContext, originalPartitions,
+                               new 
SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new 
PeriodicTestExtractor()),
+                               null, new MockRuntimeContext(17, 3, config, 
sourceContext.getCheckpointLock()));
+
+               final KafkaTopicPartitionState<Object> part1 = 
fetcher.subscribedPartitions()[0];
+               final KafkaTopicPartitionState<Object> part2 = 
fetcher.subscribedPartitions()[1];
+               final KafkaTopicPartitionState<Object> part3 = 
fetcher.subscribedPartitions()[2];
+
+               // elements generate a watermark if the timestamp is a multiple 
of three
+
+               // elements for partition 1
+               fetcher.emitRecord(1L, part1, 1L);
+               fetcher.emitRecord(2L, part1, 2L);
+               fetcher.emitRecord(3L, part1, 3L);
+               assertEquals(3L, 
sourceContext.getLatestElement().getValue().longValue());
+               assertEquals(3L, 
sourceContext.getLatestElement().getTimestamp());
+
+               // elements for partition 2
+               fetcher.emitRecord(12L, part2, 1L);
+               assertEquals(12L, 
sourceContext.getLatestElement().getValue().longValue());
+               assertEquals(12L, 
sourceContext.getLatestElement().getTimestamp());
+
+               // elements for partition 3
+               fetcher.emitRecord(101L, part3, 1L);
+               fetcher.emitRecord(102L, part3, 2L);
+               assertEquals(102L, 
sourceContext.getLatestElement().getValue().longValue());
+               assertEquals(102L, 
sourceContext.getLatestElement().getTimestamp());
+
+               // now, we should have a watermark (this blocks until the 
periodic thread emitted the watermark)
+               assertEquals(3L, 
sourceContext.getLatestWatermark().getTimestamp());
+
+               // advance partition 3
+               fetcher.emitRecord(1003L, part3, 3L);
+               fetcher.emitRecord(1004L, part3, 4L);
+               fetcher.emitRecord(1005L, part3, 5L);
+               assertEquals(1005L, 
sourceContext.getLatestElement().getValue().longValue());
+               assertEquals(1005L, 
sourceContext.getLatestElement().getTimestamp());
+
+               // advance partition 1 beyond partition 2 - this bumps the 
watermark
+               fetcher.emitRecord(30L, part1, 4L);
+               assertEquals(30L, 
sourceContext.getLatestElement().getValue().longValue());
+               assertEquals(30L, 
sourceContext.getLatestElement().getTimestamp());
+               
+               // this blocks until the periodic thread emitted the watermark
+               assertEquals(12L, 
sourceContext.getLatestWatermark().getTimestamp());
+
+               // advance partition 2 again - this bumps the watermark
+               fetcher.emitRecord(13L, part2, 2L);
+               fetcher.emitRecord(14L, part2, 3L);
+               fetcher.emitRecord(15L, part2, 3L);
+
+               // this blocks until the periodic thread emitted the watermark
+               long watermarkTs = 
sourceContext.getLatestWatermark().getTimestamp();
+               assertTrue(watermarkTs >= 13L && watermarkTs <= 15L);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Test mocks
+       // 
------------------------------------------------------------------------
+
+       private static final class TestFetcher<T> extends AbstractFetcher<T, 
Object> {
+
+               protected TestFetcher(
+                               SourceContext<T> sourceContext,
+                               List<KafkaTopicPartition> assignedPartitions,
+                               
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+                               
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+                               StreamingRuntimeContext runtimeContext) throws 
Exception
+               {
+                       super(sourceContext, assignedPartitions, 
watermarksPeriodic, watermarksPunctuated, runtimeContext);
+               }
+
+               @Override
+               public void runFetchLoop() throws Exception {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public void cancel() {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public Object createKafkaPartitionHandle(KafkaTopicPartition 
partition) {
+                       return new Object();
+               }
+
+               @Override
+               public void 
commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws 
Exception {
+                       throw new UnsupportedOperationException();
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       private static final class TestSourceContext<T> implements 
SourceContext<T> {
+
+               private final Object checkpointLock = new Object();
+               private final Object watermarkLock = new Object();
+
+               private volatile StreamRecord<T> latestElement;
+               private volatile Watermark currentWatermark;
+
+               @Override
+               public void collect(T element) {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public void collectWithTimestamp(T element, long timestamp) {
+                       this.latestElement = new StreamRecord<T>(element, 
timestamp);
+               }
+
+               @Override
+               public void emitWatermark(Watermark mark) {
+                       synchronized (watermarkLock) {
+                               currentWatermark = mark;
+                               watermarkLock.notifyAll();
+                       }
+               }
+
+               @Override
+               public Object getCheckpointLock() {
+                       return checkpointLock;
+               }
+
+               @Override
+               public void close() {}
+
+               public StreamRecord<T> getLatestElement() {
+                       return latestElement;
+               }
+
+               public boolean hasWatermark() {
+                       return currentWatermark != null;
+               }
+               
+               public Watermark getLatestWatermark() throws 
InterruptedException {
+                       synchronized (watermarkLock) {
+                               while (currentWatermark == null) {
+                                       watermarkLock.wait();
+                               }
+                               Watermark wm = currentWatermark;
+                               currentWatermark = null;
+                               return wm;
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       private static class PeriodicTestExtractor implements 
AssignerWithPeriodicWatermarks<Long> {
+
+               private volatile long maxTimestamp = Long.MIN_VALUE;
+               
+               @Override
+               public long extractTimestamp(Long element, long 
previousElementTimestamp) {
+                       maxTimestamp = Math.max(maxTimestamp, element);
+                       return element;
+               }
+
+               @Nullable
+               @Override
+               public Watermark getCurrentWatermark() {
+                       return new Watermark(maxTimestamp);
+               }
+       }
+
+       private static class PunctuatedTestExtractor implements 
AssignerWithPunctuatedWatermarks<Long> {
+
+               @Override
+               public long extractTimestamp(Long element, long 
previousElementTimestamp) {
+                       return element;
+               }
+
+               @Nullable
+               @Override
+               public Watermark checkAndGetNextWatermark(Long lastElement, 
long extractedTimestamp) {
+                       return extractedTimestamp % 3 == 0 ? new 
Watermark(extractedTimestamp) : null;
+               }
+               
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2dcd27f4/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
index e74eee4..3e46503 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
@@ -37,24 +37,43 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
 
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 @SuppressWarnings("deprecation")
 public class MockRuntimeContext extends StreamingRuntimeContext {
 
        private final int numberOfParallelSubtasks;
        private final int indexOfThisSubtask;
+       
+       private final ExecutionConfig execConfig;
+       private final Object checkpointLock;
 
+       private ScheduledExecutorService timer;
+       
        public MockRuntimeContext(int numberOfParallelSubtasks, int 
indexOfThisSubtask) {
+               this(numberOfParallelSubtasks, indexOfThisSubtask, new 
ExecutionConfig(), null);
+       }
+       
+       public MockRuntimeContext(
+                       int numberOfParallelSubtasks, int indexOfThisSubtask, 
+                       ExecutionConfig execConfig,
+                       Object checkpointLock) {
                super(new MockStreamOperator(),
                                new MockEnvironment("no", 4 * 
MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
                                Collections.<String, Accumulator<?, 
?>>emptyMap());
+               
                this.numberOfParallelSubtasks = numberOfParallelSubtasks;
                this.indexOfThisSubtask = indexOfThisSubtask;
+               this.execConfig = execConfig;
+               this.checkpointLock = checkpointLock;
        }
 
        @Override
@@ -64,7 +83,7 @@ public class MockRuntimeContext extends 
StreamingRuntimeContext {
 
        @Override
        public String getTaskName() {
-               return null;
+               return "mock task";
        }
 
        @Override
@@ -84,7 +103,7 @@ public class MockRuntimeContext extends 
StreamingRuntimeContext {
 
        @Override
        public ExecutionConfig getExecutionConfig() {
-               throw new UnsupportedOperationException();
+               return execConfig;
        }
 
        @Override
@@ -167,6 +186,29 @@ public class MockRuntimeContext extends 
StreamingRuntimeContext {
                throw new UnsupportedOperationException();
        }
        
+       @Override
+       public void registerTimer(final long time, final Triggerable target) {
+               if (timer == null) {
+                       timer = Executors.newSingleThreadScheduledExecutor();
+               }
+               
+               final long delay = Math.max(time - System.currentTimeMillis(), 
0);
+
+               timer.schedule(new Runnable() {
+                       @Override
+                       public void run() {
+                               synchronized (checkpointLock) {
+                                       try {
+                                               target.trigger(time);
+                                       } catch (Throwable t) {
+                                               System.err.println("!!! Caught 
exception while processing timer. !!!");
+                                               t.printStackTrace();
+                                       }
+                               }
+                       }
+               }, delay, TimeUnit.MILLISECONDS);
+       }
+
        // 
------------------------------------------------------------------------
 
        private static class MockStreamOperator extends 
AbstractStreamOperator<Integer> {

Reply via email to