GitHub user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3746#discussion_r114760511
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -424,65 +485,136 @@ public void run(SourceContext<T> sourceContext) throws Exception {
                        throw new Exception("The partitions were not set for the consumer");
                }
     
    -           // we only need to do work if we actually have partitions assigned
    -           if (!subscribedPartitionsToStartOffsets.isEmpty()) {
    -
    -                   // create the fetcher that will communicate with the Kafka brokers
    -                   final AbstractFetcher<T, ?> fetcher = createFetcher(
    -                                   sourceContext,
    -                                   subscribedPartitionsToStartOffsets,
    -                                   periodicWatermarkAssigner,
    -                                   punctuatedWatermarkAssigner,
    -                                   (StreamingRuntimeContext) getRuntimeContext(),
    -                                   offsetCommitMode);
    -
    -                   // publish the reference, for snapshot-, commit-, and cancel calls
    -                   // IMPORTANT: We can only do that now, because only now will calls to
    -                   //            the fetcher's 'snapshotCurrentState()' method return at least
    -                   //            the restored offsets
    -                   this.kafkaFetcher = fetcher;
    -                   if (!running) {
    -                           return;
    -                   }
    -                   
    -                   // (3) run the fetcher's main work method
    -                   fetcher.runFetchLoop();
    +           this.runThread = Thread.currentThread();
    +
    +           // mark the subtask as temporarily idle if there are no initial seed partitions;
    +           // once this subtask discovers some partitions and starts collecting records, the subtask's
    +           // status will automatically be triggered back to be active.
    +           if (subscribedPartitionsToStartOffsets.isEmpty()) {
    +                   sourceContext.markAsTemporarilyIdle();
                }
    -           else {
    -                   // this source never completes, so emit a Long.MAX_VALUE watermark
    -                   // to not block watermark forwarding
    -                   sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
     
    -                   // wait until this is canceled
    -                   final Object waitLock = new Object();
    +           // create the fetcher that will communicate with the Kafka brokers
    +           final AbstractFetcher<T, ?> fetcher = createFetcher(
    +                           sourceContext,
    +                           subscribedPartitionsToStartOffsets,
    +                           periodicWatermarkAssigner,
    +                           punctuatedWatermarkAssigner,
    +                           (StreamingRuntimeContext) getRuntimeContext(),
    +                           offsetCommitMode);
    +
    +           // publish the reference, for snapshot-, commit-, and cancel calls
    +           // IMPORTANT: We can only do that now, because only now will calls to
    +           //            the fetcher's 'snapshotCurrentState()' method return at least
    +           //            the restored offsets
    +           this.kafkaFetcher = fetcher;
    +
    +           if (!running) {
    +                   return;
    +           }
    +
    +           // depending on whether we were restored with the current state version (1.3),
    +           // remaining logic branches off into 2 paths:
    +           //  1) New state - main fetcher loop executed as separate thread, with this
    +           //                 thread running the partition discovery loop
    +           //  2) Old state - partition discovery is disabled, simply going into the main fetcher loop
    +
    +           if (!restoredFromOldState) {
    +                   final AtomicReference<Exception> fetcherErrorRef = new AtomicReference<>();
    +                   Thread fetcherThread = new Thread(new Runnable() {
    +                           @Override
    +                           public void run() {
    +                                   try {
    +                                           // run the fetcher's main work method
    +                                           kafkaFetcher.runFetchLoop();
    --- End diff ---
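
    The diff is cut off at 'runFetchLoop()'; the pattern it introduces is to run
    the blocking fetch loop in a background thread, hand any failure back through
    an AtomicReference, and keep the task thread free for the partition discovery
    loop. Below is a minimal self-contained sketch of that hand-off, with
    illustrative names rather than Flink's API (the join-and-rethrow tail is an
    assumption, since the excerpt ends before it):

        import java.util.concurrent.atomic.AtomicReference;

        public class FetcherThreadHandoffSketch {

            public static void main(String[] args) throws Exception {
                final AtomicReference<Exception> workerErrorRef = new AtomicReference<>();

                // run the blocking loop in a background thread; capture any failure
                Thread workerThread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            runBlockingWorkLoop(); // stands in for kafkaFetcher.runFetchLoop()
                        } catch (Exception e) {
                            workerErrorRef.set(e);
                        }
                    }
                });
                workerThread.setName("fetcher-loop");
                workerThread.start();

                // meanwhile the original thread stays free for periodic work,
                // which is where the PR runs partition discovery
                for (int i = 0; i < 3; i++) {
                    Thread.sleep(100L);
                    System.out.println("discovery pass " + i);
                }

                // join and surface the worker's exception on this thread
                workerThread.join();
                Exception workerError = workerErrorRef.get();
                if (workerError != null) {
                    throw workerError;
                }
            }

            private static void runBlockingWorkLoop() throws Exception {
                Thread.sleep(500L); // placeholder for a long-running fetch loop
            }
        }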
    
    Before this, the Fetcher was run in the Task thread. I'm not sure that's strictly necessary anymore, but in the past there were always problems if a Thread that is not the main Thread of a Task was emitting records.
    
    Is there a good reason for not starting the partition discoverer in a 
separate thread?
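
    For what it's worth, that inversion would look roughly like this; a sketch
    only, assuming a hypothetical 'discoverNewPartitions()' and a made-up
    discovery interval, not the PR's actual code:

        import java.util.concurrent.atomic.AtomicReference;

        public class DiscoveryThreadSketch {

            private volatile boolean running = true;

            public void run() throws Exception {
                final AtomicReference<Exception> discoveryErrorRef = new AtomicReference<>();

                // partition discovery moves off the task thread instead
                Thread discoveryThread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            while (running) {
                                discoverNewPartitions(); // hypothetical discoverer call
                                Thread.sleep(10000L);    // made-up discovery interval
                            }
                        } catch (InterruptedException e) {
                            // expected on shutdown, nothing to record
                        } catch (Exception e) {
                            discoveryErrorRef.set(e);
                        }
                    }
                });
                discoveryThread.setDaemon(true);
                discoveryThread.start();

                try {
                    // the fetch loop stays here, so records and watermarks are
                    // still emitted from the Task's main thread
                    runFetchLoop();
                } finally {
                    running = false;
                    discoveryThread.interrupt();
                    discoveryThread.join();
                }

                Exception discoveryError = discoveryErrorRef.get();
                if (discoveryError != null) {
                    throw discoveryError;
                }
            }

            private void discoverNewPartitions() throws Exception {
                // placeholder: query brokers and hand new partitions to the fetcher
            }

            private void runFetchLoop() throws Exception {
                Thread.sleep(500L); // placeholder for the blocking fetch loop
            }
        }

    The appeal of that arrangement is that all emitting still happens from the
    Task's main thread, which sidesteps the class of problems described above.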

