becketqin commented on a change in pull request #14239: URL: https://github.com/apache/flink/pull/14239#discussion_r531331794
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
##########
@@ -170,6 +178,70 @@ public void testSnapshotAndRestore() throws Exception {
 	}
 
+	@Test
+	public void testCallableInterruptedDuringShutdownDoNotFailJob() throws InterruptedException {

Review comment:
   Very good suggestion! This should work quite well. I was actually struggling with this test a little bit. The tricky part is that `testingContext.close()` also closes the two executors, so it is not guaranteed that the error handling logic is actually exercised every time the test runs. The `ManuallyTriggeredScheduledExecutorService` works well in this case because `triggerAll()` will still fire even after the executor has shut down. It is probably not the generally correct behavior, but it really helps in this particular test :)

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
##########
@@ -321,4 +328,37 @@ private void registerReader() {
 	ListState<SplitT> getReaderState() {
 		return readerState;
 	}
+
+	// ----------- private class --------------
+
+	private static class CountingDataOutput<T> implements DataOutput<T> {

Review comment:
   Sure, I updated the patch to move this to the `SourceOperatorStreamTask`. It was actually my first choice as well. The reason I eventually put it in the `SourceOperator` is that the `CountingOutput` is created in `AbstractStreamOperator` and `AbstractStreamOperatorV2`, which is at the operator level.
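The executor trick described for the `SourceCoordinatorContextTest` comment above can be sketched with a minimal, self-contained example. This is not Flink's actual `ManuallyTriggeredScheduledExecutorService`; the class and method names below (`ManuallyTriggeredExecutor`, `run`) are illustrative stand-ins that only demonstrate the one property the test relies on: queued tasks still fire on `triggerAll()` even after `shutdown()` has been called.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

public class ManualExecutorSketch {

    /** Illustrative stand-in for Flink's ManuallyTriggeredScheduledExecutorService. */
    static final class ManuallyTriggeredExecutor implements Executor {
        private final Queue<Runnable> tasks = new ArrayDeque<>();
        private boolean shutdown;

        @Override
        public synchronized void execute(Runnable command) {
            // Queue the task instead of running it; it only runs on triggerAll().
            tasks.add(command);
        }

        synchronized void shutdown() {
            shutdown = true;
        }

        /** Runs every queued task, even if shutdown() was already called. */
        synchronized void triggerAll() {
            Runnable task;
            while ((task = tasks.poll()) != null) {
                task.run();
            }
        }
    }

    /** Shows that tasks survive shutdown() and still run on triggerAll(). */
    static String run() {
        ManuallyTriggeredExecutor executor = new ManuallyTriggeredExecutor();
        StringBuilder log = new StringBuilder();

        executor.execute(() -> log.append("task1;"));
        executor.shutdown();             // "close" the executor first
        executor.execute(() -> log.append("task2;"));
        executor.triggerAll();           // queued tasks still fire after shutdown

        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "task1;task2;"
    }
}
```

Because triggering is decoupled from submission, the test can close the context first and then deterministically force the queued callable to run, so the error handling path is exercised on every run rather than depending on a race with executor shutdown.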
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
##########
@@ -131,7 +131,14 @@ public void start() {
 			LOG.info("Starting the KafkaSourceEnumerator for consumer group {} "
 				+ "without periodic partition discovery.", consumerGroupId);
 			context.callAsync(
-				this::discoverAndInitializePartitionSplit,
+				() -> {
+					try {
+						return discoverAndInitializePartitionSplit();
+					} finally {
+						// Close the admin client early because we won't use it anymore.
+						adminClient.close();

Review comment:
   The admin client is created in the `start()` method, and closed in two different ways.
   1. If the partition discovery is performed periodically, the admin client is closed when the enumerator is closed. In this case, the `KafkaSourceEnumerator.close()` method closes the admin client. That is also why it is an instance variable.
   2. If the partition discovery is not performed periodically but done just once at the beginning, the admin client is closed by the `SourceCoordinatorContext` worker thread after it completes the partition discovery, so the admin client is not kept open while it is no longer being used.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
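The two close paths described in the `KafkaSourceEnumerator` comment can be sketched in a self-contained way. Note this is a hedged illustration, not the actual Flink/Kafka code: `FakeAdminClient`, `Enumerator`, and `closedRightAfterDiscovery` are hypothetical names standing in for Kafka's `AdminClient`, the enumerator, and the coordinator worker thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class AdminClientLifecycleSketch {

    /** Hypothetical stand-in for Kafka's AdminClient; not the real class. */
    static final class FakeAdminClient implements AutoCloseable {
        final AtomicBoolean closed = new AtomicBoolean(false);
        String listPartitions() { return "topic-0,topic-1"; }
        @Override public void close() { closed.set(true); }
    }

    /** Illustrative enumerator with the two close paths from the comment. */
    static final class Enumerator implements AutoCloseable {
        private final boolean periodicDiscovery;
        final FakeAdminClient adminClient = new FakeAdminClient();
        private final ExecutorService worker = Executors.newSingleThreadExecutor();

        Enumerator(boolean periodicDiscovery) {
            this.periodicDiscovery = periodicDiscovery;
        }

        /** Kicks off partition discovery on the worker thread. */
        Future<String> start() {
            if (periodicDiscovery) {
                // Path 1: the client stays open for future rediscovery runs
                // and is only closed later, in close().
                return worker.submit((Callable<String>) adminClient::listPartitions);
            }
            // Path 2: one-shot discovery closes the client in the callable's
            // finally block, so it is not kept open while unused.
            return worker.submit(() -> {
                try {
                    return adminClient.listPartitions();
                } finally {
                    adminClient.close();
                }
            });
        }

        @Override
        public void close() throws InterruptedException {
            worker.shutdown();
            worker.awaitTermination(5, TimeUnit.SECONDS);
            if (periodicDiscovery) {
                adminClient.close();
            }
        }
    }

    /** Returns true if the client was already closed right after discovery. */
    static boolean closedRightAfterDiscovery(boolean periodic) throws Exception {
        Enumerator enumerator = new Enumerator(periodic);
        enumerator.start().get();   // wait for discovery to finish
        boolean closedEarly = enumerator.adminClient.closed.get();
        enumerator.close();
        return closedEarly;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("one-shot closes early: " + closedRightAfterDiscovery(false));
        System.out.println("periodic closes early: " + closedRightAfterDiscovery(true));
    }
}
```

The design point mirrored here is why the client is an instance variable: with periodic discovery, `close()` needs access to the still-open client, while the one-shot path releases it as soon as the single discovery completes.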