Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3746#discussion_r114793528
  
    --- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
    @@ -424,65 +485,136 @@ public void run(SourceContext<T> sourceContext) 
throws Exception {
                        throw new Exception("The partitions were not set for 
the consumer");
                }
     
    -           // we need only do work, if we actually have partitions assigned
    -           if (!subscribedPartitionsToStartOffsets.isEmpty()) {
    -
    -                   // create the fetcher that will communicate with the 
Kafka brokers
    -                   final AbstractFetcher<T, ?> fetcher = createFetcher(
    -                                   sourceContext,
    -                                   subscribedPartitionsToStartOffsets,
    -                                   periodicWatermarkAssigner,
    -                                   punctuatedWatermarkAssigner,
    -                                   (StreamingRuntimeContext) 
getRuntimeContext(),
    -                                   offsetCommitMode);
    -
    -                   // publish the reference, for snapshot-, commit-, and 
cancel calls
    -                   // IMPORTANT: We can only do that now, because only now 
will calls to
    -                   //            the fetchers 'snapshotCurrentState()' 
method return at least
    -                   //            the restored offsets
    -                   this.kafkaFetcher = fetcher;
    -                   if (!running) {
    -                           return;
    -                   }
    -                   
    -                   // (3) run the fetcher' main work method
    -                   fetcher.runFetchLoop();
    +           this.runThread = Thread.currentThread();
    +
    +           // mark the subtask as temporarily idle if there are no initial 
seed partitions;
    +           // once this subtask discovers some partitions and starts 
collecting records, the subtask's
    +           // status will automatically be triggered back to be active.
    +           if (subscribedPartitionsToStartOffsets.isEmpty()) {
    +                   sourceContext.markAsTemporarilyIdle();
                }
    -           else {
    -                   // this source never completes, so emit a 
Long.MAX_VALUE watermark
    -                   // to not block watermark forwarding
    -                   sourceContext.emitWatermark(new 
Watermark(Long.MAX_VALUE));
     
    -                   // wait until this is canceled
    -                   final Object waitLock = new Object();
    +           // create the fetcher that will communicate with the Kafka 
brokers
    +           final AbstractFetcher<T, ?> fetcher = createFetcher(
    +                           sourceContext,
    +                           subscribedPartitionsToStartOffsets,
    +                           periodicWatermarkAssigner,
    +                           punctuatedWatermarkAssigner,
    +                           (StreamingRuntimeContext) getRuntimeContext(),
    +                           offsetCommitMode);
    +
    +           // publish the reference, for snapshot-, commit-, and cancel 
calls
    +           // IMPORTANT: We can only do that now, because only now will 
calls to
    +           //            the fetchers 'snapshotCurrentState()' method 
return at least
    +           //            the restored offsets
    +           this.kafkaFetcher = fetcher;
    +
    +           if (!running) {
    +                   return;
    +           }
    +
    +           // depending on whether we were restored with the current state 
version (1.3),
    +           // remaining logic branches off into 2 paths:
    +           //  1) New state - main fetcher loop executed as separate 
thread, with this
    +           //                 thread running the partition discovery loop
    +           //  2) Old state - partition discovery is disabled, simply 
going into the main fetcher loop
    +
    +           if (!restoredFromOldState) {
    +                   final AtomicReference<Exception> fetcherErrorRef = new 
AtomicReference<>();
    +                   Thread fetcherThread = new Thread(new Runnable() {
    +                           @Override
    +                           public void run() {
    +                                   try {
    +                                           // run the fetcher' main work 
method
    +                                           kafkaFetcher.runFetchLoop();
    --- End diff --
    
    Hmm, there actually isn't any good reason that this is required, as I can 
think of.
    
    one point regarding non-main thread emitting stuff: the Kafka 0.8 fetcher 
actually had always been emitting elements from different threads. So I didn't 
really assume which thread (main or separate) runs the fetcher loop and which 
one runs the discovery loop.
    
    but I think it's also ok to swap this here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to