[
https://issues.apache.org/jira/browse/FLINK-1977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14532674#comment-14532674
]
ASF GitHub Bot commented on FLINK-1977:
---------------------------------------
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/659#discussion_r29854075
--- Diff:
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
---
@@ -17,33 +17,56 @@
package org.apache.flink.streaming.api.operators;
-import java.io.Serializable;
+import java.util.Timer;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.Source;
-public class StreamSource<OUT> extends StreamOperator<OUT, OUT> implements
Serializable {
+public class StreamSource<OUT> extends AbstractUdfStreamOperator<OUT,
Source<OUT>> implements StreamOperator<OUT> {
- private static final long serialVersionUID = 1L;
+ private transient Timer timer;
+ private boolean isRunning = true;
- public StreamSource(SourceFunction<OUT> sourceFunction) {
+ public StreamSource(Source<OUT> sourceFunction) {
super(sourceFunction);
}
- @Override
- public void run() {
- callUserFunctionAndLogException();
+ public void run() throws Exception {
+ while (isRunning) {
+
+ synchronized (userFunction) {
+ if (userFunction.reachedEnd()) {
+ break;
+ }
+
+ OUT result = userFunction.next();
+
+ output.collect(result);
+ }
+ Thread.yield();
+ }
}
@Override
- @SuppressWarnings("unchecked")
- protected void callUserFunction() throws Exception {
- ((SourceFunction<OUT>) userFunction).run(collector);
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ timer = new Timer("punctuation timer");
+// timer.scheduleAtFixedRate(new TimerTask() {
+// @Override
+// public void run() {
+// synchronized (userFunction) {
+// Instant watermark =
userFunction.getLowWatermark();
+// System.out.println("Emitting watermark:
" + watermark);
+// }
+// }
+// }, 0, Duration.standardSeconds(1).getMillis());
--- End diff --
?
> Rework Stream Operators to always be push based
> -----------------------------------------------
>
> Key: FLINK-1977
> URL: https://issues.apache.org/jira/browse/FLINK-1977
> Project: Flink
> Issue Type: Improvement
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
>
> This is a result of the discussion on the mailing list. This is an excerpt
> from the mailing list that gives the basic idea of the change:
> I propose to change all streaming operators to be push based, with a
> slightly improved interface: In addition to collect(), which I would
> call receiveElement() I would add receivePunctuation() and
> receiveBarrier(). The first operator in the chain would also get data
> from the outside invokable that reads from the input iterator and
> calls receiveElement() for the first operator in a chain.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)