This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d78f1a915f [IOTDB-5968] Pipe: pipe task does not work properly after 
cluster reboot (#10046)
5d78f1a915f is described below

commit 5d78f1a915f3c51c637a26e2c691ee07d5aefa2b
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sun Jun 4 01:23:14 2023 +0800

    [IOTDB-5968] Pipe: pipe task does not work properly after cluster reboot 
(#10046)
    
    * fix: deadlock issue found in NodeManager.java
    
    * fix: deadlock issue found in SimpleProgressIndex.java
    
    * fix: logic error of 'pipe task start' found in PipeTaskAgent.java, which 
may cause data consistency issues
    
    * fix: NPE error from ClusterSyncInfoFetcher.java
    
    * improvement: reduce some exception log print
    
    * improvement: introduce some log print for progress index reporting and 
recovering
---
 .../iotdb/confignode/manager/node/NodeManager.java |  9 +++-
 .../confignode/persistence/pipe/PipeTaskInfo.java  |  9 ++--
 .../runtime/PipeHandleLeaderChangeProcedure.java   |  8 ++--
 .../runtime/PipeHandleMetaChangeProcedure.java     | 24 +++++++---
 .../impl/pipe/runtime/PipeMetaSyncProcedure.java   |  8 ++--
 .../pipe/task/AbstractOperatePipeProcedureV2.java  |  8 ++++
 .../consensus/index/impl/SimpleProgressIndex.java  |  2 +-
 .../iotdb/commons/pipe/task/meta/PipeTaskMeta.java |  5 ++-
 .../db/pipe/agent/runtime/HeartbeatScheduler.java  | 23 ----------
 .../db/pipe/agent/runtime/PipeAgentLauncher.java   | 12 +++--
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    | 51 +++++++++++++++-------
 .../core/event/realtime/TsFileEpochManager.java    |  6 +--
 .../db/sync/common/ClusterSyncInfoFetcher.java     | 13 +-----
 13 files changed, 98 insertions(+), 80 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 2baa07f1973..8dcf95789d8 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -206,8 +206,10 @@ public class NodeManager {
   }
 
   private TRuntimeConfiguration getRuntimeConfiguration() {
-    getPipeManager().getPipePluginCoordinator().lock();
+    // getPipeTaskCoordinator.lock() should be called outside the 
getPipePluginCoordinator().lock()
+    // to avoid deadlock
     getPipeManager().getPipeTaskCoordinator().lock();
+    getPipeManager().getPipePluginCoordinator().lock();
     getTriggerManager().getTriggerInfo().acquireTriggerTableLock();
     getUDFManager().getUdfInfo().acquireUDFTableLock();
 
@@ -226,8 +228,11 @@ public class NodeManager {
     } finally {
       getTriggerManager().getTriggerInfo().releaseTriggerTableLock();
       getUDFManager().getUdfInfo().releaseUDFTableLock();
-      getPipeManager().getPipeTaskCoordinator().unlock();
       getPipeManager().getPipePluginCoordinator().unlock();
+      // getPipeTaskCoordinator.unlock() should be called outside the
+      // getPipePluginCoordinator().unlock()
+      // to avoid deadlock
+      getPipeManager().getPipeTaskCoordinator().unlock();
     }
   }
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 2c9485c29fc..8aaa0948f80 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -223,22 +223,23 @@ public class PipeTaskInfo implements SnapshotProcessor {
                                   dataRegionGroupId,
                                   new PipeTaskMeta(
                                       new MinimumProgressIndex(), 
newDataRegionLeader));
-                            } else {
-                              LOGGER.warn(
-                                  "The pipe task meta does not contain the 
data region group {} or the data region group has already been removed",
-                                  dataRegionGroupId);
                             }
+                            // else:
+                            // "The pipe task meta does not contain the data 
region group {} or
+                            // the data region group has already been removed"
                           }
                         }));
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
   public TSStatus handleMetaChanges(PipeHandleMetaChangePlan plan) {
+    LOGGER.info("Handling pipe meta changes ...");
     pipeMetaKeeper.clear();
     plan.getPipeMetaList()
         .forEach(
             pipeMeta -> {
               
pipeMetaKeeper.addPipeMeta(pipeMeta.getStaticMeta().getPipeName(), pipeMeta);
+              LOGGER.info("Recording pipe meta: {}", pipeMeta);
             });
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
index 0c218184b5c..3cbcebe6f6b 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
@@ -100,10 +100,10 @@ public class PipeHandleLeaderChangeProcedure extends 
AbstractOperatePipeProcedur
   }
 
   @Override
