This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 519d8ac5b9 KAFKA-14147: Prevent deferredTaskUpdates map from growing monotonically in KafkaConfigBackingStore (#12490)
519d8ac5b9 is described below
commit 519d8ac5b97bce1914eb302882fe6bdddaba4ea5
Author: Yash Mayya <[email protected]>
AuthorDate: Mon Aug 22 19:43:43 2022 +0530
KAFKA-14147: Prevent deferredTaskUpdates map from growing monotonically in KafkaConfigBackingStore (#12490)
Reviewers: Chris Egerton <[email protected]>
---
.../org/apache/kafka/connect/storage/KafkaConfigBackingStore.java | 4 +++-
.../org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java | 3 +++
2 files changed, 6 insertions(+), 1 deletion(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 76c626964e..6edd0e5a76 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -272,7 +272,8 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
private volatile SessionKey sessionKey;
// Connector -> Map[ConnectorTaskId -> Configs]
- private final Map<String, Map<ConnectorTaskId, Map<String, String>>> deferredTaskUpdates = new HashMap<>();
+ // visible for testing
+ final Map<String, Map<ConnectorTaskId, Map<String, String>>> deferredTaskUpdates = new HashMap<>();
final Map<String, TargetState> connectorTargetStates = new HashMap<>();
@@ -853,6 +854,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
connectorConfigs.remove(connectorName);
connectorTaskCounts.remove(connectorName);
taskConfigs.keySet().removeIf(taskId -> taskId.connector().equals(connectorName));
+ deferredTaskUpdates.remove(connectorName);
removed = true;
} else {
// Connector configs can be applied and callbacks invoked immediately
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index b374f8f5d2..f3b74f0666 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -842,6 +842,9 @@ public class KafkaConfigBackingStoreTest {
// Task configs for the deleted connector should also be removed from the snapshot
assertEquals(Collections.emptyList(), configState.allTaskConfigs(CONNECTOR_IDS.get(0)));
assertEquals(0, configState.taskCount(CONNECTOR_IDS.get(0)));
+ // Ensure that the deleted connector's deferred task updates have been cleaned up
+ // in order to prevent unbounded growth of the map
+ assertEquals(Collections.emptyMap(), configStorage.deferredTaskUpdates);
configStorage.stop();