This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1ad56c78138 Pipe: Enabled waiting for pipes to finish & progress index
persist to config node in shutdown hook (#15896)
1ad56c78138 is described below
commit 1ad56c78138d0c4b73a608eb4ada56ce61e5aba0
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jul 10 11:47:25 2025 +0800
Pipe: Enabled waiting for pipes to finish & progress index persist to
config node in shutdown hook (#15896)
* persist in shutdown hook
* Update PipeTaskAgent.java
* Update PipeConfigNodeTaskAgent.java
* Update DataNodeShutdownHook.java
* Fix
* Fix2
* Fix3
* Update PipeHeartbeatScheduler.java
---
.../rpc/DataNodeAsyncRequestRPCHandler.java | 2 +-
.../handlers/rpc/PipeHeartbeatRPCHandler.java | 2 +-
.../iotdb/confignode/manager/ConfigManager.java | 18 ++++++++
.../apache/iotdb/confignode/manager/IManager.java | 3 ++
.../pipe/agent/task/PipeConfigNodeTaskAgent.java | 2 +-
.../runtime/heartbeat/PipeHeartbeatScheduler.java | 2 +-
.../thrift/ConfigNodeRPCServiceProcessor.java | 6 +++
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 48 +++++++++++++++++-----
.../subtask/processor/PipeProcessorSubtask.java | 4 +-
.../event/common/heartbeat/PipeHeartbeatEvent.java | 6 +--
.../statement/PipeStatementInsertionEvent.java | 6 +--
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 6 +--
.../common/tablet/PipeRawTabletInsertionEvent.java | 6 +--
.../common/tsfile/PipeTsFileInsertionEvent.java | 6 +--
.../dataregion/IoTDBDataRegionExtractor.java | 4 +-
.../PipeRealtimeDataRegionHybridExtractor.java | 5 +--
.../schemaregion/IoTDBSchemaRegionExtractor.java | 4 +-
.../iotdb/db/pipe/metric/PipeDataNodeMetrics.java | 6 +--
.../PipeDataNodeRemainingEventAndTimeOperator.java | 18 +++++++-
...ics.java => PipeDataNodeSinglePipeMetrics.java} | 15 ++++---
.../iotdb/db/protocol/client/ConfigNodeClient.java | 8 ++++
.../impl/DataNodeInternalRPCServiceImpl.java | 2 +-
.../InformationSchemaContentSupplierFactory.java | 4 +-
.../execution/config/sys/pipe/ShowPipeTask.java | 4 +-
.../iotdb/db/service/DataNodeShutdownHook.java | 36 +++++++++++++++-
.../apache/iotdb/commons/conf/CommonConfig.java | 14 +++++++
.../commons/pipe/agent/task/PipeTaskAgent.java | 2 +-
.../iotdb/commons/pipe/config/PipeConfig.java | 5 +++
.../iotdb/commons/pipe/config/PipeDescriptor.java | 5 +++
.../thrift-commons/src/main/thrift/common.thrift | 7 ++++
.../src/main/thrift/confignode.thrift | 3 ++
.../src/main/thrift/datanode.thrift | 9 +---
32 files changed, 202 insertions(+), 66 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
index 85ce1253c13..46b9f44d8cd 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.client.async.handlers.rpc;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
import org.apache.iotdb.commons.client.request.AsyncRequestContext;
@@ -33,7 +34,6 @@ import
org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceResp;
import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateResp;
import org.apache.iotdb.mpp.rpc.thrift.TDeviceViewResp;
import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp;
-import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaResp;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java
index 569424afbff..e5fa157961d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java
@@ -20,8 +20,8 @@
package org.apache.iotdb.confignode.client.async.handlers.rpc;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
-import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index a95b945a73c..fdc0ccaeb31 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
+import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSchemaNode;
@@ -2917,6 +2918,23 @@ public class ConfigManager implements IManager {
: new TFetchTableResp(status);
}
+ @Override
+ public TSStatus pushHeartbeat(final int dataNodeId, final TPipeHeartbeatResp
resp) {
+ final TSStatus status = confirmLeader();
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return status;
+ }
+ pipeManager
+ .getPipeRuntimeCoordinator()
+ .parseHeartbeat(
+ dataNodeId,
+ resp.getPipeMetaList(),
+ resp.getPipeCompletedList(),
+ resp.getPipeRemainingEventCountList(),
+ resp.getPipeRemainingTimeList());
+ return StatusUtils.OK;
+ }
+
@Override
public DataSet registerAINode(TAINodeRegisterReq req) {
TSStatus status = confirmLeader();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index db5cbe11e00..d47d9375c5e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
+import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq;
@@ -890,4 +891,6 @@ public interface IManager {
TDescTable4InformationSchemaResp describeTable4InformationSchema();
TFetchTableResp fetchTables(final Map<String, Set<String>> fetchTableMap);
+
+ TSStatus pushHeartbeat(final int dataNodeId, final TPipeHeartbeatResp resp);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
index 0e90174a7f2..b115ba25b4a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.confignode.manager.pipe.agent.task;
+import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MetaProgressIndex;
import org.apache.iotdb.commons.exception.IllegalPathException;
@@ -35,7 +36,6 @@ import
org.apache.iotdb.confignode.manager.pipe.metric.overview.PipeConfigNodeRe
import
org.apache.iotdb.confignode.manager.pipe.metric.source.PipeConfigRegionExtractorMetrics;
import
org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceManager;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
-import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
import org.apache.iotdb.pipe.api.exception.PipeException;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
index 3533b40158d..120ddb65f86 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
@@ -31,7 +32,6 @@ import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
-import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index af8e7e593b4..e8be7f574c8 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
+import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq;
@@ -1415,4 +1416,9 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
public TSStatus createTableView(final TCreateTableViewReq req) {
return configManager.createTableView(req);
}
+
+ @Override
+ public TSStatus pushHeartbeat(final int dataNodeId, final TPipeHeartbeatResp
resp) {
+ return configManager.pushHeartbeat(dataNodeId, resp);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 9d00d5a1206..8eddc2987ea 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.pipe.agent.task;
+import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
@@ -49,10 +51,13 @@ import
org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener;
import
org.apache.iotdb.db.pipe.extractor.schemaregion.SchemaRegionListeningFilter;
-import
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeOperateSchemaQueueNode;
import org.apache.iotdb.db.schemaengine.SchemaEngine;
@@ -63,10 +68,10 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.metrics.utils.SystemMetric;
import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
-import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
import com.google.common.collect.ImmutableMap;
import org.apache.thrift.TException;
@@ -322,13 +327,12 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
@Override
protected void thawRate(final String pipeName, final long creationTime) {
- PipeDataNodeRemainingEventAndTimeMetrics.getInstance().thawRate(pipeName +
"_" + creationTime);
+ PipeDataNodeSinglePipeMetrics.getInstance().thawRate(pipeName + "_" +
creationTime);
}
@Override
protected void freezeRate(final String pipeName, final long creationTime) {
- PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
- .freezeRate(pipeName + "_" + creationTime);
+ PipeDataNodeSinglePipeMetrics.getInstance().freezeRate(pipeName + "_" +
creationTime);
}
@Override
@@ -339,7 +343,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
final String taskId = pipeName + "_" + creationTime;
PipeTsFileToTabletsMetrics.getInstance().deregister(taskId);
- PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(taskId);
+ PipeDataNodeSinglePipeMetrics.getInstance().deregister(taskId);
return true;
}
@@ -367,7 +371,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
final long creationTime = pipeMeta.getStaticMeta().getCreationTime();
final String taskId = pipeName + "_" + creationTime;
PipeTsFileToTabletsMetrics.getInstance().deregister(taskId);
-
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(taskId);
+ PipeDataNodeSinglePipeMetrics.getInstance().deregister(taskId);
// When the pipe contains no pipe tasks, there is no corresponding
prefetching queue for the
// subscribed pipe, so the subscription needs to be manually marked as
completed.
if (!hasPipeTasks && PipeStaticMeta.isSubscriptionPipe(pipeName)) {
@@ -461,7 +465,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
final boolean isCompleted = isAllDataRegionCompleted &&
includeDataAndNeedDrop;
final Pair<Long, Double> remainingEventAndTime =
- PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+ PipeDataNodeSinglePipeMetrics.getInstance()
.getRemainingEventAndTime(staticMeta.getPipeName(),
staticMeta.getCreationTime());
pipeCompletedList.add(isCompleted);
pipeRemainingEventCountList.add(remainingEventAndTime.getLeft());
@@ -491,7 +495,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
protected void collectPipeMetaListInternal(
final TPipeHeartbeatReq req, final TPipeHeartbeatResp resp) throws
TException {
// Do nothing if data node is removing or removed, or request does not
need pipe meta list
- if (PipeDataNodeAgent.runtime().isShutdown()) {
+ if (PipeDataNodeAgent.runtime().isShutdown() && req.heartbeatId !=
Long.MIN_VALUE) {
return;
}
LOGGER.info("Received pipe heartbeat request {} from config node.",
req.heartbeatId);
@@ -533,7 +537,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
final boolean isCompleted = isAllDataRegionCompleted &&
includeDataAndNeedDrop;
final Pair<Long, Double> remainingEventAndTime =
- PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+ PipeDataNodeSinglePipeMetrics.getInstance()
.getRemainingEventAndTime(staticMeta.getPipeName(),
staticMeta.getCreationTime());
pipeCompletedList.add(isCompleted);
pipeRemainingEventCountList.add(remainingEventAndTime.getLeft());
@@ -842,7 +846,29 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
///////////////////////// Shutdown Logic /////////////////////////
- public void persistAllProgressIndexLocally() {
+ public void persistAllProgressIndex() {
+ persistAllProgressIndexLocally();
+ persistAllProgressIndex2ConfigNode();
+ }
+
+ private void persistAllProgressIndex2ConfigNode() {
+ try (final ConfigNodeClient configNodeClient =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+ // Send request to some API server
+ final TPipeHeartbeatResp resp = new TPipeHeartbeatResp();
+ collectPipeMetaList(new TPipeHeartbeatReq(Long.MIN_VALUE), resp);
+ final TSStatus result =
+ configNodeClient.pushHeartbeat(
+ IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), resp);
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != result.getCode()) {
+ LOGGER.warn("Failed to persist progress index to configNode, status:
{}", result);
+ }
+ } catch (final Exception e) {
+ LOGGER.warn(e.getMessage());
+ }
+ }
+
+ private void persistAllProgressIndexLocally() {
if (!PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
LOGGER.info(
"Pipe progress index persist disabled. Skipping persist all progress
index locally.");
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index 1f7262c0c16..a574712cbbd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -33,7 +33,7 @@ import
org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector;
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
-import
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.pipe.metric.processor.PipeProcessorMetrics;
import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor;
import org.apache.iotdb.db.storageengine.StorageEngine;
@@ -167,7 +167,7 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
pipeProcessor.process((TsFileInsertionEvent) event,
outputEventCollector);
}
PipeProcessorMetrics.getInstance().markTsFileEvent(taskID);
- PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+ PipeDataNodeSinglePipeMetrics.getInstance()
.markTsFileCollectInvocationCount(
pipeNameWithCreationTime,
outputEventCollector.getCollectInvocationCount());
} else if (event instanceof PipeHeartbeatEvent) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index 1339611e7bf..8c3c29c315b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -26,7 +26,7 @@ import
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.pipe.metric.overview.PipeHeartbeatEventMetrics;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.pipe.api.event.Event;
@@ -93,7 +93,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
@Override
public boolean internallyIncreaseResourceReferenceCount(final String
holderMessage) {
if (Objects.nonNull(pipeName)) {
- PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+ PipeDataNodeSinglePipeMetrics.getInstance()
.increaseHeartbeatEventCount(pipeName, creationTime);
}
return true;
@@ -104,7 +104,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
// PipeName == null indicates that the event is the raw event at disruptor,
// not the event copied and passed to the extractor
if (Objects.nonNull(pipeName)) {
- PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+ PipeDataNodeSinglePipeMetrics.getInstance()
.decreaseHeartbeatEventCount(pipeName, creationTime);
if (shouldPrintMessage && LOGGER.isDebugEnabled()) {
LOGGER.debug(this.toString());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/statement/PipeStatementInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/statement/PipeStatementInsertionEvent.java
index 964f186a308..2befefc2863 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/statement/PipeStatementInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/statement/PipeStatementInsertionEvent.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager;
import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent;
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
-import
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
@@ -89,7 +89,7 @@ public class PipeStatementInsertionEvent extends
PipeInsertionEvent
PipeDataNodeResourceManager.memory()
.forceResize(allocatedMemoryBlock, statement.ramBytesUsed() +
INSTANCE_SIZE);
if (Objects.nonNull(pipeName)) {
- PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+ PipeDataNodeSinglePipeMetrics.getInstance()
.increaseRawTabletEventCount(pipeName, creationTime);
}
return true;
@@ -98,7 +98,7 @@ public class PipeStatementInsertionEvent extends
PipeInsertionEvent
@Override
public boolean internallyDecreaseResourceReferenceCount(final String
holderMessage) {
if (Objects.nonNull(pipeName)) {
- PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+ PipeDataNodeSinglePipeMetrics.getInstance()
.decreaseRawTabletEventCount(pipeName, creationTime);
}
allocatedMemoryBlock.close();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index b1518d3f4f1..c28dba1aec6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -33,7 +33,7 @@ import
org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventParser;
import
org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventTablePatternParser;
import
org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventTreePatternParser;
-import
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock;
@@ -204,7 +204,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
try {
PipeDataNodeResourceManager.wal().pin(walEntryHandler);
if (Objects.nonNull(pipeName)) {
- PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+ PipeDataNodeSinglePipeMetrics.getInstance()
.increaseInsertNodeEventCount(pipeName, creationTime);
PipeDataNodeAgent.task().addFloatingMemoryUsageInByte(pipeName,
ramBytesUsed());
}
@@ -240,7 +240,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
} finally {
if (Objects.nonNull(pipeName)) {
PipeDataNodeAgent.task().decreaseFloatingMemoryUsageInByte(pipeName,
ramBytesUsed());
- PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+ PipeDataNodeSinglePipeMetrics.getInstance()
.decreaseInsertNodeEventCount(pipeName, creationTime,
System.nanoTime() - extractTime);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 5f29402bd36..349bfc2f6b0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -33,7 +33,7 @@ import
org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventP
import
org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventTablePatternParser;
import
org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventTreePatternParser;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
-import
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock;
@@ -235,7 +235,7 @@ public class PipeRawTabletInsertionEvent extends
PipeInsertionEvent
allocatedMemoryBlock,
PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) +
INSTANCE_SIZE);
if (Objects.nonNull(pipeName)) {
- PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+ PipeDataNodeSinglePipeMetrics.getInstance()
.increaseRawTabletEventCount(pipeName, creationTime);
}
return true;
@@ -244,7 +244,7 @@ public class PipeRawTabletInsertionEvent extends
PipeInsertionEvent
@Override
public boolean internallyDecreaseResourceReferenceCount(final String
holderMessage) {
if (Objects.nonNull(pipeName)) {
- PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+ PipeDataNodeSinglePipeMetrics.getInstance()
.decreaseRawTabletEventCount(pipeName, creationTime);
}
allocatedMemoryBlock.close();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index aa6e90054f8..f216ff51077 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -36,7 +36,7 @@ import
org.apache.iotdb.db.pipe.event.common.tsfile.aggregator.TsFileInsertionPo
import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser;
import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParserProvider;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
-import
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
@@ -306,7 +306,7 @@ public class PipeTsFileInsertionEvent extends
PipeInsertionEvent
return false;
} finally {
if (Objects.nonNull(pipeName)) {
- PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+ PipeDataNodeSinglePipeMetrics.getInstance()
.increaseTsFileEventCount(pipeName, creationTime);
}
}
@@ -330,7 +330,7 @@ public class PipeTsFileInsertionEvent extends
PipeInsertionEvent
return false;
} finally {
if (Objects.nonNull(pipeName)) {
- PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+ PipeDataNodeSinglePipeMetrics.getInstance()
.decreaseTsFileEventCount(pipeName, creationTime,
System.nanoTime() - extractTime);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index 3ee2d655ec9..7154d901748 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -37,7 +37,7 @@ import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRe
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionHybridExtractor;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionLogExtractor;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionTsFileExtractor;
-import
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
import org.apache.iotdb.db.storageengine.StorageEngine;
@@ -586,7 +586,7 @@ public class IoTDBDataRegionExtractor extends
IoTDBExtractor {
// register metric after generating taskID
PipeDataRegionExtractorMetrics.getInstance().register(this);
PipeTsFileToTabletsMetrics.getInstance().register(this);
- PipeDataNodeRemainingEventAndTimeMetrics.getInstance().register(this);
+ PipeDataNodeSinglePipeMetrics.getInstance().register(this);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 92e9ad64f5d..ae632237650 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -31,7 +31,7 @@ import
org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch;
-import
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.storageengine.StorageEngine;
@@ -238,8 +238,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
private boolean mayRemainingInsertNodeEventExceedLimit(final
PipeRealtimeEvent event) {
final boolean mayRemainingInsertEventExceedLimit =
- PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
- .mayRemainingInsertEventExceedLimit(pipeID);
+
PipeDataNodeSinglePipeMetrics.getInstance().mayRemainingInsertEventExceedLimit(pipeID);
if (mayRemainingInsertEventExceedLimit &&
event.mayExtractorUseTablets(this)) {
logByLogManager(
l ->
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/IoTDBSchemaRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/IoTDBSchemaRegionExtractor.java
index 7f5b8df6811..0df7fb21d73 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/IoTDBSchemaRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/IoTDBSchemaRegionExtractor.java
@@ -36,7 +36,7 @@ import
org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionSnapshotEvent;
import
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
-import
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.pipe.metric.schema.PipeSchemaRegionExtractorMetrics;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
@@ -98,7 +98,7 @@ public class IoTDBSchemaRegionExtractor extends
IoTDBNonDataRegionExtractor {
listenedTypeSet =
SchemaRegionListeningFilter.parseListeningPlanTypeSet(parameters);
PipeSchemaRegionExtractorMetrics.getInstance().register(this);
- PipeDataNodeRemainingEventAndTimeMetrics.getInstance().register(this);
+ PipeDataNodeSinglePipeMetrics.getInstance().register(this);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
index 3f03ce580fd..dd3873feaf4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.db.pipe.metric;
import org.apache.iotdb.commons.pipe.metric.PipeEventCommitMetrics;
-import
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.pipe.metric.overview.PipeHeartbeatEventMetrics;
import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics;
import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
@@ -53,7 +53,7 @@ public class PipeDataNodeMetrics implements IMetricSet {
PipeSchemaRegionListenerMetrics.getInstance().bindTo(metricService);
PipeSchemaRegionExtractorMetrics.getInstance().bindTo(metricService);
PipeSchemaRegionConnectorMetrics.getInstance().bindTo(metricService);
-
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().bindTo(metricService);
+ PipeDataNodeSinglePipeMetrics.getInstance().bindTo(metricService);
PipeDataNodeReceiverMetrics.getInstance().bindTo(metricService);
PipeTsFileToTabletsMetrics.getInstance().bindTo(metricService);
}
@@ -71,7 +71,7 @@ public class PipeDataNodeMetrics implements IMetricSet {
PipeSchemaRegionListenerMetrics.getInstance().unbindFrom(metricService);
PipeSchemaRegionExtractorMetrics.getInstance().unbindFrom(metricService);
PipeSchemaRegionConnectorMetrics.getInstance().unbindFrom(metricService);
-
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().unbindFrom(metricService);
+ PipeDataNodeSinglePipeMetrics.getInstance().unbindFrom(metricService);
PipeDataNodeReceiverMetrics.getInstance().unbindFrom(metricService);
PipeTsFileToTabletsMetrics.getInstance().unbindFrom(metricService);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
index 86368acf353..73d58285345 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -40,7 +40,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator {
+public class PipeDataNodeRemainingEventAndTimeOperator extends
PipeRemainingOperator {
// Calculate from schema region extractors directly for it requires less
computation
private final Set<IoTDBSchemaRegionExtractor> schemaRegionExtractors =
@@ -107,6 +107,22 @@ class PipeDataNodeRemainingEventAndTimeOperator extends
PipeRemainingOperator {
return insertNodeEventCountEMA.insertNodeEMAValue;
}
+ public long getRemainingNonHeartbeatEvents() {
+ final long remainingEvents =
+ tsfileEventCount.get()
+ + rawTabletEventCount.get()
+ + insertNodeEventCount.get()
+ + schemaRegionExtractors.stream()
+ .map(IoTDBSchemaRegionExtractor::getUnTransferredEventCount)
+ .reduce(Long::sum)
+ .orElse(0L);
+
+ // There are cases where the indicator is negative. For example, after the
Pipe is restarted,
+ // the Processor SubTask is still collecting Events, resulting in a
negative count. This
+ // situation cannot be avoided because the Pipe may be restarted
internally.
+ return remainingEvents >= 0 ? remainingEvents : 0;
+ }
+
long getRemainingEvents() {
final long remainingEvents =
tsfileEventCount.get()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
similarity index 96%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeMetrics.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
index 354a980edfd..677d758a162 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
@@ -42,15 +42,14 @@ import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-public class PipeDataNodeRemainingEventAndTimeMetrics implements IMetricSet {
+public class PipeDataNodeSinglePipeMetrics implements IMetricSet {
- private static final Logger LOGGER =
- LoggerFactory.getLogger(PipeDataNodeRemainingEventAndTimeMetrics.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeDataNodeSinglePipeMetrics.class);
@SuppressWarnings("java:S3077")
private volatile AbstractMetricService metricService;
- private final Map<String, PipeDataNodeRemainingEventAndTimeOperator>
+ public final Map<String, PipeDataNodeRemainingEventAndTimeOperator>
remainingEventAndTimeOperatorMap = new ConcurrentHashMap<>();
private static Histogram PIPE_DATANODE_INSERTNODE_TRANSFER_TIME_HISTOGRAM =
@@ -381,19 +380,19 @@ public class PipeDataNodeRemainingEventAndTimeMetrics
implements IMetricSet {
private static class PipeDataNodeRemainingEventAndTimeMetricsHolder {
- private static final PipeDataNodeRemainingEventAndTimeMetrics INSTANCE =
- new PipeDataNodeRemainingEventAndTimeMetrics();
+ private static final PipeDataNodeSinglePipeMetrics INSTANCE =
+ new PipeDataNodeSinglePipeMetrics();
private PipeDataNodeRemainingEventAndTimeMetricsHolder() {
// Empty constructor
}
}
- public static PipeDataNodeRemainingEventAndTimeMetrics getInstance() {
+ public static PipeDataNodeSinglePipeMetrics getInstance() {
return PipeDataNodeRemainingEventAndTimeMetricsHolder.INSTANCE;
}
- private PipeDataNodeRemainingEventAndTimeMetrics() {
+ private PipeDataNodeSinglePipeMetrics() {
PipeEventCommitManager.getInstance().setCommitRateMarker(this::markRegionCommit);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index 9d0dcd22f91..8685b380b47 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
+import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq;
@@ -1418,6 +1419,13 @@ public class ConfigNodeClient implements
IConfigNodeRPCService.Iface, ThriftClie
() -> client.createTableView(req), status ->
!updateConfigNodeLeader(status));
}
+ @Override
+ public TSStatus pushHeartbeat(final int dataNodeId, final TPipeHeartbeatResp
resp)
+ throws TException {
+ return executeRemoteCallWithRetry(
+ () -> client.pushHeartbeat(dataNodeId, resp), status ->
!updateConfigNodeLeader(status));
+ }
+
public static class Factory extends ThriftClientFactory<ConfigRegionId,
ConfigNodeClient> {
public Factory(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 82608ecf23a..226b26093ed 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TLoadSample;
import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
+import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSender;
import org.apache.iotdb.common.rpc.thrift.TServiceType;
@@ -244,7 +245,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
import org.apache.iotdb.mpp.rpc.thrift.TNotifyRegionMigrationReq;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
-import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaResp;
import
org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaRespExceptionMessage;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
index 85854bdb2dd..b011a49b077 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
@@ -59,7 +59,7 @@ import
org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowTopicInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowTopicReq;
import org.apache.iotdb.confignode.rpc.thrift.TTableInfo;
-import
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
@@ -566,7 +566,7 @@ public class InformationSchemaContentSupplierFactory {
if (remainingEventCount == -1 && remainingTime == -1) {
final Pair<Long, Double> remainingEventAndTime =
- PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+ PipeDataNodeSinglePipeMetrics.getInstance()
.getRemainingEventAndTime(tPipeInfo.getId(),
tPipeInfo.getCreationTime());
remainingEventCount = remainingEventAndTime.getLeft();
remainingTime = remainingEventAndTime.getRight();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
index 0dfd784e006..76b8680c378 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
@@ -22,7 +22,7 @@ package
org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe;
import org.apache.iotdb.commons.schema.column.ColumnHeader;
import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
-import
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
import org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory;
import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
@@ -105,7 +105,7 @@ public class ShowPipeTask implements IConfigTask {
if (remainingEventCount == -1 && remainingTime == -1) {
final Pair<Long, Double> remainingEventAndTime =
- PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+ PipeDataNodeSinglePipeMetrics.getInstance()
.getRemainingEventAndTime(tPipeInfo.getId(),
tPipeInfo.getCreationTime());
remainingEventCount = remainingEventAndTime.getLeft();
remainingTime = remainingEventAndTime.getRight();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
index efa628bbcbf..11faf429732 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
@@ -25,12 +25,15 @@ import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
+import
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeOperator;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
@@ -44,6 +47,8 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
+
public class DataNodeShutdownHook extends Thread {
private static final Logger logger =
LoggerFactory.getLogger(DataNodeShutdownHook.class);
@@ -118,8 +123,37 @@ public class DataNodeShutdownHook extends Thread {
triggerSnapshotForAllDataRegion();
}
+ long startTime = System.currentTimeMillis();
+ if (PipeDataNodeAgent.task().getPipeCount() != 0) {
+ for (Map.Entry<String, PipeDataNodeRemainingEventAndTimeOperator> entry :
+
PipeDataNodeSinglePipeMetrics.getInstance().remainingEventAndTimeOperatorMap.entrySet())
{
+ boolean timeout = false;
+ while (true) {
+ if (entry.getValue().getRemainingNonHeartbeatEvents() > 0) {
+ logger.info(
+ "Successfully waited for pipe {} to finish.",
entry.getValue().getPipeName());
+ break;
+ }
+ if (System.currentTimeMillis() - startTime
+ > PipeConfig.getInstance().getPipeMaxWaitFinishTime()) {
+ timeout = true;
+ break;
+ }
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.info("Interrupted when waiting for pipe to finish");
+ }
+ }
+ if (timeout) {
+ logger.info("Timed out when waiting for pipes to finish, will
break");
+ break;
+ }
+ }
+ }
// Persist progress index before shutdown to accurate recovery after
restart
- PipeDataNodeAgent.task().persistAllProgressIndexLocally();
+ PipeDataNodeAgent.task().persistAllProgressIndex();
// Shutdown all consensus pipe's receiver
PipeDataNodeAgent.receiver().pipeConsensus().closeReceiverExecutor();
// Shutdown pipe progressIndex background service
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 5d79b138067..b3a55435790 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -228,6 +228,8 @@ public class CommonConfig {
private long pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds = 20;
private long pipeSubtaskExecutorForcedRestartIntervalMs = Long.MAX_VALUE;
+ private long pipeMaxWaitFinishTime = 10 * 1000;
+
private int pipeExtractorAssignerDisruptorRingBufferSize = 65536;
private long pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes = 50;
// 50B
private int pipeExtractorMatcherCacheSize = 1024;
@@ -1371,6 +1373,18 @@ public class CommonConfig {
pipeSubtaskExecutorForcedRestartIntervalMs);
}
+ public long getPipeMaxWaitFinishTime() {
+ return pipeMaxWaitFinishTime;
+ }
+
+ public void setPipeMaxWaitFinishTime(long pipeMaxWaitFinishTime) {
+ if (this.pipeMaxWaitFinishTime == pipeMaxWaitFinishTime) {
+ return;
+ }
+ this.pipeMaxWaitFinishTime = pipeMaxWaitFinishTime;
+ logger.info("pipeMaxWaitFinishTime is set to {}.", pipeMaxWaitFinishTime);
+ }
+
public int getPipeRealTimeQueuePollTsFileThreshold() {
return pipeRealTimeQueuePollTsFileThreshold;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index a4b9dfebe93..c61f4dc95fd 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.commons.pipe.agent.task;
+import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.commons.exception.IllegalPathException;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
@@ -35,7 +36,6 @@ import
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.connector.limiter.PipeEndPointRateLimiter;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
-import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeException;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index dc2b4350960..439dfec55f4 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -147,6 +147,10 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeSubtaskExecutorForcedRestartIntervalMs();
}
+ public long getPipeMaxWaitFinishTime() {
+ return COMMON_CONFIG.getPipeMaxWaitFinishTime();
+ }
+
/////////////////////////////// Extractor ///////////////////////////////
public int getPipeExtractorAssignerDisruptorRingBufferSize() {
@@ -523,6 +527,7 @@ public class PipeConfig {
LOGGER.info(
"PipeSubtaskExecutorForcedRestartIntervalMs: {}",
getPipeSubtaskExecutorForcedRestartIntervalMs());
+ LOGGER.info("PipeMaxWaitFinishTime: {}", getPipeMaxWaitFinishTime());
LOGGER.info(
"PipeExtractorAssignerDisruptorRingBufferSize: {}",
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index b754a8581aa..db2ff49bbb0 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -596,6 +596,11 @@ public class PipeDescriptor {
"pipe_threshold_allocation_strategy_high_usage_threshold",
String.valueOf(
config.getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold()))));
+
+ config.setPipeMaxWaitFinishTime(
+ Long.parseLong(
+ properties.getProperty(
+ "pipe_max_wait_finish_time",
String.valueOf(config.getPipeMaxWaitFinishTime()))));
}
public static void loadPipeExternalConfig(
diff --git a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
index c6f881f06cb..3287f35b9bb 100644
--- a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
+++ b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
@@ -196,6 +196,13 @@ struct TSetThrottleQuotaReq {
2: required TThrottleQuota throttleQuota
}
+struct TPipeHeartbeatResp {
+ 1: required list<binary> pipeMetaList
+ 2: optional list<bool> pipeCompletedList
+ 3: optional list<i64> pipeRemainingEventCountList
+ 4: optional list<double> pipeRemainingTimeList
+}
+
struct TLicense {
1: required i64 licenseIssueTimestamp
2: required i64 expireTimestamp
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index 3ce020c731b..bf91b410459 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -2011,6 +2011,9 @@ service IConfigNodeRPCService {
/** Get throttle quota information */
TThrottleQuotaResp getThrottleQuota()
+ /** Push heartbeat in shutdown */
+ common.TSStatus pushHeartbeat(i32 dataNodeId, common.TPipeHeartbeatResp resp)
+
// ======================================================
// Table Or View
// ======================================================
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 1b8ad67de5c..64406b005d9 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -316,13 +316,6 @@ struct TPipeHeartbeatReq {
1: required i64 heartbeatId
}
-struct TPipeHeartbeatResp {
- 1: required list<binary> pipeMetaList
- 2: optional list<bool> pipeCompletedList
- 3: optional list<i64> pipeRemainingEventCountList
- 4: optional list<double> pipeRemainingTimeList
-}
-
enum TSchemaLimitLevel{
DEVICE,
TIMESERIES
@@ -1118,7 +1111,7 @@ service IDataNodeRPCService {
/**
* ConfigNode will ask DataNode for pipe meta in every few seconds
**/
- TPipeHeartbeatResp pipeHeartbeat(TPipeHeartbeatReq req)
+ common.TPipeHeartbeatResp pipeHeartbeat(TPipeHeartbeatReq req)
/**
* Execute CQ on DataNode