This is an automated email from the ASF dual-hosted git repository. justinchen pushed a commit to branch pipe-flush in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 086f9793ff22f898fd206c2b0416b908db25b845 Author: Caideyipi <[email protected]> AuthorDate: Fri Feb 13 17:40:26 2026 +0800 clean --- .../listener/PipeInsertionDataNodeListener.java | 18 ++++++------ .../db/storageengine/dataregion/DataRegion.java | 8 ++++-- .../dataregion/memtable/TsFileProcessor.java | 8 +++--- .../db/pipe/source/PipeRealtimeExtractTest.java | 32 +++++++--------------- 4 files changed, 29 insertions(+), 37 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java index 8fb95d78fc4..d255d80166a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java @@ -51,8 +51,8 @@ public class PipeInsertionDataNodeListener { private final ConcurrentMap<Integer, PipeDataRegionAssigner> dataRegionId2Assigner = new ConcurrentHashMap<>(); - private final AtomicInteger listenToTsFileExtractorCount = new AtomicInteger(0); - private final AtomicInteger listenToInsertNodeExtractorCount = new AtomicInteger(0); + private final AtomicInteger listenToTsFileSourceCount = new AtomicInteger(0); + private final AtomicInteger listenToInsertNodeSourceCount = new AtomicInteger(0); //////////////////////////// start & stop //////////////////////////// @@ -63,10 +63,10 @@ public class PipeInsertionDataNodeListener { .startAssignTo(extractor); if (extractor.isNeedListenToTsFile()) { - listenToTsFileExtractorCount.incrementAndGet(); + listenToTsFileSourceCount.incrementAndGet(); } if (extractor.isNeedListenToInsertNode()) { - listenToInsertNodeExtractorCount.incrementAndGet(); + listenToInsertNodeSourceCount.incrementAndGet(); } } @@ -80,10 +80,10 @@ public class PipeInsertionDataNodeListener { assigner.stopAssignTo(extractor); if (extractor.isNeedListenToTsFile()) { - listenToTsFileExtractorCount.decrementAndGet(); + listenToTsFileSourceCount.decrementAndGet(); } if (extractor.isNeedListenToInsertNode()) { - listenToInsertNodeExtractorCount.decrementAndGet(); + listenToInsertNodeSourceCount.decrementAndGet(); } if (assigner.notMoreExtractorNeededToBeAssigned()) { @@ -97,7 +97,7 @@ public class PipeInsertionDataNodeListener { //////////////////////////// listen to events //////////////////////////// public void listenToTsFile( - final String dataRegionId, + final int dataRegionId, final String databaseName, final TsFileResource tsFileResource, final boolean isLoaded) { @@ -118,11 +118,11 @@ public class PipeInsertionDataNodeListener { } public void listenToInsertNode( - final String dataRegionId, + final int dataRegionId, final String databaseName, final InsertNode insertNode, final TsFileResource tsFileResource) { - if (listenToInsertNodeExtractorCount.get() == 0) { + if (listenToInsertNodeSourceCount.get() == 0) { return; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 8a76f891679..7a0d5b86833 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -4157,7 +4157,7 @@ public class DataRegion implements IDataRegionForQuery { // Listen before the tsFile is added into tsFile manager to avoid it being compacted PipeInsertionDataNodeListener.getInstance() - .listenToTsFile(dataRegionIdString, databaseName, tsFileResource, true); + .listenToTsFile(dataRegionId.getId(), databaseName, tsFileResource, true); tsFileManager.add(tsFileResource, false); @@ -4336,12 +4336,16 @@ public class DataRegion implements IDataRegionForQuery { return dataRegionIdString; } + public int getDataRegionId() { + return dataRegionId.getId(); + } + /** * Get the storageGroupPath with dataRegionId. * * @return data region path, like root.sg1/0 */ - public String getStorageGroupPath() { + public String getDatabasePath() { return databaseName + File.separator + dataRegionIdString; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index acdac61180b..85e0bbba055 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -340,7 +340,7 @@ public class TsFileProcessor { } PipeInsertionDataNodeListener.getInstance() .listenToInsertNode( - dataRegionInfo.getDataRegion().getDataRegionIdString(), + dataRegionInfo.getDataRegion().getDataRegionId(), dataRegionInfo.getDataRegion().getDatabaseName(), insertRowNode, tsFileResource); @@ -437,7 +437,7 @@ public class TsFileProcessor { } PipeInsertionDataNodeListener.getInstance() .listenToInsertNode( - dataRegionInfo.getDataRegion().getDataRegionIdString(), + dataRegionInfo.getDataRegion().getDataRegionId(), dataRegionInfo.getDataRegion().getDatabaseName(), insertRowsNode, tsFileResource); @@ -610,7 +610,7 @@ public class TsFileProcessor { } PipeInsertionDataNodeListener.getInstance() .listenToInsertNode( - dataRegionInfo.getDataRegion().getDataRegionIdString(), + dataRegionInfo.getDataRegion().getDataRegionId(), dataRegionInfo.getDataRegion().getDatabaseName(), insertTabletNode, tsFileResource); @@ -1746,7 +1746,7 @@ public class TsFileProcessor { // before resource serialization to avoid missing hardlink after restart PipeInsertionDataNodeListener.getInstance() .listenToTsFile( - dataRegionInfo.getDataRegion().getDataRegionIdString(), + dataRegionInfo.getDataRegion().getDataRegionId(), dataRegionInfo.getDataRegion().getDatabaseName(), tsFileResource, false); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java index 59aec4b1f7b..9e07e91e983 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java @@ -70,8 +70,8 @@ public class PipeRealtimeExtractTest { private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeExtractTest.class); - private final String dataRegion1 = "1"; - private final String dataRegion2 = "2"; + private final int dataRegion1 = 1; + private final int dataRegion2 = 2; private final String pattern1 = "root.sg.d"; private final String pattern2 = "root.sg.d.a"; private final String[] device = new String[] {"root", "sg", "d"}; @@ -151,31 +151,19 @@ public class PipeRealtimeExtractTest { final PipeTaskRuntimeConfiguration configuration0 = new PipeTaskRuntimeConfiguration( new PipeTaskSourceRuntimeEnvironment( - "1", - 1, - Integer.parseInt(dataRegion1), - new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1))); + "1", 1, dataRegion1, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1))); final PipeTaskRuntimeConfiguration configuration1 = new PipeTaskRuntimeConfiguration( new PipeTaskSourceRuntimeEnvironment( - "1", - 1, - Integer.parseInt(dataRegion1), - new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1))); + "1", 1, dataRegion1, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1))); final PipeTaskRuntimeConfiguration configuration2 = new PipeTaskRuntimeConfiguration( new PipeTaskSourceRuntimeEnvironment( - "1", - 1, - Integer.parseInt(dataRegion2), - new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1))); + "1", 1, dataRegion2, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1))); final PipeTaskRuntimeConfiguration configuration3 = new PipeTaskRuntimeConfiguration( new PipeTaskSourceRuntimeEnvironment( - "1", - 1, - Integer.parseInt(dataRegion2), - new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1))); + "1", 1, dataRegion2, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1))); // Some parameters of extractor are validated and initialized during the validation process. extractor0.validate(new PipeParameterValidator(parameters0)); @@ -274,7 +262,7 @@ public class PipeRealtimeExtractTest { } private Future<?> write2DataRegion( - final int writeNum, final String dataRegionId, final int startNum) { + final int writeNum, final int dataRegionId, final int startNum) { final File dataRegionDir = new File(tsFileDir.getPath() + File.separator + dataRegionId + File.separator + "0"); final boolean ignored = dataRegionDir.mkdirs(); @@ -305,7 +293,7 @@ public class PipeRealtimeExtractTest { PipeInsertionDataNodeListener.getInstance() .listenToInsertNode( dataRegionId, - dataRegionId, + Integer.toString(dataRegionId), new InsertRowNode( new PlanNodeId(String.valueOf(i)), new PartialPath(device), @@ -319,7 +307,7 @@ public class PipeRealtimeExtractTest { PipeInsertionDataNodeListener.getInstance() .listenToInsertNode( dataRegionId, - dataRegionId, + Integer.toString(dataRegionId), new InsertRowNode( new PlanNodeId(String.valueOf(i)), new PartialPath(device), @@ -331,7 +319,7 @@ public class PipeRealtimeExtractTest { false), resource); PipeInsertionDataNodeListener.getInstance() - .listenToTsFile(dataRegionId, dataRegionId, resource, false); + .listenToTsFile(dataRegionId, Integer.toString(dataRegionId), resource, false); } }); }
