This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch force_ci/object_type in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a2806355c03a4d22758907379aac0f4c6c01992d Author: Caideyipi <[email protected]> AuthorDate: Thu Nov 27 09:48:57 2025 +0800 Fixed the concurrency issue of region migrate and load (#16796) * rq * gra * fix * fix * coverage * fix * fix * fix (cherry picked from commit 03bc2606563872e07d3ba6ef5c59d7a80a74b53f) --- .../db/consensus/DataRegionConsensusImpl.java | 6 + .../dataregion/DataRegionStateMachine.java | 15 +- .../fragment/FragmentInstanceContext.java | 2 +- .../fragment/FragmentInstanceExecution.java | 2 +- .../plan/scheduler/load/LoadTsFileScheduler.java | 6 +- .../iotdb/db/service/metrics/WritingMetrics.java | 3 +- .../iotdb/db/storageengine/StorageEngine.java | 13 +- .../db/storageengine/dataregion/DataRegion.java | 166 ++++++++++++--------- .../dataregion/IDataRegionForQuery.java | 2 +- .../dataregion/VirtualDataRegion.java | 2 +- .../compaction/repair/RepairTimePartition.java | 2 +- .../dataregion/memtable/TsFileProcessor.java | 18 +-- .../dataregion/snapshot/SnapshotTaker.java | 14 +- .../db/storageengine/load/LoadTsFileManager.java | 10 +- .../DataNodeInternalRPCServiceImplTest.java | 59 +++++++- .../iotdb/db/storageengine/StorageEngineTest.java | 4 +- .../storageengine/dataregion/DataRegionTest.java | 4 +- .../repair/RepairUnsortedFileSchedulerTest.java | 10 +- .../dataregion/snapshot/IoTDBSnapshotTest.java | 2 +- 19 files changed, 218 insertions(+), 122 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java index 2649f8de7af..700fd79e5eb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java @@ -28,6 +28,7 @@ import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.memory.IMemoryBlock; import org.apache.iotdb.commons.memory.MemoryBlockType; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.consensus.IConsensus; import org.apache.iotdb.consensus.config.ConsensusConfig; @@ -65,6 +66,11 @@ public class DataRegionConsensusImpl { // do nothing } + @TestOnly + public static void setInstance(final IConsensus instance) { + DataRegionConsensusImplHolder.INSTANCE = instance; + } + public static IConsensus getInstance() { return DataRegionConsensusImplHolder.INSTANCE; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java index cfee2b54a6e..5fa375406b8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java @@ -93,7 +93,7 @@ public class DataRegionStateMachine extends BaseStateMachine { logger.error( "Exception occurs when taking snapshot for {}-{} in {}", region.getDatabaseName(), - region.getDataRegionId(), + region.getDataRegionIdString(), snapshotDir, e); return false; @@ -109,7 +109,7 @@ public class DataRegionStateMachine extends BaseStateMachine { logger.error( "Exception occurs when taking snapshot for {}-{} in {}", region.getDatabaseName(), - region.getDataRegionId(), + region.getDataRegionIdString(), snapshotDir, e); return false; @@ -127,7 +127,7 @@ public class DataRegionStateMachine extends BaseStateMachine { new SnapshotLoader( latestSnapshotRootDir.getAbsolutePath(), region.getDatabaseName(), - region.getDataRegionId()) + region.getDataRegionIdString()) .loadSnapshotForStateMachine(); if (newRegion == null) { logger.error("Fail to load snapshot from {}", latestSnapshotRootDir); @@ -136,7 +136,8 @@ public class DataRegionStateMachine extends BaseStateMachine { this.region = newRegion; try { StorageEngine.getInstance() - .setDataRegion(new DataRegionId(Integer.parseInt(region.getDataRegionId())), region); + .setDataRegion( + new DataRegionId(Integer.parseInt(region.getDataRegionIdString())), region); ChunkCache.getInstance().clear(); TimeSeriesMetadataCache.getInstance().clear(); BloomFilterCache.getInstance().clear(); @@ -185,13 +186,13 @@ public class DataRegionStateMachine extends BaseStateMachine { return new SnapshotLoader( latestSnapshotRootDir.getAbsolutePath(), region.getDatabaseName(), - region.getDataRegionId()) + region.getDataRegionIdString()) .getSnapshotFileInfo(); } catch (IOException e) { logger.error( "Meets error when getting snapshot files for {}-{}", region.getDatabaseName(), - region.getDataRegionId(), + region.getDataRegionIdString(), e); return null; } @@ -276,7 +277,7 @@ public class DataRegionStateMachine extends BaseStateMachine { + File.separator + region.getDatabaseName() + "-" - + region.getDataRegionId(); + + region.getDataRegionIdString(); return new File(snapshotDir).getCanonicalFile(); } catch (IOException | NullPointerException e) { logger.warn("{}: cannot get the canonical file of {} due to {}", this, snapshotDir, e); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index e5bd68906cf..3fb3d30be4e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -779,7 +779,7 @@ public class FragmentInstanceContext extends QueryContext { if (initQueryDataSourceRetryCount % 10 == 0) { LOGGER.warn( "Failed to acquire the read lock of DataRegion-{} for {} times", - dataRegion == null ? "UNKNOWN" : dataRegion.getDataRegionId(), + dataRegion == null ? "UNKNOWN" : dataRegion.getDataRegionIdString(), initQueryDataSourceRetryCount); } return UNFINISHED_QUERY_DATA_SOURCE; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java index a3d7a887352..08991ae7f7a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java @@ -152,7 +152,7 @@ public class FragmentInstanceExecution { // We don't need to output the region having ExplainAnalyzeOperator only. return false; } - statistics.setDataRegion(context.getDataRegion().getDataRegionId()); + statistics.setDataRegion(context.getDataRegion().getDataRegionIdString()); statistics.setIp(CONFIG.getInternalAddress() + ":" + CONFIG.getInternalPort()); statistics.setStartTimeInMS(context.getStartTime()); statistics.setEndTimeInMS( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java index 7709e20b557..cf7ab8faedd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java @@ -531,7 +531,7 @@ public class LoadTsFileScheduler implements IScheduler { MemTableFlushTask.recordFlushPointsMetricInternal( node.getWritePointCount(), databaseName, - dataRegion.getDataRegionId()); + dataRegion.getDataRegionIdString()); MetricService.getInstance() .count( @@ -543,7 +543,7 @@ public class LoadTsFileScheduler implements IScheduler { Tag.DATABASE.toString(), databaseName, Tag.REGION.toString(), - dataRegion.getDataRegionId(), + dataRegion.getDataRegionIdString(), Tag.TYPE.toString(), Metric.LOAD_POINT_COUNT.toString()); MetricService.getInstance() @@ -556,7 +556,7 @@ public class LoadTsFileScheduler implements IScheduler { Tag.DATABASE.toString(), databaseName, Tag.REGION.toString(), - dataRegion.getDataRegionId(), + dataRegion.getDataRegionIdString(), Tag.TYPE.toString(), Metric.LOAD_POINT_COUNT.toString()); })); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java index 15686b89546..c6251c4e6c8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java @@ -520,7 +520,8 @@ public class WritingMetrics implements IMetricSet { } public void createDataRegionMemoryCostMetrics(DataRegion dataRegion) { - DataRegionId dataRegionId = new DataRegionId(Integer.parseInt(dataRegion.getDataRegionId())); + DataRegionId dataRegionId = + new DataRegionId(Integer.parseInt(dataRegion.getDataRegionIdString())); MetricService.getInstance() .createAutoGauge( Metric.DATA_REGION_MEM_COST.toString(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java index 89e1f0c7c98..43e76e5a2b2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java @@ -541,7 +541,7 @@ public class StorageEngine implements IService { public void syncCloseProcessorsInRegion(List<String> dataRegionIds) { List<Future<Void>> tasks = new ArrayList<>(); for (DataRegion dataRegion : dataRegionMap.values()) { - if (dataRegion != null && dataRegionIds.contains(dataRegion.getDataRegionId())) { + if (dataRegion != null && dataRegionIds.contains(dataRegion.getDataRegionIdString())) { tasks.add( cachedThreadPool.submit( () -> { @@ -797,7 +797,9 @@ public class StorageEngine implements IService { // delete wal WALManager.getInstance() .deleteWALNode( - region.getDatabaseName() + FILE_NAME_SEPARATOR + region.getDataRegionId()); + region.getDatabaseName() + + FILE_NAME_SEPARATOR + + region.getDataRegionIdString()); // delete snapshot for (String dataDir : CONFIG.getLocalDataDirs()) { File regionSnapshotDir = @@ -817,7 +819,7 @@ public class StorageEngine implements IService { // delete region information in wal and may delete wal WALManager.getInstance() .deleteRegionAndMayDeleteWALNode( - region.getDatabaseName(), region.getDataRegionId()); + region.getDatabaseName(), region.getDataRegionIdString()); break; case ConsensusFactory.RATIS_CONSENSUS: default: @@ -826,14 +828,15 @@ public class StorageEngine implements IService { WRITING_METRICS.removeDataRegionMemoryCostMetrics(regionId); WRITING_METRICS.removeFlushingMemTableStatusMetrics(regionId); WRITING_METRICS.removeActiveMemtableCounterMetrics(regionId); - FileMetrics.getInstance().deleteRegion(region.getDatabaseName(), region.getDataRegionId()); + FileMetrics.getInstance() + .deleteRegion(region.getDatabaseName(), region.getDataRegionIdString()); CompressionRatio.getInstance().removeDataRegionRatio(String.valueOf(regionId.getId())); LOGGER.info("Removed data region {}", regionId); } catch (Exception e) { LOGGER.error( "Error occurs when deleting data region {}-{}", region.getDatabaseName(), - region.getDataRegionId(), + region.getDataRegionIdString(), e); } finally { deletingDataRegionMap.remove(regionId); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index f9c34f5c3dc..d615aae6cd1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -46,8 +46,11 @@ import org.apache.iotdb.confignode.rpc.thrift.TDescTableResp; import org.apache.iotdb.confignode.rpc.thrift.TShowTableResp; import org.apache.iotdb.confignode.rpc.thrift.TTableInfo; import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.consensus.iot.IoTConsensus; +import org.apache.iotdb.consensus.iot.IoTConsensusServerImpl; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.consensus.DataRegionConsensusImpl; import org.apache.iotdb.db.exception.BatchProcessException; import org.apache.iotdb.db.exception.DataRegionException; import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; @@ -297,7 +300,9 @@ public class DataRegion implements IDataRegionForQuery { ConcurrentHashMap.newKeySet(); /** data region id. */ - private final String dataRegionId; + private final String dataRegionIdString; + + private final DataRegionId dataRegionId; /** database name. */ private final String databaseName; @@ -367,20 +372,25 @@ public class DataRegion implements IDataRegionForQuery { * Construct a database processor. * * @param systemDir system dir path - * @param dataRegionId data region id e.g. 1 + * @param dataRegionIdString data region id e.g. 1 * @param fileFlushPolicy file flush policy * @param databaseName database name e.g. root.sg1 */ public DataRegion( - String systemDir, String dataRegionId, TsFileFlushPolicy fileFlushPolicy, String databaseName) + String systemDir, + String dataRegionIdString, + TsFileFlushPolicy fileFlushPolicy, + String databaseName) throws DataRegionException { - this.dataRegionId = dataRegionId; + this.dataRegionIdString = dataRegionIdString; + this.dataRegionId = new DataRegionId(Integer.parseInt(dataRegionIdString)); this.databaseName = databaseName; this.fileFlushPolicy = fileFlushPolicy; acquireDirectBufferMemory(); - dataRegionSysDir = SystemFileFactory.INSTANCE.getFile(systemDir, dataRegionId); - this.tsFileManager = new TsFileManager(databaseName, dataRegionId, dataRegionSysDir.getPath()); + dataRegionSysDir = SystemFileFactory.INSTANCE.getFile(systemDir, dataRegionIdString); + this.tsFileManager = + new TsFileManager(databaseName, dataRegionIdString, dataRegionSysDir.getPath()); if (dataRegionSysDir.mkdirs()) { logger.info( "Database system Directory {} doesn't exist, create it", dataRegionSysDir.getPath()); @@ -391,7 +401,7 @@ public class DataRegion implements IDataRegionForQuery { lastFlushTimeMap = new HashLastFlushTimeMap(); upgradeModFileThreadPool = IoTDBThreadPoolFactory.newSingleThreadExecutor( - databaseName + "-" + dataRegionId + "-UpgradeMod"); + databaseName + "-" + dataRegionIdString + "-UpgradeMod"); // recover tsfiles unless consensus protocol is ratis and storage engine is not ready if (config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS) @@ -399,17 +409,17 @@ public class DataRegion implements IDataRegionForQuery { logger.debug( "Skip recovering data region {}[{}] when consensus protocol is ratis and storage engine is not ready.", databaseName, - dataRegionId); + dataRegionIdString); for (String fileFolder : TierManager.getInstance().getAllFilesFolders()) { File dataRegionFolder = - fsFactory.getFile(fileFolder, databaseName + File.separator + dataRegionId); + fsFactory.getFile(fileFolder, databaseName + File.separator + dataRegionIdString); try { fsFactory.deleteDirectory(dataRegionFolder.getPath()); } catch (IOException e) { logger.error( "Exception occurs when deleting data region folder for {}-{}", databaseName, - dataRegionId, + dataRegionIdString, e); } if (FSUtils.getFSType(dataRegionFolder) == FSType.LOCAL) { @@ -433,10 +443,11 @@ public class DataRegion implements IDataRegionForQuery { } @TestOnly - public DataRegion(String databaseName, String id) { + public DataRegion(String databaseName, String dataRegionIdString) { this.databaseName = databaseName; - this.dataRegionId = id; - this.tsFileManager = new TsFileManager(databaseName, id, ""); + this.dataRegionIdString = dataRegionIdString; + this.dataRegionId = new DataRegionId(Integer.parseInt(this.dataRegionIdString)); + this.tsFileManager = new TsFileManager(databaseName, dataRegionIdString, ""); this.partitionMaxFileVersions = new HashMap<>(); partitionMaxFileVersions.put(0L, 0L); upgradeModFileThreadPool = null; @@ -517,7 +528,7 @@ public class DataRegion implements IDataRegionForQuery { logger.info( "The TsFiles of data region {}[{}] has recovered {}/{}.", databaseName, - dataRegionId, + dataRegionIdString, recoveredFilesNum, numOfFilesToRecover); lastLogTime = System.currentTimeMillis(); @@ -526,7 +537,7 @@ public class DataRegion implements IDataRegionForQuery { logger.info( "The TsFiles of data region {}[{}] has recovered completely {}/{}.", databaseName, - dataRegionId, + dataRegionIdString, numOfFilesToRecover, numOfFilesToRecover); } @@ -642,7 +653,7 @@ public class DataRegion implements IDataRegionForQuery { if (logFile.exists()) { try { FileTimeIndexCacheReader logReader = - new FileTimeIndexCacheReader(logFile, dataRegionId); + new FileTimeIndexCacheReader(logFile, dataRegionIdString); logReader.read(fileTimeIndexMap); } catch (Exception e) { throw new RuntimeException(e); @@ -677,7 +688,7 @@ public class DataRegion implements IDataRegionForQuery { TimePartitionManager.getInstance() .registerTimePartitionInfo( new TimePartitionInfo( - new DataRegionId(Integer.parseInt(dataRegionId)), + new DataRegionId(Integer.parseInt(dataRegionIdString)), latestPartitionId, false, Long.MAX_VALUE, @@ -728,14 +739,16 @@ public class DataRegion implements IDataRegionForQuery { if (config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS_V2)) { IWALNode walNode = WALManager.getInstance() - .applyForWALNode(databaseName + FILE_NAME_SEPARATOR + dataRegionId); + .applyForWALNode(databaseName + FILE_NAME_SEPARATOR + dataRegionIdString); if (walNode instanceof WALNode) { walNode.setSafelyDeletedSearchIndex(Long.MAX_VALUE); } } - logger.info("The data region {}[{}] is created successfully", databaseName, dataRegionId); + logger.info( + "The data region {}[{}] is created successfully", databaseName, dataRegionIdString); } else { - logger.info("The data region {}[{}] is recovered successfully", databaseName, dataRegionId); + logger.info( + "The data region {}[{}] is recovered successfully", databaseName, dataRegionIdString); } } @@ -789,7 +802,7 @@ public class DataRegion implements IDataRegionForQuery { private void recoverCompaction() { CompactionRecoverManager compactionRecoverManager = - new CompactionRecoverManager(tsFileManager, databaseName, dataRegionId); + new CompactionRecoverManager(tsFileManager, databaseName, dataRegionIdString); compactionRecoverManager.recoverCompaction(); } @@ -806,7 +819,8 @@ public class DataRegion implements IDataRegionForQuery { // "{partition id}/{tsfile name}" -> tsfile file, remove duplicate files in one time partition Map<String, File> tsFilePartitionPath2File = new HashMap<>(); for (String baseDir : folders) { - File fileFolder = fsFactory.getFile(baseDir + File.separator + databaseName, dataRegionId); + File fileFolder = + fsFactory.getFile(baseDir + File.separator + databaseName, dataRegionIdString); if (!fileFolder.exists()) { continue; } @@ -876,7 +890,7 @@ public class DataRegion implements IDataRegionForQuery { String.format( "data region %s[%s] is down, because the time of tsfile %s is larger than system current time, " + "file time is %d while system current time is %d, please check it.", - databaseName, dataRegionId, tsFile.getAbsolutePath(), fileTime, currentTime)); + databaseName, dataRegionIdString, tsFile.getAbsolutePath(), fileTime, currentTime)); } } @@ -919,10 +933,10 @@ public class DataRegion implements IDataRegionForQuery { long timePartitionId = tsFileResource.getTimePartition(); TimePartitionManager.getInstance() .updateAfterOpeningTsFileProcessor( - new DataRegionId(Integer.parseInt(dataRegionId)), timePartitionId); + new DataRegionId(Integer.parseInt(dataRegionIdString)), timePartitionId); TsFileProcessor tsFileProcessor = new TsFileProcessor( - dataRegionId, + dataRegionIdString, dataRegionInfo, tsFileResource, this::closeUnsealedTsFileProcessorCallBack, @@ -1023,7 +1037,7 @@ public class DataRegion implements IDataRegionForQuery { TimePartitionManager.getInstance() .registerTimePartitionInfo( new TimePartitionInfo( - new DataRegionId(Integer.parseInt(dataRegionId)), + new DataRegionId(Integer.parseInt(dataRegionIdString)), partitionId, false, Long.MAX_VALUE, @@ -1034,7 +1048,7 @@ public class DataRegion implements IDataRegionForQuery { } TimePartitionManager.getInstance() .updateAfterFlushing( - new DataRegionId(Integer.parseInt(dataRegionId)), + new DataRegionId(Integer.parseInt(dataRegionIdString)), partitionId, System.currentTimeMillis(), lastFlushTimeMap.getMemSize(partitionId), @@ -1081,7 +1095,7 @@ public class DataRegion implements IDataRegionForQuery { TimePartitionManager.getInstance() .registerTimePartitionInfo( new TimePartitionInfo( - new DataRegionId(Integer.parseInt(dataRegionId)), + new DataRegionId(Integer.parseInt(dataRegionIdString)), partitionId, false, Long.MAX_VALUE, @@ -1094,7 +1108,7 @@ public class DataRegion implements IDataRegionForQuery { } TimePartitionManager.getInstance() .updateAfterFlushing( - new DataRegionId(Integer.parseInt(dataRegionId)), + new DataRegionId(Integer.parseInt(dataRegionIdString)), partitionId, System.currentTimeMillis(), lastFlushTimeMap.getMemSize(partitionId), @@ -1355,7 +1369,7 @@ public class DataRegion implements IDataRegionForQuery { TimePartitionManager.getInstance() .registerTimePartitionInfo( new TimePartitionInfo( - new DataRegionId(Integer.parseInt(dataRegionId)), + new DataRegionId(Integer.parseInt(dataRegionIdString)), timePartitionId, true, Long.MAX_VALUE, @@ -1692,7 +1706,7 @@ public class DataRegion implements IDataRegionForQuery { // build new processor, memory control module will control the number of memtables TimePartitionManager.getInstance() .updateAfterOpeningTsFileProcessor( - new DataRegionId(Integer.parseInt(dataRegionId)), timeRangeId); + new DataRegionId(Integer.parseInt(dataRegionIdString)), timeRangeId); res = newTsFileProcessor(sequence, timeRangeId); if (workSequenceTsFileProcessors.get(timeRangeId) == null && workUnsequenceTsFileProcessors.get(timeRangeId) == null) { @@ -1714,7 +1728,7 @@ public class DataRegion implements IDataRegionForQuery { TsFileNameGenerator.generateNewTsFilePathWithMkdir( sequence, databaseName, - dataRegionId, + dataRegionIdString, timePartitionId, System.currentTimeMillis(), version, @@ -1728,7 +1742,7 @@ public class DataRegion implements IDataRegionForQuery { boolean sequence, String filePath, long timePartitionId) throws IOException { TsFileProcessor tsFileProcessor = new TsFileProcessor( - databaseName + FILE_NAME_SEPARATOR + dataRegionId, + databaseName + FILE_NAME_SEPARATOR + dataRegionIdString, fsFactory.getFileWithParent(filePath), dataRegionInfo, this::closeUnsealedTsFileProcessorCallBack, @@ -1803,15 +1817,15 @@ public class DataRegion implements IDataRegionForQuery { public void deleteFolder(String systemDir) { logger.info( "{} will close all files for deleting data folder {}", - databaseName + "-" + dataRegionId, + databaseName + "-" + dataRegionIdString, systemDir); FileTimeIndexCacheRecorder.getInstance() - .removeFileTimeIndexCache(Integer.parseInt(dataRegionId)); + .removeFileTimeIndexCache(Integer.parseInt(dataRegionIdString)); writeLock("deleteFolder"); try { File dataRegionSystemFolder = SystemFileFactory.INSTANCE.getFile( - systemDir + File.separator + databaseName, dataRegionId); + systemDir + File.separator + databaseName, dataRegionIdString); org.apache.iotdb.commons.utils.FileUtils.deleteDirectoryAndEmptyParent( dataRegionSystemFolder); } finally { @@ -1820,7 +1834,7 @@ public class DataRegion implements IDataRegionForQuery { } public void deleteDALFolderAndClose() { - Optional.ofNullable(DeletionResourceManager.getInstance(dataRegionId)) + Optional.ofNullable(DeletionResourceManager.getInstance(dataRegionIdString)) .ifPresent( manager -> { manager.close(); @@ -1849,7 +1863,7 @@ public class DataRegion implements IDataRegionForQuery { /** delete tsfile */ public void syncDeleteDataFiles() throws TsFileProcessorException { logger.info( - "{} will close all files for deleting data files", databaseName + "-" + dataRegionId); + "{} will close all files for deleting data files", databaseName + "-" + dataRegionIdString); writeLock("syncDeleteDataFiles"); try { forceCloseAllWorkingTsFileProcessors(); @@ -1877,11 +1891,11 @@ public class DataRegion implements IDataRegionForQuery { lastFlushTimeMap.clearFlushedTime(); lastFlushTimeMap.clearGlobalFlushedTime(); TimePartitionManager.getInstance() - .removeTimePartitionInfo(new DataRegionId(Integer.parseInt(dataRegionId))); + .removeTimePartitionInfo(new DataRegionId(Integer.parseInt(dataRegionIdString))); } catch (InterruptedException e) { logger.error( "CloseFileNodeCondition error occurs while waiting for closing the storage " + "group {}", - databaseName + "-" + dataRegionId, + databaseName + "-" + dataRegionIdString, e); Thread.currentThread().interrupt(); } finally { @@ -1892,7 +1906,7 @@ public class DataRegion implements IDataRegionForQuery { private void deleteAllSGFolders(List<String> folder) { for (String tsfilePath : folder) { File dataRegionDataFolder = - fsFactory.getFile(tsfilePath, databaseName + File.separator + dataRegionId); + fsFactory.getFile(tsfilePath, databaseName + File.separator + dataRegionIdString); if (FSUtils.getFSType(dataRegionDataFolder) != FSType.LOCAL) { try { fsFactory.deleteDirectory(dataRegionDataFolder.getPath()); @@ -1955,7 +1969,7 @@ public class DataRegion implements IDataRegionForQuery { "Exceed sequence memtable flush interval, so flush working memtable of time partition {} in database {}[{}]", tsFileProcessor.getTimeRangeId(), databaseName, - dataRegionId); + dataRegionIdString); fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence()); count++; } @@ -1981,7 +1995,7 @@ public class DataRegion implements IDataRegionForQuery { "Exceed unsequence memtable flush interval, so flush working memtable of time partition {} in database {}[{}]", tsFileProcessor.getTimeRangeId(), databaseName, - dataRegionId); + dataRegionIdString); fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence()); count++; } @@ -2004,7 +2018,7 @@ public class DataRegion implements IDataRegionForQuery { } catch (InterruptedException | ExecutionException e) { logger.error( "CloseFileNodeCondition error occurs while waiting for closing tsfile processors of {}", - databaseName + "-" + dataRegionId, + databaseName + "-" + dataRegionIdString, e); Thread.currentThread().interrupt(); } @@ -2038,7 +2052,7 @@ public class DataRegion implements IDataRegionForQuery { } catch (InterruptedException | ExecutionException e) { logger.error( "CloseFileNodeCondition error occurs while waiting for closing tsfile processors of {}", - databaseName + "-" + dataRegionId, + databaseName + "-" + dataRegionIdString, e); Thread.currentThread().interrupt(); } @@ -2062,7 +2076,7 @@ public class DataRegion implements IDataRegionForQuery { if (System.currentTimeMillis() - startTime > 60_000) { logger.warn( "{} has spent {}s to wait for closing all TsFiles.", - databaseName + "-" + this.dataRegionId, + databaseName + "-" + this.dataRegionIdString, (System.currentTimeMillis() - startTime) / 1000); logger.warn( "Sseq files: {}, unseq files: {}", @@ -2078,7 +2092,8 @@ public class DataRegion implements IDataRegionForQuery { List<Future<?>> futures = new ArrayList<>(); int count = 0; try { - logger.info("async force close all files in database: {}", databaseName + "-" + dataRegionId); + logger.info( + "async force close all files in database: {}", databaseName + "-" + dataRegionIdString); // to avoid concurrent modification problem, we need a new array list for (TsFileProcessor tsFileProcessor : new ArrayList<>(workSequenceTsFileProcessors.values())) { @@ -2102,7 +2117,8 @@ public class DataRegion implements IDataRegionForQuery { public void forceCloseAllWorkingTsFileProcessors() throws TsFileProcessorException { writeLock("forceCloseAllWorkingTsFileProcessors"); try { - logger.info("force close all processors in database: {}", databaseName + "-" + dataRegionId); + logger.info( + "force close all processors in database: {}", databaseName + "-" + dataRegionIdString); // to avoid concurrent modification problem, we need a new array list List<TsFileResource> closedTsFileResources = new ArrayList<>(); for (TsFileProcessor tsFileProcessor : @@ -2626,7 +2642,7 @@ public class DataRegion implements IDataRegionForQuery { deleteDataInUnsealedFiles(unsealedTsFileResource, deletion, sealedTsFileResource); // capture deleteDataNode and wait it to be persisted to DAL. DeletionResource deletionResource = - PipeInsertionDataNodeListener.getInstance().listenToDeleteData(dataRegionId, node); + PipeInsertionDataNodeListener.getInstance().listenToDeleteData(dataRegionIdString, node); // just get result. We have already waited for result in `listenToDeleteData` if (deletionResource != null && deletionResource.waitForResult() == Status.FAILURE) { throw deletionResource.getCause(); @@ -2692,7 +2708,7 @@ public class DataRegion implements IDataRegionForQuery { // capture deleteDataNode and wait it to be persisted to DAL. DeletionResource deletionResource = - PipeInsertionDataNodeListener.getInstance().listenToDeleteData(dataRegionId, node); + PipeInsertionDataNodeListener.getInstance().listenToDeleteData(dataRegionIdString, node); // just get result. We have already waited for result in `listenToDeleteData` if (deletionResource != null && deletionResource.waitForResult() == Status.FAILURE) { throw deletionResource.getCause(); @@ -2720,7 +2736,7 @@ public class DataRegion implements IDataRegionForQuery { final long searchIndex = node.getSearchIndex(); logger.info( "{} will delete data files directly for deleting data between {} and {}", - databaseName + "-" + dataRegionId, + databaseName + "-" + dataRegionIdString, startTime, endTime); @@ -2749,7 +2765,7 @@ public class DataRegion implements IDataRegionForQuery { deleteDataDirectlyInFile(unsealedTsFileResource, deletion); // capture deleteDataNode and wait it to be persisted to DAL. DeletionResource deletionResource = - PipeInsertionDataNodeListener.getInstance().listenToDeleteData(dataRegionId, node); + PipeInsertionDataNodeListener.getInstance().listenToDeleteData(dataRegionIdString, node); // just get result. We have already waited for result in `listenToDeleteData` if (deletionResource != null && deletionResource.waitForResult() == Status.FAILURE) { throw deletionResource.getCause(); @@ -3182,7 +3198,7 @@ public class DataRegion implements IDataRegionForQuery { if (config.isEnableSeparateData()) { TimePartitionManager.getInstance() .updateAfterFlushing( - new DataRegionId(Integer.parseInt(dataRegionId)), + new DataRegionId(Integer.parseInt(dataRegionIdString)), processor.getTimeRangeId(), systemFlushTime, lastFlushTimeMap.getMemSize(processor.getTimeRangeId()), @@ -3302,7 +3318,8 @@ public class DataRegion implements IDataRegionForQuery { if (skipCurrentTTLAndModificationCheck()) { return 0; } - logger.info("[TTL] {}-{} Start ttl and modification checking.", databaseName, dataRegionId); + logger.info( + "[TTL] {}-{} Start ttl and modification checking.", databaseName, dataRegionIdString); CompactionScheduleContext context = new CompactionScheduleContext( EncryptDBUtils.getFirstEncryptParamFromDatabase(databaseName)); @@ -3327,7 +3344,7 @@ public class DataRegion implements IDataRegionForQuery { logger.info( "[TTL] {}-{} Totally select {} all-outdated files and {} partial-outdated files.", databaseName, - dataRegionId, + dataRegionIdString, context.getFullyDirtyFileNum(), context.getPartiallyDirtyFileNum()); } catch (InterruptedException e) { @@ -3549,6 +3566,17 @@ public class DataRegion implements IDataRegionForQuery { final boolean isGeneratedByPipe, final boolean isFromConsensus) throws LoadFileException { + if (DataRegionConsensusImpl.getInstance() instanceof IoTConsensus) { + final IoTConsensusServerImpl impl = + ((IoTConsensus) DataRegionConsensusImpl.getInstance()).getImpl(dataRegionId); + if (Objects.nonNull(impl) && !impl.isActive()) { + throw new LoadFileException( + String.format( + "Peer is inactive and not ready to write request, %s, DataNode Id: %s", + dataRegionId, IoTDBDescriptor.getInstance().getConfig().getDataNodeId())); + } + } + final File tsfileToBeInserted = newTsFileResource.getTsFile().getAbsoluteFile(); final long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck(); @@ -3615,7 +3643,8 @@ public class DataRegion implements IDataRegionForQuery { newTsFileResource.getTsFile().getName()); if (config.isEnableSeparateData()) { - final DataRegionId dataRegionId = new DataRegionId(Integer.parseInt(this.dataRegionId)); + final DataRegionId dataRegionId = + new DataRegionId(Integer.parseInt(this.dataRegionIdString)); final long timePartitionId = newTsFileResource.getTimePartition(); if (!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId, true)) { TimePartitionManager.getInstance() @@ -3773,7 +3802,7 @@ public class DataRegion implements IDataRegionForQuery { final String fileName = databaseName + File.separatorChar - + dataRegionId + + dataRegionIdString + File.separatorChar + filePartitionId + File.separator @@ -3864,7 +3893,7 @@ public class DataRegion implements IDataRegionForQuery { // Listen before the tsFile is added into tsFile manager to avoid it being compacted PipeInsertionDataNodeListener.getInstance() - .listenToTsFile(dataRegionId, databaseName, tsFileResource, true); + .listenToTsFile(dataRegionIdString, databaseName, tsFileResource, true); tsFileManager.add(tsFileResource, false); @@ -4039,8 +4068,8 @@ public class DataRegion implements IDataRegionForQuery { } @Override - public String getDataRegionId() { - return dataRegionId; + public String getDataRegionIdString() { + return dataRegionIdString; } /** @@ -4049,14 +4078,15 @@ public class DataRegion implements IDataRegionForQuery { * @return data region path, like root.sg1/0 */ public String getStorageGroupPath() { - return databaseName + File.separator + dataRegionId; + return databaseName + File.separator + dataRegionIdString; } public void abortCompaction() { tsFileManager.setAllowCompaction(false); CompactionScheduleTaskManager.getInstance().unregisterDataRegion(this); List<AbstractCompactionTask> runningTasks = - CompactionTaskManager.getInstance().abortCompaction(databaseName + "-" + dataRegionId); + CompactionTaskManager.getInstance() + .abortCompaction(databaseName + "-" + dataRegionIdString); while (CompactionTaskManager.getInstance().isAnyTaskInListStillRunning(runningTasks)) { try { TimeUnit.MILLISECONDS.sleep(10); @@ -4115,7 +4145,7 @@ public class DataRegion implements IDataRegionForQuery { TimePartitionManager.getInstance() .registerTimePartitionInfo( new TimePartitionInfo( - new DataRegionId(Integer.parseInt(dataRegionId)), + new DataRegionId(Integer.parseInt(dataRegionIdString)), timePartitionId, true, Long.MAX_VALUE, @@ -4233,7 +4263,7 @@ public class DataRegion implements IDataRegionForQuery { TimePartitionManager.getInstance() .registerTimePartitionInfo( new TimePartitionInfo( - new DataRegionId(Integer.parseInt(dataRegionId)), + new DataRegionId(Integer.parseInt(dataRegionIdString)), timePartitionIds[i], true, Long.MAX_VALUE, @@ -4351,7 +4381,7 @@ public class DataRegion implements IDataRegionForQuery { Tag.DATABASE.toString(), databaseName, Tag.REGION.toString(), - dataRegionId, + dataRegionIdString, Tag.TYPE.toString(), Metric.MEMTABLE_POINT_COUNT.toString()); if (!insertNode.isGeneratedByRemoteConsensusLeader()) { @@ -4365,7 +4395,7 @@ public class DataRegion implements IDataRegionForQuery { Tag.DATABASE.toString(), databaseName, Tag.REGION.toString(), - dataRegionId, + dataRegionIdString, Tag.TYPE.toString(), Metric.MEMTABLE_POINT_COUNT.toString()); } @@ -4380,7 +4410,7 @@ public class DataRegion implements IDataRegionForQuery { .getAllLocalFilesFolders() .forEach( folder -> { - folder = folder + File.separator + databaseName + File.separator + dataRegionId; + folder = folder + File.separator + databaseName + File.separator + dataRegionIdString; countFolderDiskSize(folder, diskSize); }); return diskSize.get() / 1024 / 1024; @@ -4503,7 +4533,7 @@ public class DataRegion implements IDataRegionForQuery { // identifier should be same with getTsFileProcessor method return Optional.of( WALManager.getInstance() - .applyForWALNode(databaseName + FILE_NAME_SEPARATOR + dataRegionId)); + .applyForWALNode(databaseName + FILE_NAME_SEPARATOR + dataRegionIdString)); } /** Wait for this data region successfully deleted */ diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IDataRegionForQuery.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IDataRegionForQuery.java index 108c75d50ad..b087ab2db54 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IDataRegionForQuery.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IDataRegionForQuery.java @@ -81,5 +81,5 @@ public interface IDataRegionForQuery { /** Get database name of this DataRegion */ String getDatabaseName(); - String getDataRegionId(); + String getDataRegionIdString(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/VirtualDataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/VirtualDataRegion.java index 5194d99b71d..6ead0b2eed5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/VirtualDataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/VirtualDataRegion.java @@ -117,7 +117,7 @@ public class VirtualDataRegion implements IDataRegionForQuery { } @Override - public String getDataRegionId() { + public String getDataRegionIdString() { return VIRTUAL_DATA_REGION_ID; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartition.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartition.java index 87a4d0f7e35..eea95562495 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartition.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartition.java @@ -41,7 +41,7 @@ public class RepairTimePartition { public RepairTimePartition(DataRegion dataRegion, long timePartitionId, long maxFileTimestamp) { this.databaseName = dataRegion.getDatabaseName(); - this.dataRegionId = dataRegion.getDataRegionId(); + this.dataRegionId = dataRegion.getDataRegionIdString(); this.tsFileManager = dataRegion.getTsFileManager(); this.timePartitionId = timePartitionId; this.maxFileTimestamp = maxFileTimestamp; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index 62756f4620b..68776c92c0b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -271,7 +271,7 @@ public class TsFileProcessor { // recordCreateMemtableBlockCost infoForMetrics[0] += System.nanoTime() - startTime; WritingMetrics.getInstance() - .recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1); + .recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionIdString(), 1); } } @@ -338,7 +338,7 @@ public class TsFileProcessor { } PipeInsertionDataNodeListener.getInstance() .listenToInsertNode( - dataRegionInfo.getDataRegion().getDataRegionId(), + dataRegionInfo.getDataRegion().getDataRegionIdString(), dataRegionInfo.getDataRegion().getDatabaseName(), insertRowNode, tsFileResource); @@ -434,7 +434,7 @@ public class TsFileProcessor { } PipeInsertionDataNodeListener.getInstance() .listenToInsertNode( - dataRegionInfo.getDataRegion().getDataRegionId(), + dataRegionInfo.getDataRegion().getDataRegionIdString(), dataRegionInfo.getDataRegion().getDatabaseName(), insertRowsNode, tsFileResource); @@ -467,7 +467,7 @@ public class TsFileProcessor { MemTableManager.getInstance() .getAvailableMemTable( dataRegionInfo.getDataRegion().getDatabaseName(), - dataRegionInfo.getDataRegion().getDataRegionId()); + dataRegionInfo.getDataRegion().getDataRegionIdString()); walNode.onMemTableCreated(workMemTable, tsFileResource.getTsFilePath()); } @@ -606,7 +606,7 @@ public class TsFileProcessor { } PipeInsertionDataNodeListener.getInstance() .listenToInsertNode( - dataRegionInfo.getDataRegion().getDataRegionId(), + dataRegionInfo.getDataRegion().getDataRegionIdString(), dataRegionInfo.getDataRegion().getDatabaseName(), insertTabletNode, tsFileResource); @@ -1440,7 +1440,7 @@ public class TsFileProcessor { WritingMetrics.getInstance() .recordMemTableLiveDuration(System.currentTimeMillis() - getWorkMemTableCreatedTime()); WritingMetrics.getInstance() - .recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), -1); + .recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionIdString(), -1); WritingMetrics.getInstance().recordWALEntryNumForOneTsFile(walEntryNum); workMemTable = null; return FlushManager.getInstance().registerTsFileProcessor(this); @@ -1536,7 +1536,7 @@ public class TsFileProcessor { memTableToFlush, writer, dataRegionName, - dataRegionInfo.getDataRegion().getDataRegionId()); + dataRegionInfo.getDataRegion().getDataRegionIdString()); flushTask.syncFlushMemTable(); memTableFlushPointCount = memTableToFlush.getTotalPointsNum(); } catch (Throwable e) { @@ -1718,7 +1718,7 @@ public class TsFileProcessor { String.format("%.2f", compressionRatio), totalMemTableSize, writer.getPos()); - String dataRegionId = dataRegionInfo.getDataRegion().getDataRegionId(); + String dataRegionId = dataRegionInfo.getDataRegion().getDataRegionIdString(); WritingMetrics.getInstance() .recordTsFileCompressionRatioOfFlushingMemTable(dataRegionId, compressionRatio); CompressionRatio.getInstance().updateRatio(totalMemTableSize, writer.getPos(), dataRegionId); @@ -1742,7 +1742,7 @@ public class TsFileProcessor { // before resource serialization to avoid missing hardlink after restart PipeInsertionDataNodeListener.getInstance() .listenToTsFile( - dataRegionInfo.getDataRegion().getDataRegionId(), + dataRegionInfo.getDataRegion().getDataRegionIdString(), dataRegionInfo.getDataRegion().getDatabaseName(), tsFileResource, false); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotTaker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotTaker.java index dde04c10952..f4313827c9a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotTaker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotTaker.java @@ -110,14 +110,14 @@ public class SnapshotTaker { LOGGER.warn( "Failed to take snapshot for {}-{}, clean up", dataRegion.getDatabaseName(), - dataRegion.getDataRegionId()); + dataRegion.getDataRegionIdString()); cleanUpWhenFail(finalSnapshotId); } else { snapshotLogger.logEnd(); LOGGER.info( "Successfully take snapshot for {}-{}, snapshot directory is {}", dataRegion.getDatabaseName(), - dataRegion.getDataRegionId(), + dataRegion.getDataRegionIdString(), snapshotDir.getParentFile().getAbsolutePath() + File.separator + finalSnapshotId); } @@ -126,7 +126,7 @@ public class SnapshotTaker { LOGGER.error( "Exception occurs when taking snapshot for {}-{}", dataRegion.getDatabaseName(), - dataRegion.getDataRegionId(), + dataRegion.getDataRegionIdString(), e); return false; } finally { @@ -140,7 +140,7 @@ public class SnapshotTaker { private boolean snapshotCompressionRatio(String snapshotDir) { File compressionRatioFile = - CompressionRatio.getInstance().getCompressionRatioFile(dataRegion.getDataRegionId()); + CompressionRatio.getInstance().getCompressionRatioFile(dataRegion.getDataRegionIdString()); if (compressionRatioFile != null) { LOGGER.info("Snapshotting compression ratio {}.", compressionRatioFile.getName()); try { @@ -174,7 +174,9 @@ public class SnapshotTaker { StringBuilder pathBuilder = new StringBuilder(dataDir); pathBuilder.append(File.separator).append(IoTDBConstant.SNAPSHOT_FOLDER_NAME); pathBuilder.append(File.separator).append(dataRegion.getDatabaseName()); - pathBuilder.append(IoTDBConstant.FILE_NAME_SEPARATOR).append(dataRegion.getDataRegionId()); + pathBuilder + .append(IoTDBConstant.FILE_NAME_SEPARATOR) + .append(dataRegion.getDataRegionIdString()); try { String path = pathBuilder.toString(); if (new File(path).exists()) { @@ -344,7 +346,7 @@ public class SnapshotTaker { stringBuilder.append(File.separator); stringBuilder.append(dataRegion.getDatabaseName()); stringBuilder.append(IoTDBConstant.FILE_NAME_SEPARATOR); - stringBuilder.append(dataRegion.getDataRegionId()); + stringBuilder.append(dataRegion.getDataRegionIdString()); stringBuilder.append(File.separator); stringBuilder.append(snapshotId); stringBuilder.append(File.separator); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java index 30bb64cbd31..2b9fc0d8a87 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java @@ -359,7 +359,7 @@ public class LoadTsFileManager { final long writePointCount, final boolean isGeneratedByPipeConsensusLeader) { MemTableFlushTask.recordFlushPointsMetricInternal( - writePointCount, databaseName, dataRegion.getDataRegionId()); + writePointCount, databaseName, dataRegion.getDataRegionIdString()); MetricService.getInstance() .count( writePointCount, @@ -370,7 +370,7 @@ public class LoadTsFileManager { Tag.DATABASE.toString(), databaseName, Tag.REGION.toString(), - dataRegion.getDataRegionId(), + dataRegion.getDataRegionIdString(), Tag.TYPE.toString(), Metric.LOAD_POINT_COUNT.toString()); // Because we cannot accurately judge who is the leader here, @@ -381,7 +381,7 @@ public class LoadTsFileManager { .getReplicationNum( ConsensusGroupId.Factory.create( TConsensusGroupType.DataRegion.getValue(), - Integer.parseInt(dataRegion.getDataRegionId()))); + Integer.parseInt(dataRegion.getDataRegionIdString()))); // It may happen that the replicationNum is 0 when load and db deletion occurs // concurrently, so we can just not to count the number of points in this case if (replicationNum != 0 && !isGeneratedByPipeConsensusLeader) { @@ -395,7 +395,7 @@ public class LoadTsFileManager { Tag.DATABASE.toString(), databaseName, Tag.REGION.toString(), - dataRegion.getDataRegionId(), + dataRegion.getDataRegionIdString(), Tag.TYPE.toString(), Metric.LOAD_POINT_COUNT.toString()); } @@ -844,7 +844,7 @@ public class LoadTsFileManager { return String.join( IoTDBConstant.FILE_NAME_SEPARATOR, dataRegion.getDatabaseName(), - dataRegion.getDataRegionId(), + dataRegion.getDataRegionIdString(), Long.toString(timePartitionSlot.getStartTime())); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java index 0ecca76aa84..622c2c4ebbf 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java @@ -25,16 +25,24 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.commons.consensus.ConsensusGroupId; +import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.consensus.SchemaRegionId; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.consensus.IConsensus; import org.apache.iotdb.consensus.common.Peer; +import org.apache.iotdb.consensus.config.ConsensusConfig; +import org.apache.iotdb.consensus.exception.ConsensusException; +import org.apache.iotdb.consensus.iot.IoTConsensus; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.consensus.DataRegionConsensusImpl; import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl; +import org.apache.iotdb.db.consensus.statemachine.dataregion.IoTConsensusDataRegionStateMachine; import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.exception.load.LoadFileException; import org.apache.iotdb.db.protocol.thrift.impl.DataNodeInternalRPCServiceImpl; import org.apache.iotdb.db.protocol.thrift.impl.DataNodeRegionManager; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; @@ -42,6 +50,8 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.Cre import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateMultiTimeSeriesNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateTimeSeriesNode; import org.apache.iotdb.db.schemaengine.SchemaEngine; +import org.apache.iotdb.db.storageengine.dataregion.DataRegion; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.mpp.rpc.thrift.TPlanNode; import org.apache.iotdb.mpp.rpc.thrift.TSendBatchPlanNodeReq; @@ -67,20 +77,50 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; public class DataNodeInternalRPCServiceImplTest { private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig(); DataNodeInternalRPCServiceImpl dataNodeInternalRPCServiceImpl; + private static IConsensus instance; private static final int dataNodeId = 0; + private static final File storageDir = new File("target" + java.io.File.separator + "impl"); + private static DataRegion dataRegion; @BeforeClass - public static void setUpBeforeClass() throws IOException, MetadataException { + public static void setUpBeforeClass() throws IOException, MetadataException, ConsensusException { // In standalone mode, we need to set dataNodeId to 0 for RaftPeerId in RatisConsensus conf.setDataNodeId(dataNodeId); + org.apache.iotdb.commons.utils.FileUtils.deleteFileOrDirectory(storageDir); SchemaEngine.getInstance().init(); SchemaEngine.getInstance().createSchemaRegion("root.ln", new SchemaRegionId(0)); + final DataRegionId id = new DataRegionId(1); + dataRegion = new DataRegion("root.ln", "1"); + instance = DataRegionConsensusImpl.getInstance(); + DataRegionConsensusImpl.setInstance( + ConsensusFactory.getConsensusImpl( + ConsensusFactory.IOT_CONSENSUS, + ConsensusConfig.newBuilder() + .setThisNodeId(1) + .setThisNode(new TEndPoint("0.0.0.0", 6667)) + .setStorageDir(storageDir.getAbsolutePath()) + .setConsensusGroupType(TConsensusGroupType.DataRegion) + .build(), + gid -> new IoTConsensusDataRegionStateMachine(dataRegion)) + .orElseThrow( + () -> + new IllegalArgumentException( + String.format( + ConsensusFactory.CONSTRUCT_FAILED_MSG, + ConsensusFactory.IOT_CONSENSUS)))); + if (Objects.isNull( + ((IoTConsensus) DataRegionConsensusImpl.getInstance()).getImpl(new DataRegionId(1)))) { + DataRegionConsensusImpl.getInstance() + .createLocalPeer( + id, Collections.singletonList(new Peer(id, 1, new TEndPoint("0.0.0.0", 6667)))); + } DataRegionConsensusImpl.getInstance().start(); SchemaRegionConsensusImpl.getInstance().start(); DataNodeRegionManager.getInstance().init(); @@ -109,13 +149,15 @@ public class DataNodeInternalRPCServiceImplTest { public static void tearDownAfterClass() throws IOException, StorageEngineException { DataNodeRegionManager.getInstance().clear(); DataRegionConsensusImpl.getInstance().stop(); + DataRegionConsensusImpl.setInstance(instance); SchemaRegionConsensusImpl.getInstance().stop(); SchemaEngine.getInstance().clear(); EnvironmentUtils.cleanEnv(); + org.apache.iotdb.commons.utils.FileUtils.deleteFileOrDirectory(storageDir); } @Test - public void testCreateTimeseries() throws MetadataException { + public void testCreateTimeSeries() throws MetadataException { CreateTimeSeriesNode createTimeSeriesNode = new CreateTimeSeriesNode( new PlanNodeId("0"), @@ -162,8 +204,19 @@ public class DataNodeInternalRPCServiceImplTest { Assert.assertTrue(response.getResponses().get(0).accepted); } + @Test(expected = LoadFileException.class) + public void testRejectLoad4NonActiveImpl() throws LoadFileException { + ((IoTConsensus) DataRegionConsensusImpl.getInstance()) + .getImpl(new DataRegionId(1)) + .setActive(false); + dataRegion.loadNewTsFile(new TsFileResource(), false, false, false); + ((IoTConsensus) DataRegionConsensusImpl.getInstance()) + .getImpl(new DataRegionId(1)) + .setActive(true); + } + @Test - public void testCreateAlignedTimeseries() throws MetadataException { + public void testCreateAlignedTimeSeries() throws MetadataException { CreateAlignedTimeSeriesNode createAlignedTimeSeriesNode = new CreateAlignedTimeSeriesNode( new PlanNodeId("0"), diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/StorageEngineTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/StorageEngineTest.java index a298c7a409e..4c4ac20e9ae 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/StorageEngineTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/StorageEngineTest.java @@ -58,8 +58,8 @@ public class StorageEngineTest { DataRegion rg1 = PowerMockito.mock(DataRegion.class); DataRegion rg2 = PowerMockito.mock(DataRegion.class); DataRegionId id2 = new DataRegionId(2); - PowerMockito.when(rg1.getDataRegionId()).thenReturn("1"); - PowerMockito.when(rg2.getDataRegionId()).thenReturn("2"); + PowerMockito.when(rg1.getDataRegionIdString()).thenReturn("1"); + PowerMockito.when(rg2.getDataRegionIdString()).thenReturn("2"); storageEngine.setDataRegion(id1, rg1); storageEngine.setDataRegion(id2, rg2); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java index 921fe860b8c..c3895a058c6 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java @@ -1212,7 +1212,7 @@ public class DataRegionTest { } for (DataRegion region : regionsToBeDeleted) { StorageEngine.getInstance() - .deleteDataRegion(new DataRegionId(Integer.parseInt(region.getDataRegionId()))); + .deleteDataRegion(new DataRegionId(Integer.parseInt(region.getDataRegionIdString()))); } Thread.sleep(500); @@ -1411,7 +1411,7 @@ public class DataRegionTest { } for (DataRegion region : regionsToBeDeleted) { StorageEngine.getInstance() - .deleteDataRegion(new DataRegionId(Integer.parseInt(region.getDataRegionId()))); + .deleteDataRegion(new DataRegionId(Integer.parseInt(region.getDataRegionIdString()))); } Thread.sleep(500); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileSchedulerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileSchedulerTest.java index 92337fe151c..3108a204686 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileSchedulerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileSchedulerTest.java @@ -89,7 +89,7 @@ public class RepairUnsortedFileSchedulerTest extends AbstractRepairDataTest { DataRegion mockDataRegion = Mockito.mock(DataRegion.class); Mockito.when(mockDataRegion.getTsFileManager()).thenReturn(tsFileManager); Mockito.when(mockDataRegion.getDatabaseName()).thenReturn("root.testsg"); - Mockito.when(mockDataRegion.getDataRegionId()).thenReturn("0"); + Mockito.when(mockDataRegion.getDataRegionIdString()).thenReturn("0"); Mockito.when(mockDataRegion.getTimePartitions()).thenReturn(Collections.singletonList(0L)); TsFileResource seqResource1 = createEmptyFileAndResource(true); @@ -136,7 +136,7 @@ public class RepairUnsortedFileSchedulerTest extends AbstractRepairDataTest { DataRegion mockDataRegion = Mockito.mock(DataRegion.class); Mockito.when(mockDataRegion.getTsFileManager()).thenReturn(tsFileManager); Mockito.when(mockDataRegion.getDatabaseName()).thenReturn("root.testsg"); - Mockito.when(mockDataRegion.getDataRegionId()).thenReturn("0"); + Mockito.when(mockDataRegion.getDataRegionIdString()).thenReturn("0"); Mockito.when(mockDataRegion.getTimePartitions()).thenReturn(Collections.singletonList(0L)); TsFileResource seqResource1 = createEmptyFileAndResource(true); @@ -206,7 +206,7 @@ public class RepairUnsortedFileSchedulerTest extends AbstractRepairDataTest { DataRegion mockDataRegion = Mockito.mock(DataRegion.class); Mockito.when(mockDataRegion.getTsFileManager()).thenReturn(tsFileManager); Mockito.when(mockDataRegion.getDatabaseName()).thenReturn("root.testsg"); - Mockito.when(mockDataRegion.getDataRegionId()).thenReturn("0"); + Mockito.when(mockDataRegion.getDataRegionIdString()).thenReturn("0"); Mockito.when(mockDataRegion.getTimePartitions()).thenReturn(Collections.singletonList(0L)); TsFileResource seqResource1 = createEmptyFileAndResource(true); @@ -265,7 +265,7 @@ public class RepairUnsortedFileSchedulerTest extends AbstractRepairDataTest { DataRegion mockDataRegion = Mockito.mock(DataRegion.class); Mockito.when(mockDataRegion.getTsFileManager()).thenReturn(tsFileManager); Mockito.when(mockDataRegion.getDatabaseName()).thenReturn("root.testsg"); - Mockito.when(mockDataRegion.getDataRegionId()).thenReturn("0"); + Mockito.when(mockDataRegion.getDataRegionIdString()).thenReturn("0"); Mockito.when(mockDataRegion.getTimePartitions()).thenReturn(Collections.singletonList(0L)); TsFileResource seqResource1 = createEmptyFileAndResource(true); @@ -313,7 +313,7 @@ public class RepairUnsortedFileSchedulerTest extends AbstractRepairDataTest { DataRegion mockDataRegion = Mockito.mock(DataRegion.class); Mockito.when(mockDataRegion.getTsFileManager()).thenReturn(tsFileManager); Mockito.when(mockDataRegion.getDatabaseName()).thenReturn("root.testsg"); - Mockito.when(mockDataRegion.getDataRegionId()).thenReturn("0"); + Mockito.when(mockDataRegion.getDataRegionIdString()).thenReturn("0"); Mockito.when(mockDataRegion.getTimePartitions()).thenReturn(Collections.singletonList(0L)); TsFileResource seqResource1 = createEmptyFileAndResource(true); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java index 6bec0df050b..9314fc2fa0a 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java @@ -225,7 +225,7 @@ public class IoTDBSnapshotTest { + "1-1-0-0.tsfile"); DataRegion region = Mockito.mock(DataRegion.class); Mockito.when(region.getDatabaseName()).thenReturn("root.test"); - Mockito.when(region.getDataRegionId()).thenReturn("0"); + Mockito.when(region.getDataRegionIdString()).thenReturn("0"); File snapshotFile = new SnapshotTaker(region).getSnapshotFilePathForTsFile(tsFile, "test-snapshotId"); Assert.assertEquals(
