This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 27637120a72 Pipe: skip pipe CN meta sync after successful
synchronization when having no pipe (#11532)
27637120a72 is described below
commit 27637120a72e66f680f48c8f8ac7222863f77f2b
Author: V_Galaxy <[email protected]>
AuthorDate: Sun Nov 19 22:26:27 2023 +0800
Pipe: skip pipe CN meta sync after successful synchronization when having
no pipe (#11532)
This commit fixes an issue where `sync` operation logs are still printed even
when no pipe exists on the CN, for example:
```text
2023-11-13 21:01:36,412 [ProcExecWorker-1] INFO
o.a.i.c.p.ProcedureExecutor:394 - pid=1, state=SUCCESS;
org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeMetaSyncProcedure
finished in 55ms successfully.
```
The idea is simple: in the round of `sync` that follows a successful `sync`,
check whether the current version of `PipeTaskInfo` equals the version recorded
at the last sync (when `PipeTaskInfo` was empty), and whether the current
`PipeTaskInfo` is still empty (so no pipe needs restarting). If both hold, the
round is skipped.
Implementation details:
1. Track the version of `PipeTaskInfo` that the CN synced in the last round.
Here, the version is defined as the number of times the `PipeTaskInfo` write
lock has been acquired.
- If a restart occurs, it is the version after
`PipeHandleMetaChangeProcedure`; otherwise, it is the version after
`PipeMetaSyncProcedure`.
- The version is recorded in the lock-release phase of these procedures, before
the write lock is released, so the recorded version is free from concurrency
issues.
- Because `PipeHandleMetaChangeProcedure` may also be submitted by other
logic, and to avoid adding fields that would affect its serialization and
deserialization, we only record the synced version of `PipeTaskInfo` in
`PipeMetaSyncProcedure`. This partially reduces the liveness of **skipping
sync** (some skippable rounds will still run) but does not affect correctness.
2. Track in `PipeMetaSyncer` whether the last sync succeeded.
- To determine whether a `sync` succeeded, we must also block and wait for
the execution result of `PipeHandleMetaChangeProcedure` (see #11475).
Because `PipeTaskInfo` is queried without locking before each round of `sync`,
other procedures may be modifying `PipeTaskInfo` at the same time. We have
verified that this only partially affects the liveness of **sync** and does not
affect correctness.
---
.../iotdb/confignode/manager/ProcedureManager.java | 21 +++++++++
.../manager/pipe/runtime/PipeMetaSyncer.java | 52 +++++++++++++++++-----
.../manager/pipe/task/PipeTaskCoordinator.java | 9 ++++
.../confignode/persistence/pipe/PipeTaskInfo.java | 47 +++++++++++++++++++
.../impl/pipe/AbstractOperatePipeProcedureV2.java | 8 ++++
5 files changed, 127 insertions(+), 10 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 290f89edbaf..af3adc40c99 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -839,6 +839,27 @@ public class ProcedureManager {
}
}
+ public TSStatus pipeHandleMetaChangeWithBlock(
+ boolean needWriteConsensusOnConfigNodes, boolean
needPushPipeMetaToDataNodes) {
+ try {
+ final long procedureId =
+ executor.submitProcedure(
+ new PipeHandleMetaChangeProcedure(
+ needWriteConsensusOnConfigNodes,
needPushPipeMetaToDataNodes));
+ final List<TSStatus> statusList = new ArrayList<>();
+ final boolean isSucceed =
+ waitingProcedureFinished(Collections.singletonList(procedureId),
statusList);
+ if (isSucceed) {
+ return RpcUtils.SUCCESS_STATUS;
+ } else {
+ return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
+ .setMessage(statusList.get(0).getMessage());
+ }
+ } catch (Exception e) {
+ return new
TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
+ }
+ }
+
public TSStatus pipeMetaSync() {
try {
final long procedureId = executor.submitProcedure(new
PipeMetaSyncProcedure());
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
index c6047e3684f..af5da32dc08 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
@@ -59,6 +59,12 @@ public class PipeMetaSyncer {
private final boolean pipeAutoRestartEnabled =
PipeConfig.getInstance().getPipeAutoRestartEnabled();
+ // This variable is used to record whether the last sync operation was
successful. If successful,
+ // before the next sync operation, it can be determined based on
PipeTaskInfoVersion whether the
+ // pipe metadata in DN and the latest pipe metadata in CN have been
synchronized and whether there
+ // is a pipe task in the current cluster, thereby skipping unnecessary sync
operations.
+ private boolean isLastPipeSyncSuccessful = false;
+
PipeMetaSyncer(ConfigManager configManager) {
this.configManager = configManager;
}
@@ -87,6 +93,13 @@ public class PipeMetaSyncer {
}
private synchronized void sync() {
+ if (isLastPipeSyncSuccessful
+ &&
configManager.getPipeManager().getPipeTaskCoordinator().canSkipNextSync()) {
+ return;
+ }
+
+ isLastPipeSyncSuccessful = false;
+
if (configManager.getPipeManager().getPipeTaskCoordinator().isLocked()) {
LOGGER.warn(
"PipeTaskCoordinatorLock is held by another thread, skip this round
of sync to avoid procedure and rpc accumulation as much as possible");
@@ -104,16 +117,34 @@ public class PipeMetaSyncer {
pipeAutoRestartRoundCounter.set(0);
}
- final TSStatus status = procedureManager.pipeMetaSync();
+ final TSStatus metaSyncStatus = procedureManager.pipeMetaSync();
- if (somePipesNeedRestarting
- && status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- handleSuccessfulRestartWithLock();
- procedureManager.pipeHandleMetaChange(true, false);
- }
+ if (metaSyncStatus.getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ boolean successfulSync = !somePipesNeedRestarting;
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- procedureManager.pipeHandleMetaChange(true, true);
+ if (somePipesNeedRestarting) {
+ final boolean isRestartSuccessful = handleSuccessfulRestartWithLock();
+ final TSStatus handleMetaChangeStatus =
+ procedureManager.pipeHandleMetaChangeWithBlock(true, false);
+ if (isRestartSuccessful
+ && handleMetaChangeStatus.getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ successfulSync = true;
+ }
+ }
+
+ if (successfulSync) {
+ LOGGER.info(
+ "After this successful sync, if PipeTaskInfo is empty during this
sync and has not been modified afterwards, all subsequent syncs will be
skipped");
+ isLastPipeSyncSuccessful = true;
+ }
+ } else {
+ LOGGER.warn("Failed to sync pipe meta. Result status: {}.",
metaSyncStatus);
+ final TSStatus handleMetaChangeStatus =
+ procedureManager.pipeHandleMetaChangeWithBlock(true, true);
+ if (handleMetaChangeStatus.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ LOGGER.warn(
+ "Failed to handle pipe meta change. Result status: {}.",
handleMetaChangeStatus);
+ }
}
}
@@ -139,15 +170,16 @@ public class PipeMetaSyncer {
}
}
- private void handleSuccessfulRestartWithLock() {
+ private boolean handleSuccessfulRestartWithLock() {
final AtomicReference<PipeTaskInfo> pipeTaskInfo =
configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
if (pipeTaskInfo == null) {
LOGGER.warn("Failed to acquire pipe lock for handling successful
restart.");
- return;
+ return false;
}
try {
pipeTaskInfo.get().handleSuccessfulRestart();
+ return true;
} finally {
configManager.getPipeManager().getPipeTaskCoordinator().unlock();
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
index 25d5911478a..062b626be14 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
@@ -170,4 +170,13 @@ public class PipeTaskCoordinator {
public boolean hasAnyPipe() {
return !pipeTaskInfo.isEmpty();
}
+
+ /** Caller should ensure that the method is called in the write lock of
{@link #pipeTaskInfo}. */
+ public void updateLastSyncedVersion() {
+ pipeTaskInfo.updateLastSyncedVersion();
+ }
+
+ public boolean canSkipNextSync() {
+ return pipeTaskInfo.canSkipNextSync();
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 4fa9de14deb..cd2c663727c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -54,6 +54,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -64,8 +65,12 @@ public class PipeTaskInfo implements SnapshotProcessor {
private final PipeMetaKeeper pipeMetaKeeper;
+ // Pure in-memory object, not involved in snapshot serialization and
deserialization.
+ private final PipeTaskInfoVersion pipeTaskInfoVersion;
+
public PipeTaskInfo() {
this.pipeMetaKeeper = new PipeMetaKeeper();
+ this.pipeTaskInfoVersion = new PipeTaskInfoVersion();
}
/////////////////////////////// Lock ///////////////////////////////
@@ -80,12 +85,54 @@ public class PipeTaskInfo implements SnapshotProcessor {
private void acquireWriteLock() {
pipeMetaKeeper.acquireWriteLock();
+ // We use the number of times obtaining the write lock of PipeMetaKeeper
as the version number
+ // of PipeTaskInfo.
+ pipeTaskInfoVersion.increaseLatestVersion();
}
private void releaseWriteLock() {
pipeMetaKeeper.releaseWriteLock();
}
+ /////////////////////////////// Version ///////////////////////////////
+
+ private class PipeTaskInfoVersion {
+
+ private final AtomicLong latestVersion;
+ private long lastSyncedVersion;
+ private boolean isLastSyncedPipeTaskInfoEmpty;
+
+ public PipeTaskInfoVersion() {
+ this.latestVersion = new AtomicLong(0);
+ this.lastSyncedVersion = 0;
+ this.isLastSyncedPipeTaskInfoEmpty = false;
+ }
+
+ public void increaseLatestVersion() {
+ latestVersion.incrementAndGet();
+ }
+
+ public void updateLastSyncedVersion() {
+ lastSyncedVersion = latestVersion.get();
+ isLastSyncedPipeTaskInfoEmpty = pipeMetaKeeper.isEmpty();
+ }
+
+ public boolean canSkipNextSync() {
+ return isLastSyncedPipeTaskInfoEmpty
+ && pipeMetaKeeper.isEmpty()
+ && lastSyncedVersion == latestVersion.get();
+ }
+ }
+
+ /** Caller should ensure that the method is called in the lock {@link
#acquireWriteLock}. */
+ public void updateLastSyncedVersion() {
+ pipeTaskInfoVersion.updateLastSyncedVersion();
+ }
+
+ public boolean canSkipNextSync() {
+ return pipeTaskInfoVersion.canSkipNextSync();
+ }
+
/////////////////////////////// Validator ///////////////////////////////
public void checkBeforeCreatePipe(TCreatePipeReq createPipeRequest) throws
PipeException {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index 07196c23e1b..841aff6c905 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -26,6 +26,7 @@ import
org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import
org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
+import
org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeMetaSyncProcedure;
import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
import
org.apache.iotdb.confignode.procedure.state.pipe.task.OperatePipeTaskState;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
@@ -141,6 +142,13 @@ public abstract class AbstractOperatePipeProcedureV2
LOGGER.warn("ProcedureId {} release lock. No need to release pipe
lock.", getProcId());
} else {
LOGGER.info("ProcedureId {} release lock. Pipe lock will be released.",
getProcId());
+ if (this instanceof PipeMetaSyncProcedure) {
+ configNodeProcedureEnv
+ .getConfigManager()
+ .getPipeManager()
+ .getPipeTaskCoordinator()
+ .updateLastSyncedVersion();
+ }
configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
pipeTaskInfo = null;
}