This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch fix-pipe-source-reboot
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 04ff1111fdf28f1c3d6466db8f238cbe41f59f12
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sat Jun 3 06:43:54 2023 +0800

    fix
---
 .../iotdb/confignode/manager/node/NodeManager.java |  4 +-
 .../consensus/index/impl/SimpleProgressIndex.java  |  2 +-
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    | 51 +++++++++++++++-------
 .../core/event/realtime/TsFileEpochManager.java    |  6 +--
 .../db/sync/common/ClusterSyncInfoFetcher.java     | 13 +-----
 5 files changed, 41 insertions(+), 35 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 2baa07f1973..0276a5d7b4c 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -206,8 +206,8 @@ public class NodeManager {
   }
 
   private TRuntimeConfiguration getRuntimeConfiguration() {
-    getPipeManager().getPipePluginCoordinator().lock();
     getPipeManager().getPipeTaskCoordinator().lock();
+    getPipeManager().getPipePluginCoordinator().lock();
     getTriggerManager().getTriggerInfo().acquireTriggerTableLock();
     getUDFManager().getUdfInfo().acquireUDFTableLock();
 
@@ -226,8 +226,8 @@ public class NodeManager {
     } finally {
       getTriggerManager().getTriggerInfo().releaseTriggerTableLock();
       getUDFManager().getUdfInfo().releaseUDFTableLock();
-      getPipeManager().getPipeTaskCoordinator().unlock();
       getPipeManager().getPipePluginCoordinator().unlock();
+      getPipeManager().getPipeTaskCoordinator().unlock();
     }
   }
 
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
index eaa80096823..9ce9890ecb4 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
@@ -165,7 +165,7 @@ public class SimpleProgressIndex implements ProgressIndex {
       // thatSimpleProgressIndex.memtableFlushOrderId
       return this;
     } finally {
-      lock.writeLock().lock();
+      lock.writeLock().unlock();
     }
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index d086e6e71ea..d21d8ed5d4e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -49,6 +49,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 /**
@@ -94,11 +95,11 @@ public class PipeTaskAgent {
 
       // if pipe meta does not exist on data node, create a new pipe
       if (metaOnDataNode == null) {
-        createPipe(metaFromConfigNode);
-        if (metaFromConfigNode.getRuntimeMeta().getStatus().get() == 
PipeStatus.RUNNING) {
+        if (createPipe(metaFromConfigNode)) {
+          // if the status recorded in config node is RUNNING, start the pipe
           startPipe(pipeName, 
metaFromConfigNode.getStaticMeta().getCreationTime());
         }
-        // if the status is STOPPED or DROPPED, do nothing
+        // if the status recorded in config node is STOPPED or DROPPED, do 
nothing
         continue;
       }
 
@@ -109,8 +110,7 @@ public class PipeTaskAgent {
       // first check if pipe static meta has changed, if so, drop the pipe and 
create a new one
       if (!staticMetaOnDataNode.equals(staticMetaFromConfigNode)) {
         dropPipe(pipeName);
-        createPipe(metaFromConfigNode);
-        if (metaFromConfigNode.getRuntimeMeta().getStatus().get() == 
PipeStatus.RUNNING) {
+        if (createPipe(metaFromConfigNode)) {
           startPipe(pipeName, 
metaFromConfigNode.getStaticMeta().getCreationTime());
         }
         // if the status is STOPPED or DROPPED, do nothing
@@ -245,9 +245,17 @@ public class PipeTaskAgent {
 
   ////////////////////////// Manage by Pipe Name //////////////////////////
 
-  private void createPipe(PipeMeta pipeMeta) {
-    final String pipeName = pipeMeta.getStaticMeta().getPipeName();
-    final long creationTime = pipeMeta.getStaticMeta().getCreationTime();
+  /**
+   * Create a new pipe. If the pipe already exists, do nothing and return 
false. Otherwise, create
+   * the pipe and return true.
+   *
+   * @param pipeMetaFromConfigNode pipe meta from config node
+   * @return true if the pipe is created successfully and should be started, 
false if the pipe
+   *     already exists or is created but should not be started
+   */
+  private boolean createPipe(PipeMeta pipeMetaFromConfigNode) {
+    final String pipeName = 
pipeMetaFromConfigNode.getStaticMeta().getPipeName();
+    final long creationTime = 
pipeMetaFromConfigNode.getStaticMeta().getCreationTime();
 
     final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
     if (existedPipeMeta != null) {
@@ -260,7 +268,7 @@ public class PipeTaskAgent {
                 pipeName,
                 creationTime,
                 existedPipeMeta.getRuntimeMeta().getStatus().get().name());
-            return;
+            return false;
           case DROPPED:
             LOGGER.info(
                 "Pipe {} (creation time = {}) has already been dropped, but 
the pipe task meta has not been cleaned up. "
@@ -284,16 +292,25 @@ public class PipeTaskAgent {
     }
 
     // create pipe tasks and trigger create() method for each pipe task
-    final Map<TConsensusGroupId, PipeTask> pipeTasks = new 
PipeBuilder(pipeMeta).build();
+    final Map<TConsensusGroupId, PipeTask> pipeTasks =
+        new PipeBuilder(pipeMetaFromConfigNode).build();
     for (PipeTask pipeTask : pipeTasks.values()) {
       pipeTask.create();
     }
-    pipeTaskManager.addPipeTasks(pipeMeta.getStaticMeta(), pipeTasks);
+    pipeTaskManager.addPipeTasks(pipeMetaFromConfigNode.getStaticMeta(), 
pipeTasks);
+
+    // No matter the pipe status from config node is RUNNING or STOPPED, we 
always set the status
+    // of pipe meta to STOPPED when it is created. The STOPPED status should 
always be the initial
+    // status of a pipe, which makes the status transition logic simpler.
+    final AtomicReference<PipeStatus> pipeStatusFromConfigNode =
+        pipeMetaFromConfigNode.getRuntimeMeta().getStatus();
+    final boolean needToStartPipe = pipeStatusFromConfigNode.get() == 
PipeStatus.RUNNING;
+    pipeStatusFromConfigNode.set(PipeStatus.STOPPED);
+
+    pipeMetaKeeper.addPipeMeta(pipeName, pipeMetaFromConfigNode);
 
-    // add pipe meta to pipe meta keeper
-    // note that we do not need to set the status of pipe meta here, because 
the status of pipe meta
-    // is already set to STOPPED when it is created
-    pipeMetaKeeper.addPipeMeta(pipeName, pipeMeta);
+    // If the pipe status from config node is RUNNING, we will start the pipe 
later.
+    return needToStartPipe;
   }
 
   private void dropPipe(String pipeName, long creationTime) {
@@ -551,7 +568,8 @@ public class PipeTaskAgent {
 
   public synchronized void collectPipeMetaList(THeartbeatReq req, 
THeartbeatResp resp)
       throws TException {
-    if (!req.isNeedPipeMetaList()) {
+    // do nothing if data node is removing or removed, or request does not 
need pipe meta list
+    if (PipeAgent.runtime().isShutdown() || !req.isNeedPipeMetaList()) {
       return;
     }
 
@@ -559,6 +577,7 @@ public class PipeTaskAgent {
     try {
       for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
         pipeMetaBinaryList.add(pipeMeta.serialize());
+        LOGGER.info("Reporting pipe meta: {}", pipeMeta);
       }
     } catch (IOException e) {
       throw new TException(e);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
index 1dcc1fb9320..3e09b86e74f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
@@ -46,15 +46,13 @@ public class TsFileEpochManager {
 
     // this would not happen, but just in case
     if (!filePath2Epoch.containsKey(filePath)) {
-      LOGGER.warn(
-          String.format("PipeEngine: can not find TsFileEpoch for TsFile %s, 
create it", filePath));
+      LOGGER.info(
+          String.format("Pipe: can not find TsFileEpoch for TsFile %s, 
creating it", filePath));
       filePath2Epoch.put(filePath, new TsFileEpoch(filePath));
     }
 
     return new PipeRealtimeCollectEvent(
         event,
-        // TODO: we have to make sure that the TsFileInsertionEvent is the 
last event of the
-        // TsFileEpoch's life cycle
         filePath2Epoch.remove(filePath),
         resource.getDevices().stream()
             .collect(Collectors.toMap(device -> device, device -> 
EMPTY_MEASUREMENT_ARRAY)),
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
 
b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
index 929a7ce4f47..f69f55cd6f7 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
@@ -25,7 +25,6 @@ import 
org.apache.iotdb.commons.exception.sync.PipeSinkException;
 import org.apache.iotdb.commons.sync.pipe.PipeInfo;
 import org.apache.iotdb.commons.sync.pipe.PipeMessage;
 import org.apache.iotdb.commons.sync.pipesink.PipeSink;
-import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
 import org.apache.iotdb.db.client.ConfigNodeClient;
@@ -41,7 +40,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.List;
-import java.util.stream.Collectors;
 
 /** Only fetch read request. For write request, return SUCCESS directly. */
 public class ClusterSyncInfoFetcher implements ISyncInfoFetcher {
@@ -111,16 +109,7 @@ public class ClusterSyncInfoFetcher implements 
ISyncInfoFetcher {
 
   @Override
   public List<PipeInfo> getAllPipeInfos() {
-    try (ConfigNodeClient configNodeClient =
-        
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
-      TGetAllPipeInfoResp resp = configNodeClient.getAllPipeInfo();
-      return resp.getAllPipeInfo().stream()
-          .map(PipeInfo::deserializePipeInfo)
-          .collect(Collectors.toList());
-    } catch (Exception e) {
-      LOGGER.error("Get AllPipeInfos error because {}", e.getMessage(), e);
-      return Collections.emptyList();
-    }
+    return Collections.emptyList();
   }
 
   @Override

Reply via email to