bowenli86 commented on code in PR #274:
URL:
https://github.com/apache/flink-connector-kafka/pull/274#discussion_r3455868212
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java:
##########
@@ -754,6 +864,8 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
@Override
public void close() throws IOException {
try {
+ kafkaMetadataServiceDiscoveryContext.close();
Review Comment:
DynamicKafkaSourceEnumerator.close() waits for
kafkaMetadataServiceDiscoveryContext.close() before closing
kafkaMetadataService. Since the discovery context uses shutdownNow() and then
waits indefinitely, a custom KafkaMetadataService with an in-flight metadata
call that does not respond to interruption, or only unblocks when close() is
called, could hang source coordinator shutdown.
Could we either close/unblock the metadata service before waiting for the
discovery worker, avoid serializing close() behind active metadata calls, or
use a bounded wait with explicit failure/logging?
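A minimal sketch of the bounded-wait option, assuming the discovery worker runs on a plain `ExecutorService`. `BoundedShutdown`, `shutdownWithTimeout`, and the timeout value are hypothetical names for illustration, not code from this PR:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class BoundedShutdown {

    /**
     * Interrupts running tasks, then waits at most timeoutMs for termination.
     * Returns true if the executor terminated in time, false otherwise.
     */
    static boolean shutdownWithTimeout(ExecutorService executor, long timeoutMs) {
        executor.shutdownNow();
        try {
            boolean terminated = executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
            if (!terminated) {
                // Make the stuck worker visible instead of blocking close() forever.
                System.err.println(
                        "Worker did not stop within " + timeoutMs + " ms; continuing with close()");
            }
            return terminated;
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller and report the wait as unfinished.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

With something like this, close() could still go on to close the metadata service when the worker ignores interruption, which in turn may unblock the in-flight call.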
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java:
##########
@@ -587,17 +653,38 @@ private void startAllEnumerators() {
}
private void closeAllEnumeratorsAndContexts() {
- clusterEnumeratorMap.forEach(
+ Map<String, StoppableKafkaEnumContextProxy> closingClusterEnumContextMap =
+ new HashMap<>(clusterEnumContextMap);
+ Map<String, SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState>>
+ closingClusterEnumeratorMap = new HashMap<>(clusterEnumeratorMap);
+ closingClusterEnumContextMap
+ .values()
+ .forEach(StoppableKafkaEnumContextProxy::prepareForClose);
+ clusterEnumContextMap.clear();
+ clusterEnumeratorMap.clear();
+
+ enumeratorClosingExecutor.execute(
+ () ->
+ closeEnumeratorsAndContexts(
+ closingClusterEnumContextMap, closingClusterEnumeratorMap));
+ }
+
+ private void closeEnumeratorsAndContexts(
+ Map<String, StoppableKafkaEnumContextProxy> closingClusterEnumContextMap,
+ Map<String, SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState>>
+ closingClusterEnumeratorMap) {
+ closingClusterEnumeratorMap.forEach(
(cluster, subEnumerator) -> {
try {
- clusterEnumContextMap.get(cluster).close();
+ closingClusterEnumContextMap.get(cluster).close();
subEnumerator.close();
} catch (Exception e) {
- throw new RuntimeException(e);
+ enumContext.runInCoordinatorThread(
Review Comment:
The async stale-enumerator close path catches exceptions and posts a
throwing runnable back through enumContext.runInCoordinatorThread(). During
coordinator shutdown, that posted runnable can race with context shutdown and
potentially be dropped/rejected, while DynamicKafkaSourceEnumerator.close()
only waits for enumeratorClosingExecutor termination.
Could we retain the first async close failure, for example with a Future or
AtomicReference, and rethrow or at least log it from close() after awaiting the
close executor? That would make close failures observable even during shutdown.
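A rough sketch of the AtomicReference variant, assuming a single-threaded close executor. `AsyncCloseFailureTracker`, `closeAsync`, and the timeout parameter are hypothetical names for illustration, not part of the PR:

```java
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class AsyncCloseFailureTracker {
    private final ExecutorService closingExecutor = Executors.newSingleThreadExecutor();
    private final AtomicReference<Exception> firstFailure = new AtomicReference<>();

    /** Closes the resource asynchronously and records any failure instead of re-posting it. */
    void closeAsync(AutoCloseable resource) {
        closingExecutor.execute(() -> {
            try {
                resource.close();
            } catch (Exception e) {
                // Keep the first failure; attach later ones as suppressed exceptions.
                if (!firstFailure.compareAndSet(null, e)) {
                    Exception first = firstFailure.get();
                    if (first != e) {
                        first.addSuppressed(e);
                    }
                }
            }
        });
    }

    /** Waits for pending closes, then surfaces the first recorded failure, if any. */
    void close(long timeoutMs) throws IOException {
        closingExecutor.shutdown();
        try {
            if (!closingExecutor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
                throw new IOException("Pending closes did not finish within " + timeoutMs + " ms");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while awaiting pending closes", e);
        }
        Exception failure = firstFailure.get();
        if (failure != null) {
            throw new IOException("Failed to close stale enumerators", failure);
        }
    }
}
```

Because the failure is stored rather than posted back to the coordinator thread, it does not depend on the coordinator context still accepting runnables during shutdown.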
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java:
##########
@@ -725,6 +812,29 @@ private Set<SplitAndAssignmentStatus> filterStateByTopics(
.collect(Collectors.toSet());
}
+ private void tuneEnumeratorAdminClientTimeouts(Properties consumerProps) {
+ tuneEnumeratorAdminClientTimeout(consumerProps, ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+ tuneEnumeratorAdminClientTimeout(
+ consumerProps, CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG);
+ }
+
+ private void tuneEnumeratorAdminClientTimeout(Properties consumerProps, String propertyKey) {
+ String configuredTimeoutMs = consumerProps.getProperty(propertyKey);
+ if (configuredTimeoutMs == null) {
+ return;
+ }
+
+ long readerTimeoutMs = Long.parseLong(configuredTimeoutMs);
+ long enumeratorTimeoutMs =
+ Math.max(1L, readerTimeoutMs / ENUMERATOR_ADMIN_CLIENT_TIMEOUT_DIVISOR);
+ consumerProps.setProperty(propertyKey, Long.toString(enumeratorTimeoutMs));
Review Comment:
The sub-enumerator timeout tuning silently rewrites user/cluster-provided
`request.timeout.ms` and `default.api.timeout.ms` to half their configured
values before constructing the Kafka sub-enumerator. I understand the
mitigation goal, but this is a behavior change for jobs that intentionally
configured those Kafka client timeouts.
Could we make this opt-in / dynamic-source-specific, or document why it is
safe to override user-supplied Kafka timeout settings here?
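One possible shape for the opt-in variant, sketched with plain `java.util.Properties`. The flag key `TUNE_FLAG`, the class name, and the divisor of 2 are assumptions for illustration; the PR's actual constant is ENUMERATOR_ADMIN_CLIENT_TIMEOUT_DIVISOR, whose value is not shown in this hunk:

```java
import java.util.Properties;

final class OptInTimeoutTuning {
    // Hypothetical dynamic-source-specific flag; not an existing Flink or Kafka option.
    static final String TUNE_FLAG = "hypothetical.enumerator.tune-admin-client-timeouts";
    // Assumed divisor, matching the "half" behavior described in the comment above.
    static final long DIVISOR = 2L;

    /** Returns a copy of the user properties; timeouts are reduced only when the flag is set. */
    static Properties tunedCopy(Properties userProps) {
        Properties copy = new Properties();
        copy.putAll(userProps);
        if (!Boolean.parseBoolean(copy.getProperty(TUNE_FLAG, "false"))) {
            return copy; // Default: leave user-supplied Kafka timeouts untouched.
        }
        for (String key : new String[] {"request.timeout.ms", "default.api.timeout.ms"}) {
            String configured = copy.getProperty(key);
            if (configured == null) {
                continue; // Nothing configured, so Kafka's own default applies.
            }
            long tuned = Math.max(1L, Long.parseLong(configured.trim()) / DIVISOR);
            copy.setProperty(key, Long.toString(tuned));
        }
        return copy;
    }
}
```

Working on a copy also keeps the caller's Properties object unchanged, so the reader-side timeouts are not affected by the enumerator-side tuning.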