This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 27637120a72 Pipe: skip pipe CN meta sync after successful 
synchronization when having no pipe (#11532)
27637120a72 is described below

commit 27637120a72e66f680f48c8f8ac7222863f77f2b
Author: V_Galaxy <[email protected]>
AuthorDate: Sun Nov 19 22:26:27 2023 +0800

    Pipe: skip pipe CN meta sync after successful synchronization when having 
no pipe (#11532)
    
    In order to fix the issue that `sync` operation logs are still printed when 
no pipe exists in CN, like:
    
    ```text
    2023-11-13 21:01:36,412 [ProcExecWorker-1] INFO  
o.a.i.c.p.ProcedureExecutor:394 - pid=1, state=SUCCESS; 
org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeMetaSyncProcedure 
finished in 55ms successfully.
    ```
    
    a simple idea is that, in the next round of `sync` after a successful `sync` 
operation, we check whether the current version of `PipeTaskInfo` is the same as 
the version synced before (an empty `PipeTaskInfo`), and whether the current 
`PipeTaskInfo` is empty (indicating no need for a restart); if so, the round is skipped.
    
    In terms of specific implementation:
    
    1. Maintain the version of `PipeTaskInfo` synced by CN in the last round. 
In this context, the version is considered as the number of times the 
`PipeTaskInfo` write lock is acquired.
        - If a restart occurs, it is the version after 
`PipeHandleMetaChangeProcedure`; otherwise, it is the version after 
`PipeMetaSyncProcedure`.
        - Update in the lock release phase of these procedures to ensure that 
the recorded version at this point is free from concurrency issues.
        - Considering that `PipeHandleMetaChangeProcedure` may also be 
submitted by other logic, in order to avoid introducing new variables that may 
affect the serialization and deserialization logic in 
`PipeHandleMetaChangeProcedure`, we only update the version of `PipeTaskInfo` 
synced by `PipeMetaSyncProcedure` (partially affecting the liveness of 
**skipping sync** but not affecting correctness).
    2. Maintain whether the last sync was successful in `PipeMetaSyncer`.
        - To check if the `sync` operation succeeded, it is also necessary to 
block and wait for the execution result of `PipeHandleMetaChangeProcedure` 
(refer to #11475).
    
    Since querying `PipeTaskInfo` before each round of `sync` operations is 
done without locking, there may be other procedures modifying `PipeTaskInfo` at 
this time. After verification, it is confirmed that this only partially affects 
the liveness of **sync** but does not affect correctness.
---
 .../iotdb/confignode/manager/ProcedureManager.java | 21 +++++++++
 .../manager/pipe/runtime/PipeMetaSyncer.java       | 52 +++++++++++++++++-----
 .../manager/pipe/task/PipeTaskCoordinator.java     |  9 ++++
 .../confignode/persistence/pipe/PipeTaskInfo.java  | 47 +++++++++++++++++++
 .../impl/pipe/AbstractOperatePipeProcedureV2.java  |  8 ++++
 5 files changed, 127 insertions(+), 10 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 290f89edbaf..af3adc40c99 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -839,6 +839,27 @@ public class ProcedureManager {
     }
   }
 
+  public TSStatus pipeHandleMetaChangeWithBlock(
+      boolean needWriteConsensusOnConfigNodes, boolean 
needPushPipeMetaToDataNodes) {
+    try {
+      final long procedureId =
+          executor.submitProcedure(
+              new PipeHandleMetaChangeProcedure(
+                  needWriteConsensusOnConfigNodes, 
needPushPipeMetaToDataNodes));
+      final List<TSStatus> statusList = new ArrayList<>();
+      final boolean isSucceed =
+          waitingProcedureFinished(Collections.singletonList(procedureId), 
statusList);
+      if (isSucceed) {
+        return RpcUtils.SUCCESS_STATUS;
+      } else {
+        return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
+            .setMessage(statusList.get(0).getMessage());
+      }
+    } catch (Exception e) {
+      return new 
TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
+    }
+  }
+
   public TSStatus pipeMetaSync() {
     try {
       final long procedureId = executor.submitProcedure(new 
PipeMetaSyncProcedure());
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
index c6047e3684f..af5da32dc08 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
@@ -59,6 +59,12 @@ public class PipeMetaSyncer {
   private final boolean pipeAutoRestartEnabled =
       PipeConfig.getInstance().getPipeAutoRestartEnabled();
 
+  // This variable is used to record whether the last sync operation was 
successful. If successful,
+  // before the next sync operation, it can be determined based on 
PipeTaskInfoVersion whether the
+  // pipe metadata in DN and the latest pipe metadata in CN have been 
synchronized and whether there
+  // is a pipe task in the current cluster, thereby skipping unnecessary sync 
operations.
+  private boolean isLastPipeSyncSuccessful = false;
+
   PipeMetaSyncer(ConfigManager configManager) {
     this.configManager = configManager;
   }
@@ -87,6 +93,13 @@ public class PipeMetaSyncer {
   }
 
   private synchronized void sync() {
+    if (isLastPipeSyncSuccessful
+        && 
configManager.getPipeManager().getPipeTaskCoordinator().canSkipNextSync()) {
+      return;
+    }
+
+    isLastPipeSyncSuccessful = false;
+
     if (configManager.getPipeManager().getPipeTaskCoordinator().isLocked()) {
       LOGGER.warn(
           "PipeTaskCoordinatorLock is held by another thread, skip this round 
of sync to avoid procedure and rpc accumulation as much as possible");
@@ -104,16 +117,34 @@ public class PipeMetaSyncer {
       pipeAutoRestartRoundCounter.set(0);
     }
 
-    final TSStatus status = procedureManager.pipeMetaSync();
+    final TSStatus metaSyncStatus = procedureManager.pipeMetaSync();
 
-    if (somePipesNeedRestarting
-        && status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      handleSuccessfulRestartWithLock();
-      procedureManager.pipeHandleMetaChange(true, false);
-    }
+    if (metaSyncStatus.getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      boolean successfulSync = !somePipesNeedRestarting;
 
-    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      procedureManager.pipeHandleMetaChange(true, true);
+      if (somePipesNeedRestarting) {
+        final boolean isRestartSuccessful = handleSuccessfulRestartWithLock();
+        final TSStatus handleMetaChangeStatus =
+            procedureManager.pipeHandleMetaChangeWithBlock(true, false);
+        if (isRestartSuccessful
+            && handleMetaChangeStatus.getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          successfulSync = true;
+        }
+      }
+
+      if (successfulSync) {
+        LOGGER.info(
+            "After this successful sync, if PipeTaskInfo is empty during this 
sync and has not been modified afterwards, all subsequent syncs will be 
skipped");
+        isLastPipeSyncSuccessful = true;
+      }
+    } else {
+      LOGGER.warn("Failed to sync pipe meta. Result status: {}.", 
metaSyncStatus);
+      final TSStatus handleMetaChangeStatus =
+          procedureManager.pipeHandleMetaChangeWithBlock(true, true);
+      if (handleMetaChangeStatus.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        LOGGER.warn(
+            "Failed to handle pipe meta change. Result status: {}.", 
handleMetaChangeStatus);
+      }
     }
   }
 
@@ -139,15 +170,16 @@ public class PipeMetaSyncer {
     }
   }
 
-  private void handleSuccessfulRestartWithLock() {
+  private boolean handleSuccessfulRestartWithLock() {
     final AtomicReference<PipeTaskInfo> pipeTaskInfo =
         configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
     if (pipeTaskInfo == null) {
       LOGGER.warn("Failed to acquire pipe lock for handling successful 
restart.");
-      return;
+      return false;
     }
     try {
       pipeTaskInfo.get().handleSuccessfulRestart();
+      return true;
     } finally {
       configManager.getPipeManager().getPipeTaskCoordinator().unlock();
     }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
index 25d5911478a..062b626be14 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
@@ -170,4 +170,13 @@ public class PipeTaskCoordinator {
   public boolean hasAnyPipe() {
     return !pipeTaskInfo.isEmpty();
   }
+
+  /** Caller should ensure that this method is called while holding the write lock of 
{@link #pipeTaskInfo}. */
+  public void updateLastSyncedVersion() {
+    pipeTaskInfo.updateLastSyncedVersion();
+  }
+
+  public boolean canSkipNextSync() {
+    return pipeTaskInfo.canSkipNextSync();
+  }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 4fa9de14deb..cd2c663727c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -54,6 +54,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
@@ -64,8 +65,12 @@ public class PipeTaskInfo implements SnapshotProcessor {
 
   private final PipeMetaKeeper pipeMetaKeeper;
 
+  // Pure in-memory object, not involved in snapshot serialization and 
deserialization.
+  private final PipeTaskInfoVersion pipeTaskInfoVersion;
+
   public PipeTaskInfo() {
     this.pipeMetaKeeper = new PipeMetaKeeper();
+    this.pipeTaskInfoVersion = new PipeTaskInfoVersion();
   }
 
   /////////////////////////////// Lock ///////////////////////////////
@@ -80,12 +85,54 @@ public class PipeTaskInfo implements SnapshotProcessor {
 
   private void acquireWriteLock() {
     pipeMetaKeeper.acquireWriteLock();
+    // We use the number of times the write lock of PipeMetaKeeper has been obtained 
as the version number
+    // of PipeTaskInfo.
+    pipeTaskInfoVersion.increaseLatestVersion();
   }
 
   private void releaseWriteLock() {
     pipeMetaKeeper.releaseWriteLock();
   }
 
+  /////////////////////////////// Version ///////////////////////////////
+
+  private class PipeTaskInfoVersion {
+
+    private final AtomicLong latestVersion;
+    private long lastSyncedVersion;
+    private boolean isLastSyncedPipeTaskInfoEmpty;
+
+    public PipeTaskInfoVersion() {
+      this.latestVersion = new AtomicLong(0);
+      this.lastSyncedVersion = 0;
+      this.isLastSyncedPipeTaskInfoEmpty = false;
+    }
+
+    public void increaseLatestVersion() {
+      latestVersion.incrementAndGet();
+    }
+
+    public void updateLastSyncedVersion() {
+      lastSyncedVersion = latestVersion.get();
+      isLastSyncedPipeTaskInfoEmpty = pipeMetaKeeper.isEmpty();
+    }
+
+    public boolean canSkipNextSync() {
+      return isLastSyncedPipeTaskInfoEmpty
+          && pipeMetaKeeper.isEmpty()
+          && lastSyncedVersion == latestVersion.get();
+    }
+  }
+
+  /** Caller should ensure that this method is called while holding the lock acquired by {@link 
#acquireWriteLock}. */
+  public void updateLastSyncedVersion() {
+    pipeTaskInfoVersion.updateLastSyncedVersion();
+  }
+
+  public boolean canSkipNextSync() {
+    return pipeTaskInfoVersion.canSkipNextSync();
+  }
+
   /////////////////////////////// Validator ///////////////////////////////
 
   public void checkBeforeCreatePipe(TCreatePipeReq createPipeRequest) throws 
PipeException {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index 07196c23e1b..841aff6c905 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -26,6 +26,7 @@ import 
org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import 
org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
 import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
+import 
org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeMetaSyncProcedure;
 import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
 import 
org.apache.iotdb.confignode.procedure.state.pipe.task.OperatePipeTaskState;
 import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
@@ -141,6 +142,13 @@ public abstract class AbstractOperatePipeProcedureV2
       LOGGER.warn("ProcedureId {} release lock. No need to release pipe 
lock.", getProcId());
     } else {
       LOGGER.info("ProcedureId {} release lock. Pipe lock will be released.", 
getProcId());
+      if (this instanceof PipeMetaSyncProcedure) {
+        configNodeProcedureEnv
+            .getConfigManager()
+            .getPipeManager()
+            .getPipeTaskCoordinator()
+            .updateLastSyncedVersion();
+      }
       
configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
       pipeTaskInfo = null;
     }

Reply via email to