[FLINK-3375] [Kafka Connector] Add tests for per-kafka-partition watermarks.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2dcd27f4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2dcd27f4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2dcd27f4 Branch: refs/heads/master Commit: 2dcd27f403c2a7f10791bfe21c45e2a326aa46a1 Parents: e40e29d Author: Stephan Ewen <[email protected]> Authored: Wed Apr 13 15:45:51 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Wed Apr 13 20:50:49 2016 +0200 ---------------------------------------------------------------------- .../kafka/internals/AbstractFetcher.java | 77 +++-- .../kafka/internals/ExceptionProxy.java | 60 +++- .../connectors/kafka/util/KafkaUtils.java | 33 +- .../AbstractFetcherTimestampsTest.java | 306 +++++++++++++++++++ .../kafka/testutils/MockRuntimeContext.java | 46 ++- 5 files changed, 478 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2dcd27f4/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java index 594aa66..8183575 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java @@ -328,45 +328,66 @@ public abstract class AbstractFetcher<T, KPH> { ClassLoader userCodeClassLoader) throws 
IOException, ClassNotFoundException { - @SuppressWarnings("unchecked") - KafkaTopicPartitionState<KPH>[] partitions = - (KafkaTopicPartitionState<KPH>[]) new KafkaTopicPartitionState<?>[assignedPartitions.size()]; - - int pos = 0; - for (KafkaTopicPartition partition : assignedPartitions) { - // create the kafka version specific partition handle - KPH kafkaHandle = createKafkaPartitionHandle(partition); + switch (timestampWatermarkMode) { - // create the partition state - KafkaTopicPartitionState<KPH> partitionState; - switch (timestampWatermarkMode) { - case NO_TIMESTAMPS_WATERMARKS: - partitionState = new KafkaTopicPartitionState<>(partition, kafkaHandle); - break; - case PERIODIC_WATERMARKS: { + case NO_TIMESTAMPS_WATERMARKS: { + @SuppressWarnings("unchecked") + KafkaTopicPartitionState<KPH>[] partitions = + (KafkaTopicPartitionState<KPH>[]) new KafkaTopicPartitionState<?>[assignedPartitions.size()]; + + int pos = 0; + for (KafkaTopicPartition partition : assignedPartitions) { + // create the kafka version specific partition handle + KPH kafkaHandle = createKafkaPartitionHandle(partition); + partitions[pos++] = new KafkaTopicPartitionState<>(partition, kafkaHandle); + } + + return partitions; + } + + case PERIODIC_WATERMARKS: { + @SuppressWarnings("unchecked") + KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions = + (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[]) + new KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[assignedPartitions.size()]; + + int pos = 0; + for (KafkaTopicPartition partition : assignedPartitions) { + KPH kafkaHandle = createKafkaPartitionHandle(partition); + AssignerWithPeriodicWatermarks<T> assignerInstance = watermarksPeriodic.deserializeValue(userCodeClassLoader); - partitionState = new KafkaTopicPartitionStateWithPeriodicWatermarks<>( + + partitions[pos++] = new KafkaTopicPartitionStateWithPeriodicWatermarks<>( partition, kafkaHandle, assignerInstance); - break; } - - case PUNCTUATED_WATERMARKS: { + + 
return partitions; + } + + case PUNCTUATED_WATERMARKS: { + @SuppressWarnings("unchecked") + KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[] partitions = + (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[]) + new KafkaTopicPartitionStateWithPunctuatedWatermarks<?, ?>[assignedPartitions.size()]; + + int pos = 0; + for (KafkaTopicPartition partition : assignedPartitions) { + KPH kafkaHandle = createKafkaPartitionHandle(partition); + AssignerWithPunctuatedWatermarks<T> assignerInstance = watermarksPunctuated.deserializeValue(userCodeClassLoader); - partitionState = new KafkaTopicPartitionStateWithPunctuatedWatermarks<>( + + partitions[pos++] = new KafkaTopicPartitionStateWithPunctuatedWatermarks<>( partition, kafkaHandle, assignerInstance); - break; } - default: - // cannot happen, add this as a guard for the future - throw new RuntimeException(); - } - partitions[pos++] = partitionState; + return partitions; + } + default: + // cannot happen, add this as a guard for the future + throw new RuntimeException(); } - - return partitions; } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/2dcd27f4/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java index 9a0e4e3..c736493 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java +++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java @@ -22,7 +22,48 @@ import javax.annotation.Nullable; import java.util.concurrent.atomic.AtomicReference; /** + * A proxy that communicates exceptions between threads. Typically used if an exception + * from a spawned thread needs to be recognized by the "parent" (spawner) thread. * + * <p>The spawned thread would set the exception via {@link #reportError(Throwable)}. + * The parent would check (at certain points) for exceptions via {@link #checkAndThrowException()}. + * Optionally, the parent can pass itself in the constructor to be interrupted as soon as + * an exception occurs. + * + * <pre> + * {@code + * + * final ExceptionProxy errorProxy = new ExceptionProxy(Thread.currentThread()); + * + * Thread subThread = new Thread() { + * + * public void run() { + * try { + * doSomething(); + * } catch (Throwable t) { + * errorProxy.reportError(t); + * } finally { + * doSomeCleanup(); + * } + * } + * }; + * subThread.start(); + * + * doSomethingElse(); + * errorProxy.checkAndThrowException(); + * + * doSomethingMore(); + * errorProxy.checkAndThrowException(); + * + * try { + * subThread.join(); + * } catch (InterruptedException e) { + * errorProxy.checkAndThrowException(); + * // restore interrupted status, if not caused by an exception + * Thread.currentThread().interrupt(); + * } + * } + * </pre> */ public class ExceptionProxy { @@ -33,6 +74,8 @@ public class ExceptionProxy { private final AtomicReference<Throwable> exception; /** + * Creates an exception proxy that interrupts the given thread upon + * report of an exception. The thread to interrupt may be null. * * @param toInterrupt The thread to interrupt upon an exception. May be null. 
*/ @@ -44,18 +87,27 @@ public class ExceptionProxy { // ------------------------------------------------------------------------ /** - * Sets the exception occurred and interrupts the target thread, + * Sets the exception and interrupts the target thread, * if no other exception has occurred so far. * + * <p>The exception is only set (and the interruption is only triggered), + * if no other exception was set before. + * * @param t The exception that occurred */ public void reportError(Throwable t) { - // set the exception, if it is the first - if (exception.compareAndSet(null, t) && toInterrupt != null) { + // set the exception, if it is the first (and the exception is non null) + if (t != null && exception.compareAndSet(null, t) && toInterrupt != null) { toInterrupt.interrupt(); } } - + + /** + * Checks whether an exception has been set via {@link #reportError(Throwable)}. + * If yes, that exception is re-thrown by this method. + * + * @throws Exception This method re-throws the exception, if set. 
+ */ public void checkAndThrowException() throws Exception { Throwable t = exception.get(); if (t != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/2dcd27f4/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java index bda90bd..fc07247 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java @@ -19,23 +19,36 @@ package org.apache.flink.streaming.connectors.kafka.util; import java.util.Properties; +/** + * Simple utilities, used by the Flink Kafka Consumers. + */ public class KafkaUtils { public static int getIntFromConfig(Properties config, String key, int defaultValue) { - try { - return Integer.parseInt(config.getProperty(key, Integer.toString(defaultValue))); - } catch(NumberFormatException nfe) { - throw new IllegalArgumentException("Value for configuration key='" + key + "' is not set correctly. " + - "Entered value='" + config.getProperty(key) + "'. Default value='" + defaultValue + "'"); + String val = config.getProperty(key); + if (val == null) { + return defaultValue; + } else { + try { + return Integer.parseInt(val); + } catch (NumberFormatException nfe) { + throw new IllegalArgumentException("Value for configuration key='" + key + "' is not set correctly. " + + "Entered value='" + val + "'. 
Default value='" + defaultValue + "'"); + } } } public static long getLongFromConfig(Properties config, String key, long defaultValue) { - try { - return Long.parseLong(config.getProperty(key, Long.toString(defaultValue))); - } catch(NumberFormatException nfe) { - throw new IllegalArgumentException("Value for configuration key='" + key + "' is not set correctly. " + - "Entered value='" + config.getProperty(key) + "'. Default value='" + defaultValue + "'"); + String val = config.getProperty(key); + if (val == null) { + return defaultValue; + } else { + try { + return Long.parseLong(val); + } catch (NumberFormatException nfe) { + throw new IllegalArgumentException("Value for configuration key='" + key + "' is not set correctly. " + + "Entered value='" + val + "'. Default value='" + defaultValue + "'"); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/2dcd27f4/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java new file mode 100644 index 0000000..c073a04 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.SerializedValue; + +import org.junit.Test; + +import javax.annotation.Nullable; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.*; + +@SuppressWarnings("serial") +public class AbstractFetcherTimestampsTest { + + @Test + public void testPunctuatedWatermarks() throws Exception { + List<KafkaTopicPartition> originalPartitions = Arrays.asList( + new KafkaTopicPartition("test topic name", 7), + new KafkaTopicPartition("test topic name", 13), + new KafkaTopicPartition("test topic name", 21)); + + TestSourceContext<Long> sourceContext = new TestSourceContext<>(); + + TestFetcher<Long> fetcher = new TestFetcher<>( + sourceContext, originalPartitions, null, + 
new SerializedValue<AssignerWithPunctuatedWatermarks<Long>>(new PunctuatedTestExtractor()), + new MockRuntimeContext(17, 3)); + + final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitions()[0]; + final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitions()[1]; + final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitions()[2]; + + // elements generate a watermark if the timestamp is a multiple of three + + // elements for partition 1 + fetcher.emitRecord(1L, part1, 1L); + fetcher.emitRecord(2L, part1, 2L); + fetcher.emitRecord(3L, part1, 3L); + assertEquals(3L, sourceContext.getLatestElement().getValue().longValue()); + assertEquals(3L, sourceContext.getLatestElement().getTimestamp()); + assertFalse(sourceContext.hasWatermark()); + + // elements for partition 2 + fetcher.emitRecord(12L, part2, 1L); + assertEquals(12L, sourceContext.getLatestElement().getValue().longValue()); + assertEquals(12L, sourceContext.getLatestElement().getTimestamp()); + assertFalse(sourceContext.hasWatermark()); + + // elements for partition 3 + fetcher.emitRecord(101L, part3, 1L); + fetcher.emitRecord(102L, part3, 2L); + assertEquals(102L, sourceContext.getLatestElement().getValue().longValue()); + assertEquals(102L, sourceContext.getLatestElement().getTimestamp()); + + // now, we should have a watermark + assertTrue(sourceContext.hasWatermark()); + assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp()); + + // advance partition 3 + fetcher.emitRecord(1003L, part3, 3L); + fetcher.emitRecord(1004L, part3, 4L); + fetcher.emitRecord(1005L, part3, 5L); + assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue()); + assertEquals(1005L, sourceContext.getLatestElement().getTimestamp()); + + // advance partition 1 beyond partition 2 - this bumps the watermark + fetcher.emitRecord(30L, part1, 4L); + assertEquals(30L, sourceContext.getLatestElement().getValue().longValue()); + assertEquals(30L, 
sourceContext.getLatestElement().getTimestamp()); + assertTrue(sourceContext.hasWatermark()); + assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp()); + + // advance partition 2 again - this bumps the watermark + fetcher.emitRecord(13L, part2, 2L); + assertFalse(sourceContext.hasWatermark()); + fetcher.emitRecord(14L, part2, 3L); + assertFalse(sourceContext.hasWatermark()); + fetcher.emitRecord(15L, part2, 3L); + assertTrue(sourceContext.hasWatermark()); + assertEquals(15L, sourceContext.getLatestWatermark().getTimestamp()); + } + + @Test + public void testPeriodicWatermarks() throws Exception { + ExecutionConfig config = new ExecutionConfig(); + config.setAutoWatermarkInterval(10); + + List<KafkaTopicPartition> originalPartitions = Arrays.asList( + new KafkaTopicPartition("test topic name", 7), + new KafkaTopicPartition("test topic name", 13), + new KafkaTopicPartition("test topic name", 21)); + + TestSourceContext<Long> sourceContext = new TestSourceContext<>(); + + TestFetcher<Long> fetcher = new TestFetcher<>( + sourceContext, originalPartitions, + new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()), + null, new MockRuntimeContext(17, 3, config, sourceContext.getCheckpointLock())); + + final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitions()[0]; + final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitions()[1]; + final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitions()[2]; + + // elements generate a watermark if the timestamp is a multiple of three + + // elements for partition 1 + fetcher.emitRecord(1L, part1, 1L); + fetcher.emitRecord(2L, part1, 2L); + fetcher.emitRecord(3L, part1, 3L); + assertEquals(3L, sourceContext.getLatestElement().getValue().longValue()); + assertEquals(3L, sourceContext.getLatestElement().getTimestamp()); + + // elements for partition 2 + fetcher.emitRecord(12L, part2, 1L); + assertEquals(12L, 
sourceContext.getLatestElement().getValue().longValue()); + assertEquals(12L, sourceContext.getLatestElement().getTimestamp()); + + // elements for partition 3 + fetcher.emitRecord(101L, part3, 1L); + fetcher.emitRecord(102L, part3, 2L); + assertEquals(102L, sourceContext.getLatestElement().getValue().longValue()); + assertEquals(102L, sourceContext.getLatestElement().getTimestamp()); + + // now, we should have a watermark (this blocks until the periodic thread emitted the watermark) + assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp()); + + // advance partition 3 + fetcher.emitRecord(1003L, part3, 3L); + fetcher.emitRecord(1004L, part3, 4L); + fetcher.emitRecord(1005L, part3, 5L); + assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue()); + assertEquals(1005L, sourceContext.getLatestElement().getTimestamp()); + + // advance partition 1 beyond partition 2 - this bumps the watermark + fetcher.emitRecord(30L, part1, 4L); + assertEquals(30L, sourceContext.getLatestElement().getValue().longValue()); + assertEquals(30L, sourceContext.getLatestElement().getTimestamp()); + + // this blocks until the periodic thread emitted the watermark + assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp()); + + // advance partition 2 again - this bumps the watermark + fetcher.emitRecord(13L, part2, 2L); + fetcher.emitRecord(14L, part2, 3L); + fetcher.emitRecord(15L, part2, 3L); + + // this blocks until the periodic thread emitted the watermark + long watermarkTs = sourceContext.getLatestWatermark().getTimestamp(); + assertTrue(watermarkTs >= 13L && watermarkTs <= 15L); + } + + // ------------------------------------------------------------------------ + // Test mocks + // ------------------------------------------------------------------------ + + private static final class TestFetcher<T> extends AbstractFetcher<T, Object> { + + protected TestFetcher( + SourceContext<T> sourceContext, + List<KafkaTopicPartition> assignedPartitions, + 
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, + SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, + StreamingRuntimeContext runtimeContext) throws Exception + { + super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, runtimeContext); + } + + @Override + public void runFetchLoop() throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public void cancel() { + throw new UnsupportedOperationException(); + } + + @Override + public Object createKafkaPartitionHandle(KafkaTopicPartition partition) { + return new Object(); + } + + @Override + public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception { + throw new UnsupportedOperationException(); + } + } + + // ------------------------------------------------------------------------ + + private static final class TestSourceContext<T> implements SourceContext<T> { + + private final Object checkpointLock = new Object(); + private final Object watermarkLock = new Object(); + + private volatile StreamRecord<T> latestElement; + private volatile Watermark currentWatermark; + + @Override + public void collect(T element) { + throw new UnsupportedOperationException(); + } + + @Override + public void collectWithTimestamp(T element, long timestamp) { + this.latestElement = new StreamRecord<T>(element, timestamp); + } + + @Override + public void emitWatermark(Watermark mark) { + synchronized (watermarkLock) { + currentWatermark = mark; + watermarkLock.notifyAll(); + } + } + + @Override + public Object getCheckpointLock() { + return checkpointLock; + } + + @Override + public void close() {} + + public StreamRecord<T> getLatestElement() { + return latestElement; + } + + public boolean hasWatermark() { + return currentWatermark != null; + } + + public Watermark getLatestWatermark() throws InterruptedException { + synchronized (watermarkLock) { + while (currentWatermark == null) { + 
watermarkLock.wait(); + } + Watermark wm = currentWatermark; + currentWatermark = null; + return wm; + } + } + } + + // ------------------------------------------------------------------------ + + private static class PeriodicTestExtractor implements AssignerWithPeriodicWatermarks<Long> { + + private volatile long maxTimestamp = Long.MIN_VALUE; + + @Override + public long extractTimestamp(Long element, long previousElementTimestamp) { + maxTimestamp = Math.max(maxTimestamp, element); + return element; + } + + @Nullable + @Override + public Watermark getCurrentWatermark() { + return new Watermark(maxTimestamp); + } + } + + private static class PunctuatedTestExtractor implements AssignerWithPunctuatedWatermarks<Long> { + + @Override + public long extractTimestamp(Long element, long previousElementTimestamp) { + return element; + } + + @Nullable + @Override + public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) { + return extractedTimestamp % 3 == 0 ? new Watermark(extractedTimestamp) : null; + } + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2dcd27f4/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java index e74eee4..3e46503 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java @@ -37,24 +37,43 @@ import 
org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.runtime.operators.Triggerable; import java.io.Serializable; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; @SuppressWarnings("deprecation") public class MockRuntimeContext extends StreamingRuntimeContext { private final int numberOfParallelSubtasks; private final int indexOfThisSubtask; + + private final ExecutionConfig execConfig; + private final Object checkpointLock; + private ScheduledExecutorService timer; + public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) { + this(numberOfParallelSubtasks, indexOfThisSubtask, new ExecutionConfig(), null); + } + + public MockRuntimeContext( + int numberOfParallelSubtasks, int indexOfThisSubtask, + ExecutionConfig execConfig, + Object checkpointLock) { super(new MockStreamOperator(), new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16), Collections.<String, Accumulator<?, ?>>emptyMap()); + this.numberOfParallelSubtasks = numberOfParallelSubtasks; this.indexOfThisSubtask = indexOfThisSubtask; + this.execConfig = execConfig; + this.checkpointLock = checkpointLock; } @Override @@ -64,7 +83,7 @@ public class MockRuntimeContext extends StreamingRuntimeContext { @Override public String getTaskName() { - return null; + return "mock task"; } @Override @@ -84,7 +103,7 @@ public class MockRuntimeContext extends StreamingRuntimeContext { @Override public ExecutionConfig getExecutionConfig() { - throw new UnsupportedOperationException(); + return execConfig; } @Override @@ -167,6 +186,29 @@ public class MockRuntimeContext extends 
StreamingRuntimeContext { throw new UnsupportedOperationException(); } + @Override + public void registerTimer(final long time, final Triggerable target) { + if (timer == null) { + timer = Executors.newSingleThreadScheduledExecutor(); + } + + final long delay = Math.max(time - System.currentTimeMillis(), 0); + + timer.schedule(new Runnable() { + @Override + public void run() { + synchronized (checkpointLock) { + try { + target.trigger(time); + } catch (Throwable t) { + System.err.println("!!! Caught exception while processing timer. !!!"); + t.printStackTrace(); + } + } + } + }, delay, TimeUnit.MILLISECONDS); + } + // ------------------------------------------------------------------------ private static class MockStreamOperator extends AbstractStreamOperator<Integer> {
