This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0af1aede62 Add SyncThriftClientErrorHandler for proxy to clean the 
code (#5710)
0af1aede62 is described below

commit 0af1aede628f5b986d0fb21c2dedec0a45474e12
Author: JiaXin Zhang <[email protected]>
AuthorDate: Mon May 2 20:58:57 2022 +0800

    Add SyncThriftClientErrorHandler for proxy to clean the code (#5710)
---
 .../iotdb/confignode/client/AsyncClientPool.java   |  8 ---
 node-commons/pom.xml                               |  6 ++
 .../apache/iotdb/commons/client/ClientManager.java |  4 +-
 .../client/sync/SyncConfigNodeIServiceClient.java  | 20 +++++--
 .../sync/SyncDataNodeDataBlockServiceClient.java   | 20 +++++--
 .../sync/SyncDataNodeInternalServiceClient.java    | 17 ++++--
 .../commons/client/sync/SyncThriftClient.java      | 25 +++++++++
 .../sync/SyncThriftClientWithErrorHandler.java     | 64 ++++++++++++++++++++++
 .../apache/iotdb/commons/ClientManagerTest.java    | 50 ++++++++++-------
 .../db/mpp/execution/datatransfer/SinkHandle.java  | 42 ++------------
 .../mpp/execution/datatransfer/SourceHandle.java   | 43 ++-------------
 .../scheduler/AbstractFragInsStateTracker.java     | 21 ++-----
 .../scheduler/SimpleFragInstanceDispatcher.java    | 25 ++-------
 .../mpp/plan/scheduler/SimpleQueryTerminator.java  | 16 +-----
 14 files changed, 192 insertions(+), 169 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncClientPool.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncClientPool.java
index 017b61e6c5..89384d4da8 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncClientPool.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncClientPool.java
@@ -54,10 +54,6 @@ public class AsyncClientPool {
     AsyncDataNodeInternalServiceClient client;
     try {
       client = clientManager.borrowClient(endPoint);
-      if (client == null) {
-        LOGGER.error("Can't get client for DataNode {}", endPoint);
-        return;
-      }
       client.createSchemaRegion(req, handler);
     } catch (IOException e) {
       LOGGER.error("Can't connect to DataNode {}", endPoint, e);
@@ -76,10 +72,6 @@ public class AsyncClientPool {
     AsyncDataNodeInternalServiceClient client;
     try {
       client = clientManager.borrowClient(endPoint);
-      if (client == null) {
-        LOGGER.error("Can't get client for DataNode {}", endPoint);
-        return;
-      }
       client.createDataRegion(req, handler);
     } catch (IOException e) {
       LOGGER.error("Can't connect to DataNode {}", endPoint, e);
diff --git a/node-commons/pom.xml b/node-commons/pom.xml
index 780a533cd6..1043fce93c 100644
--- a/node-commons/pom.xml
+++ b/node-commons/pom.xml
@@ -34,8 +34,14 @@
         <commons.test.skip>false</commons.test.skip>
         <commons.it.skip>${commons.test.skip}</commons.it.skip>
         <commons.ut.skip>${commons.test.skip}</commons.ut.skip>
+        <cglib.version>3.1</cglib.version>
     </properties>
     <dependencies>
+        <dependency>
+            <groupId>cglib</groupId>
+            <artifactId>cglib-nodep</artifactId>
+            <version>${cglib.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.thrift</groupId>
             <artifactId>libthrift</artifactId>
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
index 54a4d124de..70bd2edb29 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
@@ -56,7 +56,9 @@ public class ClientManager<K, V> implements IClientManager<K, 
V> {
       throw e;
     } catch (Exception e) {
       // external doesn't care of other exceptions
-      logger.error(String.format("Borrow client from pool for node %s 
failed.", node), e);
+      String errorMessage = String.format("Borrow client from pool for node %s 
failed.", node);
+      logger.error(errorMessage, e);
+      throw new IOException(errorMessage, e);
     }
     return client;
   }
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
index 6966241f57..f57becd12c 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
@@ -34,9 +34,11 @@ import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
 
+import java.lang.reflect.Constructor;
 import java.net.SocketException;
 
-public class SyncConfigNodeIServiceClient extends ConfigIService.Client {
+public class SyncConfigNodeIServiceClient extends ConfigIService.Client
+    implements SyncThriftClient, AutoCloseable {
 
   private final TEndPoint endpoint;
   private final ClientManager<TEndPoint, SyncConfigNodeIServiceClient> 
clientManager;
@@ -60,7 +62,7 @@ public class SyncConfigNodeIServiceClient extends 
ConfigIService.Client {
     getInputProtocol().getTransport().open();
   }
 
-  public void returnSelf() {
+  public void close() {
     if (clientManager != null) {
       clientManager.returnClient(endpoint, this);
     }
@@ -71,7 +73,7 @@ public class SyncConfigNodeIServiceClient extends 
ConfigIService.Client {
     ((TimeoutChangeableTransport) 
(getInputProtocol().getTransport())).setTimeout(timeout);
   }
 
-  public void close() {
+  public void invalidate() {
     getInputProtocol().getTransport().close();
   }
 
@@ -95,14 +97,22 @@ public class SyncConfigNodeIServiceClient extends 
ConfigIService.Client {
     @Override
     public void destroyObject(
         TEndPoint endpoint, PooledObject<SyncConfigNodeIServiceClient> 
pooledObject) {
-      pooledObject.getObject().close();
+      pooledObject.getObject().invalidate();
     }
 
     @Override
     public PooledObject<SyncConfigNodeIServiceClient> makeObject(TEndPoint 
endpoint)
         throws Exception {
+      Constructor<SyncConfigNodeIServiceClient> constructor =
+          SyncConfigNodeIServiceClient.class.getConstructor(
+              clientFactoryProperty.getProtocolFactory().getClass(),
+              int.class,
+              endpoint.getClass(),
+              clientManager.getClass());
       return new DefaultPooledObject<>(
-          new SyncConfigNodeIServiceClient(
+          SyncThriftClientWithErrorHandler.newErrorHandler(
+              SyncConfigNodeIServiceClient.class,
+              constructor,
               clientFactoryProperty.getProtocolFactory(),
               clientFactoryProperty.getConnectionTimeoutMs(),
               endpoint,
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeDataBlockServiceClient.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeDataBlockServiceClient.java
index 913fc4324f..6152614684 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeDataBlockServiceClient.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeDataBlockServiceClient.java
@@ -34,9 +34,11 @@ import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
 
+import java.lang.reflect.Constructor;
 import java.net.SocketException;
 
-public class SyncDataNodeDataBlockServiceClient extends 
DataBlockService.Client {
+public class SyncDataNodeDataBlockServiceClient extends DataBlockService.Client
+    implements SyncThriftClient, AutoCloseable {
 
   private final TEndPoint endpoint;
   private final ClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> 
clientManager;
@@ -60,7 +62,7 @@ public class SyncDataNodeDataBlockServiceClient extends 
DataBlockService.Client
     getInputProtocol().getTransport().open();
   }
 
-  public void returnSelf() {
+  public void close() {
     if (clientManager != null) {
       clientManager.returnClient(endpoint, this);
     }
@@ -71,7 +73,7 @@ public class SyncDataNodeDataBlockServiceClient extends 
DataBlockService.Client
     ((TimeoutChangeableTransport) 
(getInputProtocol().getTransport())).setTimeout(timeout);
   }
 
-  public void close() {
+  public void invalidate() {
     getInputProtocol().getTransport().close();
   }
 
@@ -96,14 +98,22 @@ public class SyncDataNodeDataBlockServiceClient extends 
DataBlockService.Client
     @Override
     public void destroyObject(
         TEndPoint endpoint, PooledObject<SyncDataNodeDataBlockServiceClient> 
pooledObject) {
-      pooledObject.getObject().close();
+      pooledObject.getObject().invalidate();
     }
 
     @Override
     public PooledObject<SyncDataNodeDataBlockServiceClient> 
makeObject(TEndPoint endpoint)
         throws Exception {
+      Constructor<SyncDataNodeDataBlockServiceClient> constructor =
+          SyncDataNodeDataBlockServiceClient.class.getConstructor(
+              clientFactoryProperty.getProtocolFactory().getClass(),
+              int.class,
+              endpoint.getClass(),
+              clientManager.getClass());
       return new DefaultPooledObject<>(
-          new SyncDataNodeDataBlockServiceClient(
+          SyncThriftClientWithErrorHandler.newErrorHandler(
+              SyncDataNodeDataBlockServiceClient.class,
+              constructor,
               clientFactoryProperty.getProtocolFactory(),
               clientFactoryProperty.getConnectionTimeoutMs(),
               endpoint,
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
index 843efd4396..23d2c56f43 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
@@ -35,9 +35,11 @@ import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
 
+import java.lang.reflect.Constructor;
 import java.net.SocketException;
 
-public class SyncDataNodeInternalServiceClient extends InternalService.Client {
+public class SyncDataNodeInternalServiceClient extends InternalService.Client
+    implements SyncThriftClient, AutoCloseable {
 
   private final TEndPoint endpoint;
   private final ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> 
clientManager;
@@ -71,7 +73,7 @@ public class SyncDataNodeInternalServiceClient extends 
InternalService.Client {
     return clientManager;
   }
 
-  public void returnSelf() {
+  public void close() {
     if (clientManager != null) {
       clientManager.returnClient(endpoint, this);
     }
@@ -82,7 +84,7 @@ public class SyncDataNodeInternalServiceClient extends 
InternalService.Client {
     ((TimeoutChangeableTransport) 
(getInputProtocol().getTransport())).setTimeout(timeout);
   }
 
-  public void close() {
+  public void invalidate() {
     getInputProtocol().getTransport().close();
   }
 
@@ -107,14 +109,19 @@ public class SyncDataNodeInternalServiceClient extends 
InternalService.Client {
     @Override
     public void destroyObject(
         TEndPoint endpoint, PooledObject<SyncDataNodeInternalServiceClient> 
pooledObject) {
-      pooledObject.getObject().close();
+      pooledObject.getObject().invalidate();
     }
 
     @Override
     public PooledObject<SyncDataNodeInternalServiceClient> 
makeObject(TEndPoint endpoint)
         throws Exception {
+      Constructor<SyncDataNodeInternalServiceClient> constructor =
+          SyncDataNodeInternalServiceClient.class.getConstructor(
+              TProtocolFactory.class, int.class, endpoint.getClass(), 
clientManager.getClass());
       return new DefaultPooledObject<>(
-          new SyncDataNodeInternalServiceClient(
+          SyncThriftClientWithErrorHandler.newErrorHandler(
+              SyncDataNodeInternalServiceClient.class,
+              constructor,
               clientFactoryProperty.getProtocolFactory(),
               clientFactoryProperty.getConnectionTimeoutMs(),
               endpoint,
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClient.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClient.java
new file mode 100644
index 0000000000..38eaa252b2
--- /dev/null
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClient.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.client.sync;
+
+public interface SyncThriftClient {
+
+  /** close the connection */
+  void invalidate();
+}
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClientWithErrorHandler.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClientWithErrorHandler.java
new file mode 100644
index 0000000000..2792a2ba67
--- /dev/null
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClientWithErrorHandler.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.client.sync;
+
+import net.sf.cglib.proxy.Enhancer;
+import net.sf.cglib.proxy.MethodInterceptor;
+import net.sf.cglib.proxy.MethodProxy;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+public class SyncThriftClientWithErrorHandler implements MethodInterceptor {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(SyncThriftClientWithErrorHandler.class);
+
+  public static <V extends SyncThriftClient> V newErrorHandler(
+      Class<V> targetClass, Constructor<V> constructor, Object... args) {
+    Enhancer enhancer = new Enhancer();
+    enhancer.setSuperclass(targetClass);
+    enhancer.setCallback(new SyncThriftClientWithErrorHandler());
+    if (constructor == null) {
+      return (V) enhancer.create();
+    }
+    return (V) enhancer.create(constructor.getParameterTypes(), args);
+  }
+
+  @Override
+  public Object intercept(Object o, Method method, Object[] objects, 
MethodProxy methodProxy)
+      throws Throwable {
+    try {
+      return methodProxy.invokeSuper(o, objects);
+    } catch (InvocationTargetException e) {
+      if (e.getTargetException() instanceof TException) {
+        LOGGER.error(
+            "Error in calling method {}, err: {}", method.getName(), 
e.getTargetException());
+        ((SyncThriftClient) o).invalidate();
+      }
+      throw new TException("Error in calling method " + method.getName(), 
e.getTargetException());
+    } catch (Exception e) {
+      throw new TException("Error in calling method " + method.getName(), e);
+    }
+  }
+}
diff --git 
a/node-commons/src/test/java/org/apache/iotdb/commons/ClientManagerTest.java 
b/node-commons/src/test/java/org/apache/iotdb/commons/ClientManagerTest.java
index 42f8210665..44a4e3f777 100644
--- a/node-commons/src/test/java/org/apache/iotdb/commons/ClientManagerTest.java
+++ b/node-commons/src/test/java/org/apache/iotdb/commons/ClientManagerTest.java
@@ -84,12 +84,12 @@ public class ClientManagerTest {
     Assert.assertEquals(0, syncClusterManager.getPool().getNumIdle(endPoint));
 
     // return one sync client
-    syncClient1.returnSelf();
+    syncClient1.close();
     Assert.assertEquals(1, 
syncClusterManager.getPool().getNumActive(endPoint));
     Assert.assertEquals(1, syncClusterManager.getPool().getNumIdle(endPoint));
 
     // return another sync client
-    syncClient2.returnSelf();
+    syncClient2.close();
     Assert.assertEquals(0, 
syncClusterManager.getPool().getNumActive(endPoint));
     Assert.assertEquals(2, syncClusterManager.getPool().getNumIdle(endPoint));
 
@@ -186,12 +186,12 @@ public class ClientManagerTest {
     Assert.assertEquals(0, syncClusterManager.getPool().getNumIdle(endPoint));
 
     // return one sync client
-    syncClient1.returnSelf();
+    syncClient1.close();
     Assert.assertEquals(1, 
syncClusterManager.getPool().getNumActive(endPoint));
     Assert.assertEquals(1, syncClusterManager.getPool().getNumIdle(endPoint));
 
     // return another sync client, clientManager should destroy this client
-    syncClient2.returnSelf();
+    syncClient2.close();
     Assert.assertEquals(0, 
syncClusterManager.getPool().getNumActive(endPoint));
     Assert.assertEquals(1, syncClusterManager.getPool().getNumIdle(endPoint));
     Assert.assertFalse(syncClient2.getInputProtocol().getTransport().isOpen());
@@ -236,15 +236,21 @@ public class ClientManagerTest {
     Assert.assertEquals(1, 
syncClusterManager.getPool().getNumActive(endPoint));
     Assert.assertEquals(0, syncClusterManager.getPool().getNumIdle(endPoint));
 
-    // get another sync client, should throw error and return null
-    long start = System.nanoTime();
-    SyncDataNodeInternalServiceClient syncClient2 = 
syncClusterManager.borrowClient(endPoint);
-    long end = System.nanoTime();
-    Assert.assertTrue(end - start >= DefaultProperty.WAIT_CLIENT_TIMEOUT_MS * 
1_000_000);
+    // get another sync client, should wait waitClientTimeoutMS ms, throw error
+    SyncDataNodeInternalServiceClient syncClient2 = null;
+    long start = 0, end;
+    try {
+      start = System.nanoTime();
+      syncClient2 = syncClusterManager.borrowClient(endPoint);
+    } catch (IOException e) {
+      end = System.nanoTime();
+      Assert.assertTrue(end - start >= DefaultProperty.WAIT_CLIENT_TIMEOUT_MS 
* 1_000_000);
+      Assert.assertTrue(e.getMessage().startsWith("Borrow client from pool for 
node"));
+    }
     Assert.assertNull(syncClient2);
 
     // return one sync client
-    syncClient1.returnSelf();
+    syncClient1.close();
     Assert.assertEquals(0, 
syncClusterManager.getPool().getNumActive(endPoint));
     Assert.assertEquals(1, syncClusterManager.getPool().getNumIdle(endPoint));
 
@@ -255,7 +261,7 @@ public class ClientManagerTest {
     Assert.assertEquals(syncClient1, syncClient2);
 
     // return the only client
-    syncClient2.returnSelf();
+    syncClient2.close();
     Assert.assertEquals(0, 
syncClusterManager.getPool().getNumActive(endPoint));
     Assert.assertEquals(1, syncClusterManager.getPool().getNumIdle(endPoint));
 
@@ -303,15 +309,19 @@ public class ClientManagerTest {
     Assert.assertEquals(1, 
syncClusterManager.getPool().getNumActive(endPoint));
     Assert.assertEquals(0, syncClusterManager.getPool().getNumIdle(endPoint));
 
-    // get another sync client, should wait waitClientTimeoutMS ms, throw 
error and return null
-    long start = System.nanoTime();
-    SyncDataNodeInternalServiceClient syncClient2 = 
syncClusterManager.borrowClient(endPoint);
-    long end = System.nanoTime();
-    Assert.assertTrue(end - start >= waitClientTimeoutMS * 1_000_000);
-    Assert.assertNull(syncClient2);
+    // get another sync client, should wait waitClientTimeoutMS ms, throw error
+    long start = 0, end;
+    try {
+      start = System.nanoTime();
+      syncClusterManager.borrowClient(endPoint);
+    } catch (IOException e) {
+      end = System.nanoTime();
+      Assert.assertTrue(end - start >= waitClientTimeoutMS * 1_000_000);
+      Assert.assertTrue(e.getMessage().startsWith("Borrow client from pool for 
node"));
+    }
 
     // return one sync client
-    syncClient1.returnSelf();
+    syncClient1.close();
     Assert.assertEquals(0, 
syncClusterManager.getPool().getNumActive(endPoint));
     Assert.assertEquals(1, syncClusterManager.getPool().getNumIdle(endPoint));
 
@@ -349,13 +359,13 @@ public class ClientManagerTest {
     Assert.assertEquals(0, syncClusterManager.getPool().getNumIdle(endPoint));
 
     // return one sync client
-    syncClient1.returnSelf();
+    syncClient1.close();
     Assert.assertEquals(1, 
syncClusterManager.getPool().getNumActive(endPoint));
     Assert.assertEquals(1, syncClusterManager.getPool().getNumIdle(endPoint));
 
     // invalid another sync client and return
     syncClient2.getInputProtocol().getTransport().close();
-    syncClient2.returnSelf();
+    syncClient2.close();
     Assert.assertEquals(0, 
syncClusterManager.getPool().getNumActive(endPoint));
     Assert.assertEquals(1, syncClusterManager.getPool().getNumIdle(endPoint));
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SinkHandle.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SinkHandle.java
index 9114472522..c06a3f1980 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SinkHandle.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SinkHandle.java
@@ -169,17 +169,9 @@ public class SinkHandle implements ISinkHandle {
             nextSequenceId - 1);
     while (attempt < MAX_ATTEMPT_TIMES) {
       attempt += 1;
-      SyncDataNodeDataBlockServiceClient client = null;
-      try {
-        client = dataBlockServiceClientManager.borrowClient(remoteEndpoint);
-        if (client == null) {
-          logger.warn("can't get client for node {}", remoteEndpoint);
-          if (attempt == MAX_ATTEMPT_TIMES) {
-            throw new TException("Can't get client for node " + 
remoteEndpoint);
-          }
-        } else {
-          client.onEndOfDataBlockEvent(endOfDataBlockEvent);
-        }
+      try (SyncDataNodeDataBlockServiceClient client =
+          dataBlockServiceClientManager.borrowClient(remoteEndpoint)) {
+        client.onEndOfDataBlockEvent(endOfDataBlockEvent);
         break;
       } catch (TException e) {
         logger.error(
@@ -189,9 +181,6 @@ public class SinkHandle implements ISinkHandle {
             e.getMessage(),
             attempt,
             e);
-        if (client != null) {
-          client.close();
-        }
         if (attempt == MAX_ATTEMPT_TIMES) {
           throw e;
         }
@@ -200,10 +189,6 @@ public class SinkHandle implements ISinkHandle {
         if (attempt == MAX_ATTEMPT_TIMES) {
           throw e;
         }
-      } finally {
-        if (client != null) {
-          client.returnSelf();
-        }
       }
     }
   }
@@ -364,22 +349,11 @@ public class SinkHandle implements ISinkHandle {
               blockSizes);
       while (attempt < MAX_ATTEMPT_TIMES) {
         attempt += 1;
-        SyncDataNodeDataBlockServiceClient client = null;
-        try {
-          client = dataBlockServiceClientManager.borrowClient(remoteEndpoint);
-          if (client == null) {
-            logger.warn("can't get client for node {}", remoteEndpoint);
-            if (attempt == MAX_ATTEMPT_TIMES) {
-              throw new TException("Can't get client for node " + 
remoteEndpoint);
-            }
-          } else {
-            client.onNewDataBlockEvent(newDataBlockEvent);
-          }
+        try (SyncDataNodeDataBlockServiceClient client =
+            dataBlockServiceClientManager.borrowClient(remoteEndpoint)) {
+          client.onNewDataBlockEvent(newDataBlockEvent);
           break;
         } catch (Throwable e) {
-          if (e instanceof TException && client != null) {
-            client.close();
-          }
           logger.error(
               "Failed to send new data block event to plan node {} of {} due 
to {}, attempt times: {}",
               remotePlanNodeId,
@@ -390,10 +364,6 @@ public class SinkHandle implements ISinkHandle {
           if (attempt == MAX_ATTEMPT_TIMES) {
             sinkHandleListener.onFailure(SinkHandle.this, e);
           }
-        } finally {
-          if (client != null) {
-            client.returnSelf();
-          }
         }
       }
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SourceHandle.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SourceHandle.java
index 3c346f75b3..1c8bf70880 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SourceHandle.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SourceHandle.java
@@ -34,7 +34,6 @@ import 
org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import org.apache.commons.lang3.Validate;
-import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -313,16 +312,8 @@ public class SourceHandle implements ISourceHandle {
       int attempt = 0;
       while (attempt < MAX_ATTEMPT_TIMES) {
         attempt += 1;
-        SyncDataNodeDataBlockServiceClient client = null;
-        try {
-          client = dataBlockServiceClientManager.borrowClient(remoteEndpoint);
-          if (client == null) {
-            logger.warn("can't get client for node {}", remoteEndpoint);
-            if (attempt == MAX_ATTEMPT_TIMES) {
-              throw new TException("Can't get client for node " + 
remoteEndpoint);
-            }
-            continue;
-          }
+        try (SyncDataNodeDataBlockServiceClient client =
+            dataBlockServiceClientManager.borrowClient(remoteEndpoint)) {
           TGetDataBlockResponse resp = client.getDataBlock(req);
           List<TsBlock> tsBlocks = new ArrayList<>(resp.getTsBlocks().size());
           for (ByteBuffer byteBuffer : resp.getTsBlocks()) {
@@ -344,9 +335,6 @@ public class SourceHandle implements ISourceHandle {
               new SendAcknowledgeDataBlockEventTask(startSequenceId, 
endSequenceId));
           break;
         } catch (Throwable e) {
-          if (e instanceof TException && client != null) {
-            client.close();
-          }
           logger.error(
               "Failed to get data block from {} due to {}, attempt times: {}",
               remoteFragmentInstanceId,
@@ -361,10 +349,6 @@ public class SourceHandle implements ISourceHandle {
               sourceHandleListener.onFailure(SourceHandle.this, e);
             }
           }
-        } finally {
-          if (client != null) {
-            client.returnSelf();
-          }
         }
       }
     }
@@ -392,22 +376,11 @@ public class SourceHandle implements ISourceHandle {
           new TAcknowledgeDataBlockEvent(remoteFragmentInstanceId, 
startSequenceId, endSequenceId);
       while (attempt < MAX_ATTEMPT_TIMES) {
         attempt += 1;
-        SyncDataNodeDataBlockServiceClient client = null;
-        try {
-          client = dataBlockServiceClientManager.borrowClient(remoteEndpoint);
-          if (client == null) {
-            logger.warn("can't get client for node {}", remoteEndpoint);
-            if (attempt == MAX_ATTEMPT_TIMES) {
-              throw new TException("Can't get client for node " + 
remoteEndpoint);
-            }
-          } else {
-            client.onAcknowledgeDataBlockEvent(acknowledgeDataBlockEvent);
-            break;
-          }
+        try (SyncDataNodeDataBlockServiceClient client =
+            dataBlockServiceClientManager.borrowClient(remoteEndpoint)) {
+          client.onAcknowledgeDataBlockEvent(acknowledgeDataBlockEvent);
+          break;
         } catch (Throwable e) {
-          if (e instanceof TException && client != null) {
-            client.close();
-          }
           logger.error(
               "Failed to send ack data block event [{}, {}) to {} due to {}, 
attempt times: {}",
               startSequenceId,
@@ -420,10 +393,6 @@ public class SourceHandle implements ISourceHandle {
               sourceHandleListener.onFailure(SourceHandle.this, e);
             }
           }
-        } finally {
-          if (client != null) {
-            client.returnSelf();
-          }
         }
       }
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
index 2d5b0d7bc7..424b58eee1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
@@ -65,26 +65,13 @@ public abstract class AbstractFragInsStateTracker 
implements IFragInstanceStateT
 
   protected FragmentInstanceState fetchState(FragmentInstance instance)
       throws TException, IOException {
-    SyncDataNodeInternalServiceClient client = null;
-    try {
-      // TODO: (jackie tien) change the port
-      TEndPoint endPoint = instance.getHostDataNode().internalEndPoint;
-      client = internalServiceClientManager.borrowClient(endPoint);
-      if (client == null) {
-        throw new TException("Can't get client for node " + endPoint);
-      }
+    // TODO: (jackie tien) change the port
+    TEndPoint endPoint = instance.getHostDataNode().internalEndPoint;
+    try (SyncDataNodeInternalServiceClient client =
+        internalServiceClientManager.borrowClient(endPoint)) {
       TFragmentInstanceStateResp resp =
           client.fetchFragmentInstanceState(new 
TFetchFragmentInstanceStateReq(getTId(instance)));
       return FragmentInstanceState.valueOf(resp.state);
-    } catch (Throwable t) {
-      if (t instanceof TException && client != null) {
-        client.close();
-      }
-      throw t;
-    } finally {
-      if (client != null) {
-        client.returnSelf();
-      }
     }
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java
index 6d275a74ef..fec10957bf 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
 
-import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,14 +57,10 @@ public class SimpleFragInstanceDispatcher implements 
IFragInstanceDispatcher {
         () -> {
           TSendFragmentInstanceResp resp = new 
TSendFragmentInstanceResp(false);
           for (FragmentInstance instance : instances) {
-            SyncDataNodeInternalServiceClient client = null;
             TEndPoint endPoint = 
instance.getHostDataNode().getInternalEndPoint();
-            try {
-              // TODO: (jackie tien) change the port
-              client = internalServiceClientManager.borrowClient(endPoint);
-              if (client == null) {
-                throw new TException("Can't get client for node " + endPoint);
-              }
+            // TODO: (jackie tien) change the port
+            try (SyncDataNodeInternalServiceClient client =
+                internalServiceClientManager.borrowClient(endPoint)) {
               // TODO: (xingtanzjr) consider how to handle the buffer here
               ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
               instance.serializeRequest(buffer);
@@ -78,20 +73,8 @@ public class SimpleFragInstanceDispatcher implements 
IFragInstanceDispatcher {
             } catch (IOException e) {
               LOGGER.error("can't connect to node {}", endPoint, e);
               throw e;
-            } catch (TException e) {
-              LOGGER.error("sendFragmentInstance failed for node {}", 
endPoint, e);
-              if (client != null) {
-                client.close();
-              }
-              throw e;
-            } catch (Exception e) {
-              LOGGER.error("unexpected exception", e);
-              throw e;
-            } finally {
-              if (client != null) {
-                client.returnSelf();
-              }
             }
+
             if (!resp.accepted) {
               break;
             }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
index 8b3d718959..df6f08f7c2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
@@ -64,26 +64,14 @@ public class SimpleQueryTerminator implements 
IQueryTerminator {
         () -> {
           for (TEndPoint endPoint : relatedHost) {
             // TODO (jackie tien) change the port
-            SyncDataNodeInternalServiceClient client = null;
-            try {
-              client = internalServiceClientManager.borrowClient(endPoint);
-              if (client == null) {
-                throw new TException("Can't get client for node " + endPoint);
-              }
+            try (SyncDataNodeInternalServiceClient client =
+                internalServiceClientManager.borrowClient(endPoint)) {
               client.cancelQuery(new TCancelQueryReq(queryId.getId()));
             } catch (IOException e) {
               LOGGER.error("can't connect to node {}", endPoint, e);
               return false;
             } catch (TException e) {
-              LOGGER.error("cancelQuery failed for node {}", endPoint, e);
-              if (client != null) {
-                client.close();
-              }
               return false;
-            } finally {
-              if (client != null) {
-                client.returnSelf();
-              }
             }
           }
           return true;

Reply via email to