This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch revert_thrift_timeout
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1485bb045543817d89d4e7e83745c7230bbba02c
Author: OneSizeFitQuorum <[email protected]>
AuthorDate: Wed Aug 16 14:10:49 2023 +0800

    Revert "[To rel/1.1][IOTDB-6106] Fixed the timeout parameter not working in 
thrift asyncClient  (#10828)"
    
    This reverts commit 1afc93323562344e9356142eb89bc8800b7dd7b2.
---
 .../iot/client/AsyncIoTConsensusServiceClient.java |   1 -
 .../AsyncConfigNodeHeartbeatServiceClient.java     |   1 -
 .../async/AsyncConfigNodeIServiceClient.java       |   1 -
 .../async/AsyncDataNodeHeartbeatServiceClient.java |   1 -
 .../async/AsyncDataNodeInternalServiceClient.java  |   1 -
 .../AsyncDataNodeMPPDataExchangeServiceClient.java |   1 -
 .../iotdb/commons/client/ClientManagerTest.java    | 128 +--------------------
 7 files changed, 4 insertions(+), 130 deletions(-)

diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
index e5fe708ab9a..f84b5839247 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
@@ -56,7 +56,6 @@ public class AsyncIoTConsensusServiceClient extends 
IoTConsensusIService.AsyncCl
         protocolFactory,
         tClientManager,
         TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(), 
connectionTimeout));
-    setTimeout(connectionTimeout);
     this.endpoint = endpoint;
     this.clientManager = clientManager;
   }
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java
index 9044c826d2d..86de8b6f02b 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java
@@ -51,7 +51,6 @@ public class AsyncConfigNodeHeartbeatServiceClient extends 
IConfigNodeRPCService
         protocolFactory,
         tClientManager,
         TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(), 
connectionTimeout));
-    setTimeout(connectionTimeout);
     this.endpoint = endpoint;
     this.clientManager = clientManager;
   }
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java
index eea02364f2e..91d64ab257b 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java
@@ -55,7 +55,6 @@ public class AsyncConfigNodeIServiceClient extends 
IConfigNodeRPCService.AsyncCl
         protocolFactory,
         tClientManager,
         TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(), 
connectionTimeout));
-    setTimeout(connectionTimeout);
     this.endpoint = endpoint;
     this.clientManager = clientManager;
   }
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java
index e074cc878e9..50c0540fd00 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java
@@ -51,7 +51,6 @@ public class AsyncDataNodeHeartbeatServiceClient extends 
IDataNodeRPCService.Asy
         protocolFactory,
         tClientManager,
         TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(), 
connectionTimeout));
-    setTimeout(connectionTimeout);
     this.endpoint = endpoint;
     this.clientManager = clientManager;
   }
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
index d38f4e49b30..46c6eec275e 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
@@ -57,7 +57,6 @@ public class AsyncDataNodeInternalServiceClient extends 
IDataNodeRPCService.Asyn
         protocolFactory,
         tClientManager,
         TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(), 
connectionTimeout));
-    setTimeout(connectionTimeout);
     this.endpoint = endpoint;
     this.clientManager = clientManager;
   }
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
index 87a2476ef17..7b056fc3504 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
@@ -56,7 +56,6 @@ public class AsyncDataNodeMPPDataExchangeServiceClient 
extends MPPDataExchangeSe
         protocolFactory,
         tClientManager,
         TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(), 
connectionTimeout));
-    setTimeout(connectionTimeout);
     this.endpoint = endpoint;
     this.clientManager = clientManager;
   }
diff --git 
a/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
 
b/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
index 431ad50ebe6..94f7454cef0 100644
--- 
a/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
+++ 
b/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.commons.client;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import 
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
 import 
org.apache.iotdb.commons.client.exception.BorrowNullClientManagerException;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
@@ -34,8 +33,6 @@ import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
 
 import org.apache.commons.pool2.KeyedObjectPool;
 import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
