This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 402abdd0071 Automatically repair lost data partitions (#17279)
402abdd0071 is described below
commit 402abdd00717590f943ef663117e6b8137d7c12e
Author: libo <[email protected]>
AuthorDate: Fri Mar 20 23:24:44 2026 +0800
Automatically repair lost data partitions (#17279)
* Implement all RPC interfaces in the DataNode, and partial features in the
DataPartitionTableIntegrityCheckProcedure.
* Debugged and verified all key logic in the procedure.
* Correct dataPartitionTables and finalDataPartitionTable serialization;
Adjust the method that records the earliest timeslot id for every database
* Correct heartbeat logic when data partition table is generating;
Remove two unit tests that only run successfully in a local environment
* Use StorageEngine.getInstance().getAllDataRegions() to get Data Partition
Information instead of scanning data directories in the DataNode;
Correct the retry logic after a step fails;
Correct skipDataNodes and failedDataNodes serialization and deserialization.
* Adjust the default value to 1 min
* Adjust the default value to 1 min
* Append a description about the unit
* use the spotless command to format code
* Avoid writing duplicate values
* Fix bug when no data partition table is found in the ConfigNode.
* Add a license description.
* mvn spotless:apply
* Fix problems based on review opinions
* Remove some unused functions
* Find the earliest time slot in the ConfigNode
* Resolve the problem that data partition table generation is not completed
in the procedure due to RPC timeout
* Correct the redirect to the target step.
* Integrate the merge logic into the DataPartitionTable class
* Support repairing data partition tables for multiple databases
* Fix bugs
* Fix
* Compare startTime between ConfigNode and DataNode; if the larger one is
from the ConfigNode, the data partition table is lost.
* Change to 10 seconds.
* Correct the input parameter name
* Changed to the local variable.
* Fix all opinions
* spotless code
* license
* Use our standard binary writing function instead of stream.UTF().
The overflow problem still exists and cannot be resolved: previously, if the
user set timePartitionOrigin to Long.MIN_VALUE, adding it to
partitionId = -1 would indeed cause an overflow. However, the partition table in
the system only accepts timestamps of the long type and does not support bigint
timestamps. Therefore, if an overflow actually occurs, we have to accept the
outcome where the program is interrupted by an exception being thrown.
* Fix some opinions
* Fix another lots of opinions
* Optimize codes
* Skip current loop when no time slot info is found in the seq and unseq
directory
* Fix some opinions
* Use the Google guava RateLimiter instead of LeakyBucketRateLimiter;
Adjust new method to compute progress of data partition table generation
* Fix
* Fix bug
* Fix serialize bugs
---
.../client/sync/CnToDnSyncRequestType.java | 5 +
.../client/sync/SyncDataNodeClientPool.java | 10 +
.../iotdb/confignode/conf/ConfigNodeConfig.java | 11 +
.../confignode/conf/ConfigNodeDescriptor.java | 6 +
.../iotdb/confignode/manager/ProcedureManager.java | 11 +
.../impl/partition/ConfigNodeProcedureEnv.java} | 42 +-
.../DataPartitionTableIntegrityCheckProcedure.java | 954 +++++++++++++++++++++
...artitionTableIntegrityCheckProcedureState.java} | 46 +-
.../procedure/store/ProcedureFactory.java | 6 +
.../confignode/procedure/store/ProcedureType.java | 5 +-
.../iotdb/confignode/service/ConfigNode.java | 58 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 22 +
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 11 +
.../db/partition/DataPartitionTableGenerator.java | 277 ++++++
.../iotdb/db/protocol/thrift/OperationType.java | 5 +-
.../impl/DataNodeInternalRPCServiceImpl.java | 314 ++++++-
.../dataregion/tsfile/TsFileResource.java | 5 +
.../tsfile/timeindex/ArrayDeviceTimeIndex.java | 7 +
.../dataregion/tsfile/timeindex/FileTimeIndex.java | 41 +
.../dataregion/tsfile/timeindex/ITimeIndex.java | 8 +
.../conf/iotdb-system.properties.template | 18 +
.../iotdb/commons/concurrent/ThreadName.java | 2 +
.../enums/DataPartitionTableGeneratorState.java} | 55 +-
.../commons/partition/DataPartitionTable.java | 42 +
.../DatabaseScopedDataPartitionTable.java | 102 +++
.../commons/partition/SeriesPartitionTable.java | 16 +-
.../org/apache/iotdb/commons/utils/IOUtils.java | 35 +
.../iotdb/commons/utils/TimePartitionUtils.java | 4 +
.../src/main/thrift/datanode.thrift | 54 ++
29 files changed, 2078 insertions(+), 94 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java
index 4055398ddb7..790fd637d61 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java
@@ -37,6 +37,11 @@ public enum CnToDnSyncRequestType {
DELETE_OLD_REGION_PEER,
RESET_PEER_LIST,
+ // Data Partition Table Maintenance
+ COLLECT_EARLIEST_TIMESLOTS,
+ GENERATE_DATA_PARTITION_TABLE,
+ GENERATE_DATA_PARTITION_TABLE_HEART_BEAT,
+
// PartitionCache
INVALIDATE_PARTITION_CACHE,
INVALIDATE_PERMISSION_CACHE,
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
index d63d5a74f60..9f5729ef06d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TCleanDataNodeCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TKillQueryInstanceReq;
@@ -139,6 +140,15 @@ public class SyncDataNodeClientPool {
actionMapBuilder.put(
CnToDnSyncRequestType.SHOW_APPLIED_CONFIGURATIONS,
(req, client) -> client.showAppliedConfigurations());
+ actionMapBuilder.put(
+ CnToDnSyncRequestType.COLLECT_EARLIEST_TIMESLOTS,
+ (req, client) -> client.getEarliestTimeslots());
+ actionMapBuilder.put(
+ CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE,
+ (req, client) ->
client.generateDataPartitionTable((TGenerateDataPartitionTableReq) req));
+ actionMapBuilder.put(
+ CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE_HEART_BEAT,
+ (req, client) -> client.generateDataPartitionTableHeartbeat());
actionMap = actionMapBuilder.build();
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 3abb322d084..59b318a4b11 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -319,6 +319,8 @@ public class ConfigNodeConfig {
private long forceWalPeriodForConfigNodeSimpleInMs = 100;
+ private long partitionTableRecoverWaitAllDnUpTimeoutInMs = 60000;
+
public ConfigNodeConfig() {
// empty constructor
}
@@ -1286,4 +1288,13 @@ public class ConfigNodeConfig {
public void setFailureDetectorPhiAcceptablePauseInMs(long
failureDetectorPhiAcceptablePauseInMs) {
this.failureDetectorPhiAcceptablePauseInMs =
failureDetectorPhiAcceptablePauseInMs;
}
+
+ public long getPartitionTableRecoverWaitAllDnUpTimeoutInMs() {
+ return partitionTableRecoverWaitAllDnUpTimeoutInMs;
+ }
+
+ public void setPartitionTableRecoverWaitAllDnUpTimeoutInMs(
+ long partitionTableRecoverWaitAllDnUpTimeoutInMs) {
+ this.partitionTableRecoverWaitAllDnUpTimeoutInMs =
partitionTableRecoverWaitAllDnUpTimeoutInMs;
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 77790dae1a9..9f8206f5dd7 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -322,6 +322,12 @@ public class ConfigNodeDescriptor {
"failure_detector_phi_acceptable_pause_in_ms",
String.valueOf(conf.getFailureDetectorPhiAcceptablePauseInMs()))));
+ conf.setPartitionTableRecoverWaitAllDnUpTimeoutInMs(
+ Long.parseLong(
+ properties.getProperty(
+ "partition_table_recover_wait_all_dn_up_timeout_ms",
+
String.valueOf(conf.getPartitionTableRecoverWaitAllDnUpTimeoutInMs()))));
+
String leaderDistributionPolicy =
properties.getProperty("leader_distribution_policy",
conf.getLeaderDistributionPolicy());
if (AbstractLeaderBalancer.GREEDY_POLICY.equals(leaderDistributionPolicy)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 646aaf66daf..1a69044d37d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -67,6 +67,7 @@ import
org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.RemoveAINodeProcedure;
import
org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure;
import
org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodesProcedure;
+import
org.apache.iotdb.confignode.procedure.impl.partition.DataPartitionTableIntegrityCheckProcedure;
import
org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure;
import
org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
import
org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleLeaderChangeProcedure;
@@ -1374,6 +1375,16 @@ public class ProcedureManager {
}
}
+ /** Used to repair the lost data partition table */
+ public TSStatus dataPartitionTableIntegrityCheck() {
+ DataPartitionTableIntegrityCheckProcedure procedure;
+ synchronized (this) {
+ procedure = new DataPartitionTableIntegrityCheckProcedure();
+ executor.submitProcedure(procedure);
+ }
+ return waitingProcedureFinished(procedure);
+ }
+
/**
* Generate {@link CreateTriggerProcedure} and wait until it finished.
*
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/ConfigNodeProcedureEnv.java
similarity index 56%
copy from
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java
copy to
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/ConfigNodeProcedureEnv.java
index 4055398ddb7..c1ebd7ffccd 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/ConfigNodeProcedureEnv.java
@@ -17,37 +17,23 @@
* under the License.
*/
-package org.apache.iotdb.confignode.client.sync;
+package org.apache.iotdb.confignode.procedure.impl.partition;
-public enum CnToDnSyncRequestType {
- // Node Maintenance
- CLEAN_DATA_NODE_CACHE,
- STOP_AND_CLEAR_DATA_NODE,
- SET_SYSTEM_STATUS,
- SHOW_CONFIGURATION,
- SHOW_APPLIED_CONFIGURATIONS,
+import org.apache.iotdb.confignode.manager.ConfigManager;
- // Region Maintenance
- CREATE_DATA_REGION,
- CREATE_SCHEMA_REGION,
- DELETE_REGION,
- CREATE_NEW_REGION_PEER,
- ADD_REGION_PEER,
- REMOVE_REGION_PEER,
- DELETE_OLD_REGION_PEER,
- RESET_PEER_LIST,
-
- // PartitionCache
- INVALIDATE_PARTITION_CACHE,
- INVALIDATE_PERMISSION_CACHE,
- INVALIDATE_SCHEMA_CACHE,
+/**
+ * Environment object for ConfigNode procedures. Provides access to
ConfigManager and other
+ * necessary components.
+ */
+public class ConfigNodeProcedureEnv {
- // Template
- UPDATE_TEMPLATE,
+ private final ConfigManager configManager;
- // Schema
- KILL_QUERY_INSTANCE,
+ public ConfigNodeProcedureEnv(ConfigManager configManager) {
+ this.configManager = configManager;
+ }
- // Table
- UPDATE_TABLE,
+ public ConfigManager getConfigManager() {
+ return configManager;
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
new file mode 100644
index 00000000000..0ff1ec91acd
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
@@ -0,0 +1,954 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.enums.DataPartitionTableGeneratorState;
+import org.apache.iotdb.commons.partition.DataPartitionTable;
+import org.apache.iotdb.commons.partition.DatabaseScopedDataPartitionTable;
+import org.apache.iotdb.commons.partition.SeriesPartitionTable;
+import org.apache.iotdb.commons.utils.TimePartitionUtils;
+import org.apache.iotdb.confignode.client.sync.CnToDnSyncRequestType;
+import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
+import
org.apache.iotdb.confignode.consensus.request.read.partition.GetDataPartitionPlan;
+import
org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
+import org.apache.iotdb.confignode.manager.node.NodeManager;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure;
+import
org.apache.iotdb.confignode.procedure.state.DataPartitionTableIntegrityCheckProcedureState;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
+import
org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableHeartbeatResp;
+import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableReq;
+import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableResp;
+import org.apache.iotdb.mpp.rpc.thrift.TGetEarliestTimeslotsResp;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.apache.thrift.transport.TTransport;
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Procedure for checking and restoring data partition table integrity. This
procedure scans all
+ * DataNodes to detect missing data partitions and restores the
DataPartitionTable on the ConfigNode
+ * Leader.
+ */
+public class DataPartitionTableIntegrityCheckProcedure
+ extends StateMachineProcedure<
+ ConfigNodeProcedureEnv,
DataPartitionTableIntegrityCheckProcedureState> {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DataPartitionTableIntegrityCheckProcedure.class);
+
+ // how many times will retry after rpc request failed
+ private static final int MAX_RETRY_COUNT = 3;
+
+ // how long to start a heartbeat request, the unit is ms
+ private static final long HEART_BEAT_REQUEST_INTERVAL = 10000;
+
+ // how long to check all datanode are alive, the unit is ms
+ private static final long CHECK_ALL_DATANODE_IS_ALIVE_INTERVAL = 10000;
+
+ NodeManager dataNodeManager;
+ private List<TDataNodeConfiguration> allDataNodes = new ArrayList<>();
+
+ // ============Need serialize BEGIN=============/
+ /** Collected earliest timeslots from DataNodes: database -> earliest
timeslot */
+ private Map<String, Long> earliestTimeslots = new ConcurrentHashMap<>();
+
+ /** DataPartitionTables collected from DataNodes: dataNodeId -> <database,
DataPartitionTable> */
+ private Map<Integer, List<DatabaseScopedDataPartitionTable>>
dataPartitionTables =
+ new ConcurrentHashMap<>();
+
+ /**
+ * Collect all database names that those database lost data partition, the
string in the Set
+ * collection is database name
+ */
+ private Set<String> databasesWithLostDataPartition = new HashSet<>();
+
+ /**
+ * Final merged DataPartitionTable for every database Map<String,
DataPartitionTable> key(String):
+ * database name
+ */
+ private Map<String, DataPartitionTable> finalDataPartitionTables;
+
+ private static Set<TDataNodeConfiguration> skipDataNodes =
+ Collections.newSetFromMap(new ConcurrentHashMap<>());
+ private static Set<TDataNodeConfiguration> failedDataNodes =
+ Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+ // ============Need serialize END=============/
+
+ public DataPartitionTableIntegrityCheckProcedure() {
+ super();
+ }
+
+ @Override
+ protected Flow executeFromState(
+ final ConfigNodeProcedureEnv env, final
DataPartitionTableIntegrityCheckProcedureState state)
+ throws InterruptedException {
+ try {
+ // Ensure to get the real-time DataNodes in the current cluster at every
step
+ dataNodeManager = env.getConfigManager().getNodeManager();
+ allDataNodes = dataNodeManager.getRegisteredDataNodes();
+
+ switch (state) {
+ case COLLECT_EARLIEST_TIMESLOTS:
+ failedDataNodes = new HashSet<>();
+ return collectEarliestTimeslots();
+ case ANALYZE_MISSING_PARTITIONS:
+ databasesWithLostDataPartition = new HashSet<>();
+ return analyzeMissingPartitions(env);
+ case REQUEST_PARTITION_TABLES:
+ return requestPartitionTables();
+ case REQUEST_PARTITION_TABLES_HEART_BEAT:
+ return requestPartitionTablesHeartBeat();
+ case MERGE_PARTITION_TABLES:
+ finalDataPartitionTables = new HashMap<>();
+ return mergePartitionTables(env);
+ case WRITE_PARTITION_TABLE_TO_CONSENSUS:
+ return writePartitionTableToConsensus(env);
+ default:
+ throw new ProcedureException("Unknown state: " + state);
+ }
+ } catch (Exception e) {
+ LOG.error("[DataPartitionIntegrity] Error executing state {}: {}",
state, e.getMessage(), e);
+ setFailure("DataPartitionTableIntegrityCheckProcedure", e);
+ return Flow.NO_MORE_STATE;
+ }
+ }
+
+ @Override
+ protected void rollbackState(
+ final ConfigNodeProcedureEnv env, final
DataPartitionTableIntegrityCheckProcedureState state)
+ throws IOException, InterruptedException, ProcedureException {
+ // Cleanup resources
+ switch (state) {
+ case COLLECT_EARLIEST_TIMESLOTS:
+ earliestTimeslots.clear();
+ break;
+ case ANALYZE_MISSING_PARTITIONS:
+ databasesWithLostDataPartition.clear();
+ break;
+ case REQUEST_PARTITION_TABLES:
+ case REQUEST_PARTITION_TABLES_HEART_BEAT:
+ dataPartitionTables.clear();
+ break;
+ case MERGE_PARTITION_TABLES:
+ finalDataPartitionTables.clear();
+ break;
+ default:
+ allDataNodes.clear();
+ earliestTimeslots.clear();
+ dataPartitionTables.clear();
+ finalDataPartitionTables.clear();
+ throw new ProcedureException("Unknown state for rollback: " + state);
+ }
+ }
+
+ @Override
+ protected DataPartitionTableIntegrityCheckProcedureState getState(final int
stateId) {
+ return DataPartitionTableIntegrityCheckProcedureState.values()[stateId];
+ }
+
+ @Override
+ protected int getStateId(final
DataPartitionTableIntegrityCheckProcedureState state) {
+ return state.ordinal();
+ }
+
+ @Override
+ protected DataPartitionTableIntegrityCheckProcedureState getInitialState() {
+ skipDataNodes = new HashSet<>();
+ failedDataNodes = new HashSet<>();
+ return
DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS;
+ }
+
+ /**
+ * Collect earliest timeslot information from all DataNodes. Each DataNode
returns a Map<String,
+ * Long> where key is database name and value is the earliest timeslot id.
+ */
+ private Flow collectEarliestTimeslots() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Collecting earliest timeslots from all DataNodes...");
+ }
+
+ if (allDataNodes.isEmpty()) {
+ LOG.error(
+ "[DataPartitionIntegrity] No DataNodes registered, no way to collect
earliest timeslots, waiting for them to go up");
+ sleep(
+ CHECK_ALL_DATANODE_IS_ALIVE_INTERVAL,
+ "[DataPartitionIntegrity] Error waiting for DataNode startup due to
thread interruption.");
+
setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS);
+ return Flow.HAS_MORE_STATE;
+ }
+
+ // Collect earliest timeslots from all DataNodes
+ allDataNodes.removeAll(skipDataNodes);
+ for (TDataNodeConfiguration dataNode : allDataNodes) {
+ try {
+ TGetEarliestTimeslotsResp resp =
+ (TGetEarliestTimeslotsResp)
+ SyncDataNodeClientPool.getInstance()
+ .sendSyncRequestToDataNodeWithGivenRetry(
+ dataNode.getLocation().getInternalEndPoint(),
+ null,
+ CnToDnSyncRequestType.COLLECT_EARLIEST_TIMESLOTS,
+ MAX_RETRY_COUNT);
+ if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ failedDataNodes.add(dataNode);
+ LOG.error(
+ "[DataPartitionIntegrity] Failed to collected earliest timeslots
from the DataNode[id={}], response status is {}",
+ dataNode.getLocation().getDataNodeId(),
+ resp.getStatus());
+ continue;
+ }
+
+ Map<String, Long> nodeTimeslots = resp.getDatabaseToEarliestTimeslot();
+
+ // Merge with existing timeslots (take minimum)
+ for (Map.Entry<String, Long> entry : nodeTimeslots.entrySet()) {
+ earliestTimeslots.merge(entry.getKey(), entry.getValue(), Math::min);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Collected earliest timeslots from the DataNode[id={}]: {}",
+ dataNode.getLocation().getDataNodeId(),
+ nodeTimeslots);
+ }
+ } catch (Exception e) {
+ LOG.error(
+ "[DataPartitionIntegrity] Failed to collect earliest timeslots
from the DataNode[id={}]: {}",
+ dataNode.getLocation().getDataNodeId(),
+ e.getMessage(),
+ e);
+ failedDataNodes.add(dataNode);
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Collected earliest timeslots from {} DataNodes: {}, the number of
successful DataNodes is {}",
+ allDataNodes.size(),
+ earliestTimeslots,
+ allDataNodes.size() - failedDataNodes.size());
+ }
+
+ if (failedDataNodes.size() == allDataNodes.size()) {
+
setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS);
+ } else {
+
setNextState(DataPartitionTableIntegrityCheckProcedureState.ANALYZE_MISSING_PARTITIONS);
+ }
+ return Flow.HAS_MORE_STATE;
+ }
+
+ /**
+ * Analyze which data partitions are missing based on earliest timeslots.
Identify data partitions
+ * of databases need to be repaired.
+ */
+ private Flow analyzeMissingPartitions(final ConfigNodeProcedureEnv env) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Analyzing missing data partitions...");
+ }
+
+ if (earliestTimeslots.isEmpty()) {
+ LOG.warn(
+ "[DataPartitionIntegrity] No missing data partitions detected,
nothing needs to be repaired, terminating procedure");
+ return Flow.NO_MORE_STATE;
+ }
+
+ // Find all databases that have lost data partition tables
+ for (Map.Entry<String, Long> entry : earliestTimeslots.entrySet()) {
+ String database = entry.getKey();
+ long earliestTimeslot = entry.getValue();
+
+ // Get current DataPartitionTable from ConfigManager
+ Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TConsensusGroupId>>>>
+ localDataPartitionTable = getLocalDataPartitionTable(env, database);
+
+ // Check if ConfigNode has a data partition that is associated with the
earliestTimeslot
+ if (localDataPartitionTable == null
+ || localDataPartitionTable.isEmpty()
+ || localDataPartitionTable.get(database) == null
+ || localDataPartitionTable.get(database).isEmpty()) {
+ databasesWithLostDataPartition.add(database);
+ LOG.warn(
+ "[DataPartitionIntegrity] No data partition table related to
database {} was found from the ConfigNode, and this issue needs to be repaired",
+ database);
+ continue;
+ }
+
+ Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TConsensusGroupId>>>
+ seriesPartitionMap = localDataPartitionTable.get(database);
+ for (Map.Entry<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TConsensusGroupId>>>
+ seriesPartitionEntry : seriesPartitionMap.entrySet()) {
+ Map<TTimePartitionSlot, List<TConsensusGroupId>>
tTimePartitionSlotListMap =
+ seriesPartitionEntry.getValue();
+
+ if (tTimePartitionSlotListMap.isEmpty()) {
+ continue;
+ }
+
+ TTimePartitionSlot localEarliestSlot =
+ tTimePartitionSlotListMap.keySet().stream()
+
.min(Comparator.comparingLong(TTimePartitionSlot::getStartTime))
+ .orElse(null);
+
+ if (localEarliestSlot.getStartTime()
+ > TimePartitionUtils.getStartTimeByPartitionId(earliestTimeslot)) {
+ databasesWithLostDataPartition.add(database);
+ LOG.warn(
+ "[DataPartitionIntegrity] Database {} has lost timeslot {} in
its data table partition, and this issue needs to be repaired",
+ database,
+ earliestTimeslot);
+ }
+ }
+ }
+
+ if (databasesWithLostDataPartition.isEmpty()) {
+ LOG.info(
+ "[DataPartitionIntegrity] No databases have lost data partitions,
terminating procedure");
+ return Flow.NO_MORE_STATE;
+ }
+
+ LOG.info(
+ "[DataPartitionIntegrity] Identified {} databases have lost data
partitions, will request DataPartitionTable generation from {} DataNodes",
+ databasesWithLostDataPartition.size(),
+ allDataNodes.size() - failedDataNodes.size());
+
setNextState(DataPartitionTableIntegrityCheckProcedureState.REQUEST_PARTITION_TABLES);
+ return Flow.HAS_MORE_STATE;
+ }
+
+ private Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TConsensusGroupId>>>>
+ getLocalDataPartitionTable(final ConfigNodeProcedureEnv env, final
String database) {
+ Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>>
schemaPartitionTable =
+ env.getConfigManager()
+ .getSchemaPartition(Collections.singletonMap(database,
Collections.emptyList()))
+ .getSchemaPartitionTable();
+
+ // Construct request for getting data partition
+ final Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>>
partitionSlotsMap = new HashMap<>();
+ schemaPartitionTable.forEach(
+ (key, value) -> {
+ Map<TSeriesPartitionSlot, TTimeSlotList> slotListMap = new
HashMap<>();
+ value
+ .keySet()
+ .forEach(
+ slot ->
+ slotListMap.put(
+ slot, new TTimeSlotList(Collections.emptyList(),
true, true)));
+ partitionSlotsMap.put(key, slotListMap);
+ });
+ final GetDataPartitionPlan getDataPartitionPlan = new
GetDataPartitionPlan(partitionSlotsMap);
+ return
env.getConfigManager().getDataPartition(getDataPartitionPlan).getDataPartitionTable();
+ }
+
+ /**
+ * Request DataPartitionTable generation from target DataNodes. Each
DataNode scans its tsfile
+ * resources and generates a DataPartitionTable.
+ */
+ private Flow requestPartitionTables() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Requesting DataPartitionTable generation from {} DataNodes...",
allDataNodes.size());
+ }
+
+ if (allDataNodes.isEmpty()) {
+ LOG.error(
+ "[DataPartitionIntegrity] No DataNodes registered, no way to
requested DataPartitionTable generation, terminating procedure");
+ sleep(
+ CHECK_ALL_DATANODE_IS_ALIVE_INTERVAL,
+ "[DataPartitionIntegrity] Error waiting for DataNode startup due to
thread interruption.");
+
setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS);
+ return Flow.HAS_MORE_STATE;
+ }
+
+ allDataNodes.removeAll(skipDataNodes);
+ allDataNodes.removeAll(failedDataNodes);
+ for (TDataNodeConfiguration dataNode : allDataNodes) {
+ int dataNodeId = dataNode.getLocation().getDataNodeId();
+ if (!dataPartitionTables.containsKey(dataNodeId)) {
+ try {
+ TGenerateDataPartitionTableReq req = new
TGenerateDataPartitionTableReq();
+ req.setDatabases(databasesWithLostDataPartition);
+ TGenerateDataPartitionTableResp resp =
+ (TGenerateDataPartitionTableResp)
+ SyncDataNodeClientPool.getInstance()
+ .sendSyncRequestToDataNodeWithGivenRetry(
+ dataNode.getLocation().getInternalEndPoint(),
+ req,
+ CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE,
+ MAX_RETRY_COUNT);
+ if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ failedDataNodes.add(dataNode);
+ LOG.error(
+ "[DataPartitionIntegrity] Failed to request DataPartitionTable
generation from the DataNode[id={}], response status is {}",
+ dataNode.getLocation().getDataNodeId(),
+ resp.getStatus());
+ }
+ } catch (Exception e) {
+ failedDataNodes.add(dataNode);
+ LOG.error(
+ "[DataPartitionIntegrity] Failed to request DataPartitionTable
generation from DataNode[id={}]: {}",
+ dataNodeId,
+ e.getMessage(),
+ e);
+ }
+ }
+ }
+
+ if (failedDataNodes.size() == allDataNodes.size()) {
+
setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS);
+ return Flow.HAS_MORE_STATE;
+ }
+
+ setNextState(
+
DataPartitionTableIntegrityCheckProcedureState.REQUEST_PARTITION_TABLES_HEART_BEAT);
+ return Flow.HAS_MORE_STATE;
+ }
+
+ private Flow requestPartitionTablesHeartBeat() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Checking DataPartitionTable generation completion status...");
+ }
+
+ int completeCount = 0;
+ for (TDataNodeConfiguration dataNode : allDataNodes) {
+ int dataNodeId = dataNode.getLocation().getDataNodeId();
+
+ if (!dataPartitionTables.containsKey(dataNodeId)) {
+ try {
+ TGenerateDataPartitionTableHeartbeatResp resp =
+ (TGenerateDataPartitionTableHeartbeatResp)
+ SyncDataNodeClientPool.getInstance()
+ .sendSyncRequestToDataNodeWithGivenRetry(
+ dataNode.getLocation().getInternalEndPoint(),
+ null,
+
CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE_HEART_BEAT,
+ MAX_RETRY_COUNT);
+ DataPartitionTableGeneratorState state =
+
DataPartitionTableGeneratorState.getStateByCode(resp.getErrorCode());
+
+ if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ LOG.error(
+ "[DataPartitionIntegrity] Failed to request DataPartitionTable
generation heart beat from the DataNode[id={}], state is {}, response status is
{}",
+ dataNode.getLocation().getDataNodeId(),
+ state,
+ resp.getStatus());
+ continue;
+ }
+
+ switch (state) {
+ case SUCCESS:
+ List<ByteBuffer> byteBufferList =
resp.getDatabaseScopedDataPartitionTables();
+ List<DatabaseScopedDataPartitionTable>
databaseScopedDataPartitionTableList =
+ deserializeDatabaseScopedTableList(byteBufferList);
+ dataPartitionTables.put(dataNodeId,
databaseScopedDataPartitionTableList);
+ LOG.info(
+ "[DataPartitionIntegrity] DataNode {} completed
DataPartitionTable generation, terminating heart beat",
+ dataNodeId);
+ completeCount++;
+ break;
+ case IN_PROGRESS:
+ LOG.info(
+ "[DataPartitionIntegrity] DataNode {} still generating
DataPartitionTable",
+ dataNodeId);
+ break;
+ default:
+ failedDataNodes.add(dataNode);
+ LOG.error(
+ "[DataPartitionIntegrity] DataNode {} returned unknown error
code: {}",
+ dataNodeId,
+ resp.getErrorCode());
+ break;
+ }
+ } catch (Exception e) {
+ LOG.error(
+ "[DataPartitionIntegrity] Error checking DataPartitionTable
status from DataNode {}: {}, terminating heart beat",
+ dataNodeId,
+ e.getMessage(),
+ e);
+ completeCount++;
+ }
+ } else {
+ completeCount++;
+ }
+ }
+
+ if (completeCount >= allDataNodes.size()) {
+
setNextState(DataPartitionTableIntegrityCheckProcedureState.MERGE_PARTITION_TABLES);
+ return Flow.HAS_MORE_STATE;
+ }
+
+ // If no DataPartitionTable generation task was found on any registered
+ // DataNode, go back to the REQUEST_PARTITION_TABLES step and re-execute
+ if (failedDataNodes.size() == allDataNodes.size()) {
+
setNextState(DataPartitionTableIntegrityCheckProcedureState.REQUEST_PARTITION_TABLES);
+ return Flow.HAS_MORE_STATE;
+ }
+
+ sleep(
+ HEART_BEAT_REQUEST_INTERVAL,
+ "[DataPartitionIntegrity] Error checking DataPartitionTable status due
to thread interruption.");
+ setNextState(
+
DataPartitionTableIntegrityCheckProcedureState.REQUEST_PARTITION_TABLES_HEART_BEAT);
+ return Flow.HAS_MORE_STATE;
+ }
+
+ private static void sleep(long intervalTime, String logMessage) {
+ try {
+ Thread.sleep(intervalTime);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.error(logMessage);
+ }
+ }
+
+ /** Merge DataPartitionTables from all DataNodes into a final table. */
+ private Flow mergePartitionTables(final ConfigNodeProcedureEnv env) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Merging DataPartitionTables from {} DataNodes...",
dataPartitionTables.size());
+ }
+
+ if (dataPartitionTables.isEmpty()) {
+ LOG.error(
+ "[DataPartitionIntegrity] No DataPartitionTables to merge,
dataPartitionTables is empty");
+
setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS);
+ return Flow.HAS_MORE_STATE;
+ }
+
+ for (String database : databasesWithLostDataPartition) {
+ Map<TSeriesPartitionSlot, SeriesPartitionTable> finalDataPartitionMap =
new HashMap<>();
+
+ // Get current DataPartitionTable from ConfigManager
+ Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TConsensusGroupId>>>>
+ localDataPartitionTableMap = getLocalDataPartitionTable(env,
database);
+
+ // Check if ConfigNode has a data partition that is associated with the
earliestTimeslot
+ if (localDataPartitionTableMap == null
+ || localDataPartitionTableMap.isEmpty()
+ || localDataPartitionTableMap.get(database) == null
+ || localDataPartitionTableMap.get(database).isEmpty()) {
+ LOG.warn(
+ "[DataPartitionIntegrity] No data partition table related to
database {} was found from the ConfigNode, use data partition table of DataNode
directly",
+ database);
+ continue;
+ }
+
+ localDataPartitionTableMap
+ .values()
+ .forEach(
+ map ->
+ map.forEach(
+ (tSeriesPartitionSlot, seriesPartitionTableMap) -> {
+ if (tSeriesPartitionSlot == null
+ || seriesPartitionTableMap == null
+ || seriesPartitionTableMap.isEmpty()) {
+ return;
+ }
+ finalDataPartitionMap.computeIfAbsent(
+ tSeriesPartitionSlot,
+ k -> new
SeriesPartitionTable(seriesPartitionTableMap));
+ }));
+
+ dataPartitionTables.forEach(
+ (k, v) ->
+ v.forEach(
+ databaseScopedDataPartitionTable -> {
+ if
(!databaseScopedDataPartitionTable.getDatabase().equals(database)) {
+ return;
+ }
+ finalDataPartitionTables.put(
+ database,
+ new DataPartitionTable(finalDataPartitionMap)
+
.merge(databaseScopedDataPartitionTable.getDataPartitionTable()));
+ }));
+ }
+
+ LOG.info("[DataPartitionIntegrity] DataPartitionTables merge completed
successfully");
+
setNextState(DataPartitionTableIntegrityCheckProcedureState.WRITE_PARTITION_TABLE_TO_CONSENSUS);
+ return Flow.HAS_MORE_STATE;
+ }
+
+ /** Write the final DataPartitionTable to consensus log. */
+ private Flow writePartitionTableToConsensus(final ConfigNodeProcedureEnv
env) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Writing DataPartitionTable to consensus log...");
+ }
+
+ if (databasesWithLostDataPartition.isEmpty()) {
+ LOG.error("[DataPartitionIntegrity] No database lost data partition
table");
+ setFailure(
+ "DataPartitionTableIntegrityCheckProcedure",
+ new ProcedureException("No database lost data partition table for
consensus write"));
+ return getFlow();
+ }
+
+ if (finalDataPartitionTables.isEmpty()) {
+ LOG.error("[DataPartitionIntegrity] DataPartitionTable to write to
consensus");
+ setFailure(
+ "DataPartitionTableIntegrityCheckProcedure",
+ new ProcedureException("No DataPartitionTable available for
consensus write"));
+ return getFlow();
+ }
+
+ int failedCnt = 0;
+ final int maxRetryCountForConsensus = 3;
+ while (failedCnt < maxRetryCountForConsensus) {
+ try {
+ CreateDataPartitionPlan createPlan = new CreateDataPartitionPlan();
+ Map<String, DataPartitionTable> assignedDataPartition = new
HashMap<>();
+ for (String database : databasesWithLostDataPartition) {
+ assignedDataPartition.put(database,
finalDataPartitionTables.get(database));
+ }
+ createPlan.setAssignedDataPartition(assignedDataPartition);
+ TSStatus tsStatus =
env.getConfigManager().getConsensusManager().write(createPlan);
+
+ if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode())
{
+ LOG.info(
+ "[DataPartitionIntegrity] DataPartitionTable successfully
written to consensus log");
+ break;
+ } else {
+ LOG.error("[DataPartitionIntegrity] Failed to write
DataPartitionTable to consensus log");
+ setFailure(
+ "DataPartitionTableIntegrityCheckProcedure",
+ new ProcedureException("Failed to write DataPartitionTable to
consensus log"));
+ }
+ } catch (Exception e) {
+ LOG.error("[DataPartitionIntegrity] Error writing DataPartitionTable
to consensus log", e);
+ setFailure("DataPartitionTableIntegrityCheckProcedure", e);
+ }
+ failedCnt++;
+ }
+
+ return getFlow();
+ }
+
+ /**
+ * Determine whether any DataNodes failed to execute a step in this round. If such
+ * nodes exist, compute skipDataNodes and exclude those nodes when requesting the
+ * cluster's DataNode list in the next round; if no such nodes exist, the procedure
+ * has completed.
+ */
+ private Flow getFlow() {
+ if (!failedDataNodes.isEmpty()) {
+ allDataNodes.removeAll(failedDataNodes);
+ skipDataNodes = new HashSet<>(allDataNodes);
+
setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS);
+ return Flow.HAS_MORE_STATE;
+ } else {
+ skipDataNodes.clear();
+ return Flow.NO_MORE_STATE;
+ }
+ }
+
+ @Override
+ public void serialize(final DataOutputStream stream) throws IOException {
+ super.serialize(stream);
+
+ // Serialize earliestTimeslots
+ stream.writeInt(earliestTimeslots.size());
+ for (Map.Entry<String, Long> entry : earliestTimeslots.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), stream);
+ stream.writeLong(entry.getValue());
+ }
+
+ // Serialize dataPartitionTables count
+ stream.writeInt(dataPartitionTables.size());
+ for (Map.Entry<Integer, List<DatabaseScopedDataPartitionTable>> entry :
+ dataPartitionTables.entrySet()) {
+ stream.writeInt(entry.getKey());
+
+ List<DatabaseScopedDataPartitionTable> tableList = entry.getValue();
+ stream.writeInt(tableList.size());
+
+ for (DatabaseScopedDataPartitionTable table : tableList) {
+ try (final PublicBAOS publicBAOS = new PublicBAOS();
+ final DataOutputStream tmpStream = new
DataOutputStream(publicBAOS)) {
+
+ TTransport transport = new TIOStreamTransport(tmpStream);
+ TBinaryProtocol protocol = new TBinaryProtocol(transport);
+
+ table.serialize(tmpStream, protocol);
+
+ byte[] buf = publicBAOS.getBuf();
+ int size = publicBAOS.size();
+ ReadWriteIOUtils.write(size, stream);
+ stream.write(buf, 0, size);
+ } catch (IOException | TException e) {
+ LOG.error(
+ "[DataPartitionIntegrity] {} serialize failed for dataNodeId:
{}",
+ this.getClass().getSimpleName(),
+ entry.getKey(),
+ e);
+ throw new IOException("Failed to serialize dataPartitionTables", e);
+ }
+ }
+ }
+
+ stream.writeInt(databasesWithLostDataPartition.size());
+ for (String database : databasesWithLostDataPartition) {
+ ReadWriteIOUtils.write(database, stream);
+ }
+
+ if (finalDataPartitionTables != null &&
!finalDataPartitionTables.isEmpty()) {
+ stream.writeInt(finalDataPartitionTables.size());
+
+ for (Map.Entry<String, DataPartitionTable> entry :
finalDataPartitionTables.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), stream);
+
+ try (final PublicBAOS publicBAOS = new PublicBAOS();
+ final DataOutputStream tmpStream = new
DataOutputStream(publicBAOS)) {
+ TTransport transport = new TIOStreamTransport(tmpStream);
+ TBinaryProtocol protocol = new TBinaryProtocol(transport);
+
+ entry.getValue().serialize(tmpStream, protocol);
+
+ byte[] buf = publicBAOS.getBuf();
+ int size = publicBAOS.size();
+ ReadWriteIOUtils.write(size, stream);
+ stream.write(buf, 0, size);
+ } catch (IOException | TException e) {
+ LOG.error(
+ "[DataPartitionIntegrity] {} serialize finalDataPartitionTables
failed",
+ this.getClass().getSimpleName(),
+ e);
+ throw new IOException("Failed to serialize
finalDataPartitionTables", e);
+ }
+ }
+ } else {
+ stream.writeInt(0);
+ }
+
+ stream.writeInt(skipDataNodes.size());
+ for (TDataNodeConfiguration skipDataNode : skipDataNodes) {
+ try (final PublicBAOS publicBAOS = new PublicBAOS();
+ final DataOutputStream tmpStream = new DataOutputStream(publicBAOS))
{
+ TTransport transport = new TIOStreamTransport(tmpStream);
+ TBinaryProtocol protocol = new TBinaryProtocol(transport);
+ skipDataNode.write(protocol);
+
+ byte[] buf = publicBAOS.getBuf();
+ int size = publicBAOS.size();
+ ReadWriteIOUtils.write(size, stream);
+ stream.write(buf, 0, size);
+ } catch (TException e) {
+ LOG.error("[DataPartitionIntegrity] Failed to serialize skipDataNode",
e);
+ throw new IOException("Failed to serialize skipDataNode", e);
+ }
+ }
+
+ stream.writeInt(failedDataNodes.size());
+ for (TDataNodeConfiguration failedDataNode : failedDataNodes) {
+ try (final PublicBAOS publicBAOS = new PublicBAOS();
+ final DataOutputStream tmpStream = new DataOutputStream(publicBAOS))
{
+ TTransport transport = new TIOStreamTransport(tmpStream);
+ TBinaryProtocol protocol = new TBinaryProtocol(transport);
+ failedDataNode.write(protocol);
+
+ byte[] buf = publicBAOS.getBuf();
+ int size = publicBAOS.size();
+ ReadWriteIOUtils.write(size, stream);
+ stream.write(buf, 0, size);
+ } catch (TException e) {
+ LOG.error("[DataPartitionIntegrity] Failed to serialize
failedDataNode", e);
+ throw new IOException("Failed to serialize failedDataNode", e);
+ }
+ }
+ }
+
+ @Override
+ public void deserialize(final ByteBuffer byteBuffer) {
+ super.deserialize(byteBuffer);
+
+ // Deserialize earliestTimeslots
+ int earliestTimeslotsSize = byteBuffer.getInt();
+ earliestTimeslots = new ConcurrentHashMap<>();
+ for (int i = 0; i < earliestTimeslotsSize; i++) {
+ String database = ReadWriteIOUtils.readString(byteBuffer);
+ long timeslot = byteBuffer.getLong();
+ earliestTimeslots.put(database, timeslot);
+ }
+
+ // Deserialize dataPartitionTables count
+ int dataPartitionTablesSize = byteBuffer.getInt();
+ dataPartitionTables = new ConcurrentHashMap<>();
+ for (int i = 0; i < dataPartitionTablesSize; i++) {
+ int dataNodeId = byteBuffer.getInt();
+ int listSize = byteBuffer.getInt();
+
+ List<DatabaseScopedDataPartitionTable> tableList = new
ArrayList<>(listSize);
+
+ for (int j = 0; j < listSize; j++) {
+ int dataSize = byteBuffer.getInt();
+ byte[] bytes = new byte[dataSize];
+ byteBuffer.get(bytes);
+
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+ DataInputStream dis = new DataInputStream(bais)) {
+
+ TTransport transport = new TIOStreamTransport(dis);
+ TBinaryProtocol protocol = new TBinaryProtocol(transport);
+
+ DatabaseScopedDataPartitionTable table =
+ DatabaseScopedDataPartitionTable.deserialize(dis, protocol);
+ tableList.add(table);
+
+ } catch (IOException | TException e) {
+ LOG.error(
+ "[DataPartitionIntegrity] {} deserialize failed for dataNodeId:
{}",
+ this.getClass().getSimpleName(),
+ dataNodeId,
+ e);
+ throw new RuntimeException("Failed to deserialize
dataPartitionTables", e);
+ }
+ }
+
+ dataPartitionTables.put(dataNodeId, tableList);
+ }
+
+ int databasesWithLostDataPartitionSize = byteBuffer.getInt();
+ for (int i = 0; i < databasesWithLostDataPartitionSize; i++) {
+ String database = ReadWriteIOUtils.readString(byteBuffer);
+ databasesWithLostDataPartition.add(database);
+ }
+
+ // Deserialize finalDataPartitionTable size
+ int finalDataPartitionTablesSize = byteBuffer.getInt();
+ finalDataPartitionTables = new ConcurrentHashMap<>();
+
+ for (int i = 0; i < finalDataPartitionTablesSize; i++) {
+ String database = ReadWriteIOUtils.readString(byteBuffer);
+
+ int dataSize = byteBuffer.getInt();
+ byte[] bytes = new byte[dataSize];
+ byteBuffer.get(bytes);
+
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+ DataInputStream dis = new DataInputStream(bais)) {
+
+ TTransport transport = new TIOStreamTransport(dis);
+ TBinaryProtocol protocol = new TBinaryProtocol(transport);
+
+ DataPartitionTable dataPartitionTable = new DataPartitionTable();
+ dataPartitionTable.deserialize(dis, protocol);
+
+ finalDataPartitionTables.put(database, dataPartitionTable);
+
+ } catch (IOException | TException e) {
+ LOG.error(
+ "[DataPartitionIntegrity] {} deserialize finalDataPartitionTables
failed",
+ this.getClass().getSimpleName(),
+ e);
+ throw new RuntimeException("Failed to deserialize
finalDataPartitionTables", e);
+ }
+ }
+
+ skipDataNodes = new HashSet<>();
+ int skipDataNodesSize = byteBuffer.getInt();
+ for (int i = 0; i < skipDataNodesSize; i++) {
+ int size = byteBuffer.getInt();
+ byte[] bytes = new byte[size];
+ byteBuffer.get(bytes);
+
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes)) {
+ TTransport transport = new TIOStreamTransport(bais);
+ TBinaryProtocol protocol = new TBinaryProtocol(transport);
+
+ TDataNodeConfiguration dataNode = new TDataNodeConfiguration();
+ dataNode.read(protocol);
+ skipDataNodes.add(dataNode);
+ } catch (TException | IOException e) {
+ LOG.error("[DataPartitionIntegrity] Failed to deserialize
skipDataNode", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ failedDataNodes = new HashSet<>();
+ int failedDataNodesSize = byteBuffer.getInt();
+ for (int i = 0; i < failedDataNodesSize; i++) {
+ int size = byteBuffer.getInt();
+ byte[] bytes = new byte[size];
+ byteBuffer.get(bytes);
+
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes)) {
+ TTransport transport = new TIOStreamTransport(bais);
+ TBinaryProtocol protocol = new TBinaryProtocol(transport);
+
+ TDataNodeConfiguration dataNode = new TDataNodeConfiguration();
+ dataNode.read(protocol);
+ failedDataNodes.add(dataNode);
+ } catch (TException | IOException e) {
+ LOG.error("[DataPartitionIntegrity] Failed to deserialize
failedDataNode", e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private List<DatabaseScopedDataPartitionTable>
deserializeDatabaseScopedTableList(
+ List<ByteBuffer> dataList) {
+ if (dataList == null || dataList.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ List<DatabaseScopedDataPartitionTable> result = new
ArrayList<>(dataList.size());
+
+ for (ByteBuffer data : dataList) {
+ if (data == null || data.remaining() == 0) {
+ LOG.warn("[DataPartitionIntegrity] Skipping empty ByteBuffer during
deserialization");
+ continue;
+ }
+
+ try {
+ DatabaseScopedDataPartitionTable table =
DatabaseScopedDataPartitionTable.deserialize(data);
+ result.add(table);
+ } catch (Exception e) {
+ LOG.error(
+ "[DataPartitionIntegrity] Failed to deserialize
DatabaseScopedDataPartitionTable", e);
+ }
+ }
+
+ return result;
+ }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DataPartitionTableIntegrityCheckProcedureState.java
similarity index 55%
copy from
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java
copy to
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DataPartitionTableIntegrityCheckProcedureState.java
index 4055398ddb7..bf302db755b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DataPartitionTableIntegrityCheckProcedureState.java
@@ -17,37 +17,19 @@
* under the License.
*/
-package org.apache.iotdb.confignode.client.sync;
+package org.apache.iotdb.confignode.procedure.state;
-public enum CnToDnSyncRequestType {
- // Node Maintenance
- CLEAN_DATA_NODE_CACHE,
- STOP_AND_CLEAR_DATA_NODE,
- SET_SYSTEM_STATUS,
- SHOW_CONFIGURATION,
- SHOW_APPLIED_CONFIGURATIONS,
-
- // Region Maintenance
- CREATE_DATA_REGION,
- CREATE_SCHEMA_REGION,
- DELETE_REGION,
- CREATE_NEW_REGION_PEER,
- ADD_REGION_PEER,
- REMOVE_REGION_PEER,
- DELETE_OLD_REGION_PEER,
- RESET_PEER_LIST,
-
- // PartitionCache
- INVALIDATE_PARTITION_CACHE,
- INVALIDATE_PERMISSION_CACHE,
- INVALIDATE_SCHEMA_CACHE,
-
- // Template
- UPDATE_TEMPLATE,
-
- // Schema
- KILL_QUERY_INSTANCE,
-
- // Table
- UPDATE_TABLE,
+public enum DataPartitionTableIntegrityCheckProcedureState {
+ /** Collect earliest timeslot information from all DataNodes */
+ COLLECT_EARLIEST_TIMESLOTS,
+ /** Analyze missing data partitions */
+ ANALYZE_MISSING_PARTITIONS,
+ /** Request DataPartitionTable generation from DataNodes */
+ REQUEST_PARTITION_TABLES,
+ /** Round robin get DataPartitionTable generation result from DataNodes */
+ REQUEST_PARTITION_TABLES_HEART_BEAT,
+ /** Merge DataPartitionTables from all DataNodes */
+ MERGE_PARTITION_TABLES,
+ /** Write final DataPartitionTable to raft log */
+ WRITE_PARTITION_TABLE_TO_CONSENSUS
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index dd155586087..140fffa852c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -26,6 +26,7 @@ import
org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.RemoveAINodeProcedure;
import
org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure;
import
org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodesProcedure;
+import
org.apache.iotdb.confignode.procedure.impl.partition.DataPartitionTableIntegrityCheckProcedure;
import
org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure;
import
org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
import
org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleLeaderChangeProcedure;
@@ -404,6 +405,9 @@ public class ProcedureFactory implements IProcedureFactory {
case ADD_NEVER_FINISH_SUB_PROCEDURE_PROCEDURE:
procedure = new AddNeverFinishSubProcedureProcedure();
break;
+ case DATA_PARTITION_TABLE_INTEGRITY_CHECK_PROCEDURE:
+ procedure = new DataPartitionTableIntegrityCheckProcedure();
+ break;
default:
LOGGER.error("Unknown Procedure type: {}", typeCode);
throw new IOException("Unknown Procedure type: " + typeCode);
@@ -554,6 +558,8 @@ public class ProcedureFactory implements IProcedureFactory {
return ProcedureType.NEVER_FINISH_PROCEDURE;
} else if (procedure instanceof AddNeverFinishSubProcedureProcedure) {
return ProcedureType.ADD_NEVER_FINISH_SUB_PROCEDURE_PROCEDURE;
+ } else if (procedure instanceof DataPartitionTableIntegrityCheckProcedure)
{
+ return ProcedureType.DATA_PARTITION_TABLE_INTEGRITY_CHECK_PROCEDURE;
}
throw new UnsupportedOperationException(
"Procedure type " + procedure.getClass() + " is not supported");
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
index 820a90f7ebf..839c8ace098 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
@@ -172,7 +172,10 @@ public enum ProcedureType {
@TestOnly
NEVER_FINISH_PROCEDURE((short) 30000),
@TestOnly
- ADD_NEVER_FINISH_SUB_PROCEDURE_PROCEDURE((short) 30001);
+ ADD_NEVER_FINISH_SUB_PROCEDURE_PROCEDURE((short) 30001),
+
+ /** Data Partition Table Integrity Check */
+ DATA_PARTITION_TABLE_INTEGRITY_CHECK_PROCEDURE((short) 1600);
private final short typeCode;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index f20f77095d9..3a6c93b12ec 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.ServerCommandLine;
import org.apache.iotdb.commons.client.ClientManagerMetrics;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadModule;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.ThreadPoolMetrics;
@@ -79,6 +81,9 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class ConfigNode extends ServerCommandLine implements ConfigNodeMBean {
@@ -110,6 +115,11 @@ public class ConfigNode extends ServerCommandLine
implements ConfigNodeMBean {
private int exitStatusCode = 0;
+ private Future<Void> dataPartitionTableCheckFuture;
+
+ private ExecutorService dataPartitionTableCheckExecutor =
+
IoTDBThreadPoolFactory.newSingleThreadExecutor("DATA_PARTITION_TABLE_CHECK");
+
public ConfigNode() {
super("ConfigNode");
// We do not init anything here, so that we can re-initialize the instance
in IT.
@@ -147,6 +157,15 @@ public class ConfigNode extends ServerCommandLine
implements ConfigNodeMBean {
}
active();
LOGGER.info("IoTDB started");
+ if (dataPartitionTableCheckFuture != null) {
+ try {
+ dataPartitionTableCheckFuture.get();
+ } catch (ExecutionException | InterruptedException e) {
+ LOGGER.error("Data partition table check task execute failed", e);
+ } finally {
+ dataPartitionTableCheckExecutor.shutdownNow();
+ }
+ }
}
@Override
@@ -175,7 +194,7 @@ public class ConfigNode extends ServerCommandLine
implements ConfigNodeMBean {
int configNodeId = CONF.getConfigNodeId();
configManager.initConsensusManager();
upgrade();
- waitForLeaderElected();
+ TConfigNodeLocation leaderNodeLocation = waitForLeaderElected();
setUpMetricService();
// Notice: We always set up Seed-ConfigNode's RPC service lastly to
ensure
// that the external service is not provided until ConfigNode is fully
available
@@ -203,6 +222,40 @@ public class ConfigNode extends ServerCommandLine
implements ConfigNodeMBean {
}
loadSecretKey();
loadHardwareCode();
+
+ /* After the ConfigNode leader election, a leader switch may occur,
which could cause the procedure not to be created. This can happen if the
original leader has not yet executed the procedure creation, while the other
followers have already finished starting up. Therefore, having the original
leader (before the leader switch) initiate the process ensures that only one
procedure will be created. */
+ if (leaderNodeLocation.getConfigNodeId() == configNodeId) {
+ dataPartitionTableCheckFuture =
+ dataPartitionTableCheckExecutor.submit(
+ () -> {
+ LOGGER.info(
+ "[DataPartitionIntegrity] Prepare to start
dataPartitionTableIntegrityCheck after all datanodes started up");
+
Thread.sleep(CONF.getPartitionTableRecoverWaitAllDnUpTimeoutInMs());
+
+ while (true) {
+ List<Integer> dnList =
+ configManager
+ .getLoadManager()
+ .filterDataNodeThroughStatus(NodeStatus.Running);
+ if (dnList != null && !dnList.isEmpty()) {
+ LOGGER.info("Starting
dataPartitionTableIntegrityCheck...");
+ TSStatus status =
+
configManager.getProcedureManager().dataPartitionTableIntegrityCheck();
+ if (status.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ LOGGER.error(
+ "Data partition table integrity check failed!
Current status code is {}, status message is {}",
+ status.getCode(),
+ status.getMessage());
+ }
+ break;
+ } else {
+ LOGGER.info("No running datanodes found, waiting...");
+ Thread.sleep(5000);
+ }
+ }
+ return null;
+ });
+ }
return;
} else {
saveSecretKey();
@@ -469,7 +522,7 @@ public class ConfigNode extends ServerCommandLine
implements ConfigNodeMBean {
return new ConfigNodeRPCServiceProcessor(configManager);
}
- private void waitForLeaderElected() {
+ private TConfigNodeLocation waitForLeaderElected() {
while (!configManager.getConsensusManager().isLeaderExist()) {
LOGGER.info("Leader has not been elected yet, wait for 1 second");
try {
@@ -479,6 +532,7 @@ public class ConfigNode extends ServerCommandLine
implements ConfigNodeMBean {
LOGGER.warn("Unexpected interruption during waiting for leader
election.");
}
}
+ return configManager.getConsensusManager().getLeaderLocation();
}
/**
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 722e04edb30..69eab2d79cf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1239,6 +1239,11 @@ public class IoTDBConfig {
private int maxSubTaskNumForInformationTableScan = 4;
+ /* These parameters are used when repairing the data partition table */
+ private int partitionTableRecoverWorkerNum = 10;
+ // Rate limit set to 10 MB/s
+ private int partitionTableRecoverMaxReadMBsPerSecond = 10;
+
IoTDBConfig() {}
public int getMaxLogEntriesNumPerBatch() {
@@ -4435,4 +4440,21 @@ public class IoTDBConfig {
public void setMaxSubTaskNumForInformationTableScan(int
maxSubTaskNumForInformationTableScan) {
this.maxSubTaskNumForInformationTableScan =
maxSubTaskNumForInformationTableScan;
}
+
+ public int getPartitionTableRecoverWorkerNum() {
+ return partitionTableRecoverWorkerNum;
+ }
+
+ public void setPartitionTableRecoverWorkerNum(int
partitionTableRecoverWorkerNum) {
+ this.partitionTableRecoverWorkerNum = partitionTableRecoverWorkerNum;
+ }
+
+ public int getPartitionTableRecoverMaxReadMBsPerSecond() {
+ return partitionTableRecoverMaxReadMBsPerSecond;
+ }
+
+ public void setPartitionTableRecoverMaxReadMBsPerSecond(
+ int partitionTableRecoverMaxReadMBsPerSecond) {
+ this.partitionTableRecoverMaxReadMBsPerSecond =
partitionTableRecoverMaxReadMBsPerSecond;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 588e121135c..5fad7697b93 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1139,6 +1139,17 @@ public class IoTDBDescriptor {
// update trusted_uri_pattern
loadTrustedUriPattern(properties);
+ conf.setPartitionTableRecoverWorkerNum(
+ Integer.parseInt(
+ properties.getProperty(
+ "partition_table_recover_worker_num",
+ String.valueOf(conf.getPartitionTableRecoverWorkerNum()))));
+ conf.setPartitionTableRecoverMaxReadMBsPerSecond(
+ Integer.parseInt(
+ properties.getProperty(
+ "partition_table_recover_max_read_megabytes_per_second",
+
String.valueOf(conf.getPartitionTableRecoverMaxReadMBsPerSecond()))));
+
conf.setIncludeNullValueInWriteThroughputMetric(
Boolean.parseBoolean(
properties.getProperty(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/partition/DataPartitionTableGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/partition/DataPartitionTableGenerator.java
new file mode 100644
index 00000000000..1b43cb2bc54
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/partition/DataPartitionTableGenerator.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.partition.DataPartitionTable;
+import org.apache.iotdb.commons.partition.SeriesPartitionTable;
+import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
+import org.apache.iotdb.commons.utils.TimePartitionUtils;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import com.google.common.util.concurrent.RateLimiter;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Generator for DataPartitionTable by scanning tsfile resources. This class iterates over the
+ * data regions registered in the StorageEngine and builds a complete DataPartitionTable based
+ * on existing tsfiles.
+ */
+public class DataPartitionTableGenerator {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DataPartitionTableGenerator.class);
+
+ // Task status
+ private volatile TaskStatus status = TaskStatus.NOT_STARTED;
+ private volatile String errorMessage;
+ private Map<String, DataPartitionTable> databasePartitionTableMap = new
ConcurrentHashMap<>();
+
+ // Progress tracking
+ private final AtomicInteger processedTimePartitions = new AtomicInteger(0);
+ private final AtomicInteger failedTimePartitions = new AtomicInteger(0);
+ private long totalTimePartitions = 0;
+
+ // Configuration
+ private final ExecutorService executor;
+ private final Set<String> databases;
+ private final int seriesSlotNum;
+ private final String seriesPartitionExecutorClass;
+
+ private final RateLimiter limiter =
+ RateLimiter.create(
+ (long)
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getPartitionTableRecoverMaxReadMBsPerSecond()
+ * 1024
+ * 1024);
+
+ public static final Set<String> IGNORE_DATABASE =
+ new HashSet<String>() {
+ {
+ add("root.__audit");
+ add("root.__system");
+ }
+ };
+
+ public DataPartitionTableGenerator(
+ ExecutorService executor,
+ Set<String> databases,
+ int seriesSlotNum,
+ String seriesPartitionExecutorClass) {
+ this.executor = executor;
+ this.databases = databases;
+ this.seriesSlotNum = seriesSlotNum;
+ this.seriesPartitionExecutorClass = seriesPartitionExecutorClass;
+ }
+
+ public Map<String, DataPartitionTable> getDatabasePartitionTableMap() {
+ return databasePartitionTableMap;
+ }
+
+ public enum TaskStatus {
+ NOT_STARTED,
+ IN_PROGRESS,
+ COMPLETED,
+ FAILED
+ }
+
+ /** Start generating DataPartitionTable asynchronously. */
+ public CompletableFuture<Void> startGeneration() {
+ if (status != TaskStatus.NOT_STARTED) {
+ throw new IllegalStateException("Task is already started or completed");
+ }
+
+ status = TaskStatus.IN_PROGRESS;
+ return
CompletableFuture.runAsync(this::generateDataPartitionTableByMemory);
+ }
+
+ private void generateDataPartitionTableByMemory() {
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+ SeriesPartitionExecutor seriesPartitionExecutor =
+ SeriesPartitionExecutor.getSeriesPartitionExecutor(
+ seriesPartitionExecutorClass, seriesSlotNum);
+
+ try {
+ totalTimePartitions =
+ StorageEngine.getInstance().getAllDataRegions().stream()
+ .mapToLong(
+ dataRegion ->
+ (dataRegion == null)
+ ? 0
+ :
dataRegion.getTsFileManager().getTimePartitions().size())
+ .sum();
+ for (DataRegion dataRegion :
StorageEngine.getInstance().getAllDataRegions()) {
+ CompletableFuture<Void> regionFuture =
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ TsFileManager tsFileManager =
dataRegion.getTsFileManager();
+ String databaseName = dataRegion.getDatabaseName();
+ if (!databases.contains(databaseName)
+ || IGNORE_DATABASE.contains(databaseName)) {
+ return;
+ }
+
+ Map<TSeriesPartitionSlot, SeriesPartitionTable>
dataPartitionMap =
+ new ConcurrentHashMap<>();
+
+ tsFileManager.readLock();
+ List<TsFileResource> seqTsFileList =
tsFileManager.getTsFileList(true);
+ List<TsFileResource> unseqTsFileList =
tsFileManager.getTsFileList(false);
+ tsFileManager.readUnlock();
+
+ constructDataPartitionMap(
+ seqTsFileList, seriesPartitionExecutor,
dataPartitionMap);
+ constructDataPartitionMap(
+ unseqTsFileList, seriesPartitionExecutor,
dataPartitionMap);
+
+ if (dataPartitionMap.isEmpty()) {
+ LOG.error("Failed to generate DataPartitionTable,
dataPartitionMap is empty");
+ status = TaskStatus.FAILED;
+ errorMessage = "DataPartitionMap is empty after
processing resource file";
+ return;
+ }
+
+ DataPartitionTable dataPartitionTable =
+ new DataPartitionTable(dataPartitionMap);
+
+ databasePartitionTableMap.compute(
+ databaseName,
+ (k, v) -> {
+ if (v == null) {
+ return new DataPartitionTable(dataPartitionMap);
+ }
+ v.merge(dataPartitionTable);
+ return v;
+ });
+ } catch (Exception e) {
+ LOG.error("Error processing data region: {}",
dataRegion.getDatabaseName(), e);
+ failedTimePartitions.incrementAndGet();
+ errorMessage = "Failed to process data region: " +
e.getMessage();
+ }
+ },
+ executor);
+ futures.add(regionFuture);
+ }
+
+ // Wait for all tasks to complete
+ CompletableFuture.allOf(futures.toArray(new
CompletableFuture[0])).join();
+
+ status = TaskStatus.COMPLETED;
+ LOG.info(
+ "DataPartitionTable generation completed successfully. Processed:
{}, Failed: {}",
+ processedTimePartitions.get(),
+ failedTimePartitions.get());
+ } catch (Exception e) {
+ LOG.error("Failed to generate DataPartitionTable", e);
+ status = TaskStatus.FAILED;
+ errorMessage = "Generation failed: " + e.getMessage();
+ }
+ }
+
+ private void constructDataPartitionMap(
+ List<TsFileResource> seqTsFileList,
+ SeriesPartitionExecutor seriesPartitionExecutor,
+ Map<TSeriesPartitionSlot, SeriesPartitionTable> dataPartitionMap) {
+ Set<Long> timeSlotIds = Collections.newSetFromMap(new
ConcurrentHashMap<>());
+
+ for (TsFileResource tsFileResource : seqTsFileList) {
+ long timeSlotId = tsFileResource.getTsFileID().timePartitionId;
+ try {
+ Set<IDeviceID> devices = tsFileResource.getDevices(limiter);
+ int regionId = tsFileResource.getTsFileID().regionId;
+
+ TConsensusGroupId consensusGroupId = new TConsensusGroupId();
+ consensusGroupId.setId(regionId);
+ consensusGroupId.setType(TConsensusGroupType.DataRegion);
+
+ for (IDeviceID deviceId : devices) {
+ TSeriesPartitionSlot seriesSlotId =
+ seriesPartitionExecutor.getSeriesPartitionSlot(deviceId);
+ TTimePartitionSlot timePartitionSlot =
+ new
TTimePartitionSlot(TimePartitionUtils.getStartTimeByPartitionId(timeSlotId));
+ dataPartitionMap
+ .computeIfAbsent(
+ seriesSlotId, empty ->
newSeriesPartitionTable(consensusGroupId, timeSlotId))
+ .putDataPartition(timePartitionSlot, consensusGroupId);
+ }
+ if (!timeSlotIds.contains(timeSlotId)) {
+ timeSlotIds.add(timeSlotId);
+ processedTimePartitions.incrementAndGet();
+ }
+ } catch (Exception e) {
+ if (!timeSlotIds.contains(timeSlotId)) {
+ timeSlotIds.add(timeSlotId);
+ failedTimePartitions.incrementAndGet();
+ }
+ LOG.error("Failed to process tsfile {}, {}",
tsFileResource.getTsFileID(), e.getMessage());
+ }
+ }
+
+ timeSlotIds.clear();
+ }
+
+ private static SeriesPartitionTable newSeriesPartitionTable(
+ TConsensusGroupId consensusGroupId, long timeSlotId) {
+ SeriesPartitionTable seriesPartitionTable = new SeriesPartitionTable();
+ TTimePartitionSlot timePartitionSlot =
+ new
TTimePartitionSlot(TimePartitionUtils.getStartTimeByPartitionId(timeSlotId));
+ seriesPartitionTable.putDataPartition(timePartitionSlot, consensusGroupId);
+ return seriesPartitionTable;
+ }
+
+ // Getters
+ public TaskStatus getStatus() {
+ return status;
+ }
+
+ public String getErrorMessage() {
+ return errorMessage;
+ }
+
+ public double getProgress() {
+ if (totalTimePartitions == 0) {
+ return 0.0;
+ }
+ return (double) (processedTimePartitions.get() +
failedTimePartitions.get())
+ / totalTimePartitions;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java
index 9c44de9f5fd..881e823ef2d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java
@@ -55,7 +55,10 @@ public enum OperationType {
WRITE_AUDIT_LOG("writeAuditLog"),
PREPARE_STATEMENT("prepareStatement"),
EXECUTE_PREPARED_STATEMENT("executePreparedStatement"),
- DEALLOCATE_PREPARED_STATEMENT("deallocatePreparedStatement");
+ DEALLOCATE_PREPARED_STATEMENT("deallocatePreparedStatement"),
+ GET_EARLIEST_TIMESLOTS("getEarliestTimeslots"),
+ GENERATE_DATA_PARTITION_TABLE("generateDataPartitionTable"),
+ CHECK_DATA_PARTITION_TABLE_STATUS("checkDataPartitionTableStatus");
private final String name;
OperationType(String name) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 42929be7418..e2bfe4f6ad9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -61,8 +61,11 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
+import org.apache.iotdb.commons.enums.DataPartitionTableGeneratorState;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.partition.DataPartitionTable;
+import org.apache.iotdb.commons.partition.DatabaseScopedDataPartitionTable;
import org.apache.iotdb.commons.path.ExtendedPartialPath;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
@@ -102,6 +105,7 @@ import
org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.partition.DataPartitionTableGenerator;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import
org.apache.iotdb.db.protocol.client.cn.DnToCnInternalServiceAsyncRequestManager;
@@ -193,6 +197,7 @@ import org.apache.iotdb.db.service.RegionMigrateService;
import
org.apache.iotdb.db.service.externalservice.ExternalServiceManagementService;
import org.apache.iotdb.db.service.metrics.FileMetrics;
import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.repair.RepairTaskStatus;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleTaskManager;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
@@ -201,6 +206,8 @@ import
org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
import
org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate;
import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate;
import
org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager;
import
org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeThrottleQuotaManager;
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
@@ -260,6 +267,10 @@ import
org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp;
import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerReq;
import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerResp;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceInfoResp;
+import
org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableHeartbeatResp;
+import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableReq;
+import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableResp;
+import org.apache.iotdb.mpp.rpc.thrift.TGetEarliestTimeslotsResp;
import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateColumnCacheReq;
@@ -317,11 +328,16 @@ import org.apache.iotdb.trigger.api.enums.TriggerEvent;
import com.google.common.collect.ImmutableList;
import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.apache.thrift.transport.TTransport;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.NotImplementedException;
+import org.apache.tsfile.external.commons.lang3.StringUtils;
import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.write.record.Tablet;
@@ -347,6 +363,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -370,7 +387,6 @@ import static
org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface {
-
private static final Logger LOGGER =
LoggerFactory.getLogger(DataNodeInternalRPCServiceImpl.class);
@@ -3117,4 +3133,300 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
public void handleClientExit() {
// Do nothing
}
+
+ // ====================================================
+ // Data Partition Table Integrity Check Implementation
+ // ====================================================
+
+ // At most one generation task is in flight per DataNode; these fields track it. They are
+ // volatile only — concurrent generateDataPartitionTable calls are screened by a status check,
+ // not a lock. NOTE(review): a true check-then-act race between two requests is possible; confirm
+ // the ConfigNode serializes these RPCs.
+ private volatile DataPartitionTableGenerator currentGenerator;
+ private volatile CompletableFuture<Void> currentGeneratorFuture;
+ private volatile long currentTaskId = 0;
+
+ /**
+ * Computes, for every local database, the earliest data-partition time slot id present on this
+ * DataNode, by scanning all data regions in parallel.
+ */
+ @Override
+ public TGetEarliestTimeslotsResp getEarliestTimeslots() {
+ TGetEarliestTimeslotsResp resp = new TGetEarliestTimeslotsResp();
+
+ try {
+ Map<String, Long> earliestTimeslots = new ConcurrentHashMap<>();
+ processDataRegionForEarliestTimeslots(earliestTimeslots);
+
+ resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+ resp.setDatabaseToEarliestTimeslot(earliestTimeslots);
+
+ LOGGER.info("Retrieved earliest timeslots for {} databases",
earliestTimeslots.size());
+ } catch (Exception e) {
+ LOGGER.error("Failed to get earliest timeslots", e);
+ resp.setStatus(
+ onIoTDBException(
+ e,
+ OperationType.GET_EARLIEST_TIMESLOTS,
+ TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()));
+ }
+
+ return resp;
+ }
+
+ @Override
+ public TGenerateDataPartitionTableResp generateDataPartitionTable(
+ TGenerateDataPartitionTableReq req) {
+ TGenerateDataPartitionTableResp resp = new
TGenerateDataPartitionTableResp();
+
+ try {
+ // Check if there's already a task in the progress
+ if (currentGenerator != null
+ && currentGenerator.getStatus() ==
DataPartitionTableGenerator.TaskStatus.IN_PROGRESS) {
+
resp.setErrorCode(DataPartitionTableGeneratorState.IN_PROGRESS.getCode());
+ resp.setMessage(
+ String.format(
+ "DataPartitionTable generation is already in the progress:
%.1f%%",
+ currentGenerator.getProgress() * 100));
+ resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+ return resp;
+ }
+
+ // Create generator for all data directories
+ int seriesSlotNum =
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum();
+ String seriesPartitionExecutorClass =
+
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass();
+
+ final ExecutorService partitionTableRecoverExecutor =
+ new WrappedThreadPoolExecutor(
+ 0,
+
IoTDBDescriptor.getInstance().getConfig().getPartitionTableRecoverWorkerNum(),
+ 0L,
+ TimeUnit.SECONDS,
+ new ArrayBlockingQueue<>(
+
IoTDBDescriptor.getInstance().getConfig().getPartitionTableRecoverWorkerNum()),
+ new
IoTThreadFactory(ThreadName.DATA_PARTITION_RECOVER_PARALLEL_POOL.getName()),
+ ThreadName.DATA_PARTITION_RECOVER_PARALLEL_POOL.getName(),
+ new ThreadPoolExecutor.CallerRunsPolicy());
+
+ currentGenerator =
+ new DataPartitionTableGenerator(
+ partitionTableRecoverExecutor,
+ req.getDatabases(),
+ seriesSlotNum,
+ seriesPartitionExecutorClass);
+ currentTaskId = System.currentTimeMillis();
+
+ // Start generation synchronously for now to return the data partition
table immediately
+ currentGeneratorFuture = currentGenerator.startGeneration();
+ parseGenerationStatus(resp);
+ } catch (Exception e) {
+ LOGGER.error("Failed to generate DataPartitionTable", e);
+ resp.setStatus(
+ onIoTDBException(
+ e,
+ OperationType.GENERATE_DATA_PARTITION_TABLE,
+ TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()));
+ }
+
+ return resp;
+ }
+
+ /**
+ * Heartbeat polled by the ConfigNode while a generation task runs. Blocks up to {@code
+ * timeoutMs} waiting for the task, then reports its status. On COMPLETED, the per-database
+ * partition tables are serialized into the response and the local generator reference is
+ * cleared.
+ */
+ @Override
+ public TGenerateDataPartitionTableHeartbeatResp
generateDataPartitionTableHeartbeat() {
+ TGenerateDataPartitionTableHeartbeatResp resp = new
TGenerateDataPartitionTableHeartbeatResp();
+ // Must be lower than the RPC request timeout, in milliseconds
+ final long timeoutMs = 50000;
+ // Set default value
+ resp.setDatabaseScopedDataPartitionTables(Collections.emptyList());
+ try {
+ // To resolve this situation that the DataNode is registered and didn't
request
+ // generateDataPartitionTable interface yet.
+ if (currentGeneratorFuture == null || currentGenerator == null) {
+ resp.setErrorCode(DataPartitionTableGeneratorState.UNKNOWN.getCode());
+ resp.setMessage("No DataPartitionTable generation task found");
+ resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+ return resp;
+ }
+
+ // NOTE(review): if the task is still running after timeoutMs, get() throws
+ // TimeoutException, which falls into the generic catch below and answers
+ // INTERNAL_SERVER_ERROR rather than IN_PROGRESS — confirm the ConfigNode retries on that.
+ currentGeneratorFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
+
+ parseGenerationStatus(resp);
+ if
(currentGenerator.getStatus().equals(DataPartitionTableGenerator.TaskStatus.COMPLETED))
{
+ boolean success = false;
+ List<DatabaseScopedDataPartitionTable>
databaseScopedDataPartitionTableList =
+ new ArrayList<>();
+ Map<String, DataPartitionTable> dataPartitionTableMap =
+ currentGenerator.getDatabasePartitionTableMap();
+ if (!dataPartitionTableMap.isEmpty()) {
+ for (Map.Entry<String, DataPartitionTable> entry :
dataPartitionTableMap.entrySet()) {
+ String database = entry.getKey();
+ DataPartitionTable dataPartitionTable = entry.getValue();
+ if (!StringUtils.isEmpty(database) && dataPartitionTable != null) {
+ DatabaseScopedDataPartitionTable
databaseScopedDataPartitionTable =
+ new DatabaseScopedDataPartitionTable(database,
dataPartitionTable);
+
databaseScopedDataPartitionTableList.add(databaseScopedDataPartitionTable);
+ success = true;
+ }
+ }
+ }
+
+ if (success) {
+ List<ByteBuffer> result =
+
serializeDatabaseScopedTableList(databaseScopedDataPartitionTableList);
+ resp.setDatabaseScopedDataPartitionTables(result);
+
+ // Clear current generator
+ // NOTE(review): currentGeneratorFuture is intentionally left non-null here; the next
+ // heartbeat therefore reports UNKNOWN via the null-generator guard above — verify the
+ // ConfigNode treats that as end-of-task.
+ currentGenerator = null;
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.error("Failed to check DataPartitionTable generation status", e);
+ resp.setStatus(
+ onIoTDBException(
+ e,
+ OperationType.CHECK_DATA_PARTITION_TABLE_STATUS,
+ TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()));
+ }
+ return resp;
+ }
+
+ /**
+ * Translates the current generator's TaskStatus into the error code, message and TSStatus of
+ * the given response. {@code resp} is Object-typed because the same logic serves both
+ * TGenerateDataPartitionTableResp and TGenerateDataPartitionTableHeartbeatResp; the actual
+ * field assignment is dispatched in setResponseFields. No-op when no generator exists.
+ */
+ private void parseGenerationStatus(Object resp) {
+ if (currentGenerator == null) {
+ return;
+ }
+
+ switch (currentGenerator.getStatus()) {
+ case IN_PROGRESS:
+ setResponseFields(
+ resp,
+ DataPartitionTableGeneratorState.IN_PROGRESS.getCode(),
+ String.format(
+ "DataPartitionTable generation in progress: %.1f%%",
+ currentGenerator.getProgress() * 100),
+ RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+ LOGGER.info(
+ String.format(
+ "DataPartitionTable generation with task ID: %s in progress:
%.1f%%",
+ currentTaskId, currentGenerator.getProgress() * 100));
+ break;
+ case COMPLETED:
+ setResponseFields(
+ resp,
+ DataPartitionTableGeneratorState.SUCCESS.getCode(),
+ "DataPartitionTable generation completed successfully",
+ RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+ LOGGER.info("DataPartitionTable generation completed with task ID:
{}", currentTaskId);
+ break;
+ case FAILED:
+ setResponseFields(
+ resp,
+ DataPartitionTableGeneratorState.FAILED.getCode(),
+ "DataPartitionTable generation failed: " +
currentGenerator.getErrorMessage(),
+ RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+ // NOTE(review): failures are logged at info level here — consider warn/error.
+ LOGGER.info("DataPartitionTable generation failed with task ID: {}",
currentTaskId);
+ break;
+ default:
+ setResponseFields(
+ resp,
+ DataPartitionTableGeneratorState.UNKNOWN.getCode(),
+ "Unknown task status: " + currentGenerator.getStatus(),
+ RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+ // NOTE(review): this log line says "failed" but the branch handles an unknown status.
+ LOGGER.info("DataPartitionTable generation failed with task ID: {}",
currentTaskId);
+ break;
+ }
+ }
+
+ /**
+ * Writes the (errorCode, message, status) triple into whichever of the two supported thrift
+ * response types {@code resp} actually is; silently ignores any other type.
+ */
+ private void setResponseFields(Object resp, int errorCode, String message,
TSStatus status) {
+ if (resp instanceof TGenerateDataPartitionTableResp) {
+ ((TGenerateDataPartitionTableResp) resp).setErrorCode(errorCode);
+ ((TGenerateDataPartitionTableResp) resp).setMessage(message);
+ ((TGenerateDataPartitionTableResp) resp).setStatus(status);
+ } else if (resp instanceof TGenerateDataPartitionTableHeartbeatResp) {
+ ((TGenerateDataPartitionTableHeartbeatResp)
resp).setErrorCode(errorCode);
+ ((TGenerateDataPartitionTableHeartbeatResp) resp).setMessage(message);
+ ((TGenerateDataPartitionTableHeartbeatResp) resp).setStatus(status);
+ }
+ }
+
+ /**
+ * Scan the seq and unseq directory on every data region, then compute the
earliest time slot id
+ * of database
+ */
+ private void processDataRegionForEarliestTimeslots(Map<String, Long>
earliestTimeslots) {
+ final Set<String> ignoreDatabase =
+ new HashSet<String>() {
+ {
+ add("root.__audit");
+ add("root.__system");
+ }
+ };
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ final ExecutorService findEarliestTimeSlotExecutor =
+ new WrappedThreadPoolExecutor(
+ 0,
+
IoTDBDescriptor.getInstance().getConfig().getPartitionTableRecoverWorkerNum(),
+ 0L,
+ TimeUnit.SECONDS,
+ new ArrayBlockingQueue<>(
+
IoTDBDescriptor.getInstance().getConfig().getPartitionTableRecoverWorkerNum()),
+ new
IoTThreadFactory(ThreadName.FIND_EARLIEST_TIME_SLOT_PARALLEL_POOL.getName()),
+ ThreadName.FIND_EARLIEST_TIME_SLOT_PARALLEL_POOL.getName(),
+ new ThreadPoolExecutor.CallerRunsPolicy());
+
+ for (DataRegion dataRegion :
StorageEngine.getInstance().getAllDataRegions()) {
+ CompletableFuture<Void> regionFuture =
+ CompletableFuture.runAsync(
+ () -> {
+ TsFileManager tsFileManager = dataRegion.getTsFileManager();
+ String databaseName = dataRegion.getDatabaseName();
+ if (ignoreDatabase.contains(databaseName)) {
+ return;
+ }
+
+ Set<Long> timePartitionIds = tsFileManager.getTimePartitions();
+ final long earliestTimeSlotId =
Collections.min(timePartitionIds);
+ earliestTimeslots.compute(
+ databaseName,
+ (k, v) -> v == null ? earliestTimeSlotId :
Math.min(earliestTimeSlotId, v));
+ },
+ findEarliestTimeSlotExecutor);
+ futures.add(regionFuture);
+ }
+
+ // Wait for all tasks to complete
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
+ LOGGER.info("Process data directory for earliestTimeslots completed
successfully");
+ }
+
+ private long findEarliestTimeslotInFiles(
+ List<TsFileResource> seqTsFileList, long earliestTimeSlotId) {
+ for (TsFileResource tsFileResource : seqTsFileList) {
+ long timeSlotId = tsFileResource.getTsFileID().timePartitionId;
+ earliestTimeSlotId =
+ earliestTimeSlotId == Long.MIN_VALUE
+ ? timeSlotId
+ : Math.min(earliestTimeSlotId, timeSlotId);
+ }
+
+ return earliestTimeSlotId;
+ }
+
+ /**
+ * Thrift-serializes each table into its own ByteBuffer. Serialization is best-effort: an entry
+ * that fails to serialize is logged and skipped, so the result may be shorter than the input.
+ * NOTE(review): confirm the ConfigNode tolerates a partial list rather than requiring all-or-nothing.
+ */
+ private List<ByteBuffer> serializeDatabaseScopedTableList(
+ List<DatabaseScopedDataPartitionTable> list) {
+ if (list == null || list.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ List<ByteBuffer> result = new ArrayList<>(list.size());
+
+ for (DatabaseScopedDataPartitionTable table : list) {
+ try (PublicBAOS baos = new PublicBAOS();
+ DataOutputStream oos = new DataOutputStream(baos)) {
+ TTransport transport = new TIOStreamTransport(oos);
+ TBinaryProtocol protocol = new TBinaryProtocol(transport);
+ table.serialize(oos, protocol);
+ // Wrap the backing array directly; size() bounds the valid bytes.
+ result.add(ByteBuffer.wrap(baos.getBuf(), 0, baos.size()));
+ } catch (IOException | TException e) {
+ LOGGER.error(
+ "Failed to serialize DatabaseScopedDataPartitionTable for
database: {}",
+ table.getDatabase(),
+ e);
+ }
+ }
+
+ return result;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index 61fbbf15935..3548a795404 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -50,6 +50,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.PlainDevice
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.TimeIndexLevel;
import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
+import com.google.common.util.concurrent.RateLimiter;
import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.ITimeSeriesMetadata;
@@ -677,6 +678,10 @@ public class TsFileResource implements PersistentResource,
Cloneable {
return timeIndex.getDevices(file.getPath(), this);
}
+ /**
+ * Returns the devices recorded in this file's time index, throttling any disk reads the index
+ * implementation performs with the given rate limiter.
+ */
+ public Set<IDeviceID> getDevices(RateLimiter limiter) {
+ return timeIndex.getDevices(file.getPath(), this, limiter);
+ }
+
public ArrayDeviceTimeIndex buildDeviceTimeIndex(IDeviceID.Deserializer
deserializer)
throws IOException {
readLock();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java
index fb29083a737..caca8e9fdba 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.db.exception.load.PartitionViolationException;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import com.google.common.util.concurrent.RateLimiter;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.IDeviceID.Deserializer;
import org.apache.tsfile.utils.FilePathUtils;
@@ -171,6 +172,12 @@ public class ArrayDeviceTimeIndex implements ITimeIndex {
return deviceToIndex.keySet();
}
+ // The device set is already held in memory for this index type, so no disk read happens and
+ // the rate limiter is intentionally unused.
+ @Override
+ public Set<IDeviceID> getDevices(
+ String tsFilePath, TsFileResource tsFileResource, RateLimiter limiter) {
+ return deviceToIndex.keySet();
+ }
+
public Map<IDeviceID, Integer> getDeviceToIndex() {
return deviceToIndex;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java
index e4a812012a8..2cf89a02626 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java
@@ -21,10 +21,12 @@ package
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
+import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.db.exception.load.PartitionViolationException;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import com.google.common.util.concurrent.RateLimiter;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.fileSystem.FSFactoryProducer;
import org.apache.tsfile.utils.FilePathUtils;
@@ -120,6 +122,45 @@ public class FileTimeIndex implements ITimeIndex {
}
}
+ @Override
+ public Set<IDeviceID> getDevices(
+ String tsFilePath, TsFileResource tsFileResource, RateLimiter limiter) {
+ tsFileResource.readLock();
+ try {
+ try (IOUtils.RatelimitedInputStream inputStream =
+ new IOUtils.RatelimitedInputStream(
+ FSFactoryProducer.getFSFactory()
+ .getBufferedInputStream(tsFilePath +
TsFileResource.RESOURCE_SUFFIX),
+ limiter)) {
+ // The first byte is VERSION_NUMBER, second byte is timeIndexType.
+ byte[] bytes = ReadWriteIOUtils.readBytes(inputStream, 2);
+
+ if (bytes[1] == ARRAY_DEVICE_TIME_INDEX_TYPE) {
+ return ArrayDeviceTimeIndex.getDevices(inputStream);
+ } else {
+ return PlainDeviceTimeIndex.getDevices(inputStream);
+ }
+ }
+ } catch (NoSuchFileException e) {
+ // deleted by ttl
+ if (tsFileResource.isDeleted()) {
+ return Collections.emptySet();
+ } else {
+ logger.error(
+ "Can't read file {} from disk ", tsFilePath +
TsFileResource.RESOURCE_SUFFIX, e);
+ throw new RuntimeException(
+ "Can't read file " + tsFilePath + TsFileResource.RESOURCE_SUFFIX +
" from disk");
+ }
+ } catch (Exception e) {
+ logger.error(
+ "Failed to get devices from tsfile: {}", tsFilePath +
TsFileResource.RESOURCE_SUFFIX, e);
+ throw new RuntimeException(
+ "Failed to get devices from tsfile: " + tsFilePath +
TsFileResource.RESOURCE_SUFFIX);
+ } finally {
+ tsFileResource.readUnlock();
+ }
+ }
+
@Override
public boolean endTimeEmpty() {
return endTime == Long.MIN_VALUE;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java
index d705a2417d7..114a207d757 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.exception.load.PartitionViolationException;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import com.google.common.util.concurrent.RateLimiter;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.ReadWriteIOUtils;
@@ -74,6 +75,13 @@ public interface ITimeIndex {
*/
Set<IDeviceID> getDevices(String tsFilePath, TsFileResource tsFileResource);
  /**
   * Gets the devices recorded in this TimeIndex while limiting the rate at which files are read.
   *
   * @param tsFilePath path of the TsFile whose index is inspected
   * @param tsFileResource resource handle of the TsFile
   * @param limiter rate limiter bounding bytes read per second
   * @return device names recorded in the time index
   */
  Set<IDeviceID> getDevices(String tsFilePath, TsFileResource tsFileResource, RateLimiter limiter);
/**
* @return whether end time is empty (Long.MIN_VALUE)
*/
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index ed148b3a0b7..378a6226cbf 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -727,6 +727,24 @@ failure_detector_phi_acceptable_pause_in_ms=10000
# Datatype: double(percentage)
disk_space_warning_threshold=0.05
+# Purpose: for data partition repair
+# The number of threads used for parallel scanning in the partition table
recovery
+# effectiveMode: restart
+# Datatype: Integer
+partition_table_recover_worker_num=10
+
+# Purpose: for data partition repair
+# Maximum rate at which files are read during recovery; the unit is MB per second
+# effectiveMode: restart
+# Datatype: Integer
+partition_table_recover_max_read_megabytes_per_second=10
+
+# Purpose: for data partition repair
+# Timeout to wait for all DataNodes to complete startup; the unit is ms
+# effectiveMode: restart
+# Datatype: Integer
+partition_table_recover_wait_all_dn_up_timeout_ms=60000
+
####################
### Memory Control Configuration
####################
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 05b9672c2aa..81f2aa7156c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -203,6 +203,8 @@ public enum ThreadName {
TABLE_SIZE_INDEX_RECORD("TableSizeIndexRecord"),
BINARY_ALLOCATOR_SAMPLE_EVICTOR("BinaryAllocator-SampleEvictor"),
BINARY_ALLOCATOR_AUTO_RELEASER("BinaryAllocator-Auto-Releaser"),
+ FIND_EARLIEST_TIME_SLOT_PARALLEL_POOL("FindEarliestTimeSlot-Parallel-Pool"),
+ DATA_PARTITION_RECOVER_PARALLEL_POOL("DataPartitionRecover-Parallel-Pool"),
// the unknown thread name is used for metrics
UNKNOWN("UNKNOWN");
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/DataPartitionTableGeneratorState.java
similarity index 54%
copy from
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java
copy to
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/DataPartitionTableGeneratorState.java
index 4055398ddb7..93cca687799 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/DataPartitionTableGeneratorState.java
@@ -17,37 +17,36 @@
* under the License.
*/
-package org.apache.iotdb.confignode.client.sync;
+package org.apache.iotdb.commons.enums;
-public enum CnToDnSyncRequestType {
- // Node Maintenance
- CLEAN_DATA_NODE_CACHE,
- STOP_AND_CLEAR_DATA_NODE,
- SET_SYSTEM_STATUS,
- SHOW_CONFIGURATION,
- SHOW_APPLIED_CONFIGURATIONS,
/** Lifecycle state of a DataNode-side data-partition-table generation task. */
public enum DataPartitionTableGeneratorState {
  SUCCESS(0),
  FAILED(1),
  IN_PROGRESS(2),
  UNKNOWN(-1);

  /** Numeric code used when transferring the state over RPC. */
  private final int code;

  DataPartitionTableGeneratorState(int code) {
    this.code = code;
  }

  /** Returns the numeric wire code of this state. */
  public int getCode() {
    return code;
  }

  /**
   * Resolves a state from its numeric code.
   *
   * @param code wire code received from a peer
   * @return the matching state, or {@link #UNKNOWN} when the code is unrecognized
   */
  public static DataPartitionTableGeneratorState getStateByCode(int code) {
    DataPartitionTableGeneratorState resolved = UNKNOWN;
    for (DataPartitionTableGeneratorState candidate : values()) {
      if (candidate.code == code) {
        resolved = candidate;
        break;
      }
    }
    return resolved;
  }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
index 91346f0c69c..d154f1813e1 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
@@ -282,6 +282,48 @@ public class DataPartitionTable {
return removedTimePartitionSlots;
}
  /**
   * Builds a DataPartitionTable containing the union of this table and every table in {@code
   * sourceMap}, merging series-partition-slot by series-partition-slot.
   *
   * @param sourceMap partition tables collected from multiple DataNodes, keyed by an integer id
   *     (presumably the DataNode id — the key type is {@code Integer}, not a database name; TODO
   *     confirm against callers)
   * @return the merged partition table
   */
  public DataPartitionTable merge(Map<Integer, DataPartitionTable> sourceMap) {
    // NOTE(review): if the copy constructor stores the map reference rather than copying it,
    // the computeIfAbsent below also mutates this instance — confirm constructor semantics.
    DataPartitionTable merged = new DataPartitionTable(this.dataPartitionMap);
    for (DataPartitionTable table : sourceMap.values()) {
      for (Map.Entry<TSeriesPartitionSlot, SeriesPartitionTable> entry :
          table.dataPartitionMap.entrySet()) {
        TSeriesPartitionSlot slot = entry.getKey();
        SeriesPartitionTable seriesTable = entry.getValue();
        merged
            .dataPartitionMap
            .computeIfAbsent(slot, k -> new SeriesPartitionTable())
            .merge(seriesTable);
      }
    }
    return merged;
  }
+
  /**
   * Builds a DataPartitionTable containing the union of this table and {@code
   * sourcePartitionTable} (used for incremental merging). A {@code null} source yields a table
   * equivalent to this one.
   *
   * @param sourcePartitionTable table to merge in; may be {@code null}
   * @return the merged partition table
   */
  public DataPartitionTable merge(DataPartitionTable sourcePartitionTable) {
    // NOTE(review): if the copy constructor stores the map reference rather than copying it,
    // the computeIfAbsent below also mutates this instance — confirm constructor semantics.
    DataPartitionTable merged = new DataPartitionTable(this.dataPartitionMap);
    if (sourcePartitionTable == null) {
      return merged;
    }
    for (Map.Entry<TSeriesPartitionSlot, SeriesPartitionTable> entry :
        sourcePartitionTable.dataPartitionMap.entrySet()) {
      merged
          .dataPartitionMap
          .computeIfAbsent(entry.getKey(), k -> new SeriesPartitionTable())
          .merge(entry.getValue());
    }
    return merged;
  }
+
public void serialize(OutputStream outputStream, TProtocol protocol)
throws IOException, TException {
ReadWriteIOUtils.write(dataPartitionMap.size(), outputStream);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DatabaseScopedDataPartitionTable.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DatabaseScopedDataPartitionTable.java
new file mode 100644
index 00000000000..a47f4024eac
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DatabaseScopedDataPartitionTable.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.partition;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class DatabaseScopedDataPartitionTable {
+ private final String database;
+ private DataPartitionTable dataPartitionTable;
+
+ public DatabaseScopedDataPartitionTable(String database, DataPartitionTable
dataPartitionTable) {
+ this.database = database;
+ this.dataPartitionTable = dataPartitionTable;
+ }
+
+ public String getDatabase() {
+ return database;
+ }
+
+ public DataPartitionTable getDataPartitionTable() {
+ return dataPartitionTable;
+ }
+
+ public void serialize(OutputStream outputStream, TProtocol protocol)
+ throws IOException, TException {
+ ReadWriteIOUtils.write(database, outputStream);
+
+ ReadWriteIOUtils.write(dataPartitionTable != null, outputStream);
+
+ if (dataPartitionTable != null) {
+ dataPartitionTable.serialize(outputStream, protocol);
+ }
+ }
+
+ public static DatabaseScopedDataPartitionTable deserialize(ByteBuffer
buffer) {
+ String database = ReadWriteIOUtils.readString(buffer);
+
+ boolean hasDataPartitionTable = ReadWriteIOUtils.readBool(buffer);
+
+ DataPartitionTable dataPartitionTable = null;
+ if (hasDataPartitionTable) {
+ dataPartitionTable = new DataPartitionTable();
+ dataPartitionTable.deserialize(buffer);
+ }
+
+ return new DatabaseScopedDataPartitionTable(database, dataPartitionTable);
+ }
+
+ public static DatabaseScopedDataPartitionTable deserialize(
+ InputStream inputStream, TProtocol protocol) throws IOException,
TException {
+ String database = ReadWriteIOUtils.readString(inputStream);
+
+ boolean hasDataPartitionTable = ReadWriteIOUtils.readBool(inputStream);
+
+ DataPartitionTable dataPartitionTable = null;
+ if (hasDataPartitionTable) {
+ dataPartitionTable = new DataPartitionTable();
+ dataPartitionTable.deserialize(inputStream, protocol);
+ }
+
+ return new DatabaseScopedDataPartitionTable(database, dataPartitionTable);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ DatabaseScopedDataPartitionTable that = (DatabaseScopedDataPartitionTable)
o;
+ return Objects.equals(database, that.database)
+ && Objects.equals(dataPartitionTable, that.dataPartitionTable);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(database, dataPartitionTable);
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
index f46344566dc..915e3df4e32 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
@@ -73,7 +73,13 @@ public class SeriesPartitionTable {
}
public void putDataPartition(TTimePartitionSlot timePartitionSlot,
TConsensusGroupId groupId) {
- seriesPartitionMap.computeIfAbsent(timePartitionSlot, empty -> new
Vector<>()).add(groupId);
+ List<TConsensusGroupId> groupList =
+ seriesPartitionMap.computeIfAbsent(timePartitionSlot, empty -> new
Vector<>());
+ synchronized (groupList) {
+ if (!groupList.contains(groupId)) {
+ groupList.add(groupId);
+ }
+ }
}
/**
@@ -270,6 +276,14 @@ public class SeriesPartitionTable {
return removedTimePartitions;
}
+ public void merge(SeriesPartitionTable sourceMap) {
+ if (sourceMap == null) return;
+ sourceMap.seriesPartitionMap.forEach(
+ (timeSlot, groups) -> {
+ this.seriesPartitionMap.computeIfAbsent(timeSlot, k -> new
ArrayList<>()).addAll(groups);
+ });
+ }
+
public void serialize(OutputStream outputStream, TProtocol protocol)
throws IOException, TException {
ReadWriteIOUtils.write(seriesPartitionMap.size(), outputStream);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/IOUtils.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/IOUtils.java
index 8b63d29bd78..d234c6be305 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/IOUtils.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/IOUtils.java
@@ -25,10 +25,12 @@ import
org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import com.google.common.base.Supplier;
+import com.google.common.util.concurrent.RateLimiter;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Map;
@@ -289,4 +291,37 @@ public class IOUtils {
}
return Optional.empty();
}
+
+ public static class RatelimitedInputStream extends InputStream {
+ private RateLimiter rateLimiter;
+ private InputStream inner;
+
+ public RatelimitedInputStream(InputStream inner, RateLimiter limiter) {
+ this.inner = inner;
+ this.rateLimiter = limiter;
+ }
+
+ @Override
+ public int read() throws IOException {
+ rateLimiter.acquire(1);
+ return inner.read();
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ rateLimiter.acquire(b.length);
+ return inner.read(b);
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ rateLimiter.acquire(len);
+ return inner.read(b, off, len);
+ }
+
+ @Override
+ public void close() throws IOException {
+ inner.close();
+ }
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java
index eb53cdb2798..250a347d149 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java
@@ -122,6 +122,10 @@ public class TimePartitionUtils {
return partitionId.longValue();
}
  /**
   * Returns the start timestamp of the given time partition (the inverse of the partition-id
   * computation).
   *
   * <p>NOTE(review): {@code partitionId * timePartitionInterval} can overflow for extreme ids;
   * callers are assumed to pass ids derived from valid timestamps — confirm.
   */
  public static long getStartTimeByPartitionId(long partitionId) {
    return (partitionId * timePartitionInterval) + timePartitionOrigin;
  }
+
public static boolean satisfyPartitionId(long startTime, long endTime, long
partitionId) {
long startPartition =
originMayCauseOverflow
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 07f5a822e89..8c3e12217e0 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -678,6 +678,36 @@ struct TAuditLogReq {
11: required i32 cnId
}
+/**
+* BEGIN: Data Partition Table Integrity Check Structures
+**/
+
struct TGetEarliestTimeslotsResp {
  // Overall RPC status
  1: required common.TSStatus status
  // Map from database name to the id of its earliest data-partition time slot
  2: optional map<string, i64> databaseToEarliestTimeslot
}

struct TGenerateDataPartitionTableReq {
  // Databases whose partition tables should be regenerated by scanning tsfile resources
  1: required set<string> databases
}

struct TGenerateDataPartitionTableResp {
  1: required common.TSStatus status
  // Generator state code — presumably a DataPartitionTableGeneratorState code; confirm
  2: required i32 errorCode
  3: optional string message
}

struct TGenerateDataPartitionTableHeartbeatResp {
  1: required common.TSStatus status
  // Generator state code — presumably a DataPartitionTableGeneratorState code; confirm
  2: required i32 errorCode
  3: optional string message
  // Serialized per-database partition tables (DatabaseScopedDataPartitionTable blobs)
  4: optional list<binary> databaseScopedDataPartitionTables
}
+
+/**
+* END: Data Partition Table Integrity Check Structures
+**/
+
/**
* BEGIN: Used for EXPLAIN ANALYZE
**/
@@ -1281,6 +1311,30 @@ service IDataNodeRPCService {
* Write an audit log entry to the DataNode's AuditEventLogger
*/
common.TSStatus writeAuditLog(TAuditLogReq req);
+
+ /**
+ * BEGIN: Data Partition Table Integrity Check
+ **/
+
+ /**
+ * Get earliest timeslot information from DataNode
+ * Returns map of database name to earliest timeslot id
+ */
+ TGetEarliestTimeslotsResp getEarliestTimeslots()
+
+ /**
+ * Request DataNode to generate DataPartitionTable by scanning tsfile
resources
+ */
+ TGenerateDataPartitionTableResp
generateDataPartitionTable(TGenerateDataPartitionTableReq req)
+
+ /**
+ * Check the status of DataPartitionTable generation task
+ */
+ TGenerateDataPartitionTableHeartbeatResp
generateDataPartitionTableHeartbeat()
+
+ /**
+ * END: Data Partition Table Integrity Check
+ **/
}
service MPPDataExchangeService {