-  protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) 
throws IOException {
+  protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
     LOGGER.info("PipeHandleLeaderChangeProcedure: 
executeFromHandleOnDataNodes");
 
-    pushPipeMetaToDataNodes(env);
+    pushPipeMetaToDataNodesIgnoreException(env);
   }
 
   @Override
@@ -142,10 +142,10 @@ public class PipeHandleLeaderChangeProcedure extends 
AbstractOperatePipeProcedur
   }
 
   @Override
-  protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) 
throws IOException {
+  protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
     LOGGER.info("PipeHandleLeaderChangeProcedure: 
rollbackFromCreateOnDataNodes");
 
-    pushPipeMetaToDataNodes(env);
+    pushPipeMetaToDataNodesIgnoreException(env);
   }
 
   @Override
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index 8f9f9207ebc..39948e9fbb9 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -138,9 +138,20 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
             .getValue()
             .getProgressIndex()
             .isAfter(runtimeMetaFromDataNode.getProgressIndex())) {
-          runtimeMetaOnConfigNode
-              .getValue()
-              .updateProgressIndex(runtimeMetaFromDataNode.getProgressIndex());
+          LOGGER.info(
+              "Updating progress index for (pipe name: {}, consensus group id: 
{}) ... Progress index on config node: {}, progress index from data node: {}",
+              pipeMetaOnConfigNode.getStaticMeta().getPipeName(),
+              runtimeMetaOnConfigNode.getKey(),
+              runtimeMetaOnConfigNode.getValue().getProgressIndex(),
+              runtimeMetaFromDataNode.getProgressIndex());
+          LOGGER.info(
+              "Progress index for (pipe name: {}, consensus group id: {}) is 
updated to {}",
+              pipeMetaOnConfigNode.getStaticMeta().getPipeName(),
+              runtimeMetaOnConfigNode.getKey(),
+              runtimeMetaOnConfigNode
+                  .getValue()
+                  
.updateProgressIndex(runtimeMetaFromDataNode.getProgressIndex()));
+
           needWriteConsensusOnConfigNodes = true;
         }
 
@@ -149,6 +160,7 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
         pipeTaskMetaOnConfigNode.clearExceptionMessages();
         for (final PipeRuntimeException exception :
             runtimeMetaFromDataNode.getExceptionMessages()) {
+
           pipeTaskMetaOnConfigNode.trackExceptionMessage(exception);
 
           if (exception instanceof PipeRuntimeCriticalException) {
@@ -159,6 +171,7 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
                 .get()
                 .equals(PipeStatus.STOPPED)) {
               
pipeMetaOnConfigNode.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
+
               needWriteConsensusOnConfigNodes = true;
               needPushPipeMetaToDataNodes = true;
 
@@ -181,6 +194,7 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
                       .forEach(
                           status -> {
                             status.set(PipeStatus.STOPPED);
+
                             needWriteConsensusOnConfigNodes = true;
                             needPushPipeMetaToDataNodes = true;
 
@@ -224,14 +238,14 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
   }
 
   @Override
-  protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) 
throws IOException {
+  protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
     LOGGER.info("PipeHandleMetaChangeProcedure: executeFromHandleOnDataNodes");
 
     if (!needPushPipeMetaToDataNodes) {
       return;
     }
 
-    pushPipeMetaToDataNodes(env);
+    pushPipeMetaToDataNodesIgnoreException(env);
   }
 
   @Override
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
index 167008e2ed3..a7faa989ab9 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
@@ -66,10 +66,10 @@ public class PipeMetaSyncProcedure extends 
AbstractOperatePipeProcedureV2 {
   }
 
   @Override
-  protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) 
throws IOException {
+  protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
     LOGGER.info("PipeMetaSyncProcedure: executeFromOperateOnDataNodes");
 
-    pushPipeMetaToDataNodes(env);
+    pushPipeMetaToDataNodesIgnoreException(env);
   }
 
   @Override
@@ -94,10 +94,10 @@ public class PipeMetaSyncProcedure extends 
AbstractOperatePipeProcedureV2 {
   }
 
   @Override
-  protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) 
throws IOException {
+  protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
     LOGGER.info("PipeMetaSyncProcedure: rollbackFromOperateOnDataNodes");
 
-    pushPipeMetaToDataNodes(env);
+    // do nothing
   }
 
   @Override
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
index 15065499562..133a74854be 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
@@ -200,6 +200,14 @@ public abstract class AbstractOperatePipeProcedureV2
     }
   }
 
