Repository: flink
Updated Branches:
  refs/heads/master 71d2e3ef1 -> 770f2f83a


[FLINK-4877] Use OperatorTestHarness and TestProcessingTimeService in Kafka 
Tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0859a698
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0859a698
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0859a698

Branch: refs/heads/master
Commit: 0859a698253f07a28442ee7232e1adb76013dbd3
Parents: 3055475
Author: Aljoscha Krettek <[email protected]>
Authored: Wed Sep 28 15:10:35 2016 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Fri Oct 21 19:03:04 2016 +0200

----------------------------------------------------------------------
 .../flink-connector-kafka-0.10/pom.xml          |   8 +
 .../connectors/kafka/FlinkKafkaConsumer010.java |  19 +-
 .../kafka/internal/Kafka010Fetcher.java         |  26 ++-
 .../connectors/kafka/Kafka010FetcherTest.java   |  33 ++-
 .../kafka/KafkaTestEnvironmentImpl.java         |   9 +
 .../flink-connector-kafka-0.8/pom.xml           |  10 +-
 .../kafka/internals/Kafka08Fetcher.java         |  10 +-
 .../connectors/kafka/KafkaProducerTest.java     |  32 +--
 .../kafka/KafkaTestEnvironmentImpl.java         |  16 ++
 .../flink-connector-kafka-0.9/pom.xml           |   8 +
 .../connectors/kafka/FlinkKafkaConsumer09.java  |  19 +-
 .../kafka/internal/Kafka09Fetcher.java          |  38 +++-
 .../connectors/kafka/Kafka09FetcherTest.java    |  38 +++-
 .../connectors/kafka/KafkaProducerTest.java     |  30 +--
 .../kafka/KafkaTestEnvironmentImpl.java         |  12 ++
 .../kafka/internals/AbstractFetcher.java        |  13 +-
 .../kafka/AtLeastOnceProducerTest.java          |  25 ++-
 .../connectors/kafka/KafkaTestEnvironment.java  |   5 +
 .../AbstractFetcherTimestampsTest.java          | 151 +++++++-------
 .../kafka/testutils/DataGenerators.java         |  25 ++-
 .../kafka/testutils/MockRuntimeContext.java     | 209 -------------------
 .../util/OneInputStreamOperatorTestHarness.java |  14 +-
 22 files changed, 383 insertions(+), 367 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml 
b/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
index 0b426b5..8108afc 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
@@ -72,6 +72,14 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
                        <version>${project.version}</version>
                        <exclusions>

http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
index 267ff57..a9ce336 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
@@ -134,9 +134,20 @@ public class FlinkKafkaConsumer010<T> extends 
FlinkKafkaConsumer09<T> {
 
                boolean useMetrics = 
!Boolean.valueOf(properties.getProperty(KEY_DISABLE_METRICS, "false"));
 
-               return new Kafka010Fetcher<>(sourceContext, 
thisSubtaskPartitions,
-                               watermarksPeriodic, watermarksPunctuated,
-                               runtimeContext, deserializer,
-                               properties, pollTimeout, useMetrics);
+               return new Kafka010Fetcher<>(
+                               sourceContext,
+                               thisSubtaskPartitions,
+                               watermarksPeriodic,
+                               watermarksPunctuated,
+                               runtimeContext.getProcessingTimeService(),
+                               
runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
+                               runtimeContext.getUserCodeClassLoader(),
+                               runtimeContext.isCheckpointingEnabled(),
+                               runtimeContext.getTaskNameWithSubtasks(),
+                               runtimeContext.getMetricGroup(),
+                               deserializer,
+                               properties,
+                               pollTimeout,
+                               useMetrics);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
index 47bee22..4a1f5f6 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
@@ -18,12 +18,13 @@
 
 package org.apache.flink.streaming.connectors.kafka.internal;
 
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.SerializedValue;
 
@@ -46,13 +47,32 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
                        List<KafkaTopicPartition> assignedPartitions,
                        SerializedValue<AssignerWithPeriodicWatermarks<T>> 
watermarksPeriodic,
                        SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
watermarksPunctuated,
-                       StreamingRuntimeContext runtimeContext,
+                       ProcessingTimeService processingTimeProvider,
+                       long autoWatermarkInterval,
+                       ClassLoader userCodeClassLoader,
+                       boolean enableCheckpointing,
+                       String taskNameWithSubtasks,
+                       MetricGroup metricGroup,
                        KeyedDeserializationSchema<T> deserializer,
                        Properties kafkaProperties,
                        long pollTimeout,
                        boolean useMetrics) throws Exception
        {
-               super(sourceContext, assignedPartitions, watermarksPeriodic, 
watermarksPunctuated, runtimeContext, deserializer, kafkaProperties, 
pollTimeout, useMetrics);
+               super(
+                               sourceContext,
+                               assignedPartitions,
+                               watermarksPeriodic,
+                               watermarksPunctuated,
+                               processingTimeProvider,
+                               autoWatermarkInterval,
+                               userCodeClassLoader,
+                               enableCheckpointing,
+                               taskNameWithSubtasks,
+                               metricGroup,
+                               deserializer,
+                               kafkaProperties,
+                               pollTimeout,
+                               useMetrics);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
index 76e3950..718db48 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
@@ -20,10 +20,12 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.MetricGroup;
 import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
@@ -115,7 +117,20 @@ public class Kafka010FetcherTest {
         StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
 
         final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
-                sourceContext, topics, null, null, context, schema, new 
Properties(), 0L, false);
+                sourceContext,
+                topics,
+                null, /* periodic assigner */
+                null, /* punctuated assigner */
+                new TestProcessingTimeService(),
+                10,
+                getClass().getClassLoader(),
+                false, /* checkpointing */
+                "taskname-with-subtask",
+                mock(MetricGroup.class),
+                schema,
+                new Properties(),
+                0L,
+                false);
 
         // ----- run the fetcher -----
 
