This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch 1.2-iotdb-6017 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bdde57538fda587b535d78ee0a0c4022c6090a20 Author: yschengzi <[email protected]> AuthorDate: Mon Jun 26 01:10:04 2023 +0800 [IOTDB-6017] Pipe: separate pipe heartbeat from cluster heartbeat (#10285) * separate pipe heartbeat from cluster heartbeat * fix: IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor * remove rollback logic in PipeHandleLeaderChangeProcedure to avoid meta lost when restarting cluster * do not submit leader change procedure when leader is not changed * change lock in abstract pipe procedure and procedure to fair lock --------- Co-authored-by: Steve Yurong Su <[email protected]> (cherry picked from commit 268c6b88f92e0099cb9c907bd84dc0c5b28d3d1e) --- .../confignode/client/DataNodeRequestType.java | 1 + .../client/async/AsyncDataNodeClientPool.java | 8 ++ .../client/async/handlers/AsyncClientHandler.java | 10 ++ .../handlers/rpc/PipeHeartbeatRPCHandler.java | 70 +++++++++++++ .../statemachine/ConfigRegionStateMachine.java | 3 + .../manager/load/service/HeartbeatService.java | 8 +- .../pipe/runtime/PipeHeartbeatScheduler.java | 111 +++++++++++++++++++++ .../pipe/runtime/PipeLeaderChangeHandler.java | 15 +-- .../pipe/runtime/PipeRuntimeCoordinator.java | 24 +++-- .../confignode/persistence/pipe/PipeTaskInfo.java | 2 +- .../procedure/env/ConfigNodeProcedureEnv.java | 2 +- .../runtime/PipeHandleLeaderChangeProcedure.java | 18 +--- .../thrift/src/main/thrift/datanode.thrift | 13 +++ .../resources/conf/iotdb-common.properties | 8 +- .../iotdb/commons/concurrent/ThreadName.java | 1 + .../apache/iotdb/commons/conf/CommonConfig.java | 23 +++-- .../iotdb/commons/conf/CommonDescriptor.java | 11 +- .../iotdb/commons/pipe/config/PipeConfig.java | 13 ++- .../iotdb/db/pipe/agent/task/PipeTaskAgent.java | 22 ++++ .../impl/DataNodeInternalRPCServiceImpl.java | 9 ++ 20 files changed, 321 insertions(+), 51 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java index f4e839add37..2282a6db166 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java @@ -68,6 +68,7 @@ public enum DataNodeRequestType { /** Pipe Task */ PUSH_PIPE_META, + PIPE_HEARTBEAT, /** CQ */ EXECUTE_CQ, diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java index e410e095289..05efc1bdbd3 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java @@ -36,6 +36,7 @@ import org.apache.iotdb.confignode.client.async.handlers.rpc.AsyncTSStatusRPCHan import org.apache.iotdb.confignode.client.async.handlers.rpc.CheckTimeSeriesExistenceRPCHandler; import org.apache.iotdb.confignode.client.async.handlers.rpc.CountPathsUsingTemplateRPCHandler; import org.apache.iotdb.confignode.client.async.handlers.rpc.FetchSchemaBlackListRPCHandler; +import org.apache.iotdb.confignode.client.async.handlers.rpc.PipeHeartbeatRPCHandler; import org.apache.iotdb.confignode.client.async.handlers.rpc.SchemaUpdateRPCHandler; import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq; import org.apache.iotdb.mpp.rpc.thrift.TAlterViewReq; @@ -59,6 +60,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq; import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListReq; import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq; import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq; +import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq; import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaReq; import 
org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq; import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq; @@ -230,6 +232,12 @@ public class AsyncDataNodeClientPool { (AsyncTSStatusRPCHandler) clientHandler.createAsyncRPCHandler(requestId, targetDataNode)); break; + case PIPE_HEARTBEAT: + client.pipeHeartbeat( + (TPipeHeartbeatReq) clientHandler.getRequest(requestId), + (PipeHeartbeatRPCHandler) + clientHandler.createAsyncRPCHandler(requestId, targetDataNode)); + break; case MERGE: case FULL_MERGE: client.merge( diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java index efd4b9f8f55..c2d2989ba06 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java @@ -26,10 +26,12 @@ import org.apache.iotdb.confignode.client.async.handlers.rpc.AsyncTSStatusRPCHan import org.apache.iotdb.confignode.client.async.handlers.rpc.CheckTimeSeriesExistenceRPCHandler; import org.apache.iotdb.confignode.client.async.handlers.rpc.CountPathsUsingTemplateRPCHandler; import org.apache.iotdb.confignode.client.async.handlers.rpc.FetchSchemaBlackListRPCHandler; +import org.apache.iotdb.confignode.client.async.handlers.rpc.PipeHeartbeatRPCHandler; import org.apache.iotdb.confignode.client.async.handlers.rpc.SchemaUpdateRPCHandler; import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceResp; import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateResp; import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp; +import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp; import java.util.ArrayList; import java.util.List; @@ -197,6 +199,14 @@ public class AsyncClientHandler<Q, R> { dataNodeLocationMap, (Map<Integer, TCheckTimeSeriesExistenceResp>) responseMap, 
countDownLatch); + case PIPE_HEARTBEAT: + return new PipeHeartbeatRPCHandler( + requestType, + requestId, + targetDataNode, + dataNodeLocationMap, + (Map<Integer, TPipeHeartbeatResp>) responseMap, + countDownLatch); case SET_TTL: case CREATE_DATA_REGION: case CREATE_SCHEMA_REGION: diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java new file mode 100644 index 00000000000..222cbe36a13 --- /dev/null +++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.confignode.client.async.handlers.rpc; + +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.confignode.client.DataNodeRequestType; +import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +public class PipeHeartbeatRPCHandler extends AbstractAsyncRPCHandler<TPipeHeartbeatResp> { + + private static final Logger LOGGER = LoggerFactory.getLogger(PipeHeartbeatRPCHandler.class); + + public PipeHeartbeatRPCHandler( + DataNodeRequestType requestType, + int requestId, + TDataNodeLocation targetDataNode, + Map<Integer, TDataNodeLocation> dataNodeLocationMap, + Map<Integer, TPipeHeartbeatResp> responseMap, + CountDownLatch countDownLatch) { + super(requestType, requestId, targetDataNode, dataNodeLocationMap, responseMap, countDownLatch); + } + + @Override + public void onComplete(TPipeHeartbeatResp response) { + // Put response + responseMap.put(requestId, response); + dataNodeLocationMap.remove(requestId); + LOGGER.info("Successfully {} on DataNode: {}", requestType, formattedTargetLocation); + + // Always CountDown + countDownLatch.countDown(); + } + + @Override + public void onError(Exception e) { + LOGGER.error( + "Failed to " + + requestType + + " on DataNode: " + + formattedTargetLocation + + ", exception: " + + e.getMessage()); + + // Always CountDown + countDownLatch.countDown(); + } +} diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java index 4964ba79774..93033b4d9d9 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java 
@@ -221,6 +221,8 @@ public class ConfigRegionStateMachine threadPool.submit( () -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync()); + threadPool.submit( + () -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeHeartbeat()); } else { LOGGER.info( "Current node [nodeId:{}, ip:port: {}] is not longer the leader, the new leader is [nodeId:{}]", @@ -230,6 +232,7 @@ public class ConfigRegionStateMachine // Stop leader scheduling services configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeMetaSync(); + configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeHeartbeat(); configManager.getLoadManager().stopLoadServices(); configManager.getProcedureManager().shiftExecutor(false); configManager.getRetryFailedTasksThread().stopRetryFailedTasksService(); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java index 3e91b2b4013..ea42eb3f49c 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java @@ -127,9 +127,11 @@ public class HeartbeatService { heartbeatReq.setSchemaQuotaCount(configManager.getClusterSchemaManager().getSchemaQuotaCount()); // We collect pipe meta in every 100 heartbeat loop heartbeatReq.setNeedPipeMetaList( - heartbeatCounter.get() - % PipeConfig.getInstance().getHeartbeatLoopCyclesForCollectingPipeMeta() - == 0); + !PipeConfig.getInstance().isSeperatedPipeHeartbeatEnabled() + && heartbeatCounter.get() + % PipeConfig.getInstance() + .getPipeHeartbeatIntervalSecondsForCollectingPipeMeta() + == 0); if (!configManager.getClusterQuotaManager().hasSpaceQuotaLimit()) { heartbeatReq.setSchemaRegionIds(configManager.getClusterQuotaManager().getSchemaRegionIds()); 
heartbeatReq.setDataRegionIds(configManager.getClusterQuotaManager().getDataRegionIds()); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatScheduler.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatScheduler.java new file mode 100644 index 00000000000..86347785381 --- /dev/null +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatScheduler.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.confignode.manager.pipe.runtime; + +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.commons.concurrent.ThreadName; +import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; +import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.confignode.client.DataNodeRequestType; +import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool; +import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler; +import org.apache.iotdb.confignode.manager.ConfigManager; +import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq; +import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class PipeHeartbeatScheduler { + + private static final Logger LOGGER = LoggerFactory.getLogger(PipeHeartbeatScheduler.class); + + private static final boolean IS_SEPERATED_PIPE_HEARTBEAT_ENABLED = + PipeConfig.getInstance().isSeperatedPipeHeartbeatEnabled(); + private static final long HEARTBEAT_INTERVAL_SECONDS = + PipeConfig.getInstance().getPipeHeartbeatIntervalSecondsForCollectingPipeMeta(); + + private static final ScheduledExecutorService HEARTBEAT_EXECUTOR = + IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( + ThreadName.PIPE_RUNTIME_HEARTBEAT.getName()); + + private final ConfigManager configManager; + private final PipeHeartbeatParser pipeHeartbeatParser; + + private Future<?> heartbeatFuture; + + PipeHeartbeatScheduler(ConfigManager configManager) { + this.configManager = configManager; + this.pipeHeartbeatParser = new PipeHeartbeatParser(configManager); + } + + public synchronized void start() { + if 
(IS_SEPERATED_PIPE_HEARTBEAT_ENABLED && heartbeatFuture == null) { + heartbeatFuture = + ScheduledExecutorUtil.safelyScheduleWithFixedDelay( + HEARTBEAT_EXECUTOR, + this::heartbeat, + HEARTBEAT_INTERVAL_SECONDS, + HEARTBEAT_INTERVAL_SECONDS, + TimeUnit.SECONDS); + LOGGER.info("PipeHeartbeat is started successfully."); + } + } + + private synchronized void heartbeat() { + if (configManager.getPipeManager().getPipeTaskCoordinator().getPipeTaskInfo().isEmpty()) { + return; + } + + final Map<Integer, TDataNodeLocation> dataNodeLocationMap = + configManager.getNodeManager().getRegisteredDataNodeLocations(); + final TPipeHeartbeatReq request = new TPipeHeartbeatReq(System.currentTimeMillis()); + LOGGER.info("Collecting pipe heartbeat {} from data nodes", request.heartbeatId); + + final AsyncClientHandler<TPipeHeartbeatReq, TPipeHeartbeatResp> clientHandler = + new AsyncClientHandler<>(DataNodeRequestType.PIPE_HEARTBEAT, request, dataNodeLocationMap); + AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler); + clientHandler + .getResponseMap() + .forEach( + (dataNodeId, resp) -> + pipeHeartbeatParser.parseHeartbeat(dataNodeId, resp.getPipeMetaList())); + } + + public synchronized void stop() { + if (IS_SEPERATED_PIPE_HEARTBEAT_ENABLED && heartbeatFuture != null) { + heartbeatFuture.cancel(false); + heartbeatFuture = null; + LOGGER.info("PipeHeartbeat is stopped successfully."); + } + } + + public void parseHeartbeat(int dataNodeId, List<ByteBuffer> pipeMetaByteBufferListFromDataNode) { + pipeHeartbeatParser.parseHeartbeat(dataNodeId, pipeMetaByteBufferListFromDataNode); + } +} diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeLeaderChangeHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeLeaderChangeHandler.java index 1ea3cb00c2e..4d1f728714d 100644 --- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeLeaderChangeHandler.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeLeaderChangeHandler.java @@ -61,13 +61,16 @@ public class PipeLeaderChangeHandler implements IClusterStatusSubscriber { if (regionGroupId.getType().equals(TConsensusGroupType.DataRegion)) { final String databaseName = configManager.getPartitionManager().getRegionStorageGroup(regionGroupId); + // pipe only collect user's data, filter metric database here. if (databaseName != null && !databaseName.equals(IoTDBConfig.SYSTEM_DATABASE)) { - // pipe only collect user's data, filter metric database here. - dataRegionGroupToOldAndNewLeaderPairMap.put( - regionGroupId, - new Pair<>( // null or -1 means empty origin leader - pair.left == null ? -1 : pair.left, - pair.right == null ? -1 : pair.right)); + // null or -1 means empty origin leader + final int oldLeaderDataNodeId = (pair.left == null ? -1 : pair.left); + final int newLeaderDataNodeId = (pair.right == null ? 
-1 : pair.right); + + if (oldLeaderDataNodeId != newLeaderDataNodeId) { + dataRegionGroupToOldAndNewLeaderPairMap.put( + regionGroupId, new Pair<>(oldLeaderDataNodeId, newLeaderDataNodeId)); + } } } }); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java index 59ed8e5056c..87a51eebe8f 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java @@ -41,15 +41,15 @@ public class PipeRuntimeCoordinator implements IClusterStatusSubscriber { private final ExecutorService procedureSubmitter; private final PipeLeaderChangeHandler pipeLeaderChangeHandler; - private final PipeHeartbeatParser pipeHeartbeatParser; private final PipeMetaSyncer pipeMetaSyncer; + private final PipeHeartbeatScheduler pipeHeartbeatScheduler; public PipeRuntimeCoordinator(ConfigManager configManager) { if (procedureSubmitterHolder.get() == null) { synchronized (PipeRuntimeCoordinator.class) { if (procedureSubmitterHolder.get() == null) { procedureSubmitterHolder.set( - IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( + IoTDBThreadPoolFactory.newSingleThreadExecutor( ThreadName.PIPE_RUNTIME_PROCEDURE_SUBMITTER.getName())); } } @@ -57,8 +57,8 @@ public class PipeRuntimeCoordinator implements IClusterStatusSubscriber { procedureSubmitter = procedureSubmitterHolder.get(); pipeLeaderChangeHandler = new PipeLeaderChangeHandler(configManager); - pipeHeartbeatParser = new PipeHeartbeatParser(configManager); pipeMetaSyncer = new PipeMetaSyncer(configManager); + pipeHeartbeatScheduler = new PipeHeartbeatScheduler(configManager); } public ExecutorService getProcedureSubmitter() { @@ -75,11 +75,6 @@ public class PipeRuntimeCoordinator implements IClusterStatusSubscriber { 
pipeLeaderChangeHandler.onRegionGroupLeaderChanged(event); } - public void parseHeartbeat( - int dataNodeId, @NotNull List<ByteBuffer> pipeMetaByteBufferListFromDataNode) { - pipeHeartbeatParser.parseHeartbeat(dataNodeId, pipeMetaByteBufferListFromDataNode); - } - public void startPipeMetaSync() { pipeMetaSyncer.start(); } @@ -87,4 +82,17 @@ public class PipeRuntimeCoordinator implements IClusterStatusSubscriber { public void stopPipeMetaSync() { pipeMetaSyncer.stop(); } + + public void startPipeHeartbeat() { + pipeHeartbeatScheduler.start(); + } + + public void stopPipeHeartbeat() { + pipeHeartbeatScheduler.stop(); + } + + public void parseHeartbeat( + int dataNodeId, @NotNull List<ByteBuffer> pipeMetaByteBufferListFromDataNode) { + pipeHeartbeatScheduler.parseHeartbeat(dataNodeId, pipeMetaByteBufferListFromDataNode); + } } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java index 46940fc239b..728e3f30792 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java @@ -54,7 +54,7 @@ public class PipeTaskInfo implements SnapshotProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskInfo.class); private static final String SNAPSHOT_FILE_NAME = "pipe_task_info.bin"; - private final ReentrantLock pipeTaskInfoLock = new ReentrantLock(); + private final ReentrantLock pipeTaskInfoLock = new ReentrantLock(true); private final PipeMetaKeeper pipeMetaKeeper; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java index 76b2cd2149e..4cab4b62642 100644 --- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java @@ -96,7 +96,7 @@ public class ConfigNodeProcedureEnv { /** pipe operation lock */ private final LockQueue pipeLock = new LockQueue(); - private final ReentrantLock schedulerLock = new ReentrantLock(); + private final ReentrantLock schedulerLock = new ReentrantLock(true); private final ConfigManager configManager; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java index c07982381cd..33185518dbf 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java @@ -124,28 +124,14 @@ public class PipeHandleLeaderChangeProcedure extends AbstractOperatePipeProcedur protected void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) { LOGGER.info("PipeHandleLeaderChangeProcedure: rollbackFromHandleOnConfigNodes"); - final Map<TConsensusGroupId, Integer> oldDataRegionGroupIdToLeaderDataRegionIdMap = - new HashMap<>(); - dataRegionGroupToOldAndNewLeaderPairMap.forEach( - (regionGroupId, oldNewLeaderPair) -> - oldDataRegionGroupIdToLeaderDataRegionIdMap.put( - regionGroupId, oldNewLeaderPair.getLeft())); - - final PipeHandleLeaderChangePlan pipeHandleLeaderChangePlan = - new PipeHandleLeaderChangePlan(oldDataRegionGroupIdToLeaderDataRegionIdMap); - - final ConsensusWriteResponse response = - env.getConfigManager().getConsensusManager().write(pipeHandleLeaderChangePlan); - if (!response.isSuccessful()) { - throw new PipeException(response.getErrorMessage()); - } + // nothing to do } 
@Override protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) { LOGGER.info("PipeHandleLeaderChangeProcedure: rollbackFromCreateOnDataNodes"); - pushPipeMetaToDataNodesIgnoreException(env); + // nothing to do } @Override diff --git a/iotdb-protocol/thrift/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift/src/main/thrift/datanode.thrift index 43d26a03835..0deb7b78362 100644 --- a/iotdb-protocol/thrift/src/main/thrift/datanode.thrift +++ b/iotdb-protocol/thrift/src/main/thrift/datanode.thrift @@ -265,6 +265,14 @@ struct THeartbeatResp { 10: optional list<binary> pipeMetaList } +struct TPipeHeartbeatReq { + 1: required i64 heartbeatId +} + +struct TPipeHeartbeatResp { + 1: required list<binary> pipeMetaList +} + enum TSchemaLimitLevel{ DEVICE, TIMESERIES @@ -797,6 +805,11 @@ service IDataNodeRPCService { */ common.TSStatus pushPipeMeta(TPushPipeMetaReq req) + /** + * ConfigNode will ask DataNode for pipe meta in every few seconds + **/ + TPipeHeartbeatResp pipeHeartbeat(TPipeHeartbeatReq req) + /** * Execute CQ on DataNode */ diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties index 0bd89a4a617..f314a1c3579 100644 --- a/node-commons/src/assembly/resources/conf/iotdb-common.properties +++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties @@ -977,8 +977,12 @@ cluster_name=defaultCluster # The size of the pending queue for the PipeConnector to store the events. # pipe_connector_pending_queue_size=1024 -# The number of heartbeat loop cycles before collecting pipe meta once -# pipe_heartbeat_loop_cycles_for_collecting_pipe_meta=100 +# True if the pipe heartbeat is separated from the cluster heartbeat; false if pipe meta is +# collected by the cluster heartbeat instead. +# pipe_heartbeat_seperated_mode_enabled=true + +# The interval (in seconds) between two pipe heartbeats that collect pipe meta from DataNodes.
+# pipe_heartbeat_interval_seconds_for_collecting_pipe_meta=100 # The initial delay before starting the PipeMetaSyncer service. # pipe_meta_syncer_initial_sync_delay_minutes=3 diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java index e9a136a7b08..40eca81c898 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java @@ -108,6 +108,7 @@ public enum ThreadName { PIPE_CONNECTOR_EXECUTOR_POOL("Pipe-Connector-Executor-Pool"), PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL("Pipe-SubTask-Callback-Executor-Pool"), PIPE_RUNTIME_META_SYNCER("Pipe-Runtime-Meta-Syncer"), + PIPE_RUNTIME_HEARTBEAT("Pipe-Runtime-Heartbeat"), PIPE_RUNTIME_PROCEDURE_SUBMITTER("Pipe-Runtime-Procedure-Submitter"), PIPE_WAL_RESOURCE_TTL_CHECKER("Pipe-WAL-Resource-TTL-Checker"), WINDOW_EVALUATION_SERVICE("WindowEvaluationTaskPoolManager"), diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 7177c043878..f84931263ba 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -160,7 +160,8 @@ public class CommonConfig { private long pipeConnectorRetryIntervalMs = 1000L; private int pipeConnectorPendingQueueSize = 1024; - private int pipeHeartbeatLoopCyclesForCollectingPipeMeta = 100; + private boolean isSeperatedPipeHeartbeatEnabled = true; + private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 100; private long pipeMetaSyncerInitialSyncDelayMinutes = 3; private long pipeMetaSyncerSyncIntervalMinutes = 3; @@ -499,14 +500,22 @@ public class CommonConfig { this.pipeConnectorReadFileBufferSize = pipeConnectorReadFileBufferSize; } - public int 
getPipeHeartbeatLoopCyclesForCollectingPipeMeta() { - return pipeHeartbeatLoopCyclesForCollectingPipeMeta; + public boolean isSeperatedPipeHeartbeatEnabled() { + return isSeperatedPipeHeartbeatEnabled; } - public void setPipeHeartbeatLoopCyclesForCollectingPipeMeta( - int pipeHeartbeatLoopCyclesForCollectingPipeMeta) { - this.pipeHeartbeatLoopCyclesForCollectingPipeMeta = - pipeHeartbeatLoopCyclesForCollectingPipeMeta; + public void setSeperatedPipeHeartbeatEnabled(boolean isSeperatedPipeHeartbeatEnabled) { + this.isSeperatedPipeHeartbeatEnabled = isSeperatedPipeHeartbeatEnabled; + } + + public int getPipeHeartbeatIntervalSecondsForCollectingPipeMeta() { + return pipeHeartbeatIntervalSecondsForCollectingPipeMeta; + } + + public void setPipeHeartbeatIntervalSecondsForCollectingPipeMeta( + int pipeHeartbeatIntervalSecondsForCollectingPipeMeta) { + this.pipeHeartbeatIntervalSecondsForCollectingPipeMeta = + pipeHeartbeatIntervalSecondsForCollectingPipeMeta; } public long getPipeMetaSyncerInitialSyncDelayMinutes() { diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index a8058a549b7..be03167bf1d 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -304,11 +304,16 @@ public class CommonDescriptor { "pipe_connector_pending_queue_size", String.valueOf(config.getPipeConnectorPendingQueueSize())))); - config.setPipeHeartbeatLoopCyclesForCollectingPipeMeta( + config.setSeperatedPipeHeartbeatEnabled( + Boolean.parseBoolean( + properties.getProperty( + "pipe_heartbeat_seperated_mode_enabled", + String.valueOf(config.isSeperatedPipeHeartbeatEnabled())))); + config.setPipeHeartbeatIntervalSecondsForCollectingPipeMeta( Integer.parseInt( properties.getProperty( - "pipe_heartbeat_loop_cycles_for_collecting_pipe_meta", - 
String.valueOf(config.getPipeHeartbeatLoopCyclesForCollectingPipeMeta())))); + "pipe_heartbeat_interval_seconds_for_collecting_pipe_meta", + String.valueOf(config.getPipeHeartbeatIntervalSecondsForCollectingPipeMeta())))); config.setPipeMetaSyncerInitialSyncDelayMinutes( Long.parseLong( properties.getProperty( diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index 077048ac11e..b6ffc2238c6 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -87,8 +87,12 @@ public class PipeConfig { /////////////////////////////// Meta Consistency /////////////////////////////// - public int getHeartbeatLoopCyclesForCollectingPipeMeta() { - return COMMON_CONFIG.getPipeHeartbeatLoopCyclesForCollectingPipeMeta(); + public boolean isSeperatedPipeHeartbeatEnabled() { + return COMMON_CONFIG.isSeperatedPipeHeartbeatEnabled(); + } + + public int getPipeHeartbeatIntervalSecondsForCollectingPipeMeta() { + return COMMON_CONFIG.getPipeHeartbeatIntervalSecondsForCollectingPipeMeta(); } public long getPipeMetaSyncerInitialSyncDelayMinutes() { @@ -129,9 +133,10 @@ public class PipeConfig { LOGGER.info("PipeConnectorRetryIntervalMs: {}", getPipeConnectorRetryIntervalMs()); LOGGER.info("PipeConnectorPendingQueueSize: {}", getPipeConnectorPendingQueueSize()); + LOGGER.info("SeperatedPipeHeartbeatEnabled: {}", isSeperatedPipeHeartbeatEnabled()); LOGGER.info( - "HeartbeatLoopCyclesForCollectingPipeMeta: {}", - getHeartbeatLoopCyclesForCollectingPipeMeta()); + "PipeHeartbeatIntervalSecondsForCollectingPipeMeta: {}", + getPipeHeartbeatIntervalSecondsForCollectingPipeMeta()); LOGGER.info( "PipeMetaSyncerInitialSyncDelayMinutes: {}", getPipeMetaSyncerInitialSyncDelayMinutes()); LOGGER.info("PipeMetaSyncerSyncIntervalMinutes: {}", 
getPipeMetaSyncerSyncIntervalMinutes()); diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java index bbbbd3a7d23..69e7425c00b 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java @@ -35,6 +35,8 @@ import org.apache.iotdb.db.pipe.task.PipeTaskBuilder; import org.apache.iotdb.db.pipe.task.PipeTaskManager; import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq; import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp; +import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq; +import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.thrift.TException; @@ -633,4 +635,24 @@ public class PipeTaskAgent { } resp.setPipeMetaList(pipeMetaBinaryList); } + + public synchronized void collectPipeMetaList(TPipeHeartbeatReq req, TPipeHeartbeatResp resp) + throws TException { + final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>(); + // report an empty list (pipeMetaList is required) if data node is removing or removed + if (PipeAgent.runtime().isShutdown()) { + resp.setPipeMetaList(pipeMetaBinaryList); + return; + } + LOGGER.info("Received pipe heartbeat request {} from config node.", req.heartbeatId); + try { + for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) { + pipeMetaBinaryList.add(pipeMeta.serialize()); + LOGGER.info("Reporting pipe meta: {}", pipeMeta); + } + } catch (IOException e) { + throw new TException(e); + } + resp.setPipeMetaList(pipeMetaBinaryList); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java index b31fd026a11..66d6f235910 100644 --- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -174,6 +174,8 @@ import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq; import org.apache.iotdb.mpp.rpc.thrift.TLoadResp; import org.apache.iotdb.mpp.rpc.thrift.TLoadSample; import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq; +import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq; +import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaReq; import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq; import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq; @@ -943,6 +945,13 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface } } + @Override + public TPipeHeartbeatResp pipeHeartbeat(TPipeHeartbeatReq req) throws TException { + final TPipeHeartbeatResp resp = new TPipeHeartbeatResp(); + PipeAgent.task().collectPipeMetaList(req, resp); + return resp; + } + private TSStatus executeInternalSchemaTask( List<TConsensusGroupId> consensusGroupIdList, Function<TConsensusGroupId, TSStatus> executeOnOneRegion) {
