This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0af1aede62 Add SyncThriftClientErrorHandler for proxy to clean the code (#5710)
0af1aede62 is described below
commit 0af1aede628f5b986d0fb21c2dedec0a45474e12
Author: JiaXin Zhang <[email protected]>
AuthorDate: Mon May 2 20:58:57 2022 +0800
Add SyncThriftClientErrorHandler for proxy to clean the code (#5710)
---
.../iotdb/confignode/client/AsyncClientPool.java | 8 ---
node-commons/pom.xml | 6 ++
.../apache/iotdb/commons/client/ClientManager.java | 4 +-
.../client/sync/SyncConfigNodeIServiceClient.java | 20 +++++--
.../sync/SyncDataNodeDataBlockServiceClient.java | 20 +++++--
.../sync/SyncDataNodeInternalServiceClient.java | 17 ++++--
.../commons/client/sync/SyncThriftClient.java | 25 +++++++++
.../sync/SyncThriftClientWithErrorHandler.java | 64 ++++++++++++++++++++++
.../apache/iotdb/commons/ClientManagerTest.java | 50 ++++++++++-------
.../db/mpp/execution/datatransfer/SinkHandle.java | 42 ++------------
.../mpp/execution/datatransfer/SourceHandle.java | 43 ++-------------
.../scheduler/AbstractFragInsStateTracker.java | 21 ++-----
.../scheduler/SimpleFragInstanceDispatcher.java | 25 ++-------
.../mpp/plan/scheduler/SimpleQueryTerminator.java | 16 +-----
14 files changed, 192 insertions(+), 169 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncClientPool.java
index 017b61e6c5..89384d4da8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncClientPool.java
@@ -54,10 +54,6 @@ public class AsyncClientPool {
AsyncDataNodeInternalServiceClient client;
try {
client = clientManager.borrowClient(endPoint);
- if (client == null) {
- LOGGER.error("Can't get client for DataNode {}", endPoint);
- return;
- }
client.createSchemaRegion(req, handler);
} catch (IOException e) {
LOGGER.error("Can't connect to DataNode {}", endPoint, e);
@@ -76,10 +72,6 @@ public class AsyncClientPool {
AsyncDataNodeInternalServiceClient client;
try {
client = clientManager.borrowClient(endPoint);
- if (client == null) {
- LOGGER.error("Can't get client for DataNode {}", endPoint);
- return;
- }
client.createDataRegion(req, handler);
} catch (IOException e) {
LOGGER.error("Can't connect to DataNode {}", endPoint, e);
diff --git a/node-commons/pom.xml b/node-commons/pom.xml
index 780a533cd6..1043fce93c 100644
--- a/node-commons/pom.xml
+++ b/node-commons/pom.xml
@@ -34,8 +34,14 @@
<commons.test.skip>false</commons.test.skip>
<commons.it.skip>${commons.test.skip}</commons.it.skip>
<commons.ut.skip>${commons.test.skip}</commons.ut.skip>
+ <cglib.version>3.1</cglib.version>
</properties>
<dependencies>
+ <dependency>
+ <groupId>cglib</groupId>
+ <artifactId>cglib-nodep</artifactId>
+ <version>${cglib.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
index 54a4d124de..70bd2edb29 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
@@ -56,7 +56,9 @@ public class ClientManager<K, V> implements IClientManager<K,
V> {
throw e;
} catch (Exception e) {
// external doesn't care of other exceptions
- logger.error(String.format("Borrow client from pool for node %s
failed.", node), e);
+ String errorMessage = String.format("Borrow client from pool for node %s
failed.", node);
+ logger.error(errorMessage, e);
+ throw new IOException(errorMessage, e);
}
return client;
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
index 6966241f57..f57becd12c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
@@ -34,9 +34,11 @@ import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
+import java.lang.reflect.Constructor;
import java.net.SocketException;
-public class SyncConfigNodeIServiceClient extends ConfigIService.Client {
+public class SyncConfigNodeIServiceClient extends ConfigIService.Client
+ implements SyncThriftClient, AutoCloseable {
private final TEndPoint endpoint;
private final ClientManager<TEndPoint, SyncConfigNodeIServiceClient>
clientManager;
@@ -60,7 +62,7 @@ public class SyncConfigNodeIServiceClient extends
ConfigIService.Client {
getInputProtocol().getTransport().open();
}
- public void returnSelf() {
+ public void close() {
if (clientManager != null) {
clientManager.returnClient(endpoint, this);
}
@@ -71,7 +73,7 @@ public class SyncConfigNodeIServiceClient extends
ConfigIService.Client {
((TimeoutChangeableTransport)
(getInputProtocol().getTransport())).setTimeout(timeout);
}
- public void close() {
+ public void invalidate() {
getInputProtocol().getTransport().close();
}
@@ -95,14 +97,22 @@ public class SyncConfigNodeIServiceClient extends
ConfigIService.Client {
@Override
public void destroyObject(
TEndPoint endpoint, PooledObject<SyncConfigNodeIServiceClient>
pooledObject) {
- pooledObject.getObject().close();
+ pooledObject.getObject().invalidate();
}
@Override
public PooledObject<SyncConfigNodeIServiceClient> makeObject(TEndPoint
endpoint)
throws Exception {
+ Constructor<SyncConfigNodeIServiceClient> constructor =
+ SyncConfigNodeIServiceClient.class.getConstructor(
+ clientFactoryProperty.getProtocolFactory().getClass(),
+ int.class,
+ endpoint.getClass(),
+ clientManager.getClass());
return new DefaultPooledObject<>(
- new SyncConfigNodeIServiceClient(
+ SyncThriftClientWithErrorHandler.newErrorHandler(
+ SyncConfigNodeIServiceClient.class,
+ constructor,
clientFactoryProperty.getProtocolFactory(),
clientFactoryProperty.getConnectionTimeoutMs(),
endpoint,
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeDataBlockServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeDataBlockServiceClient.java
index 913fc4324f..6152614684 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeDataBlockServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeDataBlockServiceClient.java
@@ -34,9 +34,11 @@ import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
+import java.lang.reflect.Constructor;
import java.net.SocketException;
-public class SyncDataNodeDataBlockServiceClient extends
DataBlockService.Client {
+public class SyncDataNodeDataBlockServiceClient extends DataBlockService.Client
+ implements SyncThriftClient, AutoCloseable {
private final TEndPoint endpoint;
private final ClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient>
clientManager;
@@ -60,7 +62,7 @@ public class SyncDataNodeDataBlockServiceClient extends
DataBlockService.Client
getInputProtocol().getTransport().open();
}
- public void returnSelf() {
+ public void close() {
if (clientManager != null) {
clientManager.returnClient(endpoint, this);
}
@@ -71,7 +73,7 @@ public class SyncDataNodeDataBlockServiceClient extends
DataBlockService.Client
((TimeoutChangeableTransport)
(getInputProtocol().getTransport())).setTimeout(timeout);
}
- public void close() {
+ public void invalidate() {
getInputProtocol().getTransport().close();
}
@@ -96,14 +98,22 @@ public class SyncDataNodeDataBlockServiceClient extends
DataBlockService.Client
@Override
public void destroyObject(
TEndPoint endpoint, PooledObject<SyncDataNodeDataBlockServiceClient>
pooledObject) {
- pooledObject.getObject().close();
+ pooledObject.getObject().invalidate();
}
@Override
public PooledObject<SyncDataNodeDataBlockServiceClient>
makeObject(TEndPoint endpoint)
throws Exception {
+ Constructor<SyncDataNodeDataBlockServiceClient> constructor =
+ SyncDataNodeDataBlockServiceClient.class.getConstructor(
+ clientFactoryProperty.getProtocolFactory().getClass(),
+ int.class,
+ endpoint.getClass(),
+ clientManager.getClass());
return new DefaultPooledObject<>(
- new SyncDataNodeDataBlockServiceClient(
+ SyncThriftClientWithErrorHandler.newErrorHandler(
+ SyncDataNodeDataBlockServiceClient.class,
+ constructor,
clientFactoryProperty.getProtocolFactory(),
clientFactoryProperty.getConnectionTimeoutMs(),
endpoint,
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
index 843efd4396..23d2c56f43 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
@@ -35,9 +35,11 @@ import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
+import java.lang.reflect.Constructor;
import java.net.SocketException;
-public class SyncDataNodeInternalServiceClient extends InternalService.Client {
+public class SyncDataNodeInternalServiceClient extends InternalService.Client
+ implements SyncThriftClient, AutoCloseable {
private final TEndPoint endpoint;
private final ClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
clientManager;
@@ -71,7 +73,7 @@ public class SyncDataNodeInternalServiceClient extends
InternalService.Client {
return clientManager;
}
- public void returnSelf() {
+ public void close() {
if (clientManager != null) {
clientManager.returnClient(endpoint, this);
}
@@ -82,7 +84,7 @@ public class SyncDataNodeInternalServiceClient extends
InternalService.Client {
((TimeoutChangeableTransport)
(getInputProtocol().getTransport())).setTimeout(timeout);
}
- public void close() {
+ public void invalidate() {
getInputProtocol().getTransport().close();
}
@@ -107,14 +109,19 @@ public class SyncDataNodeInternalServiceClient extends
InternalService.Client {
@Override
public void destroyObject(
TEndPoint endpoint, PooledObject<SyncDataNodeInternalServiceClient>
pooledObject) {
- pooledObject.getObject().close();
+ pooledObject.getObject().invalidate();
}
@Override
public PooledObject<SyncDataNodeInternalServiceClient>
makeObject(TEndPoint endpoint)
throws Exception {
+ Constructor<SyncDataNodeInternalServiceClient> constructor =
+ SyncDataNodeInternalServiceClient.class.getConstructor(
+ TProtocolFactory.class, int.class, endpoint.getClass(),
clientManager.getClass());
return new DefaultPooledObject<>(
- new SyncDataNodeInternalServiceClient(
+ SyncThriftClientWithErrorHandler.newErrorHandler(
+ SyncDataNodeInternalServiceClient.class,
+ constructor,
clientFactoryProperty.getProtocolFactory(),
clientFactoryProperty.getConnectionTimeoutMs(),
endpoint,
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClient.java
new file mode 100644
index 0000000000..38eaa252b2
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClient.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.client.sync;
+
+public interface SyncThriftClient {
+
+ /** close the connection */
+ void invalidate();
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClientWithErrorHandler.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClientWithErrorHandler.java
new file mode 100644
index 0000000000..2792a2ba67
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClientWithErrorHandler.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.client.sync;
+
+import net.sf.cglib.proxy.Enhancer;
+import net.sf.cglib.proxy.MethodInterceptor;
+import net.sf.cglib.proxy.MethodProxy;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+public class SyncThriftClientWithErrorHandler implements MethodInterceptor {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(SyncThriftClientWithErrorHandler.class);
+
+ public static <V extends SyncThriftClient> V newErrorHandler(
+ Class<V> targetClass, Constructor<V> constructor, Object... args) {
+ Enhancer enhancer = new Enhancer();
+ enhancer.setSuperclass(targetClass);
+ enhancer.setCallback(new SyncThriftClientWithErrorHandler());
+ if (constructor == null) {
+ return (V) enhancer.create();
+ }
+ return (V) enhancer.create(constructor.getParameterTypes(), args);
+ }
+
+ @Override
+ public Object intercept(Object o, Method method, Object[] objects,
MethodProxy methodProxy)
+ throws Throwable {
+ try {
+ return methodProxy.invokeSuper(o, objects);
+ } catch (InvocationTargetException e) {
+ if (e.getTargetException() instanceof TException) {
+ LOGGER.error(
+ "Error in calling method {}, err: {}", method.getName(),
e.getTargetException());
+ ((SyncThriftClient) o).invalidate();
+ }
+ throw new TException("Error in calling method " + method.getName(),
e.getTargetException());
+ } catch (Exception e) {
+ throw new TException("Error in calling method " + method.getName(), e);
+ }
+ }
+}
diff --git a/node-commons/src/test/java/org/apache/iotdb/commons/ClientManagerTest.java b/node-commons/src/test/java/org/apache/iotdb/commons/ClientManagerTest.java
index 42f8210665..44a4e3f777 100644
--- a/node-commons/src/test/java/org/apache/iotdb/commons/ClientManagerTest.java
+++ b/node-commons/src/test/java/org/apache/iotdb/commons/ClientManagerTest.java
@@ -84,12 +84,12 @@ public class ClientManagerTest {
Assert.assertEquals(0, syncClusterManager.getPool().getNumIdle(endPoint));
// return one sync client
- syncClient1.returnSelf();
+ syncClient1.close();
Assert.assertEquals(1,
syncClusterManager.getPool().getNumActive(endPoint));
Assert.assertEquals(1, syncClusterManager.getPool().getNumIdle(endPoint));
// return another sync client
- syncClient2.returnSelf();
+ syncClient2.close();
Assert.assertEquals(0,
syncClusterManager.getPool().getNumActive(endPoint));
Assert.assertEquals(2, syncClusterManager.getPool().getNumIdle(endPoint));
@@ -186,12 +186,12 @@ public class ClientManagerTest {
Assert.assertEquals(0, syncClusterManager.getPool().getNumIdle(endPoint));
// return one sync client
- syncClient1.returnSelf();
+ syncClient1.close();
Assert.assertEquals(1,
syncClusterManager.getPool().getNumActive(endPoint));
Assert.assertEquals(1, syncClusterManager.getPool().getNumIdle(endPoint));
// return another sync client, clientManager should destroy this client
- syncClient2.returnSelf();
+ syncClient2.close();
Assert.assertEquals(0,
syncClusterManager.getPool().getNumActive(endPoint));
Assert.assertEquals(1, syncClusterManager.getPool().getNumIdle(endPoint));
Assert.assertFalse(syncClient2.getInputProtocol().getTransport().isOpen());
@@ -236,15 +236,21 @@ public class ClientManagerTest {
Assert.assertEquals(1,
syncClusterManager.getPool().getNumActive(endPoint));
Assert.assertEquals(0, syncClusterManager.getPool().getNumIdle(endPoint));
- // get another sync client, should throw error and return null
- long start = System.nanoTime();
- SyncDataNodeInternalServiceClient syncClient2 =
syncClusterManager.borrowClient(endPoint);
- long end = System.nanoTime();
- Assert.assertTrue(end - start >= DefaultProperty.WAIT_CLIENT_TIMEOUT_MS *
1_000_000);
+ // get another sync client, should wait waitClientTimeoutMS ms, throw error
+ SyncDataNodeInternalServiceClient syncClient2 = null;
+ long start = 0, end;
+ try {
+ start = System.nanoTime();
+ syncClient2 = syncClusterManager.borrowClient(endPoint);
+ } catch (IOException e) {
+ end = System.nanoTime();
+ Assert.assertTrue(end - start >= DefaultProperty.WAIT_CLIENT_TIMEOUT_MS
* 1_000_000);
+ Assert.assertTrue(e.getMessage().startsWith("Borrow client from pool for
node"));
+ }
Assert.assertNull(syncClient2);
// return one sync client
- syncClient1.returnSelf();
+ syncClient1.close();
Assert.assertEquals(0,
syncClusterManager.getPool().getNumActive(endPoint));
Assert.assertEquals(1, syncClusterManager.getPool().getNumIdle(endPoint));
@@ -255,7 +261,7 @@ public class ClientManagerTest {
Assert.assertEquals(syncClient1, syncClient2);
// return the only client
- syncClient2.returnSelf();
+ syncClient2.close();
Assert.assertEquals(0,
syncClusterManager.getPool().getNumActive(endPoint));
Assert.assertEquals(1, syncClusterManager.getPool().getNumIdle(endPoint));
@@ -303,15 +309,19 @@ public class ClientManagerTest {
Assert.assertEquals(1,
syncClusterManager.getPool().getNumActive(endPoint));
Assert.assertEquals(0, syncClusterManager.getPool().getNumIdle(endPoint));
- // get another sync client, should wait waitClientTimeoutMS ms, throw
error and return null
- long start = System.nanoTime();
- SyncDataNodeInternalServiceClient syncClient2 =
syncClusterManager.borrowClient(endPoint);
- long end = System.nanoTime();
- Assert.assertTrue(end - start >= waitClientTimeoutMS * 1_000_000);
- Assert.assertNull(syncClient2);
+ // get another sync client, should wait waitClientTimeoutMS ms, throw error
+ long start = 0, end;
+ try {
+ start = System.nanoTime();
+ syncClusterManager.borrowClient(endPoint);
+ } catch (IOException e) {
+ end = System.nanoTime();
+ Assert.assertTrue(end - start >= waitClientTimeoutMS * 1_000_000);
+ Assert.assertTrue(e.getMessage().startsWith("Borrow client from pool for
node"));
+ }
// return one sync client
- syncClient1.returnSelf();
+ syncClient1.close();
Assert.assertEquals(0,
syncClusterManager.getPool().getNumActive(endPoint));
Assert.assertEquals(1, syncClusterManager.getPool().getNumIdle(endPoint));
@@ -349,13 +359,13 @@ public class ClientManagerTest {
Assert.assertEquals(0, syncClusterManager.getPool().getNumIdle(endPoint));
// return one sync client
- syncClient1.returnSelf();
+ syncClient1.close();
Assert.assertEquals(1,
syncClusterManager.getPool().getNumActive(endPoint));
Assert.assertEquals(1, syncClusterManager.getPool().getNumIdle(endPoint));
// invalid another sync client and return
syncClient2.getInputProtocol().getTransport().close();
- syncClient2.returnSelf();
+ syncClient2.close();
Assert.assertEquals(0,
syncClusterManager.getPool().getNumActive(endPoint));
Assert.assertEquals(1, syncClusterManager.getPool().getNumIdle(endPoint));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SinkHandle.java
index 9114472522..c06a3f1980 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SinkHandle.java
@@ -169,17 +169,9 @@ public class SinkHandle implements ISinkHandle {
nextSequenceId - 1);
while (attempt < MAX_ATTEMPT_TIMES) {
attempt += 1;
- SyncDataNodeDataBlockServiceClient client = null;
- try {
- client = dataBlockServiceClientManager.borrowClient(remoteEndpoint);
- if (client == null) {
- logger.warn("can't get client for node {}", remoteEndpoint);
- if (attempt == MAX_ATTEMPT_TIMES) {
- throw new TException("Can't get client for node " +
remoteEndpoint);
- }
- } else {
- client.onEndOfDataBlockEvent(endOfDataBlockEvent);
- }
+ try (SyncDataNodeDataBlockServiceClient client =
+ dataBlockServiceClientManager.borrowClient(remoteEndpoint)) {
+ client.onEndOfDataBlockEvent(endOfDataBlockEvent);
break;
} catch (TException e) {
logger.error(
@@ -189,9 +181,6 @@ public class SinkHandle implements ISinkHandle {
e.getMessage(),
attempt,
e);
- if (client != null) {
- client.close();
- }
if (attempt == MAX_ATTEMPT_TIMES) {
throw e;
}
@@ -200,10 +189,6 @@ public class SinkHandle implements ISinkHandle {
if (attempt == MAX_ATTEMPT_TIMES) {
throw e;
}
- } finally {
- if (client != null) {
- client.returnSelf();
- }
}
}
}
@@ -364,22 +349,11 @@ public class SinkHandle implements ISinkHandle {
blockSizes);
while (attempt < MAX_ATTEMPT_TIMES) {
attempt += 1;
- SyncDataNodeDataBlockServiceClient client = null;
- try {
- client = dataBlockServiceClientManager.borrowClient(remoteEndpoint);
- if (client == null) {
- logger.warn("can't get client for node {}", remoteEndpoint);
- if (attempt == MAX_ATTEMPT_TIMES) {
- throw new TException("Can't get client for node " +
remoteEndpoint);
- }
- } else {
- client.onNewDataBlockEvent(newDataBlockEvent);
- }
+ try (SyncDataNodeDataBlockServiceClient client =
+ dataBlockServiceClientManager.borrowClient(remoteEndpoint)) {
+ client.onNewDataBlockEvent(newDataBlockEvent);
break;
} catch (Throwable e) {
- if (e instanceof TException && client != null) {
- client.close();
- }
logger.error(
"Failed to send new data block event to plan node {} of {} due
to {}, attempt times: {}",
remotePlanNodeId,
@@ -390,10 +364,6 @@ public class SinkHandle implements ISinkHandle {
if (attempt == MAX_ATTEMPT_TIMES) {
sinkHandleListener.onFailure(SinkHandle.this, e);
}
- } finally {
- if (client != null) {
- client.returnSelf();
- }
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SourceHandle.java
index 3c346f75b3..1c8bf70880 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SourceHandle.java
@@ -34,7 +34,6 @@ import
org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.commons.lang3.Validate;
-import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -313,16 +312,8 @@ public class SourceHandle implements ISourceHandle {
int attempt = 0;
while (attempt < MAX_ATTEMPT_TIMES) {
attempt += 1;
- SyncDataNodeDataBlockServiceClient client = null;
- try {
- client = dataBlockServiceClientManager.borrowClient(remoteEndpoint);
- if (client == null) {
- logger.warn("can't get client for node {}", remoteEndpoint);
- if (attempt == MAX_ATTEMPT_TIMES) {
- throw new TException("Can't get client for node " +
remoteEndpoint);
- }
- continue;
- }
+ try (SyncDataNodeDataBlockServiceClient client =
+ dataBlockServiceClientManager.borrowClient(remoteEndpoint)) {
TGetDataBlockResponse resp = client.getDataBlock(req);
List<TsBlock> tsBlocks = new ArrayList<>(resp.getTsBlocks().size());
for (ByteBuffer byteBuffer : resp.getTsBlocks()) {
@@ -344,9 +335,6 @@ public class SourceHandle implements ISourceHandle {
new SendAcknowledgeDataBlockEventTask(startSequenceId,
endSequenceId));
break;
} catch (Throwable e) {
- if (e instanceof TException && client != null) {
- client.close();
- }
logger.error(
"Failed to get data block from {} due to {}, attempt times: {}",
remoteFragmentInstanceId,
@@ -361,10 +349,6 @@ public class SourceHandle implements ISourceHandle {
sourceHandleListener.onFailure(SourceHandle.this, e);
}
}
- } finally {
- if (client != null) {
- client.returnSelf();
- }
}
}
}
@@ -392,22 +376,11 @@ public class SourceHandle implements ISourceHandle {
new TAcknowledgeDataBlockEvent(remoteFragmentInstanceId,
startSequenceId, endSequenceId);
while (attempt < MAX_ATTEMPT_TIMES) {
attempt += 1;
- SyncDataNodeDataBlockServiceClient client = null;
- try {
- client = dataBlockServiceClientManager.borrowClient(remoteEndpoint);
- if (client == null) {
- logger.warn("can't get client for node {}", remoteEndpoint);
- if (attempt == MAX_ATTEMPT_TIMES) {
- throw new TException("Can't get client for node " +
remoteEndpoint);
- }
- } else {
- client.onAcknowledgeDataBlockEvent(acknowledgeDataBlockEvent);
- break;
- }
+ try (SyncDataNodeDataBlockServiceClient client =
+ dataBlockServiceClientManager.borrowClient(remoteEndpoint)) {
+ client.onAcknowledgeDataBlockEvent(acknowledgeDataBlockEvent);
+ break;
} catch (Throwable e) {
- if (e instanceof TException && client != null) {
- client.close();
- }
logger.error(
"Failed to send ack data block event [{}, {}) to {} due to {},
attempt times: {}",
startSequenceId,
@@ -420,10 +393,6 @@ public class SourceHandle implements ISourceHandle {
sourceHandleListener.onFailure(SourceHandle.this, e);
}
}
- } finally {
- if (client != null) {
- client.returnSelf();
- }
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
index 2d5b0d7bc7..424b58eee1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
@@ -65,26 +65,13 @@ public abstract class AbstractFragInsStateTracker
implements IFragInstanceStateT
protected FragmentInstanceState fetchState(FragmentInstance instance)
throws TException, IOException {
- SyncDataNodeInternalServiceClient client = null;
- try {
- // TODO: (jackie tien) change the port
- TEndPoint endPoint = instance.getHostDataNode().internalEndPoint;
- client = internalServiceClientManager.borrowClient(endPoint);
- if (client == null) {
- throw new TException("Can't get client for node " + endPoint);
- }
+ // TODO: (jackie tien) change the port
+ TEndPoint endPoint = instance.getHostDataNode().internalEndPoint;
+ try (SyncDataNodeInternalServiceClient client =
+ internalServiceClientManager.borrowClient(endPoint)) {
TFragmentInstanceStateResp resp =
client.fetchFragmentInstanceState(new
TFetchFragmentInstanceStateReq(getTId(instance)));
return FragmentInstanceState.valueOf(resp.state);
- } catch (Throwable t) {
- if (t instanceof TException && client != null) {
- client.close();
- }
- throw t;
- } finally {
- if (client != null) {
- client.returnSelf();
- }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java
index 6d275a74ef..fec10957bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
-import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,14 +57,10 @@ public class SimpleFragInstanceDispatcher implements
IFragInstanceDispatcher {
() -> {
TSendFragmentInstanceResp resp = new
TSendFragmentInstanceResp(false);
for (FragmentInstance instance : instances) {
- SyncDataNodeInternalServiceClient client = null;
TEndPoint endPoint =
instance.getHostDataNode().getInternalEndPoint();
- try {
- // TODO: (jackie tien) change the port
- client = internalServiceClientManager.borrowClient(endPoint);
- if (client == null) {
- throw new TException("Can't get client for node " + endPoint);
- }
+ // TODO: (jackie tien) change the port
+ try (SyncDataNodeInternalServiceClient client =
+ internalServiceClientManager.borrowClient(endPoint)) {
// TODO: (xingtanzjr) consider how to handle the buffer here
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
instance.serializeRequest(buffer);
@@ -78,20 +73,8 @@ public class SimpleFragInstanceDispatcher implements
IFragInstanceDispatcher {
} catch (IOException e) {
LOGGER.error("can't connect to node {}", endPoint, e);
throw e;
- } catch (TException e) {
- LOGGER.error("sendFragmentInstance failed for node {}",
endPoint, e);
- if (client != null) {
- client.close();
- }
- throw e;
- } catch (Exception e) {
- LOGGER.error("unexpected exception", e);
- throw e;
- } finally {
- if (client != null) {
- client.returnSelf();
- }
}
+
if (!resp.accepted) {
break;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
index 8b3d718959..df6f08f7c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
@@ -64,26 +64,14 @@ public class SimpleQueryTerminator implements
IQueryTerminator {
() -> {
for (TEndPoint endPoint : relatedHost) {
// TODO (jackie tien) change the port
- SyncDataNodeInternalServiceClient client = null;
- try {
- client = internalServiceClientManager.borrowClient(endPoint);
- if (client == null) {
- throw new TException("Can't get client for node " + endPoint);
- }
+ try (SyncDataNodeInternalServiceClient client =
+ internalServiceClientManager.borrowClient(endPoint)) {
client.cancelQuery(new TCancelQueryReq(queryId.getId()));
} catch (IOException e) {
LOGGER.error("can't connect to node {}", endPoint, e);
return false;
} catch (TException e) {
- LOGGER.error("cancelQuery failed for node {}", endPoint, e);
- if (client != null) {
- client.close();
- }
return false;
- } finally {
- if (client != null) {
- client.returnSelf();
- }
}
}
return true;