This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new e6ad0053602 Pipe: Fixed the bug that the pipe will restart a pipe
stopped by user but encountered sync failure later (#17586) (#17605)
e6ad0053602 is described below
commit e6ad00536026ba71d76cfbcf3f609faf811c13db
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 6 17:36:45 2026 +0800
Pipe: Fixed the bug that the pipe will restart a pipe stopped by user but
encountered sync failure later (#17586) (#17605)
* fix
* fix
---
.../confignode/persistence/pipe/PipeTaskInfo.java | 8 ++
.../pipe/PipeTaskInfoAutoRestartTest.java | 128 +++++++++++++++++++++
2 files changed, 136 insertions(+)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 2bfdbf9e49a..199b835c839 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -798,6 +798,14 @@ public class PipeTaskInfo implements SnapshotProcessor {
final PipeRuntimeMeta runtimeMeta =
pipeMetaKeeper.getPipeMeta(message.getPipeName()).getRuntimeMeta();
+ // Keep user-stopped pipes out of the auto-restart flow. Otherwise, a failed
+ // STOPPED meta sync can turn a manually stopped pipe into a runtime-stopped one
+ // and the next PipeMetaSyncer round will restart it automatically.
+ if (PipeStatus.STOPPED.equals(runtimeMeta.getStatus().get())
+ && !runtimeMeta.getIsStoppedByRuntimeException()) {
+ return;
+ }
+
// Mark the status of the pipe with exception as stopped
runtimeMeta.getStatus().set(PipeStatus.STOPPED);
runtimeMeta.setIsStoppedByRuntimeException(true);
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java
new file mode 100644
index 00000000000..762c3bf045c
--- /dev/null
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.persistence.pipe;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
+import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
+import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
+import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class PipeTaskInfoAutoRestartTest {
+ /*
+ * Exercises PipeTaskInfo.recordDataNodePushPipeMetaExceptions together with autoRestart():
+ * a pipe that was RUNNING when a push-meta error is recorded is expected to be flagged as
+ * stopped by a runtime exception and restarted, while a pipe that was already stopped by the
+ * user is expected to keep that state and not be restarted.
+ *
+ * DATA_NODE_ID is the single data node id used both as the pipe task key and as the key of
+ * the simulated push response map.
+ */
+ private static final int DATA_NODE_ID = 1;
+
+ private PipeTaskInfo pipeTaskInfo;
+ // Each test gets its own PipeTaskInfo instance.
+ @Before
+ public void setUp() {
+ pipeTaskInfo = new PipeTaskInfo();
+ }
+ /**
+ * A pipe that is RUNNING when a push-meta exception is recorded must end up STOPPED with the
+ * stopped-by-runtime-exception flag set; the following autoRestart() call must return true
+ * and the pipe status must be RUNNING again.
+ */
+ @Test
+ public void
testRecordDataNodePushPipeMetaExceptionsMarksRunningPipeForAutoRestart() {
+ final String pipeName = "runningPipe";
+ createPipe(pipeName, PipeStatus.RUNNING);
+
+ Assert.assertTrue(
+
pipeTaskInfo.recordDataNodePushPipeMetaExceptions(createErrorRespMap(pipeName)));
+
+ final PipeRuntimeMeta runtimeMeta =
+ pipeTaskInfo.getPipeMetaByPipeName(pipeName).getRuntimeMeta();
+ Assert.assertEquals(PipeStatus.STOPPED, runtimeMeta.getStatus().get());
+ Assert.assertTrue(runtimeMeta.getIsStoppedByRuntimeException());
+
+ Assert.assertTrue(pipeTaskInfo.autoRestart());
+ Assert.assertEquals(PipeStatus.RUNNING, runtimeMeta.getStatus().get());
+ }
+ /**
+ * A pipe that is reported as stopped by the user before the push-meta exception is recorded
+ * must stay STOPPED without the stopped-by-runtime-exception flag, must still be reported as
+ * stopped by the user afterwards, and autoRestart() must return false and leave it STOPPED.
+ */
+ @Test
+ public void
testRecordDataNodePushPipeMetaExceptionsKeepsUserStoppedPipeOutOfAutoRestart() {
+ final String pipeName = "stoppedPipe";
+ createPipe(pipeName, PipeStatus.STOPPED);
+
+ Assert.assertTrue(pipeTaskInfo.isPipeStoppedByUser(pipeName));
+ Assert.assertTrue(
+
pipeTaskInfo.recordDataNodePushPipeMetaExceptions(createErrorRespMap(pipeName)));
+
+ final PipeRuntimeMeta runtimeMeta =
+ pipeTaskInfo.getPipeMetaByPipeName(pipeName).getRuntimeMeta();
+ Assert.assertEquals(PipeStatus.STOPPED, runtimeMeta.getStatus().get());
+ Assert.assertFalse(runtimeMeta.getIsStoppedByRuntimeException());
+ Assert.assertTrue(pipeTaskInfo.isPipeStoppedByUser(pipeName));
+
+ Assert.assertFalse(pipeTaskInfo.autoRestart());
+ Assert.assertEquals(PipeStatus.STOPPED, runtimeMeta.getStatus().get());
+ }
+ /**
+ * Builds a simulated push response map: one entry keyed by DATA_NODE_ID whose response carries
+ * a PIPE_PUSH_META_ERROR status and a single exception message for the given pipe, stamped
+ * with the current wall-clock time.
+ *
+ * @param pipeName name of the pipe the exception message refers to
+ * @return an immutable single-entry map from data node id to the error response
+ */
+ private Map<Integer, TPushPipeMetaResp> createErrorRespMap(final String
pipeName) {
+ final TPushPipeMetaRespExceptionMessage exceptionMessage =
+ new TPushPipeMetaRespExceptionMessage(
+ pipeName, "failed to push pipe meta", System.currentTimeMillis());
+ final TPushPipeMetaResp resp =
+ new TPushPipeMetaResp()
+ .setStatus(new
TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()))
+ .setExceptionMessages(Collections.singletonList(exceptionMessage));
+ return Collections.singletonMap(DATA_NODE_ID, resp);
+ }
+ /**
+ * Registers a pipe through a CreatePipePlanV2 with one pipe task keyed by DATA_NODE_ID and,
+ * only when initialStatus is RUNNING, switches it to RUNNING via a SetPipeStatusPlanV2. For
+ * any other initialStatus no status change is issued.
+ *
+ * @param pipeName name of the pipe to create
+ * @param initialStatus desired status after creation; only RUNNING triggers an explicit
+ * status change
+ */
+ private void createPipe(final String pipeName, final PipeStatus
initialStatus) {
+ final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> processorAttributes = new HashMap<>();
+ final Map<String, String> connectorAttributes = new HashMap<>();
+ extractorAttributes.put("extractor", "iotdb-source");
+ processorAttributes.put("processor", "do-nothing-processor");
+ connectorAttributes.put("connector", "iotdb-thrift-sink");
+
+ final PipeTaskMeta pipeTaskMeta = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, DATA_NODE_ID);
+ final ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new
ConcurrentHashMap<>();
+ pipeTasks.put(DATA_NODE_ID, pipeTaskMeta);
+
+ final PipeStaticMeta pipeStaticMeta =
+ new PipeStaticMeta(
+ pipeName,
+ System.currentTimeMillis(),
+ extractorAttributes,
+ processorAttributes,
+ connectorAttributes);
+ final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks);
+ pipeTaskInfo.createPipe(new CreatePipePlanV2(pipeStaticMeta,
pipeRuntimeMeta));
+ // Only RUNNING needs an explicit transition; the STOPPED test relies on the state createPipe leaves behind.
+ if (PipeStatus.RUNNING.equals(initialStatus)) {
+ pipeTaskInfo.setPipeStatus(new SetPipeStatusPlanV2(pipeName,
PipeStatus.RUNNING));
+ }
+ }
+}