This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 40934ddc572 Pipe: Add completion signal to historical events & allow
all data regions' completion signal to drop the pipe (#12490)
40934ddc572 is described below
commit 40934ddc5726d5b13787450f072951020dcdfa61
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 23 12:01:59 2024 +0800
Pipe: Add completion signal to historical events & allow all data regions'
completion signal to drop the pipe (#12490)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../pipe/it/autocreate/IoTDBPipeAutoDropIT.java | 80 +++++++++++++++
.../iotdb/pipe/it/manual/IoTDBPipeInclusionIT.java | 4 +-
.../heartbeat/DataNodeHeartbeatHandler.java | 3 +-
.../runtime/PipeRuntimeCoordinator.java | 18 ++--
.../runtime/heartbeat/PipeHeartbeat.java | 61 ++++++++++++
.../{ => heartbeat}/PipeHeartbeatParser.java | 51 +++++-----
.../{ => heartbeat}/PipeHeartbeatScheduler.java | 20 ++--
.../confignode/persistence/pipe/PipeTaskInfo.java | 89 ++++++++++-------
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 107 ++++++++++++++++++++-
.../event/common/terminate/PipeTerminateEvent.java | 94 ++++++++++++++++++
.../PipeHistoricalDataRegionTsFileExtractor.java | 56 +++++++----
.../iotdb/db/pipe/task/PipeDataNodeTask.java | 20 +++-
.../config/constant/PipeExtractorConstant.java | 6 ++
.../iotdb/commons/pipe/task/meta/PipeMeta.java | 38 +++++---
.../commons/pipe/task/meta/PipeTemporaryMeta.java | 60 ++++++++++++
.../src/main/thrift/datanode.thrift | 2 +
16 files changed, 593 insertions(+), 116 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
new file mode 100644
index 00000000000..b223a245d0a
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.autocreate;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class IoTDBPipeAutoDropIT extends AbstractPipeDualAutoIT {
+ @Test
+ public void testAutoDropInHistoricalTransfer() throws Exception {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ final String receiverIp = receiverDataNode.getIp();
+ final int receiverPort = receiverDataNode.getPort();
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
+ return;
+ }
+
+ final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> processorAttributes = new HashMap<>();
+ final Map<String, String> connectorAttributes = new HashMap<>();
+
+
extractorAttributes.put("extractor.history.terminate-pipe-on-all-consumed",
"true");
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+
+ final TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p1", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
+
+ TestUtils.assertDataEventuallyOnEnv(
+ senderEnv,
+ "show pipes",
+
"ID,CreationTime,State,PipeSource,PipeProcessor,PipeSink,ExceptionMessage,",
+ Collections.emptySet());
+ }
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeInclusionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeInclusionIT.java
index 7a8ff6c3f15..4515536c253 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeInclusionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeInclusionIT.java
@@ -229,7 +229,7 @@ public class IoTDBPipeInclusionIT extends
AbstractPipeDualManualIT {
senderEnv,
Arrays.asList(
"create timeseries root.ln.wf01.wt01.status with
datatype=BOOLEAN,encoding=PLAIN",
- "insert into root.ln.wf01.wt01(time, status) values(0, 1)",
+ "insert into root.ln.wf01.wt01(time, status) values(0, true)",
"flush"))) {
return;
}
@@ -240,7 +240,7 @@ public class IoTDBPipeInclusionIT extends
AbstractPipeDualManualIT {
receiverEnv,
Arrays.asList(
"create timeseries root.ln.wf01.wt01.status1 with
datatype=BOOLEAN,encoding=PLAIN",
- "insert into root.ln.wf01.wt01(time, status1) values(0, 1)",
+ "insert into root.ln.wf01.wt01(time, status1) values(0, true)",
"flush"))) {
return;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
index 49981d72834..5829fd07a56 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
@@ -130,7 +130,8 @@ public class DataNodeHeartbeatHandler implements
AsyncMethodCallback<TDataNodeHe
regionDisk.putAll(heartbeatResp.getRegionDisk());
}
if (heartbeatResp.getPipeMetaList() != null) {
- pipeRuntimeCoordinator.parseHeartbeat(nodeId,
heartbeatResp.getPipeMetaList());
+ pipeRuntimeCoordinator.parseHeartbeat(
+ nodeId, heartbeatResp.getPipeMetaList(),
heartbeatResp.getPipeCompletedList());
}
if (heartbeatResp.isSetConfirmedConfigNodeEndPoints()) {
loadManager
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
index 8d9644eae5f..97f893f4eae 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
@@ -26,6 +26,8 @@ import
org.apache.iotdb.confignode.manager.load.subscriber.ConsensusGroupStatist
import
org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
import
org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent;
import
org.apache.iotdb.confignode.manager.load.subscriber.RegionGroupStatisticsChangeEvent;
+import
org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat.PipeHeartbeat;
+import
org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat.PipeHeartbeatScheduler;
import javax.validation.constraints.NotNull;
@@ -45,7 +47,7 @@ public class PipeRuntimeCoordinator implements
IClusterStatusSubscriber {
private final PipeMetaSyncer pipeMetaSyncer;
private final PipeHeartbeatScheduler pipeHeartbeatScheduler;
- public PipeRuntimeCoordinator(ConfigManager configManager) {
+ public PipeRuntimeCoordinator(final ConfigManager configManager) {
if (procedureSubmitterHolder.get() == null) {
synchronized (PipeRuntimeCoordinator.class) {
if (procedureSubmitterHolder.get() == null) {
@@ -71,18 +73,18 @@ public class PipeRuntimeCoordinator implements
IClusterStatusSubscriber {
}
@Override
- public void onNodeStatisticsChanged(NodeStatisticsChangeEvent event) {
+ public void onNodeStatisticsChanged(final NodeStatisticsChangeEvent event) {
// Do nothing
}
@Override
- public void onRegionGroupStatisticsChanged(RegionGroupStatisticsChangeEvent
event) {
+ public void onRegionGroupStatisticsChanged(final
RegionGroupStatisticsChangeEvent event) {
// Do nothing
}
@Override
public synchronized void onConsensusGroupStatisticsChanged(
- ConsensusGroupStatisticsChangeEvent event) {
+ final ConsensusGroupStatisticsChangeEvent event) {
pipeLeaderChangeHandler.onConsensusGroupStatisticsChanged(event);
}
@@ -103,7 +105,11 @@ public class PipeRuntimeCoordinator implements
IClusterStatusSubscriber {
}
public void parseHeartbeat(
- int dataNodeId, @NotNull List<ByteBuffer>
pipeMetaByteBufferListFromDataNode) {
- pipeHeartbeatScheduler.parseHeartbeat(dataNodeId,
pipeMetaByteBufferListFromDataNode);
+ final int dataNodeId,
+ @NotNull final List<ByteBuffer> pipeMetaByteBufferListFromDataNode,
+ /* @Nullable */ final List<Boolean> pipeCompletedListFromAgent) {
+ pipeHeartbeatScheduler.parseHeartbeat(
+ dataNodeId,
+ new PipeHeartbeat(pipeMetaByteBufferListFromDataNode,
pipeCompletedListFromAgent));
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
new file mode 100644
index 00000000000..203ba96ed44
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat;
+
+import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
+
+import javax.validation.constraints.NotNull;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class PipeHeartbeat {
+
+ private final Map<PipeStaticMeta, PipeMeta> pipeMetaMap = new HashMap<>();
+ private final Map<PipeStaticMeta, Boolean> isCompletedMap = new HashMap<>();
+
+ public PipeHeartbeat(
+ @NotNull final List<ByteBuffer> pipeMetaByteBufferListFromAgent,
+ /* @Nullable */ final List<Boolean> pipeCompletedListFromAgent) {
+ for (int i = 0; i < pipeMetaByteBufferListFromAgent.size(); ++i) {
+ final PipeMeta pipeMeta =
PipeMeta.deserialize(pipeMetaByteBufferListFromAgent.get(i));
+ pipeMetaMap.put(pipeMeta.getStaticMeta(), pipeMeta);
+ isCompletedMap.put(
+ pipeMeta.getStaticMeta(),
+ Objects.nonNull(pipeCompletedListFromAgent) &&
pipeCompletedListFromAgent.get(i));
+ }
+ }
+
+ public PipeMeta getPipeMeta(PipeStaticMeta pipeStaticMeta) {
+ return pipeMetaMap.get(pipeStaticMeta);
+ }
+
+ public Boolean isCompleted(PipeStaticMeta pipeStaticMeta) {
+ return isCompletedMap.get(pipeStaticMeta);
+ }
+
+ public boolean isEmpty() {
+ return pipeMetaMap.isEmpty();
+ }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatParser.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
similarity index 88%
rename from
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatParser.java
rename to
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
index 3bda79f0dd5..68ea140dda9 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatParser.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
@@ -17,16 +17,16 @@
* under the License.
*/
-package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime;
+package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
-import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTemporaryMeta;
import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
@@ -34,12 +34,8 @@ import
org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.validation.constraints.NotNull;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -55,7 +51,7 @@ public class PipeHeartbeatParser {
private final AtomicBoolean needWriteConsensusOnConfigNodes;
private final AtomicBoolean needPushPipeMetaToDataNodes;
- PipeHeartbeatParser(ConfigManager configManager) {
+ PipeHeartbeatParser(final ConfigManager configManager) {
this.configManager = configManager;
heartbeatCounter = 0;
@@ -65,8 +61,7 @@ public class PipeHeartbeatParser {
needPushPipeMetaToDataNodes = new AtomicBoolean(false);
}
- public synchronized void parseHeartbeat(
- int nodeId, @NotNull List<ByteBuffer> pipeMetaByteBufferListFromAgent) {
+ synchronized void parseHeartbeat(final int nodeId, final PipeHeartbeat
pipeHeartbeat) {
final long heartbeatCount = ++heartbeatCounter;
final AtomicBoolean canSubmitHandleMetaChangeProcedure = new
AtomicBoolean(false);
@@ -87,7 +82,7 @@ public class PipeHeartbeatParser {
}
}
- if (pipeMetaByteBufferListFromAgent.isEmpty()
+ if (pipeHeartbeat.isEmpty()
&& !(canSubmitHandleMetaChangeProcedure.get()
&& (needWriteConsensusOnConfigNodes.get() ||
needPushPipeMetaToDataNodes.get()))) {
return;
@@ -108,9 +103,8 @@ public class PipeHeartbeatParser {
}
try {
- if (!pipeMetaByteBufferListFromAgent.isEmpty()) {
- parseHeartbeatAndSaveMetaChangeLocally(
- pipeTaskInfo, nodeId, pipeMetaByteBufferListFromAgent);
+ if (!pipeHeartbeat.isEmpty()) {
+ parseHeartbeatAndSaveMetaChangeLocally(pipeTaskInfo, nodeId,
pipeHeartbeat);
}
if (canSubmitHandleMetaChangeProcedure.get()
@@ -134,16 +128,10 @@ public class PipeHeartbeatParser {
private void parseHeartbeatAndSaveMetaChangeLocally(
final AtomicReference<PipeTaskInfo> pipeTaskInfo,
final int nodeId,
- @NotNull final List<ByteBuffer> pipeMetaByteBufferListFromAgent) {
- final Map<PipeStaticMeta, PipeMeta> pipeMetaMapFromAgent = new HashMap<>();
- for (ByteBuffer byteBuffer : pipeMetaByteBufferListFromAgent) {
- final PipeMeta pipeMeta = PipeMeta.deserialize(byteBuffer);
- pipeMetaMapFromAgent.put(pipeMeta.getStaticMeta(), pipeMeta);
- }
-
+ final PipeHeartbeat pipeHeartbeat) {
for (final PipeMeta pipeMetaFromCoordinator :
pipeTaskInfo.get().getPipeMetaList()) {
final PipeMeta pipeMetaFromAgent =
- pipeMetaMapFromAgent.get(pipeMetaFromCoordinator.getStaticMeta());
+ pipeHeartbeat.getPipeMeta(pipeMetaFromCoordinator.getStaticMeta());
if (pipeMetaFromAgent == null) {
LOGGER.info(
"PipeRuntimeCoordinator meets error in updating pipeMetaKeeper, "
@@ -152,6 +140,25 @@ public class PipeHeartbeatParser {
continue;
}
+ // Remove completed pipes
+ final Boolean isPipeCompletedFromAgent =
+ pipeHeartbeat.isCompleted(pipeMetaFromCoordinator.getStaticMeta());
+ if (Boolean.TRUE.equals(isPipeCompletedFromAgent)) {
+ final PipeTemporaryMeta temporaryMeta =
pipeMetaFromCoordinator.getTemporaryMeta();
+
+ temporaryMeta.markDataNodeCompleted(nodeId);
+
+ final Set<Integer> uncompletedDataNodeIds =
+
configManager.getNodeManager().getRegisteredDataNodeLocations().keySet();
+
uncompletedDataNodeIds.removeAll(temporaryMeta.getCompletedDataNodeIds());
+ if (uncompletedDataNodeIds.isEmpty()) {
+
pipeTaskInfo.get().removePipeMeta(pipeMetaFromCoordinator.getStaticMeta().getPipeName());
+ needWriteConsensusOnConfigNodes.set(true);
+ needPushPipeMetaToDataNodes.set(true);
+ continue;
+ }
+ }
+
final Map<Integer, PipeTaskMeta> pipeTaskMetaMapFromCoordinator =
pipeMetaFromCoordinator.getRuntimeMeta().getConsensusGroupId2TaskMetaMap();
final Map<Integer, PipeTaskMeta> pipeTaskMetaMapFromAgent =
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatScheduler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
similarity index 90%
rename from
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatScheduler.java
rename to
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
index 65217818e7d..462b6a017a8 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatScheduler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime;
+package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
@@ -36,8 +36,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.ByteBuffer;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
@@ -61,7 +59,7 @@ public class PipeHeartbeatScheduler {
private Future<?> heartbeatFuture;
- PipeHeartbeatScheduler(ConfigManager configManager) {
+ public PipeHeartbeatScheduler(final ConfigManager configManager) {
this.configManager = configManager;
this.pipeHeartbeatParser = new PipeHeartbeatParser(configManager);
}
@@ -90,7 +88,7 @@ public class PipeHeartbeatScheduler {
return;
}
- // data node heartbeat
+ // Data node heartbeat
final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
configManager.getNodeManager().getRegisteredDataNodeLocations();
final TPipeHeartbeatReq request = new
TPipeHeartbeatReq(System.currentTimeMillis());
@@ -109,7 +107,9 @@ public class PipeHeartbeatScheduler {
.getResponseMap()
.forEach(
(dataNodeId, resp) ->
- pipeHeartbeatParser.parseHeartbeat(dataNodeId,
resp.getPipeMetaList()));
+ pipeHeartbeatParser.parseHeartbeat(
+ dataNodeId,
+ new PipeHeartbeat(resp.getPipeMetaList(),
resp.getPipeCompletedList())));
// config node heartbeat
try {
@@ -117,8 +117,8 @@ public class PipeHeartbeatScheduler {
PipeConfigNodeAgent.task().collectPipeMetaList(request, configNodeResp);
pipeHeartbeatParser.parseHeartbeat(
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
- configNodeResp.getPipeMetaList());
- } catch (Exception e) {
+ new PipeHeartbeat(configNodeResp.getPipeMetaList(), null));
+ } catch (final Exception e) {
LOGGER.warn("Failed to collect pipe meta list from config node task
agent", e);
}
}
@@ -131,7 +131,7 @@ public class PipeHeartbeatScheduler {
}
}
- public void parseHeartbeat(int dataNodeId, List<ByteBuffer>
pipeMetaByteBufferListFromDataNode) {
- pipeHeartbeatParser.parseHeartbeat(dataNodeId,
pipeMetaByteBufferListFromDataNode);
+ public void parseHeartbeat(final int dataNodeId, final PipeHeartbeat
pipeHeartbeat) {
+ pipeHeartbeatParser.parseHeartbeat(dataNodeId, pipeHeartbeat);
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index d41493762d2..201f2f049ed 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -143,7 +143,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
/////////////////////////////// Validator ///////////////////////////////
- public void checkBeforeCreatePipe(TCreatePipeReq createPipeRequest) throws
PipeException {
+ public void checkBeforeCreatePipe(final TCreatePipeReq createPipeRequest)
throws PipeException {
acquireReadLock();
try {
checkBeforeCreatePipeInternal(createPipeRequest);
@@ -166,7 +166,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
throw new PipeException(exceptionMessage);
}
- public void checkAndUpdateRequestBeforeAlterPipe(TAlterPipeReq
alterPipeRequest)
+ public void checkAndUpdateRequestBeforeAlterPipe(final TAlterPipeReq
alterPipeRequest)
throws PipeException {
acquireReadLock();
try {
@@ -176,7 +176,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
}
}
- private void checkAndUpdateRequestBeforeAlterPipeInternal(TAlterPipeReq
alterPipeRequest)
+ private void checkAndUpdateRequestBeforeAlterPipeInternal(final
TAlterPipeReq alterPipeRequest)
throws PipeException {
if (!isPipeExisted(alterPipeRequest.getPipeName())) {
final String exceptionMessage =
@@ -229,7 +229,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
}
}
- public void checkBeforeStartPipe(String pipeName) throws PipeException {
+ public void checkBeforeStartPipe(final String pipeName) throws PipeException
{
acquireReadLock();
try {
checkBeforeStartPipeInternal(pipeName);
@@ -238,7 +238,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
}
}
- private void checkBeforeStartPipeInternal(String pipeName) throws
PipeException {
+ private void checkBeforeStartPipeInternal(final String pipeName) throws
PipeException {
if (!isPipeExisted(pipeName)) {
final String exceptionMessage =
String.format("Failed to start pipe %s, the pipe does not exist",
pipeName);
@@ -255,7 +255,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
}
}
- public void checkBeforeStopPipe(String pipeName) throws PipeException {
+ public void checkBeforeStopPipe(final String pipeName) throws PipeException {
acquireReadLock();
try {
checkBeforeStopPipeInternal(pipeName);
@@ -264,7 +264,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
}
}
- private void checkBeforeStopPipeInternal(String pipeName) throws
PipeException {
+ private void checkBeforeStopPipeInternal(final String pipeName) throws
PipeException {
if (!isPipeExisted(pipeName)) {
final String exceptionMessage =
String.format("Failed to stop pipe %s, the pipe does not exist",
pipeName);
@@ -281,7 +281,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
}
}
- public void checkBeforeDropPipe(String pipeName) {
+ public void checkBeforeDropPipe(final String pipeName) {
acquireReadLock();
try {
checkBeforeDropPipeInternal(pipeName);
@@ -290,7 +290,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
}
}
- private void checkBeforeDropPipeInternal(String pipeName) {
+ private void checkBeforeDropPipeInternal(final String pipeName) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Check before drop pipe {}, pipe exists: {}.", pipeName,
isPipeExisted(pipeName));
@@ -300,7 +300,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
// DO NOTHING HERE!
}
- public boolean isPipeExisted(String pipeName) {
+ public boolean isPipeExisted(final String pipeName) {
acquireReadLock();
try {
return pipeMetaKeeper.containsPipeMeta(pipeName);
@@ -309,7 +309,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
}
}
- private PipeStatus getPipeStatus(String pipeName) {
+ private PipeStatus getPipeStatus(final String pipeName) {
acquireReadLock();
try {
return
pipeMetaKeeper.getPipeMeta(pipeName).getRuntimeMeta().getStatus().get();
@@ -318,7 +318,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
}
}
- public boolean isPipeRunning(String pipeName) {
+ public boolean isPipeRunning(final String pipeName) {
acquireReadLock();
try {
return pipeMetaKeeper.containsPipeMeta(pipeName)
@@ -328,7 +328,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
}
}
- public boolean isPipeStoppedByUser(String pipeName) {
+ public boolean isPipeStoppedByUser(final String pipeName) {
acquireReadLock();
try {
return pipeMetaKeeper.containsPipeMeta(pipeName)
@@ -341,7 +341,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
/////////////////////////////// Pipe Task Management
///////////////////////////////
- public TSStatus createPipe(CreatePipePlanV2 plan) {
+ public TSStatus createPipe(final CreatePipePlanV2 plan) {
acquireWriteLock();
try {
pipeMetaKeeper.addPipeMeta(
@@ -353,7 +353,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
}
}
- public TSStatus operateMultiplePipes(OperateMultiplePipesPlanV2 plan) {
+ public TSStatus operateMultiplePipes(final OperateMultiplePipesPlanV2 plan) {
acquireWriteLock();
try {
if (plan.getSubPlans() == null || plan.getSubPlans().isEmpty()) {
@@ -364,7 +364,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
// We use sub-status to record the status of each subPlan
status.setSubStatus(new ArrayList<>());
- for (ConfigPhysicalPlan subPlan : plan.getSubPlans()) {
+ for (final ConfigPhysicalPlan subPlan : plan.getSubPlans()) {
try {
if (subPlan instanceof CreatePipePlanV2) {
createPipe((CreatePipePlanV2) subPlan);
@@ -379,7 +379,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
String.format("Unsupported subPlan type: %s",
subPlan.getClass().getName()));
}
status.getSubStatus().add(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
- } catch (Exception e) {
+ } catch (final Exception e) {
// If one of the subPlan fails, we stop operating the rest of the
pipes
LOGGER.error("Failed to operate pipe", e);
status.setCode(TSStatusCode.PIPE_ERROR.getStatusCode());
@@ -399,7 +399,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
}
}
- public TSStatus alterPipe(AlterPipePlanV2 plan) {
+ public TSStatus alterPipe(final AlterPipePlanV2 plan) {
acquireWriteLock();
try {
pipeMetaKeeper.removePipeMeta(plan.getPipeStaticMeta().getPipeName());
@@ -412,7 +412,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
}
}
- public TSStatus setPipeStatus(SetPipeStatusPlanV2 plan) {
+ public TSStatus setPipeStatus(final SetPipeStatusPlanV2 plan) {
acquireWriteLock();
try {
pipeMetaKeeper
@@ -426,7 +426,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
}
}
- public TSStatus dropPipe(DropPipePlanV2 plan) {
+ public TSStatus dropPipe(final DropPipePlanV2 plan) {
acquireWriteLock();
try {
pipeMetaKeeper.removePipeMeta(plan.getPipeName());
@@ -457,7 +457,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
}
}
- public PipeMeta getPipeMetaByPipeName(String pipeName) {
+ public PipeMeta getPipeMetaByPipeName(final String pipeName) {
acquireReadLock();
try {
return pipeMetaKeeper.getPipeMetaByPipeName(pipeName);
@@ -478,7 +478,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
/////////////////////////////// Pipe Runtime Management
///////////////////////////////
/** Handle the region leader change event and update the pipe task meta
accordingly. */
- public TSStatus handleLeaderChange(PipeHandleLeaderChangePlan plan) {
+ public TSStatus handleLeaderChange(final PipeHandleLeaderChangePlan plan) {
acquireWriteLock();
try {
return handleLeaderChangeInternal(plan);
@@ -487,7 +487,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
}
}
- private TSStatus handleLeaderChangeInternal(PipeHandleLeaderChangePlan plan)
{
+ private TSStatus handleLeaderChangeInternal(final PipeHandleLeaderChangePlan
plan) {
plan.getConsensusGroupId2NewLeaderIdMap()
.forEach(
(consensusGroupId, newLeader) ->
@@ -532,7 +532,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
* @param plan The plan containing all the {@link PipeMeta}s from leader
{@link ConfigNode}
* @return {@link TSStatusCode#SUCCESS_STATUS}
*/
- public TSStatus handleMetaChanges(PipeHandleMetaChangePlan plan) {
+ public TSStatus handleMetaChanges(final PipeHandleMetaChangePlan plan) {
acquireWriteLock();
try {
return handleMetaChangesInternal(plan);
@@ -541,7 +541,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
}
}
- private TSStatus handleMetaChangesInternal(PipeHandleMetaChangePlan plan) {
+ private TSStatus handleMetaChangesInternal(final PipeHandleMetaChangePlan
plan) {
LOGGER.info("Handling pipe meta changes ...");
pipeMetaKeeper.clear();
@@ -556,7 +556,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
- public boolean isStoppedByRuntimeException(String pipeName) {
+ public boolean isStoppedByRuntimeException(final String pipeName) {
acquireReadLock();
try {
return isStoppedByRuntimeExceptionInternal(pipeName);
@@ -565,7 +565,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
}
}
- private boolean isStoppedByRuntimeExceptionInternal(String pipeName) {
+ private boolean isStoppedByRuntimeExceptionInternal(final String pipeName) {
return pipeMetaKeeper.containsPipeMeta(pipeName)
&&
pipeMetaKeeper.getPipeMeta(pipeName).getRuntimeMeta().getIsStoppedByRuntimeException();
}
@@ -578,7 +578,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
*
* @param pipeName The name of the pipe to be clear exception
*/
- public void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(String
pipeName) {
+ public void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(final
String pipeName) {
acquireWriteLock();
try {
clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(pipeName);
@@ -587,7 +587,8 @@ public class PipeTaskInfo implements SnapshotProcessor {
}
}
- private void
clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(String
pipeName) {
+ private void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(
+ final String pipeName) {
if (!pipeMetaKeeper.containsPipeMeta(pipeName)) {
return;
}
@@ -616,7 +617,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
});
}
- public void setIsStoppedByRuntimeExceptionToFalse(String pipeName) {
+ public void setIsStoppedByRuntimeExceptionToFalse(final String pipeName) {
acquireWriteLock();
try {
setIsStoppedByRuntimeExceptionToFalseInternal(pipeName);
@@ -625,7 +626,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
}
}
- private void setIsStoppedByRuntimeExceptionToFalseInternal(String pipeName) {
+ private void setIsStoppedByRuntimeExceptionToFalseInternal(final String
pipeName) {
if (!pipeMetaKeeper.containsPipeMeta(pipeName)) {
return;
}
@@ -641,9 +642,10 @@ public class PipeTaskInfo implements SnapshotProcessor {
* messages will then be updated to all the nodes through {@link
PipeHandleMetaChangeProcedure}.
*
* @param respMap The responseMap after pushing pipe meta
- * @return {@link true} if there are exceptions encountered
+ * @return {@code true} if there are exceptions encountered
*/
- public boolean recordDataNodePushPipeMetaExceptions(Map<Integer,
TPushPipeMetaResp> respMap) {
+ public boolean recordDataNodePushPipeMetaExceptions(
+ final Map<Integer, TPushPipeMetaResp> respMap) {
acquireWriteLock();
try {
return recordDataNodePushPipeMetaExceptionsInternal(respMap);
@@ -653,7 +655,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
}
private boolean recordDataNodePushPipeMetaExceptionsInternal(
- Map<Integer, TPushPipeMetaResp> respMap) {
+ final Map<Integer, TPushPipeMetaResp> respMap) {
boolean hasException = false;
for (final Map.Entry<Integer, TPushPipeMetaResp> respEntry :
respMap.entrySet()) {
@@ -758,10 +760,23 @@ public class PipeTaskInfo implements SnapshotProcessor {
});
}
+ public void removePipeMeta(final String pipeName) {
+ acquireWriteLock();
+ try {
+ removePipeMetaInternal(pipeName);
+ } finally {
+ releaseWriteLock();
+ }
+ }
+
+ private void removePipeMetaInternal(final String pipeName) {
+ pipeMetaKeeper.removePipeMeta(pipeName);
+ }
+
/////////////////////////////// Snapshot ///////////////////////////////
@Override
- public boolean processTakeSnapshot(File snapshotDir) throws IOException {
+ public boolean processTakeSnapshot(final File snapshotDir) throws
IOException {
acquireReadLock();
try {
final File snapshotFile = new File(snapshotDir, SNAPSHOT_FILE_NAME);
@@ -783,7 +798,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
}
@Override
- public void processLoadSnapshot(File snapshotDir) throws IOException {
+ public void processLoadSnapshot(final File snapshotDir) throws IOException {
acquireWriteLock();
try {
final File snapshotFile = new File(snapshotDir, SNAPSHOT_FILE_NAME);
@@ -810,7 +825,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
}
@Override
- public boolean equals(Object obj) {
+ public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 366024d2906..65c7699f398 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -69,6 +69,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -79,6 +80,10 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY;
+
public class PipeDataNodeTaskAgent extends PipeTaskAgent {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeDataNodeTaskAgent.class);
@@ -275,7 +280,13 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
return;
}
+ final Set<Integer> dataRegionIds =
+ StorageEngine.getInstance().getAllDataRegionIds().stream()
+ .map(DataRegionId::getId)
+ .collect(Collectors.toSet());
+
final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
+ final List<Boolean> pipeCompletedList = new ArrayList<>();
try {
final Optional<Logger> logger =
PipeResourceManager.log()
@@ -286,13 +297,44 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
pipeMetaKeeper.getPipeMetaCount());
for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
pipeMetaBinaryList.add(pipeMeta.serialize());
- logger.ifPresent(l -> l.info("Reporting pipe meta: {}",
pipeMeta.coreReportMessage()));
+
+ final Map<Integer, PipeTask> pipeTaskMap =
+ pipeTaskManager.getPipeTasks(pipeMeta.getStaticMeta());
+ final boolean isAllDataRegionCompleted =
+ pipeTaskMap == null
+ || pipeTaskMap.entrySet().stream()
+ .filter(entry -> dataRegionIds.contains(entry.getKey()))
+ .allMatch(entry -> ((PipeDataNodeTask)
entry.getValue()).isCompleted());
+ // If the "source.history.terminate-pipe-on-all-consumed" is false or
the pipe does
+ // not include data transfer, we should not terminate the pipe.
+ final boolean includeDataAndNeedDrop =
+
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(
+ pipeMeta.getStaticMeta().getExtractorParameters())
+ .getLeft()
+ && pipeMeta
+ .getStaticMeta()
+ .getExtractorParameters()
+ .getBooleanOrDefault(
+ Arrays.asList(
+ SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY,
+
EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY),
+
EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE);
+
+ pipeCompletedList.add(isAllDataRegionCompleted &&
includeDataAndNeedDrop);
+
+ logger.ifPresent(
+ l ->
+ l.info(
+ "Reporting pipe meta: {}, isCompleted: {}",
+ pipeMeta.coreReportMessage(),
+ includeDataAndNeedDrop));
}
LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size());
- } catch (final IOException e) {
+ } catch (final IOException | IllegalPathException e) {
throw new TException(e);
}
resp.setPipeMetaList(pipeMetaBinaryList);
+ resp.setPipeCompletedList(pipeCompletedList);
}
@Override
@@ -304,7 +346,13 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
}
LOGGER.info("Received pipe heartbeat request {} from config node.",
req.heartbeatId);
+ final Set<Integer> dataRegionIds =
+ StorageEngine.getInstance().getAllDataRegionIds().stream()
+ .map(DataRegionId::getId)
+ .collect(Collectors.toSet());
+
final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
+ final List<Boolean> pipeCompletedList = new ArrayList<>();
try {
final Optional<Logger> logger =
PipeResourceManager.log()
@@ -315,14 +363,44 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
pipeMetaKeeper.getPipeMetaCount());
for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
pipeMetaBinaryList.add(pipeMeta.serialize());
- logger.ifPresent(l -> l.info("Reporting pipe meta: {}",
pipeMeta.coreReportMessage()));
+
+ final Map<Integer, PipeTask> pipeTaskMap =
+ pipeTaskManager.getPipeTasks(pipeMeta.getStaticMeta());
+ final boolean isAllDataRegionCompleted =
+ pipeTaskMap == null
+ || pipeTaskMap.entrySet().stream()
+ .filter(entry -> dataRegionIds.contains(entry.getKey()))
+ .allMatch(entry -> ((PipeDataNodeTask)
entry.getValue()).isCompleted());
+ // If the "source.history.terminate-pipe-on-all-consumed" is false or
the pipe does
+ // not include data transfer, we should not terminate the pipe.
+ final boolean includeDataAndNeedDrop =
+
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(
+ pipeMeta.getStaticMeta().getExtractorParameters())
+ .getLeft()
+ && pipeMeta
+ .getStaticMeta()
+ .getExtractorParameters()
+ .getBooleanOrDefault(
+ Arrays.asList(
+ SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY,
+
EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY),
+
EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE);
+
+ pipeCompletedList.add(isAllDataRegionCompleted &&
includeDataAndNeedDrop);
+
+ logger.ifPresent(
+ l ->
+ l.info(
+ "Reporting pipe meta: {}, isCompleted: {}",
+ pipeMeta.coreReportMessage(),
+ includeDataAndNeedDrop));
}
LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size());
- } catch (final IOException e) {
+ } catch (final IOException | IllegalPathException e) {
throw new TException(e);
}
resp.setPipeMetaList(pipeMetaBinaryList);
-
+ resp.setPipeCompletedList(pipeCompletedList);
PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(true);
}
@@ -477,6 +555,25 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
pipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
}
+ ///////////////////////// Terminate Logic /////////////////////////
+
+ public void markCompleted(final String pipeName, final int regionId) {
+ acquireWriteLock();
+ try {
+ if (pipeMetaKeeper.containsPipeMeta(pipeName)) {
+ final PipeDataNodeTask pipeDataNodeTask =
+ ((PipeDataNodeTask)
+ pipeTaskManager.getPipeTask(
+ pipeMetaKeeper.getPipeMeta(pipeName).getStaticMeta(),
regionId));
+ if (Objects.nonNull(pipeDataNodeTask)) {
+ pipeDataNodeTask.markCompleted();
+ }
+ }
+ } finally {
+ releaseWriteLock();
+ }
+ }
+
///////////////////////// Utils /////////////////////////
public Set<Integer> getPipeTaskRegionIdSet(final String pipeName, final long
creationTime) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
new file mode 100644
index 00000000000..c1252fe668f
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.terminate;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.pipe.task.PipeDataNodeTask;
+
+/**
+ * The {@link PipeTerminateEvent} is an {@link EnrichedEvent} that controls
the termination of pipe,
+ * that is, when the historical {@link PipeTsFileInsertionEvent}s are all
processed, this will be
+ * reported next and mark the {@link PipeDataNodeTask} as completed. WARNING:
This event shall never
+ * be discarded.
+ */
+public class PipeTerminateEvent extends EnrichedEvent {
+ private final int dataRegionId;
+
+ public PipeTerminateEvent(
+ final String pipeName, final PipeTaskMeta pipeTaskMeta, final int
dataRegionId) {
+ super(pipeName, pipeTaskMeta, null, Long.MIN_VALUE, Long.MAX_VALUE);
+ this.dataRegionId = dataRegionId;
+ }
+
+ @Override
+ public boolean internallyIncreaseResourceReferenceCount(final String
holderMessage) {
+ return true;
+ }
+
+ @Override
+ public boolean internallyDecreaseResourceReferenceCount(final String
holderMessage) {
+ return true;
+ }
+
+ @Override
+ public ProgressIndex getProgressIndex() {
+ return MinimumProgressIndex.INSTANCE;
+ }
+
+ @Override
+ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+ final String pipeName,
+ final PipeTaskMeta pipeTaskMeta,
+ final PipePattern pattern,
+ final long startTime,
+ final long endTime) {
+ // Should record PipeTaskMeta, for the terminateEvent shall report
progress to
+ // notify the pipeTask it's completed.
+ return new PipeTerminateEvent(pipeName, pipeTaskMeta, dataRegionId);
+ }
+
+ @Override
+ public boolean isGeneratedByPipe() {
+ return false;
+ }
+
+ @Override
+ public boolean mayEventTimeOverlappedWithTimeRange() {
+ return true;
+ }
+
+ @Override
+ public void reportProgress() {
+ PipeAgent.task().markCompleted(pipeName, dataRegionId);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("PipeTerminateEvent{dataRegionId=%s}", dataRegionId)
+ + " - "
+ + super.toString();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 3d1528fe659..880d3f9f7f2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -28,6 +28,7 @@ import
org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
@@ -64,6 +65,8 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY;
@@ -72,6 +75,7 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_START_TIME_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY;
@@ -93,22 +97,22 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
private boolean isDbNameCoveredByPattern = false;
private boolean isHistoricalExtractorEnabled = false;
-
private long historicalDataExtractionStartTime = Long.MIN_VALUE; // Event
time
private long historicalDataExtractionEndTime = Long.MAX_VALUE; // Event time
-
private long historicalDataExtractionTimeLowerBound; // Arrival time
private boolean sloppyTimeRange; // true to disable time range filter after
extraction
private boolean shouldExtractInsertion;
-
private boolean shouldTransferModFile; // Whether to transfer mods
+ private boolean shouldTerminatePipeOnAllHistoricalEventsConsumed;
+ private boolean isTerminateSignalSent = false;
+
private Queue<TsFileResource> pendingQueue;
@Override
- public void validate(PipeParameterValidator validator) {
+ public void validate(final PipeParameterValidator validator) {
final PipeParameters parameters = validator.getParameters();
if (parameters.hasAnyAttributes(
@@ -139,7 +143,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
EXTRACTOR_END_TIME_KEY));
}
return;
- } catch (Exception e) {
+ } catch (final Exception e) {
// compatible with the current validation framework
throw new PipeParameterNotValidException(e.getMessage());
}
@@ -191,14 +195,22 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
|| // Should extract deletion
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(parameters)
.getRight());
- } catch (Exception e) {
+
+ shouldTerminatePipeOnAllHistoricalEventsConsumed =
+ parameters.getBooleanOrDefault(
+ Arrays.asList(
+ SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY,
+ EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY),
+ EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE);
+ } catch (final Exception e) {
// Compatible with the current validation framework
throw new PipeParameterNotValidException(e.getMessage());
}
}
@Override
- public void customize(PipeParameters parameters,
PipeExtractorRuntimeConfiguration configuration)
+ public void customize(
+ final PipeParameters parameters, final PipeExtractorRuntimeConfiguration
configuration)
throws IllegalPathException {
shouldExtractInsertion =
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(parameters).getLeft();
@@ -394,7 +406,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
// Will unpin it after the PipeTsFileInsertionEvent is created
and pinned.
try {
PipeResourceManager.tsfile().pinTsFileResource(resource,
shouldTransferModFile);
- } catch (IOException e) {
+ } catch (final IOException e) {
LOGGER.warn("Pipe: failed to pin TsFileResource {}",
resource.getTsFilePath());
}
});
@@ -426,7 +438,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
}
}
- private boolean mayTsFileContainUnprocessedData(TsFileResource resource) {
+ private boolean mayTsFileContainUnprocessedData(final TsFileResource
resource) {
if (startIndex instanceof TimeWindowStateProgressIndex) {
// The resource is closed thus the TsFileResource#getFileEndTime() is
safe to use
return ((TimeWindowStateProgressIndex) startIndex).getMinTime() <=
resource.getFileEndTime();
@@ -446,21 +458,21 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
return !startIndex.isAfter(resource.getMaxProgressIndexAfterClose());
}
- private boolean isTsFileResourceOverlappedWithTimeRange(TsFileResource
resource) {
+ private boolean isTsFileResourceOverlappedWithTimeRange(final TsFileResource
resource) {
return !(resource.getFileEndTime() < historicalDataExtractionStartTime
|| historicalDataExtractionEndTime < resource.getFileStartTime());
}
- private boolean isTsFileResourceCoveredByTimeRange(TsFileResource resource) {
+ private boolean isTsFileResourceCoveredByTimeRange(final TsFileResource
resource) {
return historicalDataExtractionStartTime <= resource.getFileStartTime()
&& historicalDataExtractionEndTime >= resource.getFileEndTime();
}
- private boolean
isTsFileGeneratedAfterExtractionTimeLowerBound(TsFileResource resource) {
+ private boolean isTsFileGeneratedAfterExtractionTimeLowerBound(final
TsFileResource resource) {
try {
return historicalDataExtractionTimeLowerBound
<=
TsFileNameGenerator.getTsFileName(resource.getTsFile().getName()).getTime();
- } catch (IOException e) {
+ } catch (final IOException e) {
LOGGER.warn(
"Pipe {}@{}: failed to get the generation time of TsFile {}, extract
it anyway"
+ " (historical data extraction time lower bound: {})",
@@ -483,7 +495,12 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
final TsFileResource resource = pendingQueue.poll();
if (resource == null) {
- return null;
+ isTerminateSignalSent = true;
+ final PipeTerminateEvent terminateEvent =
+ new PipeTerminateEvent(pipeName, pipeTaskMeta, dataRegionId);
+ terminateEvent.increaseReferenceCount(
+ PipeHistoricalDataRegionTsFileExtractor.class.getName());
+ return terminateEvent;
}
final PipeTsFileInsertionEvent event =
@@ -508,7 +525,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
event.increaseReferenceCount(PipeHistoricalDataRegionTsFileExtractor.class.getName());
try {
PipeResourceManager.tsfile().unpinTsFileResource(resource);
- } catch (IOException e) {
+ } catch (final IOException e) {
LOGGER.warn(
"Pipe {}@{}: failed to unpin TsFileResource after creating event,
original path: {}",
pipeName,
@@ -520,7 +537,12 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
}
public synchronized boolean hasConsumedAll() {
- return Objects.isNull(pendingQueue) || pendingQueue.isEmpty();
+ // If the pendingQueue is null when the function is called, it
+ // implies that the extractor only extracts deletion thus the
+ // Historical event has nothing to consume
+ return Objects.isNull(pendingQueue)
+ || pendingQueue.isEmpty()
+ && (!shouldTerminatePipeOnAllHistoricalEventsConsumed ||
isTerminateSignalSent);
}
@Override
@@ -535,7 +557,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
resource -> {
try {
PipeResourceManager.tsfile().unpinTsFileResource(resource);
- } catch (IOException e) {
+ } catch (final IOException e) {
LOGGER.warn(
"Pipe {}@{}: failed to unpin TsFileResource after dropping
pipe, original path: {}",
pipeName,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeDataNodeTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeDataNodeTask.java
index c5b58c6e9b8..502bdec2c7c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeDataNodeTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeDataNodeTask.java
@@ -36,12 +36,14 @@ public class PipeDataNodeTask implements PipeTask {
private final PipeTaskStage processorStage;
private final PipeTaskStage connectorStage;
+ private volatile boolean isCompleted = false;
+
public PipeDataNodeTask(
- String pipeName,
- int regionId,
- PipeTaskStage extractorStage,
- PipeTaskStage processorStage,
- PipeTaskStage connectorStage) {
+ final String pipeName,
+ final int regionId,
+ final PipeTaskStage extractorStage,
+ final PipeTaskStage processorStage,
+ final PipeTaskStage connectorStage) {
this.pipeName = pipeName;
this.regionId = regionId;
@@ -106,6 +108,14 @@ public class PipeDataNodeTask implements PipeTask {
return pipeName;
}
+ public boolean isCompleted() {
+ return isCompleted;
+ }
+
+ public void markCompleted() {
+ this.isCompleted = true;
+ }
+
@Override
public String toString() {
return pipeName + "@" + regionId;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
index 05ed098976f..6a60d77cd0e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
@@ -61,6 +61,12 @@ public class PipeExtractorConstant {
public static final String EXTRACTOR_MODS_ENABLE_KEY =
"extractor.mods.enable";
public static final String SOURCE_MODS_ENABLE_KEY = "source.mods.enable";
public static final boolean EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE = false;
+ public static final String
EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY =
+ "extractor.history.terminate-pipe-on-all-consumed";
+ public static final String SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY
=
+ "source.history.terminate-pipe-on-all-consumed";
+ public static final boolean
EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE =
+ false;
public static final String EXTRACTOR_REALTIME_ENABLE_KEY =
"extractor.realtime.enable";
public static final String SOURCE_REALTIME_ENABLE_KEY =
"source.realtime.enable";
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
index dbbc8fdd961..e30fcf11269 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
@@ -33,9 +33,13 @@ public class PipeMeta {
private final PipeStaticMeta staticMeta;
private final PipeRuntimeMeta runtimeMeta;
- public PipeMeta(PipeStaticMeta staticMeta, PipeRuntimeMeta runtimeMeta) {
+ // This is temporary information of pipe and will not be serialized.
+ private final PipeTemporaryMeta temporaryMeta;
+
+ public PipeMeta(final PipeStaticMeta staticMeta, final PipeRuntimeMeta
runtimeMeta) {
this.staticMeta = staticMeta;
this.runtimeMeta = runtimeMeta;
+ this.temporaryMeta = new PipeTemporaryMeta();
}
public PipeStaticMeta getStaticMeta() {
@@ -46,25 +50,29 @@ public class PipeMeta {
return runtimeMeta;
}
+ public PipeTemporaryMeta getTemporaryMeta() {
+ return temporaryMeta;
+ }
+
public ByteBuffer serialize() throws IOException {
- PublicBAOS byteArrayOutputStream = new PublicBAOS();
- DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream);
+ final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream);
serialize(outputStream);
return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
}
- public void serialize(OutputStream outputStream) throws IOException {
+ public void serialize(final OutputStream outputStream) throws IOException {
staticMeta.serialize(outputStream);
runtimeMeta.serialize(outputStream);
}
- public static PipeMeta deserialize(FileInputStream fileInputStream) throws
IOException {
+ public static PipeMeta deserialize(final FileInputStream fileInputStream)
throws IOException {
final PipeStaticMeta staticMeta =
PipeStaticMeta.deserialize(fileInputStream);
final PipeRuntimeMeta runtimeMeta =
PipeRuntimeMeta.deserialize(fileInputStream);
return new PipeMeta(staticMeta, runtimeMeta);
}
- public static PipeMeta deserialize(ByteBuffer byteBuffer) {
+ public static PipeMeta deserialize(final ByteBuffer byteBuffer) {
final PipeStaticMeta staticMeta = PipeStaticMeta.deserialize(byteBuffer);
final PipeRuntimeMeta runtimeMeta =
PipeRuntimeMeta.deserialize(byteBuffer);
return new PipeMeta(staticMeta, runtimeMeta);
@@ -104,25 +112,33 @@ public class PipeMeta {
}
@Override
- public boolean equals(Object o) {
+ public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
- PipeMeta pipeMeta = (PipeMeta) o;
+ final PipeMeta pipeMeta = (PipeMeta) o;
return Objects.equals(staticMeta, pipeMeta.staticMeta)
- && Objects.equals(runtimeMeta, pipeMeta.runtimeMeta);
+ && Objects.equals(runtimeMeta, pipeMeta.runtimeMeta)
+ && Objects.equals(temporaryMeta, pipeMeta.temporaryMeta);
}
@Override
public int hashCode() {
- return Objects.hash(staticMeta, runtimeMeta);
+ return Objects.hash(staticMeta, runtimeMeta, temporaryMeta);
}
@Override
public String toString() {
- return "PipeMeta{" + "staticMeta=" + staticMeta + ", runtimeMeta=" +
runtimeMeta + '}';
+ return "PipeMeta{"
+ + "staticMeta="
+ + staticMeta
+ + ", runtimeMeta="
+ + runtimeMeta
+ + ", temporaryMeta="
+ + temporaryMeta
+ + '}';
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java
new file mode 100644
index 00000000000..6da2be8e81f
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.task.meta;
+
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class PipeTemporaryMeta {
+
+ private final ConcurrentMap<Integer, Integer> completedDataNodeIds = new
ConcurrentHashMap<>();
+
+ public void markDataNodeCompleted(final int dataNodeId) {
+ completedDataNodeIds.put(dataNodeId, dataNodeId);
+ }
+
+ public Set<Integer> getCompletedDataNodeIds() {
+ return completedDataNodeIds.keySet();
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final PipeTemporaryMeta that = (PipeTemporaryMeta) o;
+ return Objects.equals(this.completedDataNodeIds,
that.completedDataNodeIds);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(completedDataNodeIds);
+ }
+
+ @Override
+ public String toString() {
+ return "PipeTemporaryMeta{" + "completedDataNodeIds=" +
completedDataNodeIds + '}';
+ }
+}
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index f5d6839ce6c..06d0fd8d0e7 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -294,6 +294,7 @@ struct TDataNodeHeartbeatResp {
11: optional string activateStatus
12: optional set<common.TEndPoint> confirmedConfigNodeEndPoints
13: optional map<common.TConsensusGroupId, i64> consensusLogicalTimeMap
+ 14: optional list<bool> pipeCompletedList
}
struct TPipeHeartbeatReq {
@@ -302,6 +303,7 @@ struct TPipeHeartbeatReq {
struct TPipeHeartbeatResp {
1: required list<binary> pipeMetaList
+ 2: optional list<bool> pipeCompletedList
}
enum TSchemaLimitLevel{