This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1fdd4b3a557 Pipe: Refactored the dataRegionId data type to integer
(#17214)
1fdd4b3a557 is described below
commit 1fdd4b3a557fdaa7188b4d411b0ed8e597d6644a
Author: Caideyipi <[email protected]>
AuthorDate: Tue Feb 24 17:56:36 2026 +0800
Pipe: Refactored the dataRegionId data type to integer (#17214)
---
.../agent/task/subtask/sink/PipeSinkSubtask.java | 3 +-
.../pipe/consensus/deletion/DeletionResource.java | 9 +--
.../deletion/DeletionResourceManager.java | 20 +++---
.../deletion/persist/PageCacheDeletionBuffer.java | 4 +-
.../consensus/deletion/recover/DeletionReader.java | 4 +-
.../event/common/heartbeat/PipeHeartbeatEvent.java | 80 +++++++++++-----------
.../tsfile/PipeCompactedTsFileInsertionEvent.java | 4 +-
.../common/tsfile/PipeTsFileInsertionEvent.java | 3 +-
.../event/realtime/PipeRealtimeEventFactory.java | 2 +-
.../db/pipe/metric/source/PipeAssignerMetrics.java | 32 ++++-----
...istoricalDataRegionTsFileAndDeletionSource.java | 4 +-
.../realtime/PipeRealtimeDataRegionSource.java | 8 +--
.../realtime/assigner/PipeDataRegionAssigner.java | 75 ++++++++++----------
.../PipeTsFileEpochProgressIndexKeeper.java | 8 +--
.../listener/PipeInsertionDataNodeListener.java | 12 ++--
.../listener/PipeTimePartitionListener.java | 21 +++---
.../db/storageengine/dataregion/DataRegion.java | 17 +++--
.../dataregion/memtable/TsFileProcessor.java | 8 +--
.../rescon/memory/TimePartitionManager.java | 2 +-
.../db/pipe/consensus/DeletionRecoverTest.java | 6 +-
.../db/pipe/consensus/DeletionResourceTest.java | 12 ++--
.../db/pipe/source/PipeRealtimeExtractTest.java | 32 +++------
22 files changed, 174 insertions(+), 192 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 96c519ce636..91bccc6cbda 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -62,8 +62,7 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
// to trigger the general event transfer function, causing potentially such
as
// the random delay of the batch transmission. Therefore, here we inject
cron events
// when no event can be pulled.
- public static final PipeHeartbeatEvent CRON_HEARTBEAT_EVENT =
- new PipeHeartbeatEvent("cron", false);
+ public static final PipeHeartbeatEvent CRON_HEARTBEAT_EVENT = new
PipeHeartbeatEvent(-1, false);
public PipeSinkSubtask(
final String taskID,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java
index 3bf5c1287b7..520b57a229f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java
@@ -58,15 +58,12 @@ public class DeletionResource implements PersistentResource
{
private volatile Exception cause;
public DeletionResource(
- AbstractDeleteDataNode deleteDataNode,
- Consumer<DeletionResource> removeHook,
- String regionId) {
+ AbstractDeleteDataNode deleteDataNode, Consumer<DeletionResource>
removeHook, int regionId) {
this.deleteDataNode = deleteDataNode;
this.removeHook = removeHook;
this.currentStatus = Status.RUNNING;
this.consensusGroupId =
- ConsensusGroupId.Factory.create(
- TConsensusGroupType.DataRegion.getValue(),
Integer.parseInt(regionId));
+
ConsensusGroupId.Factory.create(TConsensusGroupType.DataRegion.getValue(),
regionId);
this.pipeTaskReferenceCount =
new AtomicInteger(
DataRegionConsensusImpl.getInstance().getReplicationNum(consensusGroupId) - 1);
@@ -151,7 +148,7 @@ public class DeletionResource implements PersistentResource
{
}
public static DeletionResource deserialize(
- final ByteBuffer buffer, final String regionId, final
Consumer<DeletionResource> removeHook)
+ final ByteBuffer buffer, final int regionId, final
Consumer<DeletionResource> removeHook)
throws IOException {
AbstractDeleteDataNode node = DeleteNodeType.deserializeFromDAL(buffer);
return new DeletionResource(node, removeHook, regionId);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResourceManager.java
index ff868ffd445..8e1dc8bb470 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResourceManager.java
@@ -61,7 +61,7 @@ public class DeletionResourceManager implements AutoCloseable
{
String.format(
"^_(?<%s>\\d+)-(?<%s>\\d+)\\%s$",
REBOOT_TIME, MEM_TABLE_FLUSH_ORDER, DELETION_FILE_SUFFIX);
- private final String dataRegionId;
+ private final int dataRegionId;
private final DeletionBuffer deletionBuffer;
private final File storageDir;
private final Map<AbstractDeleteDataNode, DeletionResource>
deleteNode2ResourcesMap =
@@ -70,7 +70,7 @@ public class DeletionResourceManager implements AutoCloseable
{
private final Condition recoveryReadyCondition = recoverLock.newCondition();
private volatile boolean hasCompletedRecovery = false;
- private DeletionResourceManager(String dataRegionId) throws IOException {
+ private DeletionResourceManager(int dataRegionId) throws IOException {
this.dataRegionId = dataRegionId;
this.storageDir =
new File(
@@ -269,23 +269,23 @@ public class DeletionResourceManager implements
AutoCloseable {
//////////////////////////// singleton ////////////////////////////
private static class DeletionResourceManagerHolder {
- private static Map<String, DeletionResourceManager>
CONSENSU_GROUP_ID_2_INSTANCE_MAP;
+ private static Map<Integer, DeletionResourceManager>
CONSENSUS_GROUP_ID_2_INSTANCE_MAP;
private DeletionResourceManagerHolder() {}
public static void build() {
- if (CONSENSU_GROUP_ID_2_INSTANCE_MAP == null) {
- CONSENSU_GROUP_ID_2_INSTANCE_MAP = new ConcurrentHashMap<>();
+ if (CONSENSUS_GROUP_ID_2_INSTANCE_MAP == null) {
+ CONSENSUS_GROUP_ID_2_INSTANCE_MAP = new ConcurrentHashMap<>();
}
}
}
- public static DeletionResourceManager getInstance(String groupId) {
+ public static DeletionResourceManager getInstance(int groupId) {
// If consensusImpl is not PipeConsensus.
- if (DeletionResourceManagerHolder.CONSENSU_GROUP_ID_2_INSTANCE_MAP ==
null) {
+ if (DeletionResourceManagerHolder.CONSENSUS_GROUP_ID_2_INSTANCE_MAP ==
null) {
return null;
}
- return
DeletionResourceManagerHolder.CONSENSU_GROUP_ID_2_INSTANCE_MAP.computeIfAbsent(
+ return
DeletionResourceManagerHolder.CONSENSUS_GROUP_ID_2_INSTANCE_MAP.computeIfAbsent(
groupId,
key -> {
try {
@@ -305,10 +305,10 @@ public class DeletionResourceManager implements
AutoCloseable {
}
public static void exit() {
- if (DeletionResourceManagerHolder.CONSENSU_GROUP_ID_2_INSTANCE_MAP ==
null) {
+ if (DeletionResourceManagerHolder.CONSENSUS_GROUP_ID_2_INSTANCE_MAP ==
null) {
return;
}
- DeletionResourceManagerHolder.CONSENSU_GROUP_ID_2_INSTANCE_MAP.forEach(
+ DeletionResourceManagerHolder.CONSENSUS_GROUP_ID_2_INSTANCE_MAP.forEach(
(groupId, resourceManager) -> {
resourceManager.close();
});
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
index f401cbbc76a..b77105720ba 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
@@ -75,7 +75,7 @@ public class PageCacheDeletionBuffer implements
DeletionBuffer {
? 0
: (o1.getProgressIndex().isAfter(o2.getProgressIndex()) ? 1
: -1));
// Data region id
- private final String dataRegionId;
+ private final int dataRegionId;
// directory to store .deletion files
private final String baseDirectory;
// single thread to serialize WALEntry to workingBuffer
@@ -99,7 +99,7 @@ public class PageCacheDeletionBuffer implements
DeletionBuffer {
// maxProgressIndex of each batch increases in the same order as the
physical time.
private ProgressIndex maxProgressIndexInCurrentFile =
MinimumProgressIndex.INSTANCE;
- public PageCacheDeletionBuffer(String dataRegionId, String baseDirectory) {
+ public PageCacheDeletionBuffer(int dataRegionId, String baseDirectory) {
this.dataRegionId = dataRegionId;
this.baseDirectory = baseDirectory;
allocateBuffers();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/recover/DeletionReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/recover/DeletionReader.java
index 943693e7b00..1fa00cd0a0e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/recover/DeletionReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/recover/DeletionReader.java
@@ -40,13 +40,13 @@ public class DeletionReader implements Closeable {
private static final Logger LOGGER =
LoggerFactory.getLogger(DeletionReader.class);
private static final int MAGIC_STRING_BYTES_SIZE =
DeletionResourceManager.MAGIC_VERSION_V1.getBytes(StandardCharsets.UTF_8).length;
- private final String regionId;
+ private final int regionId;
private final Consumer<DeletionResource> removeHook;
private final File logFile;
private final FileInputStream fileInputStream;
private final FileChannel fileChannel;
- public DeletionReader(File logFile, String regionId,
Consumer<DeletionResource> removeHook)
+ public DeletionReader(File logFile, int regionId, Consumer<DeletionResource>
removeHook)
throws IOException {
this.logFile = logFile;
this.regionId = regionId;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index 468292b8ecc..8ed7317d2bc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -41,7 +41,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeHeartbeatEvent.class);
- private final String dataRegionId;
+ private final int dataRegionId;
private long timePublished;
private long timeAssigned;
@@ -52,17 +52,17 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
// The disruptor is usually nearly empty.
private int disruptorSize;
- private int extractorQueueTabletSize;
- private int extractorQueueTsFileSize;
- private int extractorQueueSize;
+ private int sourceQueueTabletSize;
+ private int sourceQueueTsFileSize;
+ private int sourceQueueSize;
- private int connectorQueueTabletSize;
- private int connectorQueueTsFileSize;
- private int connectorQueueSize;
+ private int sinkQueueTabletSize;
+ private int sinkQueueTsFileSize;
+ private int sinkQueueSize;
private final boolean shouldPrintMessage;
- public PipeHeartbeatEvent(final String dataRegionId, final boolean
shouldPrintMessage) {
+ public PipeHeartbeatEvent(final int dataRegionId, final boolean
shouldPrintMessage) {
super(null, 0, null, null, null, null, null, null, true, Long.MIN_VALUE,
Long.MAX_VALUE);
this.dataRegionId = dataRegionId;
this.shouldPrintMessage = shouldPrintMessage;
@@ -72,7 +72,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
- final String dataRegionId,
+ final int dataRegionId,
final long timePublished,
final boolean shouldPrintMessage) {
super(
@@ -104,7 +104,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
@Override
public boolean internallyDecreaseResourceReferenceCount(final String
holderMessage) {
// PipeName == null indicates that the event is the raw event at disruptor,
- // not the event copied and passed to the extractor
+ // not the event copied and passed to the source
if (Objects.nonNull(pipeName)) {
PipeDataNodeSinglePipeMetrics.getInstance()
.decreaseHeartbeatEventCount(pipeName, creationTime);
@@ -208,17 +208,17 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
public void recordExtractorQueueSize(final
UnboundedBlockingPendingQueue<Event> pendingQueue) {
if (shouldPrintMessage) {
- extractorQueueTabletSize = pendingQueue.getTabletInsertionEventCount();
- extractorQueueTsFileSize = pendingQueue.getTsFileInsertionEventCount();
- extractorQueueSize = pendingQueue.size();
+ sourceQueueTabletSize = pendingQueue.getTabletInsertionEventCount();
+ sourceQueueTsFileSize = pendingQueue.getTsFileInsertionEventCount();
+ sourceQueueSize = pendingQueue.size();
}
}
public void recordConnectorQueueSize(final
UnboundedBlockingPendingQueue<Event> pendingQueue) {
if (shouldPrintMessage) {
- connectorQueueTabletSize = pendingQueue.getTabletInsertionEventCount();
- connectorQueueTsFileSize = pendingQueue.getTsFileInsertionEventCount();
- connectorQueueSize = pendingQueue.size();
+ sinkQueueTabletSize = pendingQueue.getTabletInsertionEventCount();
+ sinkQueueTsFileSize = pendingQueue.getTsFileInsertionEventCount();
+ sinkQueueSize = pendingQueue.size();
}
}
@@ -259,19 +259,19 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
final String disruptorSizeMessage = Integer.toString(disruptorSize);
- final String extractorQueueTabletSizeMessage =
- timeAssigned != 0 ? Integer.toString(extractorQueueTabletSize) :
unknownMessage;
- final String extractorQueueTsFileSizeMessage =
- timeAssigned != 0 ? Integer.toString(extractorQueueTsFileSize) :
unknownMessage;
- final String extractorQueueSizeMessage =
- timeAssigned != 0 ? Integer.toString(extractorQueueSize) :
unknownMessage;
+ final String sourceQueueTabletSizeMessage =
+ timeAssigned != 0 ? Integer.toString(sourceQueueTabletSize) :
unknownMessage;
+ final String sourceQueueTsFileSizeMessage =
+ timeAssigned != 0 ? Integer.toString(sourceQueueTsFileSize) :
unknownMessage;
+ final String sourceQueueSizeMessage =
+ timeAssigned != 0 ? Integer.toString(sourceQueueSize) : unknownMessage;
- final String connectorQueueTabletSizeMessage =
- timeProcessed != 0 ? Integer.toString(connectorQueueTabletSize) :
unknownMessage;
- final String connectorQueueTsFileSizeMessage =
- timeProcessed != 0 ? Integer.toString(connectorQueueTsFileSize) :
unknownMessage;
- final String connectorQueueSizeMessage =
- timeProcessed != 0 ? Integer.toString(connectorQueueSize) :
unknownMessage;
+ final String sinkQueueTabletSizeMessage =
+ timeProcessed != 0 ? Integer.toString(sinkQueueTabletSize) :
unknownMessage;
+ final String sinkQueueTsFileSizeMessage =
+ timeProcessed != 0 ? Integer.toString(sinkQueueTsFileSize) :
unknownMessage;
+ final String sinkQueueSizeMessage =
+ timeProcessed != 0 ? Integer.toString(sinkQueueSize) : unknownMessage;
return "PipeHeartbeatEvent{"
+ "pipeName='"
@@ -290,18 +290,18 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
+ totalTimeMessage
+ ", disruptorSize="
+ disruptorSizeMessage
- + ", extractorQueueTabletSize="
- + extractorQueueTabletSizeMessage
- + ", extractorQueueTsFileSize="
- + extractorQueueTsFileSizeMessage
- + ", extractorQueueSize="
- + extractorQueueSizeMessage
- + ", connectorQueueTabletSize="
- + connectorQueueTabletSizeMessage
- + ", connectorQueueTsFileSize="
- + connectorQueueTsFileSizeMessage
- + ", connectorQueueSize="
- + connectorQueueSizeMessage
+ + ", sourceQueueTabletSize="
+ + sourceQueueTabletSizeMessage
+ + ", sourceQueueTsFileSize="
+ + sourceQueueTsFileSizeMessage
+ + ", sourceQueueSize="
+ + sourceQueueSizeMessage
+ + ", sinkQueueTabletSize="
+ + sinkQueueTabletSizeMessage
+ + ", sinkQueueTsFileSize="
+ + sinkQueueTsFileSizeMessage
+ + ", sinkQueueSize="
+ + sinkQueueSizeMessage
+ "}";
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java
index bf3f2c97acb..cdcb8734503 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java
@@ -38,7 +38,7 @@ import java.util.stream.Collectors;
public class PipeCompactedTsFileInsertionEvent extends
PipeTsFileInsertionEvent {
- private final String dataRegionId;
+ private final int dataRegionId;
private final Set<String> originFilePaths;
private final List<Long> commitIds;
@@ -70,7 +70,7 @@ public class PipeCompactedTsFileInsertionEvent extends
PipeTsFileInsertionEvent
anyOfOriginalEvents.getStartTime(),
anyOfOriginalEvents.getEndTime());
- this.dataRegionId = String.valueOf(committerKey.getRegionId());
+ this.dataRegionId = committerKey.getRegionId();
this.originFilePaths =
originalEvents.stream()
.map(PipeTsFileInsertionEvent::getTsFile)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 6b3e505d2ea..2904e3e37cc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -400,7 +400,8 @@ public class PipeTsFileInsertionEvent extends
PipeInsertionEvent
public void eliminateProgressIndex() {
if (Objects.isNull(overridingProgressIndex) && Objects.nonNull(resource)) {
PipeTsFileEpochProgressIndexKeeper.getInstance()
- .eliminateProgressIndex(resource.getDataRegionId(), pipeName,
resource.getTsFilePath());
+ .eliminateProgressIndex(
+ Integer.parseInt(resource.getDataRegionId()), pipeName,
resource.getTsFilePath());
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
index acc62f7e7a4..540589326ff 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
@@ -57,7 +57,7 @@ public class PipeRealtimeEventFactory {
}
public static PipeRealtimeEvent createRealtimeEvent(
- final String dataRegionId, final boolean shouldPrintMessage) {
+ final int dataRegionId, final boolean shouldPrintMessage) {
return new PipeRealtimeEvent(
new PipeHeartbeatEvent(dataRegionId, shouldPrintMessage), null, null);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/source/PipeAssignerMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/source/PipeAssignerMetrics.java
index 7f89cdc5372..59e4e892d8e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/source/PipeAssignerMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/source/PipeAssignerMetrics.java
@@ -41,7 +41,7 @@ public class PipeAssignerMetrics implements IMetricSet {
private AbstractMetricService metricService;
- private final Map<String, PipeDataRegionAssigner> assignerMap = new
HashMap<>();
+ private final Map<Integer, PipeDataRegionAssigner> assignerMap = new
HashMap<>();
//////////////////////////// bindTo & unbindFrom (metric framework)
////////////////////////////
@@ -49,44 +49,44 @@ public class PipeAssignerMetrics implements IMetricSet {
public void bindTo(AbstractMetricService metricService) {
this.metricService = metricService;
synchronized (this) {
- for (String dataRegionId : assignerMap.keySet()) {
+ for (int dataRegionId : assignerMap.keySet()) {
createMetrics(dataRegionId);
}
}
}
- private void createMetrics(String dataRegionId) {
+ private void createMetrics(int dataRegionId) {
createAutoGauge(dataRegionId);
}
- private void createAutoGauge(String dataRegionId) {
+ private void createAutoGauge(int dataRegionId) {
metricService.createAutoGauge(
Metric.UNASSIGNED_HEARTBEAT_COUNT.toString(),
MetricLevel.IMPORTANT,
assignerMap.get(dataRegionId),
PipeDataRegionAssigner::getPipeHeartbeatEventCount,
Tag.REGION.toString(),
- dataRegionId);
+ Integer.toString(dataRegionId));
metricService.createAutoGauge(
Metric.UNASSIGNED_TABLET_COUNT.toString(),
MetricLevel.IMPORTANT,
assignerMap.get(dataRegionId),
PipeDataRegionAssigner::getTabletInsertionEventCount,
Tag.REGION.toString(),
- dataRegionId);
+ Integer.toString(dataRegionId));
metricService.createAutoGauge(
Metric.UNASSIGNED_TSFILE_COUNT.toString(),
MetricLevel.IMPORTANT,
assignerMap.get(dataRegionId),
PipeDataRegionAssigner::getTsFileInsertionEventCount,
Tag.REGION.toString(),
- dataRegionId);
+ Integer.toString(dataRegionId));
}
@Override
public void unbindFrom(AbstractMetricService metricService) {
- ImmutableSet<String> dataRegionIds =
ImmutableSet.copyOf(assignerMap.keySet());
- for (String dataRegionId : dataRegionIds) {
+ ImmutableSet<Integer> dataRegionIds =
ImmutableSet.copyOf(assignerMap.keySet());
+ for (int dataRegionId : dataRegionIds) {
deregister(dataRegionId);
}
if (!assignerMap.isEmpty()) {
@@ -94,32 +94,32 @@ public class PipeAssignerMetrics implements IMetricSet {
}
}
- private void removeMetrics(String dataRegionId) {
+ private void removeMetrics(int dataRegionId) {
removeAutoGauge(dataRegionId);
}
- private void removeAutoGauge(String dataRegionId) {
+ private void removeAutoGauge(int dataRegionId) {
metricService.remove(
MetricType.AUTO_GAUGE,
Metric.UNASSIGNED_HEARTBEAT_COUNT.toString(),
Tag.REGION.toString(),
- dataRegionId);
+ Integer.toString(dataRegionId));
metricService.remove(
MetricType.AUTO_GAUGE,
Metric.UNASSIGNED_TABLET_COUNT.toString(),
Tag.REGION.toString(),
- dataRegionId);
+ Integer.toString(dataRegionId));
metricService.remove(
MetricType.AUTO_GAUGE,
Metric.UNASSIGNED_TSFILE_COUNT.toString(),
Tag.REGION.toString(),
- dataRegionId);
+ Integer.toString(dataRegionId));
}
//////////////////////////// register & deregister (pipe integration)
////////////////////////////
public void register(PipeDataRegionAssigner pipeDataRegionAssigner) {
- String dataRegionId = pipeDataRegionAssigner.getDataRegionId();
+ int dataRegionId = pipeDataRegionAssigner.getDataRegionId();
synchronized (this) {
assignerMap.putIfAbsent(dataRegionId, pipeDataRegionAssigner);
if (Objects.nonNull(metricService)) {
@@ -128,7 +128,7 @@ public class PipeAssignerMetrics implements IMetricSet {
}
}
- public void deregister(String dataRegionId) {
+ public void deregister(final int dataRegionId) {
synchronized (this) {
if (!assignerMap.containsKey(dataRegionId)) {
LOGGER.warn(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
index ab74d094740..8aac47b01c7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
@@ -498,7 +498,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
extractTsFiles(dataRegion, startHistoricalExtractionTime,
originalResourceList);
}
if (shouldExtractDeletion) {
-
Optional.ofNullable(DeletionResourceManager.getInstance(String.valueOf(dataRegionId)))
+ Optional.ofNullable(DeletionResourceManager.getInstance(dataRegionId))
.ifPresent(manager -> extractDeletions(manager,
originalResourceList));
}
@@ -946,7 +946,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
dataRegionId,
event);
} else {
-
Optional.ofNullable(DeletionResourceManager.getInstance(String.valueOf(dataRegionId)))
+ Optional.ofNullable(DeletionResourceManager.getInstance(dataRegionId))
.ifPresent(
manager ->
event.setDeletionResource(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
index cd00e2975a0..de0ee264473 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
@@ -91,7 +91,7 @@ public abstract class PipeRealtimeDataRegionSource implements
PipeExtractor {
protected String pipeName;
protected long creationTime;
- protected String dataRegionId;
+ protected int dataRegionId = -1;
protected PipeTaskMeta pipeTaskMeta;
protected boolean shouldExtractInsertion;
@@ -214,7 +214,7 @@ public abstract class PipeRealtimeDataRegionSource
implements PipeExtractor {
shouldExtractDeletion = insertionDeletionListeningOptionPair.getRight();
pipeName = environment.getPipeName();
- dataRegionId = String.valueOf(environment.getRegionId());
+ dataRegionId = environment.getRegionId();
pipeTaskMeta = environment.getPipeTaskMeta();
// Metrics related to TsFileEpoch are managed in PipeExtractorMetrics.
These metrics are
@@ -317,7 +317,7 @@ public abstract class PipeRealtimeDataRegionSource
implements PipeExtractor {
@Override
public void close() throws Exception {
- if (Objects.nonNull(dataRegionId)) {
+ if (dataRegionId >= 0) {
PipeInsertionDataNodeListener.getInstance().stopListenAndAssign(dataRegionId,
this);
PipeTimePartitionListener.getInstance().stopListen(dataRegionId, this);
}
@@ -555,7 +555,7 @@ public abstract class PipeRealtimeDataRegionSource
implements PipeExtractor {
return skipIfNoPrivileges;
}
- public final String getDataRegionId() {
+ public final int getDataRegionId() {
return dataRegionId;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 8ea973fbf0a..0f4193af7a9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -58,32 +58,32 @@ public class PipeDataRegionAssigner implements Closeable {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeDataRegionAssigner.class);
/**
- * The {@link PipeDataRegionMatcher} is used to match the event with the
extractor based on the
+ * The {@link PipeDataRegionMatcher} is used to match the event with the
source based on the
* pattern.
*/
private final PipeDataRegionMatcher matcher;
- /** The {@link DisruptorQueue} is used to assign the event to the extractor.
*/
+ /** The {@link DisruptorQueue} is used to assign the event to the source. */
private final DisruptorQueue disruptor;
- private final String dataRegionId;
+ private final int dataRegionId;
private Boolean isTableModel;
private final PipeEventCounter eventCounter = new
PipeDataRegionEventCounter();
- public String getDataRegionId() {
+ public int getDataRegionId() {
return dataRegionId;
}
- public PipeDataRegionAssigner(final String dataRegionId) {
+ public PipeDataRegionAssigner(final int dataRegionId) {
this.matcher = new CachedSchemaPatternMatcher();
- this.disruptor = new DisruptorQueue(this::assignToExtractor,
this::onAssignedHook);
+ this.disruptor = new DisruptorQueue(this::assignToSource,
this::onAssignedHook);
this.dataRegionId = dataRegionId;
PipeAssignerMetrics.getInstance().register(this);
final DataRegion dataRegion =
- StorageEngine.getInstance().getDataRegion(new
DataRegionId(Integer.parseInt(dataRegionId)));
+ StorageEngine.getInstance().getDataRegion(new
DataRegionId(dataRegionId));
if (Objects.nonNull(dataRegion)) {
final String databaseName = dataRegion.getDatabaseName();
if (Objects.nonNull(databaseName)) {
@@ -128,7 +128,7 @@ public class PipeDataRegionAssigner implements Closeable {
eventCounter.decreaseEventCount(innerEvent);
}
- private void assignToExtractor(
+ private void assignToSource(
final PipeRealtimeEvent event, final long sequence, final boolean
endOfBatch) {
if (disruptor.isClosed()) {
return;
@@ -140,17 +140,15 @@ public class PipeDataRegionAssigner implements Closeable {
matchedAndUnmatched
.getLeft()
.forEach(
- extractor -> {
+ source -> {
if (disruptor.isClosed()) {
return;
}
- if (event.getEvent().isGeneratedByPipe() &&
!extractor.isForwardingPipeRequests()) {
+ if (event.getEvent().isGeneratedByPipe() &&
!source.isForwardingPipeRequests()) {
final ProgressReportEvent reportEvent =
new ProgressReportEvent(
- extractor.getPipeName(),
- extractor.getCreationTime(),
- extractor.getPipeTaskMeta());
+ source.getPipeName(), source.getCreationTime(),
source.getPipeTaskMeta());
reportEvent.bindProgressIndex(event.getProgressIndex());
if
(!reportEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) {
LOGGER.warn(
@@ -158,33 +156,33 @@ public class PipeDataRegionAssigner implements Closeable {
reportEvent);
return;
}
-
extractor.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent));
+
source.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent));
return;
}
final PipeRealtimeEvent copiedEvent =
event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- extractor.getPipeName(),
- extractor.getCreationTime(),
- extractor.getPipeTaskMeta(),
- extractor.getTreePattern(),
- extractor.getTablePattern(),
- String.valueOf(extractor.getUserId()),
- extractor.getUserName(),
- extractor.getCliHostname(),
- extractor.isSkipIfNoPrivileges(),
- extractor.getRealtimeDataExtractionStartTime(),
- extractor.getRealtimeDataExtractionEndTime());
+ source.getPipeName(),
+ source.getCreationTime(),
+ source.getPipeTaskMeta(),
+ source.getTreePattern(),
+ source.getTablePattern(),
+ String.valueOf(source.getUserId()),
+ source.getUserName(),
+ source.getCliHostname(),
+ source.isSkipIfNoPrivileges(),
+ source.getRealtimeDataExtractionStartTime(),
+ source.getRealtimeDataExtractionEndTime());
final EnrichedEvent innerEvent = copiedEvent.getEvent();
// if using IoTV2, assign a replicateIndex for this realtime
event
if (DataRegionConsensusImpl.getInstance() instanceof
PipeConsensus
&& PipeConsensusProcessor.isShouldReplicate(innerEvent)) {
innerEvent.setReplicateIndexForIoTV2(
ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(
- extractor.getPipeName()));
+ source.getPipeName()));
LOGGER.debug(
"[{}]Set {} for realtime event {}",
- extractor.getPipeName(),
+ source.getPipeName(),
innerEvent.getReplicateIndexForIoTV2(),
innerEvent);
}
@@ -192,15 +190,14 @@ public class PipeDataRegionAssigner implements Closeable {
if (innerEvent instanceof PipeTsFileInsertionEvent) {
final PipeTsFileInsertionEvent tsFileInsertionEvent =
(PipeTsFileInsertionEvent) innerEvent;
- tsFileInsertionEvent.disableMod4NonTransferPipes(
- extractor.isShouldTransferModFile());
+
tsFileInsertionEvent.disableMod4NonTransferPipes(source.isShouldTransferModFile());
}
if (innerEvent instanceof PipeDeleteDataNodeEvent) {
final PipeDeleteDataNodeEvent deleteDataNodeEvent =
(PipeDeleteDataNodeEvent) innerEvent;
final DeletionResourceManager manager =
-
DeletionResourceManager.getInstance(extractor.getDataRegionId());
+
DeletionResourceManager.getInstance(source.getDataRegionId());
// increase deletion resource's reference and bind real
deleteEvent
if (Objects.nonNull(manager)
&& DeletionResource.isDeleteNodeGeneratedInLocalByIoTV2(
@@ -217,13 +214,13 @@ public class PipeDataRegionAssigner implements Closeable {
copiedEvent);
return;
}
- extractor.extract(copiedEvent);
+ source.extract(copiedEvent);
});
matchedAndUnmatched
.getRight()
.forEach(
- extractor -> {
+ source -> {
if (disruptor.isClosed()) {
return;
}
@@ -233,9 +230,7 @@ public class PipeDataRegionAssigner implements Closeable {
|| innerEvent instanceof TsFileInsertionEvent) {
final ProgressReportEvent reportEvent =
new ProgressReportEvent(
- extractor.getPipeName(),
- extractor.getCreationTime(),
- extractor.getPipeTaskMeta());
+ source.getPipeName(), source.getCreationTime(),
source.getPipeTaskMeta());
reportEvent.bindProgressIndex(event.getProgressIndex());
if
(!reportEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) {
LOGGER.warn(
@@ -243,17 +238,17 @@ public class PipeDataRegionAssigner implements Closeable {
reportEvent);
return;
}
-
extractor.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent));
+
source.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent));
}
});
}
- public void startAssignTo(final PipeRealtimeDataRegionSource extractor) {
- matcher.register(extractor);
+ public void startAssignTo(final PipeRealtimeDataRegionSource source) {
+ matcher.register(source);
}
- public void stopAssignTo(final PipeRealtimeDataRegionSource extractor) {
- matcher.deregister(extractor);
+ public void stopAssignTo(final PipeRealtimeDataRegionSource source) {
+ matcher.deregister(source);
}
public void invalidateCache() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
index ff7d90c377d..bf15dcdc547 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
@@ -32,11 +32,11 @@ import java.util.concurrent.ConcurrentHashMap;
public class PipeTsFileEpochProgressIndexKeeper {
// data region id -> pipeName -> tsFile path -> max progress index
- private final Map<String, Map<String, Map<String, TsFileResource>>>
progressIndexKeeper =
+ private final Map<Integer, Map<String, Map<String, TsFileResource>>>
progressIndexKeeper =
new ConcurrentHashMap<>();
public synchronized void registerProgressIndex(
- final String dataRegionId, final String pipeName, final TsFileResource
resource) {
+ final int dataRegionId, final String pipeName, final TsFileResource
resource) {
progressIndexKeeper
.computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
.computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>())
@@ -44,7 +44,7 @@ public class PipeTsFileEpochProgressIndexKeeper {
}
public synchronized void eliminateProgressIndex(
- final String dataRegionId, final @Nonnull String pipeName, final String
filePath) {
+ final int dataRegionId, final @Nonnull String pipeName, final String
filePath) {
progressIndexKeeper
.computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
.computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>())
@@ -52,7 +52,7 @@ public class PipeTsFileEpochProgressIndexKeeper {
}
public synchronized boolean isProgressIndexAfterOrEquals(
- final String dataRegionId,
+ final int dataRegionId,
final String pipeName,
final String tsFilePath,
final ProgressIndex progressIndex) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
index 8c49dd0a3dd..3cce521a51e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
@@ -48,7 +48,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* will filter events and assign them to different
PipeRealtimeEventDataRegionExtractors.
*/
public class PipeInsertionDataNodeListener {
- private final ConcurrentMap<String, PipeDataRegionAssigner>
dataRegionId2Assigner =
+ private final ConcurrentMap<Integer, PipeDataRegionAssigner>
dataRegionId2Assigner =
new ConcurrentHashMap<>();
private final AtomicInteger listenToTsFileExtractorCount = new
AtomicInteger(0);
@@ -57,7 +57,7 @@ public class PipeInsertionDataNodeListener {
//////////////////////////// start & stop ////////////////////////////
public synchronized void startListenAndAssign(
- final String dataRegionId, final PipeRealtimeDataRegionSource extractor)
{
+ final int dataRegionId, final PipeRealtimeDataRegionSource extractor) {
dataRegionId2Assigner
.computeIfAbsent(dataRegionId, o -> new
PipeDataRegionAssigner(dataRegionId))
.startAssignTo(extractor);
@@ -71,7 +71,7 @@ public class PipeInsertionDataNodeListener {
}
public synchronized void stopListenAndAssign(
- final String dataRegionId, final PipeRealtimeDataRegionSource extractor)
{
+ final int dataRegionId, final PipeRealtimeDataRegionSource extractor) {
final PipeDataRegionAssigner assigner =
dataRegionId2Assigner.get(dataRegionId);
if (assigner == null) {
return;
@@ -97,7 +97,7 @@ public class PipeInsertionDataNodeListener {
//////////////////////////// listen to events ////////////////////////////
public void listenToTsFile(
- final String dataRegionId,
+ final int dataRegionId,
final String databaseName,
final TsFileResource tsFileResource,
final boolean isLoaded) {
@@ -118,7 +118,7 @@ public class PipeInsertionDataNodeListener {
}
public void listenToInsertNode(
- final String dataRegionId,
+ final int dataRegionId,
final String databaseName,
final InsertNode insertNode,
final TsFileResource tsFileResource) {
@@ -139,7 +139,7 @@ public class PipeInsertionDataNodeListener {
}
public DeletionResource listenToDeleteData(
- final String regionId, final AbstractDeleteDataNode node) {
+ final int regionId, final AbstractDeleteDataNode node) {
final PipeDataRegionAssigner assigner =
dataRegionId2Assigner.get(regionId);
// only events from registered data region will be extracted
if (assigner == null
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeTimePartitionListener.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeTimePartitionListener.java
index c5e98cd1b2c..7c674a3bd1d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeTimePartitionListener.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeTimePartitionListener.java
@@ -30,19 +30,18 @@ import java.util.concurrent.ConcurrentHashMap;
public class PipeTimePartitionListener {
- private final Map<String, Map<String, PipeRealtimeDataRegionSource>>
dataRegionId2Extractors =
+ private final Map<Integer, Map<String, PipeRealtimeDataRegionSource>>
dataRegionId2Sources =
new ConcurrentHashMap<>();
// This variable is used to record the upper and lower bounds that each data
region's time
// partition ID has ever reached.
- private final Map<String, Pair<Long, Long>>
dataRegionId2TimePartitionIdBound =
+ private final Map<Integer, Pair<Long, Long>>
dataRegionId2TimePartitionIdBound =
new ConcurrentHashMap<>();
//////////////////////////// start & stop ////////////////////////////
- public synchronized void startListen(
- String dataRegionId, PipeRealtimeDataRegionSource extractor) {
- dataRegionId2Extractors
+ public synchronized void startListen(int dataRegionId,
PipeRealtimeDataRegionSource extractor) {
+ dataRegionId2Sources
.computeIfAbsent(dataRegionId, o -> new HashMap<>())
.put(extractor.getTaskID(), extractor);
// Assign the previously recorded upper and lower bounds of time partition
to the extractor that
@@ -53,22 +52,21 @@ public class PipeTimePartitionListener {
}
}
- public synchronized void stopListen(String dataRegionId,
PipeRealtimeDataRegionSource extractor) {
- Map<String, PipeRealtimeDataRegionSource> extractors =
- dataRegionId2Extractors.get(dataRegionId);
+ public synchronized void stopListen(int dataRegionId,
PipeRealtimeDataRegionSource extractor) {
+ Map<String, PipeRealtimeDataRegionSource> extractors =
dataRegionId2Sources.get(dataRegionId);
if (Objects.isNull(extractors)) {
return;
}
extractors.remove(extractor.getTaskID());
if (extractors.isEmpty()) {
- dataRegionId2Extractors.remove(dataRegionId);
+ dataRegionId2Sources.remove(dataRegionId);
}
}
//////////////////////////// listen to changes ////////////////////////////
public synchronized void listenToTimePartitionGrow(
- String dataRegionId, Pair<Long, Long> newTimePartitionIdBound) {
+ int dataRegionId, Pair<Long, Long> newTimePartitionIdBound) {
boolean shouldBroadcastTimePartitionChange = false;
Pair<Long, Long> oldTimePartitionIdBound =
dataRegionId2TimePartitionIdBound.get(dataRegionId);
@@ -86,8 +84,7 @@ public class PipeTimePartitionListener {
}
if (shouldBroadcastTimePartitionChange) {
- Map<String, PipeRealtimeDataRegionSource> extractors =
- dataRegionId2Extractors.get(dataRegionId);
+ Map<String, PipeRealtimeDataRegionSource> extractors =
dataRegionId2Sources.get(dataRegionId);
if (Objects.isNull(extractors)) {
return;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index ef4a4910e1b..20c78fd0b8c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -2019,7 +2019,7 @@ public class DataRegion implements IDataRegionForQuery {
}
public void deleteDALFolderAndClose() {
-
Optional.ofNullable(DeletionResourceManager.getInstance(dataRegionIdString))
+
Optional.ofNullable(DeletionResourceManager.getInstance(dataRegionId.getId()))
.ifPresent(
manager -> {
manager.close();
@@ -2833,7 +2833,8 @@ public class DataRegion implements IDataRegionForQuery {
deleteDataInUnsealedFiles(unsealedTsFileResource, deletion,
sealedTsFileResource);
// capture deleteDataNode and wait it to be persisted to DAL.
DeletionResource deletionResource =
-
PipeInsertionDataNodeListener.getInstance().listenToDeleteData(dataRegionIdString,
node);
+ PipeInsertionDataNodeListener.getInstance()
+ .listenToDeleteData(dataRegionId.getId(), node);
// just get result. We have already waited for result in
`listenToDeleteData`
if (deletionResource != null && deletionResource.waitForResult() ==
Status.FAILURE) {
throw deletionResource.getCause();
@@ -2936,7 +2937,8 @@ public class DataRegion implements IDataRegionForQuery {
// capture deleteDataNode and wait it to be persisted to DAL.
DeletionResource deletionResource =
-
PipeInsertionDataNodeListener.getInstance().listenToDeleteData(dataRegionIdString,
node);
+ PipeInsertionDataNodeListener.getInstance()
+ .listenToDeleteData(dataRegionId.getId(), node);
// just get result. We have already waited for result in
`listenToDeleteData`
if (deletionResource != null && deletionResource.waitForResult() ==
Status.FAILURE) {
throw deletionResource.getCause();
@@ -2993,7 +2995,8 @@ public class DataRegion implements IDataRegionForQuery {
deleteDataDirectlyInFile(unsealedTsFileResource, deletion);
// capture deleteDataNode and wait it to be persisted to DAL.
DeletionResource deletionResource =
-
PipeInsertionDataNodeListener.getInstance().listenToDeleteData(dataRegionIdString,
node);
+ PipeInsertionDataNodeListener.getInstance()
+ .listenToDeleteData(dataRegionId.getId(), node);
// just get result. We have already waited for result in
`listenToDeleteData`
if (deletionResource != null && deletionResource.waitForResult() ==
Status.FAILURE) {
throw deletionResource.getCause();
@@ -4164,7 +4167,7 @@ public class DataRegion implements IDataRegionForQuery {
// Listen before the tsFile is added into tsFile manager to avoid it being
compacted
PipeInsertionDataNodeListener.getInstance()
- .listenToTsFile(dataRegionIdString, databaseName, tsFileResource,
true);
+ .listenToTsFile(dataRegionId.getId(), databaseName, tsFileResource,
true);
tsFileManager.add(tsFileResource, false);
@@ -4343,6 +4346,10 @@ public class DataRegion implements IDataRegionForQuery {
return dataRegionIdString;
}
+ public int getDataRegionId() {
+ return dataRegionId.getId();
+ }
+
/**
* Get the storageGroupPath with dataRegionId.
*
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 78b33c02c0b..518df5322a9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -342,7 +342,7 @@ public class TsFileProcessor {
}
PipeInsertionDataNodeListener.getInstance()
.listenToInsertNode(
- dataRegionInfo.getDataRegion().getDataRegionIdString(),
+ dataRegionInfo.getDataRegion().getDataRegionId(),
dataRegionInfo.getDataRegion().getDatabaseName(),
insertRowNode,
tsFileResource);
@@ -439,7 +439,7 @@ public class TsFileProcessor {
}
PipeInsertionDataNodeListener.getInstance()
.listenToInsertNode(
- dataRegionInfo.getDataRegion().getDataRegionIdString(),
+ dataRegionInfo.getDataRegion().getDataRegionId(),
dataRegionInfo.getDataRegion().getDatabaseName(),
insertRowsNode,
tsFileResource);
@@ -612,7 +612,7 @@ public class TsFileProcessor {
}
PipeInsertionDataNodeListener.getInstance()
.listenToInsertNode(
- dataRegionInfo.getDataRegion().getDataRegionIdString(),
+ dataRegionInfo.getDataRegion().getDataRegionId(),
dataRegionInfo.getDataRegion().getDatabaseName(),
insertTabletNode,
tsFileResource);
@@ -1752,7 +1752,7 @@ public class TsFileProcessor {
// before resource serialization to avoid missing hardlink after restart
PipeInsertionDataNodeListener.getInstance()
.listenToTsFile(
- dataRegionInfo.getDataRegion().getDataRegionIdString(),
+ dataRegionInfo.getDataRegion().getDataRegionId(),
dataRegionInfo.getDataRegion().getDatabaseName(),
tsFileResource,
false);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java
index 2e1161977f8..5ded82a36ba 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java
@@ -67,7 +67,7 @@ public class TimePartitionManager {
// PipeInsertionDataNodeListener.listenToInsertNode.
PipeTimePartitionListener.getInstance()
.listenToTimePartitionGrow(
- String.valueOf(timePartitionInfo.dataRegionId.getId()),
+ timePartitionInfo.dataRegionId.getId(),
new Pair<>(
timePartitionInfoMapForRegion.firstKey(),
timePartitionInfoMapForRegion.lastKey()));
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java
index 06f823c0e23..b69f2d85ef7 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java
@@ -42,7 +42,7 @@ import java.io.File;
import java.util.Collections;
public class DeletionRecoverTest {
- private static final String[] FAKE_DATA_REGION_IDS = {"2", "3"};
+ private static final int[] FAKE_DATA_REGION_IDS = {2, 3};
private static final int THIS_DATANODE_ID =
IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
private static final String DELETION_BASE_DIR =
@@ -50,7 +50,7 @@ public class DeletionRecoverTest {
private final int deletionCount = 10;
private DeletionResourceManager deletionResourceManager;
- public void setUp(boolean isRelational, String FAKE_DATA_REGION_ID) throws
Exception {
+ public void setUp(boolean isRelational, int FAKE_DATA_REGION_ID) throws
Exception {
File baseDir = new File(DELETION_BASE_DIR + File.separator +
FAKE_DATA_REGION_ID);
if (baseDir.exists()) {
FileUtils.deleteFileOrDirectory(baseDir);
@@ -84,7 +84,7 @@ public class DeletionRecoverTest {
@After
public void tearDown() throws Exception {
- for (String FAKE_DATA_REGION_ID : FAKE_DATA_REGION_IDS) {
+ for (int FAKE_DATA_REGION_ID : FAKE_DATA_REGION_IDS) {
File baseDir = new File(DELETION_BASE_DIR + File.separator +
FAKE_DATA_REGION_ID);
if (baseDir.exists()) {
FileUtils.deleteFileOrDirectory(baseDir);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java
index f94d909f94b..8b4ace5ec22 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java
@@ -66,7 +66,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
public class DeletionResourceTest {
- private static final String[] FAKE_DATA_REGION_IDS = {"2", "3", "4", "5",
"6"};
+ private static final int[] FAKE_DATA_REGION_IDS = {2, 3, 4, 5, 6};
private static final String DELETION_BASE_DIR =
IoTDBDescriptor.getInstance().getConfig().getIotConsensusV2DeletionFileDir();
private static final int THIS_DATANODE_ID = 0;
@@ -84,7 +84,7 @@ public class DeletionResourceTest {
@After
public void tearDown() throws Exception {
IoTDBDescriptor.getInstance().getConfig().setDataNodeId(previousDataNodeId);
- for (String FAKE_DATA_REGION_ID : FAKE_DATA_REGION_IDS) {
+ for (int FAKE_DATA_REGION_ID : FAKE_DATA_REGION_IDS) {
File baseDir = new File(DELETION_BASE_DIR + File.separator +
FAKE_DATA_REGION_ID);
if (baseDir.exists()) {
FileUtils.deleteFileOrDirectory(baseDir);
@@ -212,7 +212,7 @@ public class DeletionResourceTest {
new PipeDeleteDataNodeEvent(
deleteDataNode, "Test", 10, null, null, null, null, null, null,
true, true);
deletionEvent.setCommitterKeyAndCommitId(
- new CommitterKey("Test", 10,
Integer.parseInt(FAKE_DATA_REGION_IDS[3]), 0), i + 1);
+ new CommitterKey("Test", 10, FAKE_DATA_REGION_IDS[3], 0), i + 1);
deletionEvents.add(deletionEvent);
final DeletionResource deletionResource =
@@ -227,8 +227,7 @@ public class DeletionResourceTest {
// for event commit to invoke onCommit() to removeDAL
if (initialIndex == 0) {
- PipeEventCommitManager.getInstance()
- .register("Test", 10, Integer.parseInt(FAKE_DATA_REGION_IDS[3]),
"Test");
+ PipeEventCommitManager.getInstance().register("Test", 10,
FAKE_DATA_REGION_IDS[3], "Test");
}
deletionEvents.forEach(deletionEvent ->
deletionEvent.increaseReferenceCount("test"));
final List<Path> paths =
@@ -265,8 +264,7 @@ public class DeletionResourceTest {
});
final PipeTaskRuntimeConfiguration configuration =
new PipeTaskRuntimeConfiguration(
- new PipeTaskSourceRuntimeEnvironment(
- "1", 1, Integer.parseInt(FAKE_DATA_REGION_IDS[4]), null));
+ new PipeTaskSourceRuntimeEnvironment("1", 1,
FAKE_DATA_REGION_IDS[4], null));
extractor.customize(parameters, configuration);
Assert.assertTrue(extractor.shouldExtractDeletion());
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java
index 59aec4b1f7b..9e07e91e983 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java
@@ -70,8 +70,8 @@ public class PipeRealtimeExtractTest {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeRealtimeExtractTest.class);
- private final String dataRegion1 = "1";
- private final String dataRegion2 = "2";
+ private final int dataRegion1 = 1;
+ private final int dataRegion2 = 2;
private final String pattern1 = "root.sg.d";
private final String pattern2 = "root.sg.d.a";
private final String[] device = new String[] {"root", "sg", "d"};
@@ -151,31 +151,19 @@ public class PipeRealtimeExtractTest {
final PipeTaskRuntimeConfiguration configuration0 =
new PipeTaskRuntimeConfiguration(
new PipeTaskSourceRuntimeEnvironment(
- "1",
- 1,
- Integer.parseInt(dataRegion1),
- new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));
+ "1", 1, dataRegion1, new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));
final PipeTaskRuntimeConfiguration configuration1 =
new PipeTaskRuntimeConfiguration(
new PipeTaskSourceRuntimeEnvironment(
- "1",
- 1,
- Integer.parseInt(dataRegion1),
- new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));
+ "1", 1, dataRegion1, new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));
final PipeTaskRuntimeConfiguration configuration2 =
new PipeTaskRuntimeConfiguration(
new PipeTaskSourceRuntimeEnvironment(
- "1",
- 1,
- Integer.parseInt(dataRegion2),
- new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));
+ "1", 1, dataRegion2, new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));
final PipeTaskRuntimeConfiguration configuration3 =
new PipeTaskRuntimeConfiguration(
new PipeTaskSourceRuntimeEnvironment(
- "1",
- 1,
- Integer.parseInt(dataRegion2),
- new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));
+ "1", 1, dataRegion2, new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));
// Some parameters of extractor are validated and initialized during the
validation process.
extractor0.validate(new PipeParameterValidator(parameters0));
@@ -274,7 +262,7 @@ public class PipeRealtimeExtractTest {
}
private Future<?> write2DataRegion(
- final int writeNum, final String dataRegionId, final int startNum) {
+ final int writeNum, final int dataRegionId, final int startNum) {
final File dataRegionDir =
new File(tsFileDir.getPath() + File.separator + dataRegionId +
File.separator + "0");
final boolean ignored = dataRegionDir.mkdirs();
@@ -305,7 +293,7 @@ public class PipeRealtimeExtractTest {
PipeInsertionDataNodeListener.getInstance()
.listenToInsertNode(
dataRegionId,
- dataRegionId,
+ Integer.toString(dataRegionId),
new InsertRowNode(
new PlanNodeId(String.valueOf(i)),
new PartialPath(device),
@@ -319,7 +307,7 @@ public class PipeRealtimeExtractTest {
PipeInsertionDataNodeListener.getInstance()
.listenToInsertNode(
dataRegionId,
- dataRegionId,
+ Integer.toString(dataRegionId),
new InsertRowNode(
new PlanNodeId(String.valueOf(i)),
new PartialPath(device),
@@ -331,7 +319,7 @@ public class PipeRealtimeExtractTest {
false),
resource);
PipeInsertionDataNodeListener.getInstance()
- .listenToTsFile(dataRegionId, dataRegionId, resource, false);
+ .listenToTsFile(dataRegionId, Integer.toString(dataRegionId),
resource, false);
}
});
}