dawidwys commented on a change in pull request #12018:
URL: https://github.com/apache/flink/pull/12018#discussion_r422970769
##########
File path: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
##########
@@ -228,4 +226,33 @@ protected void doCommitInternalOffsetsToKafka(
	// record the work to be committed by the main consumer thread and make sure the consumer notices that
	consumerThread.setOffsetsToCommit(offsetsToCommit, commitCallback);
}
+
+ private class KafkaCollector implements Collector<T> {
+ private final Queue<T> records = new ArrayDeque<>();
+
+ private boolean endOfStreamSignalled = false;
+
+ @Override
+ public void collect(T record) {
+ // do not emit subsequent elements if the end of the stream is reached
Review comment:
Actually, that is the behaviour I had from the very beginning. Do you
think it makes more sense to emit all but the end-of-stream record?

Side note: I find this method very confusing. I just realized that there is
no cross-partition alignment in this method. The whole task will be brought
down if any of the assigned partitions signals the end of the stream,
irrespective of the state of the remaining partitions. Honestly, I'd be in
favour of dropping this method at some point in the future.
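For context, a minimal sketch of the behaviour under discussion. The quoted
diff is completed with assumed pieces that are not shown above: the
`deserializer` field (a `KafkaDeserializationSchema<T>`, whose
`isEndOfStream` check does exist in the connector API) and the two accessors
a fetch loop would use. This is an illustration, not the exact PR code:

```java
// Sketch of an inner class of the fetcher; relies on java.util.ArrayDeque,
// java.util.Queue and org.apache.flink.util.Collector being imported in the
// enclosing file, and on an assumed `deserializer` field.
private class KafkaCollector implements Collector<T> {
	private final Queue<T> records = new ArrayDeque<>();

	private boolean endOfStreamSignalled = false;

	@Override
	public void collect(T record) {
		// Drop the end-of-stream record itself and everything after it.
		// Note the flag is fetcher-wide, not per partition: once any record
		// from any assigned partition signals end-of-stream, no further
		// records are buffered for emission.
		if (endOfStreamSignalled || deserializer.isEndOfStream(record)) {
			endOfStreamSignalled = true;
			return;
		}
		records.add(record);
	}

	// Assumed accessors for the fetch loop (not in the quoted diff):
	Queue<T> getRecords() {
		return records;
	}

	boolean isEndOfStreamSignalled() {
		return endOfStreamSignalled;
	}

	@Override
	public void close() {
	}
}
```

Emitting all but the end-of-stream record would not change this structure; it
only affects whether the record satisfying `isEndOfStream` is added to
`records` before the flag is set. The fetcher-wide flag is what produces the
cross-partition teardown described above.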