This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch jira5260 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 452e2a5cdaa2beda7dff73847c939b7ccb245f63 Author: OneSizeFitQuorum <[email protected]> AuthorDate: Wed Dec 21 16:41:43 2022 +0800 Refactor ClientManager API and Exception Signed-off-by: OneSizeFitQuorum <[email protected]> --- .../async/AsyncConfigNodeHeartbeatClientPool.java | 6 +- .../client/async/AsyncDataNodeClientPool.java | 5 +- .../async/AsyncDataNodeHeartbeatClientPool.java | 6 +- .../client/sync/SyncConfigNodeClientPool.java | 6 +- .../client/sync/SyncDataNodeClientPool.java | 8 +-- .../confignode/manager/cq/CQScheduleTask.java | 4 +- .../consensus/iot/IoTConsensusServerImpl.java | 17 ++--- .../consensus/iot/logdispatcher/LogDispatcher.java | 3 +- .../iotdb/consensus/ratis/RatisConsensus.java | 21 +++--- .../java/org/apache/iotdb/it/env/AbstractEnv.java | 19 +++--- .../org/apache/iotdb/it/env/RemoteServerEnv.java | 18 ++--- .../java/org/apache/iotdb/itbase/env/BaseEnv.java | 3 +- .../apache/iotdb/commons/client/ClientManager.java | 76 +++++++++------------- .../iotdb/commons/client/ClientPoolProperty.java | 6 +- .../iotdb/commons/client/IClientManager.java | 18 ++--- .../ClientManagerException.java} | 29 ++------- .../iotdb/commons/service/ThriftServiceThread.java | 2 +- .../iotdb/commons/client/ClientManagerTest.java | 22 ++++--- .../iotdb/db/auth/ClusterAuthorityFetcher.java | 10 +-- .../metadata/template/ClusterTemplateManager.java | 6 +- .../db/mpp/execution/exchange/SinkHandle.java | 4 +- .../db/mpp/execution/exchange/SourceHandle.java | 4 +- .../apache/iotdb/db/mpp/plan/TestRPCClient.java | 13 ++-- .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 3 +- .../mpp/plan/analyze/ClusterPartitionFetcher.java | 17 +++-- .../db/mpp/plan/analyze/cache/PartitionCache.java | 11 ++-- .../config/executor/ClusterConfigTaskExecutor.java | 57 ++++++++-------- .../scheduler/AbstractFragInsStateTracker.java | 4 +- .../scheduler/FixedRateFragInsStateTracker.java | 4 +- .../scheduler/FragmentInstanceDispatcherImpl.java | 4 +- .../mpp/plan/scheduler/SimpleQueryTerminator.java | 7 +- .../scheduler/load/LoadTsFileDispatcherImpl.java | 6 +- .../db/trigger/executor/TriggerFireVisitor.java | 7 +- .../db/mpp/execution/exchange/SinkHandleTest.java | 9 +-- .../mpp/execution/exchange/SourceHandleTest.java | 12 ++-- 35 files changed, 203 insertions(+), 244 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java index 8dd67da315..3c2e3072f3 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java @@ -42,12 +42,8 @@ public class AsyncConfigNodeHeartbeatClientPool { */ public void getConfigNodeHeartBeat( TEndPoint endPoint, long timestamp, ConfigNodeHeartbeatHandler handler) { - AsyncConfigNodeHeartbeatServiceClient client; try { - client = clientManager.purelyBorrowClient(endPoint); - if (client != null) { - client.getConfigNodeHeartBeat(timestamp, handler); - } + clientManager.borrowClient(endPoint).getConfigNodeHeartBeat(timestamp, handler); } catch (Exception ignore) { // Just ignore } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java index 3973689b3b..7f0404cc69 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java @@ -25,6 +25,7 @@ import org.apache.iotdb.common.rpc.thrift.TSetTTLReq; import org.apache.iotdb.commons.client.ClientPoolFactory; import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.confignode.client.DataNodeRequestType; import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler; import org.apache.iotdb.confignode.client.async.handlers.rpc.AsyncTSStatusRPCHandler; @@ -60,8 +61,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; - /** Asynchronously send RPC requests to DataNodes. See mpp.thrift for more details. */ public class AsyncDataNodeClientPool { @@ -350,7 +349,7 @@ public class AsyncDataNodeClientPool { } public AsyncDataNodeInternalServiceClient getAsyncClient(TDataNodeLocation targetDataNode) - throws IOException { + throws ClientManagerException { return clientManager.borrowClient(targetDataNode.getInternalEndPoint()); } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java index a42616c722..1a92dd6ac9 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java @@ -44,12 +44,8 @@ public class AsyncDataNodeHeartbeatClientPool { */ public void getDataNodeHeartBeat( TEndPoint endPoint, THeartbeatReq req, DataNodeHeartbeatHandler handler) { - AsyncDataNodeHeartbeatServiceClient client; try { - client = clientManager.purelyBorrowClient(endPoint); - if (client != null) { - client.getDataNodeHeartBeat(req, handler); - } + clientManager.borrowClient(endPoint).getDataNodeHeartBeat(req, handler); } catch (Exception ignore) { // Just ignore } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java index cb8e62b595..0d0f5336f3 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java @@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; import org.apache.iotdb.confignode.client.ConfigNodeRequestType; import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq; @@ -35,7 +36,6 @@ import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.concurrent.TimeUnit; /** Synchronously send RPC requests to ConfigNode. See confignode.thrift for more details. */ @@ -93,7 +93,7 @@ public class SyncConfigNodeClientPool { return RpcUtils.getStatus( TSStatusCode.EXECUTE_STATEMENT_ERROR, "Unknown request type: " + requestType); } - } catch (Throwable e) { + } catch (Exception e) { lastException = e; LOGGER.warn( "{} failed on ConfigNode {}, because {}, retrying {}...", @@ -118,7 +118,7 @@ public class SyncConfigNodeClientPool { */ public TSStatus removeConfigNode( TConfigNodeLocation configNodeLocation, SyncConfigNodeIServiceClient client) - throws TException, IOException, InterruptedException { + throws ClientManagerException, TException, InterruptedException { TSStatus status = client.removeConfigNode(configNodeLocation); while (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { TimeUnit.MILLISECONDS.sleep(2000); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java index 3de327ecbb..3136e05da1 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java @@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.ClientPoolFactory; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; import org.apache.iotdb.confignode.client.DataNodeRequestType; import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq; @@ -42,7 +43,6 @@ import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.concurrent.TimeUnit; /** Synchronously send RPC requests to DataNodes. See mpp.thrift for more details. */ @@ -67,7 +67,7 @@ public class SyncDataNodeClientPool { for (int retry = 0; retry < DEFAULT_RETRY_NUM; retry++) { try (SyncDataNodeInternalServiceClient client = clientManager.borrowClient(endPoint)) { return executeSyncRequest(requestType, client, req); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { lastException = e; if (retry != DEFAULT_RETRY_NUM - 1) { LOGGER.warn("{} failed on DataNode {}, retrying {}...", requestType, endPoint, retry + 1); @@ -86,7 +86,7 @@ public class SyncDataNodeClientPool { for (int retry = 0; retry < retryNum; retry++) { try (SyncDataNodeInternalServiceClient client = clientManager.borrowClient(endPoint)) { return executeSyncRequest(requestType, client, req); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { lastException = e; if (retry != retryNum - 1) { LOGGER.warn("{} failed on DataNode {}, retrying {}...", requestType, endPoint, retry + 1); @@ -167,7 +167,7 @@ public class SyncDataNodeClientPool { try (SyncDataNodeInternalServiceClient client = clientManager.borrowClient(dataNode)) { TRegionLeaderChangeReq req = new TRegionLeaderChangeReq(regionId, newLeaderNode); status = client.changeRegionLeader(req); - } catch (IOException e) { + } catch (ClientManagerException e) { LOGGER.error("Can't connect to Data node: {}", dataNode, e); status = new TSStatus(TSStatusCode.CAN_NOT_CONNECT_DATANODE.getStatusCode()); status.setMessage(e.getMessage()); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java index f60bc09080..b5854c0c18 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java @@ -172,8 +172,8 @@ public class CQScheduleTask implements Runnable { AsyncDataNodeInternalServiceClient client = AsyncDataNodeClientPool.getInstance().getAsyncClient(targetDataNode.get()); client.executeCQ(executeCQReq, new AsyncExecuteCQCallback(startTime, endTime)); - } catch (Throwable t) { - LOGGER.warn("Execute CQ {} failed", cqId, t); + } catch (Exception e) { + LOGGER.warn("Execute CQ {} failed", cqId, e); if (needSubmit()) { submitSelf(retryWaitTimeInMS, TimeUnit.MILLISECONDS); } diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java index d66d29cc6e..1d668c281a 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java @@ -22,6 +22,7 @@ package org.apache.iotdb.consensus.iot; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.service.metric.enums.Metric; import org.apache.iotdb.commons.service.metric.enums.Tag; @@ -332,7 +333,7 @@ public class IoTConsensusServerImpl { reader.close(); } } - } catch (IOException | TException e) { + } catch (Exception e) { throw new ConsensusGroupModifyPeerException( String.format("error when send snapshot file to %s", targetPeer), e); } @@ -403,7 +404,7 @@ public class IoTConsensusServerImpl { throw new ConsensusGroupModifyPeerException( String.format("error when inactivating %s. %s", peer, res.getStatus())); } - } catch (IOException | TException e) { + } catch (Exception e) { throw new ConsensusGroupModifyPeerException( String.format("error when inactivating %s", peer), e); } @@ -420,7 +421,7 @@ public class IoTConsensusServerImpl { throw new ConsensusGroupModifyPeerException( String.format("error when triggering snapshot load %s. %s", peer, res.getStatus())); } - } catch (IOException | TException e) { + } catch (Exception e) { throw new ConsensusGroupModifyPeerException( String.format("error when activating %s", peer), e); } @@ -435,7 +436,7 @@ public class IoTConsensusServerImpl { throw new ConsensusGroupModifyPeerException( String.format("error when activating %s. %s", peer, res.getStatus())); } - } catch (IOException | TException e) { + } catch (Exception e) { throw new ConsensusGroupModifyPeerException( String.format("error when activating %s", peer), e); } @@ -470,7 +471,7 @@ public class IoTConsensusServerImpl { throw new ConsensusGroupModifyPeerException( String.format("build sync log channel failed from %s to %s", peer, targetPeer)); } - } catch (IOException | TException e) { + } catch (Exception e) { // We use a simple way to deal with the connection issue when notifying other nodes to // build sync log. If the un-responsible peer is the peer which will be removed, we cannot // suspend the operation and need to skip it. In order to keep the mechanism works fine, @@ -513,7 +514,7 @@ public class IoTConsensusServerImpl { throw new ConsensusGroupModifyPeerException( String.format("remove sync log channel failed from %s to %s", peer, targetPeer)); } - } catch (IOException | TException e) { + } catch (Exception e) { throw new ConsensusGroupModifyPeerException( String.format("error when removing sync log channel to %s", peer), e); } @@ -545,7 +546,7 @@ public class IoTConsensusServerImpl { res.safeIndex); Thread.sleep(checkIntervalInMs); } - } catch (IOException | TException e) { + } catch (ClientManagerException | TException e) { throw new ConsensusGroupModifyPeerException( String.format( "error when waiting %s to complete SyncLog. %s", targetPeer, e.getMessage()), @@ -750,7 +751,7 @@ public class IoTConsensusServerImpl { String.format( "cleanup remote snapshot failed of %s ,status is %s", targetPeer, res.getStatus())); } - } catch (IOException | TException e) { + } catch (Exception e) { throw new ConsensusGroupModifyPeerException( String.format("cleanup remote snapshot failed of %s", targetPeer), e); } diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java index fecccc1403..752a392ed1 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java @@ -37,7 +37,6 @@ import org.apache.iotdb.consensus.iot.wal.ConsensusReqReader; import org.apache.iotdb.consensus.iot.wal.GetConsensusReqReaderPlan; import org.apache.iotdb.metrics.utils.MetricLevel; -import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -450,7 +449,7 @@ public class LogDispatcher { batch.getEndIndex(), peer.getGroupId().convertToTConsensusGroupId()); client.syncLogEntries(req, handler); - } catch (IOException | TException e) { + } catch (Exception e) { logger.error("Can not sync logs to peer {} because", peer, e); handler.onError(e); } diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java index 151bf17bb4..e00374c347 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java @@ -26,6 +26,7 @@ import org.apache.iotdb.commons.client.ClientManager; import org.apache.iotdb.commons.client.ClientPoolProperty; import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.IClientPoolFactory; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.conf.CommonDescriptor; @@ -73,7 +74,6 @@ import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.util.MemoizedSupplier; import org.apache.ratis.util.function.CheckedSupplier; -import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -243,7 +243,7 @@ class RatisConsensus implements IConsensus { if (isLeader(consensusGroupId) && CommonDescriptor.getInstance().getConfig().isReadOnly()) { try { forceStepDownLeader(raftGroup); - } catch (IOException e) { + } catch (Exception e) { logger.warn("leader {} read only, force step down failed due to {}", myself, e); } return failedWrite(new NodeReadOnlyException(myself)); @@ -284,7 +284,7 @@ class RatisConsensus implements IConsensus { return failedWrite(new RatisRequestFailedException(reply.getException())); } writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer()); - } catch (IOException | TException e) { + } catch (Exception e) { return failedWrite(new RatisRequestFailedException(e)); } finally { if (client != null) { @@ -357,7 +357,7 @@ class RatisConsensus implements IConsensus { if (!reply.isSuccess()) { return failed(new RatisRequestFailedException(reply.getException())); } - } catch (IOException e) { + } catch (Exception e) { return failed(new RatisRequestFailedException(e)); } finally { if (client != null) { @@ -550,7 +550,7 @@ class RatisConsensus implements IConsensus { if (!reply.isSuccess()) { return failed(new RatisRequestFailedException(reply.getException())); } - } catch (IOException e) { + } catch (Exception e) { return failed(new RatisRequestFailedException(e)); } finally { if (client != null) { @@ -560,13 +560,14 @@ class RatisConsensus implements IConsensus { return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build(); } - private void forceStepDownLeader(RaftGroup group) throws IOException { + private void forceStepDownLeader(RaftGroup group) throws ClientManagerException, IOException { // when newLeaderPeerId == null, ratis forces current leader to step down and raise new // election transferLeader(group, null); } - private RaftClientReply transferLeader(RaftGroup group, RaftPeer newLeader) throws IOException { + private RaftClientReply transferLeader(RaftGroup group, RaftPeer newLeader) + throws ClientManagerException, IOException { RatisClient client = null; try { client = getRaftClient(group); @@ -770,10 +771,10 @@ class RatisConsensus implements IConsensus { Utils.fromPeersAndPriorityToRaftPeers(peers, DEFAULT_PRIORITY)); } - private RatisClient getRaftClient(RaftGroup group) throws IOException { + private RatisClient getRaftClient(RaftGroup group) throws ClientManagerException { try { return clientManager.borrowClient(group); - } catch (IOException e) { + } catch (ClientManagerException e) { logger.error(String.format("Borrow client from pool for group %s failed.", group), e); // rethrow the exception throw e; @@ -792,7 +793,7 @@ class RatisConsensus implements IConsensus { if (!reply.isSuccess()) { throw new RatisRequestFailedException(reply.getException()); } - } catch (IOException e) { + } catch (Exception e) { throw new RatisRequestFailedException(e); } finally { if (client != null) { diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java index e1979b48b2..7948abc1a9 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java @@ -20,6 +20,7 @@ package org.apache.iotdb.it.env; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService; @@ -74,6 +75,11 @@ public abstract class AbstractEnv implements BaseEnv { protected List<DataNodeWrapper> dataNodeWrapperList = Collections.emptyList(); protected String testMethodName = null; + private final IClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager = + new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>() + .createClientManager( + new DataNodeClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory()); + protected void initEnvironment(int configNodesNum, int dataNodesNum) { this.configNodeWrapperList = new ArrayList<>(); this.dataNodeWrapperList = new ArrayList<>(); @@ -434,15 +440,12 @@ public abstract class AbstractEnv implements BaseEnv { @Override public IConfigNodeRPCService.Iface getLeaderConfigNodeConnection() throws IOException, InterruptedException { - IClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager = - new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>() - .createClientManager( - new DataNodeClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory()); + for (int i = 0; i < 30; i++) { for (ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) { try { SyncConfigNodeIServiceClient client = - clientManager.purelyBorrowClient( + clientManager.borrowClient( new TEndPoint(configNodeWrapper.getIp(), configNodeWrapper.getPort())); TShowClusterResp resp = client.showCluster(); @@ -472,10 +475,6 @@ public abstract class AbstractEnv implements BaseEnv { @Override public int getLeaderConfigNodeIndex() throws IOException, InterruptedException { - IClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager = - new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>() - .createClientManager( - new DataNodeClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory()); for (int retry = 0; retry < 30; retry++) { for (int configNodeId = 0; configNodeId < configNodeWrapperList.size(); configNodeId++) { ConfigNodeWrapper configNodeWrapper = configNodeWrapperList.get(configNodeId); @@ -488,7 +487,7 @@ public abstract class AbstractEnv implements BaseEnv { if (resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { return configNodeId; } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { logger.error( "Borrow ConfigNodeClient from ConfigNode: {} failed because: {}, retrying...", configNodeWrapper.getIp(), diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java index 83ab667eb9..0c4c5cb535 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java @@ -20,6 +20,7 @@ package org.apache.iotdb.it.env; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService; import org.apache.iotdb.db.client.DataNodeClientPoolFactory; @@ -41,11 +42,17 @@ import static org.apache.iotdb.jdbc.Config.VERSION; import static org.junit.Assert.fail; public class RemoteServerEnv implements BaseEnv { + private String ip_addr = System.getProperty("RemoteIp", "127.0.0.1"); private String port = System.getProperty("RemotePort", "6667"); private String user = System.getProperty("RemoteUser", "root"); private String password = System.getProperty("RemotePassword", "root"); + private final IClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager = + new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>() + .createClientManager( + new DataNodeClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory()); + @Override public void initBeforeClass() { try (Connection connection = EnvFactory.getEnv().getConnection(); @@ -151,15 +158,8 @@ public class RemoteServerEnv implements BaseEnv { } @Override - public IConfigNodeRPCService.Iface getLeaderConfigNodeConnection() throws IOException { - IClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager = - new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>() - .createClientManager( - new DataNodeClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory()); - try (SyncConfigNodeIServiceClient client = - clientManager.borrowClient(new TEndPoint(ip_addr, 22277))) { - return client; - } + public IConfigNodeRPCService.Iface getLeaderConfigNodeConnection() throws ClientManagerException { + return clientManager.borrowClient(new TEndPoint(ip_addr, 22277)); } @Override diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java index d0ba65b3b1..4e4b59b5fd 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.itbase.env; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService; import org.apache.iotdb.it.env.ConfigNodeWrapper; import org.apache.iotdb.it.env.DataNodeWrapper; @@ -73,7 +74,7 @@ public interface BaseEnv { void setDataNodeWrapperList(List<DataNodeWrapper> dataNodeWrapperList); IConfigNodeRPCService.Iface getLeaderConfigNodeConnection() - throws IOException, InterruptedException; + throws ClientManagerException, IOException, InterruptedException; default ISession getSessionConnection() throws IoTDBConnectionException { return getSessionConnection( diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java index ba094a54ee..7c9553d751 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java @@ -19,14 +19,14 @@ package org.apache.iotdb.commons.client; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.commons.pool2.KeyedObjectPool; -import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; +import java.util.Optional; public class ClientManager<K, V> implements IClientManager<K, V> { @@ -44,60 +44,42 @@ public class ClientManager<K, V> implements IClientManager<K, V> { } @Override - public V borrowClient(K node) throws IOException { - V client; + public V borrowClient(K node) throws ClientManagerException { try { - client = pool.borrowObject(node); - } catch (TTransportException e) { - // External needs to check transport related exception - throw new IOException(e); - } catch (IOException e) { - // External needs the IOException to check connection - throw e; + return pool.borrowObject(node); } catch (Exception e) { - // External doesn't care of other exceptions - String errorMessage = - String.format( - "Borrow client from pool for node %s failed, you need to increase dn_max_connection_for_internal_service.", - node); - logger.warn(errorMessage, e); - throw new IOException(errorMessage, e); + throw new ClientManagerException(e); } - return client; } - @Override - public V purelyBorrowClient(K node) { - V client = null; - try { - client = pool.borrowObject(node); - } catch (Exception ignored) { - // Just ignore - } - return client; - } - - // return a V client of the K node to the Manager + // return a client V for node K to the ClientManager + // Note: We do not define this interface in IClientManager to make you aware that the return of a + // client is automatic whenever a particular client is used public void returnClient(K node, V client) { - if (client != null && node != null) { - try { - pool.returnObject(node, client); - } catch (Exception e) { - logger.error( - String.format("Return client %s for node %s to pool failed.", client, node), e); - } - } + Optional.ofNullable(node) + .ifPresent( + x -> { + try { + pool.returnObject(node, client); + } catch (Exception e) { + logger.error( + String.format("Return client %s for node %s to pool failed.", client, node), e); + } + }); } @Override public void clear(K node) { - if (node != null) { - try { - pool.clear(node); - } catch (Exception e) { - logger.error(String.format("Clear all client in pool for node %s failed.", node), e); - } - } + Optional.ofNullable(node) + .ifPresent( + x -> { + try { + pool.clear(node); + } catch (Exception e) { + logger.error( + String.format("Clear all client in pool for node %s failed.", node), e); + } + }); } @Override @@ -105,7 +87,7 @@ public class ClientManager<K, V> implements IClientManager<K, V> { try { pool.close(); } catch (Exception e) { - logger.error("close client pool failed", e); + logger.error("Close client pool failed", e); } } } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolProperty.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolProperty.java index c764692564..18481cc53b 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolProperty.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolProperty.java @@ -38,10 +38,10 @@ public class ClientPoolProperty<V> { public static class Builder<V> { // when the number of the client to a single node exceeds maxTotalConnectionForEachNode, the - // current thread will block waitClientTimeoutMS, ClientManager returns NULL if there are no - // clients after the block time + // current thread will block waitClientTimeoutMS, ClientManager throws ClientManagerException if + // there are no clients after the block time private long waitClientTimeoutMS = DefaultProperty.WAIT_CLIENT_TIMEOUT_MS; - // the maximum number of clients that can be applied for a node + // the maximum number of clients that can be allocated for a node private int maxTotalClientForEachNode = DefaultProperty.MAX_TOTAL_CLIENT_FOR_EACH_NODE; // the maximum number of clients that can be idle for a node. When the number of idle clients on // a node exceeds this number, newly returned clients will be released diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java index 4dfaf6b362..3c6d349c49 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java @@ -19,28 +19,24 @@ package org.apache.iotdb.commons.client; -import javax.annotation.concurrent.ThreadSafe; +import org.apache.iotdb.commons.client.exception.ClientManagerException; -import java.io.IOException; +import javax.annotation.concurrent.ThreadSafe; @ThreadSafe public interface IClientManager<K, V> { - // get a V client of the K node from the Manager - V borrowClient(K node) throws IOException; + // get a client V for node K from the IClientManager + V borrowClient(K node) throws ClientManagerException; - // Get a V client of the K node from the Manager while - // no exceptions will be thrown and no logs will be printed. - // This interface is mainly used to process the cluster heartbeat. - V purelyBorrowClient(K node); - - // clear all clients for K node + // clear all clients for node K void clear(K node); - // close clientManager + // close IClientManager, which means closing all clients for all nodes void close(); class Factory<K, V> { + public IClientManager<K, V> createClientManager(IClientPoolFactory<K, V> clientPoolFactory) { return new ClientManager<>(clientPoolFactory); } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java similarity index 51% copy from node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java copy to node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java index 4dfaf6b362..77c169d965 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java @@ -17,32 +17,11 @@ * under the License. */ -package org.apache.iotdb.commons.client; +package org.apache.iotdb.commons.client.exception; -import javax.annotation.concurrent.ThreadSafe; +public class ClientManagerException extends Exception { -import java.io.IOException; - -@ThreadSafe -public interface IClientManager<K, V> { - - // get a V client of the K node from the Manager - V borrowClient(K node) throws IOException; - - // Get a V client of the K node from the Manager while - // no exceptions will be thrown and no logs will be printed. - // This interface is mainly used to process the cluster heartbeat. - V purelyBorrowClient(K node); - - // clear all clients for K node - void clear(K node); - - // close clientManager - void close(); - - class Factory<K, V> { - public IClientManager<K, V> createClientManager(IClientPoolFactory<K, V> clientPoolFactory) { - return new ClientManager<>(clientPoolFactory); - } + public ClientManagerException(Exception exception) { + super(exception); } } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java index 5ce1746799..f7c7f05e24 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java @@ -31,7 +31,7 @@ public class ThriftServiceThread extends AbstractThriftServiceThread { /** for asynced ThriftService. */ @SuppressWarnings("squid:S107") public ThriftServiceThread( - TBaseAsyncProcessor processor, + TBaseAsyncProcessor<?> processor, String serviceName, String threadsName, String bindAddress, diff --git a/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java b/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java index 05169af548..18bf4bcd38 100644 --- a/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java +++ b/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java @@ -21,6 +21,7 @@ package org.apache.iotdb.commons.client; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.mock.MockInternalRPCService; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.exception.StartupException; @@ -34,6 +35,7 @@ import org.junit.Before; import org.junit.Test; import java.io.IOException; +import java.util.NoSuchElementException; import java.util.concurrent.TimeUnit; import static org.mockito.Mockito.mock; @@ -251,14 +253,15 @@ public class ClientManagerTest { // get another sync client, should wait waitClientTimeoutMS ms, throw error SyncDataNodeInternalServiceClient syncClient2 = null; - long start = 0, end; + long start = 0; try { start = System.nanoTime(); syncClient2 = syncClusterManager.borrowClient(endPoint); - } catch (IOException e) { - end = System.nanoTime(); + } catch (ClientManagerException e) { + long end = System.nanoTime(); Assert.assertTrue(end - start >= waitClientTimeoutMs * 1_000_000); - Assert.assertTrue(e.getMessage().startsWith("Borrow client from pool for node")); + Assert.assertTrue(e.getCause() instanceof NoSuchElementException); + Assert.assertTrue(e.getMessage().contains("Timeout waiting for idle object")); } Assert.assertNull(syncClient2); @@ -322,14 +325,15 @@ public class ClientManagerTest { Assert.assertEquals(0, syncClusterManager.getPool().getNumIdle(endPoint)); // get another sync client, should wait waitClientTimeoutMS ms, throw error - long start = 0, end; + long start = 0; try { start = System.nanoTime(); - syncClusterManager.borrowClient(endPoint); - } catch (IOException e) { - end = System.nanoTime(); + syncClient1 = syncClusterManager.borrowClient(endPoint); + } catch (ClientManagerException e) { + long end = System.nanoTime(); Assert.assertTrue(end - start >= waitClientTimeoutMS * 1_000_000); - Assert.assertTrue(e.getMessage().startsWith("Borrow client from pool for node")); + Assert.assertTrue(e.getCause() instanceof NoSuchElementException); + Assert.assertTrue(e.getMessage().contains("Timeout waiting for idle object")); } // return one sync client diff --git a/server/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java b/server/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java index 7c5b33ff16..56acc74894 100644 --- a/server/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java +++ b/server/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java @@ -27,6 +27,7 @@ import org.apache.iotdb.commons.auth.entity.PathPrivilege; import org.apache.iotdb.commons.auth.entity.Role; import org.apache.iotdb.commons.auth.entity.User; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.consensus.ConfigNodeRegionId; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.path.PartialPath; @@ -50,7 +51,6 @@ import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -139,7 +139,7 @@ public class ClusterAuthorityFetcher implements IAuthorityFetcher { } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { logger.error("Failed to connect to config node."); future.setException(e); } catch (AuthException e) { @@ -173,7 +173,7 @@ public class ClusterAuthorityFetcher implements IAuthorityFetcher { } else { AuthorizerManager.getInstance().buildTSBlock(authorizerResp.getAuthorizerInfo(), future); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { logger.error("Failed to connect to config node."); authorizerResp.setStatus( RpcUtils.getStatus( @@ -209,7 +209,7 @@ public class ClusterAuthorityFetcher implements IAuthorityFetcher { CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) { // Send request to some API server status = configNodeClient.login(req); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { logger.error("Failed to connect to config node."); status = new TPermissionInfoResp(); status.setStatus( @@ -236,7 +236,7 @@ public class ClusterAuthorityFetcher implements IAuthorityFetcher { CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) { // Send request to some API server permissionInfoResp = configNodeClient.checkUserPrivileges(req); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { logger.error("Failed to connect to config node."); permissionInfoResp = new TPermissionInfoResp(); permissionInfoResp.setStatus( diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/template/ClusterTemplateManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/template/ClusterTemplateManager.java index ea81e4401b..19564f2528 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/template/ClusterTemplateManager.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/template/ClusterTemplateManager.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.metadata.template; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.consensus.ConfigNodeRegionId; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.IoTDBException; @@ -41,7 +42,6 @@ import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; @@ -98,7 +98,7 @@ public class ClusterTemplateManager implements ITemplateManager { tsStatus); } return tsStatus; - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { throw new RuntimeException( new IoTDBException( "create template error.", e, TSStatusCode.CREATE_TEMPLATE_ERROR.getStatusCode())); @@ -140,7 +140,7 @@ public class ClusterTemplateManager implements ITemplateManager { tGetAllTemplatesResp.getStatus().getMessage(), tGetAllTemplatesResp.getStatus().getCode())); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { throw new RuntimeException( new IoTDBException( "get all template error.", TSStatusCode.UNDEFINED_TEMPLATE.getStatusCode())); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java index 943d24a6e5..50a3f2e1e8 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java @@ -402,7 +402,7 @@ public class SinkHandle implements ISinkHandle { mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) { client.onNewDataBlockEvent(newDataBlockEvent); break; - } catch (Throwable e) { + } catch (Exception e) { logger.warn("Failed to send new data block event, attempt times: {}", attempt, e); if (attempt == MAX_ATTEMPT_TIMES) { sinkHandleListener.onFailure(SinkHandle.this, e); @@ -442,7 +442,7 @@ public class SinkHandle implements ISinkHandle { mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) { client.onEndOfDataBlockEvent(endOfDataBlockEvent); break; - } catch (Throwable e) { + } catch (Exception e) { logger.warn("Failed to send end of data block event, attempt times: {}", attempt, e); if (attempt == MAX_ATTEMPT_TIMES) { logger.warn("Failed to send end of data block event after all retry", e); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java index cda072ff0f..b06b1366e5 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java @@ -461,7 +461,7 @@ public class SourceHandle implements ISourceHandle { } } break; - } catch (Throwable e) { + } catch (Exception e) { logger.warn( "failed to get data block [{}, {}), attempt times: {}", @@ -532,7 +532,7 @@ public class SourceHandle implements ISourceHandle { mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) { client.onAcknowledgeDataBlockEvent(acknowledgeDataBlockEvent); break; - } catch (Throwable e) { + } catch (Exception e) { logger.warn( "failed to send ack data block event [{}, {}), attempt times: {}", startSequenceId, diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java index 72bced5f55..b9a358c2c3 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java @@ -37,9 +37,6 @@ import org.apache.iotdb.db.client.DataNodeClientPoolFactory; import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq; import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq; -import org.apache.thrift.TException; - -import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -75,7 +72,7 @@ public class TestRPCClient { new TTriggerSnapshotLoadReq( new DataRegionId(1).convertToTConsensusGroupId(), "snapshot_1_1662370255552")); System.out.println(res.status); - } catch (IOException | TException e) { + } catch (Exception e) { throw new RuntimeException(e); } } @@ -87,7 +84,7 @@ public class TestRPCClient { client.inactivatePeer( new TInactivatePeerReq(new DataRegionId(1).convertToTConsensusGroupId())); System.out.println(res.status); - } catch (IOException | TException e) { + } catch (Exception e) { throw new RuntimeException(e); } } @@ -97,7 +94,7 @@ public class TestRPCClient { INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(new TEndPoint("127.0.0.1", 9003))) { client.removeRegionPeer( new TMaintainPeerReq(new DataRegionId(1).convertToTConsensusGroupId(), getLocation2(3))); - } catch (IOException | TException e) { + } catch (Exception e) { throw new RuntimeException(e); } } @@ -107,7 +104,7 @@ public class TestRPCClient { INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(new TEndPoint("127.0.0.1", 9003))) { client.addRegionPeer( new TMaintainPeerReq(new DataRegionId(1).convertToTConsensusGroupId(), getLocation2(3))); - } catch (IOException | TException e) { + } catch (Exception e) { throw new RuntimeException(e); } } @@ -169,7 +166,7 @@ public class TestRPCClient { TSStatus res = client.createDataRegion(req); System.out.println(res.code + " " + res.message); - } catch (IOException | TException e) { + } catch (Exception e) { throw new RuntimeException(e); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java index 31eaa3256b..2a8a2d9b19 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.plan.analyze; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.MetadataException; @@ -2513,7 +2514,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> + showDataNodesResp.getStatus().getMessage()); } return showDataNodesResp.getDataNodeLocationList(); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { throw new StatementAnalyzeException( "An error occurred when executing getRunningDataNodeLocations():" + e.getMessage()); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java index 42e379beb9..2c70aaf3d0 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java @@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.consensus.ConfigNodeRegionId; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.partition.DataPartition; @@ -62,6 +63,7 @@ import java.util.List; import java.util.Map; public class ClusterPartitionFetcher implements IPartitionFetcher { + private static final Logger logger = LoggerFactory.getLogger(ClusterPartitionFetcher.class); private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); @@ -74,6 +76,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher { .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory()); private static final class ClusterPartitionFetcherHolder { + private static final ClusterPartitionFetcher INSTANCE = new ClusterPartitionFetcher(); private ClusterPartitionFetcherHolder() {} @@ -115,7 +118,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher { } } return schemaPartition; - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { logger.warn("Get Schema Partition error", e); throw new StatementAnalyzeException( "An error occurred when executing getSchemaPartition():" + e.getMessage()); @@ -147,7 +150,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher { } } return schemaPartition; - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { throw new StatementAnalyzeException( "An error occurred when executing getOrCreateSchemaPartition():" + e.getMessage()); } @@ -164,7 +167,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher { constructSchemaNodeManagementPartitionReq(patternTree, level)); return parseSchemaNodeManagementPartitionResp(schemaNodeManagementResp); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { throw new StatementAnalyzeException( "An error occurred when executing getSchemaNodeManagementPartition():" + e.getMessage()); } @@ -188,7 +191,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher { "An error occurred when executing getDataPartition():" + dataPartitionTableResp.getStatus().getMessage()); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { throw new StatementAnalyzeException( "An error occurred when executing getDataPartition():" + e.getMessage()); } @@ -214,7 +217,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher { "An error occurred when executing getDataPartition():" + dataPartitionTableResp.getStatus().getMessage()); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { throw new StatementAnalyzeException( "An error occurred when executing getDataPartition():" + e.getMessage()); } @@ -239,7 +242,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher { "An error occurred when executing getOrCreateDataPartition():" + dataPartitionTableResp.getStatus().getMessage()); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { throw new StatementAnalyzeException( "An error occurred when executing getOrCreateDataPartition():" + e.getMessage()); } @@ -272,7 +275,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher { dataPartitionTableResp.getStatus().getMessage(), dataPartitionTableResp.getStatus().getCode())); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { throw new StatementAnalyzeException( "An error occurred when executing getOrCreateDataPartition():" + e.getMessage()); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java index 3b5b25967f..040c568b40 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java @@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.consensus.ConfigNodeRegionId; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.exception.MetadataException; @@ -56,7 +57,6 @@ import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -184,7 +184,8 @@ public class PartitionCache { * @param devicePaths the devices that need to hit */ private void fetchStorageGroupAndUpdateCache( - StorageGroupCacheResult<?> result, List<String> devicePaths) throws IOException, TException { + StorageGroupCacheResult<?> result, List<String> devicePaths) + throws ClientManagerException, TException { try (ConfigNodeClient client = configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) { storageGroupCacheLock.writeLock().lock(); @@ -215,7 +216,7 @@ public class PartitionCache { */ private void createStorageGroupAndUpdateCache( StorageGroupCacheResult<?> result, List<String> devicePaths) - throws IOException, MetadataException, TException { + throws ClientManagerException, MetadataException, TException { try (ConfigNodeClient client = configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) { storageGroupCacheLock.writeLock().lock(); @@ -336,7 +337,7 @@ public class PartitionCache { throw new StatementAnalyzeException("Failed to get database Map in three attempts."); } } - } catch (TException | MetadataException | IOException e) { + } catch (TException | MetadataException | ClientManagerException e) { throw new StatementAnalyzeException( "An error occurred when executing getDeviceToStorageGroup():" + e.getMessage()); } @@ -421,7 +422,7 @@ public class PartitionCache { throw new RuntimeException( "Failed to get replicaSet of consensus group[id= " + consensusGroupId + "]"); } - } catch (IOException | TException e) { + } catch (ClientManagerException | TException e) { throw new StatementAnalyzeException( "An error occurred when executing getRegionReplicaSet():" + e.getMessage()); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java index 953550f7a6..ced8aa4dfd 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TFlushReq; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSetTTLReq; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.consensus.ConfigNodeRegionId; import org.apache.iotdb.commons.exception.IoTDBException; @@ -216,7 +217,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } return future; @@ -235,7 +236,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { TShowStorageGroupResp resp = client.showStorageGroup(storageGroupPathPattern); // build TSBlock ShowStorageGroupTask.buildTSBlock(resp.getStorageGroupInfoMap(), future); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } return future; @@ -254,7 +255,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { storageGroupNum = resp.getCount(); // build TSBlock CountStorageGroupTask.buildTSBlock(storageGroupNum, future); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } return future; @@ -278,7 +279,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } return future; @@ -400,7 +401,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (TException | IOException e) { + } catch (ClientManagerException | IOException | TException e) { future.setException(e); } return future; @@ -419,7 +420,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } return future; @@ -439,7 +440,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } // convert triggerTable and buildTsBlock ShowFunctionsTask.buildTsBlock(getUDFTableResp.getAllUDFInformation(), future); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } @@ -575,7 +576,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException | IOException e) { future.setException(e); } return future; @@ -593,7 +594,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } return future; @@ -614,7 +615,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } // convert triggerTable and buildTsBlock ShowTriggersTask.buildTsBlock(getTriggerTableResp.getAllTriggerInformation(), future); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } @@ -642,7 +643,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } return future; @@ -657,7 +658,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) { // Send request to some API server tsStatus = client.merge(); - } catch (IOException | TException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } } else { @@ -680,7 +681,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) { // Send request to some API server tsStatus = client.flush(tFlushReq); - } catch (IOException | TException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } } else { @@ -703,7 +704,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) { // Send request to some API server tsStatus = client.clearCache(); - } catch (IOException | TException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } } else { @@ -726,7 +727,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) { // Send request to some API server tsStatus = client.loadConfiguration(); - } catch (IOException | TException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } } else { @@ -749,7 +750,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) { // Send request to some API server tsStatus = client.setSystemStatus(status.getStatus()); - } catch (IOException | TException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } } else { @@ -770,7 +771,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { try (ConfigNodeClient client = CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) { showClusterResp = client.showCluster(); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { if (showClusterResp.getConfigNodeList() == null) { future.setException(new TException(MSG_RECONNECTION_FAIL)); } else { @@ -816,7 +817,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } } } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } // build TSBlock @@ -847,7 +848,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { showRegionResp.getStatus().message, showRegionResp.getStatus().code)); return future; } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } // build TSBlock @@ -869,7 +870,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { showDataNodesResp.getStatus().message, showDataNodesResp.getStatus().code)); return future; } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } // build TSBlock @@ -891,7 +892,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { showConfigNodesResp.getStatus().message, showConfigNodesResp.getStatus().code)); return future; } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } // build TSBlock @@ -1027,7 +1028,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } return future; @@ -1052,7 +1053,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } return future; @@ -1111,7 +1112,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } return future; @@ -1326,7 +1327,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } return future; @@ -1475,7 +1476,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } return future; @@ -1493,7 +1494,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } return future; @@ -1512,7 +1513,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } // convert cqList and buildTsBlock ShowContinuousQueriesTask.buildTsBlock(showCQResp.getCqList(), future); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java index e71f331cdf..bebb38f2b6 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.plan.scheduler; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.mpp.execution.QueryStateMachine; @@ -35,7 +36,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceInfoResp; import org.apache.thrift.TException; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -70,7 +70,7 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT public abstract void abort(); protected FragmentInstanceInfo fetchInstanceInfo(FragmentInstance instance) - throws TException, IOException { + throws ClientManagerException, TException { TEndPoint endPoint = instance.getHostDataNode().internalEndPoint; if (isInstanceRunningLocally(endPoint)) { FragmentInstanceInfo info = diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java index 613e0011ba..80b90b9a21 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.plan.scheduler; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.db.mpp.common.FragmentInstanceId; @@ -35,7 +36,6 @@ import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -131,7 +131,7 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker { updateQueryState(instance.getId(), instanceInfo); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { // TODO: do nothing ? logger.warn("error happened while fetching query state", e); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java index 1ba07ccb79..db04e97f0b 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.plan.scheduler; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -47,7 +48,6 @@ import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -193,7 +193,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { TSStatusCode.EXECUTE_STATEMENT_ERROR, String.format("unknown query type [%s]", instance.getType()))); } - } catch (IOException | TException e) { + } catch (ClientManagerException | TException e) { logger.warn("can't connect to node {}", endPoint, e); TSStatus status = new TSStatus(); status.setCode(TSStatusCode.SYNC_CONNECTION_ERROR.getStatusCode()); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java index bda8205edf..14bace8dc2 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.plan.scheduler; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; import org.apache.iotdb.db.mpp.common.MPPQueryContext; import org.apache.iotdb.db.mpp.common.QueryId; @@ -28,10 +29,10 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance; import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId; +import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -99,11 +100,11 @@ public class SimpleQueryTerminator implements IQueryTerminator { try (SyncDataNodeInternalServiceClient client = internalServiceClientManager.borrowClient(endPoint)) { client.cancelQuery(new TCancelQueryReq(queryId.getId(), unfinishedFIs)); - } catch (IOException e) { + } catch (ClientManagerException e) { logger.warn("can't connect to node {}", endPoint, e); // we shouldn't return here and need to cancel queryTasks in other nodes succeed = false; - } catch (Throwable t) { + } catch (TException t) { logger.warn("cancel query {} on node {} failed.", queryId.getId(), endPoint, t); // we shouldn't return here and need to cancel queryTasks in other nodes succeed = false; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java index 956f7c2ae9..80d9ec3516 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java @@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.consensus.ConsensusGroupId; @@ -50,7 +51,6 @@ import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -135,7 +135,7 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher { logger.warn(loadResp.message); throw new FragmentInstanceDispatchException(loadResp.status); } - } catch (IOException | TException e) { + } catch (ClientManagerException | TException e) { logger.warn("can't connect to node {}", endPoint, e); TSStatus status = new TSStatus(); status.setCode(TSStatusCode.SYNC_CONNECTION_ERROR.getStatusCode()); @@ -222,7 +222,7 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher { logger.warn(loadResp.message); throw new FragmentInstanceDispatchException(loadResp.status); } - } catch (IOException | TException e) { + } catch (ClientManagerException | TException e) { logger.warn("can't connect to node {}", endPoint, e); TSStatus status = new TSStatus(); status.setCode(TSStatusCode.SYNC_CONNECTION_ERROR.getStatusCode()); diff --git a/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java b/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java index fc1a7d056b..a9b49b846d 100644 --- a/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java +++ b/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.trigger.executor; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.consensus.ConfigNodeRegionId; import org.apache.iotdb.commons.path.PartialPath; @@ -340,7 +341,7 @@ public class TriggerFireVisitor extends PlanVisitor<TriggerFireResult, TriggerEv Thread.sleep(4000); } } - } catch (IOException | TException e) { + } catch (ClientManagerException | TException e) { // IOException means that we failed to borrow client, possibly because corresponding // DataNode is down. // TException means there's a timeout or broken connection. @@ -353,7 +354,7 @@ public class TriggerFireVisitor extends PlanVisitor<TriggerFireResult, TriggerEv e); // update TDataNodeLocation of stateful trigger through config node updateLocationOfStatefulTrigger(triggerName, tDataNodeLocation.getDataNodeId()); - } catch (Throwable e) { + } catch (Exception e) { LOGGER.warn( "Error occurred when trying to fire trigger({}) on TEndPoint: {}, the cause is: {}", triggerName, @@ -412,7 +413,7 @@ public class TriggerFireVisitor extends PlanVisitor<TriggerFireResult, TriggerEv } } return false; - } catch (TException | IOException e) { + } catch (ClientManagerException | TException | IOException e) { LOGGER.error( "Failed to update location of stateful trigger({}) through config node and the cause is {}.", triggerName, diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java index 029eac1ccf..57f121f40f 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.exchange; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener; @@ -74,7 +75,7 @@ public class SinkHandleTest { Mockito.doNothing() .when(mockClient) .onNewDataBlockEvent(Mockito.any(TNewDataBlockEvent.class)); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { e.printStackTrace(); Assert.fail(); } @@ -223,7 +224,7 @@ public class SinkHandleTest { Mockito.doNothing() .when(mockClient) .onNewDataBlockEvent(Mockito.any(TNewDataBlockEvent.class)); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { e.printStackTrace(); Assert.fail(); } @@ -423,7 +424,7 @@ public class SinkHandleTest { Mockito.doThrow(mockException) .when(mockClient) .onNewDataBlockEvent(Mockito.any(TNewDataBlockEvent.class)); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { e.printStackTrace(); Assert.fail(); } @@ -532,7 +533,7 @@ public class SinkHandleTest { Mockito.doNothing() .when(mockClient) .onNewDataBlockEvent(Mockito.any(TNewDataBlockEvent.class)); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { e.printStackTrace(); Assert.fail(); } diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java index 7c345b5d9f..aa9adbace6 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.exchange; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener; @@ -37,7 +38,6 @@ import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -81,7 +81,7 @@ public class SourceHandleTest { }) .when(mockClient) .getDataBlock(Mockito.any(TGetDataBlockRequest.class)); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { e.printStackTrace(); Assert.fail(); } @@ -197,7 +197,7 @@ public class SourceHandleTest { }) .when(mockClient) .getDataBlock(Mockito.any(TGetDataBlockRequest.class)); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { e.printStackTrace(); Assert.fail(); } @@ -351,7 +351,7 @@ public class SourceHandleTest { }) .when(mockClient) .getDataBlock(Mockito.any(TGetDataBlockRequest.class)); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { e.printStackTrace(); Assert.fail(); } @@ -523,7 +523,7 @@ public class SourceHandleTest { Mockito.doThrow(mockException) .when(mockClient) .getDataBlock(Mockito.any(TGetDataBlockRequest.class)); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { e.printStackTrace(); Assert.fail(); } @@ -606,7 +606,7 @@ public class SourceHandleTest { }) .when(mockClient) .getDataBlock(Mockito.any(TGetDataBlockRequest.class)); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { e.printStackTrace(); Assert.fail(); }
