This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2d8090da0a0 Improve the high availability of IoTDB
2d8090da0a0 is described below
commit 2d8090da0a07dec97d3feb51e800cf281abcfdc4
Author: Jackie Tien <[email protected]>
AuthorDate: Thu Oct 26 20:01:03 2023 +0800
Improve the high availability of IoTDB
---
.../iot/client/AsyncIoTConsensusServiceClient.java | 6 +-
.../consensus/iot/client/DispatchLogHandler.java | 18 +++--
.../metadata/SeriesOverflowException.java | 5 +-
.../plan/planner/plan/FragmentInstance.java | 11 +++
.../plan/scheduler/AsyncPlanNodeSender.java | 83 +++++++++++++++-------
.../plan/scheduler/AsyncSendPlanNodeHandler.java | 27 ++++++-
.../scheduler/FragmentInstanceDispatcherImpl.java | 32 ++++++++-
.../apache/iotdb/commons/client/ThriftClient.java | 13 +++-
.../async/AsyncConfigNodeIServiceClient.java | 6 +-
.../async/AsyncDataNodeInternalServiceClient.java | 6 +-
.../AsyncDataNodeMPPDataExchangeServiceClient.java | 6 +-
.../async/AsyncPipeDataTransferServiceClient.java | 6 +-
12 files changed, 174 insertions(+), 45 deletions(-)
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
index ed32300dd58..f2b77a499ec 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.consensus.iot.thrift.IoTConsensusIService;
import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.thrift.async.TAsyncClientManager;
@@ -110,7 +111,10 @@ public class AsyncIoTConsensusServiceClient extends
IoTConsensusIService.AsyncCl
checkReady();
return true;
} catch (Exception e) {
- logger.info("Unexpected exception occurs in {} :", this, e);
+ logger.info(
+ "Unexpected exception occurs in {}, error msg is {}",
+ this,
+ ExceptionUtils.getRootCause(e).toString());
return false;
}
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
index b851b29981b..a10d031e91f 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcherThreadMetrics;
import org.apache.iotdb.consensus.iot.thrift.TSyncLogEntriesRes;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,7 +70,7 @@ public class DispatchLogHandler implements
AsyncMethodCallback<TSyncLogEntriesRe
logDispatcherThreadMetrics.recordSyncLogTimePerRequest(System.nanoTime() -
createTime);
}
- private boolean needRetry(int statusCode) {
+ public static boolean needRetry(int statusCode) {
return statusCode == TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()
|| statusCode == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
|| statusCode == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode();
@@ -77,12 +78,15 @@ public class DispatchLogHandler implements
AsyncMethodCallback<TSyncLogEntriesRe
@Override
public void onError(Exception exception) {
- logger.warn(
- "Can not send {} to peer for {} times {} because {}",
- batch,
- thread.getPeer(),
- ++retryCount,
- exception);
+ ++retryCount;
+ if (logger.isWarnEnabled()) {
+ logger.warn(
+ "Can not send {} to peer for {} times {} because {}",
+ batch,
+ thread.getPeer(),
+ retryCount,
+ ExceptionUtils.getRootCause(exception).toString());
+ }
sleepCorrespondingTimeAndRetryAsynchronous();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/SeriesOverflowException.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/SeriesOverflowException.java
index f8def6004cf..72dd6b75cce 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/SeriesOverflowException.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/SeriesOverflowException.java
@@ -28,10 +28,7 @@ public class SeriesOverflowException extends
MetadataException {
public SeriesOverflowException(long memoryUsage, long seriesNum) {
super(
String.format(
- "There are too many timeseries in memory. "
- + "Current memory usage is %s and series num is %s. "
- + "Please increase MAX_HEAP_SIZE in datanode-env.sh/bat, "
- + "restart and create timeseries again.",
+ "Too many timeseries in memory without device template(current
memory: %s, series num: %s). To optimize memory, DEVICE TEMPLATE is more
recommended when devices have same time series.",
memoryUsage, seriesNum),
TSStatusCode.SERIES_OVERFLOW.getStatusCode());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
index 9a211c8384a..fcef7b54a5e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
@@ -74,6 +74,9 @@ public class FragmentInstance implements IConsensusRequest {
private boolean isHighestPriority;
+ // index, within the region replica set's DataNode locations, of the replica being retried
+ private transient int nextRetryIndex = 0;
+
// We can add some more params for a specific FragmentInstance
// So that we can make different FragmentInstance owns different data range.
@@ -271,6 +274,14 @@ public class FragmentInstance implements IConsensusRequest
{
return hostDataNode;
}
+ public TDataNodeLocation getNextRetriedHostDataNode() {
+ nextRetryIndex =
+ (nextRetryIndex + 1) %
executorType.getRegionReplicaSet().getDataNodeLocations().size();
+ this.hostDataNode =
+
executorType.getRegionReplicaSet().getDataNodeLocations().get(nextRetryIndex);
+ return hostDataNode;
+ }
+
public long getTimeOut() {
return timeOut;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java
index 525397123ba..c004356a5f4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java
@@ -35,15 +35,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
-import static com.google.common.util.concurrent.Futures.immediateFuture;
-
public class AsyncPlanNodeSender {
private static final Logger logger =
LoggerFactory.getLogger(AsyncPlanNodeSender.class);
@@ -53,8 +51,11 @@ public class AsyncPlanNodeSender {
private final Map<TEndPoint, BatchRequestWithIndex> batchRequests;
private final Map<Integer, TSendSinglePlanNodeResp> instanceId2RespMap;
+
+ private final List<Integer> needRetryInstanceIndex;
+
private final AtomicLong pendingNumber;
- private final long startSendTime;
+ private long startSendTime;
public AsyncPlanNodeSender(
IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
@@ -77,6 +78,7 @@ public class AsyncPlanNodeSender {
instances.get(i).getRegionReplicaSet().getRegionId()));
}
this.instanceId2RespMap = new ConcurrentHashMap<>(instances.size() + 1, 1);
+ this.needRetryInstanceIndex = Collections.synchronizedList(new
ArrayList<>());
this.pendingNumber = new AtomicLong(batchRequests.keySet().size());
}
@@ -84,7 +86,11 @@ public class AsyncPlanNodeSender {
for (Map.Entry<TEndPoint, BatchRequestWithIndex> entry :
batchRequests.entrySet()) {
AsyncSendPlanNodeHandler handler =
new AsyncSendPlanNodeHandler(
- entry.getValue().getIndexes(), pendingNumber,
instanceId2RespMap, startSendTime);
+ entry.getValue().getIndexes(),
+ pendingNumber,
+ instanceId2RespMap,
+ needRetryInstanceIndex,
+ startSendTime);
try {
AsyncDataNodeInternalServiceClient client =
asyncInternalServiceClientManager.borrowClient(entry.getKey());
@@ -135,26 +141,55 @@ public class AsyncPlanNodeSender {
return failureStatusList;
}
- public Future<FragInstanceDispatchResult> getResult() {
- for (Map.Entry<Integer, TSendSinglePlanNodeResp> entry :
instanceId2RespMap.entrySet()) {
- if (!entry.getValue().accepted) {
- logger.warn(
- "dispatch write failed. status: {}, code: {}, message: {}, node
{}",
- entry.getValue().status,
- TSStatusCode.representOf(entry.getValue().status.code),
- entry.getValue().message,
-
instances.get(entry.getKey()).getHostDataNode().getInternalEndPoint());
- if (entry.getValue().getStatus() == null) {
- return immediateFuture(
- new FragInstanceDispatchResult(
- RpcUtils.getStatus(
- TSStatusCode.WRITE_PROCESS_ERROR,
entry.getValue().getMessage())));
- } else {
- return immediateFuture(new
FragInstanceDispatchResult(entry.getValue().getStatus()));
- }
- }
+ public boolean needRetry() {
+ // the list of FIs to retry is not empty and the data region replica number is greater than 1
+ return !needRetryInstanceIndex.isEmpty()
+ && instances.get(0).getRegionReplicaSet().dataNodeLocations.size() > 1;
+ }
+
+ /**
+ * This function should be called after all responses of the last batch have been received.
+ * It does the cleaning work itself, so the caller does not need to do any cleaning outside
+ * this function. It retries all failed FIs of the last batch, whose indexes were saved in
+ * needRetryInstanceIndex; FIs that still fail this time are saved in needRetryInstanceIndex
+ * again.
+ *
+ * <p>It's a sync function, which means that once this function returns, the results of this
+ * retry have been received, and you don't need to call waitUntilCompleted.
+ */
+ public void retry() throws InterruptedException {
+ // 1. rebuild the batchRequests using the remaining failed FIs; the replica of each failed
+ // FI is changed in this step
+ batchRequests.clear();
+ for (int fragmentInstanceIndex : needRetryInstanceIndex) {
+ this.batchRequests
+ .computeIfAbsent(
+ instances
+ .get(fragmentInstanceIndex)
+ .getNextRetriedHostDataNode()
+ .getInternalEndPoint(),
+ x -> new BatchRequestWithIndex())
+ .addSinglePlanNodeReq(
+ fragmentInstanceIndex,
+ new TSendSinglePlanNodeReq(
+ new TPlanNode(
+ instances
+ .get(fragmentInstanceIndex)
+ .getFragment()
+ .getPlanNodeTree()
+ .serializeToByteBuffer()),
+
instances.get(fragmentInstanceIndex).getRegionReplicaSet().getRegionId()));
}
- return immediateFuture(new FragInstanceDispatchResult(true));
+
+ // 2. reset the pendingNumber, needRetryInstanceIndex and startSendTime
+ needRetryInstanceIndex.clear();
+ pendingNumber.set(batchRequests.keySet().size());
+ startSendTime = System.nanoTime();
+
+ // 3. call sendAll() to retry
+ sendAll();
+
+ // 4. call waitUntilCompleted() to wait for the responses
+ waitUntilCompleted();
}
/**
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncSendPlanNodeHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncSendPlanNodeHandler.java
index 63af1ffedb9..ed7601b0b3b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncSendPlanNodeHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncSendPlanNodeHandler.java
@@ -20,22 +20,27 @@
package org.apache.iotdb.db.queryengine.plan.scheduler;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
+import org.apache.iotdb.consensus.iot.client.DispatchLogHandler;
import org.apache.iotdb.mpp.rpc.thrift.TSendBatchPlanNodeResp;
import org.apache.iotdb.mpp.rpc.thrift.TSendSinglePlanNodeResp;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.thrift.async.AsyncMethodCallback;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
+import static org.apache.iotdb.commons.client.ThriftClient.isConnectionBroken;
+
public class AsyncSendPlanNodeHandler implements
AsyncMethodCallback<TSendBatchPlanNodeResp> {
private final List<Integer> instanceIds;
private final AtomicLong pendingNumber;
private final Map<Integer, TSendSinglePlanNodeResp> instanceId2RespMap;
+ private final List<Integer> needRetryInstanceIndex;
private final long sendTime;
private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS
=
PerformanceOverviewMetrics.getInstance();
@@ -44,17 +49,23 @@ public class AsyncSendPlanNodeHandler implements
AsyncMethodCallback<TSendBatchP
List<Integer> instanceIds,
AtomicLong pendingNumber,
Map<Integer, TSendSinglePlanNodeResp> instanceId2RespMap,
+ List<Integer> needRetryInstanceIndex,
long sendTime) {
this.instanceIds = instanceIds;
this.pendingNumber = pendingNumber;
this.instanceId2RespMap = instanceId2RespMap;
+ this.needRetryInstanceIndex = needRetryInstanceIndex;
this.sendTime = sendTime;
}
@Override
public void onComplete(TSendBatchPlanNodeResp sendBatchPlanNodeResp) {
for (int i = 0; i < sendBatchPlanNodeResp.getResponses().size(); i++) {
- instanceId2RespMap.put(instanceIds.get(i),
sendBatchPlanNodeResp.getResponses().get(i));
+ TSendSinglePlanNodeResp singlePlanNodeResp =
sendBatchPlanNodeResp.getResponses().get(i);
+ instanceId2RespMap.put(instanceIds.get(i), singlePlanNodeResp);
+ if (needRetry(singlePlanNodeResp)) {
+ needRetryInstanceIndex.add(instanceIds.get(i));
+ }
}
if (pendingNumber.decrementAndGet() == 0) {
PERFORMANCE_OVERVIEW_METRICS.recordScheduleRemoteCost(System.nanoTime()
- sendTime);
@@ -66,6 +77,9 @@ public class AsyncSendPlanNodeHandler implements
AsyncMethodCallback<TSendBatchP
@Override
public void onError(Exception e) {
+ if (needRetry(e)) {
+ needRetryInstanceIndex.addAll(instanceIds);
+ }
TSendSinglePlanNodeResp resp = new TSendSinglePlanNodeResp();
String errorMsg = String.format("Fail to send plan node, exception
message: %s", e);
resp.setAccepted(false);
@@ -80,4 +94,15 @@ public class AsyncSendPlanNodeHandler implements
AsyncMethodCallback<TSendBatchP
}
}
}
+
+ private boolean needRetry(Exception e) {
+ Throwable rootCause = ExceptionUtils.getRootCause(e);
+ // if the root cause indicates a broken connection (e.g. a SocketException whose error
+ // message is "Broken pipe"), it means that the remote node may have gone offline
+ return isConnectionBroken(rootCause);
+ }
+
+ private boolean needRetry(TSendSinglePlanNodeResp resp) {
+ return !resp.accepted && DispatchLogHandler.needRetry(resp.status.code);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 2e8929402d1..a1ab241fa6a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -48,6 +48,7 @@ import
org.apache.iotdb.mpp.rpc.thrift.TSendSinglePlanNodeResp;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +58,7 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static
org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.DISPATCH_READ;
@@ -210,6 +212,25 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
// wait until remote dispatch done
try {
asyncPlanNodeSender.waitUntilCompleted();
+
+ if (asyncPlanNodeSender.needRetry()) {
+ // retry failed remote FIs
+ int retry = 0;
+ final int maxRetryTimes = 10;
+ long waitMillis = getRetrySleepTime(retry);
+
+ while (asyncPlanNodeSender.needRetry()) {
+ retry++;
+ asyncPlanNodeSender.retry();
+ if (!(asyncPlanNodeSender.needRetry() && retry < maxRetryTimes)) {
+ break;
+ }
+ // still need to retry, sleep some time before make another retry.
+ Thread.sleep(waitMillis);
+ waitMillis = getRetrySleepTime(retry);
+ }
+ }
+
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Interrupted when dispatching write async", e);
@@ -239,6 +260,12 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
}
}
+ private long getRetrySleepTime(int retryTimes) {
+ return Math.min(
+ (long) (TimeUnit.MILLISECONDS.toMillis(100) * Math.pow(2, retryTimes)),
+ TimeUnit.SECONDS.toMillis(20));
+ }
+
private void dispatchOneInstance(FragmentInstance instance)
throws FragmentInstanceDispatchException {
TEndPoint endPoint = instance.getHostDataNode().getInternalEndPoint();
@@ -313,7 +340,10 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
String.format("unknown read type [%s]",
instance.getType())));
}
} catch (ClientManagerException | TException e) {
- logger.warn("can't connect to node {}", endPoint, e);
+ logger.warn(
+ "can't connect to node {}, error msg is {}.",
+ endPoint,
+ ExceptionUtils.getRootCause(e).toString());
TSStatus status = new TSStatus();
status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
status.setMessage("can't connect to node " + endPoint);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
index e9d3b9c36ce..92d933397e6 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
@@ -25,7 +25,9 @@ import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
+import java.net.ConnectException;
import java.net.SocketException;
/**
@@ -88,8 +90,8 @@ public interface ThriftClient {
if (o.printLogWhenEncounterException()) {
logger.info(
"Broken pipe error happened in sending RPC,"
- + " we need to clear all previous cached connection",
- t);
+ + " we need to clear all previous cached connection, error
msg is {}",
+ rootCause.toString());
}
o.invalidateAll();
}
@@ -105,6 +107,11 @@ public interface ThriftClient {
static boolean isConnectionBroken(Throwable cause) {
return (cause instanceof SocketException &&
cause.getMessage().contains("Broken pipe"))
|| (cause instanceof TTransportException
- && cause.getMessage().contains("Socket is closed by peer"));
+ && (cause.getMessage().contains("Socket is closed by peer")
+ || cause.getMessage().contains("Read call frame size failed")))
+ || (cause instanceof IOException
+ && (cause.getMessage().contains("Connection reset by peer")
+ || cause.getMessage().contains("Broken pipe")))
+ || (cause instanceof ConnectException &&
cause.getMessage().contains("Connection refused"));
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java
index 6f3c00de2a5..36d273c7b78 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.thrift.async.TAsyncClientManager;
@@ -110,7 +111,10 @@ public class AsyncConfigNodeIServiceClient extends
IConfigNodeRPCService.AsyncCl
return true;
} catch (Exception e) {
if (printLogWhenEncounterException) {
- logger.error("Unexpected exception occurs in {} : {}", this,
e.getMessage());
+ logger.error(
+ "Unexpected exception occurs in {}, error msg is {}",
+ this,
+ ExceptionUtils.getRootCause(e).toString());
}
return false;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
index 15f6c662d9f..7b617788c3e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.thrift.async.TAsyncClientManager;
@@ -123,7 +124,10 @@ public class AsyncDataNodeInternalServiceClient extends
IDataNodeRPCService.Asyn
return true;
} catch (Exception e) {
if (printLogWhenEncounterException) {
- logger.error("Unexpected exception occurs in {} : {}", this,
e.getMessage());
+ logger.error(
+ "Unexpected exception occurs in {}, error msg is {}",
+ this,
+ ExceptionUtils.getRootCause(e).toString());
}
return false;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
index 97c5b1584de..6434ec7f017 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.mpp.rpc.thrift.MPPDataExchangeService;
import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.thrift.async.TAsyncClientManager;
@@ -111,7 +112,10 @@ public class AsyncDataNodeMPPDataExchangeServiceClient
extends MPPDataExchangeSe
return true;
} catch (Exception e) {
if (printLogWhenEncounterException) {
- logger.error("Unexpected exception occurs in {} : {}", this,
e.getMessage());
+ logger.error(
+ "Unexpected exception occurs in {}, error msg is {}",
+ this,
+ ExceptionUtils.getRootCause(e).toString());
}
return false;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
index fc45ab60398..83f47956812 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.thrift.async.TAsyncClientManager;
@@ -127,7 +128,10 @@ public class AsyncPipeDataTransferServiceClient extends
IClientRPCService.AsyncC
return true;
} catch (Exception e) {
if (printLogWhenEncounterException) {
- LOGGER.error("Unexpected exception occurs in {} : {}", this,
e.getMessage());
+ LOGGER.error(
+ "Unexpected exception occurs in {}, error msg is {}",
+ this,
+ ExceptionUtils.getRootCause(e).toString());
}
return false;
}