-import org.apache.thrift.TException;
-import org.apache.thrift.async.AsyncMethodCallback;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -43,41 +40,20 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.NoSuchElementException;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 public class ClientManagerTest {
 
   private final TEndPoint endPoint = new TEndPoint("localhost", 10730);
 
-  private static final int CONNECTION_TIMEOUT = 5_000;
-
   private MockInternalRPCService service;
 
-  @SuppressWarnings("java:S2925")
   @Before
-  public void setUp() throws StartupException, TException {
+  public void setUp() throws StartupException {
     service = new MockInternalRPCService(endPoint);
-    IDataNodeRPCService.Iface processor = 
mock(IDataNodeRPCService.Iface.class);
-    // timeout method
-    when(processor.clearCache())
-        .thenAnswer(
-            invocation -> {
-              Thread.sleep(CONNECTION_TIMEOUT + 1000);
-              return new TSStatus();
-            });
-    // normal method
-    when(processor.merge())
-        .thenAnswer(
-            invocation -> {
-              Thread.sleep(1000);
-              return new TSStatus();
-            });
-    service.initSyncedServiceImpl(processor);
+    service.initSyncedServiceImpl(mock(IDataNodeRPCService.Iface.class));
     service.start();
   }
 
@@ -102,8 +78,6 @@ public class ClientManagerTest {
     invalidSyncClientReturnTest();
     invalidAsyncClientReturnTest();
     borrowNullTest();
-    syncClientTimeoutTest();
-    asyncClientTimeoutTest();
   }
 
   public void normalSyncTest() throws Exception {
@@ -487,97 +461,6 @@ public class ClientManagerTest {
     Assert.assertEquals(0, asyncClusterManager.getPool().getNumIdle(endPoint));
   }
 
-  public void syncClientTimeoutTest() throws Exception {
-    // init syncClientManager
-    ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> 
syncClusterManager =
-        (ClientManager<TEndPoint, SyncDataNodeInternalServiceClient>)
-            new IClientManager.Factory<TEndPoint, 
SyncDataNodeInternalServiceClient>()
-                .createClientManager(new 
TestSyncDataNodeInternalServiceClientPoolFactory());
-
-    // normal RPC
-    try (SyncDataNodeInternalServiceClient syncClient = 
syncClusterManager.borrowClient(endPoint)) {
-      syncClient.merge();
-    } catch (Exception e) {
-      Assert.fail("There should be no timeout here");
-    }
-    Assert.assertEquals(0, 
syncClusterManager.getPool().getNumActive(endPoint));
-    Assert.assertEquals(1, syncClusterManager.getPool().getNumIdle(endPoint));
-
-    // timeout RPC
-    try (SyncDataNodeInternalServiceClient syncClient = 
syncClusterManager.borrowClient(endPoint)) {
-      syncClient.clearCache();
-      Assert.fail("A timeout exception should occur here");
-    } catch (Exception ignored) {
-      // no handling
-    }
-    Assert.assertEquals(0, 
syncClusterManager.getPool().getNumActive(endPoint));
-    Assert.assertEquals(0, syncClusterManager.getPool().getNumIdle(endPoint));
-
-    syncClusterManager.close();
-  }
-
-  public void asyncClientTimeoutTest() throws Exception {
-    // init asyncClientManager
-    ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> 
asyncClusterManager =
-        (ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>)
-            new IClientManager.Factory<TEndPoint, 
AsyncDataNodeInternalServiceClient>()
-                .createClientManager(new 
TestAsyncDataNodeInternalServiceClientPoolFactory());
-
-    // normal RPC
-    AsyncDataNodeInternalServiceClient asyncClient = 
asyncClusterManager.borrowClient(endPoint);
-    CountDownLatch latch = new CountDownLatch(1);
-    AtomicBoolean failed = new AtomicBoolean(false);
-    CountDownLatch finalLatch = latch;
-    AtomicBoolean finalFailed = failed;
-    asyncClient.merge(
-        new AsyncMethodCallback<TSStatus>() {
-          @Override
-          public void onComplete(TSStatus response) {
-            finalLatch.countDown();
-          }
-
-          @Override
-          public void onError(Exception exception) {
-            finalFailed.set(true);
-            finalLatch.countDown();
-          }
-        });
-    latch.await();
-    if (failed.get()) {
-      Assert.fail("There should be no timeout here");
-    }
-    Assert.assertEquals(0, 
asyncClusterManager.getPool().getNumActive(endPoint));
-    Assert.assertEquals(1, asyncClusterManager.getPool().getNumIdle(endPoint));
-
-    // timeout RPC
-    asyncClient = asyncClusterManager.borrowClient(endPoint);
-    latch = new CountDownLatch(1);
-    failed = new AtomicBoolean(false);
-    AtomicBoolean finalFailed1 = failed;
-    CountDownLatch finalLatch1 = latch;
-    asyncClient.clearCache(
-        new AsyncMethodCallback<TSStatus>() {
-          @Override
-          public void onComplete(TSStatus response) {
-            finalFailed1.set(true);
-            finalLatch1.countDown();
-          }
-
-          @Override
-          public void onError(Exception exception) {
-            finalLatch1.countDown();
-          }
-        });
-    latch.await();
-    if (failed.get()) {
-      Assert.fail("A timeout exception should occur here");
-    }
-    Assert.assertEquals(0, 
asyncClusterManager.getPool().getNumActive(endPoint));
-    Assert.assertEquals(0, asyncClusterManager.getPool().getNumIdle(endPoint));
-
-    asyncClusterManager.close();
-  }
-
   public static class TestSyncDataNodeInternalServiceClientPoolFactory
       implements IClientPoolFactory<TEndPoint, 
SyncDataNodeInternalServiceClient> {
 
@@ -586,10 +469,7 @@ public class ClientManagerTest {
         ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> manager) {
       return new GenericKeyedObjectPool<>(
           new SyncDataNodeInternalServiceClient.Factory(
-              manager,
-              new ThriftClientProperty.Builder()
-                  .setConnectionTimeoutMs(CONNECTION_TIMEOUT)
-                  .build()),
+              manager, new ThriftClientProperty.Builder().build()),
           new 
ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>().build().getConfig());
     }
   }
@@ -603,7 +483,7 @@ public class ClientManagerTest {
       return new GenericKeyedObjectPool<>(
           new AsyncDataNodeInternalServiceClient.Factory(
               manager,
-              new 
ThriftClientProperty.Builder().setConnectionTimeoutMs(CONNECTION_TIMEOUT).build(),
+              new ThriftClientProperty.Builder().build(),
               ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()),
           new 
ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>().build().getConfig());
     }

Reply via email to