This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/HA in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5493b32b60ef8a0f7801b2e9b4df5eac8aa1ff9f Merge: 28b276bf130 827735d1d77 Author: JackieTien97 <[email protected]> AuthorDate: Tue Jan 23 18:31:53 2024 +0800 resolve conflicts .../src/assembly/resources/tools/collect-info.bat | 146 ++++--- .../resources/conf/iotdb-confignode.properties | 5 - .../assembly/resources/sbin/daemon-confignode.sh | 18 +- .../heartbeat/DataNodeHeartbeatHandler.java | 2 +- .../iotdb/confignode/conf/ConfigNodeConfig.java | 17 +- .../confignode/conf/ConfigNodeDescriptor.java | 8 - .../manager/consensus/ConsensusManager.java | 2 - .../confignode/manager/load/cache/LoadCache.java | 2 +- .../manager/load/cache/node/BaseNodeCache.java | 2 +- .../load/cache/node/ConfigNodeHeartbeatCache.java | 2 +- .../load/cache/node/DataNodeHeartbeatCache.java | 2 +- .../load/cache/node/NodeHeartbeatSample.java | 6 +- .../manager/load/cache/node/NodeStatistics.java | 2 +- .../manager/load/cache/region/RegionCache.java | 5 +- .../load/cache/region/RegionHeartbeatSample.java | 2 +- .../manager/load/service/HeartbeatService.java | 4 +- .../procedure/env/ConfigNodeProcedureEnv.java | 2 +- .../thrift/ConfigNodeRPCServiceProcessor.java | 2 +- .../router/priority/GreedyPriorityTest.java | 6 +- .../priority/LeaderPriorityBalancerTest.java | 6 +- .../manager/load/cache/LoadCacheTest.java | 2 +- .../manager/load/cache/NodeCacheTest.java | 6 +- .../manager/load/cache/RegionGroupCacheTest.java | 6 +- .../manager/load/cache/RegionRouteCacheTest.java | 2 +- .../iotdb/consensus/config/IoTConsensusConfig.java | 16 - .../apache/iotdb/consensus/config/RatisConfig.java | 16 - .../exception/RatisReadUnavailableException.java | 13 +- .../iot/client/IoTConsensusClientPool.java | 2 - .../consensus/iot/logdispatcher/LogDispatcher.java | 24 +- .../iotdb/consensus/ratis/RatisConsensus.java | 18 +- .../resources/conf/iotdb-datanode.properties | 5 - .../src/assembly/resources/sbin/daemon-datanode.sh | 18 +- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 30 +- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 27 +- .../db/consensus/DataRegionConsensusImpl.java | 2 - .../db/consensus/SchemaRegionConsensusImpl.java | 1 - .../agent/runtime/PipePeriodicalJobExecutor.java | 1 + .../db/pipe/task/subtask/PipeDataNodeSubtask.java | 105 +++-- .../subtask/connector/PipeConnectorSubtask.java | 142 +++---- .../protocol/client/DataNodeClientPoolFactory.java | 2 - .../execution/load/LoadTsFileManager.java | 144 ++++--- .../execution/operator/AggregationUtil.java | 6 + .../plan/planner/LogicalPlanBuilder.java | 19 +- .../plan/planner/distribution/SourceRewriter.java | 9 +- .../plan/parameter/AggregationDescriptor.java | 14 +- .../scheduler/FragmentInstanceDispatcherImpl.java | 24 +- .../plan/scheduler/load/LoadTsFileScheduler.java | 8 +- .../schemafile/pagemgr/BTreePageManager.java | 8 +- .../pbtree/schemafile/pagemgr/PageIOChannel.java | 184 +++++++++ .../schemafile/pagemgr/PageIndexSortBuckets.java | 129 ++++++ .../pbtree/schemafile/pagemgr/PageManager.java | 452 ++------------------- .../impl/pbtree/schemafile/pagemgr/PagePool.java | 152 +++++++ .../schemafile/pagemgr/SchemaPageContext.java | 109 +++++ .../iotdb/db/storageengine/StorageEngine.java | 19 +- .../impl/ReadChunkCompactionPerformer.java | 7 +- .../ReadChunkAlignedSeriesCompactionExecutor.java | 10 +- .../execute/utils/reader/PointPriorityReader.java | 2 +- .../storageengine/dataregion/wal/node/WALNode.java | 15 +- .../org/apache/iotdb/db/utils/SchemaUtils.java | 27 +- .../FastInnerCompactionPerformerTest.java | 108 +++++ ...nkCompactionPerformerWithAlignedSeriesTest.java | 102 ++++- .../resources/conf/iotdb-cluster.properties | 2 +- .../resources/conf/iotdb-common.properties | 15 +- .../iotdb/commons/client/ClientPoolFactory.java | 29 +- .../client/property/ClientPoolProperty.java | 35 +- .../apache/iotdb/commons/conf/CommonConfig.java | 20 +- .../iotdb/commons/conf/CommonDescriptor.java | 23 -- .../iotdb/commons/pipe/config/PipeConfig.java | 5 - .../iotdb/commons/client/ClientManagerTest.java | 47 ++- 69 files changed, 1387 insertions(+), 1016 deletions(-) diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java index 22babfca96f,b48e8201629..c542bd09c8c --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java @@@ -283,9 -284,8 +284,9 @@@ public class FragmentInstanceDispatcher return this.localhostIpAddr.equals(endPoint.getIp()) && localhostInternalPort == endPoint.port; } - /** return true if need retry, false if no need to retry */ - private void dispatchRemote(FragmentInstance instance, TEndPoint endPoint) - throws FragmentInstanceDispatchException { + private void dispatchRemoteHelper(FragmentInstance instance, TEndPoint endPoint) - throws FragmentInstanceDispatchException, TException, ClientManagerException { ++ throws FragmentInstanceDispatchException, TException, ClientManagerException, ++ RatisReadUnavailableException { try (SyncDataNodeInternalServiceClient client = syncInternalServiceClientManager.borrowClient(endPoint)) { switch (instance.getType()) { @@@ -343,35 -348,18 +349,35 @@@ TSStatusCode.EXECUTE_STATEMENT_ERROR, String.format("unknown read type [%s]", instance.getType()))); } + } + } + + private void dispatchRemote(FragmentInstance instance, TEndPoint endPoint) + throws FragmentInstanceDispatchException { + + try { + dispatchRemoteHelper(instance, endPoint); - } catch (ClientManagerException | TException e) { + } catch (ClientManagerException | TException | RatisReadUnavailableException e) { logger.warn( - "can't connect to node {}, error msg is {}, and we try to reconnect this node.", - "can't execute request on node {}, error msg is {}.", ++ "can't execute request on node {}, error msg is {}, and we try to reconnect this node.", endPoint, ExceptionUtils.getRootCause(e).toString()); - TSStatus status = new TSStatus(); - status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode()); - status.setMessage("can't connect to node " + endPoint); - // If the DataNode cannot be connected, its endPoint will be put into black list - // so that the following retry will avoid dispatching instance towards this DataNode. - queryContext.addFailedEndPoint(endPoint); - throw new FragmentInstanceDispatchException(status); + // we just retry once to clear stale connection for a restart node. + try { + dispatchRemoteHelper(instance, endPoint); - } catch (ClientManagerException | TException e1) { ++ } catch (ClientManagerException | TException | RatisReadUnavailableException e1) { + logger.warn( - "can't connect to node {} in second try, error msg is {}.", ++ "can't execute request on node {} in second try, error msg is {}.", + endPoint, + ExceptionUtils.getRootCause(e1).toString()); + TSStatus status = new TSStatus(); + status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode()); + status.setMessage("can't connect to node " + endPoint); + // If the DataNode cannot be connected, its endPoint will be put into black list + // so that the following retry will avoid dispatching instance towards this DataNode. + queryContext.addFailedEndPoint(endPoint); + throw new FragmentInstanceDispatchException(status); + } } }
