dawidwys commented on a change in pull request #12018:
URL: https://github.com/apache/flink/pull/12018#discussion_r422970769



##########
File path: 
flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
##########
@@ -228,4 +226,33 @@ protected void doCommitInternalOffsetsToKafka(
                // record the work to be committed by the main consumer thread 
and make sure the consumer notices that
                consumerThread.setOffsetsToCommit(offsetsToCommit, 
commitCallback);
        }
+
+       private class KafkaCollector implements Collector<T> {
+               private final Queue<T> records = new ArrayDeque<>();
+
+               private boolean endOfStreamSignalled = false;
+
+               @Override
+               public void collect(T record) {
+                       // do not emit subsequent elements if the end of the 
stream reached

Review comment:
       Actually, that is the behaviour I had from the very beginning. Do you 
think it makes more sense to emit all but the end of the stream record?
   
   Side note. I find this method very confusing. I just realized that there is 
no cross partition alignment on this method. The whole task will be brought 
down if any of the assigned partitions signals the end of stream, irrespective 
of the state of the remaining partitions. Honestly I'd be in favour of dropping 
this method at some point in the future.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to