This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d8e93c368d KAFKA-13971:Atomicity violations caused by improper usage of ConcurrentHashMap (#12277)
d8e93c368d is described below

commit d8e93c368db2862502c8180e4d536edb20ac4d72
Author: Kvicii <42023367+kvi...@users.noreply.github.com>
AuthorDate: Thu Aug 18 00:32:24 2022 +0800

    KAFKA-13971:Atomicity violations caused by improper usage of ConcurrentHashMap (#12277)
    
    Reviewers: Divij Vaidya <di...@amazon.com>, Chris Egerton <fearthecel...@gmail.com>
---
 .../connect/runtime/WorkerConfigTransformer.java   | 25 +++++++++-------------
 1 file changed, 10 insertions(+), 15 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
index 4d9c4c1f74..989de85e0c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
@@ -78,25 +78,20 @@ public class WorkerConfigTransformer implements AutoCloseable {
     }
 
     private void scheduleReload(String connectorName, String path, long ttl) {
-        Map<String, HerderRequest> connectorRequests = requests.get(connectorName);
-        if (connectorRequests == null) {
-            connectorRequests = new ConcurrentHashMap<>();
-            requests.put(connectorName, connectorRequests);
-        } else {
-            HerderRequest previousRequest = connectorRequests.get(path);
+        Map<String, HerderRequest> connectorRequests = requests.computeIfAbsent(connectorName, s -> new ConcurrentHashMap<>());
+        connectorRequests.compute(path, (s, previousRequest) -> {
             if (previousRequest != null) {
                 // Delete previous request for ttl which is now stale
                 previousRequest.cancel();
             }
-        }
-        log.info("Scheduling a restart of connector {} in {} ms", connectorName, ttl);
-        Callback<Void> cb = (error, result) -> {
-            if (error != null) {
-                log.error("Unexpected error during connector restart: ", error);
-            }
-        };
-        HerderRequest request = worker.herder().restartConnector(ttl, connectorName, cb);
-        connectorRequests.put(path, request);
+            log.info("Scheduling a restart of connector {} in {} ms", connectorName, ttl);
+            Callback<Void> cb = (error, result) -> {
+                if (error != null) {
+                    log.error("Unexpected error during connector restart: ", error);
+                }
+            };
+            return worker.herder().restartConnector(ttl, connectorName, cb);
+        });
     }
 
     @Override

Reply via email to