This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 519d8ac5b9 KAFKA-14147: Prevent deferredTaskUpdates map from growing monotonically in KafkaConfigBackingStore (#12490)
519d8ac5b9 is described below

commit 519d8ac5b97bce1914eb302882fe6bdddaba4ea5
Author: Yash Mayya <[email protected]>
AuthorDate: Mon Aug 22 19:43:43 2022 +0530

    KAFKA-14147: Prevent deferredTaskUpdates map from growing monotonically in KafkaConfigBackingStore (#12490)
    
    Reviewers: Chris Egerton <[email protected]>
---
 .../org/apache/kafka/connect/storage/KafkaConfigBackingStore.java     | 4 +++-
 .../org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java | 3 +++
 2 files changed, 6 insertions(+), 1 deletion(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 76c626964e..6edd0e5a76 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -272,7 +272,8 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
     private volatile SessionKey sessionKey;
 
     // Connector -> Map[ConnectorTaskId -> Configs]
-    private final Map<String, Map<ConnectorTaskId, Map<String, String>>> 
deferredTaskUpdates = new HashMap<>();
+    // visible for testing
+    final Map<String, Map<ConnectorTaskId, Map<String, String>>> 
deferredTaskUpdates = new HashMap<>();
 
     final Map<String, TargetState> connectorTargetStates = new HashMap<>();
 
@@ -853,6 +854,7 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
                 connectorConfigs.remove(connectorName);
                 connectorTaskCounts.remove(connectorName);
                 taskConfigs.keySet().removeIf(taskId -> 
taskId.connector().equals(connectorName));
+                deferredTaskUpdates.remove(connectorName);
                 removed = true;
             } else {
                 // Connector configs can be applied and callbacks invoked 
immediately
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index b374f8f5d2..f3b74f0666 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -842,6 +842,9 @@ public class KafkaConfigBackingStoreTest {
         // Task configs for the deleted connector should also be removed from 
the snapshot
         assertEquals(Collections.emptyList(), 
configState.allTaskConfigs(CONNECTOR_IDS.get(0)));
         assertEquals(0, configState.taskCount(CONNECTOR_IDS.get(0)));
+        // Ensure that the deleted connector's deferred task updates have been 
cleaned up
+        // in order to prevent unbounded growth of the map
+        assertEquals(Collections.emptyMap(), 
configStorage.deferredTaskUpdates);
 
         configStorage.stop();
 

Reply via email to