This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch pipe-flush
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/pipe-flush by this push:
new 6ee24e24dff refactor
6ee24e24dff is described below
commit 6ee24e24dfffbb3e82b078fc2744555d0155798f
Author: Caideyipi <[email protected]>
AuthorDate: Fri Feb 13 16:05:53 2026 +0800
refactor
---
.../tsfile/PipeCompactedTsFileInsertionEvent.java | 8 ++--
.../common/tsfile/PipeTsFileInsertionEvent.java | 7 +--
.../PipeRealtimeDataRegionHybridSource.java | 10 ++---
.../realtime/PipeRealtimeDataRegionSource.java | 10 ++---
.../PipeRealtimeDataRegionTsFileSource.java | 6 +--
...peTsFileEpochProgressIndexAndFlushManager.java} | 46 ++++++++++++++------
.../listener/PipeTimePartitionListener.java | 50 ++++++++++++----------
.../rescon/memory/TimePartitionManager.java | 2 +-
8 files changed, 82 insertions(+), 57 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java
index bf3f2c97acb..078ce718488 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java
@@ -26,7 +26,7 @@ import
org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
+import
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexAndFlushManager;
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -38,7 +38,7 @@ import java.util.stream.Collectors;
public class PipeCompactedTsFileInsertionEvent extends
PipeTsFileInsertionEvent {
- private final String dataRegionId;
+ private final int dataRegionId;
private final Set<String> originFilePaths;
private final List<Long> commitIds;
@@ -70,7 +70,7 @@ public class PipeCompactedTsFileInsertionEvent extends
PipeTsFileInsertionEvent
anyOfOriginalEvents.getStartTime(),
anyOfOriginalEvents.getEndTime());
- this.dataRegionId = String.valueOf(committerKey.getRegionId());
+ this.dataRegionId = committerKey.getRegionId();
this.originFilePaths =
originalEvents.stream()
.map(PipeTsFileInsertionEvent::getTsFile)
@@ -186,7 +186,7 @@ public class PipeCompactedTsFileInsertionEvent extends
PipeTsFileInsertionEvent
public void eliminateProgressIndex() {
if (Objects.isNull(overridingProgressIndex)) {
for (final String originFilePath : originFilePaths) {
- PipeTsFileEpochProgressIndexKeeper.getInstance()
+ PipeTsFileEpochProgressIndexAndFlushManager.getInstance()
.eliminateProgressIndex(dataRegionId, pipeName, originFilePath);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 6b3e505d2ea..7b07e084a7a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -45,7 +45,7 @@ import
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
-import
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
+import
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexAndFlushManager;
import
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -399,8 +399,9 @@ public class PipeTsFileInsertionEvent extends
PipeInsertionEvent
public void eliminateProgressIndex() {
if (Objects.isNull(overridingProgressIndex) && Objects.nonNull(resource)) {
- PipeTsFileEpochProgressIndexKeeper.getInstance()
- .eliminateProgressIndex(resource.getDataRegionId(), pipeName,
resource.getTsFilePath());
+ PipeTsFileEpochProgressIndexAndFlushManager.getInstance()
+ .eliminateProgressIndex(
+ Integer.parseInt(resource.getDataRegionId()), pipeName,
resource.getTsFilePath());
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
index 64fd0846465..f3457ac79f9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
@@ -30,7 +30,7 @@ import
org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeOperator;
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
-import
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
+import
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexAndFlushManager;
import org.apache.iotdb.db.pipe.source.dataregion.realtime.epoch.TsFileEpoch;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -82,7 +82,7 @@ public class PipeRealtimeDataRegionHybridSource extends
PipeRealtimeDataRegionSo
if (canNotUseTabletAnymore(event)) {
event.getTsFileEpoch().migrateState(this, curState ->
TsFileEpoch.State.USING_TSFILE);
- PipeTsFileEpochProgressIndexKeeper.getInstance()
+ PipeTsFileEpochProgressIndexAndFlushManager.getInstance()
.registerProgressIndex(dataRegionId, pipeName,
event.getTsFileEpoch().getResource());
} else {
event
@@ -169,14 +169,14 @@ public class PipeRealtimeDataRegionHybridSource extends
PipeRealtimeDataRegionSo
switch (state) {
case USING_TABLET:
// If the state is USING_TABLET, discard the event
- PipeTsFileEpochProgressIndexKeeper.getInstance()
+ PipeTsFileEpochProgressIndexAndFlushManager.getInstance()
.eliminateProgressIndex(dataRegionId, pipeName,
event.getTsFileEpoch().getFilePath());
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridSource.class.getName(),
false);
return;
case EMPTY:
case USING_TSFILE:
case USING_BOTH:
- PipeTsFileEpochProgressIndexKeeper.getInstance()
+ PipeTsFileEpochProgressIndexAndFlushManager.getInstance()
.markAsExtracted(dataRegionId, pipeName,
event.getTsFileEpoch().getFilePath());
if (!pendingQueue.waitedOffer(event)) {
// This would not happen, but just in case.
@@ -312,7 +312,7 @@ public class PipeRealtimeDataRegionHybridSource extends
PipeRealtimeDataRegionSo
LOGGER.error(errorMessage);
PipeDataNodeAgent.runtime()
.report(pipeTaskMeta, new
PipeRuntimeNonCriticalException(errorMessage));
- PipeTsFileEpochProgressIndexKeeper.getInstance()
+ PipeTsFileEpochProgressIndexAndFlushManager.getInstance()
.eliminateProgressIndex(dataRegionId, pipeName,
event.getTsFileEpoch().getFilePath());
return null;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
index cd00e2975a0..3dec073f057 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
@@ -39,7 +39,7 @@ import
org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
import org.apache.iotdb.db.pipe.source.dataregion.DataRegionListeningFilter;
-import
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
+import
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexAndFlushManager;
import
org.apache.iotdb.db.pipe.source.dataregion.realtime.listener.PipeInsertionDataNodeListener;
import
org.apache.iotdb.db.pipe.source.dataregion.realtime.listener.PipeTimePartitionListener;
import org.apache.iotdb.db.storageengine.StorageEngine;
@@ -91,7 +91,7 @@ public abstract class PipeRealtimeDataRegionSource implements
PipeExtractor {
protected String pipeName;
protected long creationTime;
- protected String dataRegionId;
+ protected int dataRegionId;
protected PipeTaskMeta pipeTaskMeta;
protected boolean shouldExtractInsertion;
@@ -214,7 +214,7 @@ public abstract class PipeRealtimeDataRegionSource
implements PipeExtractor {
shouldExtractDeletion = insertionDeletionListeningOptionPair.getRight();
pipeName = environment.getPipeName();
- dataRegionId = String.valueOf(environment.getRegionId());
+ dataRegionId = environment.getRegionId();
pipeTaskMeta = environment.getPipeTaskMeta();
// Metrics related to TsFileEpoch are managed in PipeExtractorMetrics.
These metrics are
@@ -555,7 +555,7 @@ public abstract class PipeRealtimeDataRegionSource
implements PipeExtractor {
return skipIfNoPrivileges;
}
- public final String getDataRegionId() {
+ public final int getDataRegionId() {
return dataRegionId;
}
@@ -595,7 +595,7 @@ public abstract class PipeRealtimeDataRegionSource
implements PipeExtractor {
}
private void maySkipProgressIndexForRealtimeEvent(final PipeRealtimeEvent
event) {
- if (PipeTsFileEpochProgressIndexKeeper.getInstance()
+ if (PipeTsFileEpochProgressIndexAndFlushManager.getInstance()
.isProgressIndexAfterOrEquals(
dataRegionId,
pipeName,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
index 8f74bc63fb3..f6685ced073 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
-import
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
+import
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexAndFlushManager;
import org.apache.iotdb.db.pipe.source.dataregion.realtime.epoch.TsFileEpoch;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
@@ -51,7 +51,7 @@ public class PipeRealtimeDataRegionTsFileSource extends
PipeRealtimeDataRegionSo
}
event.getTsFileEpoch().migrateState(this, state ->
TsFileEpoch.State.USING_TSFILE);
- PipeTsFileEpochProgressIndexKeeper.getInstance()
+ PipeTsFileEpochProgressIndexAndFlushManager.getInstance()
.registerProgressIndex(dataRegionId, pipeName,
event.getTsFileEpoch().getResource());
if (!(event.getEvent() instanceof TsFileInsertionEvent)) {
@@ -116,7 +116,7 @@ public class PipeRealtimeDataRegionTsFileSource extends
PipeRealtimeDataRegionSo
LOGGER.error(errorMessage);
PipeDataNodeAgent.runtime()
.report(pipeTaskMeta, new
PipeRuntimeNonCriticalException(errorMessage));
- PipeTsFileEpochProgressIndexKeeper.getInstance()
+ PipeTsFileEpochProgressIndexAndFlushManager.getInstance()
.eliminateProgressIndex(
dataRegionId, pipeName,
realtimeEvent.getTsFileEpoch().getFilePath());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexAndFlushManager.java
similarity index 62%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexAndFlushManager.java
index fbb6502ab17..27d549daf61 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexAndFlushManager.java
@@ -19,7 +19,10 @@
package org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner;
+import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.tsfile.utils.Pair;
@@ -31,14 +34,14 @@ import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
-public class PipeTsFileEpochProgressIndexKeeper {
+public class PipeTsFileEpochProgressIndexAndFlushManager {
// data region id -> pipeName -> tsFile path -> max progress index
- private final Map<String, Map<String, Map<String, Pair<TsFileResource,
Long>>>>
+ private final Map<Integer, Map<String, Map<String, Pair<TsFileResource,
Long>>>>
progressIndexKeeper = new ConcurrentHashMap<>();
public synchronized void registerProgressIndex(
- final String dataRegionId, final String pipeName, final TsFileResource
resource) {
+ final int dataRegionId, final String pipeName, final TsFileResource
resource) {
progressIndexKeeper
.computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
.computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>())
@@ -46,7 +49,7 @@ public class PipeTsFileEpochProgressIndexKeeper {
}
public synchronized void markAsExtracted(
- final String dataRegionId, final String pipeName, final String filePath)
{
+ final int dataRegionId, final String pipeName, final String filePath) {
final Pair<TsFileResource, Long> pair =
progressIndexKeeper
.computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
@@ -57,12 +60,28 @@ public class PipeTsFileEpochProgressIndexKeeper {
}
}
- public void flushAllTsFiles(final String dataRegionId) {
-
+ public void flushAllTimeoutTsFiles() {
+ progressIndexKeeper.forEach(
+ (regionId, map) ->
+ map.values()
+ .forEach(
+ fileMap ->
+ fileMap.forEach(
+ (path, pair) -> {
+ if (System.currentTimeMillis()
+ -
PipeConfig.getInstance().getPipeTsFileFlushIntervalSeconds()
+ * 1000L
+ >= pair.getRight()) {
+ StorageEngine.getInstance()
+ .getDataRegion(new DataRegionId(regionId))
+ .asyncCloseOneTsFileProcessor(
+ pair.getLeft().isSeq(),
pair.getLeft().getProcessor());
+ }
+ })));
}
public synchronized void eliminateProgressIndex(
- final String dataRegionId, final @Nonnull String pipeName, final String
filePath) {
+ final int dataRegionId, final @Nonnull String pipeName, final String
filePath) {
progressIndexKeeper
.computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
.computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>())
@@ -70,7 +89,7 @@ public class PipeTsFileEpochProgressIndexKeeper {
}
public synchronized boolean isProgressIndexAfterOrEquals(
- final String dataRegionId,
+ final int dataRegionId,
final String pipeName,
final String tsFilePath,
final ProgressIndex progressIndex) {
@@ -89,19 +108,20 @@ public class PipeTsFileEpochProgressIndexKeeper {
private static class PipeTimePartitionProgressIndexKeeperHolder {
- private static final PipeTsFileEpochProgressIndexKeeper INSTANCE =
- new PipeTsFileEpochProgressIndexKeeper();
+ private static final PipeTsFileEpochProgressIndexAndFlushManager INSTANCE =
+ new PipeTsFileEpochProgressIndexAndFlushManager();
private PipeTimePartitionProgressIndexKeeperHolder() {
// empty constructor
}
}
- public static PipeTsFileEpochProgressIndexKeeper getInstance() {
- return
PipeTsFileEpochProgressIndexKeeper.PipeTimePartitionProgressIndexKeeperHolder.INSTANCE;
+ public static PipeTsFileEpochProgressIndexAndFlushManager getInstance() {
+ return
PipeTsFileEpochProgressIndexAndFlushManager.PipeTimePartitionProgressIndexKeeperHolder
+ .INSTANCE;
}
- private PipeTsFileEpochProgressIndexKeeper() {
+ private PipeTsFileEpochProgressIndexAndFlushManager() {
// empty constructor
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeTimePartitionListener.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeTimePartitionListener.java
index c5e98cd1b2c..d8fd295c9ca 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeTimePartitionListener.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeTimePartitionListener.java
@@ -30,47 +30,50 @@ import java.util.concurrent.ConcurrentHashMap;
public class PipeTimePartitionListener {
- private final Map<String, Map<String, PipeRealtimeDataRegionSource>>
dataRegionId2Extractors =
+ private final Map<Integer, Map<String, PipeRealtimeDataRegionSource>>
dataRegionId2Sources =
new ConcurrentHashMap<>();
// This variable is used to record the upper and lower bounds that each data
region's time
// partition ID has ever reached.
- private final Map<String, Pair<Long, Long>>
dataRegionId2TimePartitionIdBound =
+ private final Map<Integer, Pair<Long, Long>>
dataRegionId2TimePartitionIdBound =
new ConcurrentHashMap<>();
//////////////////////////// start & stop ////////////////////////////
public synchronized void startListen(
- String dataRegionId, PipeRealtimeDataRegionSource extractor) {
- dataRegionId2Extractors
+ final int dataRegionId, final PipeRealtimeDataRegionSource source) {
+ dataRegionId2Sources
.computeIfAbsent(dataRegionId, o -> new HashMap<>())
- .put(extractor.getTaskID(), extractor);
- // Assign the previously recorded upper and lower bounds of time partition
to the extractor that
+ .put(source.getTaskID(), source);
+ // Assign the previously recorded upper and lower bounds of time partition
to the source that
// has just started listening to the growth of time partition.
- Pair<Long, Long> timePartitionIdBound =
dataRegionId2TimePartitionIdBound.get(dataRegionId);
+ final Pair<Long, Long> timePartitionIdBound =
+ dataRegionId2TimePartitionIdBound.get(dataRegionId);
if (Objects.nonNull(timePartitionIdBound)) {
- extractor.setDataRegionTimePartitionIdBound(timePartitionIdBound);
+ source.setDataRegionTimePartitionIdBound(timePartitionIdBound);
}
}
- public synchronized void stopListen(String dataRegionId,
PipeRealtimeDataRegionSource extractor) {
- Map<String, PipeRealtimeDataRegionSource> extractors =
- dataRegionId2Extractors.get(dataRegionId);
- if (Objects.isNull(extractors)) {
+ public synchronized void stopListen(
+ final int dataRegionId, final PipeRealtimeDataRegionSource source) {
+ final Map<String, PipeRealtimeDataRegionSource> sources =
+ dataRegionId2Sources.get(dataRegionId);
+ if (Objects.isNull(sources)) {
return;
}
- extractors.remove(extractor.getTaskID());
- if (extractors.isEmpty()) {
- dataRegionId2Extractors.remove(dataRegionId);
+ sources.remove(source.getTaskID());
+ if (sources.isEmpty()) {
+ dataRegionId2Sources.remove(dataRegionId);
}
}
//////////////////////////// listen to changes ////////////////////////////
public synchronized void listenToTimePartitionGrow(
- String dataRegionId, Pair<Long, Long> newTimePartitionIdBound) {
+ final int dataRegionId, final Pair<Long, Long> newTimePartitionIdBound) {
boolean shouldBroadcastTimePartitionChange = false;
- Pair<Long, Long> oldTimePartitionIdBound =
dataRegionId2TimePartitionIdBound.get(dataRegionId);
+ final Pair<Long, Long> oldTimePartitionIdBound =
+ dataRegionId2TimePartitionIdBound.get(dataRegionId);
if (Objects.isNull(oldTimePartitionIdBound)) {
dataRegionId2TimePartitionIdBound.put(dataRegionId,
newTimePartitionIdBound);
@@ -86,14 +89,15 @@ public class PipeTimePartitionListener {
}
if (shouldBroadcastTimePartitionChange) {
- Map<String, PipeRealtimeDataRegionSource> extractors =
- dataRegionId2Extractors.get(dataRegionId);
- if (Objects.isNull(extractors)) {
+ final Map<String, PipeRealtimeDataRegionSource> sources =
+ dataRegionId2Sources.get(dataRegionId);
+ if (Objects.isNull(sources)) {
return;
}
- Pair<Long, Long> timePartitionIdBound =
dataRegionId2TimePartitionIdBound.get(dataRegionId);
- extractors.forEach(
- (id, extractor) ->
extractor.setDataRegionTimePartitionIdBound(timePartitionIdBound));
+ final Pair<Long, Long> timePartitionIdBound =
+ dataRegionId2TimePartitionIdBound.get(dataRegionId);
+ sources.forEach(
+ (id, source) ->
source.setDataRegionTimePartitionIdBound(timePartitionIdBound));
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java
index 2e1161977f8..5ded82a36ba 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java
@@ -67,7 +67,7 @@ public class TimePartitionManager {
// PipeInsertionDataNodeListener.listenToInsertNode.
PipeTimePartitionListener.getInstance()
.listenToTimePartitionGrow(
- String.valueOf(timePartitionInfo.dataRegionId.getId()),
+ timePartitionInfo.dataRegionId.getId(),
new Pair<>(
timePartitionInfoMapForRegion.firstKey(),
timePartitionInfoMapForRegion.lastKey()));