Repository: flink Updated Branches: refs/heads/master 71d2e3ef1 -> 770f2f83a
[FLINK-4877] Use OperatorTestHarness and TestProcessingTimeService in Kafka Tests Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0859a698 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0859a698 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0859a698 Branch: refs/heads/master Commit: 0859a698253f07a28442ee7232e1adb76013dbd3 Parents: 3055475 Author: Aljoscha Krettek <[email protected]> Authored: Wed Sep 28 15:10:35 2016 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Fri Oct 21 19:03:04 2016 +0200 ---------------------------------------------------------------------- .../flink-connector-kafka-0.10/pom.xml | 8 + .../connectors/kafka/FlinkKafkaConsumer010.java | 19 +- .../kafka/internal/Kafka010Fetcher.java | 26 ++- .../connectors/kafka/Kafka010FetcherTest.java | 33 ++- .../kafka/KafkaTestEnvironmentImpl.java | 9 + .../flink-connector-kafka-0.8/pom.xml | 10 +- .../kafka/internals/Kafka08Fetcher.java | 10 +- .../connectors/kafka/KafkaProducerTest.java | 32 +-- .../kafka/KafkaTestEnvironmentImpl.java | 16 ++ .../flink-connector-kafka-0.9/pom.xml | 8 + .../connectors/kafka/FlinkKafkaConsumer09.java | 19 +- .../kafka/internal/Kafka09Fetcher.java | 38 +++- .../connectors/kafka/Kafka09FetcherTest.java | 38 +++- .../connectors/kafka/KafkaProducerTest.java | 30 +-- .../kafka/KafkaTestEnvironmentImpl.java | 12 ++ .../kafka/internals/AbstractFetcher.java | 13 +- .../kafka/AtLeastOnceProducerTest.java | 25 ++- .../connectors/kafka/KafkaTestEnvironment.java | 5 + .../AbstractFetcherTimestampsTest.java | 151 +++++++------- .../kafka/testutils/DataGenerators.java | 25 ++- .../kafka/testutils/MockRuntimeContext.java | 209 ------------------- .../util/OneInputStreamOperatorTestHarness.java | 14 +- 22 files changed, 383 insertions(+), 367 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml index 0b426b5..8108afc 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml +++ b/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml @@ -72,6 +72,14 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.9_2.10</artifactId> <version>${project.version}</version> <exclusions> http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java index 267ff57..a9ce336 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java @@ -134,9 +134,20 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> { boolean useMetrics = !Boolean.valueOf(properties.getProperty(KEY_DISABLE_METRICS, "false")); - return new Kafka010Fetcher<>(sourceContext, thisSubtaskPartitions, - watermarksPeriodic, watermarksPunctuated, - runtimeContext, deserializer, - properties, pollTimeout, useMetrics); + return new Kafka010Fetcher<>( + sourceContext, + thisSubtaskPartitions, + watermarksPeriodic, + watermarksPunctuated, + runtimeContext.getProcessingTimeService(), + runtimeContext.getExecutionConfig().getAutoWatermarkInterval(), + runtimeContext.getUserCodeClassLoader(), + runtimeContext.isCheckpointingEnabled(), + runtimeContext.getTaskNameWithSubtasks(), + runtimeContext.getMetricGroup(), + deserializer, + properties, + pollTimeout, + useMetrics); } } http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java index 47bee22..4a1f5f6 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java @@ -18,12 +18,13 @@ package org.apache.flink.streaming.connectors.kafka.internal; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.util.SerializedValue; @@ -46,13 +47,32 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> { List<KafkaTopicPartition> assignedPartitions, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, - StreamingRuntimeContext runtimeContext, + ProcessingTimeService processingTimeProvider, + long autoWatermarkInterval, + ClassLoader userCodeClassLoader, + boolean enableCheckpointing, + String taskNameWithSubtasks, + MetricGroup metricGroup, KeyedDeserializationSchema<T> deserializer, Properties kafkaProperties, long pollTimeout, boolean useMetrics) throws Exception { - super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, runtimeContext, deserializer, kafkaProperties, pollTimeout, useMetrics); + super( + sourceContext, + assignedPartitions, + watermarksPeriodic, + watermarksPunctuated, + processingTimeProvider, + autoWatermarkInterval, + userCodeClassLoader, + enableCheckpointing, + taskNameWithSubtasks, + metricGroup, + deserializer, + kafkaProperties, + pollTimeout, + useMetrics); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java index 76e3950..718db48 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java @@ -20,10 +20,12 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.core.testutils.MultiShotLatch; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; @@ -115,7 +117,20 @@ public class Kafka010FetcherTest { StreamingRuntimeContext context = mock(StreamingRuntimeContext.class); final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>( - sourceContext, topics, null, null, context, schema, new Properties(), 0L, false); + sourceContext, + topics, + null, /* periodic assigner */ + null, /* punctuated assigner */ + new TestProcessingTimeService(), + 10, + getClass().getClassLoader(), + false, /* checkpointing */ + "taskname-with-subtask", + mock(MetricGroup.class), + schema, + new Properties(), + 0L, + false); // ----- run the fetcher ----- @@ -235,7 +250,21 @@ public class Kafka010FetcherTest { StreamingRuntimeContext context = mock(StreamingRuntimeContext.class); final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>( - sourceContext, topics, null, null, context, schema, new Properties(), 0L, false); + sourceContext, + topics, + null, /* periodic assigner */ + null, /* punctuated assigner */ + new TestProcessingTimeService(), + 10, + getClass().getClassLoader(), + false, /* checkpointing */ + "taskname-with-subtask", + mock(MetricGroup.class), + schema, + new Properties(), + 0L, + false); + // ----- run the fetcher ----- http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index 7d12cde..c30a4dd 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -29,6 +29,7 @@ import org.apache.commons.io.FileUtils; import org.apache.curator.test.TestingServer; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; @@ -113,6 +114,14 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { } @Override + public <T> StreamSink<T> getProducerSink(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) { + FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner); + prod.setFlushOnCheckpoint(true); + return new StreamSink<>(prod); + } + + + @Override public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) { FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner); prod.setFlushOnCheckpoint(true); http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml index 888208e..f17f9ae 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml @@ -119,7 +119,15 @@ under the License. </dependency> <!-- test dependencies --> - + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-test</artifactId> http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java index 5861058..fbcb19c 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java @@ -98,7 +98,15 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> { long autoCommitInterval, boolean useMetrics) throws Exception { - super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, runtimeContext, useMetrics); + super( + sourceContext, + assignedPartitions, + watermarksPeriodic, + watermarksPunctuated, + runtimeContext.getProcessingTimeService(), + runtimeContext.getExecutionConfig().getAutoWatermarkInterval(), + runtimeContext.getUserCodeClassLoader(), + useMetrics); this.deserializer = checkNotNull(deserializer); this.kafkaConfig = checkNotNull(kafkaProperties); http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java index 8602ffe..7efa94e 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java @@ -18,8 +18,9 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.flink.util.TestLogger; @@ -80,12 +81,14 @@ public class KafkaProducerTest extends TestLogger { FlinkKafkaProducer08<String> producerPropagating = new FlinkKafkaProducer08<>( "mock_topic", new SimpleStringSchema(), new Properties(), null); - producerPropagating.setRuntimeContext(new MockRuntimeContext(17, 3)); - producerPropagating.open(new Configuration()); - + OneInputStreamOperatorTestHarness<String, Object> testHarness = + new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producerPropagating)); + + testHarness.open(); + try { - producerPropagating.invoke("value"); - producerPropagating.invoke("value"); + testHarness.processElement(new StreamRecord<>("value")); + testHarness.processElement(new StreamRecord<>("value")); fail("This should fail with an exception"); } catch (Exception e) { @@ -94,17 +97,22 @@ public class KafkaProducerTest extends TestLogger { assertTrue(e.getCause().getMessage().contains("Test error")); } + testHarness.close(); + // (2) producer that only logs errors FlinkKafkaProducer08<String> producerLogging = new FlinkKafkaProducer08<>( "mock_topic", new SimpleStringSchema(), new Properties(), null); producerLogging.setLogFailuresOnly(true); - - producerLogging.setRuntimeContext(new MockRuntimeContext(17, 3)); - producerLogging.open(new Configuration()); - producerLogging.invoke("value"); - producerLogging.invoke("value"); + testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging)); + + testHarness.open(); + + testHarness.processElement(new StreamRecord<>("value")); + testHarness.processElement(new StreamRecord<>("value")); + + testHarness.close(); } catch (Exception e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index 567d22d..6235449 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -34,6 +34,7 @@ import org.apache.curator.test.TestingServer; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader; import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler; import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer; @@ -105,6 +106,21 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { } @Override + public <T> StreamSink<T> getProducerSink( + String topic, + KeyedSerializationSchema<T> serSchema, + Properties props, + KafkaPartitioner<T> partitioner) { + FlinkKafkaProducer08<T> prod = new FlinkKafkaProducer08<>( + topic, + serSchema, + props, + partitioner); + prod.setFlushOnCheckpoint(true); + return new StreamSink<>(prod); + } + + @Override public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) { FlinkKafkaProducer08<T> prod = new FlinkKafkaProducer08<>(topic, serSchema, props, partitioner); prod.setFlushOnCheckpoint(true); http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml index bfcde82..f638c7a 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml @@ -83,6 +83,14 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-base_2.10</artifactId> <version>${project.version}</version> <exclusions> http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java index a97476a..29bb8e4 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java @@ -177,10 +177,21 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> { boolean useMetrics = !Boolean.valueOf(properties.getProperty(KEY_DISABLE_METRICS, "false")); - return new Kafka09Fetcher<>(sourceContext, thisSubtaskPartitions, - watermarksPeriodic, watermarksPunctuated, - runtimeContext, deserializer, - properties, pollTimeout, useMetrics); + return new Kafka09Fetcher<>( + sourceContext, + thisSubtaskPartitions, + watermarksPeriodic, + watermarksPunctuated, + runtimeContext.getProcessingTimeService(), + runtimeContext.getExecutionConfig().getAutoWatermarkInterval(), + runtimeContext.getUserCodeClassLoader(), + runtimeContext.isCheckpointingEnabled(), + runtimeContext.getTaskNameWithSubtasks(), + runtimeContext.getMetricGroup(), + deserializer, + properties, + pollTimeout, + useMetrics); } http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java index af3b199..a8c0397 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java @@ -18,17 +18,16 @@ package org.apache.flink.streaming.connectors.kafka.internal; -import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; import org.apache.flink.streaming.connectors.kafka.internals.ExceptionProxy; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState; import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.util.SerializedValue; @@ -67,9 +66,6 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem /** The schema to convert between Kafka's byte messages, and Flink's objects */ private final KeyedDeserializationSchema<T> deserializer; - /** The subtask's runtime context */ - private final RuntimeContext runtimeContext; - /** The configuration for the Kafka consumer */ private final Properties kafkaProperties; @@ -94,6 +90,12 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem /** Flag tracking whether the latest commit request has completed */ private volatile boolean commitInProgress; + /** For Debug output **/ + private String taskNameWithSubtasks; + + /** We get this from the outside to publish metrics. **/ + private MetricGroup metricGroup; + // ------------------------------------------------------------------------ public Kafka09Fetcher( @@ -101,24 +103,38 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem List<KafkaTopicPartition> assignedPartitions, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, - StreamingRuntimeContext runtimeContext, + ProcessingTimeService processingTimeProvider, + long autoWatermarkInterval, + ClassLoader userCodeClassLoader, + boolean enableCheckpointing, + String taskNameWithSubtasks, + MetricGroup metricGroup, KeyedDeserializationSchema<T> deserializer, Properties kafkaProperties, long pollTimeout, boolean useMetrics) throws Exception { - super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, runtimeContext, useMetrics); + super( + sourceContext, + assignedPartitions, + watermarksPeriodic, + watermarksPunctuated, + processingTimeProvider, + autoWatermarkInterval, + userCodeClassLoader, + useMetrics); this.deserializer = deserializer; - this.runtimeContext = runtimeContext; this.kafkaProperties = kafkaProperties; this.pollTimeout = pollTimeout; this.nextOffsetsToCommit = new AtomicReference<>(); this.offsetCommitCallback = new CommitCallback(); + this.taskNameWithSubtasks = taskNameWithSubtasks; + this.metricGroup = metricGroup; // if checkpointing is enabled, we are not automatically committing to Kafka. kafkaProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, - Boolean.toString(!runtimeContext.isCheckpointingEnabled())); + Boolean.toString(!enableCheckpointing)); } // ------------------------------------------------------------------------ @@ -131,7 +147,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem // rather than running the main fetch loop directly here, we spawn a dedicated thread // this makes sure that no interrupt() call upon canceling reaches the Kafka consumer code - Thread runner = new Thread(this, getFetcherName() + " for " + runtimeContext.getTaskNameWithSubtasks()); + Thread runner = new Thread(this, getFetcherName() + " for " + taskNameWithSubtasks); runner.setDaemon(true); runner.start(); @@ -187,7 +203,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem if (useMetrics) { - final MetricGroup kafkaMetricGroup = runtimeContext.getMetricGroup().addGroup("KafkaConsumer"); + final MetricGroup kafkaMetricGroup = metricGroup.addGroup("KafkaConsumer"); addOffsetStateGauge(kafkaMetricGroup); // register Kafka metrics to Flink Map<MetricName, ? extends Metric> metrics = consumer.metrics(); http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java index c5cf0cc..1162599 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java @@ -20,10 +20,11 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.core.testutils.MultiShotLatch; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; @@ -112,10 +113,22 @@ public class Kafka09FetcherTest { SourceContext<String> sourceContext = mock(SourceContext.class); List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42)); KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); - StreamingRuntimeContext context = mock(StreamingRuntimeContext.class); - + final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>( - sourceContext, topics, null, null, context, schema, new Properties(), 0L, false); + sourceContext, + topics, + null, /* periodic watermark extractor */ + null, /* punctuated watermark extractor */ + new TestProcessingTimeService(), + 10, /* watermark interval */ + this.getClass().getClassLoader(), + true, /* checkpointing */ + "task_name", + mock(MetricGroup.class), + schema, + new Properties(), + 0L, + false); // ----- run the fetcher ----- @@ -236,10 +249,23 @@ public class Kafka09FetcherTest { SourceContext<String> sourceContext = mock(SourceContext.class); List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42)); KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); - StreamingRuntimeContext context = mock(StreamingRuntimeContext.class); final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>( - sourceContext, topics, null, null, context, schema, new Properties(), 0L, false); + sourceContext, + topics, + null, /* periodic watermark extractor */ + null, /* punctuated watermark extractor */ + new TestProcessingTimeService(), + 10, /* watermark interval */ + this.getClass().getClassLoader(), + true, /* checkpointing */ + "task_name", + mock(MetricGroup.class), + schema, + new Properties(), + 0L, + false); + // ----- run the fetcher ----- http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java index b80a231..31691d5 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java @@ -18,8 +18,9 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.flink.util.TestLogger; @@ -85,12 +86,14 @@ public class KafkaProducerTest extends TestLogger { FlinkKafkaProducer09<String> producerPropagating = new FlinkKafkaProducer09<>( "mock_topic", new SimpleStringSchema(), new Properties(), null); - producerPropagating.setRuntimeContext(new MockRuntimeContext(17, 3)); - producerPropagating.open(new Configuration()); - + OneInputStreamOperatorTestHarness<String, Object> testHarness = + new OneInputStreamOperatorTestHarness<>(new StreamSink(producerPropagating)); + + testHarness.open(); + try { - producerPropagating.invoke("value"); - producerPropagating.invoke("value"); + testHarness.processElement(new StreamRecord<>("value")); + testHarness.processElement(new StreamRecord<>("value")); fail("This should fail with an exception"); } catch (Exception e) { @@ -104,12 +107,15 @@ public class KafkaProducerTest extends TestLogger { FlinkKafkaProducer09<String> producerLogging = new FlinkKafkaProducer09<>( "mock_topic", new SimpleStringSchema(), new Properties(), null); producerLogging.setLogFailuresOnly(true); - - producerLogging.setRuntimeContext(new MockRuntimeContext(17, 3)); - producerLogging.open(new Configuration()); - producerLogging.invoke("value"); - producerLogging.invoke("value"); + testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging)); + + testHarness.open(); + + testHarness.processElement(new StreamRecord<>("value")); + testHarness.processElement(new StreamRecord<>("value")); + + testHarness.close(); } catch (Exception e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index 223dacb..1802e0c 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -31,6 +31,7 @@ import org.apache.commons.io.FileUtils; import org.apache.curator.test.TestingServer; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; @@ -99,6 +100,17 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { } @Override + public <T> StreamSink<T> getProducerSink( + String topic, + KeyedSerializationSchema<T> serSchema, + Properties props, + KafkaPartitioner<T> partitioner) { + FlinkKafkaProducer09<T> prod = new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner); + prod.setFlushOnCheckpoint(true); + return new StreamSink<>(prod); + } + + @Override public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) { FlinkKafkaProducer09<T> prod = new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner); prod.setFlushOnCheckpoint(true); http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java index 065b54f..321991a 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java @@ -23,7 +23,6 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; @@ -81,7 +80,9 @@ public abstract class AbstractFetcher<T, KPH> { List<KafkaTopicPartition> assignedPartitions, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, - StreamingRuntimeContext runtimeContext, + ProcessingTimeService processingTimeProvider, + long autoWatermarkInterval, + ClassLoader userCodeClassLoader, boolean useMetrics) throws Exception { this.sourceContext = checkNotNull(sourceContext); @@ -110,7 +111,7 @@ public abstract class AbstractFetcher<T, KPH> { assignedPartitions, timestampWatermarkMode, watermarksPeriodic, watermarksPunctuated, - runtimeContext.getUserCodeClassLoader()); + userCodeClassLoader); // if we have periodic watermarks, kick off the interval scheduler if (timestampWatermarkMode == PERIODIC_WATERMARKS) { @@ -118,7 +119,7 @@ public abstract class AbstractFetcher<T, KPH> { (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) allPartitions; PeriodicWatermarkEmitter periodicEmitter = - new PeriodicWatermarkEmitter(parts, sourceContext, runtimeContext.getProcessingTimeService(), runtimeContext.getExecutionConfig().getAutoWatermarkInterval()); + new PeriodicWatermarkEmitter(parts, sourceContext, processingTimeProvider, autoWatermarkInterval); periodicEmitter.start(); } } @@ -495,9 +496,7 @@ public abstract class AbstractFetcher<T, KPH> { @Override public void trigger(long timestamp) throws Exception { - // sanity check - assert Thread.holdsLock(emitter.getCheckpointLock()); - + long minAcrossAll = Long.MAX_VALUE; for (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?> state : allPartitions) { http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java index 5e9bacc..6d92f9b 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java @@ -19,10 +19,12 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; -import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; @@ -69,16 +71,21 @@ public class AtLeastOnceProducerTest { private void runTest(boolean flushOnCheckpoint) throws Throwable { Properties props = new Properties(); final AtomicBoolean snapshottingFinished = new AtomicBoolean(false); + final TestingKafkaProducer<String> producer = new TestingKafkaProducer<>("someTopic", new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props, snapshottingFinished); + producer.setFlushOnCheckpoint(flushOnCheckpoint); - producer.setRuntimeContext(new MockRuntimeContext(0, 1)); - producer.open(new Configuration()); + OneInputStreamOperatorTestHarness<String, Object> testHarness = + new OneInputStreamOperatorTestHarness<>(new StreamSink(producer)); + + testHarness.open(); for (int i = 0; i < 100; i++) { - producer.invoke("msg-" + i); + testHarness.processElement(new StreamRecord<>("msg-" + i)); } + // start a thread confirming all pending records final Tuple1<Throwable> runnableError = new Tuple1<>(null); final Thread threadA = Thread.currentThread(); @@ -113,8 +120,10 @@ public class AtLeastOnceProducerTest { }; Thread threadB = new Thread(confirmer); threadB.start(); + // this should block: - producer.snapshotState(new StateSnapshotContextSynchronousImpl(0, 0)); + testHarness.snapshot(0, 0); + synchronized (threadA) { threadA.notifyAll(); // just in case, to let the test fail faster } @@ -128,14 +137,14 @@ public class AtLeastOnceProducerTest { throw runnableError.f0; } - producer.close(); + testHarness.close(); } private static class TestingKafkaProducer<T> extends FlinkKafkaProducerBase<T> { - private static final long serialVersionUID = -1759403646061180067L; + private static final long serialVersionUID = 1L; - private MockProducer prod; + private transient MockProducer prod; private AtomicBoolean snapshottingFinished; public TestingKafkaProducer(String defaultTopicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, AtomicBoolean snapshottingFinished) { http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java index 806d342..10c7b86 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java @@ -21,6 +21,7 @@ import kafka.server.KafkaServer; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; @@ -81,6 +82,10 @@ public abstract class KafkaTestEnvironment { public abstract <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props); + public abstract <T> StreamSink<T> getProducerSink(String topic, + KeyedSerializationSchema<T> serSchema, Properties props, + KafkaPartitioner<T> partitioner); + public abstract <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner); http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java index 0782cb9..5801c24 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java @@ -18,16 +18,12 @@ package org.apache.flink.streaming.connectors.kafka.internals; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext; -import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.util.SerializedValue; @@ -54,10 +50,15 @@ public class AbstractFetcherTimestampsTest { TestSourceContext<Long> sourceContext = new TestSourceContext<>(); + TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService(); + TestFetcher<Long> fetcher = new TestFetcher<>( - sourceContext, originalPartitions, null, + sourceContext, + originalPartitions, + null, /* periodic watermark assigner */ new SerializedValue<AssignerWithPunctuatedWatermarks<Long>>(new PunctuatedTestExtractor()), - new MockRuntimeContext(17, 3)); + processingTimeProvider, + 0); final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitions()[0]; final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitions()[1]; @@ -115,10 +116,6 @@ public class AbstractFetcherTimestampsTest { @Test public void testPeriodicWatermarks() throws Exception { - - ExecutionConfig config = new ExecutionConfig(); - config.setAutoWatermarkInterval(10); - final String testTopic = "test topic name"; List<KafkaTopicPartition> originalPartitions = Arrays.asList( new KafkaTopicPartition(testTopic, 7), @@ -127,70 +124,71 @@ public class AbstractFetcherTimestampsTest { TestSourceContext<Long> sourceContext = new TestSourceContext<>(); - final AtomicReference<Throwable> errorRef = new AtomicReference<>(); - final ProcessingTimeService timerService = new SystemProcessingTimeService( - new ReferenceSettingExceptionHandler(errorRef), sourceContext.getCheckpointLock()); + TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); - try { - TestFetcher<Long> fetcher = new TestFetcher<>( - sourceContext, originalPartitions, - new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()), - null, new MockRuntimeContext(17, 3, config, timerService)); - - final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitions()[0]; - final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitions()[1]; - final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitions()[2]; - - // elements generate a watermark if the timestamp is a multiple of three - - // elements for partition 1 - fetcher.emitRecord(1L, part1, 1L, Long.MIN_VALUE); - fetcher.emitRecord(2L, part1, 2L, Long.MIN_VALUE); - fetcher.emitRecord(3L, part1, 3L, Long.MIN_VALUE); - assertEquals(3L, sourceContext.getLatestElement().getValue().longValue()); - assertEquals(3L, sourceContext.getLatestElement().getTimestamp()); - - // elements for partition 2 - fetcher.emitRecord(12L, part2, 1L, Long.MIN_VALUE); - assertEquals(12L, sourceContext.getLatestElement().getValue().longValue()); - assertEquals(12L, sourceContext.getLatestElement().getTimestamp()); - - // elements for partition 3 - fetcher.emitRecord(101L, part3, 1L, Long.MIN_VALUE); - fetcher.emitRecord(102L, part3, 2L, Long.MIN_VALUE); - assertEquals(102L, sourceContext.getLatestElement().getValue().longValue()); - assertEquals(102L, sourceContext.getLatestElement().getTimestamp()); - - // now, we should have a watermark (this blocks until the periodic thread emitted the watermark) - assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp()); - - // advance partition 3 - fetcher.emitRecord(1003L, part3, 3L, Long.MIN_VALUE); - fetcher.emitRecord(1004L, part3, 4L, Long.MIN_VALUE); - fetcher.emitRecord(1005L, part3, 5L, Long.MIN_VALUE); - assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue()); - assertEquals(1005L, sourceContext.getLatestElement().getTimestamp()); - - // advance partition 1 beyond partition 2 - this bumps the watermark - fetcher.emitRecord(30L, part1, 4L, Long.MIN_VALUE); - assertEquals(30L, sourceContext.getLatestElement().getValue().longValue()); - assertEquals(30L, sourceContext.getLatestElement().getTimestamp()); - - // this blocks until the periodic thread emitted the watermark - assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp()); - - // advance partition 2 again - this bumps the watermark - fetcher.emitRecord(13L, part2, 2L, Long.MIN_VALUE); - fetcher.emitRecord(14L, part2, 3L, Long.MIN_VALUE); - fetcher.emitRecord(15L, part2, 3L, Long.MIN_VALUE); - - // this blocks until the periodic thread emitted the watermark - long watermarkTs = sourceContext.getLatestWatermark().getTimestamp(); - assertTrue(watermarkTs >= 13L && watermarkTs <= 15L); - } - finally { - timerService.shutdownService(); - } + TestFetcher<Long> fetcher = new TestFetcher<>( + sourceContext, + originalPartitions, + new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()), + null, /* punctuated watermarks assigner*/ + processingTimeService, + 10); + + final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitions()[0]; + final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitions()[1]; + final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitions()[2]; + + // elements generate a watermark if the timestamp is a multiple of three + + // elements for partition 1 + fetcher.emitRecord(1L, part1, 1L, Long.MIN_VALUE); + fetcher.emitRecord(2L, part1, 2L, Long.MIN_VALUE); + fetcher.emitRecord(3L, part1, 3L, Long.MIN_VALUE); + assertEquals(3L, sourceContext.getLatestElement().getValue().longValue()); + assertEquals(3L, sourceContext.getLatestElement().getTimestamp()); + + // elements for partition 2 + fetcher.emitRecord(12L, part2, 1L, Long.MIN_VALUE); + assertEquals(12L, sourceContext.getLatestElement().getValue().longValue()); + assertEquals(12L, sourceContext.getLatestElement().getTimestamp()); + + // elements for partition 3 + fetcher.emitRecord(101L, part3, 1L, Long.MIN_VALUE); + fetcher.emitRecord(102L, part3, 2L, Long.MIN_VALUE); + assertEquals(102L, sourceContext.getLatestElement().getValue().longValue()); + assertEquals(102L, sourceContext.getLatestElement().getTimestamp()); + + processingTimeService.setCurrentTime(10); + + // now, we should have a watermark (this blocks until the periodic thread emitted the watermark) + assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp()); + + // advance partition 3 + fetcher.emitRecord(1003L, part3, 3L, Long.MIN_VALUE); + fetcher.emitRecord(1004L, part3, 4L, Long.MIN_VALUE); + fetcher.emitRecord(1005L, part3, 5L, Long.MIN_VALUE); + assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue()); + assertEquals(1005L, sourceContext.getLatestElement().getTimestamp()); + + // advance partition 1 beyond partition 2 - this bumps the watermark + fetcher.emitRecord(30L, part1, 4L, Long.MIN_VALUE); + assertEquals(30L, sourceContext.getLatestElement().getValue().longValue()); + assertEquals(30L, sourceContext.getLatestElement().getTimestamp()); + + processingTimeService.setCurrentTime(20); + + // this blocks until the periodic thread emitted the watermark + assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp()); + + // advance partition 2 again - this bumps the watermark + fetcher.emitRecord(13L, part2, 2L, Long.MIN_VALUE); + fetcher.emitRecord(14L, part2, 3L, Long.MIN_VALUE); + fetcher.emitRecord(15L, part2, 3L, Long.MIN_VALUE); + + processingTimeService.setCurrentTime(30); + // this blocks until the periodic thread emitted the watermark + long watermarkTs = sourceContext.getLatestWatermark().getTimestamp(); + assertTrue(watermarkTs >= 13L && watermarkTs <= 15L); } // ------------------------------------------------------------------------ @@ -204,9 +202,10 @@ public class AbstractFetcherTimestampsTest { List<KafkaTopicPartition> assignedPartitions, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, - StreamingRuntimeContext runtimeContext) throws Exception + ProcessingTimeService processingTimeProvider, + long autoWatermarkInterval) throws Exception { - super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, runtimeContext, false); + super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, processingTimeProvider, autoWatermarkInterval, TestFetcher.class.getClassLoader(), false); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java index ba75212..9e8e1d9 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka.testutils; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.typeutils.TypeInfoParser; @@ -36,6 +37,7 @@ import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment; import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema; @@ -44,6 +46,8 @@ import java.util.Collection; import java.util.Properties; import java.util.Random; +import static org.mockito.Mockito.mock; + @SuppressWarnings("serial") public class DataGenerators { @@ -145,12 +149,17 @@ public class DataGenerators { producerProperties.setProperty("retries", "3"); StreamTransformation<String> mockTransform = new MockStreamTransformation(); DataStream<String> stream = new DataStream<>(new DummyStreamExecutionEnvironment(), mockTransform); - DataStreamSink<String> sink = server.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), - producerProperties, new FixedPartitioner<String>()); - StreamSink<String> producerOperator = sink.getTransformation().getOperator(); - producer = (RichFunction) producerOperator.getUserFunction(); - producer.setRuntimeContext(new MockRuntimeContext(1,0)); - producer.open(new Configuration()); + + StreamSink<String> sink = server.getProducerSink( + topic, + new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), + producerProperties, + new FixedPartitioner<String>()); + + OneInputStreamOperatorTestHarness<String, Object> testHarness = + new OneInputStreamOperatorTestHarness<>(sink); + + testHarness.open(); final StringBuilder bld = new StringBuilder(); final Random rnd = new Random(); @@ -164,7 +173,7 @@ public class DataGenerators { } String next = bld.toString(); - producerOperator.processElement(new StreamRecord<>(next)); + testHarness.processElement(new StreamRecord<>(next)); } } catch (Throwable t) { @@ -215,4 +224,4 @@ public class DataGenerators { } } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java deleted file mode 100644 index f16eacd..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java +++ /dev/null @@ -1,209 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.testutils; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.api.common.accumulators.DoubleCounter; -import org.apache.flink.api.common.accumulators.Histogram; -import org.apache.flink.api.common.accumulators.IntCounter; -import org.apache.flink.api.common.accumulators.LongCounter; -import org.apache.flink.api.common.cache.DistributedCache; -import org.apache.flink.api.common.functions.BroadcastVariableInitializer; -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.state.ReducingState; -import org.apache.flink.api.common.state.ReducingStateDescriptor; -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.testutils.MockEnvironment; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; - -import java.io.Serializable; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -@SuppressWarnings("deprecation") -public class MockRuntimeContext extends StreamingRuntimeContext { - - private final int numberOfParallelSubtasks; - private final int indexOfThisSubtask; - - private final ExecutionConfig execConfig; - - private final ProcessingTimeService timeServiceProvider; - - public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) { - this(numberOfParallelSubtasks, indexOfThisSubtask, new ExecutionConfig()); - } - - public MockRuntimeContext( - int numberOfParallelSubtasks, - int indexOfThisSubtask, - ExecutionConfig execConfig) { - this(numberOfParallelSubtasks, indexOfThisSubtask, execConfig, null); - } - - public MockRuntimeContext( - int numberOfParallelSubtasks, - int indexOfThisSubtask, - ExecutionConfig execConfig, - ProcessingTimeService timeServiceProvider) { - - super(new MockStreamOperator(), - new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16), - Collections.<String, Accumulator<?, ?>>emptyMap()); - - this.numberOfParallelSubtasks = numberOfParallelSubtasks; - this.indexOfThisSubtask = indexOfThisSubtask; - this.execConfig = execConfig; - this.timeServiceProvider = timeServiceProvider; - } - - @Override - public boolean isCheckpointingEnabled() { - return true; - } - - @Override - public String getTaskName() { - return "mock task"; - } - - @Override - public int getNumberOfParallelSubtasks() { - return numberOfParallelSubtasks; - } - - @Override - public int getIndexOfThisSubtask() { - return indexOfThisSubtask; - } - - @Override - public int getAttemptNumber() { - return 0; - } - - @Override - public ExecutionConfig getExecutionConfig() { - return execConfig; - } - - @Override - public ClassLoader getUserCodeClassLoader() { - return getClass().getClassLoader(); - } - - @Override - public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator) { - // noop - } - - @Override - public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name) { - throw new UnsupportedOperationException(); - } - - @Override - public Map<String, Accumulator<?, ?>> getAllAccumulators() { - throw new UnsupportedOperationException(); - } - - @Override - public IntCounter getIntCounter(String name) { - throw new UnsupportedOperationException(); - } - - @Override - public LongCounter getLongCounter(String name) { - throw new UnsupportedOperationException(); - } - - @Override - public DoubleCounter getDoubleCounter(String name) { - throw new UnsupportedOperationException(); - } - - @Override - public Histogram getHistogram(String name) { - throw new UnsupportedOperationException(); - } - - @Override - public MetricGroup getMetricGroup() { - return new UnregisteredTaskMetricsGroup.DummyIOMetricGroup(); - } - - @Override - public <RT> List<RT> getBroadcastVariable(String name) { - throw new UnsupportedOperationException(); - } - - @Override - public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) { - throw new UnsupportedOperationException(); - } - - @Override - public DistributedCache getDistributedCache() { - throw new UnsupportedOperationException(); - } - - @Override - public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) { - throw new UnsupportedOperationException(); - } - - @Override - public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) { - throw new UnsupportedOperationException(); - } - - @Override - public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) { - throw new UnsupportedOperationException(); - } - - @Override - public ProcessingTimeService getProcessingTimeService() { - if (timeServiceProvider == null) { - throw new UnsupportedOperationException(); - } else { - return timeServiceProvider; - } - } - - // ------------------------------------------------------------------------ - - private static class MockStreamOperator extends AbstractStreamOperator<Integer> { - private static final long serialVersionUID = -1153976702711944427L; - - @Override - public ExecutionConfig getExecutionConfig() { - return new ExecutionConfig(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java index 8041a7c..5b277bf 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java @@ -47,14 +47,16 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; -import org.apache.flink.util.Preconditions; +import org.apache.flink.util.InstantiationUtil; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import java.util.Collection; +import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.RunnableFuture; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; @@ -90,6 +92,8 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> { // use this as default for tests AbstractStateBackend stateBackend = new MemoryStateBackend(); + private final Object checkpointLock; + /** * Whether setup() was called on the operator. This is reset when calling close(). */ @@ -113,13 +117,15 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> { this.executionConfig = executionConfig; this.closableRegistry = new ClosableRegistry(); + this.checkpointLock = new Object(); + final Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024, underlyingConfig, executionConfig, MAX_PARALLELISM, 1, 0); mockTask = mock(StreamTask.class); processingTimeService = new TestProcessingTimeService(); processingTimeService.setCurrentTime(0); when(mockTask.getName()).thenReturn("Mock Task"); - when(mockTask.getCheckpointLock()).thenReturn(new Object()); + when(mockTask.getCheckpointLock()).thenReturn(checkpointLock); when(mockTask.getConfiguration()).thenReturn(config); when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig); when(mockTask.getEnvironment()).thenReturn(env); @@ -330,7 +336,9 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> { } public void setProcessingTime(long time) throws Exception { - processingTimeService.setCurrentTime(time); + synchronized (checkpointLock) { + processingTimeService.setCurrentTime(time); + } } public void processWatermark(Watermark mark) throws Exception {
