This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.2 by this push:
new b14a387df6d [IOTDB-6017] Pipe: separate pipe heartbeat from cluster
heartbeat (#10285) (#10315)
b14a387df6d is described below
commit b14a387df6d0bda6d9b72aae09eda3c608d3c1ec
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Jun 26 01:27:38 2023 +0800
[IOTDB-6017] Pipe: separate pipe heartbeat from cluster heartbeat (#10285)
(#10315)
* separate pipe heartbeat from cluster heartbeat
* fix: IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor
* remove rollback logic in PipeHandleLeaderChangeProcedure to avoid meta
loss when restarting the cluster
* do not submit leader change procedure when leader is not changed
* change lock in abstract pipe procedure and procedure to fair lock
---------
Co-authored-by: yschengzi <[email protected]>
Co-authored-by: Steve Yurong Su <[email protected]>
(cherry picked from commit 268c6b88f92e0099cb9c907bd84dc0c5b28d3d1e)
---
.../confignode/client/DataNodeRequestType.java | 1 +
.../client/async/AsyncDataNodeClientPool.java | 8 ++
.../client/async/handlers/AsyncClientHandler.java | 10 ++
.../handlers/rpc/PipeHeartbeatRPCHandler.java | 70 +++++++++++++
.../statemachine/ConfigRegionStateMachine.java | 3 +
.../manager/load/service/HeartbeatService.java | 8 +-
.../pipe/runtime/PipeHeartbeatScheduler.java | 111 +++++++++++++++++++++
.../pipe/runtime/PipeLeaderChangeHandler.java | 15 +--
.../pipe/runtime/PipeRuntimeCoordinator.java | 24 +++--
.../confignode/persistence/pipe/PipeTaskInfo.java | 2 +-
.../procedure/env/ConfigNodeProcedureEnv.java | 2 +-
.../runtime/PipeHandleLeaderChangeProcedure.java | 18 +---
.../thrift/src/main/thrift/datanode.thrift | 13 +++
.../resources/conf/iotdb-common.properties | 8 +-
.../iotdb/commons/concurrent/ThreadName.java | 1 +
.../apache/iotdb/commons/conf/CommonConfig.java | 23 +++--
.../iotdb/commons/conf/CommonDescriptor.java | 11 +-
.../iotdb/commons/pipe/config/PipeConfig.java | 13 ++-
.../iotdb/db/pipe/agent/task/PipeTaskAgent.java | 22 ++++
.../impl/DataNodeInternalRPCServiceImpl.java | 9 ++
20 files changed, 321 insertions(+), 51 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
index f4e839add37..2282a6db166 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -68,6 +68,7 @@ public enum DataNodeRequestType {
/** Pipe Task */
PUSH_PIPE_META,
+ PIPE_HEARTBEAT,
/** CQ */
EXECUTE_CQ,
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
index e410e095289..05efc1bdbd3 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
@@ -36,6 +36,7 @@ import
org.apache.iotdb.confignode.client.async.handlers.rpc.AsyncTSStatusRPCHan
import
org.apache.iotdb.confignode.client.async.handlers.rpc.CheckTimeSeriesExistenceRPCHandler;
import
org.apache.iotdb.confignode.client.async.handlers.rpc.CountPathsUsingTemplateRPCHandler;
import
org.apache.iotdb.confignode.client.async.handlers.rpc.FetchSchemaBlackListRPCHandler;
+import
org.apache.iotdb.confignode.client.async.handlers.rpc.PipeHeartbeatRPCHandler;
import
org.apache.iotdb.confignode.client.async.handlers.rpc.SchemaUpdateRPCHandler;
import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TAlterViewReq;
@@ -59,6 +60,7 @@ import
org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq;
+import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
@@ -230,6 +232,12 @@ public class AsyncDataNodeClientPool {
(AsyncTSStatusRPCHandler)
clientHandler.createAsyncRPCHandler(requestId,
targetDataNode));
break;
+ case PIPE_HEARTBEAT:
+ client.pipeHeartbeat(
+ (TPipeHeartbeatReq) clientHandler.getRequest(requestId),
+ (PipeHeartbeatRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId,
targetDataNode));
+ break;
case MERGE:
case FULL_MERGE:
client.merge(
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
index efd4b9f8f55..c2d2989ba06 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
@@ -26,10 +26,12 @@ import
org.apache.iotdb.confignode.client.async.handlers.rpc.AsyncTSStatusRPCHan
import
org.apache.iotdb.confignode.client.async.handlers.rpc.CheckTimeSeriesExistenceRPCHandler;
import
org.apache.iotdb.confignode.client.async.handlers.rpc.CountPathsUsingTemplateRPCHandler;
import
org.apache.iotdb.confignode.client.async.handlers.rpc.FetchSchemaBlackListRPCHandler;
+import
org.apache.iotdb.confignode.client.async.handlers.rpc.PipeHeartbeatRPCHandler;
import
org.apache.iotdb.confignode.client.async.handlers.rpc.SchemaUpdateRPCHandler;
import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceResp;
import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateResp;
import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp;
+import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
import java.util.ArrayList;
import java.util.List;
@@ -197,6 +199,14 @@ public class AsyncClientHandler<Q, R> {
dataNodeLocationMap,
(Map<Integer, TCheckTimeSeriesExistenceResp>) responseMap,
countDownLatch);
+ case PIPE_HEARTBEAT:
+ return new PipeHeartbeatRPCHandler(
+ requestType,
+ requestId,
+ targetDataNode,
+ dataNodeLocationMap,
+ (Map<Integer, TPipeHeartbeatResp>) responseMap,
+ countDownLatch);
case SET_TTL:
case CREATE_DATA_REGION:
case CREATE_SCHEMA_REGION:
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java
new file mode 100644
index 00000000000..222cbe36a13
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.client.async.handlers.rpc;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+public class PipeHeartbeatRPCHandler extends
AbstractAsyncRPCHandler<TPipeHeartbeatResp> {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeHeartbeatRPCHandler.class);
+
+ public PipeHeartbeatRPCHandler(
+ DataNodeRequestType requestType,
+ int requestId,
+ TDataNodeLocation targetDataNode,
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap,
+ Map<Integer, TPipeHeartbeatResp> responseMap,
+ CountDownLatch countDownLatch) {
+ super(requestType, requestId, targetDataNode, dataNodeLocationMap,
responseMap, countDownLatch);
+ }
+
+ @Override
+ public void onComplete(TPipeHeartbeatResp response) {
+ // Put response
+ responseMap.put(requestId, response);
+ dataNodeLocationMap.remove(requestId);
+ LOGGER.info("Successfully {} on DataNode: {}", requestType,
formattedTargetLocation);
+
+ // Always CountDown
+ countDownLatch.countDown();
+ }
+
+ @Override
+ public void onError(Exception e) {
+ LOGGER.error(
+ "Failed to "
+ + requestType
+ + " on DataNode: "
+ + formattedTargetLocation
+ + ", exception: "
+ + e.getMessage());
+
+ // Always CountDown
+ countDownLatch.countDown();
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index 4964ba79774..93033b4d9d9 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -221,6 +221,8 @@ public class ConfigRegionStateMachine
threadPool.submit(
() ->
configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync());
+ threadPool.submit(
+ () ->
configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeHeartbeat());
} else {
LOGGER.info(
"Current node [nodeId:{}, ip:port: {}] is not longer the leader, the
new leader is [nodeId:{}]",
@@ -230,6 +232,7 @@ public class ConfigRegionStateMachine
// Stop leader scheduling services
configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeMetaSync();
+
configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeHeartbeat();
configManager.getLoadManager().stopLoadServices();
configManager.getProcedureManager().shiftExecutor(false);
configManager.getRetryFailedTasksThread().stopRetryFailedTasksService();
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
index 3e91b2b4013..ea42eb3f49c 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
@@ -127,9 +127,11 @@ public class HeartbeatService {
heartbeatReq.setSchemaQuotaCount(configManager.getClusterSchemaManager().getSchemaQuotaCount());
// We collect pipe meta in every 100 heartbeat loop
heartbeatReq.setNeedPipeMetaList(
- heartbeatCounter.get()
- %
PipeConfig.getInstance().getHeartbeatLoopCyclesForCollectingPipeMeta()
- == 0);
+ !PipeConfig.getInstance().isSeperatedPipeHeartbeatEnabled()
+ && heartbeatCounter.get()
+ % PipeConfig.getInstance()
+ .getPipeHeartbeatIntervalSecondsForCollectingPipeMeta()
+ == 0);
if (!configManager.getClusterQuotaManager().hasSpaceQuotaLimit()) {
heartbeatReq.setSchemaRegionIds(configManager.getClusterQuotaManager().getSchemaRegionIds());
heartbeatReq.setDataRegionIds(configManager.getClusterQuotaManager().getDataRegionIds());
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatScheduler.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatScheduler.java
new file mode 100644
index 00000000000..86347785381
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatScheduler.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.runtime;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
+import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class PipeHeartbeatScheduler {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeHeartbeatScheduler.class);
+
+ private static final boolean IS_SEPERATED_PIPE_HEARTBEAT_ENABLED =
+ PipeConfig.getInstance().isSeperatedPipeHeartbeatEnabled();
+ private static final long HEARTBEAT_INTERVAL_SECONDS =
+
PipeConfig.getInstance().getPipeHeartbeatIntervalSecondsForCollectingPipeMeta();
+
+ private static final ScheduledExecutorService HEARTBEAT_EXECUTOR =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+ ThreadName.PIPE_RUNTIME_HEARTBEAT.getName());
+
+ private final ConfigManager configManager;
+ private final PipeHeartbeatParser pipeHeartbeatParser;
+
+ private Future<?> heartbeatFuture;
+
+ PipeHeartbeatScheduler(ConfigManager configManager) {
+ this.configManager = configManager;
+ this.pipeHeartbeatParser = new PipeHeartbeatParser(configManager);
+ }
+
+ public synchronized void start() {
+ if (IS_SEPERATED_PIPE_HEARTBEAT_ENABLED && heartbeatFuture == null) {
+ heartbeatFuture =
+ ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ HEARTBEAT_EXECUTOR,
+ this::heartbeat,
+ HEARTBEAT_INTERVAL_SECONDS,
+ HEARTBEAT_INTERVAL_SECONDS,
+ TimeUnit.SECONDS);
+ LOGGER.info("PipeHeartbeat is started successfully.");
+ }
+ }
+
+ private synchronized void heartbeat() {
+ if
(configManager.getPipeManager().getPipeTaskCoordinator().getPipeTaskInfo().isEmpty())
{
+ return;
+ }
+
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+ configManager.getNodeManager().getRegisteredDataNodeLocations();
+ final TPipeHeartbeatReq request = new
TPipeHeartbeatReq(System.currentTimeMillis());
+ LOGGER.info(String.format("Collecting pipe heartbeat %s from data nodes",
request.heartbeatId));
+
+ final AsyncClientHandler<TPipeHeartbeatReq, TPipeHeartbeatResp>
clientHandler =
+ new AsyncClientHandler<>(DataNodeRequestType.PIPE_HEARTBEAT, request,
dataNodeLocationMap);
+
AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ clientHandler
+ .getResponseMap()
+ .forEach(
+ (dataNodeId, resp) ->
+ pipeHeartbeatParser.parseHeartbeat(dataNodeId,
resp.getPipeMetaList()));
+ }
+
+ public synchronized void stop() {
+ if (IS_SEPERATED_PIPE_HEARTBEAT_ENABLED && heartbeatFuture != null) {
+ heartbeatFuture.cancel(false);
+ heartbeatFuture = null;
+ LOGGER.info("PipeHeartbeat is stopped successfully.");
+ }
+ }
+
+ public void parseHeartbeat(int dataNodeId, List<ByteBuffer>
pipeMetaByteBufferListFromDataNode) {
+ pipeHeartbeatParser.parseHeartbeat(dataNodeId,
pipeMetaByteBufferListFromDataNode);
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeLeaderChangeHandler.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeLeaderChangeHandler.java
index 1ea3cb00c2e..4d1f728714d 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeLeaderChangeHandler.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeLeaderChangeHandler.java
@@ -61,13 +61,16 @@ public class PipeLeaderChangeHandler implements
IClusterStatusSubscriber {
if
(regionGroupId.getType().equals(TConsensusGroupType.DataRegion)) {
final String databaseName =
configManager.getPartitionManager().getRegionStorageGroup(regionGroupId);
+ // pipe only collects user data; filter out the metric database here.
if (databaseName != null &&
!databaseName.equals(IoTDBConfig.SYSTEM_DATABASE)) {
- // pipe only collect user's data, filter metric database
here.
- dataRegionGroupToOldAndNewLeaderPairMap.put(
- regionGroupId,
- new Pair<>( // null or -1 means empty origin leader
- pair.left == null ? -1 : pair.left,
- pair.right == null ? -1 : pair.right));
+ // null or -1 means empty origin leader
+ final int oldLeaderDataNodeId = (pair.left == null ? -1 :
pair.left);
+ final int newLeaderDataNodeId = (pair.right == null ? -1 :
pair.right);
+
+ if (oldLeaderDataNodeId != newLeaderDataNodeId) {
+ dataRegionGroupToOldAndNewLeaderPairMap.put(
+ regionGroupId, new Pair<>(oldLeaderDataNodeId,
newLeaderDataNodeId));
+ }
}
}
});
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
index 59ed8e5056c..87a51eebe8f 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
@@ -41,15 +41,15 @@ public class PipeRuntimeCoordinator implements
IClusterStatusSubscriber {
private final ExecutorService procedureSubmitter;
private final PipeLeaderChangeHandler pipeLeaderChangeHandler;
- private final PipeHeartbeatParser pipeHeartbeatParser;
private final PipeMetaSyncer pipeMetaSyncer;
+ private final PipeHeartbeatScheduler pipeHeartbeatScheduler;
public PipeRuntimeCoordinator(ConfigManager configManager) {
if (procedureSubmitterHolder.get() == null) {
synchronized (PipeRuntimeCoordinator.class) {
if (procedureSubmitterHolder.get() == null) {
procedureSubmitterHolder.set(
- IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+ IoTDBThreadPoolFactory.newSingleThreadExecutor(
ThreadName.PIPE_RUNTIME_PROCEDURE_SUBMITTER.getName()));
}
}
@@ -57,8 +57,8 @@ public class PipeRuntimeCoordinator implements
IClusterStatusSubscriber {
procedureSubmitter = procedureSubmitterHolder.get();
pipeLeaderChangeHandler = new PipeLeaderChangeHandler(configManager);
- pipeHeartbeatParser = new PipeHeartbeatParser(configManager);
pipeMetaSyncer = new PipeMetaSyncer(configManager);
+ pipeHeartbeatScheduler = new PipeHeartbeatScheduler(configManager);
}
public ExecutorService getProcedureSubmitter() {
@@ -75,11 +75,6 @@ public class PipeRuntimeCoordinator implements
IClusterStatusSubscriber {
pipeLeaderChangeHandler.onRegionGroupLeaderChanged(event);
}
- public void parseHeartbeat(
- int dataNodeId, @NotNull List<ByteBuffer>
pipeMetaByteBufferListFromDataNode) {
- pipeHeartbeatParser.parseHeartbeat(dataNodeId,
pipeMetaByteBufferListFromDataNode);
- }
-
public void startPipeMetaSync() {
pipeMetaSyncer.start();
}
@@ -87,4 +82,17 @@ public class PipeRuntimeCoordinator implements
IClusterStatusSubscriber {
public void stopPipeMetaSync() {
pipeMetaSyncer.stop();
}
+
+ public void startPipeHeartbeat() {
+ pipeHeartbeatScheduler.start();
+ }
+
+ public void stopPipeHeartbeat() {
+ pipeHeartbeatScheduler.stop();
+ }
+
+ public void parseHeartbeat(
+ int dataNodeId, @NotNull List<ByteBuffer>
pipeMetaByteBufferListFromDataNode) {
+ pipeHeartbeatScheduler.parseHeartbeat(dataNodeId,
pipeMetaByteBufferListFromDataNode);
+ }
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 46940fc239b..728e3f30792 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -54,7 +54,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTaskInfo.class);
private static final String SNAPSHOT_FILE_NAME = "pipe_task_info.bin";
- private final ReentrantLock pipeTaskInfoLock = new ReentrantLock();
+ private final ReentrantLock pipeTaskInfoLock = new ReentrantLock(true);
private final PipeMetaKeeper pipeMetaKeeper;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 76b2cd2149e..4cab4b62642 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -96,7 +96,7 @@ public class ConfigNodeProcedureEnv {
/** pipe operation lock */
private final LockQueue pipeLock = new LockQueue();
- private final ReentrantLock schedulerLock = new ReentrantLock();
+ private final ReentrantLock schedulerLock = new ReentrantLock(true);
private final ConfigManager configManager;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
index c07982381cd..33185518dbf 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
@@ -124,28 +124,14 @@ public class PipeHandleLeaderChangeProcedure extends
AbstractOperatePipeProcedur
protected void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv
env) {
LOGGER.info("PipeHandleLeaderChangeProcedure:
rollbackFromHandleOnConfigNodes");
- final Map<TConsensusGroupId, Integer>
oldDataRegionGroupIdToLeaderDataRegionIdMap =
- new HashMap<>();
- dataRegionGroupToOldAndNewLeaderPairMap.forEach(
- (regionGroupId, oldNewLeaderPair) ->
- oldDataRegionGroupIdToLeaderDataRegionIdMap.put(
- regionGroupId, oldNewLeaderPair.getLeft()));
-
- final PipeHandleLeaderChangePlan pipeHandleLeaderChangePlan =
- new
PipeHandleLeaderChangePlan(oldDataRegionGroupIdToLeaderDataRegionIdMap);
-
- final ConsensusWriteResponse response =
-
env.getConfigManager().getConsensusManager().write(pipeHandleLeaderChangePlan);
- if (!response.isSuccessful()) {
- throw new PipeException(response.getErrorMessage());
- }
+ // nothing to do
}
@Override
protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
LOGGER.info("PipeHandleLeaderChangeProcedure:
rollbackFromCreateOnDataNodes");
- pushPipeMetaToDataNodesIgnoreException(env);
+ // nothing to do
}
@Override
diff --git a/iotdb-protocol/thrift/src/main/thrift/datanode.thrift
b/iotdb-protocol/thrift/src/main/thrift/datanode.thrift
index 43d26a03835..0deb7b78362 100644
--- a/iotdb-protocol/thrift/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift/src/main/thrift/datanode.thrift
@@ -265,6 +265,14 @@ struct THeartbeatResp {
10: optional list<binary> pipeMetaList
}
+struct TPipeHeartbeatReq {
+ 1: required i64 heartbeatId
+}
+
+struct TPipeHeartbeatResp {
+ 1: required list<binary> pipeMetaList
+}
+
enum TSchemaLimitLevel{
DEVICE,
TIMESERIES
@@ -797,6 +805,11 @@ service IDataNodeRPCService {
*/
common.TSStatus pushPipeMeta(TPushPipeMetaReq req)
+ /**
+ * ConfigNode asks DataNode for pipe meta every few seconds
+ **/
+ TPipeHeartbeatResp pipeHeartbeat(TPipeHeartbeatReq req)
+
/**
* Execute CQ on DataNode
*/
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties
b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 0bd89a4a617..f314a1c3579 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -977,8 +977,12 @@ cluster_name=defaultCluster
# The size of the pending queue for the PipeConnector to store the events.
# pipe_connector_pending_queue_size=1024
-# The number of heartbeat loop cycles before collecting pipe meta once
-# pipe_heartbeat_loop_cycles_for_collecting_pipe_meta=100
+# True if the pipe heartbeat is separated from the cluster's heartbeat; false if
the pipe heartbeat is
+# merged with the cluster's heartbeat.
+# pipe_heartbeat_seperated_mode_enabled=true
+
+# The interval between heartbeats that collect pipe meta (in
seconds).
+# pipe_heartbeat_interval_seconds_for_collecting_pipe_meta=100
# The initial delay before starting the PipeMetaSyncer service.
# pipe_meta_syncer_initial_sync_delay_minutes=3
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index e9a136a7b08..40eca81c898 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -108,6 +108,7 @@ public enum ThreadName {
PIPE_CONNECTOR_EXECUTOR_POOL("Pipe-Connector-Executor-Pool"),
PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL("Pipe-SubTask-Callback-Executor-Pool"),
PIPE_RUNTIME_META_SYNCER("Pipe-Runtime-Meta-Syncer"),
+ PIPE_RUNTIME_HEARTBEAT("Pipe-Runtime-Heartbeat"),
PIPE_RUNTIME_PROCEDURE_SUBMITTER("Pipe-Runtime-Procedure-Submitter"),
PIPE_WAL_RESOURCE_TTL_CHECKER("Pipe-WAL-Resource-TTL-Checker"),
WINDOW_EVALUATION_SERVICE("WindowEvaluationTaskPoolManager"),
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 7177c043878..f84931263ba 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -160,7 +160,8 @@ public class CommonConfig {
private long pipeConnectorRetryIntervalMs = 1000L;
private int pipeConnectorPendingQueueSize = 1024;
- private int pipeHeartbeatLoopCyclesForCollectingPipeMeta = 100;
+ private boolean isSeperatedPipeHeartbeatEnabled = true;
+ private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 100;
private long pipeMetaSyncerInitialSyncDelayMinutes = 3;
private long pipeMetaSyncerSyncIntervalMinutes = 3;
@@ -499,14 +500,22 @@ public class CommonConfig {
this.pipeConnectorReadFileBufferSize = pipeConnectorReadFileBufferSize;
}
- public int getPipeHeartbeatLoopCyclesForCollectingPipeMeta() {
- return pipeHeartbeatLoopCyclesForCollectingPipeMeta;
+ public boolean isSeperatedPipeHeartbeatEnabled() {
+ return isSeperatedPipeHeartbeatEnabled;
}
- public void setPipeHeartbeatLoopCyclesForCollectingPipeMeta(
- int pipeHeartbeatLoopCyclesForCollectingPipeMeta) {
- this.pipeHeartbeatLoopCyclesForCollectingPipeMeta =
- pipeHeartbeatLoopCyclesForCollectingPipeMeta;
+ public void setSeperatedPipeHeartbeatEnabled(boolean
isSeperatedPipeHeartbeatEnabled) {
+ this.isSeperatedPipeHeartbeatEnabled = isSeperatedPipeHeartbeatEnabled;
+ }
+
+ public int getPipeHeartbeatIntervalSecondsForCollectingPipeMeta() {
+ return pipeHeartbeatIntervalSecondsForCollectingPipeMeta;
+ }
+
+ public void setPipeHeartbeatIntervalSecondsForCollectingPipeMeta(
+ int pipeHeartbeatIntervalSecondsForCollectingPipeMeta) {
+ this.pipeHeartbeatIntervalSecondsForCollectingPipeMeta =
+ pipeHeartbeatIntervalSecondsForCollectingPipeMeta;
}
public long getPipeMetaSyncerInitialSyncDelayMinutes() {
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index a8058a549b7..be03167bf1d 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -304,11 +304,16 @@ public class CommonDescriptor {
"pipe_connector_pending_queue_size",
String.valueOf(config.getPipeConnectorPendingQueueSize()))));
- config.setPipeHeartbeatLoopCyclesForCollectingPipeMeta(
+ config.setSeperatedPipeHeartbeatEnabled(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "pipe_heartbeat_seperated_mode_enabled",
+ String.valueOf(config.isSeperatedPipeHeartbeatEnabled()))));
+ config.setPipeHeartbeatIntervalSecondsForCollectingPipeMeta(
Integer.parseInt(
properties.getProperty(
- "pipe_heartbeat_loop_cycles_for_collecting_pipe_meta",
-
String.valueOf(config.getPipeHeartbeatLoopCyclesForCollectingPipeMeta()))));
+ "pipe_heartbeat_interval_seconds_for_collecting_pipe_meta",
+
String.valueOf(config.getPipeHeartbeatIntervalSecondsForCollectingPipeMeta()))));
config.setPipeMetaSyncerInitialSyncDelayMinutes(
Long.parseLong(
properties.getProperty(
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 077048ac11e..b6ffc2238c6 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -87,8 +87,12 @@ public class PipeConfig {
/////////////////////////////// Meta Consistency
///////////////////////////////
- public int getHeartbeatLoopCyclesForCollectingPipeMeta() {
- return COMMON_CONFIG.getPipeHeartbeatLoopCyclesForCollectingPipeMeta();
+ public boolean isSeperatedPipeHeartbeatEnabled() {
+ return COMMON_CONFIG.isSeperatedPipeHeartbeatEnabled();
+ }
+
+ public int getPipeHeartbeatIntervalSecondsForCollectingPipeMeta() {
+ return
COMMON_CONFIG.getPipeHeartbeatIntervalSecondsForCollectingPipeMeta();
}
public long getPipeMetaSyncerInitialSyncDelayMinutes() {
@@ -129,9 +133,10 @@ public class PipeConfig {
LOGGER.info("PipeConnectorRetryIntervalMs: {}",
getPipeConnectorRetryIntervalMs());
LOGGER.info("PipeConnectorPendingQueueSize: {}",
getPipeConnectorPendingQueueSize());
+ LOGGER.info("SeperatedPipeHeartbeatEnabled: {}",
isSeperatedPipeHeartbeatEnabled());
LOGGER.info(
- "HeartbeatLoopCyclesForCollectingPipeMeta: {}",
- getHeartbeatLoopCyclesForCollectingPipeMeta());
+ "PipeHeartbeatIntervalSecondsForCollectingPipeMeta: {}",
+ getPipeHeartbeatIntervalSecondsForCollectingPipeMeta());
LOGGER.info(
"PipeMetaSyncerInitialSyncDelayMinutes: {}",
getPipeMetaSyncerInitialSyncDelayMinutes());
LOGGER.info("PipeMetaSyncerSyncIntervalMinutes: {}",
getPipeMetaSyncerSyncIntervalMinutes());
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index bbbbd3a7d23..69e7425c00b 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -35,6 +35,8 @@ import org.apache.iotdb.db.pipe.task.PipeTaskBuilder;
import org.apache.iotdb.db.pipe.task.PipeTaskManager;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
+import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
+import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.thrift.TException;
@@ -633,4 +635,24 @@ public class PipeTaskAgent {
}
resp.setPipeMetaList(pipeMetaBinaryList);
}
+
+ public synchronized void collectPipeMetaList(TPipeHeartbeatReq req,
TPipeHeartbeatResp resp)
+ throws TException {
+ // do nothing if data node is removing or removed; unlike the cluster
heartbeat, a pipe heartbeat request always asks for the pipe meta list
+ if (PipeAgent.runtime().isShutdown()) {
+ return;
+ }
+ LOGGER.info("Received pipe heartbeat request {} from config node.",
req.heartbeatId);
+
+ final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
+ try {
+ for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
+ pipeMetaBinaryList.add(pipeMeta.serialize());
+ LOGGER.info("Reporting pipe meta: {}", pipeMeta);
+ }
+ } catch (IOException e) {
+ throw new TException(e);
+ }
+ resp.setPipeMetaList(pipeMetaBinaryList);
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index b31fd026a11..66d6f235910 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -174,6 +174,8 @@ import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
import org.apache.iotdb.mpp.rpc.thrift.TLoadSample;
import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
+import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
+import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
@@ -943,6 +945,13 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
}
}
+ @Override
+ public TPipeHeartbeatResp pipeHeartbeat(TPipeHeartbeatReq req) throws
TException {
+ final TPipeHeartbeatResp resp = new TPipeHeartbeatResp();
+ PipeAgent.task().collectPipeMetaList(req, resp);
+ return resp;
+ }
+
private TSStatus executeInternalSchemaTask(
List<TConsensusGroupId> consensusGroupIdList,
Function<TConsensusGroupId, TSStatus> executeOnOneRegion) {