This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new d8e93c368d KAFKA-13971:Atomicity violations caused by improper usage of ConcurrentHashMap (#12277)
d8e93c368d is described below

commit d8e93c368db2862502c8180e4d536edb20ac4d72
Author: Kvicii <42023367+kvi...@users.noreply.github.com>
AuthorDate: Thu Aug 18 00:32:24 2022 +0800

    KAFKA-13971:Atomicity violations caused by improper usage of ConcurrentHashMap (#12277)

    Reviewers: Divij Vaidya <di...@amazon.com>, Chris Egerton <fearthecel...@gmail.com>
---
 .../connect/runtime/WorkerConfigTransformer.java | 25 +++++++++-------------
 1 file changed, 10 insertions(+), 15 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
index 4d9c4c1f74..989de85e0c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
@@ -78,25 +78,20 @@ public class WorkerConfigTransformer implements AutoCloseable {
     }
 
     private void scheduleReload(String connectorName, String path, long ttl) {
-        Map<String, HerderRequest> connectorRequests = requests.get(connectorName);
-        if (connectorRequests == null) {
-            connectorRequests = new ConcurrentHashMap<>();
-            requests.put(connectorName, connectorRequests);
-        } else {
-            HerderRequest previousRequest = connectorRequests.get(path);
+        Map<String, HerderRequest> connectorRequests = requests.computeIfAbsent(connectorName, s -> new ConcurrentHashMap<>());
+        connectorRequests.compute(path, (s, previousRequest) -> {
             if (previousRequest != null) {
                 // Delete previous request for ttl which is now stale
                 previousRequest.cancel();
             }
-        }
-        log.info("Scheduling a restart of connector {} in {} ms", connectorName, ttl);
-        Callback<Void> cb = (error, result) -> {
-            if (error != null) {
-                log.error("Unexpected error during connector restart: ", error);
-            }
-        };
-        HerderRequest request = worker.herder().restartConnector(ttl, connectorName, cb);
-        connectorRequests.put(path, request);
+            log.info("Scheduling a restart of connector {} in {} ms", connectorName, ttl);
+            Callback<Void> cb = (error, result) -> {
+                if (error != null) {
+                    log.error("Unexpected error during connector restart: ", error);
+                }
+            };
+            return worker.herder().restartConnector(ttl, connectorName, cb);
+        });
     }
 
     @Override