@@ -235,7 +250,21 @@ public class Kafka010FetcherTest {
         StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
 
         final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
-                sourceContext, topics, null, null, context, schema, new 
Properties(), 0L, false);
+                sourceContext,
+                topics,
+                null, /* periodic assigner */
+                null, /* punctuated assigner */
+                new TestProcessingTimeService(),
+                10,
+                getClass().getClassLoader(),
+                false, /* checkpointing */
+                "taskname-with-subtask",
+                mock(MetricGroup.class),
+                schema,
+                new Properties(),
+                0L,
+                false);
+
 
         // ----- run the fetcher -----
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 7d12cde..c30a4dd 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -29,6 +29,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.operators.StreamSink;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
@@ -113,6 +114,14 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
        }
 
        @Override
+       public <T> StreamSink<T> getProducerSink(String topic, 
KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> 
partitioner) {
+               FlinkKafkaProducer010<T> prod = new 
FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
+               prod.setFlushOnCheckpoint(true);
+               return new StreamSink<>(prod);
+       }
+
+
+       @Override
        public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, 
String topic, KeyedSerializationSchema<T> serSchema, Properties props, 
KafkaPartitioner<T> partitioner) {
                FlinkKafkaProducer010<T> prod = new 
FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
                prod.setFlushOnCheckpoint(true);

http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml 
b/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
index 888208e..f17f9ae 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
@@ -119,7 +119,15 @@ under the License.
                </dependency>
 
                <!-- test dependencies -->
-               
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
                <dependency>
                        <groupId>org.apache.curator</groupId>
                        <artifactId>curator-test</artifactId>

http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
index 5861058..fbcb19c 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -98,7 +98,15 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, 
TopicAndPartition> {
                        long autoCommitInterval,
                        boolean useMetrics) throws Exception
        {
-               super(sourceContext, assignedPartitions, watermarksPeriodic, 
watermarksPunctuated, runtimeContext, useMetrics);
+               super(
+                               sourceContext,
+                               assignedPartitions,
+                               watermarksPeriodic,
+                               watermarksPunctuated,
+                               runtimeContext.getProcessingTimeService(),
+                               
runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
+                               runtimeContext.getUserCodeClassLoader(),
+                               useMetrics);
 
                this.deserializer = checkNotNull(deserializer);
                this.kafkaConfig = checkNotNull(kafkaProperties);

http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
index 8602ffe..7efa94e 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -18,8 +18,9 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.configuration.Configuration;
-import 
org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.util.TestLogger;
 
@@ -80,12 +81,14 @@ public class KafkaProducerTest extends TestLogger {
                        FlinkKafkaProducer08<String> producerPropagating = new 
FlinkKafkaProducer08<>(
                                        "mock_topic", new SimpleStringSchema(), 
new Properties(), null);
 
-                       producerPropagating.setRuntimeContext(new 
MockRuntimeContext(17, 3));
-                       producerPropagating.open(new Configuration());
-                       
+                       OneInputStreamOperatorTestHarness<String, Object> 
testHarness =
+                                       new 
OneInputStreamOperatorTestHarness<>(new StreamSink<>(producerPropagating));
+
+                       testHarness.open();
+
                        try {
-                               producerPropagating.invoke("value");
-                               producerPropagating.invoke("value");
+                               testHarness.processElement(new 
StreamRecord<>("value"));
+                               testHarness.processElement(new 
StreamRecord<>("value"));
                                fail("This should fail with an exception");
                        }
                        catch (Exception e) {
@@ -94,17 +97,22 @@ public class KafkaProducerTest extends TestLogger {
                                
assertTrue(e.getCause().getMessage().contains("Test error"));
                        }
 
+                       testHarness.close();
+
                        // (2) producer that only logs errors
 
                        FlinkKafkaProducer08<String> producerLogging = new 
FlinkKafkaProducer08<>(
                                        "mock_topic", new SimpleStringSchema(), 
new Properties(), null);
                        producerLogging.setLogFailuresOnly(true);
-                       
-                       producerLogging.setRuntimeContext(new 
MockRuntimeContext(17, 3));
-                       producerLogging.open(new Configuration());
 
-                       producerLogging.invoke("value");
-                       producerLogging.invoke("value");
+                       testHarness = new 
OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging));
+
+                       testHarness.open();
+
+                       testHarness.processElement(new StreamRecord<>("value"));
+                       testHarness.processElement(new StreamRecord<>("value"));
+
+                       testHarness.close();
                }
                catch (Exception e) {
                        e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 567d22d..6235449 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -34,6 +34,7 @@ import org.apache.curator.test.TestingServer;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamSink;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
 import 
org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
@@ -105,6 +106,21 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
        }
 
        @Override
+       public <T> StreamSink<T> getProducerSink(
+                       String topic,
+                       KeyedSerializationSchema<T> serSchema,
+                       Properties props,
+                       KafkaPartitioner<T> partitioner) {
+               FlinkKafkaProducer08<T> prod = new FlinkKafkaProducer08<>(
+                               topic,
+                               serSchema,
+                               props,
+                               partitioner);
+               prod.setFlushOnCheckpoint(true);
+               return new StreamSink<>(prod);
+       }
+
+       @Override
        public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, 
String topic, KeyedSerializationSchema<T> serSchema, Properties props, 
KafkaPartitioner<T> partitioner) {
                FlinkKafkaProducer08<T> prod = new 
FlinkKafkaProducer08<>(topic, serSchema, props, partitioner);
                prod.setFlushOnCheckpoint(true);

http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml 
b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
index bfcde82..f638c7a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
@@ -83,6 +83,14 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-connector-kafka-base_2.10</artifactId>
                        <version>${project.version}</version>
                        <exclusions>

http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
index a97476a..29bb8e4 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -177,10 +177,21 @@ public class FlinkKafkaConsumer09<T> extends 
FlinkKafkaConsumerBase<T> {
 
                boolean useMetrics = 
!Boolean.valueOf(properties.getProperty(KEY_DISABLE_METRICS, "false"));
 
-               return new Kafka09Fetcher<>(sourceContext, 
thisSubtaskPartitions,
-                               watermarksPeriodic, watermarksPunctuated,
-                               runtimeContext, deserializer,
-                               properties, pollTimeout, useMetrics);
+               return new Kafka09Fetcher<>(
+                               sourceContext,
+                               thisSubtaskPartitions,
+                               watermarksPeriodic,
+                               watermarksPunctuated,
+                               runtimeContext.getProcessingTimeService(),
+                               
runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
+                               runtimeContext.getUserCodeClassLoader(),
+                               runtimeContext.isCheckpointingEnabled(),
+                               runtimeContext.getTaskNameWithSubtasks(),
+                               runtimeContext.getMetricGroup(),
+                               deserializer,
+                               properties,
+                               pollTimeout,
+                               useMetrics);
                
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index af3b199..a8c0397 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -18,17 +18,16 @@
 
 package org.apache.flink.streaming.connectors.kafka.internal;
 
-import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.ExceptionProxy;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
 import 
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.SerializedValue;
 
@@ -67,9 +66,6 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, 
TopicPartition> implem
        /** The schema to convert between Kafka's byte messages, and Flink's 
objects */
        private final KeyedDeserializationSchema<T> deserializer;
 
-       /** The subtask's runtime context */
-       private final RuntimeContext runtimeContext;
-
        /** The configuration for the Kafka consumer */
        private final Properties kafkaProperties;
 
@@ -94,6 +90,12 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, 
TopicPartition> implem
        /** Flag tracking whether the latest commit request has completed */
        private volatile boolean commitInProgress;
 
+       /** For Debug output **/
+       private String taskNameWithSubtasks;
+
+       /** We get this from the outside to publish metrics. **/
+       private MetricGroup metricGroup;
+
        // 
------------------------------------------------------------------------
 
        public Kafka09Fetcher(
@@ -101,24 +103,38 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, 
TopicPartition> implem
                        List<KafkaTopicPartition> assignedPartitions,
                        SerializedValue<AssignerWithPeriodicWatermarks<T>> 
watermarksPeriodic,
                        SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
watermarksPunctuated,
-                       StreamingRuntimeContext runtimeContext,
+                       ProcessingTimeService processingTimeProvider,
+                       long autoWatermarkInterval,
+                       ClassLoader userCodeClassLoader,
+                       boolean enableCheckpointing,
+                       String taskNameWithSubtasks,
+                       MetricGroup metricGroup,
                        KeyedDeserializationSchema<T> deserializer,
                        Properties kafkaProperties,
                        long pollTimeout,
                        boolean useMetrics) throws Exception
        {
-               super(sourceContext, assignedPartitions, watermarksPeriodic, 
watermarksPunctuated, runtimeContext, useMetrics);
+               super(
+                               sourceContext,
+                               assignedPartitions,
+                               watermarksPeriodic,
+                               watermarksPunctuated,
+                               processingTimeProvider,
+                               autoWatermarkInterval,
+                               userCodeClassLoader,
+                               useMetrics);
 
                this.deserializer = deserializer;
-               this.runtimeContext = runtimeContext;
                this.kafkaProperties = kafkaProperties;
                this.pollTimeout = pollTimeout;
                this.nextOffsetsToCommit = new AtomicReference<>();
                this.offsetCommitCallback = new CommitCallback();
+               this.taskNameWithSubtasks = taskNameWithSubtasks;
+               this.metricGroup = metricGroup;
 
                // if checkpointing is enabled, we are not automatically 
committing to Kafka.
                
kafkaProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
-                               
Boolean.toString(!runtimeContext.isCheckpointingEnabled()));
+                               Boolean.toString(!enableCheckpointing));
        }
 
        // 
