This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch fix/clear-pipe-runtime-error-message
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4227fb55b3a34bab1fc89d89e0ae1dbb52ffd659
Author: Caideyipi <[email protected]>
AuthorDate: Mon Jun 29 18:34:33 2026 +0800

    Fix stale pipe runtime error messages
---
 .../response/pipe/task/PipeTableResp.java          |   6 +
 .../runtime/heartbeat/PipeHeartbeatParser.java     |  19 ++-
 .../confignode/persistence/pipe/PipeTaskInfo.java  |  33 ++++--
 .../impl/pipe/task/StartPipeProcedureV2.java       |  37 +++++-
 .../runtime/heartbeat/PipeHeartbeatParserTest.java | 129 ++++++++++++++++++++-
 .../pipe/PipeTaskInfoAutoRestartTest.java          |  28 +++++
 .../commons/pipe/agent/task/PipeTaskAgent.java     |  13 +++
 .../pipe/agent/task/meta/PipeRuntimeMeta.java      |   9 ++
 .../commons/pipe/agent/task/meta/PipeTaskMeta.java |   4 +
 .../iotdb/commons/pipe/task/PipeMetaDeSerTest.java |  26 +++++
 10 files changed, 283 insertions(+), 21 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
index 49f10a79e8a..a2d2856e751 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
@@ -194,6 +194,9 @@ public class PipeTableResp implements DataSet {
           runtimeMeta.getNodeId2PipeRuntimeExceptionMap().entrySet()) {
         final Integer nodeId = entry.getKey();
         final PipeRuntimeException e = entry.getValue();
+        if (e.getTimeStamp() <= runtimeMeta.getExceptionsClearTime()) {
+          continue;
+        }
         final String exceptionMessage =
             DateTimeUtils.convertLongToDate(e.getTimeStamp(), "ms") + ", " + 
e.getMessage();
 
@@ -206,6 +209,9 @@ public class PipeTableResp implements DataSet {
           runtimeMeta.getConsensusGroupId2TaskMetaMap().entrySet()) {
         final Integer regionId = entry.getKey();
         for (final PipeRuntimeException e : 
entry.getValue().getExceptionMessages()) {
+          if (e.getTimeStamp() <= runtimeMeta.getExceptionsClearTime()) {
+            continue;
+          }
           final String exceptionMessage =
               DateTimeUtils.convertLongToDate(e.getTimeStamp(), "ms") + ", " + 
e.getMessage();
           pipeExceptionMessage2RegionIdsMap
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
index 97c6795daf4..718a9fe97e4 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
@@ -248,24 +248,21 @@ public class PipeHeartbeatParser {
 
         // Update runtime exception
         final PipeTaskMeta pipeTaskMetaFromCoordinator = 
runtimeMetaFromCoordinator.getValue();
+        final PipeRuntimeMeta pipeRuntimeMeta = 
pipeMetaFromCoordinator.getRuntimeMeta();
         pipeTaskMetaFromCoordinator.clearExceptionMessages();
         for (final PipeRuntimeException exception : 
runtimeMetaFromAgent.getExceptionMessages()) {
-
-          // Do not judge the exception's clear time to avoid the restart 
process
-          // being ended after the failure of some pipe
+          if (exception.getTimeStamp() <= 
pipeRuntimeMeta.getExceptionsClearTime()) {
+            needPushPipeMetaToDataNodes.set(true);
+            continue;
+          }
 
           pipeTaskMetaFromCoordinator.trackExceptionMessage(exception);
 
           if (exception instanceof PipeRuntimeCriticalException) {
             final String pipeName = 
pipeMetaFromCoordinator.getStaticMeta().getPipeName();
-            if (!pipeMetaFromCoordinator
-                .getRuntimeMeta()
-                .getStatus()
-                .get()
-                .equals(PipeStatus.STOPPED)) {
-              PipeRuntimeMeta runtimeMeta = 
pipeMetaFromCoordinator.getRuntimeMeta();
-              runtimeMeta.getStatus().set(PipeStatus.STOPPED);
-              runtimeMeta.setIsStoppedByRuntimeException(true);
+            if (!pipeRuntimeMeta.getStatus().get().equals(PipeStatus.STOPPED)) 
{
+              pipeRuntimeMeta.getStatus().set(PipeStatus.STOPPED);
+              pipeRuntimeMeta.setIsStoppedByRuntimeException(true);
 
               needWriteConsensusOnConfigNodes.set(true);
               needPushPipeMetaToDataNodes.set(false);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 247f803152b..d231a613621 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -763,14 +763,26 @@ public class PipeTaskInfo implements SnapshotProcessor {
   public void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(final 
String pipeName) {
     acquireWriteLock();
     try {
-      
clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(pipeName);
+      clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(
+          pipeName, System.currentTimeMillis());
+    } finally {
+      releaseWriteLock();
+    }
+  }
+
+  public void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(
+      final String pipeName, final long exceptionsClearTime) {
+    acquireWriteLock();
+    try {
+      clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(
+          pipeName, exceptionsClearTime);
     } finally {
       releaseWriteLock();
     }
   }
 
   private void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(
-      final String pipeName) {
+      final String pipeName, final long exceptionsClearTime) {
     if (!pipeMetaKeeper.containsPipeMeta(pipeName)) {
       return;
     }
@@ -780,7 +792,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
     // To avoid unnecessary retries, we set the isStoppedByRuntimeException 
flag to false
     runtimeMeta.setIsStoppedByRuntimeException(false);
 
-    runtimeMeta.setExceptionsClearTime(System.currentTimeMillis());
+    runtimeMeta.setExceptionsClearTime(exceptionsClearTime);
 
     final Map<Integer, PipeRuntimeException> exceptionMap =
         runtimeMeta.getNodeId2PipeRuntimeExceptionMap();
@@ -904,14 +916,17 @@ public class PipeTaskInfo implements SnapshotProcessor {
    */
   private boolean autoRestartInternal() {
     final AtomicBoolean needRestart = new AtomicBoolean(false);
+    final long exceptionsClearTime = System.currentTimeMillis();
     final List<String> pipeToRestart = new LinkedList<>();
 
     pipeMetaKeeper
         .getPipeMetaList()
         .forEach(
             pipeMeta -> {
-              if (pipeMeta.getRuntimeMeta().getIsStoppedByRuntimeException()) {
-                pipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.RUNNING);
+              final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta();
+              if (runtimeMeta.getIsStoppedByRuntimeException()) {
+                runtimeMeta.setExceptionsClearTime(exceptionsClearTime);
+                runtimeMeta.getStatus().set(PipeStatus.RUNNING);
 
                 needRestart.set(true);
                 pipeToRestart.add(pipeMeta.getStaticMeta().getPipeName());
@@ -945,9 +960,11 @@ public class PipeTaskInfo implements SnapshotProcessor {
         .getPipeMetaList()
         .forEach(
             pipeMeta -> {
-              if 
(pipeMeta.getRuntimeMeta().getStatus().get().equals(PipeStatus.RUNNING)) {
-                clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(
-                    pipeMeta.getStaticMeta().getPipeName());
+              final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta();
+              if (runtimeMeta.getStatus().get().equals(PipeStatus.RUNNING)
+                  && runtimeMeta.getIsStoppedByRuntimeException()) {
+                
clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(
+                    pipeMeta.getStaticMeta().getPipeName(), 
runtimeMeta.getExceptionsClearTime());
               }
             });
   }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
index 7c9bff96f8f..42ac65a74c3 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
@@ -20,7 +20,9 @@
 package org.apache.iotdb.confignode.procedure.impl.pipe.task;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
+import 
org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
 import org.apache.iotdb.confignode.i18n.ConfigNodeMessages;
 import org.apache.iotdb.confignode.i18n.ProcedureMessages;
@@ -39,6 +41,8 @@ import org.slf4j.LoggerFactory;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Objects;
 
 public class StartPipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
@@ -102,6 +106,9 @@ public class StartPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   public void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws 
IOException {
     
LOGGER.info(ProcedureMessages.STARTPIPEPROCEDUREV2_EXECUTEFROMOPERATEONDATANODES,
 pipeName);
 
+    final long exceptionsClearTime = System.currentTimeMillis();
+    final boolean isStoppedByRuntimeException =
+        pipeTaskInfo.get().isStoppedByRuntimeException(pipeName);
     final String exceptionMessage =
         parsePushPipeMetaExceptionForPipe(pipeName, 
pushSinglePipeMetaToDataNodes(pipeName, env));
     if (!exceptionMessage.isEmpty()) {
@@ -114,7 +121,35 @@ public class StartPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
 
     // Clear exceptions and set isStoppedByRuntimeException to false if the 
pipe is
     // started successfully on all data nodes
-    
pipeTaskInfo.get().clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(pipeName);
+    pipeTaskInfo
+        .get()
+        .clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(pipeName, 
exceptionsClearTime);
+
+    if (isStoppedByRuntimeException) {
+      writePipeMetaChangesToConfigNodeConsensus(env);
+    }
+  }
+
+  private void writePipeMetaChangesToConfigNodeConsensus(final 
ConfigNodeProcedureEnv env) {
+    final List<PipeMeta> pipeMetaList = new ArrayList<>();
+    for (final PipeMeta pipeMeta : pipeTaskInfo.get().getPipeMetaList()) {
+      pipeMetaList.add(pipeMeta);
+    }
+
+    TSStatus response;
+    try {
+      response =
+          env.getConfigManager()
+              .getConsensusManager()
+              .write(new PipeHandleMetaChangePlan(pipeMetaList));
+    } catch (ConsensusException e) {
+      
LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE,
 e);
+      response = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      response.setMessage(e.getMessage());
+    }
+    if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(response.getMessage());
+    }
   }
 
   @Override
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParserTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParserTest.java
index d5a46d42c84..7383bf7937a 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParserTest.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParserTest.java
@@ -20,6 +20,14 @@
 package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat;
 
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.manager.ProcedureManager;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
@@ -29,13 +37,18 @@ import 
org.apache.iotdb.confignode.manager.pipe.coordinator.task.PipeTaskCoordin
 import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
 
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
 import java.lang.reflect.Field;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -49,6 +62,8 @@ import static org.mockito.Mockito.when;
 
 public class PipeHeartbeatParserTest {
 
+  private static final int DATA_NODE_ID = 1;
+
   private boolean originalSeparatedPipeHeartbeatEnabled;
 
   @Before
@@ -117,7 +132,88 @@ public class PipeHeartbeatParserTest {
     verify(context.procedureManager, times(2)).pipeHandleMetaChange(true, 
false);
   }
 
+  @Test
+  public void testParseHeartbeatIgnoresExceptionsBeforeClearTime() throws 
Exception {
+    
CommonDescriptor.getInstance().getConfig().setSeperatedPipeHeartbeatEnabled(false);
+
+    final String pipeName = "staleExceptionPipe";
+    final PipeTaskInfo pipeTaskInfo = new PipeTaskInfo();
+    createPipe(pipeTaskInfo, pipeName, PipeStatus.RUNNING);
+
+    final PipeMeta pipeMeta = pipeTaskInfo.getPipeMetaByPipeName(pipeName);
+    final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta();
+    final PipeTaskMeta coordinatorTaskMeta =
+        runtimeMeta.getConsensusGroupId2TaskMetaMap().get(DATA_NODE_ID);
+    coordinatorTaskMeta.trackExceptionMessage(
+        new PipeRuntimeCriticalException("stale failure", 100L));
+
+    
pipeTaskInfo.clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(pipeName, 
200L);
+
+    final PipeTaskMeta agentTaskMeta =
+        new PipeTaskMeta(MinimumProgressIndex.INSTANCE, DATA_NODE_ID);
+    agentTaskMeta.trackExceptionMessage(new 
PipeRuntimeCriticalException("stale failure", 100L));
+    final ConcurrentMap<Integer, PipeTaskMeta> agentPipeTasks = new 
ConcurrentHashMap<>();
+    agentPipeTasks.put(DATA_NODE_ID, agentTaskMeta);
+    final PipeHeartbeat heartbeat =
+        new PipeHeartbeat(
+            Collections.singletonList(
+                new PipeMeta(pipeMeta.getStaticMeta(), new 
PipeRuntimeMeta(agentPipeTasks))
+                    .serialize()),
+            Collections.singletonList(false),
+            Collections.singletonList(0L),
+            Collections.singletonList(0D));
+
+    final ParserTestContext context = createParserTestContext(1, pipeTaskInfo);
+    context.parser.parseHeartbeat(DATA_NODE_ID, heartbeat);
+
+    Assert.assertFalse(coordinatorTaskMeta.hasExceptionMessages());
+    Assert.assertEquals(PipeStatus.RUNNING, runtimeMeta.getStatus().get());
+    verify(context.procedureManager, times(1)).pipeHandleMetaChange(false, 
true);
+  }
+
+  @Test
+  public void testParseHeartbeatTracksExceptionsAfterClearTime() throws 
Exception {
+    
CommonDescriptor.getInstance().getConfig().setSeperatedPipeHeartbeatEnabled(false);
+
+    final String pipeName = "freshExceptionPipe";
+    final PipeTaskInfo pipeTaskInfo = new PipeTaskInfo();
+    createPipe(pipeTaskInfo, pipeName, PipeStatus.RUNNING);
+
+    final PipeMeta pipeMeta = pipeTaskInfo.getPipeMetaByPipeName(pipeName);
+    final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta();
+    final PipeTaskMeta coordinatorTaskMeta =
+        runtimeMeta.getConsensusGroupId2TaskMetaMap().get(DATA_NODE_ID);
+    
pipeTaskInfo.clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(pipeName, 
200L);
+
+    final PipeTaskMeta agentTaskMeta =
+        new PipeTaskMeta(MinimumProgressIndex.INSTANCE, DATA_NODE_ID);
+    agentTaskMeta.trackExceptionMessage(new 
PipeRuntimeCriticalException("fresh failure", 300L));
+    final ConcurrentMap<Integer, PipeTaskMeta> agentPipeTasks = new 
ConcurrentHashMap<>();
+    agentPipeTasks.put(DATA_NODE_ID, agentTaskMeta);
+    final PipeHeartbeat heartbeat =
+        new PipeHeartbeat(
+            Collections.singletonList(
+                new PipeMeta(pipeMeta.getStaticMeta(), new 
PipeRuntimeMeta(agentPipeTasks))
+                    .serialize()),
+            Collections.singletonList(false),
+            Collections.singletonList(0L),
+            Collections.singletonList(0D));
+
+    final ParserTestContext context = createParserTestContext(1, pipeTaskInfo);
+    context.parser.parseHeartbeat(DATA_NODE_ID, heartbeat);
+
+    Assert.assertTrue(coordinatorTaskMeta.hasExceptionMessages());
+    Assert.assertEquals(PipeStatus.STOPPED, runtimeMeta.getStatus().get());
+    Assert.assertTrue(runtimeMeta.getIsStoppedByRuntimeException());
+    verify(context.procedureManager, times(1)).pipeHandleMetaChange(true, 
false);
+  }
+
   private ParserTestContext createParserTestContext(final int 
registeredDataNodeCount) {
+    return createParserTestContext(registeredDataNodeCount, new 
PipeTaskInfo());
+  }
+
+  private ParserTestContext createParserTestContext(
+      final int registeredDataNodeCount, final PipeTaskInfo pipeTaskInfo) {
     final ConfigManager configManager = Mockito.mock(ConfigManager.class);
     final NodeManager nodeManager = Mockito.mock(NodeManager.class);
     final ProcedureManager procedureManager = 
Mockito.mock(ProcedureManager.class);
@@ -134,7 +230,7 @@ public class PipeHeartbeatParserTest {
     
when(pipeManager.getPipeRuntimeCoordinator()).thenReturn(pipeRuntimeCoordinator);
     when(pipeManager.getPipeTaskCoordinator()).thenReturn(pipeTaskCoordinator);
     
when(pipeRuntimeCoordinator.getProcedureSubmitter()).thenReturn(procedureSubmitter);
-    when(pipeTaskCoordinator.tryLock()).thenReturn(new AtomicReference<>(new 
PipeTaskInfo()));
+    when(pipeTaskCoordinator.tryLock()).thenReturn(new 
AtomicReference<>(pipeTaskInfo));
     when(procedureManager.pipeHandleMetaChange(anyBoolean(), 
anyBoolean())).thenReturn(true);
     Mockito.doAnswer(
             invocation -> {
@@ -147,6 +243,37 @@ public class PipeHeartbeatParserTest {
     return new ParserTestContext(new PipeHeartbeatParser(configManager), 
procedureManager);
   }
 
+  private void createPipe(
+      final PipeTaskInfo pipeTaskInfo, final String pipeName, final PipeStatus 
initialStatus) {
+    final Map<String, String> extractorAttributes = new HashMap<>();
+    extractorAttributes.put("extractor", "iotdb-source");
+    final Map<String, String> processorAttributes = new HashMap<>();
+    processorAttributes.put("processor", "do-nothing-processor");
+    final Map<String, String> connectorAttributes = new HashMap<>();
+    connectorAttributes.put("connector", "iotdb-thrift-sink");
+
+    final PipeTaskMeta pipeTaskMeta = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, DATA_NODE_ID);
+    final ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new 
ConcurrentHashMap<>();
+    pipeTasks.put(DATA_NODE_ID, pipeTaskMeta);
+    final PipeStaticMeta pipeStaticMeta =
+        new PipeStaticMeta(
+            pipeName,
+            System.currentTimeMillis(),
+            extractorAttributes,
+            processorAttributes,
+            connectorAttributes);
+    final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks);
+    pipeTaskInfo.createPipe(new CreatePipePlanV2(pipeStaticMeta, 
pipeRuntimeMeta));
+
+    if (PipeStatus.RUNNING.equals(initialStatus)) {
+      pipeTaskInfo
+          .getPipeMetaByPipeName(pipeName)
+          .getRuntimeMeta()
+          .getStatus()
+          .set(PipeStatus.RUNNING);
+    }
+  }
+
   private void setMetaChangeFlags(
       final PipeHeartbeatParser parser,
       final boolean needWriteConsensusOnConfigNodes,
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java
index 7b78f59253d..39f0095d00e 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java
@@ -231,6 +231,34 @@ public class PipeTaskInfoAutoRestartTest {
         rootPassword, 
sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY));
   }
 
+  @Test
+  public void testHandleSuccessfulRestartClearsRuntimeExceptionMessages() {
+    final String pipeName = "restartPipe";
+    createPipe(pipeName, PipeStatus.RUNNING);
+
+    Assert.assertTrue(
+        
pipeTaskInfo.recordDataNodePushPipeMetaExceptions(createErrorRespMap(pipeName)));
+
+    final PipeRuntimeMeta runtimeMeta =
+        pipeTaskInfo.getPipeMetaByPipeName(pipeName).getRuntimeMeta();
+    Assert.assertEquals(PipeStatus.STOPPED, runtimeMeta.getStatus().get());
+    Assert.assertTrue(runtimeMeta.getIsStoppedByRuntimeException());
+    
Assert.assertFalse(runtimeMeta.getNodeId2PipeRuntimeExceptionMap().isEmpty());
+
+    Assert.assertTrue(pipeTaskInfo.autoRestart());
+    final long exceptionsClearTime = runtimeMeta.getExceptionsClearTime();
+    Assert.assertTrue(
+        runtimeMeta.getNodeId2PipeRuntimeExceptionMap().values().stream()
+            .allMatch(exception -> exception.getTimeStamp() <= 
exceptionsClearTime));
+
+    pipeTaskInfo.handleSuccessfulRestart();
+
+    Assert.assertEquals(PipeStatus.RUNNING, runtimeMeta.getStatus().get());
+    Assert.assertFalse(runtimeMeta.getIsStoppedByRuntimeException());
+    
Assert.assertTrue(runtimeMeta.getNodeId2PipeRuntimeExceptionMap().isEmpty());
+    Assert.assertEquals(exceptionsClearTime, 
runtimeMeta.getExceptionsClearTime());
+  }
+
   private Map<Integer, TPushPipeMetaResp> createErrorRespMap(final String 
pipeName) {
     final TPushPipeMetaRespExceptionMessage exceptionMessage =
         new TPushPipeMetaRespExceptionMessage(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index c30a50ac495..3daeec4ebd8 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -305,6 +305,8 @@ public abstract class PipeTaskAgent {
       }
     }
 
+    syncRuntimeExceptionClearTime(runtimeMetaFromCoordinator, 
runtimeMetaInAgent);
+
     // 2. Handle pipe runtime meta status changes
     final PipeStatus statusFromCoordinator = 
runtimeMetaFromCoordinator.getStatus().get();
     final PipeStatus statusInAgent = runtimeMetaInAgent.getStatus().get();
@@ -347,6 +349,12 @@ public abstract class PipeTaskAgent {
     }
   }
 
+  private void syncRuntimeExceptionClearTime(
+      final PipeRuntimeMeta runtimeMetaFromCoordinator, final PipeRuntimeMeta 
runtimeMetaInAgent) {
+    
runtimeMetaInAgent.setExceptionsClearTime(runtimeMetaFromCoordinator.getExceptionsClearTime());
+    
runtimeMetaInAgent.clearExceptionMessagesBefore(runtimeMetaInAgent.getExceptionsClearTime());
+  }
+
   protected abstract void thawRate(final String pipeName, final long 
creationTime);
 
   protected abstract void freezeRate(final String pipeName, final long 
creationTime);
@@ -548,6 +556,11 @@ public abstract class PipeTaskAgent {
 
     pipeMetaKeeper.addPipeMeta(pipeMetaFromCoordinator);
 
+    pipeMetaFromCoordinator
+        .getRuntimeMeta()
+        .clearExceptionMessagesBefore(
+            pipeMetaFromCoordinator.getRuntimeMeta().getExceptionsClearTime());
+
     // If the pipe status from coordinator is RUNNING, we will start the pipe 
later.
     return needToStartPipe;
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
index 1f28a24dd60..2aa1b65638f 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
@@ -128,6 +128,15 @@ public class PipeRuntimeMeta {
     }
   }
 
+  public void clearExceptionMessagesBefore(final long exceptionsClearTime) {
+    nodeId2PipeRuntimeExceptionMap
+        .entrySet()
+        .removeIf(entry -> entry.getValue().getTimeStamp() <= 
exceptionsClearTime);
+    consensusGroupId2TaskMetaMap
+        .values()
+        .forEach(pipeTaskMeta -> 
pipeTaskMeta.clearExceptionMessagesBefore(exceptionsClearTime));
+  }
+
   public boolean getIsStoppedByRuntimeException() {
     return isStoppedByRuntimeException.get();
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
index 9584ca8cbab..e9939d7b2c6 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
@@ -134,6 +134,10 @@ public class PipeTaskMeta {
     exceptionMessages.clear();
   }
 
+  public synchronized void clearExceptionMessagesBefore(final long 
exceptionsClearTime) {
+    exceptionMessages.removeIf(exception -> exception.getTimeStamp() <= 
exceptionsClearTime);
+  }
+
   public synchronized void serialize(final OutputStream outputStream) throws 
IOException {
     progressIndex.get().serialize(outputStream);
 
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java
index 1e29c96e090..fa0ac2c1a37 100644
--- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java
@@ -151,4 +151,30 @@ public class PipeMetaDeSerTest {
     final PipeMeta pipeMeta1 = PipeMeta.deserialize4Coordinator(byteBuffer);
     Assert.assertEquals(pipeMeta, pipeMeta1);
   }
+
+  @Test
+  public void testClearExceptionMessagesBeforeClearTime() {
+    final PipeTaskMeta staleTaskMeta = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
+    staleTaskMeta.trackExceptionMessage(new 
PipeRuntimeCriticalException("stale", 100L));
+    final PipeTaskMeta freshTaskMeta = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
+    freshTaskMeta.trackExceptionMessage(new 
PipeRuntimeCriticalException("fresh", 300L));
+
+    final ConcurrentHashMap<Integer, PipeTaskMeta> taskMetaMap = new 
ConcurrentHashMap<>();
+    taskMetaMap.put(1, staleTaskMeta);
+    taskMetaMap.put(2, freshTaskMeta);
+    final PipeRuntimeMeta runtimeMeta = new PipeRuntimeMeta(taskMetaMap);
+    runtimeMeta
+        .getNodeId2PipeRuntimeExceptionMap()
+        .put(1, new PipeRuntimeCriticalException("stale node", 100L));
+    runtimeMeta
+        .getNodeId2PipeRuntimeExceptionMap()
+        .put(2, new PipeRuntimeCriticalException("fresh node", 300L));
+
+    runtimeMeta.clearExceptionMessagesBefore(200L);
+
+    Assert.assertFalse(staleTaskMeta.hasExceptionMessages());
+    Assert.assertTrue(freshTaskMeta.hasExceptionMessages());
+    
Assert.assertFalse(runtimeMeta.getNodeId2PipeRuntimeExceptionMap().containsKey(1));
+    
Assert.assertTrue(runtimeMeta.getNodeId2PipeRuntimeExceptionMap().containsKey(2));
+  }
 }

Reply via email to