This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch fix-pipe-source-reboot
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d3a83ecfaccbf1f70a5965c40e914131967688db
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sat Jun 3 00:17:03 2023 +0800

    add logs for pipe meta changes
---
 .../confignode/persistence/pipe/PipeTaskInfo.java  |  2 ++
 .../runtime/PipeHandleMetaChangeProcedure.java     | 20 ++++++++++++++++---
 .../impl/pipe/runtime/PipeMetaSyncProcedure.java   |  2 +-
 .../iotdb/commons/pipe/task/meta/PipeTaskMeta.java |  5 +++--
 .../db/pipe/agent/runtime/HeartbeatScheduler.java  | 23 ----------------------
 .../db/pipe/agent/runtime/PipeAgentLauncher.java   | 12 ++++++++---
 6 files changed, 32 insertions(+), 32 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 2c9485c29fc..d19c3588468 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -234,11 +234,13 @@ public class PipeTaskInfo implements SnapshotProcessor {
   }
 
   public TSStatus handleMetaChanges(PipeHandleMetaChangePlan plan) {
+    LOGGER.info("Handling pipe meta changes ...");
     pipeMetaKeeper.clear();
     plan.getPipeMetaList()
         .forEach(
             pipeMeta -> {
               
pipeMetaKeeper.addPipeMeta(pipeMeta.getStaticMeta().getPipeName(), pipeMeta);
+              LOGGER.info("Recording pipe meta: {}", pipeMeta);
             });
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index 8f9f9207ebc..46a8fc1199a 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -138,9 +138,20 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
             .getValue()
             .getProgressIndex()
             .isAfter(runtimeMetaFromDataNode.getProgressIndex())) {
-          runtimeMetaOnConfigNode
-              .getValue()
-              .updateProgressIndex(runtimeMetaFromDataNode.getProgressIndex());
+          LOGGER.info(
+              "Updating progress index for (pipe name: {}, consensus group id: 
{}) ... Progress index on config node: {}, progress index from data node: {}",
+              pipeMetaOnConfigNode.getStaticMeta().getPipeName(),
+              runtimeMetaOnConfigNode.getKey(),
+              runtimeMetaOnConfigNode.getValue().getProgressIndex(),
+              runtimeMetaFromDataNode.getProgressIndex());
+          LOGGER.info(
+              "Progress index for (pipe name: {}, consensus group id: {}) is 
updated to {}",
+              pipeMetaOnConfigNode.getStaticMeta().getPipeName(),
+              runtimeMetaOnConfigNode.getKey(),
+              runtimeMetaOnConfigNode
+                  .getValue()
+                  
.updateProgressIndex(runtimeMetaFromDataNode.getProgressIndex()));
+
           needWriteConsensusOnConfigNodes = true;
         }
 
@@ -149,6 +160,7 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
         pipeTaskMetaOnConfigNode.clearExceptionMessages();
         for (final PipeRuntimeException exception :
             runtimeMetaFromDataNode.getExceptionMessages()) {
+
           pipeTaskMetaOnConfigNode.trackExceptionMessage(exception);
 
           if (exception instanceof PipeRuntimeCriticalException) {
@@ -159,6 +171,7 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
                 .get()
                 .equals(PipeStatus.STOPPED)) {
               
pipeMetaOnConfigNode.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
+
               needWriteConsensusOnConfigNodes = true;
               needPushPipeMetaToDataNodes = true;
 
@@ -181,6 +194,7 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
                       .forEach(
                           status -> {
                             status.set(PipeStatus.STOPPED);
+
                             needWriteConsensusOnConfigNodes = true;
                             needPushPipeMetaToDataNodes = true;
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
index 167008e2ed3..dfa2283775a 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
@@ -97,7 +97,7 @@ public class PipeMetaSyncProcedure extends 
AbstractOperatePipeProcedureV2 {
   protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) 
throws IOException {
     LOGGER.info("PipeMetaSyncProcedure: rollbackFromOperateOnDataNodes");
 
-    pushPipeMetaToDataNodes(env);
+    // do nothing
   }
 
   @Override
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
index 46547704d08..43330fa392b 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
@@ -52,8 +52,9 @@ public class PipeTaskMeta {
     return progressIndex.get();
   }
 
-  public void updateProgressIndex(ProgressIndex updateIndex) {
-    progressIndex.updateAndGet(index -> 
index.updateToMinimumIsAfterProgressIndex(updateIndex));
+  public ProgressIndex updateProgressIndex(ProgressIndex updateIndex) {
+    return progressIndex.updateAndGet(
+        index -> index.updateToMinimumIsAfterProgressIndex(updateIndex));
   }
 
   public int getLeaderDataNodeId() {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/HeartbeatScheduler.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/HeartbeatScheduler.java
deleted file mode 100644
index de3f30f388b..00000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/HeartbeatScheduler.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.agent.runtime;
-
-/** HeartbeatScheduler is used to schedule the heartbeat of the pipe. */
-public class HeartbeatScheduler {}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
index adf15709aff..16f3716bc13 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
@@ -163,13 +163,19 @@ class PipeAgentLauncher {
       PipeAgent.task()
           .handlePipeMetaChanges(
               getAllPipeInfoResp.getAllPipeInfo().stream()
-                  .map(PipeMeta::deserialize)
+                  .map(
+                      byteBuffer -> {
+                        final PipeMeta pipeMeta = 
PipeMeta.deserialize(byteBuffer);
+                        LOGGER.info(
+                            "Pulled pipe meta from config node: {}, recovering 
...", pipeMeta);
+                        return pipeMeta;
+                      })
                   .collect(Collectors.toList()));
-    } catch (Throwable throwable) {
+    } catch (Exception e) {
       LOGGER.info(
           "Failed to get pipe task meta from config node. Ignore the 
exception, "
               + "because config node may not be ready yet, and meta will be 
pushed by config node later.",
-          throwable);
+          e);
     }
   }
 }

Reply via email to