This is an automated email from the ASF dual-hosted git repository.

chaow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6832eee  [ISSUE-2661] SyncClientPool And AsyncClientPool have concurrency problem (#2791)
6832eee is described below

commit 6832eee54dbc8c07e53263e36fe5f00f4ca5bf12
Author: wangchao316 <[email protected]>
AuthorDate: Wed Mar 10 15:37:46 2021 +0800

    [ISSUE-2661] SyncClientPool And AsyncClientPool have concurrency problem (#2791)
---
 .../iotdb/cluster/client/DataClientProvider.java   |   4 +-
 .../cluster/client/async/AsyncClientPool.java      |  34 ++++--
 .../iotdb/cluster/client/sync/SyncClientPool.java  |  59 +++++----
 .../cluster/client/DataClientProviderTest.java     | 136 +++++++++++++++++++++
 .../cluster/client/sync/SyncClientPoolTest.java    |  17 ++-
 5 files changed, 204 insertions(+), 46 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
index 5277466..8b954ec 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
@@ -51,11 +51,11 @@ public class DataClientProvider {
     }
   }
 
-  private AsyncClientPool getDataAsyncClientPool() {
+  AsyncClientPool getDataAsyncClientPool() {
     return dataAsyncClientPool;
   }
 
-  private SyncClientPool getDataSyncClientPool() {
+  SyncClientPool getDataSyncClientPool() {
     return dataSyncClientPool;
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
index e28a20e..1ed9d23 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
 import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.iotdb.cluster.utils.ClusterNode;
+import org.apache.iotdb.db.utils.TestOnly;
 
 import org.apache.thrift.async.TAsyncMethodCall;
 import org.slf4j.Logger;
@@ -87,10 +88,15 @@ public class AsyncClientPool {
       if (clientStack.isEmpty()) {
         int nodeClientNum = nodeClientNumMap.getOrDefault(clusterNode, 0);
         if (nodeClientNum >= maxConnectionForEachNode) {
-          client = waitForClient(clientStack, clusterNode, nodeClientNum);
+          client = waitForClient(clientStack, clusterNode);
         } else {
-          nodeClientNumMap.put(clusterNode, nodeClientNum + 1);
           client = asyncClientFactory.getAsyncClient(clusterNode, this);
+          nodeClientNumMap.compute(
+              clusterNode,
+              (n, oldValue) -> {
+                if (oldValue == null) return 1;
+                return oldValue + 1;
+              });
         }
       } else {
         client = clientStack.pop();
@@ -105,14 +111,13 @@ public class AsyncClientPool {
    * synchronize on the pool.
    *
    * @param clientStack
-   * @param node
-   * @param nodeClientNum
+   * @param clusterNode
    * @return
    * @throws IOException
    */
   @SuppressWarnings({"squid:S2273"}) // synchronized outside
-  private AsyncClient waitForClient(
-      Deque<AsyncClient> clientStack, ClusterNode node, int nodeClientNum) throws IOException {
+  private AsyncClient waitForClient(Deque<AsyncClient> clientStack, ClusterNode clusterNode)
+      throws IOException {
     // wait for an available client
     long waitStart = System.currentTimeMillis();
     while (clientStack.isEmpty()) {
@@ -121,16 +126,16 @@ public class AsyncClientPool {
         if (clientStack.isEmpty()
             && System.currentTimeMillis() - waitStart >= WAIT_CLIENT_TIMEOUT_MS) {
           logger.warn(
-              "Cannot get an available client after {}ms, create a new one, factory {} now is {}",
+              "Cannot get an available client after {}ms, create a new one.",
               WAIT_CLIENT_TIMEOUT_MS,
-              asyncClientFactory,
-              nodeClientNum);
-          nodeClientNumMap.put(node, nodeClientNum + 1);
-          return asyncClientFactory.getAsyncClient(node, this);
+              asyncClientFactory);
+          AsyncClient asyncClient = asyncClientFactory.getAsyncClient(clusterNode, this);
+          nodeClientNumMap.computeIfPresent(clusterNode, (n, oldValue) -> oldValue + 1);
+          return asyncClient;
         }
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
-        logger.warn("Interrupted when waiting for an available client of {}", node);
+        logger.warn("Interrupted when waiting for an available client of {}", clusterNode);
         return null;
       }
     }
@@ -203,4 +208,9 @@ public class AsyncClientPool {
       this.notifyAll();
     }
   }
+
+  @TestOnly
+  public Map<ClusterNode, Integer> getNodeClientNumMap() {
+    return nodeClientNumMap;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
index 0ea6c84..95aba99 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
@@ -24,13 +24,12 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
 import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.iotdb.cluster.utils.ClusterNode;
+import org.apache.iotdb.db.utils.TestOnly;
 
 import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.ConnectException;
-import java.net.SocketTimeoutException;
 import java.util.ArrayDeque;
 import java.util.Deque;
 import java.util.Map;
@@ -54,8 +53,8 @@ public class SyncClientPool {
   /**
    * See getClient(Node node, boolean activatedOnly)
    *
-   * @param node
-   * @return
+   * @param node the node want to connect
+   * @return if the node can connect, return the client, otherwise null
    */
   public Client getClient(Node node) {
     return getClient(node, true);
@@ -84,10 +83,22 @@ public class SyncClientPool {
       if (clientStack.isEmpty()) {
         int nodeClientNum = nodeClientNumMap.getOrDefault(clusterNode, 0);
         if (nodeClientNum >= maxConnectionForEachNode) {
-          return waitForClient(clientStack, clusterNode, nodeClientNum);
+          return waitForClient(clientStack, clusterNode);
         } else {
-          nodeClientNumMap.put(clusterNode, nodeClientNum + 1);
-          return createClient(clusterNode, nodeClientNum);
+          Client client = null;
+          try {
+            client = syncClientFactory.getSyncClient(clusterNode, this);
+          } catch (TTransportException e) {
+            logger.error("Cannot open transport for client {}", node, e);
+            return null;
+          }
+          nodeClientNumMap.compute(
+              clusterNode,
+              (n, oldValue) -> {
+                if (oldValue == null) return 1;
+                return oldValue + 1;
+              });
+          return client;
         }
       } else {
         return clientStack.pop();
@@ -96,7 +107,7 @@ public class SyncClientPool {
   }
 
   @SuppressWarnings("squid:S2273") // synchronized outside
-  private Client waitForClient(Deque<Client> clientStack, ClusterNode node, int nodeClientNum) {
+  private Client waitForClient(Deque<Client> clientStack, ClusterNode clusterNode) {
     // wait for an available client
     long waitStart = System.currentTimeMillis();
     while (clientStack.isEmpty()) {
@@ -107,12 +118,16 @@ public class SyncClientPool {
           logger.warn(
               "Cannot get an available client after {}ms, create a new one",
               WAIT_CLIENT_TIMEOUT_MS);
-          nodeClientNumMap.computeIfPresent(node, (n, oldValue) -> oldValue + 1);
-          return createClient(node, nodeClientNum);
+          Client client = syncClientFactory.getSyncClient(clusterNode, this);
+          nodeClientNumMap.computeIfPresent(clusterNode, (n, oldValue) -> oldValue + 1);
+          return client;
         }
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
-        logger.warn("Interrupted when waiting for an available client of {}", node);
+        logger.warn("Interrupted when waiting for an available client of {}", clusterNode);
+        return null;
+      } catch (TTransportException e) {
+        logger.error("Cannot open transport for client {}", clusterNode, e);
         return null;
       }
     }
@@ -122,10 +137,10 @@ public class SyncClientPool {
   /**
    * Return a client of a node to the pool. Closed client should not be returned.
    *
-   * @param node
-   * @param client
+   * @param node connection node
+   * @param client push client to pool
    */
-  void putClient(Node node, Client client) {
+  public void putClient(Node node, Client client) {
     ClusterNode clusterNode = new ClusterNode(node);
     // As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
     Deque<Client> clientStack = clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
@@ -147,18 +162,8 @@ public class SyncClientPool {
     }
   }
 
-  private Client createClient(ClusterNode node, int nodeClientNum) {
-    try {
-      return syncClientFactory.getSyncClient(node, this);
-    } catch (TTransportException e) {
-      if (e.getCause() instanceof ConnectException
-          || e.getCause() instanceof SocketTimeoutException) {
-        logger.debug("Cannot open transport for client {} : {}", node, e.getMessage());
-      } else {
-        logger.error("Cannot open transport for client {}", node, e);
-      }
-      nodeClientNumMap.put(node, nodeClientNum);
-      return null;
-    }
+  @TestOnly
+  public Map<ClusterNode, Integer> getNodeClientNumMap() {
+    return nodeClientNumMap;
   }
 }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/DataClientProviderTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/DataClientProviderTest.java
index 3df9b26..5115956 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/DataClientProviderTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/DataClientProviderTest.java
@@ -19,11 +19,13 @@
 
 package org.apache.iotdb.cluster.client;
 
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
 import org.apache.iotdb.cluster.client.sync.SyncDataClient;
 import org.apache.iotdb.cluster.common.TestUtils;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.utils.ClientUtils;
+import org.apache.iotdb.cluster.utils.ClusterNode;
 
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol.Factory;
@@ -32,6 +34,8 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.net.ServerSocket;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import static org.junit.Assert.assertNotNull;
 
@@ -85,4 +89,136 @@ public class DataClientProviderTest {
       listenThread.join();
     }
   }
+
+  @Test
+  public void testSyncConcurrency() throws IOException, InterruptedException {
+    Node node = new ClusterNode();
+    node.setDataPort(9003).setInternalIp("localhost").setClientIp("localhost");
+    ServerSocket serverSocket = new ServerSocket(node.getDataPort());
+    Thread listenThread =
+        new Thread(
+            () -> {
+              while (!Thread.interrupted()) {
+                try {
+                  serverSocket.accept();
+                } catch (IOException e) {
+                  return;
+                }
+              }
+            });
+    listenThread.start();
+
+    try {
+      boolean useAsyncServer = ClusterDescriptor.getInstance().getConfig().isUseAsyncServer();
+      ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(false);
+      ClusterDescriptor.getInstance().getConfig().setMaxClientPerNodePerMember(2);
+      DataClientProvider provider = new DataClientProvider(new Factory());
+      SyncDataClient client = null;
+      try {
+        client = provider.getSyncDataClient(node, 100);
+      } catch (TException e) {
+        Assert.fail(e.getMessage());
+      }
+      assertNotNull(client);
+
+      // now try to test multi thread
+      ExecutorService service = Executors.newFixedThreadPool(10);
+      for (int i = 0; i < 5; i++) {
+        service.submit(() -> provider.getSyncDataClient(node, 100));
+      }
+
+      // wait time should be great then 5000ms
+      Thread.currentThread().sleep(6000);
+      int totalNumber = provider.getDataSyncClientPool().getNodeClientNumMap().get(node);
+      Assert.assertEquals(6, totalNumber);
+
+      for (int i = 0; i < 4; i++) {
+        service.submit(() -> provider.getSyncDataClient(node, 100));
+      }
+
+      Thread.currentThread().sleep(1000);
+      // return one client to pool
+      provider.getDataSyncClientPool().putClient(node, client);
+      // wait all finish
+      Thread.currentThread().sleep(6000);
+      totalNumber = provider.getDataSyncClientPool().getNodeClientNumMap().get(node);
+
+      // 6 + 4 - 1
+      Assert.assertEquals(9, totalNumber);
+
+      ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(useAsyncServer);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    } finally {
+      serverSocket.close();
+      listenThread.interrupt();
+      listenThread.join();
+    }
+  }
+
+  @Test
+  public void testAsyncConcurrency() throws IOException, InterruptedException {
+    Node node = new ClusterNode();
+    node.setDataPort(9003).setInternalIp("localhost").setClientIp("localhost");
+    ServerSocket serverSocket = new ServerSocket(node.getDataPort());
+    Thread listenThread =
+        new Thread(
+            () -> {
+              while (!Thread.interrupted()) {
+                try {
+                  serverSocket.accept();
+                } catch (IOException e) {
+                  return;
+                }
+              }
+            });
+    listenThread.start();
+
+    try {
+      boolean useAsyncServer = ClusterDescriptor.getInstance().getConfig().isUseAsyncServer();
+      ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(true);
+      ClusterDescriptor.getInstance().getConfig().setMaxClientPerNodePerMember(2);
+      DataClientProvider provider = new DataClientProvider(new Factory());
+      AsyncDataClient client = null;
+      try {
+        client = provider.getAsyncDataClient(node, 100);
+      } catch (IOException e) {
+        Assert.fail(e.getMessage());
+      }
+      assertNotNull(client);
+
+      // now try to test multi thread
+      ExecutorService service = Executors.newFixedThreadPool(10);
+      for (int i = 0; i < 5; i++) {
+        service.submit(() -> provider.getAsyncDataClient(node, 100));
+      }
+
+      // wait time should be great then 5000ms
+      Thread.currentThread().sleep(6000);
+      int totalNumber = provider.getDataAsyncClientPool().getNodeClientNumMap().get(node);
+      Assert.assertEquals(6, totalNumber);
+
+      for (int i = 0; i < 4; i++) {
+        service.submit(() -> provider.getAsyncDataClient(node, 100));
+      }
+
+      Thread.currentThread().sleep(1000);
+      // return one client to pool
+      provider.getDataAsyncClientPool().putClient(node, client);
+      // wait all finish
+      Thread.currentThread().sleep(6000);
+      totalNumber = provider.getDataAsyncClientPool().getNodeClientNumMap().get(node);
+
+      // 6 + 4 - 1
+      Assert.assertEquals(9, totalNumber);
+
+      ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(useAsyncServer);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    } finally {
+      serverSocket.close();
+      listenThread.interrupt();
+      listenThread.join();
+    }
+  }
 }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientPoolTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientPoolTest.java
index 19407a2..d14a31b 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientPoolTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientPoolTest.java
@@ -11,6 +11,8 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
 
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
 
@@ -28,9 +30,18 @@ public class SyncClientPoolTest {
 
   @Mock private SyncClientFactory testSyncClientFactory;
 
+  @Before
+  public void setUp() {
+    testSyncClientFactory = new TestSyncClientFactory();
+  }
+
+  @After
+  public void tearDown() {
+    testSyncClientFactory = null;
+  }
+
   @Test
   public void testTestClient() {
-    testSyncClientFactory = new TestSyncClientFactory();
     getClient();
     putClient();
   }
@@ -65,7 +76,6 @@ public class SyncClientPoolTest {
 
   @Test
   public void testPutBadClient() {
-    testSyncClientFactory = new TestSyncClientFactory();
     SyncClientPool syncClientPool = new SyncClientPool(testSyncClientFactory);
     Client client = syncClientPool.getClient(TestUtils.getNode(0));
     client.getInputProtocol().getTransport().close();
@@ -78,7 +88,6 @@ public class SyncClientPoolTest {
   public void testMaxClient() {
     int maxClientNum = ClusterDescriptor.getInstance().getConfig().getMaxClientPerNodePerMember();
     ClusterDescriptor.getInstance().getConfig().setMaxClientPerNodePerMember(5);
-    testSyncClientFactory = new TestSyncClientFactory();
     SyncClientPool syncClientPool = new SyncClientPool(testSyncClientFactory);
 
     for (int i = 0; i < 5; i++) {
@@ -98,7 +107,6 @@ public class SyncClientPoolTest {
         ClusterDescriptor.getInstance().getConfig().getMaxClientPerNodePerMember();
     try {
       ClusterDescriptor.getInstance().getConfig().setMaxClientPerNodePerMember(10);
-      testSyncClientFactory = new TestSyncClientFactory();
       SyncClientPool syncClientPool = new SyncClientPool(testSyncClientFactory);
 
       Node node = TestUtils.getNode(0);
@@ -141,7 +149,6 @@ public class SyncClientPoolTest {
         ClusterDescriptor.getInstance().getConfig().getMaxClientPerNodePerMember();
     try {
       ClusterDescriptor.getInstance().getConfig().setMaxClientPerNodePerMember(1);
-      testSyncClientFactory = new TestSyncClientFactory();
       SyncClientPool syncClientPool = new SyncClientPool(testSyncClientFactory);
 
       Node node = TestUtils.getNode(0);

Reply via email to