This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch jira_1447 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 12f7dd6eb7d962eb7ab4de7e5990d80cfb18db6f Author: jt2594838 <[email protected]> AuthorDate: Mon Jun 21 21:23:48 2021 +0800 fix synchronization key --- .../cluster/client/async/AsyncClientPool.java | 22 +++++++++++----------- .../iotdb/cluster/client/sync/SyncClientPool.java | 6 +++--- .../cluster/server/member/MetaGroupMember.java | 14 +++++++++++++- 3 files changed, 27 insertions(+), 15 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java index a7441e4..8fc46f5 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java @@ -85,7 +85,7 @@ public class AsyncClientPool { // As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety. Deque<AsyncClient> clientStack = clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>()); - synchronized (this) { + synchronized (clientStack) { if (clientStack.isEmpty()) { int nodeClientNum = nodeClientNumMap.getOrDefault(clusterNode, 0); if (nodeClientNum >= maxConnectionForEachNode) { @@ -126,9 +126,9 @@ public class AsyncClientPool { this.wait(waitClientTimeutMS); if (clientStack.isEmpty() && System.currentTimeMillis() - waitStart >= waitClientTimeutMS) { logger.warn( - "Cannot get an available client after {}ms, create a new one.", - waitClientTimeutMS, - asyncClientFactory); + "{} Cannot get an available client after {}ms, create a new one.", + asyncClientFactory, + waitClientTimeutMS); AsyncClient asyncClient = asyncClientFactory.getAsyncClient(clusterNode, this); nodeClientNumMap.computeIfPresent(clusterNode, (n, oldValue) -> oldValue + 1); return asyncClient; @@ -171,9 +171,9 @@ public class AsyncClientPool { void onError(Node node) { ClusterNode clusterNode = new ClusterNode(node); // clean all cached clients when network fails - synchronized (this) { - Deque<AsyncClient> clientStack = - clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>()); + Deque<AsyncClient> clientStack = + clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>()); + synchronized (clientStack) { while (!clientStack.isEmpty()) { AsyncClient client = clientStack.pop(); if (client instanceof AsyncDataClient) { @@ -195,9 +195,9 @@ public class AsyncClientPool { void recreateClient(Node node) { ClusterNode clusterNode = new ClusterNode(node); - synchronized (this) { - Deque<AsyncClient> clientStack = - clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>()); + Deque<AsyncClient> clientStack = + clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>()); + synchronized (clientStack) { try { AsyncClient asyncClient = asyncClientFactory.getAsyncClient(node, this); clientStack.push(asyncClient); @@ -205,7 +205,7 @@ public class AsyncClientPool { logger.error("Cannot create a new client for {}", node, e); nodeClientNumMap.computeIfPresent(clusterNode, (n, cnt) -> cnt - 1); } - this.notifyAll(); + clientStack.notifyAll(); } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java index 758296f..0c4e3f1 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java @@ -80,7 +80,7 @@ public class SyncClientPool { // As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety. Deque<Client> clientStack = clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>()); - synchronized (this) { + synchronized (clientStack) { if (clientStack.isEmpty()) { int nodeClientNum = nodeClientNumMap.getOrDefault(clusterNode, 0); if (nodeClientNum >= maxConnectionForEachNode) { @@ -144,7 +144,7 @@ public class SyncClientPool { ClusterNode clusterNode = new ClusterNode(node); // As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety. Deque<Client> clientStack = clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>()); - synchronized (this) { + synchronized (clientStack) { if (client.getInputProtocol() != null && client.getInputProtocol().getTransport().isOpen()) { clientStack.push(client); NodeStatusManager.getINSTANCE().activate(node); @@ -158,7 +158,7 @@ public class SyncClientPool { NodeStatusManager.getINSTANCE().deactivate(node); } } - this.notifyAll(); + clientStack.notifyAll(); } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java index 0224c29..21dfd68 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java @@ -1001,6 +1001,11 @@ public class MetaGroupMember extends RaftMember { consistentNum.set(1); inconsistentNum.set(0); checkSeedNodesStatusOnce(consistentNum, inconsistentNum); + logger.debug( + "Status check result: {}-{}/{}", + consistentNum.get(), + inconsistentNum.get(), + getAllNodes().size()); canEstablishCluster = analyseStartUpCheckResult( consistentNum.get(), inconsistentNum.get(), getAllNodes().size()); @@ -1030,7 +1035,14 @@ public class MetaGroupMember extends RaftMember { } pool.submit( () -> { - CheckStatusResponse response = checkStatus(seedNode); + logger.debug("Checking status with {}", seedNode); + CheckStatusResponse response = null; + try { + response = checkStatus(seedNode); + } catch (Exception e) { + logger.warn("Exception during status check", e); + } + logger.debug("CheckStatusResponse from {}: {}", seedNode, response); if (response != null) { // check the response ClusterUtils.examineCheckStatusResponse(
