bowenli-oai commented on code in PR #277:
URL:
https://github.com/apache/flink-connector-kafka/pull/277#discussion_r3470452697
##########
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceITTest.java:
##########
@@ -1243,6 +1246,12 @@ private Set<String> findMetrics(InMemoryReporter
inMemoryReporter, String groupP
.collect(Collectors.toSet());
}
+ private Set<String> findKafkaClusterMetrics(InMemoryReporter
inMemoryReporter) {
+ return findMetrics(inMemoryReporter,
DYNAMIC_KAFKA_SOURCE_METRIC_GROUP).stream()
Review Comment:
Could we avoid selecting a single arbitrary `DynamicKafkaSource` group here?
This PR registers `activeSplitCount` directly on the parent group, so
`findGroup(DYNAMIC_KAFKA_SOURCE_METRIC_GROUP)` can return that parent;
`getMetricsByGroup()` then contains only the root gauge, and the
`.kafkaCluster.` filter yields an empty set. That is consistent with the
current 2.2.1/JDK 21 failure at line 1125. Please aggregate
`findGroups(...)`/all matching groups or explicitly select kafka-cluster child
groups. The same single-group pattern in `hasKafkaClusterMetrics()` can also
make disappearance checks pass prematurely.
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java:
##########
@@ -630,6 +643,20 @@ private void closeAllReadersAndClearState() {
}
clusterReaderMap.clear();
clustersProperties.clear();
+ activeSplitCount.set(0);
Review Comment:
Can we avoid publishing `0` from the intermediate clear step? A normal
metadata rebuild can preserve assigned splits, but metric reporters read this
`AtomicInteger` concurrently and can sample `0` after this line before
`refreshActiveSplitCount()` publishes the rebuilt count at line 357. For an
autoscaler-facing stable signal, that transient is indistinguishable from “all
local splits removed.” Since this method’s only caller refreshes after
rebuilding, it seems safer to publish only the final count and keep the
explicit zero in `close()`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]