This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch revert_thrift_timeout
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1485bb045543817d89d4e7e83745c7230bbba02c Author: OneSizeFitQuorum <[email protected]> AuthorDate: Wed Aug 16 14:10:49 2023 +0800 Revert "[To rel/1.1][IOTDB-6106] Fixed the timeout parameter not working in thrift asyncClient (#10828)" This reverts commit 1afc93323562344e9356142eb89bc8800b7dd7b2. --- .../iot/client/AsyncIoTConsensusServiceClient.java | 1 - .../AsyncConfigNodeHeartbeatServiceClient.java | 1 - .../async/AsyncConfigNodeIServiceClient.java | 1 - .../async/AsyncDataNodeHeartbeatServiceClient.java | 1 - .../async/AsyncDataNodeInternalServiceClient.java | 1 - .../AsyncDataNodeMPPDataExchangeServiceClient.java | 1 - .../iotdb/commons/client/ClientManagerTest.java | 128 +-------------------- 7 files changed, 4 insertions(+), 130 deletions(-) diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java index e5fe708ab9a..f84b5839247 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java @@ -56,7 +56,6 @@ public class AsyncIoTConsensusServiceClient extends IoTConsensusIService.AsyncCl protocolFactory, tClientManager, TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(), connectionTimeout)); - setTimeout(connectionTimeout); this.endpoint = endpoint; this.clientManager = clientManager; } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java index 9044c826d2d..86de8b6f02b 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java +++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java @@ -51,7 +51,6 @@ public class AsyncConfigNodeHeartbeatServiceClient extends IConfigNodeRPCService protocolFactory, tClientManager, TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(), connectionTimeout)); - setTimeout(connectionTimeout); this.endpoint = endpoint; this.clientManager = clientManager; } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java index eea02364f2e..91d64ab257b 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java @@ -55,7 +55,6 @@ public class AsyncConfigNodeIServiceClient extends IConfigNodeRPCService.AsyncCl protocolFactory, tClientManager, TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(), connectionTimeout)); - setTimeout(connectionTimeout); this.endpoint = endpoint; this.clientManager = clientManager; } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java index e074cc878e9..50c0540fd00 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java @@ -51,7 +51,6 @@ public class AsyncDataNodeHeartbeatServiceClient extends IDataNodeRPCService.Asy protocolFactory, tClientManager, TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(), connectionTimeout)); - setTimeout(connectionTimeout); this.endpoint = endpoint; 
this.clientManager = clientManager; } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java index d38f4e49b30..46c6eec275e 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java @@ -57,7 +57,6 @@ public class AsyncDataNodeInternalServiceClient extends IDataNodeRPCService.Asyn protocolFactory, tClientManager, TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(), connectionTimeout)); - setTimeout(connectionTimeout); this.endpoint = endpoint; this.clientManager = clientManager; } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java index 87a2476ef17..7b056fc3504 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java @@ -56,7 +56,6 @@ public class AsyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeSe protocolFactory, tClientManager, TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(), connectionTimeout)); - setTimeout(connectionTimeout); this.endpoint = endpoint; this.clientManager = clientManager; } diff --git a/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java b/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java index 431ad50ebe6..94f7454cef0 100644 --- a/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java +++ 
b/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java @@ -20,7 +20,6 @@ package org.apache.iotdb.commons.client; import org.apache.iotdb.common.rpc.thrift.TEndPoint; -import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.client.exception.BorrowNullClientManagerException; import org.apache.iotdb.commons.client.exception.ClientManagerException; @@ -34,8 +33,6 @@ import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService; import org.apache.commons.pool2.KeyedObjectPool; import org.apache.commons.pool2.impl.GenericKeyedObjectPool; -import org.apache.thrift.TException; -import org.apache.thrift.async.AsyncMethodCallback; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -43,41 +40,20 @@ import org.junit.Test; import java.io.IOException; import java.util.NoSuchElementException; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class ClientManagerTest { private final TEndPoint endPoint = new TEndPoint("localhost", 10730); - private static final int CONNECTION_TIMEOUT = 5_000; - private MockInternalRPCService service; - @SuppressWarnings("java:S2925") @Before - public void setUp() throws StartupException, TException { + public void setUp() throws StartupException { service = new MockInternalRPCService(endPoint); - IDataNodeRPCService.Iface processor = mock(IDataNodeRPCService.Iface.class); - // timeout method - when(processor.clearCache()) - .thenAnswer( - invocation -> { - Thread.sleep(CONNECTION_TIMEOUT + 1000); - return new TSStatus(); - }); - // normal method - when(processor.merge()) - .thenAnswer( - invocation -> { - Thread.sleep(1000); - return new TSStatus(); - }); - service.initSyncedServiceImpl(processor); + 
service.initSyncedServiceImpl(mock(IDataNodeRPCService.Iface.class)); service.start(); } @@ -102,8 +78,6 @@ public class ClientManagerTest { invalidSyncClientReturnTest(); invalidAsyncClientReturnTest(); borrowNullTest(); - syncClientTimeoutTest(); - asyncClientTimeoutTest(); } public void normalSyncTest() throws Exception { @@ -487,97 +461,6 @@ public class ClientManagerTest { Assert.assertEquals(0, asyncClusterManager.getPool().getNumIdle(endPoint)); } - public void syncClientTimeoutTest() throws Exception { - // init syncClientManager - ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> syncClusterManager = - (ClientManager<TEndPoint, SyncDataNodeInternalServiceClient>) - new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>() - .createClientManager(new TestSyncDataNodeInternalServiceClientPoolFactory()); - - // normal RPC - try (SyncDataNodeInternalServiceClient syncClient = syncClusterManager.borrowClient(endPoint)) { - syncClient.merge(); - } catch (Exception e) { - Assert.fail("There should be no timeout here"); - } - Assert.assertEquals(0, syncClusterManager.getPool().getNumActive(endPoint)); - Assert.assertEquals(1, syncClusterManager.getPool().getNumIdle(endPoint)); - - // timeout RPC - try (SyncDataNodeInternalServiceClient syncClient = syncClusterManager.borrowClient(endPoint)) { - syncClient.clearCache(); - Assert.fail("A timeout exception should occur here"); - } catch (Exception ignored) { - // no handling - } - Assert.assertEquals(0, syncClusterManager.getPool().getNumActive(endPoint)); - Assert.assertEquals(0, syncClusterManager.getPool().getNumIdle(endPoint)); - - syncClusterManager.close(); - } - - public void asyncClientTimeoutTest() throws Exception { - // init asyncClientManager - ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> asyncClusterManager = - (ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>) - new IClientManager.Factory<TEndPoint, AsyncDataNodeInternalServiceClient>() - 
.createClientManager(new TestAsyncDataNodeInternalServiceClientPoolFactory()); - - // normal RPC - AsyncDataNodeInternalServiceClient asyncClient = asyncClusterManager.borrowClient(endPoint); - CountDownLatch latch = new CountDownLatch(1); - AtomicBoolean failed = new AtomicBoolean(false); - CountDownLatch finalLatch = latch; - AtomicBoolean finalFailed = failed; - asyncClient.merge( - new AsyncMethodCallback<TSStatus>() { - @Override - public void onComplete(TSStatus response) { - finalLatch.countDown(); - } - - @Override - public void onError(Exception exception) { - finalFailed.set(true); - finalLatch.countDown(); - } - }); - latch.await(); - if (failed.get()) { - Assert.fail("There should be no timeout here"); - } - Assert.assertEquals(0, asyncClusterManager.getPool().getNumActive(endPoint)); - Assert.assertEquals(1, asyncClusterManager.getPool().getNumIdle(endPoint)); - - // timeout RPC - asyncClient = asyncClusterManager.borrowClient(endPoint); - latch = new CountDownLatch(1); - failed = new AtomicBoolean(false); - AtomicBoolean finalFailed1 = failed; - CountDownLatch finalLatch1 = latch; - asyncClient.clearCache( - new AsyncMethodCallback<TSStatus>() { - @Override - public void onComplete(TSStatus response) { - finalFailed1.set(true); - finalLatch1.countDown(); - } - - @Override - public void onError(Exception exception) { - finalLatch1.countDown(); - } - }); - latch.await(); - if (failed.get()) { - Assert.fail("A timeout exception should occur here"); - } - Assert.assertEquals(0, asyncClusterManager.getPool().getNumActive(endPoint)); - Assert.assertEquals(0, asyncClusterManager.getPool().getNumIdle(endPoint)); - - asyncClusterManager.close(); - } - public static class TestSyncDataNodeInternalServiceClientPoolFactory implements IClientPoolFactory<TEndPoint, SyncDataNodeInternalServiceClient> { @@ -586,10 +469,7 @@ public class ClientManagerTest { ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> manager) { return new GenericKeyedObjectPool<>( new 
SyncDataNodeInternalServiceClient.Factory( - manager, - new ThriftClientProperty.Builder() - .setConnectionTimeoutMs(CONNECTION_TIMEOUT) - .build()), + manager, new ThriftClientProperty.Builder().build()), new ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>().build().getConfig()); } } @@ -603,7 +483,7 @@ public class ClientManagerTest { return new GenericKeyedObjectPool<>( new AsyncDataNodeInternalServiceClient.Factory( manager, - new ThriftClientProperty.Builder().setConnectionTimeoutMs(CONNECTION_TIMEOUT).build(), + new ThriftClientProperty.Builder().build(), ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()), new ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>().build().getConfig()); }