------------------------------------------------------------------------
@@ -131,7 +147,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, 
TopicPartition> implem
 
                // rather than running the main fetch loop directly here, we 
spawn a dedicated thread
                // this makes sure that no interrupt() call upon canceling 
reaches the Kafka consumer code
-               Thread runner = new Thread(this, getFetcherName() + " for " + 
runtimeContext.getTaskNameWithSubtasks());
+               Thread runner = new Thread(this, getFetcherName() + " for " + 
taskNameWithSubtasks);
                runner.setDaemon(true);
                runner.start();
 
@@ -187,7 +203,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, 
TopicPartition> implem
 
 
                        if (useMetrics) {
-                               final MetricGroup kafkaMetricGroup = 
runtimeContext.getMetricGroup().addGroup("KafkaConsumer");
+                               final MetricGroup kafkaMetricGroup = 
metricGroup.addGroup("KafkaConsumer");
                                addOffsetStateGauge(kafkaMetricGroup);
                                // register Kafka metrics to Flink
                                Map<MetricName, ? extends Metric> metrics = 
consumer.metrics();

http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
index c5cf0cc..1162599 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -20,10 +20,11 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.MetricGroup;
 import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
@@ -112,10 +113,22 @@ public class Kafka09FetcherTest {
                SourceContext<String> sourceContext = mock(SourceContext.class);
                List<KafkaTopicPartition> topics = 
Collections.singletonList(new KafkaTopicPartition("test", 42));
                KeyedDeserializationSchema<String> schema = new 
KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-               StreamingRuntimeContext context = 
mock(StreamingRuntimeContext.class);
-               
+
                final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
-                               sourceContext, topics, null, null, context, 
schema, new Properties(), 0L, false);
+                               sourceContext,
+                               topics,
+                               null, /* periodic watermark extractor */
+                               null, /* punctuated watermark extractor */
+                               new TestProcessingTimeService(),
+                               10, /* watermark interval */
+                               this.getClass().getClassLoader(),
+                               true, /* checkpointing */
+                               "task_name",
+                               mock(MetricGroup.class),
+                               schema,
+                               new Properties(),
+                               0L,
+                               false);
 
                // ----- run the fetcher -----
 
