This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d8e93c368d KAFKA-13971: Atomicity violations caused by improper usage
of ConcurrentHashMap (#12277)
d8e93c368d is described below
commit d8e93c368db2862502c8180e4d536edb20ac4d72
Author: Kvicii <[email protected]>
AuthorDate: Thu Aug 18 00:32:24 2022 +0800
KAFKA-13971: Atomicity violations caused by improper usage of
ConcurrentHashMap (#12277)
Reviewers: Divij Vaidya <[email protected]>, Chris Egerton
<[email protected]>
---
.../connect/runtime/WorkerConfigTransformer.java | 25 +++++++++-------------
1 file changed, 10 insertions(+), 15 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
index 4d9c4c1f74..989de85e0c 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
@@ -78,25 +78,20 @@ public class WorkerConfigTransformer implements
AutoCloseable {
}
private void scheduleReload(String connectorName, String path, long ttl) {
- Map<String, HerderRequest> connectorRequests =
requests.get(connectorName);
- if (connectorRequests == null) {
- connectorRequests = new ConcurrentHashMap<>();
- requests.put(connectorName, connectorRequests);
- } else {
- HerderRequest previousRequest = connectorRequests.get(path);
+ Map<String, HerderRequest> connectorRequests =
requests.computeIfAbsent(connectorName, s -> new ConcurrentHashMap<>());
+ connectorRequests.compute(path, (s, previousRequest) -> {
if (previousRequest != null) {
// Delete previous request for ttl which is now stale
previousRequest.cancel();
}
- }
- log.info("Scheduling a restart of connector {} in {} ms",
connectorName, ttl);
- Callback<Void> cb = (error, result) -> {
- if (error != null) {
- log.error("Unexpected error during connector restart: ",
error);
- }
- };
- HerderRequest request = worker.herder().restartConnector(ttl,
connectorName, cb);
- connectorRequests.put(path, request);
+ log.info("Scheduling a restart of connector {} in {} ms",
connectorName, ttl);
+ Callback<Void> cb = (error, result) -> {
+ if (error != null) {
+ log.error("Unexpected error during connector restart: ",
error);
+ }
+ };
+ return worker.herder().restartConnector(ttl, connectorName, cb);
+ });
}
@Override