This is an automated email from the ASF dual-hosted git repository. justinchen pushed a commit to branch pipe-flush in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8eac942c9984c03f2b4713bc75ecbff5fae23615 Author: Caideyipi <[email protected]> AuthorDate: Fri Feb 13 17:22:54 2026 +0800 IT-fix --- .../it/env/cluster/config/MppCommonConfig.java | 7 +++ .../env/cluster/config/MppSharedCommonConfig.java | 7 +++ .../it/env/remote/config/RemoteCommonConfig.java | 5 ++ .../org/apache/iotdb/itbase/env/CommonConfig.java | 2 + .../treemodel/auto/basic/IoTDBPipeSourceIT.java | 53 +++++++++++++++++++++- .../agent/task/subtask/sink/PipeSinkSubtask.java | 3 +- .../pipe/consensus/deletion/DeletionResource.java | 9 ++-- .../deletion/DeletionResourceManager.java | 8 ++-- .../deletion/persist/PageCacheDeletionBuffer.java | 4 +- .../consensus/deletion/recover/DeletionReader.java | 4 +- .../event/common/heartbeat/PipeHeartbeatEvent.java | 6 +-- .../event/realtime/PipeRealtimeEventFactory.java | 2 +- .../db/pipe/metric/source/PipeAssignerMetrics.java | 32 ++++++------- ...istoricalDataRegionTsFileAndDeletionSource.java | 4 +- .../realtime/PipeRealtimeDataRegionSource.java | 4 +- .../realtime/assigner/PipeDataRegionAssigner.java | 8 ++-- .../listener/PipeInsertionDataNodeListener.java | 8 ++-- .../db/storageengine/dataregion/DataRegion.java | 11 +++-- .../db/pipe/consensus/DeletionRecoverTest.java | 6 +-- .../db/pipe/consensus/DeletionResourceTest.java | 22 ++++----- 20 files changed, 137 insertions(+), 68 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java index 53aa395491b..f2b788b772f 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java @@ -465,6 +465,13 @@ public class MppCommonConfig extends MppBaseConfig implements CommonConfig { return this; } + @Override + public CommonConfig setPipeTsFileFlushIntervalSeconds(long pipeTsFileFlushIntervalSeconds) { + setProperty( + "pipe_tsfile_flush_interval_seconds", String.valueOf(pipeTsFileFlushIntervalSeconds)); + return this; + } + @Override public CommonConfig setIsPipeEnableMemoryCheck(boolean isPipeEnableMemoryCheck) { setProperty("pipe_enable_memory_checked", String.valueOf(isPipeEnableMemoryCheck)); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java index ed8a8943303..88a85e27f46 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java @@ -479,6 +479,13 @@ public class MppSharedCommonConfig implements CommonConfig { return this; } + @Override + public CommonConfig setPipeTsFileFlushIntervalSeconds(long pipeTsFileFlushIntervalSeconds) { + dnConfig.setPipeTsFileFlushIntervalSeconds(pipeTsFileFlushIntervalSeconds); + cnConfig.setPipeTsFileFlushIntervalSeconds(pipeTsFileFlushIntervalSeconds); + return this; + } + @Override public CommonConfig setIsPipeEnableMemoryCheck(boolean isPipeEnableMemoryCheck) { dnConfig.setIsPipeEnableMemoryCheck(isPipeEnableMemoryCheck); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java index 0c05af17f6a..31fbe9e78fb 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java @@ -338,6 +338,11 @@ public class RemoteCommonConfig implements CommonConfig { return this; } + @Override + public CommonConfig setPipeTsFileFlushIntervalSeconds(long pipeTsFileFlushIntervalSeconds) { + return this; + } + @Override public CommonConfig setIsPipeEnableMemoryCheck(boolean isPipeEnableMemoryCheck) { return this; diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java index 95ac239e89b..92a6af5851d 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java @@ -150,6 +150,8 @@ public interface CommonConfig { CommonConfig setPipeMemoryManagementEnabled(boolean pipeMemoryManagementEnabled); + CommonConfig setPipeTsFileFlushIntervalSeconds(long pipeTsFileFlushIntervalSeconds); + CommonConfig setIsPipeEnableMemoryCheck(boolean isPipeEnableMemoryCheck); CommonConfig setPipeAirGapReceiverEnabled(boolean isPipeAirGapReceiverEnabled); diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java index a45115f5a49..3351958d2a1 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java @@ -74,6 +74,7 @@ public class IoTDBPipeSourceIT extends AbstractPipeDualTreeModelAutoIT { .setEnableUnseqSpaceCompaction(false) .setEnableCrossSpaceCompaction(false) .setPipeMemoryManagementEnabled(false) + .setPipeTsFileFlushIntervalSeconds(10) .setIsPipeEnableMemoryCheck(false) .setPipeAutoSplitFullEnabled(false); senderEnv.getConfig().getConfigNodeConfig().setLeaderDistributionPolicy("HASH"); @@ -323,7 +324,7 @@ public class IoTDBPipeSourceIT extends AbstractPipeDualTreeModelAutoIT { } @Test - public void testExtractorInvalidParameter() throws Exception { + public void testSourceInvalidParameter() throws Exception { final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); final String receiverIp = receiverDataNode.getIp(); @@ -608,6 +609,56 @@ public class IoTDBPipeSourceIT extends AbstractPipeDualTreeModelAutoIT { } } + @Test + public void testSourceAutoFlush() throws Exception { + final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + final String receiverIp = receiverDataNode.getIp(); + final int receiverPort = receiverDataNode.getPort(); + + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final Map<String, String> sourceAttributes = new HashMap<>(); + final Map<String, String> processorAttributes = new HashMap<>(); + final Map<String, String> sinkAttributes = new HashMap<>(); + + sourceAttributes.put("source.realtime.mode", "batch"); + sourceAttributes.put("user", "root"); + + sinkAttributes.put("sink", "iotdb-thrift-sink"); + sinkAttributes.put("sink.ip", receiverIp); + sinkAttributes.put("sink.port", Integer.toString(receiverPort)); + + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "insert into root.db1.d1 (time, at1) values (1, 10)", + "insert into root.db1.d2 (time, at1) values (1, 20)"), + null); + + TSStatus status = + client.createPipe( + new TCreatePipeReq("p1", sinkAttributes) + .setExtractorAttributes(sourceAttributes) + .setProcessorAttributes(processorAttributes)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "insert into root.db1.d1 (time, at1) values (2, 10)", + "insert into root.db1.d2 (time, at1) values (2, 20)"), + null); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "select count(*) from root.db*.**", + "count(root.db1.d1.at1),count(root.db2.d1.at1),", + Collections.singleton("2,2,"), + 60); + } + } + @Test public void testHistoryAndRealtime() throws Exception { final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java index 96c519ce636..91bccc6cbda 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java @@ -62,8 +62,7 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask { // to trigger the general event transfer function, causing potentially such as // the random delay of the batch transmission. Therefore, here we inject cron events // when no event can be pulled. - public static final PipeHeartbeatEvent CRON_HEARTBEAT_EVENT = - new PipeHeartbeatEvent("cron", false); + public static final PipeHeartbeatEvent CRON_HEARTBEAT_EVENT = new PipeHeartbeatEvent(-1, false); public PipeSinkSubtask( final String taskID, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java index 3bf5c1287b7..520b57a229f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java @@ -58,15 +58,12 @@ public class DeletionResource implements PersistentResource { private volatile Exception cause; public DeletionResource( - AbstractDeleteDataNode deleteDataNode, - Consumer<DeletionResource> removeHook, - String regionId) { + AbstractDeleteDataNode deleteDataNode, Consumer<DeletionResource> removeHook, int regionId) { this.deleteDataNode = deleteDataNode; this.removeHook = removeHook; this.currentStatus = Status.RUNNING; this.consensusGroupId = - ConsensusGroupId.Factory.create( - TConsensusGroupType.DataRegion.getValue(), Integer.parseInt(regionId)); + ConsensusGroupId.Factory.create(TConsensusGroupType.DataRegion.getValue(), regionId); this.pipeTaskReferenceCount = new AtomicInteger( DataRegionConsensusImpl.getInstance().getReplicationNum(consensusGroupId) - 1); @@ -151,7 +148,7 @@ public class DeletionResource implements PersistentResource { } public static DeletionResource deserialize( - final ByteBuffer buffer, final String regionId, final Consumer<DeletionResource> removeHook) + final ByteBuffer buffer, final int regionId, final Consumer<DeletionResource> removeHook) throws IOException { AbstractDeleteDataNode node = DeleteNodeType.deserializeFromDAL(buffer); return new DeletionResource(node, removeHook, regionId); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResourceManager.java index ff868ffd445..91706fbf54f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResourceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResourceManager.java @@ -61,7 +61,7 @@ public class DeletionResourceManager implements AutoCloseable { String.format( "^_(?<%s>\\d+)-(?<%s>\\d+)\\%s$", REBOOT_TIME, MEM_TABLE_FLUSH_ORDER, DELETION_FILE_SUFFIX); - private final String dataRegionId; + private final int dataRegionId; private final DeletionBuffer deletionBuffer; private final File storageDir; private final Map<AbstractDeleteDataNode, DeletionResource> deleteNode2ResourcesMap = @@ -70,7 +70,7 @@ public class DeletionResourceManager implements AutoCloseable { private final Condition recoveryReadyCondition = recoverLock.newCondition(); private volatile boolean hasCompletedRecovery = false; - private DeletionResourceManager(String dataRegionId) throws IOException { + private DeletionResourceManager(int dataRegionId) throws IOException { this.dataRegionId = dataRegionId; this.storageDir = new File( @@ -269,7 +269,7 @@ public class DeletionResourceManager implements AutoCloseable { //////////////////////////// singleton //////////////////////////// private static class DeletionResourceManagerHolder { - private static Map<String, DeletionResourceManager> CONSENSU_GROUP_ID_2_INSTANCE_MAP; + private static Map<Integer, DeletionResourceManager> CONSENSU_GROUP_ID_2_INSTANCE_MAP; private DeletionResourceManagerHolder() {} @@ -280,7 +280,7 @@ public class DeletionResourceManager implements AutoCloseable { } } - public static DeletionResourceManager getInstance(String groupId) { + public static DeletionResourceManager getInstance(int groupId) { // If consensusImpl is not PipeConsensus. if (DeletionResourceManagerHolder.CONSENSU_GROUP_ID_2_INSTANCE_MAP == null) { return null; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java index f401cbbc76a..b77105720ba 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java @@ -75,7 +75,7 @@ public class PageCacheDeletionBuffer implements DeletionBuffer { ? 0 : (o1.getProgressIndex().isAfter(o2.getProgressIndex()) ? 1 : -1)); // Data region id - private final String dataRegionId; + private final int dataRegionId; // directory to store .deletion files private final String baseDirectory; // single thread to serialize WALEntry to workingBuffer @@ -99,7 +99,7 @@ public class PageCacheDeletionBuffer implements DeletionBuffer { // maxProgressIndex of each batch increases in the same order as the physical time. private ProgressIndex maxProgressIndexInCurrentFile = MinimumProgressIndex.INSTANCE; - public PageCacheDeletionBuffer(String dataRegionId, String baseDirectory) { + public PageCacheDeletionBuffer(int dataRegionId, String baseDirectory) { this.dataRegionId = dataRegionId; this.baseDirectory = baseDirectory; allocateBuffers(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/recover/DeletionReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/recover/DeletionReader.java index 943693e7b00..1fa00cd0a0e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/recover/DeletionReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/recover/DeletionReader.java @@ -40,13 +40,13 @@ public class DeletionReader implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(DeletionReader.class); private static final int MAGIC_STRING_BYTES_SIZE = DeletionResourceManager.MAGIC_VERSION_V1.getBytes(StandardCharsets.UTF_8).length; - private final String regionId; + private final int regionId; private final Consumer<DeletionResource> removeHook; private final File logFile; private final FileInputStream fileInputStream; private final FileChannel fileChannel; - public DeletionReader(File logFile, String regionId, Consumer<DeletionResource> removeHook) + public DeletionReader(File logFile, int regionId, Consumer<DeletionResource> removeHook) throws IOException { this.logFile = logFile; this.regionId = regionId; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java index 468292b8ecc..e90ae1952aa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java @@ -41,7 +41,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent { private static final Logger LOGGER = LoggerFactory.getLogger(PipeHeartbeatEvent.class); - private final String dataRegionId; + private final int dataRegionId; private long timePublished; private long timeAssigned; @@ -62,7 +62,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent { private final boolean shouldPrintMessage; - public PipeHeartbeatEvent(final String dataRegionId, final boolean shouldPrintMessage) { + public PipeHeartbeatEvent(final int dataRegionId, final boolean shouldPrintMessage) { super(null, 0, null, null, null, null, null, null, true, Long.MIN_VALUE, Long.MAX_VALUE); this.dataRegionId = dataRegionId; this.shouldPrintMessage = shouldPrintMessage; @@ -72,7 +72,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent { final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final String dataRegionId, + final int dataRegionId, final long timePublished, final boolean shouldPrintMessage) { super( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java index acc62f7e7a4..540589326ff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java @@ -57,7 +57,7 @@ public class PipeRealtimeEventFactory { } public static PipeRealtimeEvent createRealtimeEvent( - final String dataRegionId, final boolean shouldPrintMessage) { + final int dataRegionId, final boolean shouldPrintMessage) { return new PipeRealtimeEvent( new PipeHeartbeatEvent(dataRegionId, shouldPrintMessage), null, null); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/source/PipeAssignerMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/source/PipeAssignerMetrics.java index 7f89cdc5372..58f8e31da26 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/source/PipeAssignerMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/source/PipeAssignerMetrics.java @@ -41,7 +41,7 @@ public class PipeAssignerMetrics implements IMetricSet { private AbstractMetricService metricService; - private final Map<String, PipeDataRegionAssigner> assignerMap = new HashMap<>(); + private final Map<Integer, PipeDataRegionAssigner> assignerMap = new HashMap<>(); //////////////////////////// bindTo & unbindFrom (metric framework) //////////////////////////// @@ -49,44 +49,44 @@ public class PipeAssignerMetrics implements IMetricSet { public void bindTo(AbstractMetricService metricService) { this.metricService = metricService; synchronized (this) { - for (String dataRegionId : assignerMap.keySet()) { + for (Integer dataRegionId : assignerMap.keySet()) { createMetrics(dataRegionId); } } } - private void createMetrics(String dataRegionId) { + private void createMetrics(int dataRegionId) { createAutoGauge(dataRegionId); } - private void createAutoGauge(String dataRegionId) { + private void createAutoGauge(int dataRegionId) { metricService.createAutoGauge( Metric.UNASSIGNED_HEARTBEAT_COUNT.toString(), MetricLevel.IMPORTANT, assignerMap.get(dataRegionId), PipeDataRegionAssigner::getPipeHeartbeatEventCount, Tag.REGION.toString(), - dataRegionId); + Integer.toString(dataRegionId)); metricService.createAutoGauge( Metric.UNASSIGNED_TABLET_COUNT.toString(), MetricLevel.IMPORTANT, assignerMap.get(dataRegionId), PipeDataRegionAssigner::getTabletInsertionEventCount, Tag.REGION.toString(), - dataRegionId); + Integer.toString(dataRegionId)); metricService.createAutoGauge( Metric.UNASSIGNED_TSFILE_COUNT.toString(), MetricLevel.IMPORTANT, assignerMap.get(dataRegionId), PipeDataRegionAssigner::getTsFileInsertionEventCount, Tag.REGION.toString(), - dataRegionId); + Integer.toString(dataRegionId)); } @Override public void unbindFrom(AbstractMetricService metricService) { - ImmutableSet<String> dataRegionIds = ImmutableSet.copyOf(assignerMap.keySet()); - for (String dataRegionId : dataRegionIds) { + ImmutableSet<Integer> dataRegionIds = ImmutableSet.copyOf(assignerMap.keySet()); + for (int dataRegionId : dataRegionIds) { deregister(dataRegionId); } if (!assignerMap.isEmpty()) { @@ -94,32 +94,32 @@ public class PipeAssignerMetrics implements IMetricSet { } } - private void removeMetrics(String dataRegionId) { + private void removeMetrics(int dataRegionId) { removeAutoGauge(dataRegionId); } - private void removeAutoGauge(String dataRegionId) { + private void removeAutoGauge(int dataRegionId) { metricService.remove( MetricType.AUTO_GAUGE, Metric.UNASSIGNED_HEARTBEAT_COUNT.toString(), Tag.REGION.toString(), - dataRegionId); + Integer.toString(dataRegionId)); metricService.remove( MetricType.AUTO_GAUGE, Metric.UNASSIGNED_TABLET_COUNT.toString(), Tag.REGION.toString(), - dataRegionId); + Integer.toString(dataRegionId)); metricService.remove( MetricType.AUTO_GAUGE, Metric.UNASSIGNED_TSFILE_COUNT.toString(), Tag.REGION.toString(), - dataRegionId); + Integer.toString(dataRegionId)); } //////////////////////////// register & deregister (pipe integration) //////////////////////////// public void register(PipeDataRegionAssigner pipeDataRegionAssigner) { - String dataRegionId = pipeDataRegionAssigner.getDataRegionId(); + int dataRegionId = pipeDataRegionAssigner.getDataRegionId(); synchronized (this) { assignerMap.putIfAbsent(dataRegionId, pipeDataRegionAssigner); if (Objects.nonNull(metricService)) { @@ -128,7 +128,7 @@ public class PipeAssignerMetrics implements IMetricSet { } } - public void deregister(String dataRegionId) { + public void deregister(int dataRegionId) { synchronized (this) { if (!assignerMap.containsKey(dataRegionId)) { LOGGER.warn( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java index ab74d094740..8aac47b01c7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java @@ -498,7 +498,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource extractTsFiles(dataRegion, startHistoricalExtractionTime, originalResourceList); } if (shouldExtractDeletion) { - Optional.ofNullable(DeletionResourceManager.getInstance(String.valueOf(dataRegionId))) + Optional.ofNullable(DeletionResourceManager.getInstance(dataRegionId)) .ifPresent(manager -> extractDeletions(manager, originalResourceList)); } @@ -946,7 +946,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource dataRegionId, event); } else { - Optional.ofNullable(DeletionResourceManager.getInstance(String.valueOf(dataRegionId))) + Optional.ofNullable(DeletionResourceManager.getInstance(dataRegionId)) .ifPresent( manager -> event.setDeletionResource( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java index 3dec073f057..3f0d5f4687a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java @@ -91,7 +91,7 @@ public abstract class PipeRealtimeDataRegionSource implements PipeExtractor { protected String pipeName; protected long creationTime; - protected int dataRegionId; + protected int dataRegionId = -1; protected PipeTaskMeta pipeTaskMeta; protected boolean shouldExtractInsertion; @@ -317,7 +317,7 @@ public abstract class PipeRealtimeDataRegionSource implements PipeExtractor { @Override public void close() throws Exception { - if (Objects.nonNull(dataRegionId)) { + if (dataRegionId >= 0) { PipeInsertionDataNodeListener.getInstance().stopListenAndAssign(dataRegionId, this); PipeTimePartitionListener.getInstance().stopListen(dataRegionId, this); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java index 8ea973fbf0a..e60e1ad898d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java @@ -66,24 +66,24 @@ public class PipeDataRegionAssigner implements Closeable { /** The {@link DisruptorQueue} is used to assign the event to the extractor. */ private final DisruptorQueue disruptor; - private final String dataRegionId; + private final int dataRegionId; private Boolean isTableModel; private final PipeEventCounter eventCounter = new PipeDataRegionEventCounter(); - public String getDataRegionId() { + public int getDataRegionId() { return dataRegionId; } - public PipeDataRegionAssigner(final String dataRegionId) { + public PipeDataRegionAssigner(final int dataRegionId) { this.matcher = new CachedSchemaPatternMatcher(); this.disruptor = new DisruptorQueue(this::assignToExtractor, this::onAssignedHook); this.dataRegionId = dataRegionId; PipeAssignerMetrics.getInstance().register(this); final DataRegion dataRegion = - StorageEngine.getInstance().getDataRegion(new DataRegionId(Integer.parseInt(dataRegionId))); + StorageEngine.getInstance().getDataRegion(new DataRegionId(dataRegionId)); if (Objects.nonNull(dataRegion)) { final String databaseName = dataRegion.getDatabaseName(); if (Objects.nonNull(databaseName)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java index 8c49dd0a3dd..8fb95d78fc4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java @@ -48,7 +48,7 @@ import java.util.concurrent.atomic.AtomicInteger; * will filter events and assign them to different PipeRealtimeEventDataRegionExtractors. */ public class PipeInsertionDataNodeListener { - private final ConcurrentMap<String, PipeDataRegionAssigner> dataRegionId2Assigner = + private final ConcurrentMap<Integer, PipeDataRegionAssigner> dataRegionId2Assigner = new ConcurrentHashMap<>(); private final AtomicInteger listenToTsFileExtractorCount = new AtomicInteger(0); @@ -57,7 +57,7 @@ public class PipeInsertionDataNodeListener { //////////////////////////// start & stop //////////////////////////// public synchronized void startListenAndAssign( - final String dataRegionId, final PipeRealtimeDataRegionSource extractor) { + final int dataRegionId, final PipeRealtimeDataRegionSource extractor) { dataRegionId2Assigner .computeIfAbsent(dataRegionId, o -> new PipeDataRegionAssigner(dataRegionId)) .startAssignTo(extractor); @@ -71,7 +71,7 @@ public class PipeInsertionDataNodeListener { } public synchronized void stopListenAndAssign( - final String dataRegionId, final PipeRealtimeDataRegionSource extractor) { + final int dataRegionId, final PipeRealtimeDataRegionSource extractor) { final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId); if (assigner == null) { return; @@ -139,7 +139,7 @@ public class PipeInsertionDataNodeListener { } public DeletionResource listenToDeleteData( - final String regionId, final AbstractDeleteDataNode node) { + final int regionId, final AbstractDeleteDataNode node) { final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(regionId); // only events from registered data region will be extracted if (assigner == null diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index d95d51e390e..8a76f891679 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -2009,7 +2009,7 @@ public class DataRegion implements IDataRegionForQuery { } public void deleteDALFolderAndClose() { - Optional.ofNullable(DeletionResourceManager.getInstance(dataRegionIdString)) + Optional.ofNullable(DeletionResourceManager.getInstance(dataRegionId.getId())) .ifPresent( manager -> { manager.close(); @@ -2823,7 +2823,8 @@ public class DataRegion implements IDataRegionForQuery { deleteDataInUnsealedFiles(unsealedTsFileResource, deletion, sealedTsFileResource); // capture deleteDataNode and wait it to be persisted to DAL. DeletionResource deletionResource = - PipeInsertionDataNodeListener.getInstance().listenToDeleteData(dataRegionIdString, node); + PipeInsertionDataNodeListener.getInstance() + .listenToDeleteData(dataRegionId.getId(), node); // just get result. We have already waited for result in `listenToDeleteData` if (deletionResource != null && deletionResource.waitForResult() == Status.FAILURE) { throw deletionResource.getCause(); @@ -2926,7 +2927,8 @@ public class DataRegion implements IDataRegionForQuery { // capture deleteDataNode and wait it to be persisted to DAL. DeletionResource deletionResource = - PipeInsertionDataNodeListener.getInstance().listenToDeleteData(dataRegionIdString, node); + PipeInsertionDataNodeListener.getInstance() + .listenToDeleteData(dataRegionId.getId(), node); // just get result. We have already waited for result in `listenToDeleteData` if (deletionResource != null && deletionResource.waitForResult() == Status.FAILURE) { throw deletionResource.getCause(); @@ -2983,7 +2985,8 @@ public class DataRegion implements IDataRegionForQuery { deleteDataDirectlyInFile(unsealedTsFileResource, deletion); // capture deleteDataNode and wait it to be persisted to DAL. DeletionResource deletionResource = - PipeInsertionDataNodeListener.getInstance().listenToDeleteData(dataRegionIdString, node); + PipeInsertionDataNodeListener.getInstance() + .listenToDeleteData(dataRegionId.getId(), node); // just get result. We have already waited for result in `listenToDeleteData` if (deletionResource != null && deletionResource.waitForResult() == Status.FAILURE) { throw deletionResource.getCause(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java index 06f823c0e23..c7a93b9d072 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java @@ -42,7 +42,7 @@ import java.io.File; import java.util.Collections; public class DeletionRecoverTest { - private static final String[] FAKE_DATA_REGION_IDS = {"2", "3"}; + private static final Integer[] FAKE_DATA_REGION_IDS = {2, 3}; private static final int THIS_DATANODE_ID = IoTDBDescriptor.getInstance().getConfig().getDataNodeId(); private static final String DELETION_BASE_DIR = @@ -50,7 +50,7 @@ public class DeletionRecoverTest { private final int deletionCount = 10; private DeletionResourceManager deletionResourceManager; - public void setUp(boolean isRelational, String FAKE_DATA_REGION_ID) throws Exception { + public void setUp(boolean isRelational, int FAKE_DATA_REGION_ID) throws Exception { File baseDir = new File(DELETION_BASE_DIR + File.separator + FAKE_DATA_REGION_ID); if (baseDir.exists()) { FileUtils.deleteFileOrDirectory(baseDir); @@ -84,7 +84,7 @@ public class DeletionRecoverTest { @After public void tearDown() throws Exception { - for (String FAKE_DATA_REGION_ID : FAKE_DATA_REGION_IDS) { + for (Integer FAKE_DATA_REGION_ID : FAKE_DATA_REGION_IDS) { File baseDir = new File(DELETION_BASE_DIR + File.separator + FAKE_DATA_REGION_ID); if (baseDir.exists()) { FileUtils.deleteFileOrDirectory(baseDir); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java index f94d909f94b..aa7c5c0c954 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java @@ -66,7 +66,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; public class DeletionResourceTest { - private static final String[] FAKE_DATA_REGION_IDS = {"2", "3", "4", "5", "6"}; + private static final Integer[] FAKE_DATA_REGION_IDS = {2, 3, 4, 5, 6}; private static final String DELETION_BASE_DIR = IoTDBDescriptor.getInstance().getConfig().getIotConsensusV2DeletionFileDir(); private static final int THIS_DATANODE_ID = 0; @@ -84,7 +84,7 @@ public class DeletionResourceTest { @After public void tearDown() throws Exception { IoTDBDescriptor.getInstance().getConfig().setDataNodeId(previousDataNodeId); - for (String FAKE_DATA_REGION_ID : FAKE_DATA_REGION_IDS) { + for (Integer FAKE_DATA_REGION_ID : FAKE_DATA_REGION_IDS) { File baseDir = new File(DELETION_BASE_DIR + File.separator + FAKE_DATA_REGION_ID); if (baseDir.exists()) { FileUtils.deleteFileOrDirectory(baseDir); @@ -212,7 +212,7 @@ public class DeletionResourceTest { new PipeDeleteDataNodeEvent( deleteDataNode, "Test", 10, null, null, null, null, null, null, true, true); deletionEvent.setCommitterKeyAndCommitId( - new CommitterKey("Test", 10, Integer.parseInt(FAKE_DATA_REGION_IDS[3]), 0), i + 1); + new CommitterKey("Test", 10, FAKE_DATA_REGION_IDS[3], 0), i + 1); deletionEvents.add(deletionEvent); final DeletionResource deletionResource = @@ -227,8 +227,7 @@ public class DeletionResourceTest { // for event commit to invoke onCommit() to removeDAL if (initialIndex == 0) { - PipeEventCommitManager.getInstance() - .register("Test", 10, Integer.parseInt(FAKE_DATA_REGION_IDS[3]), "Test"); + PipeEventCommitManager.getInstance().register("Test", 10, FAKE_DATA_REGION_IDS[3], "Test"); } deletionEvents.forEach(deletionEvent -> deletionEvent.increaseReferenceCount("test")); final List<Path> paths = @@ -255,7 +254,7 @@ public class DeletionResourceTest { @Test public void testWaitForResult() throws Exception { // prepare pipe component - final PipeRealtimeDataRegionSource extractor = new PipeRealtimeDataRegionHybridSource(); + final PipeRealtimeDataRegionSource source = new PipeRealtimeDataRegionHybridSource(); final PipeParameters parameters = new PipeParameters( new HashMap<String, String>() { @@ -265,13 +264,12 @@ public class DeletionResourceTest { }); final PipeTaskRuntimeConfiguration configuration = new PipeTaskRuntimeConfiguration( - new PipeTaskSourceRuntimeEnvironment( - "1", 1, Integer.parseInt(FAKE_DATA_REGION_IDS[4]), null)); - extractor.customize(parameters, configuration); - Assert.assertTrue(extractor.shouldExtractDeletion()); + new PipeTaskSourceRuntimeEnvironment("1", 1, FAKE_DATA_REGION_IDS[4], null)); + source.customize(parameters, configuration); + Assert.assertTrue(source.shouldExtractDeletion()); PipeInsertionDataNodeListener.getInstance() - .startListenAndAssign(FAKE_DATA_REGION_IDS[4], extractor); + .startListenAndAssign(FAKE_DATA_REGION_IDS[4], source); deletionResourceManager = DeletionResourceManager.getInstance(FAKE_DATA_REGION_IDS[4]); final int rebootTimes = 0; final MeasurementPath path = new MeasurementPath("root.vehicle.d2.s0"); @@ -285,6 +283,6 @@ public class DeletionResourceTest { Assert.assertSame(Status.SUCCESS, deletionResource.waitForResult()); // close pipe resource PipeInsertionDataNodeListener.getInstance() - .stopListenAndAssign(FAKE_DATA_REGION_IDS[4], extractor); + .stopListenAndAssign(FAKE_DATA_REGION_IDS[4], source); } }
