StephanEwen commented on a change in pull request #15161:
URL: https://github.com/apache/flink/pull/15161#discussion_r599670918
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
##########
@@ -267,6 +270,12 @@ void failJob(Throwable cause) {
     }
     void handleUncaughtExceptionFromAsyncCall(Throwable t) {
+        if (closed) {
+            // We do not trigger a job failure again if the source coordinator is already closed.
+            LOG.debug("Caught exception when shutting down the SourceCoordinatorContext.", t);
Review comment:
Users may still be confused by this message, which shows an exception and a stack trace.

I would suggest not logging here at all, because the contract is that exceptions after closing are suppressed. We do a similar thing during task cancellation: as soon as the status is CANCELLED, all exceptions are suppressed. That gives the cleanest logs and user experience.

Alternatively, if you think logging here is crucial, I would change the log message to something like "Ignoring async exception after closing the coordinator." so that users know they do not need to worry about that exception.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
##########
@@ -103,18 +106,22 @@ public SourceCoordinatorContext(
             SimpleVersionedSerializer<SplitT> splitSerializer) {
         this(
                 coordinatorExecutor,
+                Executors.newScheduledThreadPool(
+                        numWorkerThreads,
+                        new ExecutorThreadFactory(
+                                coordinatorThreadFactory.getCoordinatorThreadName() + "-worker")),
                 coordinatorThreadFactory,
-                numWorkerThreads,
                 operatorCoordinatorContext,
                 splitSerializer,
                 new SplitAssignmentTracker<>());
     }
     // Package private method for unit test.
+    @VisibleForTesting
Review comment:
Side note: I think this is the right pattern to use here.

In the majority of cases, classes should have one core constructor that accepts all the internal components (dependencies) as arguments, except maybe simple utility types like lists and maps that are initially empty. That constructor is needed to inject special dependencies for testing (like mocks). Then there are often utility constructors or factory methods so that not everyone who instantiates the class needs to instantiate all dependencies as well.

I would encourage trying to apply that pattern as much as possible; it is quite useful in the long run. A rough illustration is below.
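Purely hypothetical illustration of the pattern (the class and its dependencies are made up, not actual Flink code):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

class ExampleCoordinator {

    private final ScheduledExecutorService workerExecutor;
    private final long timeoutMs;

    // Core constructor: accepts every internal dependency, so tests can
    // inject mocks or manually triggered executors.
    ExampleCoordinator(ScheduledExecutorService workerExecutor, long timeoutMs) {
        this.workerExecutor = workerExecutor;
        this.timeoutMs = timeoutMs;
    }

    // Convenience constructor: builds the default dependencies, so regular
    // callers do not have to assemble them.
    ExampleCoordinator(long timeoutMs) {
        this(Executors.newSingleThreadScheduledExecutor(), timeoutMs);
    }
}
```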
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
##########
@@ -182,6 +191,44 @@ public void testSnapshotAndRestore() throws Exception {
                 restoredTracker.assignmentsByCheckpointId());
     }
+    @Test
+    public void testCallableInterruptedDuringShutdownDoNotFailJob() throws InterruptedException {
+        AtomicReference<Throwable> expectedError = new AtomicReference<>(null);
+
+        ManuallyTriggeredScheduledExecutorService manualWorkerExecutor =
+                new ManuallyTriggeredScheduledExecutorService();
+        ManuallyTriggeredScheduledExecutorService manualCoordinatorExecutor =
+                new ManuallyTriggeredScheduledExecutorService();
+
+        SourceCoordinatorContext<MockSourceSplit> testingContext =
+                new SourceCoordinatorContext<>(
+                        manualCoordinatorExecutor,
+                        manualWorkerExecutor,
+                        new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(
+                                TEST_OPERATOR_ID.toHexString(), getClass().getClassLoader()),
+                        operatorCoordinatorContext,
+                        new MockSourceSplitSerializer(),
+                        splitSplitAssignmentTracker);
+
+        testingContext.callAsync(
+                () -> {
+                    throw new InterruptedException();
+                },
+                (ignored, e) -> {
+                    if (e != null) {
+                        expectedError.set(e);
+                        throw new RuntimeException(e);
+                    }
+                });
+
+        manualWorkerExecutor.triggerAll();
Review comment:
Is this needed, or is it "just in case"?
(It looks like the `callAsync(...)` calls go directly to the worker executor.)
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/SpecifiedOffsetsInitializer.java
##########
@@ -60,6 +60,12 @@
             }
         }
         if (!toLookup.isEmpty()) {
+            // First check the committed offsets.
Review comment:
What actually happens here when committed offsets do not exist because the user didn't specify a group ID?

I assume the group ID is then null; will the `PartitionOffsetsRetriever` throw an exception in that case?

Is it desirable that we always go through the steps [(1) specified, (2) committed, (3) fallback], or is that a separate case, with the previous chain [(1) specified, (2) fallback] being equally valid?
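To make the two variants concrete, here is a hypothetical sketch of the lookup chain being discussed (this is not the actual `SpecifiedOffsetsInitializer` code, just an illustration):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper, only to illustrate the two lookup orders in question.
// Chain in this PR:     (1) specified -> (2) committed -> (3) fallback.
// Previous behavior:    (1) specified -> (3) fallback (step 2 skipped).
final class OffsetResolutionSketch {

    static Map<TopicPartition, Long> resolveStartingOffsets(
            Set<TopicPartition> partitions,
            Map<TopicPartition, Long> specified,
            Map<TopicPartition, Long> committed, // may be empty if no group ID is set
            long fallbackOffset) {
        Map<TopicPartition, Long> result = new HashMap<>();
        for (TopicPartition tp : partitions) {
            if (specified.containsKey(tp)) {
                result.put(tp, specified.get(tp)); // (1) specified
            } else if (committed.containsKey(tp)) {
                result.put(tp, committed.get(tp)); // (2) committed
            } else {
                result.put(tp, fallbackOffset); // (3) fallback
            }
        }
        return result;
    }
}
```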
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -88,11 +90,15 @@ public void init() throws Exception {
             input = new StreamTaskSourceInput<>(sourceOperator, 0, 0);
         }
+        CountingOutput<T> countingOutput =
Review comment:
How hard would it be to add the counting to the `AsyncDataOutputToOutput` class itself, instead of adding `CountingOutput` as a wrapper?

This is the most performance-critical path in the system: the pushing of events through the chain. Any wrapping here can add overhead through virtual method calls. The exact effect is hard to estimate, because in the end it depends on how well the JIT can profile and optimize the whole thing.

From everything I understand about the commonly used JITs, the `Output` interface and its subclasses are already a tricky case, because so many implementations are used and mixed at the same time. We are more JIT-friendly when we don't add more subclasses (and I think we should in fact try to reduce the subclasses that already exist).

Each of these may be a very small bit individually, but the small bits tend to add up in the long run, so it is always a good idea to keep them in mind.
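A rough sketch of folding the counting into the emitting class itself (the class shape and metric wiring here are assumptions, not the real code):

```java
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Sketch only: count records where they are emitted, instead of wrapping the
// Output in a separate CountingOutput layer.
final class CountingDataOutputSketch<T> {

    private final Output<StreamRecord<T>> output;
    private final Counter numRecordsOut;

    CountingDataOutputSketch(Output<StreamRecord<T>> output, Counter numRecordsOut) {
        this.output = output;
        this.numRecordsOut = numRecordsOut;
    }

    void emitRecord(StreamRecord<T> streamRecord) {
        numRecordsOut.inc(); // inline counting, no extra virtual-call layer
        output.collect(streamRecord);
    }
}
```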
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
##########
@@ -435,6 +435,7 @@ private void parseAndSetRequiredProperties() {
                 true);
         maybeOverride(
                 ConsumerConfig.GROUP_ID_CONFIG, "KafkaSource-" + new Random().nextLong(), false);
+        maybeOverride(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", false);
Review comment:
I think the old Kafka source always set this to `false` and did not allow an override.

Is there ever a case where we want "auto offset commit" to be used? Committing on checkpoint completion is more meaningful, I think, when users want offsets in Kafka.

I am not opposed to allowing this; I am just curious whether it can lead to surprises for users who do not understand well that Flink manages the offsets directly.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
##########
@@ -74,7 +74,8 @@ public void start() throws Exception {
     @Override
     public void close() throws Exception {
         closed = true;
-        coordinator.closeAsync(closingTimeoutMs);
+        // Wait for coordinator close before destructing any user class loader.
+        coordinator.closeAsync(closingTimeoutMs).get();
Review comment:
I don't think we can do this here. All methods of the coordinator must be non-blocking, because they are called by the JobManager's scheduler thread, and that thread must not block, otherwise the whole job may become unresponsive.

I'd like to understand a bit better why this is necessary to avoid a `ClassNotFoundException`. Where is that exception thrown, and can we prevent it differently, similar to the case where exceptions from callbacks are ignored once the SourceEnumeratorContext is closed?
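As a rough sketch of a non-blocking alternative (assuming `closeAsync(...)` returns a `CompletableFuture`; `releaseUserClassLoader()` is a hypothetical hook standing in for whatever currently tears down the user class loader):

```java
@Override
public void close() throws Exception {
    closed = true;
    // Chain the cleanup onto the close future instead of blocking the
    // scheduler thread with get().
    coordinator
            .closeAsync(closingTimeoutMs)
            .whenComplete((success, failure) -> releaseUserClassLoader());
}
```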
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
##########
@@ -442,10 +442,8 @@ private void parseAndSetRequiredProperties() {
                 true);
         // If the source is bounded, do not run periodic partition discovery.
-        if (maybeOverride(
-                KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(),
-                "-1",
-                boundedness == Boundedness.BOUNDED)) {
+        if (boundedness == Boundedness.BOUNDED &&
Review comment:
I don't quite agree with this. Is there a ticket that describes the reasoning behind this setting?

Partition discovery is good to support, but I wouldn't activate it by default. The reasons are:
1. This is a surprising change from the behavior of the previous Kafka source.
2. I have met quite a few users who appreciate the "stable" subscription as the default model. Additional partitions suddenly appearing is a challenge in many setups anyway, and it also frequently messes with ordering guarantees.
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
##########
@@ -407,7 +407,12 @@ public PartitionOffsetsRetrieverImpl(
                     .thenApply(
                             result -> {
                                 Map<TopicPartition, Long> offsets = new HashMap<>();
-                                result.forEach((tp, oam) -> offsets.put(tp, oam.offset()));
+                                result.forEach(
Review comment:
Is it reasonable to guard this with a unit test, or is it already covered by some other (broader) tests?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]