[
https://issues.apache.org/jira/browse/FLINK-1977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14532597#comment-14532597
]
ASF GitHub Bot commented on FLINK-1977:
---------------------------------------
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/659#discussion_r29850741
--- Diff:
flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
---
@@ -178,38 +182,45 @@ private void initializeConnection() {
consumerIterator = stream.iterator();
}
- /**
- * Called to forward the data from the source to the {@link DataStream}.
- *
- * @param collector
- * The Collector for sending data to the dataStream
- */
@Override
- public void run(Collector<OUT> collector) throws Exception {
- isRunning = true;
- try {
- while (isRunning && consumerIterator.hasNext()) {
- OUT out =
schema.deserialize(consumerIterator.next().message());
- if (schema.isEndOfStream(out)) {
- break;
- }
- collector.collect(out);
+ public void open(Configuration config) throws Exception {
+ initializeConnection();
+ }
+
+ @Override
+ public boolean reachedEnd() throws Exception {
+ if (nextElement != null) {
+ return true;
+ } else if (consumerIterator.hasNext()) {
+ OUT out =
schema.deserialize(consumerIterator.next().message());
+ if (schema.isEndOfStream(out)) {
+ return false;
}
- } finally {
- consumer.shutdown();
+ nextElement = out;
}
+ return false;
}
@Override
- public void open(Configuration config) throws Exception {
- initializeConnection();
+ public OUT next() throws Exception {
+ if (nextElement != null) {
+ OUT out = nextElement;
+ nextElement = null;
+ return out;
+ }
+
+ MessageWithMetadata msg;
+ OUT out = schema.deserialize(consumerIterator.next().message());
+
+ if (schema.isEndOfStream(out)) {
--- End diff --
Ah .. this event is basically an illegal state? (reachedEnd() == true,
next() called) ?
> Rework Stream Operators to always be push based
> -----------------------------------------------
>
> Key: FLINK-1977
> URL: https://issues.apache.org/jira/browse/FLINK-1977
> Project: Flink
> Issue Type: Improvement
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
>
> This is a result of the discussion on the mailing list. This is an excerpt
> from the mailing list that gives the basic idea of the change:
> I propose to change all streaming operators to be push based, with a
> slightly improved interface: In addition to collect(), which I would
> call receiveElement() I would add receivePunctuation() and
> receiveBarrier(). The first operator in the chain would also get data
> from the outside invokable that reads from the input iterator and
> calls receiveElement() for the first operator in a chain.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)