@@ -236,10 +249,23 @@ public class Kafka09FetcherTest {
                SourceContext<String> sourceContext = mock(SourceContext.class);
                List<KafkaTopicPartition> topics = 
Collections.singletonList(new KafkaTopicPartition("test", 42));
                KeyedDeserializationSchema<String> schema = new 
KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-               StreamingRuntimeContext context = 
mock(StreamingRuntimeContext.class);
 
                final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
-                               sourceContext, topics, null, null, context, 
schema, new Properties(), 0L, false);
+                               sourceContext,
+                               topics,
+                               null, /* periodic watermark extractor */
+                               null, /* punctuated watermark extractor */
+                               new TestProcessingTimeService(),
+                               10, /* watermark interval */
+                               this.getClass().getClassLoader(),
+                               true, /* checkpointing */
+                               "task_name",
+                               mock(MetricGroup.class),
+                               schema,
+                               new Properties(),
+                               0L,
+                               false);
+
 
                // ----- run the fetcher -----
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
index b80a231..31691d5 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -18,8 +18,9 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.configuration.Configuration;
-import 
org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.util.TestLogger;
 
@@ -85,12 +86,14 @@ public class KafkaProducerTest extends TestLogger {
                        FlinkKafkaProducer09<String> producerPropagating = new 
FlinkKafkaProducer09<>(
                                        "mock_topic", new SimpleStringSchema(), 
new Properties(), null);
 
-                       producerPropagating.setRuntimeContext(new 
MockRuntimeContext(17, 3));
-                       producerPropagating.open(new Configuration());
-                       
+                       OneInputStreamOperatorTestHarness<String, Object> 
testHarness =
+                                       new 
OneInputStreamOperatorTestHarness<>(new StreamSink(producerPropagating));
+
+                       testHarness.open();
+
                        try {
-                               producerPropagating.invoke("value");
-                               producerPropagating.invoke("value");
+                               testHarness.processElement(new 
StreamRecord<>("value"));
+                               testHarness.processElement(new 
StreamRecord<>("value"));
                                fail("This should fail with an exception");
                        }
                        catch (Exception e) {
@@ -104,12 +107,15 @@ public class KafkaProducerTest extends TestLogger {
                        FlinkKafkaProducer09<String> producerLogging = new 
FlinkKafkaProducer09<>(
                                        "mock_topic", new SimpleStringSchema(), 
new Properties(), null);
                        producerLogging.setLogFailuresOnly(true);
-                       
-                       producerLogging.setRuntimeContext(new 
MockRuntimeContext(17, 3));
-                       producerLogging.open(new Configuration());
 
-                       producerLogging.invoke("value");
-                       producerLogging.invoke("value");
+                       testHarness = new 
OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging));
+
+                       testHarness.open();
+
+                       testHarness.processElement(new StreamRecord<>("value"));
+                       testHarness.processElement(new StreamRecord<>("value"));
+
+                       testHarness.close();
                }
                catch (Exception e) {
                        e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 223dacb..1802e0c 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -31,6 +31,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.operators.StreamSink;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
@@ -99,6 +100,17 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
        }
 
        @Override
+       public <T> StreamSink<T> getProducerSink(
+                       String topic,
+                       KeyedSerializationSchema<T> serSchema,
+                       Properties props,
+                       KafkaPartitioner<T> partitioner) {
+               FlinkKafkaProducer09<T> prod = new 
FlinkKafkaProducer09<>(topic, serSchema, props, partitioner);
+               prod.setFlushOnCheckpoint(true);
+               return new StreamSink<>(prod);
+       }
+
+       @Override
        public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, 
String topic, KeyedSerializationSchema<T> serSchema, Properties props, 
KafkaPartitioner<T> partitioner) {
                FlinkKafkaProducer09<T> prod = new 
FlinkKafkaProducer09<>(topic, serSchema, props, partitioner);
                prod.setFlushOnCheckpoint(true);

http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 065b54f..321991a 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -23,7 +23,6 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -81,7 +80,9 @@ public abstract class AbstractFetcher<T, KPH> {
                        List<KafkaTopicPartition> assignedPartitions,
                        SerializedValue<AssignerWithPeriodicWatermarks<T>> 
watermarksPeriodic,
                        SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
watermarksPunctuated,
-                       StreamingRuntimeContext runtimeContext,
+                       ProcessingTimeService processingTimeProvider,
+                       long autoWatermarkInterval,
+                       ClassLoader userCodeClassLoader,
                        boolean useMetrics) throws Exception
        {
                this.sourceContext = checkNotNull(sourceContext);
@@ -110,7 +111,7 @@ public abstract class AbstractFetcher<T, KPH> {
                                assignedPartitions,
                                timestampWatermarkMode,
                                watermarksPeriodic, watermarksPunctuated,
-                               runtimeContext.getUserCodeClassLoader());
+                               userCodeClassLoader);
                
                // if we have periodic watermarks, kick off the interval 
scheduler
                if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
@@ -118,7 +119,7 @@ public abstract class AbstractFetcher<T, KPH> {
                                        
(KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) allPartitions;
                        
                        PeriodicWatermarkEmitter periodicEmitter = 
-                                       new PeriodicWatermarkEmitter(parts, 
sourceContext, runtimeContext.getProcessingTimeService(), 
runtimeContext.getExecutionConfig().getAutoWatermarkInterval());
+                                       new PeriodicWatermarkEmitter(parts, 
sourceContext, processingTimeProvider, autoWatermarkInterval);
                        periodicEmitter.start();
                }
        }
