This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fcc15a3473 Pipe/Subscription: avoid executing `rollbackFromValidateTask` or `rollbackFromValidate` multiple times in retry with rollback scenarios (#13825)
6fcc15a3473 is described below

commit 6fcc15a347365a7507081f81ae9e8e2b07e16564
Author: V_Galaxy <[email protected]>
AuthorDate: Fri Oct 25 16:15:38 2024 +0800

    Pipe/Subscription: avoid executing `rollbackFromValidateTask` or `rollbackFromValidate` multiple times in retry with rollback scenarios (#13825)
---
 .../impl/pipe/AbstractOperatePipeProcedureV2.java  | 18 ++++++++++-----
 .../AbstractOperateSubscriptionProcedure.java      | 26 ++++++++++++++--------
 2 files changed, 30 insertions(+), 14 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index 1963d520d3f..ec0a33ee677 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -83,6 +83,11 @@ public abstract class AbstractOperatePipeProcedureV2
   // Only used in rollback to reduce the number of network calls
   protected boolean isRollbackFromOperateOnDataNodesSuccessful = false;
 
+  // Only used in rollback to avoid executing rollbackFromValidateTask multiple times
+  // Pure in-memory object, not involved in snapshot serialization and deserialization.
+  // TODO: consider serializing this variable later
+  protected boolean isRollbackFromValidateTaskSuccessful = false;
+
   // This variable should not be serialized into procedure store,
   // putting it here is just for convenience
   protected AtomicReference<PipeTaskInfo> pipeTaskInfo;
@@ -298,10 +303,13 @@ public abstract class AbstractOperatePipeProcedureV2
 
     switch (state) {
       case VALIDATE_TASK:
-        try {
-          rollbackFromValidateTask(env);
-        } catch (Exception e) {
-          LOGGER.warn("ProcedureId {}: Failed to rollback from validate task.", getProcId(), e);
+        if (!isRollbackFromValidateTaskSuccessful) {
+          try {
+            rollbackFromValidateTask(env);
+            isRollbackFromValidateTaskSuccessful = true;
+          } catch (Exception e) {
+            LOGGER.warn("ProcedureId {}: Failed to rollback from validate task.", getProcId(), e);
+          }
         }
         break;
       case CALCULATE_INFO_FOR_TASK:
@@ -330,7 +338,7 @@ public abstract class AbstractOperatePipeProcedureV2
         break;
       case OPERATE_ON_DATA_NODES:
         try {
-          // We have to make sure that rollbackFromOperateOnDataNodes is executed before
+          // We have to make sure that rollbackFromOperateOnDataNodes is executed after
           // rollbackFromWriteConfigNodeConsensus, because rollbackFromOperateOnDataNodes is
           // executed based on the consensus of config nodes that is written by
           // rollbackFromWriteConfigNodeConsensus
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
index 4566738cd8d..e661f3b3d3f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
@@ -59,6 +59,11 @@ public abstract class AbstractOperateSubscriptionProcedure
 
   private static final int RETRY_THRESHOLD = 1;
 
+  // Only used in rollback to avoid executing rollbackFromValidate multiple times
+  // Pure in-memory object, not involved in snapshot serialization and deserialization.
+  // TODO: consider serializing this variable later
+  protected boolean isRollbackFromValidateSuccessful = false;
+
   protected AtomicReference<SubscriptionInfo> subscriptionInfo;
 
   protected AtomicReference<SubscriptionInfo> acquireLockInternal(
@@ -250,15 +255,18 @@ public abstract class AbstractOperateSubscriptionProcedure
 
     switch (state) {
       case VALIDATE:
-        try {
-          rollbackFromValidate(env);
-        } catch (Exception e) {
-          LOGGER.warn(
-              "ProcedureId {}: Failed to rollback from state [{}], because {}",
-              getProcId(),
-              state,
-              e.getMessage(),
-              e);
+        if (!isRollbackFromValidateSuccessful) {
+          try {
+            rollbackFromValidate(env);
+            isRollbackFromValidateSuccessful = true;
+          } catch (Exception e) {
+            LOGGER.warn(
+                "ProcedureId {}: Failed to rollback from state [{}], because {}",
+                getProcId(),
+                state,
+                e.getMessage(),
+                e);
+          }
         }
         break;
       case OPERATE_ON_CONFIG_NODES:

Reply via email to