This is an automated email from the ASF dual-hosted git repository.
yongzao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new fd3726f384a Avoid rolling back the state immediately and resolve the
NoSuchElementException (#17363)
fd3726f384a is described below
commit fd3726f384ab753f69d0cba66d9f98f615249073
Author: libo <[email protected]>
AuthorDate: Thu Mar 26 09:41:09 2026 +0800
Avoid rolling back the state immediately and resolve the
NoSuchElementException (#17363)
---
.../DataPartitionTableIntegrityCheckProcedure.java | 28 ++++++++++++++++++----
.../impl/DataNodeInternalRPCServiceImpl.java | 3 +++
2 files changed, 26 insertions(+), 5 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
index 0ff1ec91acd..5f67355b0cb 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
@@ -90,6 +90,9 @@ public class DataPartitionTableIntegrityCheckProcedure
// how long to check all datanode are alive, the unit is ms
private static final long CHECK_ALL_DATANODE_IS_ALIVE_INTERVAL = 10000;
+ // how long to roll back the next state, the unit is ms
+ private static final long ROLL_BACK_NEXT_STATE_INTERVAL = 60000;
+
NodeManager dataNodeManager;
private List<TDataNodeConfiguration> allDataNodes = new ArrayList<>();
@@ -276,7 +279,8 @@ public class DataPartitionTableIntegrityCheckProcedure
}
if (failedDataNodes.size() == allDataNodes.size()) {
-
setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS);
+ delayRollbackNextState(
+
DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS);
} else {
setNextState(DataPartitionTableIntegrityCheckProcedureState.ANALYZE_MISSING_PARTITIONS);
}
@@ -439,7 +443,8 @@ public class DataPartitionTableIntegrityCheckProcedure
}
if (failedDataNodes.size() == allDataNodes.size()) {
-
setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS);
+ delayRollbackNextState(
+
DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS);
return Flow.HAS_MORE_STATE;
}
@@ -524,7 +529,8 @@ public class DataPartitionTableIntegrityCheckProcedure
// Don't find any one data partition table generation task on all
registered DataNodes, go back
// to the REQUEST_PARTITION_TABLES step and re-execute
if (failedDataNodes.size() == allDataNodes.size()) {
-
setNextState(DataPartitionTableIntegrityCheckProcedureState.REQUEST_PARTITION_TABLES);
+ delayRollbackNextState(
+
DataPartitionTableIntegrityCheckProcedureState.REQUEST_PARTITION_TABLES);
return Flow.HAS_MORE_STATE;
}
@@ -554,7 +560,8 @@ public class DataPartitionTableIntegrityCheckProcedure
if (dataPartitionTables.isEmpty()) {
LOG.error(
"[DataPartitionIntegrity] No DataPartitionTables to merge,
dataPartitionTables is empty");
-
setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS);
+ delayRollbackNextState(
+
DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS);
return Flow.HAS_MORE_STATE;
}
@@ -675,7 +682,8 @@ public class DataPartitionTableIntegrityCheckProcedure
if (!failedDataNodes.isEmpty()) {
allDataNodes.removeAll(failedDataNodes);
skipDataNodes = new HashSet<>(allDataNodes);
-
setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS);
+ delayRollbackNextState(
+
DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS);
return Flow.HAS_MORE_STATE;
} else {
skipDataNodes.clear();
@@ -683,6 +691,16 @@ public class DataPartitionTableIntegrityCheckProcedure
}
}
+ /** Sleep ROLL_BACK_NEXT_STATE_INTERVAL ms, then set {@code state} as the next state, so
repeated failures don't write raft logs too frequently. NOTE(review): sleep(...) presumably blocks this procedure's worker thread for the full interval - confirm that is acceptable */
+ private void
delayRollbackNextState(DataPartitionTableIntegrityCheckProcedureState state) {
+ sleep(
+ ROLL_BACK_NEXT_STATE_INTERVAL,
+ String.format(
+ "[DataPartitionIntegrity] Error waiting for roll back the %s state
due to thread interruption.",
+ state));
+ setNextState(state);
+ }
+
@Override
public void serialize(final DataOutputStream stream) throws IOException {
super.serialize(stream);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index e2bfe4f6ad9..3aa7aad7e4f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -3377,6 +3377,9 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
}
Set<Long> timePartitionIds = tsFileManager.getTimePartitions();
+ if (timePartitionIds.isEmpty()) {
+ return;
+ }
final long earliestTimeSlotId =
Collections.min(timePartitionIds);
earliestTimeslots.compute(
databaseName,