This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.1 by this push:
new cbc7c3fb7f5 Revert "[To rel/1.1][IOTDB-6106] Fixed the timeout
parameter not working in thrift asyncClient
cbc7c3fb7f5 is described below
commit cbc7c3fb7f51669e842e299a354db836f76bd4b5
Author: Potato <[email protected]>
AuthorDate: Wed Aug 16 14:14:22 2023 +0800
Revert "[To rel/1.1][IOTDB-6106] Fixed the timeout parameter not working in
thrift asyncClient
---
.../iot/client/AsyncIoTConsensusServiceClient.java | 1 -
.../AsyncConfigNodeHeartbeatServiceClient.java | 1 -
.../async/AsyncConfigNodeIServiceClient.java | 1 -
.../async/AsyncDataNodeHeartbeatServiceClient.java | 1 -
.../async/AsyncDataNodeInternalServiceClient.java | 1 -
.../AsyncDataNodeMPPDataExchangeServiceClient.java | 1 -
.../iotdb/commons/client/ClientManagerTest.java | 128 +--------------------
7 files changed, 4 insertions(+), 130 deletions(-)
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
index e5fe708ab9a..f84b5839247 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
@@ -56,7 +56,6 @@ public class AsyncIoTConsensusServiceClient extends
IoTConsensusIService.AsyncCl
protocolFactory,
tClientManager,
TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(),
connectionTimeout));
- setTimeout(connectionTimeout);
this.endpoint = endpoint;
this.clientManager = clientManager;
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java
b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java
index 9044c826d2d..86de8b6f02b 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java
@@ -51,7 +51,6 @@ public class AsyncConfigNodeHeartbeatServiceClient extends
IConfigNodeRPCService
protocolFactory,
tClientManager,
TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(),
connectionTimeout));
- setTimeout(connectionTimeout);
this.endpoint = endpoint;
this.clientManager = clientManager;
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java
b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java
index eea02364f2e..91d64ab257b 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java
@@ -55,7 +55,6 @@ public class AsyncConfigNodeIServiceClient extends
IConfigNodeRPCService.AsyncCl
protocolFactory,
tClientManager,
TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(),
connectionTimeout));
- setTimeout(connectionTimeout);
this.endpoint = endpoint;
this.clientManager = clientManager;
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java
b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java
index e074cc878e9..50c0540fd00 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java
@@ -51,7 +51,6 @@ public class AsyncDataNodeHeartbeatServiceClient extends
IDataNodeRPCService.Asy
protocolFactory,
tClientManager,
TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(),
connectionTimeout));
- setTimeout(connectionTimeout);
this.endpoint = endpoint;
this.clientManager = clientManager;
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
index d38f4e49b30..46c6eec275e 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
@@ -57,7 +57,6 @@ public class AsyncDataNodeInternalServiceClient extends
IDataNodeRPCService.Asyn
protocolFactory,
tClientManager,
TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(),
connectionTimeout));
- setTimeout(connectionTimeout);
this.endpoint = endpoint;
this.clientManager = clientManager;
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
index 87a2476ef17..7b056fc3504 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
@@ -56,7 +56,6 @@ public class AsyncDataNodeMPPDataExchangeServiceClient
extends MPPDataExchangeSe
protocolFactory,
tClientManager,
TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(),
connectionTimeout));
- setTimeout(connectionTimeout);
this.endpoint = endpoint;
this.clientManager = clientManager;
}
diff --git
a/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
b/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
index 431ad50ebe6..94f7454cef0 100644
---
a/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
+++
b/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.commons.client;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
import
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
import
org.apache.iotdb.commons.client.exception.BorrowNullClientManagerException;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
@@ -34,8 +33,6 @@ import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
import org.apache.commons.pool2.KeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
-import org.apache.thrift.TException;
-import org.apache.thrift.async.AsyncMethodCallback;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -43,41 +40,20 @@ import org.junit.Test;
import java.io.IOException;
import java.util.NoSuchElementException;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
public class ClientManagerTest {
private final TEndPoint endPoint = new TEndPoint("localhost", 10730);
- private static final int CONNECTION_TIMEOUT = 5_000;
-
private MockInternalRPCService service;
- @SuppressWarnings("java:S2925")
@Before
- public void setUp() throws StartupException, TException {
+ public void setUp() throws StartupException {
service = new MockInternalRPCService(endPoint);
- IDataNodeRPCService.Iface processor =
mock(IDataNodeRPCService.Iface.class);
- // timeout method
- when(processor.clearCache())
- .thenAnswer(
- invocation -> {
- Thread.sleep(CONNECTION_TIMEOUT + 1000);
- return new TSStatus();
- });
- // normal method
- when(processor.merge())
- .thenAnswer(
- invocation -> {
- Thread.sleep(1000);
- return new TSStatus();
- });
- service.initSyncedServiceImpl(processor);
+ service.initSyncedServiceImpl(mock(IDataNodeRPCService.Iface.class));
service.start();
}
@@ -102,8 +78,6 @@ public class ClientManagerTest {
invalidSyncClientReturnTest();
invalidAsyncClientReturnTest();
borrowNullTest();
- syncClientTimeoutTest();
- asyncClientTimeoutTest();
}
public void normalSyncTest() throws Exception {
@@ -487,97 +461,6 @@ public class ClientManagerTest {
Assert.assertEquals(0, asyncClusterManager.getPool().getNumIdle(endPoint));
}
- public void syncClientTimeoutTest() throws Exception {
- // init syncClientManager
- ClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
syncClusterManager =
- (ClientManager<TEndPoint, SyncDataNodeInternalServiceClient>)
- new IClientManager.Factory<TEndPoint,
SyncDataNodeInternalServiceClient>()
- .createClientManager(new
TestSyncDataNodeInternalServiceClientPoolFactory());
-
- // normal RPC
- try (SyncDataNodeInternalServiceClient syncClient =
syncClusterManager.borrowClient(endPoint)) {
- syncClient.merge();
- } catch (Exception e) {
- Assert.fail("There should be no timeout here");
- }
- Assert.assertEquals(0,
syncClusterManager.getPool().getNumActive(endPoint));
- Assert.assertEquals(1, syncClusterManager.getPool().getNumIdle(endPoint));
-
- // timeout RPC
- try (SyncDataNodeInternalServiceClient syncClient =
syncClusterManager.borrowClient(endPoint)) {
- syncClient.clearCache();
- Assert.fail("A timeout exception should occur here");
- } catch (Exception ignored) {
- // no handling
- }
- Assert.assertEquals(0,
syncClusterManager.getPool().getNumActive(endPoint));
- Assert.assertEquals(0, syncClusterManager.getPool().getNumIdle(endPoint));
-
- syncClusterManager.close();
- }
-
- public void asyncClientTimeoutTest() throws Exception {
- // init asyncClientManager
- ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
asyncClusterManager =
- (ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>)
- new IClientManager.Factory<TEndPoint,
AsyncDataNodeInternalServiceClient>()
- .createClientManager(new
TestAsyncDataNodeInternalServiceClientPoolFactory());
-
- // normal RPC
- AsyncDataNodeInternalServiceClient asyncClient =
asyncClusterManager.borrowClient(endPoint);
- CountDownLatch latch = new CountDownLatch(1);
- AtomicBoolean failed = new AtomicBoolean(false);
- CountDownLatch finalLatch = latch;
- AtomicBoolean finalFailed = failed;
- asyncClient.merge(
- new AsyncMethodCallback<TSStatus>() {
- @Override
- public void onComplete(TSStatus response) {
- finalLatch.countDown();
- }
-
- @Override
- public void onError(Exception exception) {
- finalFailed.set(true);
- finalLatch.countDown();
- }
- });
- latch.await();
- if (failed.get()) {
- Assert.fail("There should be no timeout here");
- }
- Assert.assertEquals(0,
asyncClusterManager.getPool().getNumActive(endPoint));
- Assert.assertEquals(1, asyncClusterManager.getPool().getNumIdle(endPoint));
-
- // timeout RPC
- asyncClient = asyncClusterManager.borrowClient(endPoint);
- latch = new CountDownLatch(1);
- failed = new AtomicBoolean(false);
- AtomicBoolean finalFailed1 = failed;
- CountDownLatch finalLatch1 = latch;
- asyncClient.clearCache(
- new AsyncMethodCallback<TSStatus>() {
- @Override
- public void onComplete(TSStatus response) {
- finalFailed1.set(true);
- finalLatch1.countDown();
- }
-
- @Override
- public void onError(Exception exception) {
- finalLatch1.countDown();
- }
- });
- latch.await();
- if (failed.get()) {
- Assert.fail("A timeout exception should occur here");
- }
- Assert.assertEquals(0,
asyncClusterManager.getPool().getNumActive(endPoint));
- Assert.assertEquals(0, asyncClusterManager.getPool().getNumIdle(endPoint));
-
- asyncClusterManager.close();
- }
-
public static class TestSyncDataNodeInternalServiceClientPoolFactory
implements IClientPoolFactory<TEndPoint,
SyncDataNodeInternalServiceClient> {
@@ -586,10 +469,7 @@ public class ClientManagerTest {
ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> manager) {
return new GenericKeyedObjectPool<>(
new SyncDataNodeInternalServiceClient.Factory(
- manager,
- new ThriftClientProperty.Builder()
- .setConnectionTimeoutMs(CONNECTION_TIMEOUT)
- .build()),
+ manager, new ThriftClientProperty.Builder().build()),
new
ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>().build().getConfig());
}
}
@@ -603,7 +483,7 @@ public class ClientManagerTest {
return new GenericKeyedObjectPool<>(
new AsyncDataNodeInternalServiceClient.Factory(
manager,
- new
ThriftClientProperty.Builder().setConnectionTimeoutMs(CONNECTION_TIMEOUT).build(),
+ new ThriftClientProperty.Builder().build(),
ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()),
new
ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>().build().getConfig());
}