This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6fcc15a3473 Pipe/Subscription: avoid executing
`rollbackFromValidateTask` or `rollbackFromValidate` multiple times in retry
with rollback scenarios (#13825)
6fcc15a3473 is described below
commit 6fcc15a347365a7507081f81ae9e8e2b07e16564
Author: V_Galaxy <[email protected]>
AuthorDate: Fri Oct 25 16:15:38 2024 +0800
Pipe/Subscription: avoid executing `rollbackFromValidateTask` or
`rollbackFromValidate` multiple times in retry with rollback scenarios (#13825)
---
.../impl/pipe/AbstractOperatePipeProcedureV2.java | 18 ++++++++++-----
.../AbstractOperateSubscriptionProcedure.java | 26 ++++++++++++++--------
2 files changed, 30 insertions(+), 14 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index 1963d520d3f..ec0a33ee677 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -83,6 +83,11 @@ public abstract class AbstractOperatePipeProcedureV2
// Only used in rollback to reduce the number of network calls
protected boolean isRollbackFromOperateOnDataNodesSuccessful = false;
+ // Only used in rollback to avoid executing rollbackFromValidateTask multiple times
+ // Pure in-memory object, not involved in snapshot serialization and deserialization.
+ // TODO: consider serializing this variable later
+ protected boolean isRollbackFromValidateTaskSuccessful = false;
+
// This variable should not be serialized into procedure store,
// putting it here is just for convenience
protected AtomicReference<PipeTaskInfo> pipeTaskInfo;
@@ -298,10 +303,13 @@ public abstract class AbstractOperatePipeProcedureV2
switch (state) {
case VALIDATE_TASK:
- try {
- rollbackFromValidateTask(env);
- } catch (Exception e) {
- LOGGER.warn("ProcedureId {}: Failed to rollback from validate
task.", getProcId(), e);
+ if (!isRollbackFromValidateTaskSuccessful) {
+ try {
+ rollbackFromValidateTask(env);
+ isRollbackFromValidateTaskSuccessful = true;
+ } catch (Exception e) {
+ LOGGER.warn("ProcedureId {}: Failed to rollback from validate
task.", getProcId(), e);
+ }
}
break;
case CALCULATE_INFO_FOR_TASK:
@@ -330,7 +338,7 @@ public abstract class AbstractOperatePipeProcedureV2
break;
case OPERATE_ON_DATA_NODES:
try {
- // We have to make sure that rollbackFromOperateOnDataNodes is executed before
+ // We have to make sure that rollbackFromOperateOnDataNodes is executed after
// rollbackFromWriteConfigNodeConsensus, because rollbackFromOperateOnDataNodes is
// executed based on the consensus of config nodes that is written by
// rollbackFromWriteConfigNodeConsensus
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
index 4566738cd8d..e661f3b3d3f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
@@ -59,6 +59,11 @@ public abstract class AbstractOperateSubscriptionProcedure
private static final int RETRY_THRESHOLD = 1;
+ // Only used in rollback to avoid executing rollbackFromValidate multiple times
+ // Pure in-memory object, not involved in snapshot serialization and deserialization.
+ // TODO: consider serializing this variable later
+ protected boolean isRollbackFromValidateSuccessful = false;
+
protected AtomicReference<SubscriptionInfo> subscriptionInfo;
protected AtomicReference<SubscriptionInfo> acquireLockInternal(
@@ -250,15 +255,18 @@ public abstract class AbstractOperateSubscriptionProcedure
switch (state) {
case VALIDATE:
- try {
- rollbackFromValidate(env);
- } catch (Exception e) {
- LOGGER.warn(
- "ProcedureId {}: Failed to rollback from state [{}], because {}",
- getProcId(),
- state,
- e.getMessage(),
- e);
+ if (!isRollbackFromValidateSuccessful) {
+ try {
+ rollbackFromValidate(env);
+ isRollbackFromValidateSuccessful = true;
+ } catch (Exception e) {
+ LOGGER.warn(
+ "ProcedureId {}: Failed to rollback from state [{}], because
{}",
+ getProcId(),
+ state,
+ e.getMessage(),
+ e);
+ }
}
break;
case OPERATE_ON_CONFIG_NODES: