StephanEwen commented on a change in pull request #15161:
URL: https://github.com/apache/flink/pull/15161#discussion_r599670918



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
##########
@@ -267,6 +270,12 @@ void failJob(Throwable cause) {
     }
 
     void handleUncaughtExceptionFromAsyncCall(Throwable t) {
+        if (closed) {
+            // We do not trigger a job failure again if the source coordinator is already closed.
+            LOG.debug("Caught exception when shutting down the SourceCoordinatorContext.", t);

Review comment:
       Users may still be confused by this message, since it shows an exception 
and a stack trace.
   I would suggest not logging here at all, because the contract is that 
exceptions after closing are suppressed. We do a similar thing during task 
cancellation: as soon as the status is CANCELLED, all exceptions are 
suppressed. That gives the cleanest logs and user experience.
   
   Alternatively, if you think logging here is crucial, I would change the log 
message to something like "Ignoring async exception after closing the 
coordinator." so that users know they do not need to worry about that exception.
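
A minimal sketch of the "suppress after close" contract, assuming a simplified stand-in class. `ClosableContextSketch` and `jobFailureCount()` are hypothetical names for illustration, not Flink's actual code:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the coordinator context; not Flink's actual class.
class ClosableContextSketch {
    private volatile boolean closed;
    private final AtomicInteger jobFailures = new AtomicInteger();

    void close() {
        closed = true;
    }

    // Once the context is closed, async exceptions are dropped silently.
    void handleUncaughtExceptionFromAsyncCall(Throwable t) {
        if (closed) {
            // Intentionally no log line and no stack trace here.
            return;
        }
        failJob(t);
    }

    // Assumption: failing the job is modeled as a simple counter.
    private void failJob(Throwable cause) {
        jobFailures.incrementAndGet();
    }

    int jobFailureCount() {
        return jobFailures.get();
    }
}
```

With this shape, an exception arriving before `close()` fails the job once, and any exception arriving afterwards leaves no trace in the logs.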

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
##########
@@ -103,18 +106,22 @@ public SourceCoordinatorContext(
             SimpleVersionedSerializer<SplitT> splitSerializer) {
         this(
                 coordinatorExecutor,
+                Executors.newScheduledThreadPool(
+                        numWorkerThreads,
+                        new ExecutorThreadFactory(
+                                coordinatorThreadFactory.getCoordinatorThreadName() + "-worker")),
                 coordinatorThreadFactory,
-                numWorkerThreads,
                 operatorCoordinatorContext,
                 splitSerializer,
                 new SplitAssignmentTracker<>());
     }
 
     // Package private method for unit test.
+    @VisibleForTesting

Review comment:
       Side note: I think this is the right pattern to use here.
   
   In the majority of cases, classes should have one core constructor that 
accepts all the internal components (dependencies) as arguments (except maybe 
simple utility types, like lists and maps that are initially empty). That 
constructor is needed to inject special dependencies for testing (like mocks).
   
    Then there are often utility constructors or factory methods so that not 
everyone who instantiates the class needs to instantiate all dependencies as 
well.
   
   I would encourage applying that pattern as much as possible; it is quite 
useful in the long run.
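
A small sketch of that pattern, under the assumption of a hypothetical class `WorkerPoolOwner` (the name and fields are illustrative only): one core constructor takes every dependency, and a convenience constructor builds the defaults.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Hypothetical class illustrating the pattern; names are not from Flink.
class WorkerPoolOwner {
    private final String name;
    private final ScheduledExecutorService workerExecutor;

    // Convenience constructor: creates the default dependency for regular callers.
    WorkerPoolOwner(String name, int numWorkerThreads) {
        this(name, Executors.newScheduledThreadPool(numWorkerThreads));
    }

    // Core constructor: accepts every dependency, so tests can inject their own executor.
    WorkerPoolOwner(String name, ScheduledExecutorService workerExecutor) {
        this.name = name;
        this.workerExecutor = workerExecutor;
    }

    String name() {
        return name;
    }

    ScheduledExecutorService workerExecutor() {
        return workerExecutor;
    }

    void close() {
        workerExecutor.shutdownNow();
    }
}
```

A test can then pass a manually triggered or mock executor to the core constructor, while production code keeps using the shorter one.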

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
##########
@@ -182,6 +191,44 @@ public void testSnapshotAndRestore() throws Exception {
                 restoredTracker.assignmentsByCheckpointId());
     }
 
+    @Test
+    public void testCallableInterruptedDuringShutdownDoNotFailJob() throws InterruptedException {
+        AtomicReference<Throwable> expectedError = new AtomicReference<>(null);
+
+        ManuallyTriggeredScheduledExecutorService manualWorkerExecutor =
+                new ManuallyTriggeredScheduledExecutorService();
+        ManuallyTriggeredScheduledExecutorService manualCoordinatorExecutor =
+                new ManuallyTriggeredScheduledExecutorService();
+
+        SourceCoordinatorContext<MockSourceSplit> testingContext =
+                new SourceCoordinatorContext<>(
+                        manualCoordinatorExecutor,
+                        manualWorkerExecutor,
+                        new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(
+                                TEST_OPERATOR_ID.toHexString(), getClass().getClassLoader()),
+                        operatorCoordinatorContext,
+                        new MockSourceSplitSerializer(),
+                        splitSplitAssignmentTracker);
+
+        testingContext.callAsync(
+                () -> {
+                    throw new InterruptedException();
+                },
+                (ignored, e) -> {
+                    if (e != null) {
+                        expectedError.set(e);
+                        throw new RuntimeException(e);
+                    }
+                });
+
+        manualWorkerExecutor.triggerAll();

Review comment:
       Is this needed, or "just in case"?
   
   (It looks like the `callAsync(...)` calls go directly to the worker executor.)

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/SpecifiedOffsetsInitializer.java
##########
@@ -60,6 +60,12 @@
             }
         }
         if (!toLookup.isEmpty()) {
+            // First check the committed offsets.

Review comment:
       What actually happens here when committed offsets do not exist, because 
the user didn't specify a group ID?
   I assume the group ID is then null. Will the PartitionOffsetsRetriever throw 
an exception in that case?
   
   Is it desirable that we always go through the steps [(1) specified (2) 
committed (3) fallback], or is it a separate case and the previous case [(1) 
specified (2) fallback] is equally valid?
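
To make the two lookup orders concrete, here is a hedged sketch using plain maps. `OffsetResolutionSketch` and `resolve` are hypothetical and do not reflect the actual `SpecifiedOffsetsInitializer` code. Passing an empty committed map reduces the three-step order to [(1) specified (2) fallback]:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper; partitions are modeled as plain strings for brevity.
class OffsetResolutionSketch {
    static Map<String, Long> resolve(
            Set<String> partitions,
            Map<String, Long> specified,
            Map<String, Long> committed,
            long fallback) {
        Map<String, Long> result = new HashMap<>();
        for (String partition : partitions) {
            // (1) An explicitly specified offset wins.
            Long offset = specified.get(partition);
            if (offset == null) {
                // (2) Otherwise use the committed offset, if one exists.
                offset = committed.get(partition);
            }
            if (offset == null) {
                // (3) Otherwise use the fallback offset.
                offset = fallback;
            }
            result.put(partition, offset);
        }
        return result;
    }
}
```

The sketch sidesteps the open question above: it assumes a missing group ID shows up as an empty committed map rather than as an exception.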

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -88,11 +90,15 @@ public void init() throws Exception {
             input = new StreamTaskSourceInput<>(sourceOperator, 0, 0);
         }
 
+        CountingOutput<T> countingOutput =

Review comment:
       How hard is it to add the counting to the `AsyncDataOutputToOutput` 
class itself, instead of adding the `CountingOutput` as a wrapper?
   
   This path, the pushing of events through the chain, is the most 
performance-critical path in the system. Any wrapping here has the potential to 
add performance overhead through virtual method calls. The exact effect is hard 
to estimate, because in the end it depends on how well the JIT can profile and 
optimize the whole thing.
   
   From everything I understand about the commonly used JIT, the `Output` 
interface and its subclasses are already a tricky case, because it has so many 
implementations that are used and mixed at the same time. We are more JIT 
friendly when we don't add more subclasses (and I think we should in fact try 
to reduce the subclasses that exist already).
   
   This may be a very small bit individually, but these small bits tend to add 
up in the long run, so it is always a good idea to keep them in mind.
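
For illustration, a simplified sketch of the two options. `SimpleOutput` and the three classes below are hypothetical stand-ins, not Flink's `Output`, `CountingOutput`, or `AsyncDataOutputToOutput`:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in; Flink's real Output interface has more methods.
interface SimpleOutput<T> {
    void collect(T record);
}

// Terminal output that only stores records.
final class ListOutput<T> implements SimpleOutput<T> {
    final List<T> records = new ArrayList<>();

    @Override
    public void collect(T record) {
        records.add(record);
    }
}

// Wrapper approach: every record crosses two SimpleOutput.collect calls.
final class CountingWrapperOutput<T> implements SimpleOutput<T> {
    private final SimpleOutput<T> delegate;
    long count;

    CountingWrapperOutput(SimpleOutput<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void collect(T record) {
        count++;
        delegate.collect(record);
    }
}

// Inlined approach: the counter lives in the output that is already on the path.
final class CountingListOutput<T> implements SimpleOutput<T> {
    final List<T> records = new ArrayList<>();
    long count;

    @Override
    public void collect(T record) {
        count++;
        records.add(record);
    }
}
```

Both variants produce the same counts; the inlined one avoids an extra interface dispatch per record and adds no further implementation for call sites that currently see only one. Whether that difference is measurable here would need a benchmark.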

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
##########
@@ -435,6 +435,7 @@ private void parseAndSetRequiredProperties() {
                 true);
         maybeOverride(
                 ConsumerConfig.GROUP_ID_CONFIG, "KafkaSource-" + new Random().nextLong(), false);
+        maybeOverride(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", false);

Review comment:
       I think the old Kafka Source always set this to `false` and did not 
allow an override.
   Is there ever a case when we want "auto offset commit" to be used? 
Committing on checkpoint completion is more meaningful, I think, when users 
want offsets in Kafka.
   
   I am not opposed to allowing this; I am just curious whether it can lead to 
surprises for users who do not understand well that Flink manages the offsets 
directly.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
##########
@@ -74,7 +74,8 @@ public void start() throws Exception {
     @Override
     public void close() throws Exception {
         closed = true;
-        coordinator.closeAsync(closingTimeoutMs);
+        // Wait for coordinator close before destructing any user class loader.
+        coordinator.closeAsync(closingTimeoutMs).get();

Review comment:
       I don't think we can do this here. All methods from the coordinator must 
be non-blocking, because they are called by the JobManager's scheduler thread. 
That thread must not block; otherwise the whole job may become unresponsive.
   
   I'd like to understand a bit better why this is necessary to avoid 
`ClassNotFoundException`. Where is that exception thrown, and can we prevent 
it differently, similar to the case where exceptions from callbacks are 
ignored once the SourceEnumeratorContext is closed?
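
As a general illustration of a non-blocking alternative to calling `.get()`, follow-up work can be chained onto the close future. This is only a sketch with hypothetical names (`NonBlockingCloseSketch`, `classLoaderReleased`); whether it fits the class loader lifecycle in this PR is exactly the open question:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch; the cleanup step is modeled as flipping a flag.
class NonBlockingCloseSketch {
    private final AtomicBoolean classLoaderReleased = new AtomicBoolean(false);

    // Returns immediately; the cleanup runs only once the close future completes,
    // whether it completes normally or exceptionally.
    CompletableFuture<Void> close(CompletableFuture<Void> coordinatorCloseFuture) {
        return coordinatorCloseFuture.whenComplete(
                (ignored, error) -> classLoaderReleased.set(true));
    }

    boolean isClassLoaderReleased() {
        return classLoaderReleased.get();
    }
}
```

The calling thread never waits: `close(...)` hands back a future, and the dependent action fires on whichever thread completes the coordinator's close.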
   

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
##########
@@ -442,10 +442,8 @@ private void parseAndSetRequiredProperties() {
                 true);
 
         // If the source is bounded, do not run periodic partition discovery.
-        if (maybeOverride(
-                KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(),
-                "-1",
-                boundedness == Boundedness.BOUNDED)) {
+        if (boundedness == Boundedness.BOUNDED &&

Review comment:
       I don't quite agree with this. Is there a ticket that describes the 
reasoning behind this setting?
   
   Partition discovery is good to support, but I wouldn't activate it by 
default. The reasons are:
     1. This is a surprising change from the behavior of the previous Kafka 
Source.
     2. I have met quite a few users who appreciate the "stable" subscription 
as a default model. Additional partitions suddenly appearing is a challenge in 
many setups anyway, and also frequently messes with ordering guarantees.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
##########
@@ -407,7 +407,12 @@ public PartitionOffsetsRetrieverImpl(
                         .thenApply(
                                 result -> {
                                     Map<TopicPartition, Long> offsets = new HashMap<>();
-                                    result.forEach((tp, oam) -> offsets.put(tp, oam.offset()));
+                                    result.forEach(

Review comment:
       Is it reasonable to guard this with a unit test, or is it already 
covered by other, broader tests?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

