Repository: flink
Updated Branches:
  refs/heads/master 4b9e34047 -> 235b02cb5


[FLINK-2160] Change Streaming Source Interface to run(Context)/cancel()

The context can be used to emit elements and retrieve the checkpointing
lock object.

In the future, the context can be extended to provide support for
element emission with timestamps and for interacting with the watermark system.
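For illustration only (not part of this commit), a user-defined source written
against the new interface looks roughly like the sketch below. The hypothetical
ExampleSource emits elements through the SourceContext and uses
getCheckpointLock() to keep state updates and element emission atomic, as
described in the SourceFunction javadocs:

    // Hypothetical sketch of a source using the new run(SourceContext)/cancel() interface.
    public class ExampleSource implements SourceFunction<Long> {

        private volatile boolean isRunning = true;
        private long count = 0;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            final Object lock = ctx.getCheckpointLock();
            while (isRunning) {
                // state update and element emission happen in the same synchronized block
                synchronized (lock) {
                    ctx.collect(count);
                    count++;
                }
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }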


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/235b02cb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/235b02cb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/235b02cb

Branch: refs/heads/master
Commit: 235b02cb56d24bb5db3c254fa34661ccc0e1a083
Parents: 4b9e340
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Thu Jun 4 11:29:17 2015 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Fri Jun 5 15:34:20 2015 +0200

----------------------------------------------------------------------
 .../hbase/example/HBaseWriteStreamExample.java  |  2 +-
 .../connectors/kafka/KafkaProducerExample.java  |  7 ++---
 .../connectors/kafka/api/KafkaSource.java       |  5 ++--
 .../api/persistent/PersistentKafkaSource.java   |  9 +++---
 .../streaming/connectors/kafka/KafkaITCase.java | 31 ++++++++++----------
 .../connectors/rabbitmq/RMQSource.java          |  5 ++--
 .../connectors/twitter/TwitterSource.java       |  5 ++--
 .../source/FileMonitoringFunction.java          |  7 ++---
 .../functions/source/FileSourceFunction.java    |  5 ++--
 .../functions/source/FromElementsFunction.java  |  6 ++--
 .../functions/source/FromIteratorFunction.java  |  6 ++--
 .../source/FromSplittableIteratorFunction.java  |  5 ++--
 .../source/SocketTextStreamFunction.java        | 11 ++++---
 .../api/functions/source/SourceFunction.java    | 29 +++++++++++++++---
 .../streaming/api/operators/StreamSource.java   | 16 ++++++++--
 .../api/ChainedRuntimeContextTest.java          |  3 +-
 .../flink/streaming/api/TypeFillTest.java       |  2 +-
 .../api/complex/ComplexIntegrationTest.java     | 15 ++++------
 .../windowing/WindowIntegrationTest.java        |  8 ++---
 .../api/streamtask/StreamVertexTest.java        |  5 ++--
 .../runtime/tasks/SourceStreamTaskTest.java     |  6 ++--
 .../apache/flink/streaming/util/MockSource.java | 16 ++++++++--
 .../examples/iteration/IterateExample.java      |  5 ++--
 .../streaming/examples/join/WindowJoin.java     |  9 +++---
 .../ml/IncrementalLearningSkeleton.java         | 12 ++++----
 .../examples/windowing/SessionWindowing.java    |  6 ++--
 .../windowing/TopSpeedWindowingExample.java     |  6 ++--
 .../api/scala/StreamExecutionEnvironment.scala  |  7 +++--
 .../StreamCheckpointingITCase.java              |  7 +++--
 .../ProcessFailureStreamingRecoveryITCase.java  | 12 ++++----
 30 files changed, 147 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
index 74c6c57..d1a61d3 100644
--- a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
+++ b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
@@ -53,7 +53,7 @@ public class HBaseWriteStreamExample {
                        private volatile boolean isRunning = true;
 
                        @Override
-                       public void run(Object checkpointLock, Collector<String> out) throws Exception {
+                       public void run(SourceContext<String> out) throws Exception {
                                while (isRunning) {
                                        out.collect(String.valueOf(Math.floor(Math.random() * 100)));
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
index 4d98b1b..f241d1c 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
@@ -21,7 +21,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
 import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
-import org.apache.flink.util.Collector;
 
 @SuppressWarnings("serial")
 public class KafkaProducerExample {
@@ -44,13 +43,13 @@ public class KafkaProducerExample {
                        private volatile boolean running = true;
                        
                        @Override
-                       public void run(Object checkpointLock, Collector<String> collector) throws Exception {
+                       public void run(SourceContext<String> ctx) throws Exception {
                                for (int i = 0; i < 20 && running; i++) {
-                                       collector.collect("message #" + i);
+                                       ctx.collect("message #" + i);
                                        Thread.sleep(100L);
                                }
 
-                               collector.collect("q");
+                               ctx.collect("q");
                        }
 
                        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
index a4c56e4..3bcbfa7 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -32,7 +32,6 @@ import kafka.javaapi.consumer.ConsumerConnector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.source.ConnectorSource;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -186,7 +185,7 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
        }
 
        @Override
-       public void run(Object checkpointLock, Collector<OUT> collector) throws Exception {
+       public void run(SourceContext<OUT> ctx) throws Exception {
 
                // NOTE: Since this source is not checkpointed, we do not need to
                // acquire the checkpoint lock
@@ -196,7 +195,7 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
                                if (schema.isEndOfStream(out)) {
                                        break;
                                }
-                               collector.collect(out);
+                               ctx.collect(out);
                        }
                } finally {
                        consumer.shutdown();

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
index 5d77a1a..bda6076 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
@@ -38,7 +38,6 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.util.Collector;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -169,10 +168,12 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
        }
 
        @Override
-       public void run(Object checkpointLock, Collector<OUT> collector) throws Exception {
+       public void run(SourceContext<OUT> ctx) throws Exception {
                if (iteratorToRead == null) {
                        throw new IllegalStateException("Kafka iterator not initialized properly.");
                }
+
+               final Object checkpointLock = ctx.getCheckpointLock();
                
                while (running && iteratorToRead.hasNext()) {
                        MessageAndMetadata<byte[], byte[]> message = iteratorToRead.next();
@@ -190,7 +191,7 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
                        // make the state update and the element emission atomic
                        synchronized (checkpointLock) {
                                lastOffsets[message.partition()] = message.offset();
-                               collector.collect(next);
+                               ctx.collect(next);
                        }
 
                        if (LOG.isTraceEnabled()) {
@@ -378,4 +379,4 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
                        }
                }
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index d695b09..0a12b07 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -399,13 +399,14 @@ public class KafkaITCase {
                        boolean running = true;
 
                        @Override
-                       public void run(Object checkpointLock, Collector<Tuple2<Integer, Integer>> collector) throws Exception {
+                       public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
                                LOG.info("Starting source.");
                                int cnt = from;
                                int partition = getRuntimeContext().getIndexOfThisSubtask();
                                while (running) {
                                        LOG.info("Writing " + cnt + " to partition " + partition);
-                                       collector.collect(new Tuple2<Integer, Integer>(getRuntimeContext().getIndexOfThisSubtask(), cnt));
+                                       ctx.collect(new Tuple2<Integer, Integer>(getRuntimeContext().getIndexOfThisSubtask(),
+                                                       cnt));
                                        if (cnt == to) {
                                                LOG.info("Writer reached end.");
                                                return;
@@ -492,11 +493,11 @@ public class KafkaITCase {
                        boolean running = true;
 
                        @Override
-                       public void run(Object checkpointLock, Collector<Tuple2<Long, String>> collector) throws Exception {
+                       public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
                                LOG.info("Starting source.");
                                int cnt = 0;
                                while (running) {
-                                       collector.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++));
+                                       ctx.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++));
                                        try {
                                                Thread.sleep(100);
                                        } catch (InterruptedException ignored) {
@@ -576,11 +577,11 @@ public class KafkaITCase {
                        boolean running = true;
 
                        @Override
-                       public void run(Object checkpointLock, Collector<Tuple2<Long, String>> collector) throws Exception {
+                       public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
                                LOG.info("Starting source.");
                                int cnt = 0;
                                while (running) {
-                                       collector.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++));
+                                       ctx.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++));
                                        LOG.info("Produced " + cnt);
 
                                        try {
@@ -668,14 +669,14 @@ public class KafkaITCase {
                        }
 
                        @Override
-                       public void run(Object checkpointLock, Collector<Tuple2<Long, byte[]>> collector) throws Exception {
+                       public void run(SourceContext<Tuple2<Long, byte[]>> ctx) throws Exception {
                                LOG.info("Starting source.");
                                long cnt = 0;
                                Random rnd = new Random(1337);
                                while (running) {
                                        //
                                        byte[] wl = new byte[Math.abs(rnd.nextInt(1024 * 1024 * 30))];
-                                       collector.collect(new Tuple2<Long, byte[]>(cnt++, wl));
+                                       ctx.collect(new Tuple2<Long, byte[]>(cnt++, wl));
                                        LOG.info("Emitted cnt=" + (cnt - 1) + " with byte.length = " + wl.length);
 
                                        try {
@@ -685,7 +686,7 @@ public class KafkaITCase {
                                        if(cnt == 10) {
                                                LOG.info("Send end signal");
                                                // signal end
-                                               collector.collect(new Tuple2<Long, byte[]>(-1L, new byte[]{1}));
+                                               ctx.collect(new Tuple2<Long, byte[]>(-1L, new byte[]{1}));
                                                running = false;
                                        }
                                }
@@ -776,11 +777,11 @@ public class KafkaITCase {
                        boolean running = true;
 
                        @Override
-                       public void run(Object checkpointLock, Collector<Tuple2<Long, String>> collector) throws Exception {
+                       public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
                                LOG.info("Starting source.");
                                int cnt = 0;
                                while (running) {
-                                       collector.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++));
+                                       ctx.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++));
                                        try {
                                                Thread.sleep(100);
                                        } catch (InterruptedException ignored) {
@@ -869,11 +870,11 @@ public class KafkaITCase {
                        boolean running = true;
 
                        @Override
-                       public void run(Object checkpointLock, Collector<String> collector) throws Exception {
+                       public void run(SourceContext<String> ctx) throws Exception {
                                LOG.info("Starting source.");
                                int cnt = 0;
                                while (running) {
-                                       collector.collect("kafka-" + cnt++);
+                                       ctx.collect("kafka-" + cnt++);
                                        try {
                                                Thread.sleep(100);
                                        } catch (InterruptedException ignored) {
@@ -911,12 +912,12 @@ public class KafkaITCase {
                        boolean running = true;
 
                        @Override
-                       public void run(Object checkpointLock, Collector<String> collector) throws Exception {
+                       public void run(SourceContext<String> ctx) throws Exception {
                                LOG.info("Starting source.");
                                int cnt = 0;
                                while (running) {
                                        String msg = "kafka-" + cnt++;
-                                       collector.collect(msg);
+                                       ctx.collect(msg);
                                        LOG.info("sending message = "+msg);
 
                                        if ((cnt - 1) % 20 == 0) {

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 93812c2..b18b8d8 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -27,7 +27,6 @@ import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.QueueingConsumer;
-import org.apache.flink.util.Collector;
 
 public class RMQSource<OUT> extends ConnectorSource<OUT> {
        private static final long serialVersionUID = 1L;
@@ -86,7 +85,7 @@ public class RMQSource<OUT> extends ConnectorSource<OUT> {
        }
 
        @Override
-       public void run(Object checkpointLock, Collector<OUT> out) throws Exception {
+       public void run(SourceContext<OUT> ctx) throws Exception {
                while (running) {
                        delivery = consumer.nextDelivery();
 
@@ -95,7 +94,7 @@ public class RMQSource<OUT> extends ConnectorSource<OUT> {
                                break;
                        }
 
-                       out.collect(result);
+                       ctx.collect(result);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
index a1a6d9b..bad0f8c 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
@@ -26,7 +26,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -209,7 +208,7 @@ public class TwitterSource extends RichSourceFunction<String> {
        }
 
        @Override
-       public void run(Object checkpointLock, Collector<String> out) throws Exception {
+       public void run(SourceContext<String> ctx) throws Exception {
                while (isRunning) {
                        if (client.isDone()) {
                                if (LOG.isErrorEnabled()) {
@@ -219,7 +218,7 @@ public class TwitterSource extends RichSourceFunction<String> {
                                break;
                        }
 
-                       out.collect(queue.take());
+                       ctx.collect(queue.take());
 
                        if (maxNumberOfTweets != -1 && currentNumberOfTweets >= maxNumberOfTweets) {
                                break;

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
index a9166c1..2c85650 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
@@ -28,7 +28,6 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,7 +62,7 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Lon
        }
 
        @Override
-       public void run(Object checkpointLock, Collector<Tuple3<String, Long, Long>> collector) throws Exception {
+       public void run(SourceContext<Tuple3<String, Long, Long>> ctx) throws Exception {
                FileSystem fileSystem = FileSystem.get(new URI(path));
 
                while (isRunning) {
@@ -71,7 +70,7 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Lon
                        for (String filePath : files) {
                                if (watchType == WatchType.ONLY_NEW_FILES
                                                || watchType == WatchType.REPROCESS_WITH_APPENDED) {
-                                       collector.collect(new Tuple3<String, Long, Long>(filePath, 0L, -1L));
+                                       ctx.collect(new Tuple3<String, Long, Long>(filePath, 0L, -1L));
                                        offsetOfFiles.put(filePath, -1L);
                                } else if (watchType == WatchType.PROCESS_ONLY_APPENDED) {
                                        long offset = 0;
@@ -80,7 +79,7 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Lon
                                                offset = offsetOfFiles.get(filePath);
                                        }
 
-                                       collector.collect(new Tuple3<String, Long, Long>(filePath, offset, fileSize));
+                                       ctx.collect(new Tuple3<String, Long, Long>(filePath, offset, fileSize));
                                        offsetOfFiles.put(filePath, fileSize);
 
                                        LOG.info("File processed: {}, {}, {}", filePath, offset, fileSize);

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
index ebbff9c..cf08e5a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-import org.apache.flink.util.Collector;
 
 import java.util.Iterator;
 import java.util.NoSuchElementException;
@@ -116,7 +115,7 @@ public class FileSourceFunction<OUT> extends RichParallelSourceFunction<OUT> {
        }
 
        @Override
-       public void run(Object checkpointLock, Collector<OUT> out) throws Exception {
+       public void run(SourceContext<OUT> ctx) throws Exception {
                while (isRunning) {
                        OUT nextElement = serializer.createInstance();
                        nextElement =  format.nextRecord(nextElement);
@@ -126,7 +125,7 @@ public class FileSourceFunction<OUT> extends RichParallelSourceFunction<OUT> {
                        } else if (nextElement == null) {
                                break;
                        }
-                       out.collect(nextElement);
+                       ctx.collect(nextElement);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
index 838cfa4..736cc73 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
@@ -21,8 +21,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 
-import org.apache.flink.util.Collector;
-
 public class FromElementsFunction<T> implements SourceFunction<T> {
        
        private static final long serialVersionUID = 1L;
@@ -44,11 +42,11 @@ public class FromElementsFunction<T> implements SourceFunction<T> {
        }
 
        @Override
-       public void run(Object checkpointLock, Collector<T> out) throws Exception {
+       public void run(SourceContext<T> ctx) throws Exception {
                Iterator<T> it = iterable.iterator();
 
                while (isRunning && it.hasNext()) {
-                       out.collect(it.next());
+                       ctx.collect(it.next());
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
index 7320d77..655710e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
@@ -17,8 +17,6 @@
 
 package org.apache.flink.streaming.api.functions.source;
 
-import org.apache.flink.util.Collector;
-
 import java.util.Iterator;
 
 public class FromIteratorFunction<T> implements SourceFunction<T> {
@@ -34,9 +32,9 @@ public class FromIteratorFunction<T> implements SourceFunction<T> {
        }
 
        @Override
-       public void run(Object checkpointLock, Collector<T> out) throws Exception {
+       public void run(SourceContext<T> ctx) throws Exception {
                while (isRunning && iterator.hasNext()) {
-                       out.collect(iterator.next());
+                       ctx.collect(iterator.next());
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
index 61e1b7f..bc78e4d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.api.functions.source;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Collector;
 import org.apache.flink.util.SplittableIterator;
 
 import java.util.Iterator;
@@ -46,9 +45,9 @@ public class FromSplittableIteratorFunction<T> extends RichParallelSourceFunctio
        }
 
        @Override
-       public void run(Object checkpointLock, Collector<T> out) throws Exception {
+       public void run(SourceContext<T> ctx) throws Exception {
                while (isRunning && iterator.hasNext()) {
-                       out.collect(iterator.next());
+                       ctx.collect(iterator.next());
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
index c5800f0..fb66f16 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
@@ -26,7 +26,6 @@ import java.net.Socket;
 import java.net.SocketException;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,11 +63,11 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
        }
 
        @Override
-       public void run(Object checkpointLock, Collector<String> collector) throws Exception {
-               streamFromSocket(collector, socket);
+       public void run(SourceContext<String> ctx) throws Exception {
+               streamFromSocket(ctx, socket);
        }
 
-       public void streamFromSocket(Collector<String> collector, Socket socket) throws Exception {
+       public void streamFromSocket(SourceContext<String> ctx, Socket socket) throws Exception {
                try {
                        StringBuffer buffer = new StringBuffer();
                        BufferedReader reader = new BufferedReader(new InputStreamReader(
@@ -117,7 +116,7 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
                                }
 
                                if (data == delimiter) {
-                                       collector.collect(buffer.toString());
+                                       ctx.collect(buffer.toString());
                                        buffer = new StringBuffer();
                                } else if (data != '\r') { // ignore carriage return
                                        buffer.append((char) data);
@@ -125,7 +124,7 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
                        }
 
                        if (buffer.length() > 0) {
-                               collector.collect(buffer.toString());
+                               ctx.collect(buffer.toString());
                        }
                } finally {
                        socket.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
index 8c349e9..4c6ec1a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.api.functions.source;
 
 import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.util.Collector;
 
 import java.io.Serializable;
 
@@ -84,10 +83,9 @@ public interface SourceFunction<T> extends Function, Serializable {
         * elements. Also, the update of state and emission of elements must happen in the same
         * synchronized block.
         *
-        * @param checkpointLock The object to synchronize on when updating state and emitting elements.
-        * @param out The collector to use for emitting elements
+        * @param ctx The context for interaction with the outside world.
         */
-       void run(final Object checkpointLock, Collector<T> out) throws Exception;
+       void run(SourceContext<T> ctx) throws Exception;
 
        /**
         * Cancels the source. Most sources will have a while loop inside the
@@ -96,4 +94,27 @@ public interface SourceFunction<T> extends Function, Serializable {
         * is set to false in this method.
         */
        void cancel();
+
+       /**
+        * Interface that source functions use to communicate with the outside world. Normally
+        * sources would just emit elements in a loop using {@link #collect}. If the source is a
+        * {@link org.apache.flink.streaming.api.checkpoint.Checkpointed} source it must retrieve
+        * the checkpoint lock object and use it to protect state updates and element emission as
+        * described in {@link org.apache.flink.streaming.api.functions.source.SourceFunction}.
+        *
+        * @param <T> The type of the elements produced by the source.
+        */
+       public static interface SourceContext<T> {
+
+               /**
+                * Emits one element from the source.
+                */
+               public void collect(T element);
+
+               /**
+                * Returns the checkpoint lock. Please refer to the explanation about checkpointed sources
+                * in {@link org.apache.flink.streaming.api.functions.source.SourceFunction}.
+                */
+               public Object getCheckpointLock();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index e63349a..907f93a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -33,8 +33,20 @@ public class StreamSource<OUT> extends AbstractUdfStreamOperator<OUT, SourceFunc
                this.chainingStrategy = ChainingStrategy.HEAD;
        }
 
-       public void run(Object lockingObject, Collector<OUT> collector) throws Exception {
-               userFunction.run(lockingObject, collector);
+       public void run(final Object lockingObject, final Collector<OUT> collector) throws Exception {
+               SourceFunction.SourceContext<OUT> ctx = new SourceFunction.SourceContext<OUT>() {
+                       @Override
+                       public void collect(OUT element) {
+                               collector.collect(element);
+                       }
+
+                       @Override
+                       public Object getCheckpointLock() {
+                               return lockingObject;
+                       }
+               };
+
+               userFunction.run(ctx);
        }
 
        public void cancel() {

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java
index fef8a31..3ad6b8e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.util.Collector;
 import org.junit.Test;
 
 @SuppressWarnings("serial")
@@ -47,7 +46,7 @@ public class ChainedRuntimeContextTest {
        private static class TestSource extends RichParallelSourceFunction<Integer> {
 
                @Override
-               public void run(Object checkpointLock, Collector<Integer> out) throws Exception {
+               public void run(SourceContext<Integer> ctx) throws Exception {
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
index 1dbbc00..399297b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
@@ -118,7 +118,7 @@ public class TypeFillTest {
                private static final long serialVersionUID = 1L;
 
                @Override
-               public void run(Object checkpointLock, Collector<T> out) throws Exception {
+               public void run(SourceContext<T> ctx) throws Exception {
 
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index 2858658..809668d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -484,11 +484,10 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
                long cnt = 0;
 
                @Override
-               public void run(Object checkpointLock,
-                               Collector<OuterPojo> out) throws Exception {
+               public void run(SourceContext<OuterPojo> ctx) throws Exception {
                        for (int i = 0; i < 20; i++) {
                                OuterPojo result = new OuterPojo(new InnerPojo(cnt / 2, "water_melon-b"), 2L);
-                               out.collect(result);
+                               ctx.collect(result);
                        }
                }
 
@@ -502,11 +501,10 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
                private static final long serialVersionUID = 1L;
 
                @Override
-               public void run(Object checkpointLock,
-                               Collector<Tuple2<Long, Tuple2<String, Long>>> out) throws Exception {
+               public void run(SourceContext<Tuple2<Long, Tuple2<String, Long>>> ctx) throws Exception {
                        for (int i = 0; i < 20; i++) {
                                Tuple2<Long, Tuple2<String, Long>> result = new Tuple2<Long, Tuple2<String, Long>>(1L, new Tuple2<String, Long>("a", 1L));
-                               out.collect(result);
+                               ctx.collect(result);
                        }
                }
 
@@ -622,10 +620,9 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
                }
 
                @Override
-               public void run(Object checkpointLock,
-                               Collector<Rectangle> out) throws Exception {
+               public void run(SourceContext<Rectangle> ctx) throws Exception {
                        for (int i = 0; i < 100; i++) {
-                               out.collect(rectangle);
+                               ctx.collect(rectangle);
                                rectangle = rectangle.next();
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
index ec97984..ec8cda8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
@@ -162,9 +162,9 @@ public class WindowIntegrationTest implements Serializable {
                        private static final long serialVersionUID = 1L;
 
                        @Override
-                       public void run(Object checkpointLock, Collector<Integer> out) throws Exception {
+                       public void run(SourceContext<Integer> ctx) throws Exception {
                                for (int i = 1; i <= 10; i++) {
-                                       out.collect(i);
+                                       ctx.collect(i);
                                }
                        }
 
@@ -189,9 +189,9 @@ public class WindowIntegrationTest implements Serializable {
                        }
 
                        @Override
-                       public void run(Object checkpointLock, Collector<Integer> out) throws Exception {
+                       public void run(SourceContext<Integer> ctx) throws Exception {
                                for (;i < 11; i += 2) {
-                                       out.collect(i);
+                                       ctx.collect(i);
                                }
 
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
index 55f4add..9085034 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
@@ -37,7 +37,6 @@ import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.util.Collector;
 import org.junit.Test;
 
 public class StreamVertexTest {
@@ -52,10 +51,10 @@ public class StreamVertexTest {
                private int i = 0;
 
                @Override
-               public void run(Object checkpointLock, Collector<Tuple1<Integer>> out) throws Exception {
+               public void run(SourceContext<Tuple1<Integer>> ctx) throws Exception {
                        for (int i = 0; i < 10; i++) {
                                tuple.f0 = i;
-                               out.collect(tuple);
+                               ctx.collect(tuple);
                        }
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index ef44053..c745e6c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -35,7 +35,6 @@ import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.util.Collector;
 import org.apache.flink.util.StringUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -169,7 +168,8 @@ public class SourceStreamTaskTest extends StreamTaskTestBase {
                }
 
                @Override
-               public void run(final Object lockObject, Collector<Tuple2<Long, Integer>> out) {
+               public void run(SourceContext<Tuple2<Long, Integer>> ctx) {
+                       final Object lockObject = ctx.getCheckpointLock();
                        while (isRunning && count < maxElements) {
                                // simulate some work
                                try {
@@ -179,7 +179,7 @@ public class SourceStreamTaskTest extends StreamTaskTestBase {
                                }
 
                                synchronized (lockObject) {
-                                       out.collect(new Tuple2<Long, Integer>(lastCheckpointId, count));
+                                       ctx.collect(new Tuple2<Long, Integer>(lastCheckpointId, count));
                                        count++;
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
index 77600f3..5bf3b61 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
@@ -33,8 +33,20 @@ public class MockSource<T> {
                        ((RichSourceFunction<T>) sourceFunction).open(new Configuration());
                }
                try {
-                       Collector<T> collector = new MockOutput<T>(outputs);
-                       sourceFunction.run(new Object(), collector);
+                       final Collector<T> collector = new MockOutput<T>(outputs);
+                       final Object lockObject = new Object();
+                       SourceFunction.SourceContext<T> ctx = new SourceFunction.SourceContext<T>() {
+                               @Override
+                               public void collect(T element) {
+                                       collector.collect(element);
+                               }
+
+                               @Override
+                               public Object getCheckpointLock() {
+                                       return lockObject;
+                               }
+                       };
+                       sourceFunction.run(ctx);
                } catch (Exception e) {
                        throw new RuntimeException("Cannot invoke source.", e);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index 2682942..78d361d 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -26,7 +26,6 @@ import org.apache.flink.streaming.api.datastream.IterativeDataStream;
 import org.apache.flink.streaming.api.datastream.SplitDataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.util.Collector;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -114,13 +113,13 @@ public class IterateExample {
                private volatile boolean isRunning = true;
 
                @Override
-               public void run(Object checkpointLock, Collector<Tuple2<Integer, Integer>> collector) throws Exception {
+               public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
 
                        while (isRunning) {
                                int first = rnd.nextInt(BOUND / 2 - 1) + 1;
                                int second = rnd.nextInt(BOUND / 2 - 1) + 1;
 
-                               collector.collect(new Tuple2<Integer, Integer>(first, second));
+                               ctx.collect(new Tuple2<Integer, Integer>(first, second));
                                Thread.sleep(500L);
                        }
                }
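
Sources ported to the new interface follow the same shape throughout these examples: an
emission loop guarded by a volatile flag, with cancel() flipping the flag. A minimal
self-contained sketch (NumberSource and the emitted values are illustrative, not part of
this commit):

    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    // Emits an increasing sequence of longs until the job cancels the source.
    public class NumberSource implements SourceFunction<Long> {

        private static final long serialVersionUID = 1L;

        private volatile boolean isRunning = true;
        private long counter = 0;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            while (isRunning) {
                ctx.collect(counter++);   // emit through the context instead of a Collector
                Thread.sleep(10);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;            // causes run() to leave its loop
        }
    }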

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
index 8d7e5de..0fec4c6 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -27,7 +27,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.util.Collector;
 
 import java.util.Random;
 
@@ -112,12 +111,12 @@ public class WindowJoin {
                }
 
                @Override
-               public void run(Object checkpointLock, Collector<Tuple2<String, Integer>> out) throws Exception {
+               public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                        while (isRunning) {
                                outTuple.f0 = names[rand.nextInt(names.length)];
                                outTuple.f1 = rand.nextInt(GRADE_COUNT) + 1;
                                Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
-                               out.collect(outTuple);
+                               ctx.collect(outTuple);
                        }
                }
 
@@ -146,12 +145,12 @@ public class WindowJoin {
 
 
                @Override
-               public void run(Object checkpointLock, Collector<Tuple2<String, Integer>> out) throws Exception {
+               public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                        while (isRunning) {
                                outTuple.f0 = names[rand.nextInt(names.length)];
                                outTuple.f1 = rand.nextInt(SALARY_MAX) + 1;
                                Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
-                               out.collect(outTuple);
+                               ctx.collect(outTuple);
                        }
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index 1b30f59..48111f6 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -97,9 +97,9 @@ public class IncrementalLearningSkeleton {
                private volatile boolean isRunning = true;
 
                @Override
-               public void run(Object checkpointLock, Collector<Integer> collector) throws Exception {
+               public void run(SourceContext<Integer> ctx) throws Exception {
                        while (isRunning) {
-                               collector.collect(getNewData());
+                               ctx.collect(getNewData());
                        }
                }
 
@@ -123,10 +123,10 @@ public class IncrementalLearningSkeleton {
                private int counter;
 
                @Override
-               public void run(Object checkpointLock, Collector<Integer> collector) throws Exception {
+               public void run(SourceContext<Integer> ctx) throws Exception {
                        Thread.sleep(15);
                        while (counter < 50) {
-                               collector.collect(getNewData());
+                               ctx.collect(getNewData());
                        }
                }
 
@@ -153,7 +153,7 @@ public class IncrementalLearningSkeleton {
                private volatile boolean isRunning = true;
 
                @Override
-               public void run(Object checkpointLock, Collector<Integer> collector) throws Exception {
+               public void run(SourceContext<Integer> collector) throws Exception {
                        while (isRunning) {
                                collector.collect(getTrainingData());
                        }
@@ -181,7 +181,7 @@ public class IncrementalLearningSkeleton {
                private int counter = 0;
 
                @Override
-               public void run(Object checkpointLock, Collector<Integer> collector) throws Exception {
+               public void run(SourceContext<Integer> collector) throws Exception {
                        while (counter < 8200) {
                                collector.collect(getTrainingData());
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index 2dd9378..f8d8652 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -23,7 +23,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.windowing.policy.CentralActiveTrigger;
 import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
-import org.apache.flink.util.Collector;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -58,13 +57,12 @@ public class SessionWindowing {
                                        private static final long serialVersionUID = 1L;
 
                                        @Override
-                                       public void run(Object checkpointLock, Collector<Tuple3<String, Long, Integer>> collector)
-                                                       throws Exception {
+                                       public void run(SourceContext<Tuple3<String, Long, Integer>> ctx) throws Exception {
                                                for (Tuple3<String, Long, Integer> value : input) {
                                                        // We sleep three seconds between every output so we
                                                        // can see whether we properly detect sessions
                                                        // before the next start for a specific id
-                                                       collector.collect(value);
+                                                       ctx.collect(value);
                                                        if (!fileOutput) {
                                                                System.out.println("Collected: " + value);
                                                                Thread.sleep(3000);

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
index 850e30d..657ce2a 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
@@ -26,7 +26,6 @@ import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
 import org.apache.flink.streaming.api.windowing.helper.Delta;
 import org.apache.flink.streaming.api.windowing.helper.Time;
 import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.util.Collector;
 
 import java.util.Arrays;
 import java.util.Random;
@@ -108,8 +107,7 @@ public class TopSpeedWindowingExample {
                }
 
                @Override
-               public void run(Object checkpointLock, Collector<Tuple4<Integer, Integer, Double, Long>> collector)
-                               throws Exception {
+               public void run(SourceContext<Tuple4<Integer, Integer, Double, Long>> ctx) throws Exception {
 
                        while (isRunning) {
                                Thread.sleep(1000);
@@ -122,7 +120,7 @@ public class TopSpeedWindowingExample {
                                        distances[carId] += speeds[carId] / 3.6d;
                                        Tuple4<Integer, Integer, Double, Long> record = new Tuple4<Integer, Integer, Double, Long>(carId,
                                                        speeds[carId], distances[carId], System.currentTimeMillis());
-                                       collector.collect(record);
+                                       ctx.collect(record);
                                }
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index b2ccf8c..008ad6c 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -26,6 +26,7 @@ import org.apache.flink.api.scala.ClosureCleaner
 import org.apache.flink.runtime.state.StateHandleProvider
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
 import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
 import org.apache.flink.streaming.api.functions.source.{FromElementsFunction, SourceFunction}
 import org.apache.flink.types.StringValue
 import org.apache.flink.util.{Collector, SplittableIterator}
@@ -405,12 +406,12 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * source functionality.
    *
    */
-  def addSource[T: ClassTag: TypeInformation](function: Collector[T] => Unit): DataStream[T] = {
+  def addSource[T: ClassTag: TypeInformation](function: SourceContext[T] => Unit): DataStream[T] = {
     require(function != null, "Function must not be null.")
     val sourceFunction = new SourceFunction[T] {
       val cleanFun = StreamExecutionEnvironment.clean(function)
-      override def run(lockObject: AnyRef, out: Collector[T]) {
-        cleanFun(out)
+      override def run(ctx: SourceContext[T]) {
+        cleanFun(ctx)
       }
       override def cancel() = {}
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index 64e1f24..f0eef9d 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -35,7 +35,6 @@ import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 
-import org.apache.flink.util.Collector;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -219,7 +218,9 @@ public class StreamCheckpointingITCase {
                }
 
                @Override
-               public void run(Object lockingObject, Collector out) throws Exception {
+               public void run(SourceContext<String> ctx) throws Exception {
+                       final Object lockingObject = ctx.getCheckpointLock();
+
                        while (isRunning && index < numElements) {
                                char first = (char) ((index % 40) + 40);
 
@@ -230,7 +231,7 @@ public class StreamCheckpointingITCase {
 
                                synchronized (lockingObject) {
                                        index += step;
-                                       out.collect(result);
+                                       ctx.collect(result);
                                }
                        }
                }
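
The lock returned by getCheckpointLock() is what keeps element emission and state updates
atomic with respect to checkpoints. A rough sketch of the same emit-under-lock pattern in a
standalone source (CountingSource is an illustrative name, not part of this commit):

    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    // Emits 0..999, advancing its counter only while holding the checkpoint lock.
    public class CountingSource implements SourceFunction<Long> {

        private static final long serialVersionUID = 1L;

        private volatile boolean isRunning = true;
        private long count = 0;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            final Object lock = ctx.getCheckpointLock();
            while (isRunning && count < 1000) {
                synchronized (lock) {
                    // emitting and advancing the counter in one critical section keeps
                    // the emitted stream consistent with any snapshotted state
                    ctx.collect(count);
                    count++;
                }
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }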

http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
index 0b99e04..626b1d1 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
@@ -38,7 +38,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-import org.apache.flink.util.Collector;
 import org.junit.Assert;
 
 /**
@@ -124,12 +123,13 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
                }
 
                @Override
-               public void run(Object checkpointLock, Collector<Long> collector) throws Exception {
+               public void run(SourceContext<Long> sourceCtx) throws Exception {
+                       final Object checkpointLock = sourceCtx.getCheckpointLock();
 
-                       StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
+                       StreamingRuntimeContext runtimeCtx = (StreamingRuntimeContext) getRuntimeContext();
 
-                       final long stepSize = context.getNumberOfParallelSubtasks();
-                       final long congruence = context.getIndexOfThisSubtask();
+                       final long stepSize = runtimeCtx.getNumberOfParallelSubtasks();
+                       final long congruence = runtimeCtx.getIndexOfThisSubtask();
                        final long toCollect = (end % stepSize > congruence) ? (end / stepSize + 1) : (end / stepSize);
 
                        final File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE);
@@ -148,7 +148,7 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
                                }
 
                                synchronized (checkpointLock) {
-                                       collector.collect(collected * stepSize + congruence);
+                                       sourceCtx.collect(collected * stepSize + congruence);
                                        collected++;
                                }
                        }
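
Each parallel subtask of this source emits the numbers congruent to its subtask index
modulo the parallelism, so toCollect is the count of such numbers below end. A small worked
example of that arithmetic (the concrete values are illustrative):

    // With end = 10 and parallelism 4, subtask 1 emits 1, 5 and 9.
    public class PartitionArithmeticExample {
        public static void main(String[] args) {
            long end = 10;
            long stepSize = 4;    // getNumberOfParallelSubtasks()
            long congruence = 1;  // getIndexOfThisSubtask()
            long toCollect = (end % stepSize > congruence) ? (end / stepSize + 1) : (end / stepSize);
            // 10 % 4 = 2 > 1, so toCollect = 10 / 4 + 1 = 3
            for (long collected = 0; collected < toCollect; collected++) {
                System.out.println(collected * stepSize + congruence);  // prints 1, 5, 9
            }
        }
    }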