+  protected void pushPipeMetaToDataNodesIgnoreException(ConfigNodeProcedureEnv 
env) {
+    try {
+      pushPipeMetaToDataNodes(env);
+    } catch (Throwable throwable) {
+      LOGGER.info("Failed to push pipe meta list to data nodes, will retry 
later.", throwable);
+    }
+  }
+
   @Override
   public void serialize(DataOutputStream stream) throws IOException {
     super.serialize(stream);
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
index eaa80096823..9ce9890ecb4 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
@@ -165,7 +165,7 @@ public class SimpleProgressIndex implements ProgressIndex {
       // thatSimpleProgressIndex.memtableFlushOrderId
       return this;
     } finally {
-      lock.writeLock().lock();
+      lock.writeLock().unlock();
     }
   }
 
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
index 46547704d08..43330fa392b 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
@@ -52,8 +52,9 @@ public class PipeTaskMeta {
     return progressIndex.get();
   }
 
-  public void updateProgressIndex(ProgressIndex updateIndex) {
-    progressIndex.updateAndGet(index -> 
index.updateToMinimumIsAfterProgressIndex(updateIndex));
+  public ProgressIndex updateProgressIndex(ProgressIndex updateIndex) {
+    return progressIndex.updateAndGet(
+        index -> index.updateToMinimumIsAfterProgressIndex(updateIndex));
   }
 
   public int getLeaderDataNodeId() {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/HeartbeatScheduler.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/HeartbeatScheduler.java
deleted file mode 100644
index de3f30f388b..00000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/HeartbeatScheduler.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.agent.runtime;
-
-/** HeartbeatScheduler is used to schedule the heartbeat of the pipe. */
-public class HeartbeatScheduler {}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
index adf15709aff..16f3716bc13 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
@@ -163,13 +163,19 @@ class PipeAgentLauncher {
       PipeAgent.task()
           .handlePipeMetaChanges(
               getAllPipeInfoResp.getAllPipeInfo().stream()
-                  .map(PipeMeta::deserialize)
+                  .map(
+                      byteBuffer -> {
+                        final PipeMeta pipeMeta = 
PipeMeta.deserialize(byteBuffer);
+                        LOGGER.info(
+                            "Pulled pipe meta from config node: {}, recovering 
...", pipeMeta);
+                        return pipeMeta;
+                      })
                   .collect(Collectors.toList()));
-    } catch (Throwable throwable) {
+    } catch (Exception e) {
       LOGGER.info(
           "Failed to get pipe task meta from config node. Ignore the 
exception, "
               + "because config node may not be ready yet, and meta will be 
pushed by config node later.",
-          throwable);
+          e);
     }
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index d086e6e71ea..d21d8ed5d4e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -49,6 +49,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 /**
@@ -94,11 +95,11 @@ public class PipeTaskAgent {
 
       // if pipe meta does not exist on data node, create a new pipe
       if (metaOnDataNode == null) {
-        createPipe(metaFromConfigNode);
-        if (metaFromConfigNode.getRuntimeMeta().getStatus().get() == 
PipeStatus.RUNNING) {
+        if (createPipe(metaFromConfigNode)) {
+          // if the status recorded in config node is RUNNING, start the pipe
           startPipe(pipeName, 
metaFromConfigNode.getStaticMeta().getCreationTime());
         }
-        // if the status is STOPPED or DROPPED, do nothing
+        // if the status recorded in config node is STOPPED or DROPPED, do 
nothing
         continue;
       }
 
@@ -109,8 +110,7 @@ public class PipeTaskAgent {
       // first check if pipe static meta has changed, if so, drop the pipe and 
create a new one
       if (!staticMetaOnDataNode.equals(staticMetaFromConfigNode)) {
         dropPipe(pipeName);
-        createPipe(metaFromConfigNode);
-        if (metaFromConfigNode.getRuntimeMeta().getStatus().get() == 
PipeStatus.RUNNING) {
+        if (createPipe(metaFromConfigNode)) {
           startPipe(pipeName, 
metaFromConfigNode.getStaticMeta().getCreationTime());
         }
         // if the status is STOPPED or DROPPED, do nothing
@@ -245,9 +245,17 @@ public class PipeTaskAgent {
 
   ////////////////////////// Manage by Pipe Name //////////////////////////
 
-  private void createPipe(PipeMeta pipeMeta) {
-    final String pipeName = pipeMeta.getStaticMeta().getPipeName();
-    final long creationTime = pipeMeta.getStaticMeta().getCreationTime();
+  /**
+   * Create a new pipe. If the pipe already exists, do nothing and return 
false. Otherwise, create
+   * the pipe and return true.
+   *
+   * @param pipeMetaFromConfigNode pipe meta from config node
+   * @return true if the pipe is created successfully and should be started, 
false if the pipe
+   *     already exists or is created but should not be started
+   */
+  private boolean createPipe(PipeMeta pipeMetaFromConfigNode) {
+    final String pipeName = 
pipeMetaFromConfigNode.getStaticMeta().getPipeName();
+    final long creationTime = 
pipeMetaFromConfigNode.getStaticMeta().getCreationTime();
 
     final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
     if (existedPipeMeta != null) {
@@ -260,7 +268,7 @@ public class PipeTaskAgent {
                 pipeName,
                 creationTime,
                 existedPipeMeta.getRuntimeMeta().getStatus().get().name());
-            return;
+            return false;
           case DROPPED:
             LOGGER.info(
                 "Pipe {} (creation time = {}) has already been dropped, but 
the pipe task meta has not been cleaned up. "
@@ -284,16 +292,25 @@ public class PipeTaskAgent {
     }
 
     // create pipe tasks and trigger create() method for each pipe task
-    final Map<TConsensusGroupId, PipeTask> pipeTasks = new 
PipeBuilder(pipeMeta).build();
+    final Map<TConsensusGroupId, PipeTask> pipeTasks =
+        new PipeBuilder(pipeMetaFromConfigNode).build();
     for (PipeTask pipeTask : pipeTasks.values()) {
       pipeTask.create();
     }
-    pipeTaskManager.addPipeTasks(pipeMeta.getStaticMeta(), pipeTasks);
+    pipeTaskManager.addPipeTasks(pipeMetaFromConfigNode.getStaticMeta(), 
pipeTasks);
+
+    // No matter the pipe status from config node is RUNNING or STOPPED, we 
always set the status
+    // of pipe meta to STOPPED when it is created. The STOPPED status should 
always be the initial
+    // status of a pipe, which makes the status transition logic simpler.
+    final AtomicReference<PipeStatus> pipeStatusFromConfigNode =
+        pipeMetaFromConfigNode.getRuntimeMeta().getStatus();
+    final boolean needToStartPipe = pipeStatusFromConfigNode.get() == 
PipeStatus.RUNNING;
+    pipeStatusFromConfigNode.set(PipeStatus.STOPPED);
+
+    pipeMetaKeeper.addPipeMeta(pipeName, pipeMetaFromConfigNode);
 
-    // add pipe meta to pipe meta keeper
-    // note that we do not need to set the status of pipe meta here, because 
the status of pipe meta
-    // is already set to STOPPED when it is created
-    pipeMetaKeeper.addPipeMeta(pipeName, pipeMeta);
+    // If the pipe status from config node is RUNNING, we will start the pipe 
later.
+    return needToStartPipe;
   }
 
   private void dropPipe(String pipeName, long creationTime) {
@@ -551,7 +568,8 @@ public class PipeTaskAgent {
 
   public synchronized void collectPipeMetaList(THeartbeatReq req, 
THeartbeatResp resp)
       throws TException {
-    if (!req.isNeedPipeMetaList()) {
+    // do nothing if data node is removing or removed, or request does not 
need pipe meta list
+    if (PipeAgent.runtime().isShutdown() || !req.isNeedPipeMetaList()) {
       return;
     }
 
@@ -559,6 +577,7 @@ public class PipeTaskAgent {
     try {
       for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
         pipeMetaBinaryList.add(pipeMeta.serialize());
+        LOGGER.info("Reporting pipe meta: {}", pipeMeta);
       }
     } catch (IOException e) {
       throw new TException(e);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
index 1dcc1fb9320..3e09b86e74f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
@@ -46,15 +46,13 @@ public class TsFileEpochManager {
 
     // this would not happen, but just in case
     if (!filePath2Epoch.containsKey(filePath)) {
-      LOGGER.warn(
-          String.format("PipeEngine: can not find TsFileEpoch for TsFile %s, 
create it", filePath));
+      LOGGER.info(
+          String.format("Pipe: can not find TsFileEpoch for TsFile %s, 
creating it", filePath));
       filePath2Epoch.put(filePath, new TsFileEpoch(filePath));
     }
 
     return new PipeRealtimeCollectEvent(
         event,
-        // TODO: we have to make sure that the TsFileInsertionEvent is the 
last event of the
-        // TsFileEpoch's life cycle
         filePath2Epoch.remove(filePath),
         resource.getDevices().stream()
             .collect(Collectors.toMap(device -> device, device -> 
EMPTY_MEASUREMENT_ARRAY)),
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
 
b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
index 929a7ce4f47..f69f55cd6f7 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
@@ -25,7 +25,6 @@ import 
org.apache.iotdb.commons.exception.sync.PipeSinkException;
 import org.apache.iotdb.commons.sync.pipe.PipeInfo;
 import org.apache.iotdb.commons.sync.pipe.PipeMessage;
 import org.apache.iotdb.commons.sync.pipesink.PipeSink;
-import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
 import org.apache.iotdb.db.client.ConfigNodeClient;
@@ -41,7 +40,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.List;
-import java.util.stream.Collectors;
 
 /** Only fetch read request. For write request, return SUCCESS directly. */
 public class ClusterSyncInfoFetcher implements ISyncInfoFetcher {
@@ -111,16 +109,7 @@ public class ClusterSyncInfoFetcher implements 
ISyncInfoFetcher {
 
   @Override
   public List<PipeInfo> getAllPipeInfos() {
-    try (ConfigNodeClient configNodeClient =
-        
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
-      TGetAllPipeInfoResp resp = configNodeClient.getAllPipeInfo();
-      return resp.getAllPipeInfo().stream()
-          .map(PipeInfo::deserializePipeInfo)
-          .collect(Collectors.toList());
-    } catch (Exception e) {
-      LOGGER.error("Get AllPipeInfos error because {}", e.getMessage(), e);
-      return Collections.emptyList();
-    }
+    return Collections.emptyList();
   }
 
   @Override

Reply via email to