Repository: flink Updated Branches: refs/heads/master 4b9e34047 -> 235b02cb5
[FLINK-2160] Change Streaming Source Interface to run(Context)/cancel() The context can be used to emit elements and retrieve the checkpointing lock object. In the future, the context can be extended to provide support for element emission with timestamps and dealing with the watermark system. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/235b02cb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/235b02cb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/235b02cb Branch: refs/heads/master Commit: 235b02cb56d24bb5db3c254fa34661ccc0e1a083 Parents: 4b9e340 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Thu Jun 4 11:29:17 2015 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Fri Jun 5 15:34:20 2015 +0200 ---------------------------------------------------------------------- .../hbase/example/HBaseWriteStreamExample.java | 2 +- .../connectors/kafka/KafkaProducerExample.java | 7 ++--- .../connectors/kafka/api/KafkaSource.java | 5 ++-- .../api/persistent/PersistentKafkaSource.java | 9 +++--- .../streaming/connectors/kafka/KafkaITCase.java | 31 ++++++++++---------- .../connectors/rabbitmq/RMQSource.java | 5 ++-- .../connectors/twitter/TwitterSource.java | 5 ++-- .../source/FileMonitoringFunction.java | 7 ++--- .../functions/source/FileSourceFunction.java | 5 ++-- .../functions/source/FromElementsFunction.java | 6 ++-- .../functions/source/FromIteratorFunction.java | 6 ++-- .../source/FromSplittableIteratorFunction.java | 5 ++-- .../source/SocketTextStreamFunction.java | 11 ++++--- .../api/functions/source/SourceFunction.java | 29 +++++++++++++++--- .../streaming/api/operators/StreamSource.java | 16 ++++++++-- .../api/ChainedRuntimeContextTest.java | 3 +- .../flink/streaming/api/TypeFillTest.java | 2 +- .../api/complex/ComplexIntegrationTest.java | 15 ++++------ .../windowing/WindowIntegrationTest.java | 8 ++--- .../api/streamtask/StreamVertexTest.java | 5 ++-- .../runtime/tasks/SourceStreamTaskTest.java | 6 ++-- .../apache/flink/streaming/util/MockSource.java | 16 ++++++++-- .../examples/iteration/IterateExample.java | 5 ++-- .../streaming/examples/join/WindowJoin.java | 9 +++--- .../ml/IncrementalLearningSkeleton.java | 12 ++++---- .../examples/windowing/SessionWindowing.java | 6 ++-- .../windowing/TopSpeedWindowingExample.java | 6 ++-- .../api/scala/StreamExecutionEnvironment.scala | 7 +++-- .../StreamCheckpointingITCase.java | 7 +++-- .../ProcessFailureStreamingRecoveryITCase.java | 12 ++++---- 30 files changed, 147 insertions(+), 121 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java index 74c6c57..d1a61d3 100644 --- a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java +++ b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java @@ -53,7 +53,7 @@ public class HBaseWriteStreamExample { private volatile boolean isRunning = true; @Override - public void run(Object checkpointLock, Collector<String> out) throws Exception { + public void run(SourceContext<String> out) throws Exception { while (isRunning) { out.collect(String.valueOf(Math.floor(Math.random() * 100))); } http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java index 4d98b1b..f241d1c 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java @@ -21,7 +21,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.kafka.api.KafkaSink; import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema; -import org.apache.flink.util.Collector; @SuppressWarnings("serial") public class KafkaProducerExample { @@ -44,13 +43,13 @@ public class KafkaProducerExample { private volatile boolean running = true; @Override - public void run(Object checkpointLock, Collector<String> collector) throws Exception { + public void run(SourceContext<String> ctx) throws Exception { for (int i = 0; i < 20 && running; i++) { - collector.collect("message #" + i); + ctx.collect("message #" + i); Thread.sleep(100L); } - collector.collect("q"); + ctx.collect("q"); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java index a4c56e4..3bcbfa7 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java @@ -32,7 +32,6 @@ import kafka.javaapi.consumer.ConsumerConnector; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.ConnectorSource; import org.apache.flink.streaming.util.serialization.DeserializationSchema; -import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -186,7 +185,7 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> { } @Override - public void run(Object checkpointLock, Collector<OUT> collector) throws Exception { + public void run(SourceContext<OUT> ctx) throws Exception { // NOTE: Since this source is not checkpointed, we do not need to // acquire the checkpoint lock @@ -196,7 +195,7 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> { if (schema.isEndOfStream(out)) { break; } - collector.collect(out); + ctx.collect(out); } } finally { consumer.shutdown(); http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java index 5d77a1a..bda6076 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java @@ -38,7 +38,6 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter; import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.util.serialization.DeserializationSchema; -import org.apache.flink.util.Collector; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -169,10 +168,12 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT> } @Override - public void run(Object checkpointLock, Collector<OUT> collector) throws Exception { + public void run(SourceContext<OUT> ctx) throws Exception { if (iteratorToRead == null) { throw new IllegalStateException("Kafka iterator not initialized properly."); } + + final Object checkpointLock = ctx.getCheckpointLock(); while (running && iteratorToRead.hasNext()) { MessageAndMetadata<byte[], byte[]> message = iteratorToRead.next(); @@ -190,7 +191,7 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT> // make the state update and the element emission atomic synchronized (checkpointLock) { lastOffsets[message.partition()] = message.offset(); - collector.collect(next); + ctx.collect(next); } if (LOG.isTraceEnabled()) { @@ -378,4 +379,4 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT> } } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java index d695b09..0a12b07 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java @@ -399,13 +399,14 @@ public class KafkaITCase { boolean running = true; @Override - public void run(Object checkpointLock, Collector<Tuple2<Integer, Integer>> collector) throws Exception { + public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception { LOG.info("Starting source."); int cnt = from; int partition = getRuntimeContext().getIndexOfThisSubtask(); while (running) { LOG.info("Writing " + cnt + " to partition " + partition); - collector.collect(new Tuple2<Integer, Integer>(getRuntimeContext().getIndexOfThisSubtask(), cnt)); + ctx.collect(new Tuple2<Integer, Integer>(getRuntimeContext().getIndexOfThisSubtask(), + cnt)); if (cnt == to) { LOG.info("Writer reached end."); return; @@ -492,11 +493,11 @@ public class KafkaITCase { boolean running = true; @Override - public void run(Object checkpointLock, Collector<Tuple2<Long, String>> collector) throws Exception { + public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception { LOG.info("Starting source."); int cnt = 0; while (running) { - collector.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++)); + ctx.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++)); try { Thread.sleep(100); } catch (InterruptedException ignored) { @@ -576,11 +577,11 @@ public class KafkaITCase { boolean running = true; @Override - public void run(Object checkpointLock, Collector<Tuple2<Long, String>> collector) throws Exception { + public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception { LOG.info("Starting source."); int cnt = 0; while (running) { - collector.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++)); + ctx.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++)); LOG.info("Produced " + cnt); try { @@ -668,14 +669,14 @@ public class KafkaITCase { } @Override - public void run(Object checkpointLock, Collector<Tuple2<Long, byte[]>> collector) throws Exception { + public void run(SourceContext<Tuple2<Long, byte[]>> ctx) throws Exception { LOG.info("Starting source."); long cnt = 0; Random rnd = new Random(1337); while (running) { // byte[] wl = new byte[Math.abs(rnd.nextInt(1024 * 1024 * 30))]; - collector.collect(new Tuple2<Long, byte[]>(cnt++, wl)); + ctx.collect(new Tuple2<Long, byte[]>(cnt++, wl)); LOG.info("Emitted cnt=" + (cnt - 1) + " with byte.length = " + wl.length); try { @@ -685,7 +686,7 @@ public class KafkaITCase { if(cnt == 10) { LOG.info("Send end signal"); // signal end - collector.collect(new Tuple2<Long, byte[]>(-1L, new byte[]{1})); + ctx.collect(new Tuple2<Long, byte[]>(-1L, new byte[]{1})); running = false; } } @@ -776,11 +777,11 @@ public class KafkaITCase { boolean running = true; @Override - public void run(Object checkpointLock, Collector<Tuple2<Long, String>> collector) throws Exception { + public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception { LOG.info("Starting source."); int cnt = 0; while (running) { - collector.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++)); + ctx.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++)); try { Thread.sleep(100); } catch (InterruptedException ignored) { @@ -869,11 +870,11 @@ public class KafkaITCase { boolean running = true; @Override - public void run(Object checkpointLock, Collector<String> collector) throws Exception { + public void run(SourceContext<String> ctx) throws Exception { LOG.info("Starting source."); int cnt = 0; while (running) { - collector.collect("kafka-" + cnt++); + ctx.collect("kafka-" + cnt++); try { Thread.sleep(100); } catch (InterruptedException ignored) { @@ -911,12 +912,12 @@ public class KafkaITCase { boolean running = true; @Override - public void run(Object checkpointLock, Collector<String> collector) throws Exception { + public void run(SourceContext<String> ctx) throws Exception { LOG.info("Starting source."); int cnt = 0; while (running) { String msg = "kafka-" + cnt++; - collector.collect(msg); + ctx.collect(msg); LOG.info("sending message = "+msg); if ((cnt - 1) % 20 == 0) { http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java index 93812c2..b18b8d8 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java @@ -27,7 +27,6 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; -import org.apache.flink.util.Collector; public class RMQSource<OUT> extends ConnectorSource<OUT> { private static final long serialVersionUID = 1L; @@ -86,7 +85,7 @@ public class RMQSource<OUT> extends ConnectorSource<OUT> { } @Override - public void run(Object checkpointLock, Collector<OUT> out) throws Exception { + public void run(SourceContext<OUT> ctx) throws Exception { while (running) { delivery = consumer.nextDelivery(); @@ -95,7 +94,7 @@ public class RMQSource<OUT> extends ConnectorSource<OUT> { break; } - out.collect(result); + ctx.collect(result); } } http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java index a1a6d9b..bad0f8c 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java @@ -26,7 +26,6 @@ import java.util.concurrent.LinkedBlockingQueue; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -209,7 +208,7 @@ public class TwitterSource extends RichSourceFunction<String> { } @Override - public void run(Object checkpointLock, Collector<String> out) throws Exception { + public void run(SourceContext<String> ctx) throws Exception { while (isRunning) { if (client.isDone()) { if (LOG.isErrorEnabled()) { @@ -219,7 +218,7 @@ public class TwitterSource extends RichSourceFunction<String> { break; } - out.collect(queue.take()); + ctx.collect(queue.take()); if (maxNumberOfTweets != -1 && currentNumberOfTweets >= maxNumberOfTweets) { break; http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java index a9166c1..2c85650 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java @@ -28,7 +28,6 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; -import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,7 +62,7 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Lon } @Override - public void run(Object checkpointLock, Collector<Tuple3<String, Long, Long>> collector) throws Exception { + public void run(SourceContext<Tuple3<String, Long, Long>> ctx) throws Exception { FileSystem fileSystem = FileSystem.get(new URI(path)); while (isRunning) { @@ -71,7 +70,7 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Lon for (String filePath : files) { if (watchType == WatchType.ONLY_NEW_FILES || watchType == WatchType.REPROCESS_WITH_APPENDED) { - collector.collect(new Tuple3<String, Long, Long>(filePath, 0L, -1L)); + ctx.collect(new Tuple3<String, Long, Long>(filePath, 0L, -1L)); offsetOfFiles.put(filePath, -1L); } else if (watchType == WatchType.PROCESS_ONLY_APPENDED) { long offset = 0; @@ -80,7 +79,7 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Lon offset = offsetOfFiles.get(filePath); } - collector.collect(new Tuple3<String, Long, Long>(filePath, offset, fileSize)); + ctx.collect(new Tuple3<String, Long, Long>(filePath, offset, fileSize)); offsetOfFiles.put(filePath, fileSize); LOG.info("File processed: {}, {}, {}", filePath, offset, fileSize); http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java index ebbff9c..cf08e5a 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java @@ -24,7 +24,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplit; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; -import org.apache.flink.util.Collector; import java.util.Iterator; import java.util.NoSuchElementException; @@ -116,7 +115,7 @@ public class FileSourceFunction<OUT> extends RichParallelSourceFunction<OUT> { } @Override - public void run(Object checkpointLock, Collector<OUT> out) throws Exception { + public void run(SourceContext<OUT> ctx) throws Exception { while (isRunning) { OUT nextElement = serializer.createInstance(); nextElement = format.nextRecord(nextElement); @@ -126,7 +125,7 @@ public class FileSourceFunction<OUT> extends RichParallelSourceFunction<OUT> { } else if (nextElement == null) { break; } - out.collect(nextElement); + ctx.collect(nextElement); } } http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java index 838cfa4..736cc73 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java @@ -21,8 +21,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Iterator; -import org.apache.flink.util.Collector; - public class FromElementsFunction<T> implements SourceFunction<T> { private static final long serialVersionUID = 1L; @@ -44,11 +42,11 @@ public class FromElementsFunction<T> implements SourceFunction<T> { } @Override - public void run(Object checkpointLock, Collector<T> out) throws Exception { + public void run(SourceContext<T> ctx) throws Exception { Iterator<T> it = iterable.iterator(); while (isRunning && it.hasNext()) { - out.collect(it.next()); + ctx.collect(it.next()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java index 7320d77..655710e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java @@ -17,8 +17,6 @@ package org.apache.flink.streaming.api.functions.source; -import org.apache.flink.util.Collector; - import java.util.Iterator; public class FromIteratorFunction<T> implements SourceFunction<T> { @@ -34,9 +32,9 @@ public class FromIteratorFunction<T> implements SourceFunction<T> { } @Override - public void run(Object checkpointLock, Collector<T> out) throws Exception { + public void run(SourceContext<T> ctx) throws Exception { while (isRunning && iterator.hasNext()) { - out.collect(iterator.next()); + ctx.collect(iterator.next()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java index 61e1b7f..bc78e4d 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.api.functions.source; import org.apache.flink.configuration.Configuration; -import org.apache.flink.util.Collector; import org.apache.flink.util.SplittableIterator; import java.util.Iterator; @@ -46,9 +45,9 @@ public class FromSplittableIteratorFunction<T> extends RichParallelSourceFunctio } @Override - public void run(Object checkpointLock, Collector<T> out) throws Exception { + public void run(SourceContext<T> ctx) throws Exception { while (isRunning && iterator.hasNext()) { - out.collect(iterator.next()); + ctx.collect(iterator.next()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java index c5800f0..fb66f16 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java @@ -26,7 +26,6 @@ import java.net.Socket; import java.net.SocketException; import org.apache.flink.configuration.Configuration; -import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,11 +63,11 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> { } @Override - public void run(Object checkpointLock, Collector<String> collector) throws Exception { - streamFromSocket(collector, socket); + public void run(SourceContext<String> ctx) throws Exception { + streamFromSocket(ctx, socket); } - public void streamFromSocket(Collector<String> collector, Socket socket) throws Exception { + public void streamFromSocket(SourceContext<String> ctx, Socket socket) throws Exception { try { StringBuffer buffer = new StringBuffer(); BufferedReader reader = new BufferedReader(new InputStreamReader( @@ -117,7 +116,7 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> { } if (data == delimiter) { - collector.collect(buffer.toString()); + ctx.collect(buffer.toString()); buffer = new StringBuffer(); } else if (data != '\r') { // ignore carriage return buffer.append((char) data); @@ -125,7 +124,7 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> { } if (buffer.length() > 0) { - collector.collect(buffer.toString()); + ctx.collect(buffer.toString()); } } finally { socket.close(); http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java index 8c349e9..4c6ec1a 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.functions.source; import org.apache.flink.api.common.functions.Function; -import org.apache.flink.util.Collector; import java.io.Serializable; @@ -84,10 +83,9 @@ public interface SourceFunction<T> extends Function, Serializable { * elements. Also, the update of state and emission of elements must happen in the same * synchronized block. * - * @param checkpointLock The object to synchronize on when updating state and emitting elements. - * @param out The collector to use for emitting elements + * @param ctx The context for interaction with the outside world. */ - void run(final Object checkpointLock, Collector<T> out) throws Exception; + void run(SourceContext<T> ctx) throws Exception; /** * Cancels the source. Most sources will have a while loop inside the @@ -96,4 +94,27 @@ public interface SourceFunction<T> extends Function, Serializable { * is set to false in this method. */ void cancel(); + + /** + * Interface that source functions use to communicate with the outside world. Normally + * sources would just emit elements in a loop using {@link #collect}. If the source is a + * {@link org.apache.flink.streaming.api.checkpoint.Checkpointed} source it must retrieve + * the checkpoint lock object and use it to protect state updates and element emission as + * described in {@link org.apache.flink.streaming.api.functions.source.SourceFunction}. + * + * @param <T> The type of the elements produced by the source. + */ + public static interface SourceContext<T> { + + /** + * Emits one element from the source. + */ + public void collect(T element); + + /** + * Returns the checkpoint lock. Please refer to the explanation about checkpointed sources + * in {@link org.apache.flink.streaming.api.functions.source.SourceFunction}. + */ + public Object getCheckpointLock(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java index e63349a..907f93a 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java @@ -33,8 +33,20 @@ public class StreamSource<OUT> extends AbstractUdfStreamOperator<OUT, SourceFunc this.chainingStrategy = ChainingStrategy.HEAD; } - public void run(Object lockingObject, Collector<OUT> collector) throws Exception { - userFunction.run(lockingObject, collector); + public void run(final Object lockingObject, final Collector<OUT> collector) throws Exception { + SourceFunction.SourceContext<OUT> ctx = new SourceFunction.SourceContext<OUT>() { + @Override + public void collect(OUT element) { + collector.collect(element); + } + + @Override + public Object getCheckpointLock() { + return lockingObject; + } + }; + + userFunction.run(ctx); } public void cancel() { http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java index fef8a31..3ad6b8e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java @@ -25,7 +25,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.util.TestStreamEnvironment; -import org.apache.flink.util.Collector; import org.junit.Test; @SuppressWarnings("serial") @@ -47,7 +46,7 @@ public class ChainedRuntimeContextTest { private static class TestSource extends RichParallelSourceFunction<Integer> { @Override - public void run(Object checkpointLock, Collector<Integer> out) throws Exception { + public void run(SourceContext<Integer> ctx) throws Exception { } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java index 1dbbc00..399297b 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java @@ -118,7 +118,7 @@ public class TypeFillTest { private static final long serialVersionUID = 1L; @Override - public void run(Object checkpointLock, Collector<T> out) throws Exception { + public void run(SourceContext<T> ctx) throws Exception { } http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java index 2858658..809668d 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java @@ -484,11 +484,10 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase { long cnt = 0; @Override - public void run(Object checkpointLock, - Collector<OuterPojo> out) throws Exception { + public void run(SourceContext<OuterPojo> ctx) throws Exception { for (int i = 0; i < 20; i++) { OuterPojo result = new OuterPojo(new InnerPojo(cnt / 2, "water_melon-b"), 2L); - out.collect(result); + ctx.collect(result); } } @@ -502,11 +501,10 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase { private static final long serialVersionUID = 1L; @Override - public void run(Object checkpointLock, - Collector<Tuple2<Long, Tuple2<String, Long>>> out) throws Exception { + public void run(SourceContext<Tuple2<Long, Tuple2<String, Long>>> ctx) throws Exception { for (int i = 0; i < 20; i++) { Tuple2<Long, Tuple2<String, Long>> result = new Tuple2<Long, Tuple2<String, Long>>(1L, new Tuple2<String, Long>("a", 1L)); - out.collect(result); + ctx.collect(result); } } @@ -622,10 +620,9 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase { } @Override - public void run(Object checkpointLock, - Collector<Rectangle> out) throws Exception { + public void run(SourceContext<Rectangle> ctx) throws Exception { for (int i = 0; i < 100; i++) { - out.collect(rectangle); + ctx.collect(rectangle); rectangle = rectangle.next(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java index ec97984..ec8cda8 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java @@ -162,9 +162,9 @@ public class WindowIntegrationTest implements Serializable { private static final long serialVersionUID = 1L; @Override - public void run(Object checkpointLock, Collector<Integer> out) throws Exception { + public void run(SourceContext<Integer> ctx) throws Exception { for (int i = 1; i <= 10; i++) { - out.collect(i); + ctx.collect(i); } } @@ -189,9 +189,9 @@ public class WindowIntegrationTest implements Serializable { } @Override - public void run(Object checkpointLock, Collector<Integer> out) throws Exception { + public void run(SourceContext<Integer> ctx) throws Exception { for (;i < 11; i += 2) { - out.collect(i); + ctx.collect(i); } } http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java index 55f4add..9085034 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java @@ -37,7 +37,6 @@ import org.apache.flink.streaming.api.functions.co.CoMapFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.util.TestStreamEnvironment; -import org.apache.flink.util.Collector; import org.junit.Test; public class StreamVertexTest { @@ -52,10 +51,10 @@ public class StreamVertexTest { private int i = 0; @Override - public void run(Object checkpointLock, Collector<Tuple1<Integer>> out) throws Exception { + public void run(SourceContext<Tuple1<Integer>> ctx) throws Exception { for (int i = 0; i < 10; i++) { tuple.f0 = i; - out.collect(tuple); + ctx.collect(tuple); } } http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java index ef44053..c745e6c 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java @@ -35,7 +35,6 @@ import org.apache.flink.streaming.api.graph.StreamNode; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; -import org.apache.flink.util.Collector; import org.apache.flink.util.StringUtils; import org.junit.Assert; import org.junit.Test; @@ -169,7 +168,8 @@ public class SourceStreamTaskTest extends StreamTaskTestBase { } @Override - public void run(final Object lockObject, Collector<Tuple2<Long, Integer>> out) { + public void run(SourceContext<Tuple2<Long, Integer>> ctx) { + final Object lockObject = ctx.getCheckpointLock(); while (isRunning && count < maxElements) { // simulate some work try { @@ -179,7 +179,7 @@ public class SourceStreamTaskTest extends StreamTaskTestBase { } synchronized (lockObject) { - out.collect(new Tuple2<Long, Integer>(lastCheckpointId, count)); + ctx.collect(new Tuple2<Long, Integer>(lastCheckpointId, count)); count++; } } http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java index 77600f3..5bf3b61 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java @@ -33,8 +33,20 @@ public class MockSource<T> { ((RichSourceFunction<T>) sourceFunction).open(new Configuration()); } try { - Collector<T> collector = new MockOutput<T>(outputs); - sourceFunction.run(new Object(), collector); + final Collector<T> collector = new MockOutput<T>(outputs); + final Object lockObject = new Object(); + SourceFunction.SourceContext<T> ctx = new SourceFunction.SourceContext<T>() { + @Override + public void collect(T element) { + collector.collect(element); + } + + @Override + public Object getCheckpointLock() { + return lockObject; + } + }; + sourceFunction.run(ctx); } catch (Exception e) { throw new RuntimeException("Cannot invoke source.", e); } http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java index 2682942..78d361d 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java @@ -26,7 +26,6 @@ import org.apache.flink.streaming.api.datastream.IterativeDataStream; import org.apache.flink.streaming.api.datastream.SplitDataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.util.Collector; import java.util.ArrayList; import java.util.List; @@ -114,13 +113,13 @@ public class IterateExample { private volatile boolean isRunning = true; @Override - public void run(Object checkpointLock, Collector<Tuple2<Integer, Integer>> collector) throws Exception { + public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception { while (isRunning) { int first = rnd.nextInt(BOUND / 2 - 1) + 1; int second = rnd.nextInt(BOUND / 2 - 1) + 1; - collector.collect(new Tuple2<Integer, Integer>(first, second)); + ctx.collect(new Tuple2<Integer, Integer>(first, second)); Thread.sleep(500L); } } http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java index 8d7e5de..0fec4c6 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java @@ -27,7 +27,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.windowing.helper.Timestamp; -import org.apache.flink.util.Collector; import java.util.Random; @@ -112,12 +111,12 @@ public class WindowJoin { } @Override - public void run(Object checkpointLock, Collector<Tuple2<String, Integer>> out) throws Exception { + public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception { while (isRunning) { outTuple.f0 = names[rand.nextInt(names.length)]; outTuple.f1 = rand.nextInt(GRADE_COUNT) + 1; Thread.sleep(rand.nextInt(SLEEP_TIME) + 1); - out.collect(outTuple); + ctx.collect(outTuple); } } @@ -146,12 +145,12 @@ public class WindowJoin { @Override - public void run(Object checkpointLock, Collector<Tuple2<String, Integer>> out) throws Exception { + public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception { while (isRunning) { outTuple.f0 = names[rand.nextInt(names.length)]; outTuple.f1 = rand.nextInt(SALARY_MAX) + 1; Thread.sleep(rand.nextInt(SLEEP_TIME) + 1); - out.collect(outTuple); + ctx.collect(outTuple); } } http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java index 1b30f59..48111f6 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java @@ -97,9 +97,9 @@ public class IncrementalLearningSkeleton { private volatile boolean isRunning = true; @Override - public void run(Object checkpointLock, Collector<Integer> collector) throws Exception { + public void run(SourceContext<Integer> ctx) throws Exception { while (isRunning) { - collector.collect(getNewData()); + ctx.collect(getNewData()); } } @@ -123,10 +123,10 @@ public class IncrementalLearningSkeleton { private int counter; @Override - public void run(Object checkpointLock, Collector<Integer> collector) throws Exception { + public void run(SourceContext<Integer> ctx) throws Exception { Thread.sleep(15); while (counter < 50) { - collector.collect(getNewData()); + ctx.collect(getNewData()); } } @@ -153,7 +153,7 @@ public class IncrementalLearningSkeleton { private volatile boolean isRunning = true; @Override - public void run(Object checkpointLock, Collector<Integer> collector) throws Exception { + public void run(SourceContext<Integer> collector) throws Exception { while (isRunning) { collector.collect(getTrainingData()); } @@ -181,7 +181,7 @@ public class IncrementalLearningSkeleton { private int counter = 0; @Override - public void run(Object checkpointLock, Collector<Integer> collector) throws Exception { + public void run(SourceContext<Integer> collector) throws Exception { while (counter < 8200) { collector.collect(getTrainingData()); } http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java index 2dd9378..f8d8652 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java @@ -23,7 +23,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.windowing.policy.CentralActiveTrigger; import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy; -import org.apache.flink.util.Collector; import java.util.ArrayList; import java.util.List; @@ -58,13 +57,12 @@ public class SessionWindowing { private static final long serialVersionUID = 1L; @Override - public void run(Object checkpointLock, Collector<Tuple3<String, Long, Integer>> collector) - throws Exception { + public void run(SourceContext<Tuple3<String, Long, Integer>> ctx) throws Exception { for (Tuple3<String, Long, Integer> value : input) { // We sleep three seconds between every output so we // can see whether we properly detect sessions // before the next start for a specific id - collector.collect(value); + ctx.collect(value); if (!fileOutput) { System.out.println("Collected: " + value); Thread.sleep(3000); http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java index 850e30d..657ce2a 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java @@ -26,7 +26,6 @@ import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction; import org.apache.flink.streaming.api.windowing.helper.Delta; import org.apache.flink.streaming.api.windowing.helper.Time; import org.apache.flink.streaming.api.windowing.helper.Timestamp; -import org.apache.flink.util.Collector; import java.util.Arrays; import java.util.Random; @@ -108,8 +107,7 @@ public class TopSpeedWindowingExample { } @Override - public void run(Object checkpointLock, Collector<Tuple4<Integer, Integer, Double, Long>> collector) - throws Exception { + public void run(SourceContext<Tuple4<Integer, Integer, Double, Long>> ctx) throws Exception { while (isRunning) { Thread.sleep(1000); @@ -122,7 +120,7 @@ public class TopSpeedWindowingExample { distances[carId] += speeds[carId] / 3.6d; Tuple4<Integer, Integer, Double, Long> record = new Tuple4<Integer, Integer, Double, Long>(carId, speeds[carId], distances[carId], System.currentTimeMillis()); - collector.collect(record); + ctx.collect(record); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index b2ccf8c..008ad6c 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -26,6 +26,7 @@ import org.apache.flink.api.scala.ClosureCleaner import org.apache.flink.runtime.state.StateHandleProvider import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv} import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext import org.apache.flink.streaming.api.functions.source.{FromElementsFunction, SourceFunction} import org.apache.flink.types.StringValue import org.apache.flink.util.{Collector, SplittableIterator} @@ -405,12 +406,12 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * source functionality. * */ - def addSource[T: ClassTag: TypeInformation](function: Collector[T] => Unit): DataStream[T] = { + def addSource[T: ClassTag: TypeInformation](function: SourceContext[T] => Unit): DataStream[T] = { require(function != null, "Function must not be null.") val sourceFunction = new SourceFunction[T] { val cleanFun = StreamExecutionEnvironment.clean(function) - override def run(lockObject: AnyRef, out: Collector[T]) { - cleanFun(out) + override def run(ctx: SourceContext[T]) { + cleanFun(ctx) } override def cancel() = {} } http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java index 64e1f24..f0eef9d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java @@ -35,7 +35,6 @@ import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.test.util.ForkableFlinkMiniCluster; -import org.apache.flink.util.Collector; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -219,7 +218,9 @@ public class StreamCheckpointingITCase { } @Override - public void run(Object lockingObject, Collector out) throws Exception { + public void run(SourceContext<String> ctx) throws Exception { + final Object lockingObject = ctx.getCheckpointLock(); + while (isRunning && index < numElements) { char first = (char) ((index % 40) + 40); @@ -230,7 +231,7 @@ public class StreamCheckpointingITCase { synchronized (lockingObject) { index += step; - out.collect(result); + ctx.collect(result); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/235b02cb/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java index 0b99e04..626b1d1 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java @@ -38,7 +38,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; -import org.apache.flink.util.Collector; import org.junit.Assert; /** @@ -124,12 +123,13 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur } @Override - public void run(Object checkpointLock, Collector<Long> collector) throws Exception { + public void run(SourceContext<Long> sourceCtx) throws Exception { + final Object checkpointLock = sourceCtx.getCheckpointLock(); - StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); + StreamingRuntimeContext runtimeCtx = (StreamingRuntimeContext) getRuntimeContext(); - final long stepSize = context.getNumberOfParallelSubtasks(); - final long congruence = context.getIndexOfThisSubtask(); + final long stepSize = runtimeCtx.getNumberOfParallelSubtasks(); + final long congruence = runtimeCtx.getIndexOfThisSubtask(); final long toCollect = (end % stepSize > congruence) ? (end / stepSize + 1) : (end / stepSize); final File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE); @@ -148,7 +148,7 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur } synchronized (checkpointLock) { - collector.collect(collected * stepSize + congruence); + sourceCtx.collect(collected * stepSize + congruence); collected++; } }