This is an automated email from the ASF dual-hosted git repository.
chaow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6832eee [ISSUE-2661] SyncClientPool And AsyncClientPool have
concurrency problem (#2791)
6832eee is described below
commit 6832eee54dbc8c07e53263e36fe5f00f4ca5bf12
Author: wangchao316 <[email protected]>
AuthorDate: Wed Mar 10 15:37:46 2021 +0800
[ISSUE-2661] SyncClientPool And AsyncClientPool have concurrency problem
(#2791)
---
.../iotdb/cluster/client/DataClientProvider.java | 4 +-
.../cluster/client/async/AsyncClientPool.java | 34 ++++--
.../iotdb/cluster/client/sync/SyncClientPool.java | 59 +++++----
.../cluster/client/DataClientProviderTest.java | 136 +++++++++++++++++++++
.../cluster/client/sync/SyncClientPoolTest.java | 17 ++-
5 files changed, 204 insertions(+), 46 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
b/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
index 5277466..8b954ec 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
@@ -51,11 +51,11 @@ public class DataClientProvider {
}
}
- private AsyncClientPool getDataAsyncClientPool() {
+ AsyncClientPool getDataAsyncClientPool() {
return dataAsyncClientPool;
}
- private SyncClientPool getDataSyncClientPool() {
+ SyncClientPool getDataSyncClientPool() {
return dataSyncClientPool;
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
index e28a20e..1ed9d23 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
import org.apache.iotdb.cluster.utils.ClusterNode;
+import org.apache.iotdb.db.utils.TestOnly;
import org.apache.thrift.async.TAsyncMethodCall;
import org.slf4j.Logger;
@@ -87,10 +88,15 @@ public class AsyncClientPool {
if (clientStack.isEmpty()) {
int nodeClientNum = nodeClientNumMap.getOrDefault(clusterNode, 0);
if (nodeClientNum >= maxConnectionForEachNode) {
- client = waitForClient(clientStack, clusterNode, nodeClientNum);
+ client = waitForClient(clientStack, clusterNode);
} else {
- nodeClientNumMap.put(clusterNode, nodeClientNum + 1);
client = asyncClientFactory.getAsyncClient(clusterNode, this);
+ nodeClientNumMap.compute(
+ clusterNode,
+ (n, oldValue) -> {
+ if (oldValue == null) return 1;
+ return oldValue + 1;
+ });
}
} else {
client = clientStack.pop();
@@ -105,14 +111,13 @@ public class AsyncClientPool {
* synchronize on the pool.
*
* @param clientStack
- * @param node
- * @param nodeClientNum
+ * @param clusterNode
* @return
* @throws IOException
*/
@SuppressWarnings({"squid:S2273"}) // synchronized outside
- private AsyncClient waitForClient(
- Deque<AsyncClient> clientStack, ClusterNode node, int nodeClientNum)
throws IOException {
+ private AsyncClient waitForClient(Deque<AsyncClient> clientStack,
ClusterNode clusterNode)
+ throws IOException {
// wait for an available client
long waitStart = System.currentTimeMillis();
while (clientStack.isEmpty()) {
@@ -121,16 +126,16 @@ public class AsyncClientPool {
if (clientStack.isEmpty()
&& System.currentTimeMillis() - waitStart >=
WAIT_CLIENT_TIMEOUT_MS) {
logger.warn(
- "Cannot get an available client after {}ms, create a new one,
factory {} now is {}",
+ "Cannot get an available client after {}ms, create a new one.",
WAIT_CLIENT_TIMEOUT_MS,
- asyncClientFactory,
- nodeClientNum);
- nodeClientNumMap.put(node, nodeClientNum + 1);
- return asyncClientFactory.getAsyncClient(node, this);
+ asyncClientFactory);
+ AsyncClient asyncClient =
asyncClientFactory.getAsyncClient(clusterNode, this);
+ nodeClientNumMap.computeIfPresent(clusterNode, (n, oldValue) ->
oldValue + 1);
+ return asyncClient;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- logger.warn("Interrupted when waiting for an available client of {}",
node);
+ logger.warn("Interrupted when waiting for an available client of {}",
clusterNode);
return null;
}
}
@@ -203,4 +208,9 @@ public class AsyncClientPool {
this.notifyAll();
}
}
+
+ @TestOnly
+ public Map<ClusterNode, Integer> getNodeClientNumMap() {
+ return nodeClientNumMap;
+ }
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
index 0ea6c84..95aba99 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
@@ -24,13 +24,12 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
import org.apache.iotdb.cluster.utils.ClusterNode;
+import org.apache.iotdb.db.utils.TestOnly;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.ConnectException;
-import java.net.SocketTimeoutException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
@@ -54,8 +53,8 @@ public class SyncClientPool {
/**
* See getClient(Node node, boolean activatedOnly)
*
- * @param node
- * @return
+ * @param node the node want to connect
+ * @return if the node can connect, return the client, otherwise null
*/
public Client getClient(Node node) {
return getClient(node, true);
@@ -84,10 +83,22 @@ public class SyncClientPool {
if (clientStack.isEmpty()) {
int nodeClientNum = nodeClientNumMap.getOrDefault(clusterNode, 0);
if (nodeClientNum >= maxConnectionForEachNode) {
- return waitForClient(clientStack, clusterNode, nodeClientNum);
+ return waitForClient(clientStack, clusterNode);
} else {
- nodeClientNumMap.put(clusterNode, nodeClientNum + 1);
- return createClient(clusterNode, nodeClientNum);
+ Client client = null;
+ try {
+ client = syncClientFactory.getSyncClient(clusterNode, this);
+ } catch (TTransportException e) {
+ logger.error("Cannot open transport for client {}", node, e);
+ return null;
+ }
+ nodeClientNumMap.compute(
+ clusterNode,
+ (n, oldValue) -> {
+ if (oldValue == null) return 1;
+ return oldValue + 1;
+ });
+ return client;
}
} else {
return clientStack.pop();
@@ -96,7 +107,7 @@ public class SyncClientPool {
}
@SuppressWarnings("squid:S2273") // synchronized outside
- private Client waitForClient(Deque<Client> clientStack, ClusterNode node,
int nodeClientNum) {
+ private Client waitForClient(Deque<Client> clientStack, ClusterNode
clusterNode) {
// wait for an available client
long waitStart = System.currentTimeMillis();
while (clientStack.isEmpty()) {
@@ -107,12 +118,16 @@ public class SyncClientPool {
logger.warn(
"Cannot get an available client after {}ms, create a new one",
WAIT_CLIENT_TIMEOUT_MS);
- nodeClientNumMap.computeIfPresent(node, (n, oldValue) -> oldValue +
1);
- return createClient(node, nodeClientNum);
+ Client client = syncClientFactory.getSyncClient(clusterNode, this);
+ nodeClientNumMap.computeIfPresent(clusterNode, (n, oldValue) ->
oldValue + 1);
+ return client;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- logger.warn("Interrupted when waiting for an available client of {}",
node);
+ logger.warn("Interrupted when waiting for an available client of {}",
clusterNode);
+ return null;
+ } catch (TTransportException e) {
+ logger.error("Cannot open transport for client {}", clusterNode, e);
return null;
}
}
@@ -122,10 +137,10 @@ public class SyncClientPool {
/**
* Return a client of a node to the pool. Closed client should not be
returned.
*
- * @param node
- * @param client
+ * @param node connection node
+ * @param client push client to pool
*/
- void putClient(Node node, Client client) {
+ public void putClient(Node node, Client client) {
ClusterNode clusterNode = new ClusterNode(node);
// As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
Deque<Client> clientStack = clientCaches.computeIfAbsent(clusterNode, n ->
new ArrayDeque<>());
@@ -147,18 +162,8 @@ public class SyncClientPool {
}
}
- private Client createClient(ClusterNode node, int nodeClientNum) {
- try {
- return syncClientFactory.getSyncClient(node, this);
- } catch (TTransportException e) {
- if (e.getCause() instanceof ConnectException
- || e.getCause() instanceof SocketTimeoutException) {
- logger.debug("Cannot open transport for client {} : {}", node,
e.getMessage());
- } else {
- logger.error("Cannot open transport for client {}", node, e);
- }
- nodeClientNumMap.put(node, nodeClientNum);
- return null;
- }
+ @TestOnly
+ public Map<ClusterNode, Integer> getNodeClientNumMap() {
+ return nodeClientNumMap;
}
}
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/client/DataClientProviderTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/client/DataClientProviderTest.java
index 3df9b26..5115956 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/client/DataClientProviderTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/client/DataClientProviderTest.java
@@ -19,11 +19,13 @@
package org.apache.iotdb.cluster.client;
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
import org.apache.iotdb.cluster.client.sync.SyncDataClient;
import org.apache.iotdb.cluster.common.TestUtils;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.utils.ClientUtils;
+import org.apache.iotdb.cluster.utils.ClusterNode;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol.Factory;
@@ -32,6 +34,8 @@ import org.junit.Test;
import java.io.IOException;
import java.net.ServerSocket;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import static org.junit.Assert.assertNotNull;
@@ -85,4 +89,136 @@ public class DataClientProviderTest {
listenThread.join();
}
}
+
+ @Test
+ public void testSyncConcurrency() throws IOException, InterruptedException {
+ Node node = new ClusterNode();
+ node.setDataPort(9003).setInternalIp("localhost").setClientIp("localhost");
+ ServerSocket serverSocket = new ServerSocket(node.getDataPort());
+ Thread listenThread =
+ new Thread(
+ () -> {
+ while (!Thread.interrupted()) {
+ try {
+ serverSocket.accept();
+ } catch (IOException e) {
+ return;
+ }
+ }
+ });
+ listenThread.start();
+
+ try {
+ boolean useAsyncServer =
ClusterDescriptor.getInstance().getConfig().isUseAsyncServer();
+ ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(false);
+
ClusterDescriptor.getInstance().getConfig().setMaxClientPerNodePerMember(2);
+ DataClientProvider provider = new DataClientProvider(new Factory());
+ SyncDataClient client = null;
+ try {
+ client = provider.getSyncDataClient(node, 100);
+ } catch (TException e) {
+ Assert.fail(e.getMessage());
+ }
+ assertNotNull(client);
+
+ // 5 concurrent requests: only one more client fits under the limit of 2, the other 4 must wait
+ ExecutorService service = Executors.newFixedThreadPool(10);
+ for (int i = 0; i < 5; i++) {
+ service.submit(() -> provider.getSyncDataClient(node, 100));
+ }
+
+ // sleep past the pool's client wait timeout (author states 5000ms) so each waiter creates a client
+ Thread.currentThread().sleep(6000);
+ int totalNumber =
provider.getDataSyncClientPool().getNodeClientNumMap().get(node);
+ Assert.assertEquals(6, totalNumber);
+
+ for (int i = 0; i < 4; i++) {
+ service.submit(() -> provider.getSyncDataClient(node, 100));
+ }
+
+ Thread.currentThread().sleep(1000);
+ // return the first client to the pool; one of the 4 waiting threads should reuse it
+ provider.getDataSyncClientPool().putClient(node, client);
+ // sleep past the wait timeout again so the other 3 waiters each create a new client
+ Thread.currentThread().sleep(6000);
+ totalNumber =
provider.getDataSyncClientPool().getNodeClientNumMap().get(node);
+
+ // 6 existing + 4 new requests - 1 request served by the returned client = 9
+ Assert.assertEquals(9, totalNumber);
+
+
ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(useAsyncServer);
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ } finally {
+ serverSocket.close();
+ listenThread.interrupt();
+ listenThread.join();
+ }
+ }
+
+ @Test
+ public void testAsyncConcurrency() throws IOException, InterruptedException {
+ Node node = new ClusterNode();
+ node.setDataPort(9003).setInternalIp("localhost").setClientIp("localhost");
+ ServerSocket serverSocket = new ServerSocket(node.getDataPort());
+ Thread listenThread =
+ new Thread(
+ () -> {
+ while (!Thread.interrupted()) {
+ try {
+ serverSocket.accept();
+ } catch (IOException e) {
+ return;
+ }
+ }
+ });
+ listenThread.start();
+
+ try {
+ boolean useAsyncServer =
ClusterDescriptor.getInstance().getConfig().isUseAsyncServer();
+ ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(true);
+
ClusterDescriptor.getInstance().getConfig().setMaxClientPerNodePerMember(2);
+ DataClientProvider provider = new DataClientProvider(new Factory());
+ AsyncDataClient client = null;
+ try {
+ client = provider.getAsyncDataClient(node, 100);
+ } catch (IOException e) {
+ Assert.fail(e.getMessage());
+ }
+ assertNotNull(client);
+
+ // 5 concurrent requests: only one more client fits under the limit of 2, the other 4 must wait
+ ExecutorService service = Executors.newFixedThreadPool(10);
+ for (int i = 0; i < 5; i++) {
+ service.submit(() -> provider.getAsyncDataClient(node, 100));
+ }
+
+ // sleep past the pool's client wait timeout (author states 5000ms) so each waiter creates a client
+ Thread.currentThread().sleep(6000);
+ int totalNumber =
provider.getDataAsyncClientPool().getNodeClientNumMap().get(node);
+ Assert.assertEquals(6, totalNumber);
+
+ for (int i = 0; i < 4; i++) {
+ service.submit(() -> provider.getAsyncDataClient(node, 100));
+ }
+
+ Thread.currentThread().sleep(1000);
+ // return the first client to the pool; one of the 4 waiting threads should reuse it
+ provider.getDataAsyncClientPool().putClient(node, client);
+ // sleep past the wait timeout again so the other 3 waiters each create a new client
+ Thread.currentThread().sleep(6000);
+ totalNumber =
provider.getDataAsyncClientPool().getNodeClientNumMap().get(node);
+
+ // 6 existing + 4 new requests - 1 request served by the returned client = 9
+ Assert.assertEquals(9, totalNumber);
+
+
ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(useAsyncServer);
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ } finally {
+ serverSocket.close();
+ listenThread.interrupt();
+ listenThread.join();
+ }
+ }
}
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientPoolTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientPoolTest.java
index 19407a2..d14a31b 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientPoolTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientPoolTest.java
@@ -11,6 +11,8 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
@@ -28,9 +30,18 @@ public class SyncClientPoolTest {
@Mock private SyncClientFactory testSyncClientFactory;
+ @Before
+ public void setUp() {
+ testSyncClientFactory = new TestSyncClientFactory();
+ }
+
+ @After
+ public void tearDown() {
+ testSyncClientFactory = null;
+ }
+
@Test
public void testTestClient() {
- testSyncClientFactory = new TestSyncClientFactory();
getClient();
putClient();
}
@@ -65,7 +76,6 @@ public class SyncClientPoolTest {
@Test
public void testPutBadClient() {
- testSyncClientFactory = new TestSyncClientFactory();
SyncClientPool syncClientPool = new SyncClientPool(testSyncClientFactory);
Client client = syncClientPool.getClient(TestUtils.getNode(0));
client.getInputProtocol().getTransport().close();
@@ -78,7 +88,6 @@ public class SyncClientPoolTest {
public void testMaxClient() {
int maxClientNum =
ClusterDescriptor.getInstance().getConfig().getMaxClientPerNodePerMember();
ClusterDescriptor.getInstance().getConfig().setMaxClientPerNodePerMember(5);
- testSyncClientFactory = new TestSyncClientFactory();
SyncClientPool syncClientPool = new SyncClientPool(testSyncClientFactory);
for (int i = 0; i < 5; i++) {
@@ -98,7 +107,6 @@ public class SyncClientPoolTest {
ClusterDescriptor.getInstance().getConfig().getMaxClientPerNodePerMember();
try {
ClusterDescriptor.getInstance().getConfig().setMaxClientPerNodePerMember(10);
- testSyncClientFactory = new TestSyncClientFactory();
SyncClientPool syncClientPool = new
SyncClientPool(testSyncClientFactory);
Node node = TestUtils.getNode(0);
@@ -141,7 +149,6 @@ public class SyncClientPoolTest {
ClusterDescriptor.getInstance().getConfig().getMaxClientPerNodePerMember();
try {
ClusterDescriptor.getInstance().getConfig().setMaxClientPerNodePerMember(1);
- testSyncClientFactory = new TestSyncClientFactory();
SyncClientPool syncClientPool = new
SyncClientPool(testSyncClientFactory);
Node node = TestUtils.getNode(0);