@@ -495,9 +496,7 @@ public abstract class AbstractFetcher<T, KPH> {
                
                @Override
                public void trigger(long timestamp) throws Exception {
-                       // sanity check
-                       assert Thread.holdsLock(emitter.getCheckpointLock());
-                       
+
                        long minAcrossAll = Long.MAX_VALUE;
                        for (KafkaTopicPartitionStateWithPeriodicWatermarks<?, 
?> state : allPartitions) {
                                

http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
index 5e9bacc..6d92f9b 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
@@ -19,10 +19,12 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
-import 
org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
@@ -69,16 +71,21 @@ public class AtLeastOnceProducerTest {
        private void runTest(boolean flushOnCheckpoint) throws Throwable {
                Properties props = new Properties();
                final AtomicBoolean snapshottingFinished = new 
AtomicBoolean(false);
+
                final TestingKafkaProducer<String> producer = new 
TestingKafkaProducer<>("someTopic", new KeyedSerializationSchemaWrapper<>(new 
SimpleStringSchema()), props,
                                snapshottingFinished);
+
                producer.setFlushOnCheckpoint(flushOnCheckpoint);
-               producer.setRuntimeContext(new MockRuntimeContext(0, 1));
 
-               producer.open(new Configuration());
+               OneInputStreamOperatorTestHarness<String, Object> testHarness =
+                               new OneInputStreamOperatorTestHarness<>(new 
StreamSink(producer));
+
+               testHarness.open();
 
                for (int i = 0; i < 100; i++) {
-                       producer.invoke("msg-" + i);
+                       testHarness.processElement(new StreamRecord<>("msg-" + 
i));
                }
+
                // start a thread confirming all pending records
                final Tuple1<Throwable> runnableError = new Tuple1<>(null);
                final Thread threadA = Thread.currentThread();
@@ -113,8 +120,10 @@ public class AtLeastOnceProducerTest {
                };
                Thread threadB = new Thread(confirmer);
                threadB.start();
+
                // this should block:
-               producer.snapshotState(new 
StateSnapshotContextSynchronousImpl(0, 0));
+               testHarness.snapshot(0, 0);
+
                synchronized (threadA) {
                        threadA.notifyAll(); // just in case, to let the test 
fail faster
                }
@@ -128,14 +137,14 @@ public class AtLeastOnceProducerTest {
                        throw runnableError.f0;
                }
 
-               producer.close();
+               testHarness.close();
        }
 
 
        private static class TestingKafkaProducer<T> extends 
FlinkKafkaProducerBase<T> {
-               private static final long serialVersionUID = 
-1759403646061180067L;
+               private static final long serialVersionUID = 1L;
 
-               private MockProducer prod;
+               private transient MockProducer prod;
                private AtomicBoolean snapshottingFinished;
 
                public TestingKafkaProducer(String defaultTopicId, 
KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, 
AtomicBoolean snapshottingFinished) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 806d342..10c7b86 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -21,6 +21,7 @@ import kafka.server.KafkaServer;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamSink;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
@@ -81,6 +82,10 @@ public abstract class KafkaTestEnvironment {
 
        public abstract <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> 
topics, KeyedDeserializationSchema<T> readSchema, Properties props);
 
+       public abstract <T> StreamSink<T> getProducerSink(String topic,
+                       KeyedSerializationSchema<T> serSchema, Properties props,
+                       KafkaPartitioner<T> partitioner);
+
        public abstract <T> DataStreamSink<T> produceIntoKafka(DataStream<T> 
stream, String topic,
                                                                                
                                KeyedSerializationSchema<T> serSchema, 
Properties props,
                                                                                
                                KafkaPartitioner<T> partitioner);

http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
index 0782cb9..5801c24 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -18,16 +18,12 @@
 
 package org.apache.flink.streaming.connectors.kafka.internals;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import 
org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
-import 
org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.SerializedValue;
 
@@ -54,10 +50,15 @@ public class AbstractFetcherTimestampsTest {
 
                TestSourceContext<Long> sourceContext = new 
TestSourceContext<>();
 
+               TestProcessingTimeService processingTimeProvider = new 
TestProcessingTimeService();
+
                TestFetcher<Long> fetcher = new TestFetcher<>(
-                               sourceContext, originalPartitions, null,
+                               sourceContext,
+                               originalPartitions,
+                               null, /* periodic watermark assigner */
                                new 
SerializedValue<AssignerWithPunctuatedWatermarks<Long>>(new 
PunctuatedTestExtractor()),
-                               new MockRuntimeContext(17, 3));
+                               processingTimeProvider,
+                               0);
 
                final KafkaTopicPartitionState<Object> part1 = 
fetcher.subscribedPartitions()[0];
                final KafkaTopicPartitionState<Object> part2 = 
fetcher.subscribedPartitions()[1];
@@ -115,10 +116,6 @@ public class AbstractFetcherTimestampsTest {
        
        @Test
        public void testPeriodicWatermarks() throws Exception {
-
-               ExecutionConfig config = new ExecutionConfig();
-               config.setAutoWatermarkInterval(10);
-
                final String testTopic = "test topic name";
                List<KafkaTopicPartition> originalPartitions = Arrays.asList(
                                new KafkaTopicPartition(testTopic, 7),
@@ -127,70 +124,71 @@ public class AbstractFetcherTimestampsTest {
 
                TestSourceContext<Long> sourceContext = new 
TestSourceContext<>();
 
-               final AtomicReference<Throwable> errorRef = new 
AtomicReference<>();
-               final ProcessingTimeService timerService = new 
SystemProcessingTimeService(
-                               new ReferenceSettingExceptionHandler(errorRef), 
sourceContext.getCheckpointLock());
+               TestProcessingTimeService processingTimeService = new 
TestProcessingTimeService();
 
-               try {
-                       TestFetcher<Long> fetcher = new TestFetcher<>(
-                                       sourceContext, originalPartitions,
-                                       new 
SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new 
PeriodicTestExtractor()),
-                                       null, new MockRuntimeContext(17, 3, 
config, timerService));
-       
-                       final KafkaTopicPartitionState<Object> part1 = 
fetcher.subscribedPartitions()[0];
-                       final KafkaTopicPartitionState<Object> part2 = 
fetcher.subscribedPartitions()[1];
-                       final KafkaTopicPartitionState<Object> part3 = 
fetcher.subscribedPartitions()[2];
-       
-                       // elements generate a watermark if the timestamp is a 
multiple of three
-       
-                       // elements for partition 1
-                       fetcher.emitRecord(1L, part1, 1L, Long.MIN_VALUE);
-                       fetcher.emitRecord(2L, part1, 2L, Long.MIN_VALUE);
-                       fetcher.emitRecord(3L, part1, 3L, Long.MIN_VALUE);
-                       assertEquals(3L, 
sourceContext.getLatestElement().getValue().longValue());
-                       assertEquals(3L, 
sourceContext.getLatestElement().getTimestamp());
-       
-                       // elements for partition 2
-                       fetcher.emitRecord(12L, part2, 1L, Long.MIN_VALUE);
-                       assertEquals(12L, 
sourceContext.getLatestElement().getValue().longValue());
-                       assertEquals(12L, 
sourceContext.getLatestElement().getTimestamp());
-       
-                       // elements for partition 3
-                       fetcher.emitRecord(101L, part3, 1L, Long.MIN_VALUE);
-                       fetcher.emitRecord(102L, part3, 2L, Long.MIN_VALUE);
-                       assertEquals(102L, 
sourceContext.getLatestElement().getValue().longValue());
-                       assertEquals(102L, 
sourceContext.getLatestElement().getTimestamp());
-       
-                       // now, we should have a watermark (this blocks until 
the periodic thread emitted the watermark)
-                       assertEquals(3L, 
sourceContext.getLatestWatermark().getTimestamp());
-       
-                       // advance partition 3
-                       fetcher.emitRecord(1003L, part3, 3L, Long.MIN_VALUE);
-                       fetcher.emitRecord(1004L, part3, 4L, Long.MIN_VALUE);
-                       fetcher.emitRecord(1005L, part3, 5L, Long.MIN_VALUE);
-                       assertEquals(1005L, 
sourceContext.getLatestElement().getValue().longValue());
-                       assertEquals(1005L, 
sourceContext.getLatestElement().getTimestamp());
-       
-                       // advance partition 1 beyond partition 2 - this bumps 
the watermark
-                       fetcher.emitRecord(30L, part1, 4L, Long.MIN_VALUE);
-                       assertEquals(30L, 
sourceContext.getLatestElement().getValue().longValue());
-                       assertEquals(30L, 
sourceContext.getLatestElement().getTimestamp());
-                       
-                       // this blocks until the periodic thread emitted the 
watermark
-                       assertEquals(12L, 
sourceContext.getLatestWatermark().getTimestamp());
-       
-                       // advance partition 2 again - this bumps the watermark
-                       fetcher.emitRecord(13L, part2, 2L, Long.MIN_VALUE);
-                       fetcher.emitRecord(14L, part2, 3L, Long.MIN_VALUE);
-                       fetcher.emitRecord(15L, part2, 3L, Long.MIN_VALUE);
-       
-                       // this blocks until the periodic thread emitted the 
watermark
-                       long watermarkTs = 
sourceContext.getLatestWatermark().getTimestamp();
-                       assertTrue(watermarkTs >= 13L && watermarkTs <= 15L);
-               }
-               finally {
-                       timerService.shutdownService();
-               }
+               TestFetcher<Long> fetcher = new TestFetcher<>(
+                               sourceContext,
+                               originalPartitions,
+                               new 
SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new 
PeriodicTestExtractor()),
+                               null, /* punctuated watermarks assigner*/
+                               processingTimeService,
+                               10);
+
+               final KafkaTopicPartitionState<Object> part1 = 
fetcher.subscribedPartitions()[0];
+               final KafkaTopicPartitionState<Object> part2 = 
fetcher.subscribedPartitions()[1];
+               final KafkaTopicPartitionState<Object> part3 = 
fetcher.subscribedPartitions()[2];
+
+               // elements generate a watermark if the timestamp is a multiple 
of three
+
+               // elements for partition 1
+               fetcher.emitRecord(1L, part1, 1L, Long.MIN_VALUE);
+               fetcher.emitRecord(2L, part1, 2L, Long.MIN_VALUE);
+               fetcher.emitRecord(3L, part1, 3L, Long.MIN_VALUE);
+               assertEquals(3L, 
sourceContext.getLatestElement().getValue().longValue());
+               assertEquals(3L, 
sourceContext.getLatestElement().getTimestamp());
+
+               // elements for partition 2
+               fetcher.emitRecord(12L, part2, 1L, Long.MIN_VALUE);
+               assertEquals(12L, 
sourceContext.getLatestElement().getValue().longValue());
+               assertEquals(12L, 
sourceContext.getLatestElement().getTimestamp());
+
+               // elements for partition 3
+               fetcher.emitRecord(101L, part3, 1L, Long.MIN_VALUE);
+               fetcher.emitRecord(102L, part3, 2L, Long.MIN_VALUE);
+               assertEquals(102L, 
sourceContext.getLatestElement().getValue().longValue());
+               assertEquals(102L, 
sourceContext.getLatestElement().getTimestamp());
+
+               processingTimeService.setCurrentTime(10);
+
+               // now, we should have a watermark (this blocks until the 
periodic thread emitted the watermark)
+               assertEquals(3L, 
sourceContext.getLatestWatermark().getTimestamp());
+
+               // advance partition 3
+               fetcher.emitRecord(1003L, part3, 3L, Long.MIN_VALUE);
+               fetcher.emitRecord(1004L, part3, 4L, Long.MIN_VALUE);
+               fetcher.emitRecord(1005L, part3, 5L, Long.MIN_VALUE);
+               assertEquals(1005L, 
sourceContext.getLatestElement().getValue().longValue());
+               assertEquals(1005L, 
sourceContext.getLatestElement().getTimestamp());
+
+               // advance partition 1 beyond partition 2 - this bumps the 
watermark
+               fetcher.emitRecord(30L, part1, 4L, Long.MIN_VALUE);
+               assertEquals(30L, 
sourceContext.getLatestElement().getValue().longValue());
+               assertEquals(30L, 
sourceContext.getLatestElement().getTimestamp());
+
+               processingTimeService.setCurrentTime(20);
+
+               // this blocks until the periodic thread emitted the watermark
+               assertEquals(12L, 
sourceContext.getLatestWatermark().getTimestamp());
+
+               // advance partition 2 again - this bumps the watermark
+               fetcher.emitRecord(13L, part2, 2L, Long.MIN_VALUE);
+               fetcher.emitRecord(14L, part2, 3L, Long.MIN_VALUE);
+               fetcher.emitRecord(15L, part2, 3L, Long.MIN_VALUE);
+
+               processingTimeService.setCurrentTime(30);
+               // this blocks until the periodic thread emitted the watermark
+               long watermarkTs = 
sourceContext.getLatestWatermark().getTimestamp();
+               assertTrue(watermarkTs >= 13L && watermarkTs <= 15L);
        }
 
        // 
------------------------------------------------------------------------
@@ -204,9 +202,10 @@ public class AbstractFetcherTimestampsTest {
                                List<KafkaTopicPartition> assignedPartitions,
                                
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
                                
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
-                               StreamingRuntimeContext runtimeContext) throws 
Exception
+                               ProcessingTimeService processingTimeProvider,
+                               long autoWatermarkInterval) throws Exception
                {
-                       super(sourceContext, assignedPartitions, 
watermarksPeriodic, watermarksPunctuated, runtimeContext, false);
+                       super(sourceContext, assignedPartitions, 
watermarksPeriodic, watermarksPunctuated, processingTimeProvider, 
autoWatermarkInterval, TestFetcher.class.getClassLoader(), false);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index ba75212..9e8e1d9 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka.testutils;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
@@ -36,6 +37,7 @@ import 
org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import 
org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
@@ -44,6 +46,8 @@ import java.util.Collection;
 import java.util.Properties;
 import java.util.Random;
 
+import static org.mockito.Mockito.mock;
+
 @SuppressWarnings("serial")
 public class DataGenerators {
 
@@ -145,12 +149,17 @@ public class DataGenerators {
                                producerProperties.setProperty("retries", "3");
                                StreamTransformation<String> mockTransform = 
new MockStreamTransformation();
                                DataStream<String> stream = new 
DataStream<>(new DummyStreamExecutionEnvironment(), mockTransform);
-                               DataStreamSink<String> sink = 
server.produceIntoKafka(stream, topic, new 
KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
-                                               producerProperties, new 
FixedPartitioner<String>());
-                               StreamSink<String> producerOperator = 
sink.getTransformation().getOperator();
-                               producer = (RichFunction) 
producerOperator.getUserFunction();
-                               producer.setRuntimeContext(new 
MockRuntimeContext(1,0));
-                               producer.open(new Configuration());
+
+                               StreamSink<String> sink = 
server.getProducerSink(
+                                               topic,
+                                               new 
KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
+                                               producerProperties,
+                                               new FixedPartitioner<String>());
+
+                               OneInputStreamOperatorTestHarness<String, 
Object> testHarness =
+                                               new 
OneInputStreamOperatorTestHarness<>(sink);
+
+                               testHarness.open();
 
                                final StringBuilder bld = new StringBuilder();
                                final Random rnd = new Random();
@@ -164,7 +173,7 @@ public class DataGenerators {
                                        }
 
                                        String next = bld.toString();
-                                       producerOperator.processElement(new 
StreamRecord<>(next));
+                                       testHarness.processElement(new 
StreamRecord<>(next));
                                }
                        }
                        catch (Throwable t) {
@@ -215,4 +224,4 @@ public class DataGenerators {
                        }
                }
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
deleted file mode 100644
index f16eacd..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.DoubleCounter;
-import org.apache.flink.api.common.accumulators.Histogram;
-import org.apache.flink.api.common.accumulators.IntCounter;
-import org.apache.flink.api.common.accumulators.LongCounter;
-import org.apache.flink.api.common.cache.DistributedCache;
-import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.MockEnvironment;
-import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-@SuppressWarnings("deprecation")
-public class MockRuntimeContext extends StreamingRuntimeContext {
-
-       private final int numberOfParallelSubtasks;
-       private final int indexOfThisSubtask;
-
-       private final ExecutionConfig execConfig;
-
-       private final ProcessingTimeService timeServiceProvider;
-       
-       public MockRuntimeContext(int numberOfParallelSubtasks, int 
indexOfThisSubtask) {
-               this(numberOfParallelSubtasks, indexOfThisSubtask, new 
ExecutionConfig());
-       }
-
-       public MockRuntimeContext(
-                       int numberOfParallelSubtasks,
-                       int indexOfThisSubtask,
-                       ExecutionConfig execConfig) {
-               this(numberOfParallelSubtasks, indexOfThisSubtask, execConfig, 
null);
-       }
-       
-       public MockRuntimeContext(
-                       int numberOfParallelSubtasks,
-                       int indexOfThisSubtask,
-                       ExecutionConfig execConfig,
-                       ProcessingTimeService timeServiceProvider) {
-               
-               super(new MockStreamOperator(),
-                       new MockEnvironment("no", 4 * 
MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
-                       Collections.<String, Accumulator<?, ?>>emptyMap());
-
-               this.numberOfParallelSubtasks = numberOfParallelSubtasks;
-               this.indexOfThisSubtask = indexOfThisSubtask;
-               this.execConfig = execConfig;
-               this.timeServiceProvider = timeServiceProvider;
-       }
-
-       @Override
-       public boolean isCheckpointingEnabled() {
-               return true;
-       }
-
-       @Override
-       public String getTaskName() {
-               return "mock task";
-       }
-
-       @Override
-       public int getNumberOfParallelSubtasks() {
-               return numberOfParallelSubtasks;
-       }
-
-       @Override
-       public int getIndexOfThisSubtask() {
-               return indexOfThisSubtask;
-       }
-
-       @Override
-       public int getAttemptNumber() {
-               return 0;
-       }
-
-       @Override
-       public ExecutionConfig getExecutionConfig() {
-               return execConfig;
-       }
-
-       @Override
-       public ClassLoader getUserCodeClassLoader() {
-               return getClass().getClassLoader();
-       }
-
-       @Override
-       public <V, A extends Serializable> void addAccumulator(String name, 
Accumulator<V, A> accumulator) {
-               // noop
-       }
-
-       @Override
-       public <V, A extends Serializable> Accumulator<V, A> 
getAccumulator(String name) {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public Map<String, Accumulator<?, ?>> getAllAccumulators() {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public IntCounter getIntCounter(String name) {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public LongCounter getLongCounter(String name) {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public DoubleCounter getDoubleCounter(String name) {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public Histogram getHistogram(String name) {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public MetricGroup getMetricGroup() {
-               return new UnregisteredTaskMetricsGroup.DummyIOMetricGroup();
-       }
-
-       @Override
-       public <RT> List<RT> getBroadcastVariable(String name) {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public <T, C> C getBroadcastVariableWithInitializer(String name, 
BroadcastVariableInitializer<T, C> initializer) {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public DistributedCache getDistributedCache() {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public <T> ValueState<T> getState(ValueStateDescriptor<T> 
stateProperties) {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public <T> ListState<T> getListState(ListStateDescriptor<T> 
stateProperties) {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> 
stateProperties) {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public ProcessingTimeService getProcessingTimeService() {
-               if (timeServiceProvider == null) {
-                       throw new UnsupportedOperationException();
-               } else {
-                       return timeServiceProvider;
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-
-       private static class MockStreamOperator extends 
AbstractStreamOperator<Integer> {
-               private static final long serialVersionUID = 
-1153976702711944427L;
-
-               @Override
-               public ExecutionConfig getExecutionConfig() {
-                       return new ExecutionConfig();
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 8041a7c..5b277bf 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -47,14 +47,16 @@ import 
org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.InstantiationUtil;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.RunnableFuture;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
@@ -90,6 +92,8 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
        // use this as default for tests
        AbstractStateBackend stateBackend = new MemoryStateBackend();
 
+       private final Object checkpointLock;
+
        /**
         * Whether setup() was called on the operator. This is reset when 
calling close().
         */
@@ -113,13 +117,15 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
                this.executionConfig = executionConfig;
                this.closableRegistry = new ClosableRegistry();
 
+               this.checkpointLock = new Object();
+
                final Environment env = new MockEnvironment("MockTwoInputTask", 
3 * 1024 * 1024, new MockInputSplitProvider(), 1024, underlyingConfig, 
executionConfig, MAX_PARALLELISM, 1, 0);
                mockTask = mock(StreamTask.class);
                processingTimeService = new TestProcessingTimeService();
                processingTimeService.setCurrentTime(0);
 
                when(mockTask.getName()).thenReturn("Mock Task");
-               when(mockTask.getCheckpointLock()).thenReturn(new Object());
+               when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
                when(mockTask.getConfiguration()).thenReturn(config);
                
when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig);
                when(mockTask.getEnvironment()).thenReturn(env);
@@ -330,7 +336,9 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
        }
 
        public void setProcessingTime(long time) throws Exception {
-               processingTimeService.setCurrentTime(time);
+               synchronized (checkpointLock) {
+                       processingTimeService.setCurrentTime(time);
+               }
        }
 
        public void processWatermark(Watermark mark) throws Exception {

Reply via email to