becketqin commented on a change in pull request #14239:
URL: https://github.com/apache/flink/pull/14239#discussion_r531331794



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
##########
@@ -170,6 +178,70 @@ public void testSnapshotAndRestore() throws Exception {
 
        }
 
+       @Test
+       public void testCallableInterruptedDuringShutdownDoNotFailJob() throws InterruptedException {

Review comment:
       Very good suggestion! This should work quite well.
   
    I was actually struggling with this test a little bit. The tricky part is that `testingContext.close()` also closes the two executors, so it is not guaranteed that the error-handling logic is actually exercised every time the test runs. The `ManuallyTriggeredScheduledExecutorService` works well in this case because `triggerAll()` will still fire the queued tasks even after the executor has shut down. That is probably not the generally correct behavior, but it really helps in this particular test :)
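    To make the point concrete, here is a minimal, hypothetical sketch of the idea behind `ManuallyTriggeredScheduledExecutorService` (not the actual Flink test utility): submitted tasks are queued instead of run, and `triggerAll()` drains the queue even after `shutdown()`, so code paths that race with shutdown can be exercised deterministically.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch: a manually triggered executor whose triggerAll()
// still runs queued tasks after shutdown() has been called.
class ManuallyTriggeredExecutor {
    private final Queue<Runnable> tasks = new ArrayDeque<>();
    private boolean shutdown;

    void execute(Runnable task) {
        // Queue the task instead of running it; the test decides when it fires.
        tasks.add(task);
    }

    void shutdown() {
        shutdown = true;
    }

    boolean isShutdown() {
        return shutdown;
    }

    // Runs every queued task, regardless of shutdown state. This is the
    // "probably not generally correct" behavior that makes the test reliable.
    void triggerAll() {
        Runnable r;
        while ((r = tasks.poll()) != null) {
            r.run();
        }
    }
}
```

    In the test, one would queue the callable, call `shutdown()`, and then still be able to `triggerAll()` to hit the error-handling branch every time.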

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
##########
@@ -321,4 +328,37 @@ private void registerReader() {
        ListState<SplitT> getReaderState() {
                return readerState;
        }
+
+       // ----------- private class --------------
+
+       private static class CountingDataOutput<T> implements DataOutput<T> {

Review comment:
       Sure, I updated the patch to move this to the `SourceOperatorStreamTask`. It was actually my first choice as well. The reason I eventually put it in the `SourceOperator` is that the `CountingOutput` instances are created in `AbstractStreamOperator` and `AbstractStreamOperatorV2`, which are at the operator level.
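    For reference, the wrapper in question is essentially a counting decorator. This is a hypothetical sketch of that pattern (the `DataOutput` interface here is simplified, not Flink's actual `PushingAsyncDataInput.DataOutput`): it delegates each record to an inner output and bumps a counter, mirroring what `CountingOutput` does for regular operators.

```java
// Simplified stand-in for Flink's DataOutput; illustration only.
interface DataOutput<T> {
    void emitRecord(T record);
}

// Hypothetical sketch of a counting decorator in the spirit of CountingDataOutput.
class CountingDataOutput<T> implements DataOutput<T> {
    private final DataOutput<T> inner;
    private long numRecordsOut;

    CountingDataOutput(DataOutput<T> inner) {
        this.inner = inner;
    }

    @Override
    public void emitRecord(T record) {
        numRecordsOut++;          // update the records-out count per record
        inner.emitRecord(record); // then delegate to the wrapped output
    }

    long getNumRecordsOut() {
        return numRecordsOut;
    }
}
```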

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
##########
@@ -131,7 +131,14 @@ public void start() {
                        LOG.info("Starting the KafkaSourceEnumerator for consumer group {} " +
                                "without periodic partition discovery.", consumerGroupId);
                        context.callAsync(
-                                       this::discoverAndInitializePartitionSplit,
+                                       () -> {
+                                               try {
+                                                       return discoverAndInitializePartitionSplit();
+                                               } finally {
+                                                       // Close the admin client early because we won't use it anymore.
+                                                       adminClient.close();

Review comment:
       The admin client is created in the `start()` method and closed in one of two ways:
    1. If partition discovery is performed periodically, the admin client is closed when the enumerator is closed, i.e. the `KafkaSourceEnumerator.close()` method closes it. That is also why it is an instance variable.
    2. If partition discovery is performed only once at startup, the admin client is closed on the `SourceCoordinatorContext` worker thread right after the discovery completes, so we do not keep an idle admin client around.
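    The two lifecycles can be sketched as follows. This is a hypothetical, self-contained illustration, not the actual `KafkaSourceEnumerator` code; `callAsync`, `discover`, and the boolean flag are stand-ins for the real async call, the discovery logic, and the real `AdminClient`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the two admin-client lifecycles described above.
class EnumeratorSketch implements AutoCloseable {
    private final boolean periodicDiscovery;
    // Stand-in for the real AdminClient; tracks whether it has been closed.
    final AtomicBoolean adminClientClosed = new AtomicBoolean(false);

    EnumeratorSketch(boolean periodicDiscovery) {
        this.periodicDiscovery = periodicDiscovery;
    }

    void start() {
        if (periodicDiscovery) {
            // Case 1: the client stays open for repeated discovery calls;
            // it is released only in close().
        } else {
            // Case 2: one-shot discovery; close the client as soon as the
            // discovery call finishes, so it does not idle until close().
            callAsync(() -> {
                try {
                    return discover();
                } finally {
                    adminClientClosed.set(true);
                }
            });
        }
    }

    @Override
    public void close() {
        if (periodicDiscovery) {
            adminClientClosed.set(true);
        }
    }

    // Stand-ins for the real worker-thread call and discovery logic.
    private <T> void callAsync(Callable<T> c) {
        try { c.call(); } catch (Exception e) { throw new RuntimeException(e); }
    }

    private Object discover() {
        return new Object();
    }
}
```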
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

