shanzi commented on code in PR #28069:
URL: https://github.com/apache/flink/pull/28069#discussion_r3241433314


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -225,15 +225,8 @@ private void setCurrentReader(int index) {
         } catch (Exception e) {
             throw new RuntimeException("Failed tp create reader", e);
         }
-        reader.start();
         currentSourceIndex = index;
         currentReader = reader;
-        availabilityFuture.complete(null);
-        LOG.debug(
-                "Reader started: subtask={} sourceIndex={} {}",
-                readerContext.getIndexOfSubtask(),
-                currentSourceIndex,
-                reader);
         // add restored splits
         if (!restoredSplits.isEmpty()) {

Review Comment:
   You mean, also move `currentSourceIndex = index` and `currentReader = reader` down?
   
   `addSplits` is called at line 241, so the current index and reader must already be switched before that call. Otherwise `addSplits` will add the splits to the wrong reader (the previous reader, or null).
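
   To illustrate the ordering concern, here is a minimal sketch. It is not the actual `HybridSourceReader` code: `ReaderSwitchSketch` and `FakeReader` are hypothetical stand-ins, and `addSplits` is simplified to forward everything to whichever reader is current.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the reader-switching logic; not Flink code. */
public class ReaderSwitchSketch {

    /** Hypothetical fake reader that only records the splits it receives. */
    public static final class FakeReader {
        public final List<String> splits = new ArrayList<>();
    }

    private int currentSourceIndex = -1;
    private FakeReader currentReader; // null until the first switch
    private final List<String> restoredSplits = new ArrayList<>();

    public ReaderSwitchSketch(List<String> restored) {
        restoredSplits.addAll(restored);
    }

    /** Simplified assumption: splits always go to whichever reader is current. */
    public void addSplits(List<String> splits) {
        if (currentReader == null) {
            throw new IllegalStateException("No current reader to receive splits");
        }
        currentReader.splits.addAll(splits);
    }

    public void setCurrentReader(int index, FakeReader reader) {
        // Switch the state first ...
        currentSourceIndex = index;
        currentReader = reader;
        // ... so that the restored splits below land on the new reader.
        if (!restoredSplits.isEmpty()) {
            List<String> toAdd = new ArrayList<>(restoredSplits);
            restoredSplits.clear();
            addSplits(toAdd);
        }
    }

    public int getCurrentSourceIndex() {
        return currentSourceIndex;
    }

    public static void main(String[] args) {
        ReaderSwitchSketch sketch = new ReaderSwitchSketch(List.of("restored-split"));
        FakeReader next = new FakeReader();
        sketch.setCurrentReader(0, next);
        System.out.println(next.splits);
    }
}
```

   If the two assignments in `setCurrentReader` were moved below the restored-splits block, the first switch in this sketch would throw because `currentReader` is still null, and later switches would hand the restored splits to the previous reader.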


