This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2ffe3400af9 Pipe: Optimized the hybrid switching algorithm (#15528)
2ffe3400af9 is described below
commit 2ffe3400af95d0d413168f1914a83603fe2390c3
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 21 15:33:56 2025 +0800
Pipe: Optimized the hybrid switching algorithm (#15528)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 11 +-
.../agent/task/connection/PipeEventCollector.java | 9 +-
.../statement/PipeStatementInsertionEvent.java | 4 +-
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 4 +-
.../common/tablet/PipeRawTabletInsertionEvent.java | 4 +-
.../realtime/PipeRealtimeDataRegionExtractor.java | 2 +
.../PipeRealtimeDataRegionHybridExtractor.java | 201 ++++++++++++++-------
.../PipeDataNodeRemainingEventAndTimeMetrics.java | 50 ++++-
.../PipeDataNodeRemainingEventAndTimeOperator.java | 41 ++++-
.../apache/iotdb/commons/conf/CommonConfig.java | 62 ++++++-
.../iotdb/commons/pipe/config/PipeConfig.java | 21 +++
.../iotdb/commons/pipe/config/PipeDescriptor.java | 16 ++
.../commons/pipe/resource/log/PipeLogManager.java | 14 ++
13 files changed, 344 insertions(+), 95 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index f09422fa663..8fd099baa06 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -699,7 +699,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
|| mayWalSizeReachThrottleThreshold())) {
// Extractors of this pipe may be stuck and is pinning too many
MemTables.
LOGGER.warn(
- "Pipe {} needs to restart because too many memtables are pinned.
mayMemTablePinnedCountReachDangerousThreshold: {},
mayWalSizeReachThrottleThreshold: {}",
+ "Pipe {} needs to restart because too many memTables are pinned
or the WAL size is too large. mayMemTablePinnedCountReachDangerousThreshold:
{}, mayWalSizeReachThrottleThreshold: {}",
pipeMeta.getStaticMeta(),
mayMemTablePinnedCountReachDangerousThreshold(),
mayWalSizeReachThrottleThreshold());
@@ -737,10 +737,11 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
}
private boolean mayMemTablePinnedCountReachDangerousThreshold() {
- return PipeDataNodeResourceManager.wal().getPinnedWalCount()
- >= 5
- * PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount()
- * StorageEngine.getInstance().getDataRegionNumber();
+ return PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount() !=
Integer.MAX_VALUE
+ && PipeDataNodeResourceManager.wal().getPinnedWalCount()
+ >= 5
+ *
PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount()
+ * StorageEngine.getInstance().getDataRegionNumber();
}
private boolean mayWalSizeReachThrottleThreshold() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index e385a8d1037..4e75fc9cbf2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -116,10 +116,11 @@ public class PipeEventCollector implements EventCollector
{
}
private void parseAndCollectEvent(final PipeRawTabletInsertionEvent
sourceEvent) {
- collectParsedRawTableEvent(
- sourceEvent.shouldParseTimeOrPattern()
- ? sourceEvent.parseEventWithPatternOrTime()
- : sourceEvent);
+ if (sourceEvent.shouldParseTimeOrPattern()) {
+ collectParsedRawTableEvent(sourceEvent.parseEventWithPatternOrTime());
+ } else {
+ collectEvent(sourceEvent);
+ }
}
private void parseAndCollectEvent(final PipeTsFileInsertionEvent
sourceEvent) throws Exception {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/statement/PipeStatementInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/statement/PipeStatementInsertionEvent.java
index 1efe3aa2ac7..990d25eebd1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/statement/PipeStatementInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/statement/PipeStatementInsertionEvent.java
@@ -90,7 +90,7 @@ public class PipeStatementInsertionEvent extends
PipeInsertionEvent
.forceResize(allocatedMemoryBlock, statement.ramBytesUsed() +
INSTANCE_SIZE);
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
- .increaseTabletEventCount(pipeName, creationTime);
+ .increaseRawTabletEventCount(pipeName, creationTime);
}
return true;
}
@@ -99,7 +99,7 @@ public class PipeStatementInsertionEvent extends
PipeInsertionEvent
public boolean internallyDecreaseResourceReferenceCount(String
holderMessage) {
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
- .decreaseTabletEventCount(pipeName, creationTime);
+ .decreaseRawTabletEventCount(pipeName, creationTime);
}
allocatedMemoryBlock.close();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 598eeaed0da..0b8dc056fae 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -202,7 +202,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
PipeDataNodeResourceManager.wal().pin(walEntryHandler);
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
- .increaseTabletEventCount(pipeName, creationTime);
+ .increaseInsertNodeEventCount(pipeName, creationTime);
PipeDataNodeAgent.task().addFloatingMemoryUsageInByte(pipeName,
ramBytesUsed());
}
return true;
@@ -238,7 +238,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
if (Objects.nonNull(pipeName)) {
PipeDataNodeAgent.task().decreaseFloatingMemoryUsageInByte(pipeName,
ramBytesUsed());
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
- .decreaseTabletEventCount(pipeName, creationTime);
+ .decreaseInsertNodeEventCount(pipeName, creationTime);
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index b2a6fc74922..5f29402bd36 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -236,7 +236,7 @@ public class PipeRawTabletInsertionEvent extends
PipeInsertionEvent
PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) +
INSTANCE_SIZE);
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
- .increaseTabletEventCount(pipeName, creationTime);
+ .increaseRawTabletEventCount(pipeName, creationTime);
}
return true;
}
@@ -245,7 +245,7 @@ public class PipeRawTabletInsertionEvent extends
PipeInsertionEvent
public boolean internallyDecreaseResourceReferenceCount(final String
holderMessage) {
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
- .decreaseTabletEventCount(pipeName, creationTime);
+ .decreaseRawTabletEventCount(pipeName, creationTime);
}
allocatedMemoryBlock.close();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
index e07385fd813..1b5f05fc8fb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
@@ -125,6 +125,7 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
protected final AtomicBoolean isClosed = new AtomicBoolean(false);
+ protected String pipeID;
private String taskID;
protected String userName;
protected boolean skipIfNoPrivileges = true;
@@ -214,6 +215,7 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
// holding a reference to IoTDBDataRegionExtractor, the taskID should be
constructed to
// match that of IoTDBDataRegionExtractor.
creationTime = environment.getCreationTime();
+ pipeID = pipeName + "_" + creationTime;
taskID = pipeName + "_" + dataRegionId + "_" + creationTime;
treePattern = TreePattern.parsePipePatternFromSourceParameters(parameters);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index ebcd71626ab..52de019f741 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -31,6 +31,7 @@ import
org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch;
+import
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.storageengine.StorageEngine;
@@ -43,6 +44,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
+import java.util.function.Consumer;
public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegionExtractor {
@@ -80,24 +82,24 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
}
private void extractTabletInsertion(final PipeRealtimeEvent event) {
- TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
+ TsFileEpoch.State state;
- if (state != TsFileEpoch.State.USING_TSFILE
- && state != TsFileEpoch.State.USING_BOTH
- && canNotUseTabletAnyMore(event)) {
+ if (canNotUseTabletAnyMore(event)) {
+ event.getTsFileEpoch().migrateState(this, curState ->
TsFileEpoch.State.USING_TSFILE);
+ } else {
event
.getTsFileEpoch()
.migrateState(
this,
curState -> {
switch (curState) {
- case EMPTY:
+ case USING_BOTH:
case USING_TSFILE:
- return TsFileEpoch.State.USING_TSFILE;
+ return TsFileEpoch.State.USING_BOTH;
+ case EMPTY:
case USING_TABLET:
- case USING_BOTH:
default:
- return TsFileEpoch.State.USING_BOTH;
+ return TsFileEpoch.State.USING_TABLET;
}
});
}
@@ -111,6 +113,11 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
case EMPTY:
case USING_TABLET:
case USING_BOTH:
+ // USING_BOTH indicates that there are discarded events previously.
+ // In this case, we need to delay the progress report to tsFile event,
to avoid losing data.
+ if (state == TsFileEpoch.State.USING_BOTH) {
+ event.skipReportOnCommit();
+ }
if (!pendingQueue.waitedOffer(event)) {
// This would not happen, but just in case.
// pendingQueue is unbounded, so it should never reach capacity.
@@ -169,7 +176,9 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
}
case USING_BOTH:
default:
- return TsFileEpoch.State.USING_BOTH;
+ return canNotUseTabletAnyMore(event)
+ ? TsFileEpoch.State.USING_TSFILE
+ : TsFileEpoch.State.USING_BOTH;
}
});
@@ -208,23 +217,98 @@ public class PipeRealtimeDataRegionHybridExtractor
extends PipeRealtimeDataRegio
}
private boolean canNotUseTabletAnyMore(final PipeRealtimeEvent event) {
- // In the following 7 cases, we should not extract any more tablet events.
all the data
- // represented by the tablet events should be carried by the following
tsfile event:
- // 0. If the pipe task is currently restarted.
+ // In the following 4 cases, we should not extract this tablet event. all
the data
+ // represented by the tablet event should be carried by the following
tsfile event:
+ // 0. If the remaining insert event count is too large, we need to reduce
the accumulated
+ // tablets.
// 1. If Wal size > maximum size of wal buffer,
// the write operation will be throttled, so we should not extract any
more tablet events.
- // 2. The number of pinned memtables has reached the dangerous threshold.
- // 3. The number of historical tsFile events to transfer has exceeded the
limit.
- // 4. The number of realtime tsfile events to transfer has exceeded the
limit.
- // 5. The number of linked tsfiles has reached the dangerous threshold.
- // 6. The shallow memory usage of the insert node has reached the
dangerous threshold.
- return isPipeTaskCurrentlyRestarted(event)
+ // 2. The shallow memory usage of the insert node has reached the
dangerous threshold.
+ // 3. Deprecated logics (unused by default)
+ return mayRemainingInsertNodeEventExceedLimit(event)
|| mayWalSizeReachThrottleThreshold(event)
+ || mayInsertNodeMemoryReachDangerousThreshold(event)
+ || canNotUseTabletAnymoreDeprecated(event);
+ }
+
+ private boolean mayRemainingInsertNodeEventExceedLimit(final
PipeRealtimeEvent event) {
+ final boolean mayRemainingInsertEventExceedLimit =
+ PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+ .mayRemainingInsertEventExceedLimit(pipeID);
+ if (mayRemainingInsertEventExceedLimit &&
event.mayExtractorUseTablets(this)) {
+ logByLogManager(
+ l ->
+ l.info(
+ "Pipe task {}@{} canNotUseTabletAnyMore(0): remaining insert
event has reached max allowed insert event count {}",
+ pipeName,
+ dataRegionId,
+
PipeConfig.getInstance().getPipeMaxAllowedRemainingInsertEventCountPerPipe()));
+ }
+ return mayRemainingInsertEventExceedLimit;
+ }
+
+ private boolean mayWalSizeReachThrottleThreshold(final PipeRealtimeEvent
event) {
+ final boolean mayWalSizeReachThrottleThreshold =
+ 3 * WALManager.getInstance().getTotalDiskUsage()
+ > IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold();
+ if (mayWalSizeReachThrottleThreshold &&
event.mayExtractorUseTablets(this)) {
+ logByLogManager(
+ l ->
+ l.info(
+ "Pipe task {}@{} canNotUseTabletAnyMore(1): Wal size {} has
reached throttle threshold {}",
+ pipeName,
+ dataRegionId,
+ WALManager.getInstance().getTotalDiskUsage(),
+
IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold() / 3.0d));
+ }
+ return mayWalSizeReachThrottleThreshold;
+ }
+
+ private boolean mayInsertNodeMemoryReachDangerousThreshold(final
PipeRealtimeEvent event) {
+ final long floatingMemoryUsageInByte =
+ PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName);
+ final long pipeCount = PipeDataNodeAgent.task().getPipeCount();
+ final long totalFloatingMemorySizeInBytes =
+
PipeDataNodeResourceManager.memory().getTotalFloatingMemorySizeInBytes();
+ final boolean mayInsertNodeMemoryReachDangerousThreshold =
+ 3 * floatingMemoryUsageInByte * pipeCount >= 2 *
totalFloatingMemorySizeInBytes;
+ if (mayInsertNodeMemoryReachDangerousThreshold &&
event.mayExtractorUseTablets(this)) {
+ logByLogManager(
+ l ->
+ l.info(
+ "Pipe task {}@{} canNotUseTabletAnyMore(2): The shallow
memory usage of the insert node {} has reached the dangerous threshold {}",
+ pipeName,
+ dataRegionId,
+ floatingMemoryUsageInByte * pipeCount,
+ 2 * totalFloatingMemorySizeInBytes / 3.0d));
+ }
+ return mayInsertNodeMemoryReachDangerousThreshold;
+ }
+
+ private void logByLogManager(final Consumer<Logger> infoFunction) {
+ PipeDataNodeResourceManager.log()
+ .schedule(
+ PipeRealtimeDataRegionHybridExtractor.class, getTaskID(),
Integer.MAX_VALUE, 100, 1)
+ .ifPresent(infoFunction);
+ }
+
+ /**
+ * These judgements are deprecated, and are only reserved for manual
operation and compatibility.
+ */
+ @Deprecated
+ private boolean canNotUseTabletAnymoreDeprecated(final PipeRealtimeEvent
event) {
+ // In the following 5 cases, we should not extract any more tablet events.
all the data
+ // represented by the tablet events should be carried by the following
tsfile event:
+ // 0. If the pipe task is currently restarted.
+ // 1. The number of pinned memTables has reached the dangerous threshold.
+ // 2. The number of historical tsFile events to transfer has exceeded the
limit.
+ // 3. The number of realtime tsfile events to transfer has exceeded the
limit.
+ // 4. The number of linked tsFiles has reached the dangerous threshold.
+ return isPipeTaskCurrentlyRestarted(event)
|| mayMemTablePinnedCountReachDangerousThreshold(event)
|| isHistoricalTsFileEventCountExceededLimit(event)
|| isRealtimeTsFileEventCountExceededLimit(event)
- || mayTsFileLinkedCountReachDangerousThreshold(event)
- || mayInsertNodeMemoryReachDangerousThreshold(event);
+ || mayTsFileLinkedCountReachDangerousThreshold(event);
}
private boolean isPipeTaskCurrentlyRestarted(final PipeRealtimeEvent event) {
@@ -236,36 +320,24 @@ public class PipeRealtimeDataRegionHybridExtractor
extends PipeRealtimeDataRegio
PipeDataNodeAgent.task().isPipeTaskCurrentlyRestarted(pipeName);
if (isPipeTaskCurrentlyRestarted && event.mayExtractorUseTablets(this)) {
LOGGER.info(
- "Pipe task {}@{} canNotUseTabletAnyMore1: Pipe task is currently
restarted",
+ "Pipe task {}@{} canNotUseTabletAnymoreDeprecated(0): Pipe task is
currently restarted",
pipeName,
dataRegionId);
}
return isPipeTaskCurrentlyRestarted;
}
- private boolean mayWalSizeReachThrottleThreshold(final PipeRealtimeEvent
event) {
- final boolean mayWalSizeReachThrottleThreshold =
- 3 * WALManager.getInstance().getTotalDiskUsage()
- > IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold();
- if (mayWalSizeReachThrottleThreshold &&
event.mayExtractorUseTablets(this)) {
- LOGGER.info(
- "Pipe task {}@{} canNotUseTabletAnyMore2: Wal size {} has reached
throttle threshold {}",
- pipeName,
- dataRegionId,
- WALManager.getInstance().getTotalDiskUsage(),
- IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold() /
3.0d);
- }
- return mayWalSizeReachThrottleThreshold;
- }
-
private boolean mayMemTablePinnedCountReachDangerousThreshold(final
PipeRealtimeEvent event) {
+ if (PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount() ==
Integer.MAX_VALUE) {
+ return false;
+ }
final boolean mayMemTablePinnedCountReachDangerousThreshold =
PipeDataNodeResourceManager.wal().getPinnedWalCount()
>= PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount()
* StorageEngine.getInstance().getDataRegionNumber();
if (mayMemTablePinnedCountReachDangerousThreshold &&
event.mayExtractorUseTablets(this)) {
LOGGER.info(
- "Pipe task {}@{} canNotUseTabletAnyMore3: The number of pinned
memtables {} has reached the dangerous threshold {}",
+ "Pipe task {}@{} canNotUseTabletAnymoreDeprecated(1): The number of
pinned memTables {} has reached the dangerous threshold {}",
pipeName,
dataRegionId,
PipeDataNodeResourceManager.wal().getPinnedWalCount(),
@@ -276,6 +348,10 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
}
private boolean isHistoricalTsFileEventCountExceededLimit(final
PipeRealtimeEvent event) {
+ if
(PipeConfig.getInstance().getPipeMaxAllowedHistoricalTsFilePerDataRegion()
+ == Integer.MAX_VALUE) {
+ return false;
+ }
final IoTDBDataRegionExtractor extractor =
PipeDataRegionExtractorMetrics.getInstance().getExtractorMap().get(getTaskID());
final boolean isHistoricalTsFileEventCountExceededLimit =
@@ -284,7 +360,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
>=
PipeConfig.getInstance().getPipeMaxAllowedHistoricalTsFilePerDataRegion();
if (isHistoricalTsFileEventCountExceededLimit &&
event.mayExtractorUseTablets(this)) {
LOGGER.info(
- "Pipe task {}@{} canNotUseTabletAnyMore4: The number of historical
tsFile events {} has exceeded the limit {}",
+ "Pipe task {}@{} canNotUseTabletAnymoreDeprecated(2): The number of
historical tsFile events {} has exceeded the limit {}",
pipeName,
dataRegionId,
extractor.getHistoricalTsFileInsertionEventCount(),
@@ -294,12 +370,16 @@ public class PipeRealtimeDataRegionHybridExtractor
extends PipeRealtimeDataRegio
}
private boolean isRealtimeTsFileEventCountExceededLimit(final
PipeRealtimeEvent event) {
+ if
(PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion()
+ == Integer.MAX_VALUE) {
+ return false;
+ }
final boolean isRealtimeTsFileEventCountExceededLimit =
pendingQueue.getTsFileInsertionEventCount()
>=
PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion();
if (isRealtimeTsFileEventCountExceededLimit &&
event.mayExtractorUseTablets(this)) {
LOGGER.info(
- "Pipe task {}@{} canNotUseTabletAnyMore5: The number of realtime
tsFile events {} has exceeded the limit {}",
+ "Pipe task {}@{} canNotUseTabletAnymoreDeprecated(3): The number of
realtime tsFile events {} has exceeded the limit {}",
pipeName,
dataRegionId,
pendingQueue.getTsFileInsertionEventCount(),
@@ -309,12 +389,15 @@ public class PipeRealtimeDataRegionHybridExtractor
extends PipeRealtimeDataRegio
}
private boolean mayTsFileLinkedCountReachDangerousThreshold(final
PipeRealtimeEvent event) {
+ if (PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount() ==
Long.MAX_VALUE) {
+ return false;
+ }
final boolean mayTsFileLinkedCountReachDangerousThreshold =
PipeDataNodeResourceManager.tsfile().getLinkedTsfileCount()
>= PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount();
if (mayTsFileLinkedCountReachDangerousThreshold &&
event.mayExtractorUseTablets(this)) {
LOGGER.info(
- "Pipe task {}@{} canNotUseTabletAnyMore6: The number of linked
tsfiles {} has reached the dangerous threshold {}",
+ "Pipe task {}@{} canNotUseTabletAnymoreDeprecated(4): The number of
linked tsFiles {} has reached the dangerous threshold {}",
pipeName,
dataRegionId,
PipeDataNodeResourceManager.tsfile().getLinkedTsfileCount(),
@@ -323,25 +406,6 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
return mayTsFileLinkedCountReachDangerousThreshold;
}
- private boolean mayInsertNodeMemoryReachDangerousThreshold(final
PipeRealtimeEvent event) {
- final long floatingMemoryUsageInByte =
- PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName);
- final long pipeCount = PipeDataNodeAgent.task().getPipeCount();
- final long totalFloatingMemorySizeInBytes =
-
PipeDataNodeResourceManager.memory().getTotalFloatingMemorySizeInBytes();
- final boolean mayInsertNodeMemoryReachDangerousThreshold =
- 3 * floatingMemoryUsageInByte * pipeCount >= 2 *
totalFloatingMemorySizeInBytes;
- if (mayInsertNodeMemoryReachDangerousThreshold &&
event.mayExtractorUseTablets(this)) {
- LOGGER.info(
- "Pipe task {}@{} canNotUseTabletAnyMore7: The shallow memory usage
of the insert node {} has reached the dangerous threshold {}",
- pipeName,
- dataRegionId,
- floatingMemoryUsageInByte * pipeCount,
- 2 * totalFloatingMemorySizeInBytes / 3.0d);
- }
- return mayInsertNodeMemoryReachDangerousThreshold;
- }
-
@Override
public Event supply() {
PipeRealtimeEvent realtimeEvent = (PipeRealtimeEvent)
pendingQueue.directPoll();
@@ -387,13 +451,20 @@ public class PipeRealtimeDataRegionHybridExtractor
extends PipeRealtimeDataRegio
.migrateState(
this,
state -> {
- if (!state.equals(TsFileEpoch.State.EMPTY)) {
- return state;
+ switch (state) {
+ case EMPTY:
+ return canNotUseTabletAnyMore(event)
+ ? TsFileEpoch.State.USING_TSFILE
+ : TsFileEpoch.State.USING_TABLET;
+ case USING_TSFILE:
+ return canNotUseTabletAnyMore(event)
+ ? TsFileEpoch.State.USING_TSFILE
+ : TsFileEpoch.State.USING_BOTH;
+ case USING_TABLET:
+ case USING_BOTH:
+ default:
+ return state;
}
-
- return canNotUseTabletAnyMore(event)
- ? TsFileEpoch.State.USING_TSFILE
- : TsFileEpoch.State.USING_TABLET;
});
final TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeMetrics.java
index 18384fae48f..045f0201fcc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeMetrics.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.metric.overview;
import
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
@@ -84,6 +85,31 @@ public class PipeDataNodeRemainingEventAndTimeMetrics
implements IMetricSet {
String.valueOf(operator.getCreationTime()));
}
+ public boolean mayRemainingInsertEventExceedLimit(final String pipeID) {
+ if (Objects.isNull(metricService)) {
+ return true;
+ }
+
+ if (remainingEventAndTimeOperatorMap.values().stream()
+
.map(PipeDataNodeRemainingEventAndTimeOperator::getRemainingInsertEventSmoothingCount)
+ .reduce(0d, Double::sum)
+ >
PipeConfig.getInstance().getPipeMaxAllowedTotalRemainingInsertEventCount()) {
+ return true;
+ }
+
+ final PipeDataNodeRemainingEventAndTimeOperator operator =
+ remainingEventAndTimeOperatorMap.get(pipeID);
+ if (Objects.isNull(operator)) {
+ LOGGER.warn(
+ "Failed to get remaining insert event,
RemainingEventAndTimeOperator({}) does not exist, will degrade anyway",
+ pipeID);
+ return true;
+ }
+
+ return operator.getRemainingInsertEventSmoothingCount()
+ >
PipeConfig.getInstance().getPipeMaxAllowedRemainingInsertEventCountPerPipe();
+ }
+
@Override
public void unbindFrom(final AbstractMetricService metricService) {
ImmutableSet.copyOf(remainingEventAndTimeOperatorMap.keySet()).forEach(this::deregister);
@@ -147,20 +173,36 @@ public class PipeDataNodeRemainingEventAndTimeMetrics
implements IMetricSet {
}
}
- public void increaseTabletEventCount(final String pipeName, final long
creationTime) {
+ public void increaseInsertNodeEventCount(final String pipeName, final long
creationTime) {
+ remainingEventAndTimeOperatorMap
+ .computeIfAbsent(
+ pipeName + "_" + creationTime,
+ k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName,
creationTime))
+ .increaseInsertNodeEventCount();
+ }
+
+ public void decreaseInsertNodeEventCount(final String pipeName, final long
creationTime) {
+ remainingEventAndTimeOperatorMap
+ .computeIfAbsent(
+ pipeName + "_" + creationTime,
+ k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName,
creationTime))
+ .decreaseInsertNodeEventCount();
+ }
+
+ public void increaseRawTabletEventCount(final String pipeName, final long
creationTime) {
remainingEventAndTimeOperatorMap
.computeIfAbsent(
pipeName + "_" + creationTime,
k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName,
creationTime))
- .increaseTabletEventCount();
+ .increaseRawTabletEventCount();
}
- public void decreaseTabletEventCount(final String pipeName, final long
creationTime) {
+ public void decreaseRawTabletEventCount(final String pipeName, final long
creationTime) {
remainingEventAndTimeOperatorMap
.computeIfAbsent(
pipeName + "_" + creationTime,
k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName,
creationTime))
- .decreaseTabletEventCount();
+ .decreaseRawTabletEventCount();
}
public void increaseTsFileEventCount(final String pipeName, final long
creationTime) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
index 09f6823f03d..218448da0d6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -44,7 +44,8 @@ class PipeDataNodeRemainingEventAndTimeOperator extends
PipeRemainingOperator {
private final Set<IoTDBSchemaRegionExtractor> schemaRegionExtractors =
Collections.newSetFromMap(new ConcurrentHashMap<>());
- private final AtomicInteger tabletEventCount = new AtomicInteger(0);
+ private final AtomicInteger insertNodeEventCount = new AtomicInteger(0);
+ private final AtomicInteger rawTabletEventCount = new AtomicInteger(0);
private final AtomicInteger tsfileEventCount = new AtomicInteger(0);
private final AtomicInteger heartbeatEventCount = new AtomicInteger(0);
@@ -53,6 +54,10 @@ class PipeDataNodeRemainingEventAndTimeOperator extends
PipeRemainingOperator {
private final IoTDBHistogram collectInvocationHistogram =
(IoTDBHistogram) IoTDBMetricManager.getInstance().createHistogram();
+ private volatile long lastInsertNodeEventCountSmoothingTime = Long.MIN_VALUE;
+ private final Meter insertNodeEventCountMeter =
+ new Meter(new ExponentialMovingAverages(), Clock.defaultClock());
+
private double lastDataRegionCommitSmoothingValue = Long.MAX_VALUE;
private double lastSchemaRegionCommitSmoothingValue = Long.MAX_VALUE;
@@ -62,12 +67,20 @@ class PipeDataNodeRemainingEventAndTimeOperator extends
PipeRemainingOperator {
//////////////////////////// Remaining event & time calculation
////////////////////////////
- void increaseTabletEventCount() {
- tabletEventCount.incrementAndGet();
+ void increaseInsertNodeEventCount() {
+ insertNodeEventCount.incrementAndGet();
+ }
+
+ void decreaseInsertNodeEventCount() {
+ insertNodeEventCount.decrementAndGet();
+ }
+
+ void increaseRawTabletEventCount() {
+ rawTabletEventCount.incrementAndGet();
}
- void decreaseTabletEventCount() {
- tabletEventCount.decrementAndGet();
+ void decreaseRawTabletEventCount() {
+ rawTabletEventCount.decrementAndGet();
}
void increaseTsFileEventCount() {
@@ -86,10 +99,22 @@ class PipeDataNodeRemainingEventAndTimeOperator extends
PipeRemainingOperator {
heartbeatEventCount.decrementAndGet();
}
+ double getRemainingInsertEventSmoothingCount() {
+ if (System.currentTimeMillis() - lastInsertNodeEventCountSmoothingTime
+ >=
PipeConfig.getInstance().getPipeRemainingInsertEventCountSmoothingIntervalSeconds())
{
+ insertNodeEventCountMeter.mark(insertNodeEventCount.get());
+ lastInsertNodeEventCountSmoothingTime = System.currentTimeMillis();
+ }
+ return PipeConfig.getInstance()
+ .getPipeRemainingTimeCommitRateAverageTime()
+ .getMeterRate(insertNodeEventCountMeter);
+ }
+
long getRemainingEvents() {
final long remainingEvents =
tsfileEventCount.get()
- + tabletEventCount.get()
+ + rawTabletEventCount.get()
+ + insertNodeEventCount.get()
+ heartbeatEventCount.get()
+ schemaRegionExtractors.stream()
.map(IoTDBSchemaRegionExtractor::getUnTransferredEventCount)
@@ -116,7 +141,9 @@ class PipeDataNodeRemainingEventAndTimeOperator extends
PipeRemainingOperator {
final double invocationValue = collectInvocationHistogram.getMean();
// Do not take heartbeat event into account
final double totalDataRegionWriteEventCount =
- tsfileEventCount.get() * Math.max(invocationValue, 1) +
tabletEventCount.get();
+ tsfileEventCount.get() * Math.max(invocationValue, 1)
+ + rawTabletEventCount.get()
+ + insertNodeEventCount.get();
dataRegionCommitMeter.updateAndGet(
meter -> {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index a813cf216e1..b55ef0347f1 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -265,10 +265,10 @@ public class CommonConfig {
private long pipeReceiverLoginPeriodicVerificationIntervalMs = 300000;
private double pipeReceiverActualToEstimatedMemoryRatio = 3;
- private int pipeMaxAllowedHistoricalTsFilePerDataRegion = 100;
- private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 5;
- private int pipeMaxAllowedPinnedMemTableCount = 10; // per data region
- private long pipeMaxAllowedLinkedTsFileCount = 300;
+ private int pipeMaxAllowedHistoricalTsFilePerDataRegion = Integer.MAX_VALUE;
// Deprecated
+ private int pipeMaxAllowedPendingTsFileEpochPerDataRegion =
Integer.MAX_VALUE; // Deprecated
+ private int pipeMaxAllowedPinnedMemTableCount = Integer.MAX_VALUE; // per
data region
+ private long pipeMaxAllowedLinkedTsFileCount = Long.MAX_VALUE; // Deprecated
private float pipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage = 0.1F;
private long pipeStuckRestartIntervalSeconds = 120;
private long pipeStuckRestartMinIntervalMs = 5 * 60 * 1000L; // 5 minutes
@@ -276,6 +276,9 @@ public class CommonConfig {
private long pipeFlushAfterLastTerminateSeconds = 30;
private long pipeFlushAfterTerminateCount = 30;
private long pipeStorageEngineFlushTimeIntervalMs = Long.MAX_VALUE;
+ private int pipeMaxAllowedRemainingInsertEventCountPerPipe = 10000;
+ private int pipeMaxAllowedTotalRemainingInsertEventCount = 50000;
+ private int pipeRemainingEventCountSmoothingIntervalSeconds = 15;
private int pipeMetaReportMaxLogNumPerRound = 10;
private int pipeMetaReportMaxLogIntervalRounds = 36;
@@ -1443,6 +1446,57 @@ public class CommonConfig {
return pipeStorageEngineFlushTimeIntervalMs;
}
+ public int getPipeMaxAllowedRemainingInsertEventCountPerPipe() {
+ return pipeMaxAllowedRemainingInsertEventCountPerPipe;
+ }
+
+ public void setPipeMaxAllowedRemainingInsertEventCountPerPipe(
+ int pipeMaxAllowedRemainingInsertEventCountPerPipe) {
+ if (this.pipeMaxAllowedRemainingInsertEventCountPerPipe
+ == pipeMaxAllowedRemainingInsertEventCountPerPipe) {
+ return;
+ }
+ this.pipeMaxAllowedRemainingInsertEventCountPerPipe =
+ pipeMaxAllowedRemainingInsertEventCountPerPipe;
+ logger.info(
+ "pipeMaxAllowedRemainingInsertEventCount is set to {}",
+ pipeMaxAllowedRemainingInsertEventCountPerPipe);
+ }
+
+ public int getPipeMaxAllowedTotalRemainingInsertEventCount() {
+ return pipeMaxAllowedTotalRemainingInsertEventCount;
+ }
+
+ public void setPipeMaxAllowedTotalRemainingInsertEventCount(
+ int pipeMaxAllowedTotalRemainingInsertEventCount) {
+ if (this.pipeMaxAllowedTotalRemainingInsertEventCount
+ == pipeMaxAllowedTotalRemainingInsertEventCount) {
+ return;
+ }
+ this.pipeMaxAllowedTotalRemainingInsertEventCount =
+ pipeMaxAllowedTotalRemainingInsertEventCount;
+ logger.info(
+ "pipeMaxAllowedTotalRemainingInsertEventCount is set to {}",
+ pipeMaxAllowedTotalRemainingInsertEventCount);
+ }
+
+ public int getPipeRemainingInsertEventCountSmoothingIntervalSeconds() {
+ return pipeRemainingEventCountSmoothingIntervalSeconds;
+ }
+
+ public void setPipeRemainingInsertEventCountSmoothingIntervalSeconds(
+ int pipeRemainingEventCountSmoothingIntervalSeconds) {
+ if (this.pipeRemainingEventCountSmoothingIntervalSeconds
+ == pipeRemainingEventCountSmoothingIntervalSeconds) {
+ return;
+ }
+ this.pipeRemainingEventCountSmoothingIntervalSeconds =
+ pipeRemainingEventCountSmoothingIntervalSeconds;
+ logger.info(
+ "pipeRemainingEventCountSmoothingIntervalSeconds is set to {}",
+ pipeRemainingEventCountSmoothingIntervalSeconds);
+ }
+
public void setPipeStuckRestartIntervalSeconds(long
pipeStuckRestartIntervalSeconds) {
if (this.pipeStuckRestartIntervalSeconds ==
pipeStuckRestartIntervalSeconds) {
return;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 9b63b46626c..851ed8c4b5d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -315,6 +315,18 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeStorageEngineFlushTimeIntervalMs();
}
+ public int getPipeMaxAllowedRemainingInsertEventCountPerPipe() {
+ return COMMON_CONFIG.getPipeMaxAllowedRemainingInsertEventCountPerPipe();
+ }
+
+ public int getPipeMaxAllowedTotalRemainingInsertEventCount() {
+ return COMMON_CONFIG.getPipeMaxAllowedTotalRemainingInsertEventCount();
+ }
+
+ public int getPipeRemainingInsertEventCountSmoothingIntervalSeconds() {
+ return
COMMON_CONFIG.getPipeRemainingInsertEventCountSmoothingIntervalSeconds();
+ }
+
/////////////////////////////// Logger ///////////////////////////////
public int getPipeMetaReportMaxLogNumPerRound() {
@@ -542,6 +554,15 @@ public class PipeConfig {
LOGGER.info("PipeFlushAfterLastTerminateSeconds: {}",
getPipeFlushAfterLastTerminateSeconds());
LOGGER.info(
"PipeStorageEngineFlushTimeIntervalMs: {}",
getPipeStorageEngineFlushTimeIntervalMs());
+ LOGGER.info(
+ "PipeMaxAllowedRemainingInsertEventCountPerPipe: {}",
+ getPipeMaxAllowedRemainingInsertEventCountPerPipe());
+ LOGGER.info(
+ "PipeMaxAllowedTotalRemainingInsertEventCount: {}",
+ getPipeMaxAllowedTotalRemainingInsertEventCount());
+ LOGGER.info(
+ "PipeRemainingInsertEventCountSmoothingIntervalSeconds: {}",
+ getPipeRemainingInsertEventCountSmoothingIntervalSeconds());
LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}",
getPipeMetaReportMaxLogNumPerRound());
LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}",
getPipeMetaReportMaxLogIntervalRounds());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 919df926189..f25c285ae5a 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -426,6 +426,22 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_stuck_restart_interval_seconds",
String.valueOf(config.getPipeStuckRestartIntervalSeconds()))));
+ config.setPipeMaxAllowedRemainingInsertEventCountPerPipe(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_max_allowed_remaining_insert_event_count_per_pipe",
+
String.valueOf(config.getPipeMaxAllowedRemainingInsertEventCountPerPipe()))));
+ config.setPipeMaxAllowedTotalRemainingInsertEventCount(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_max_allowed_total_remaining_insert_event_count",
+
String.valueOf(config.getPipeMaxAllowedTotalRemainingInsertEventCount()))));
+ config.setPipeRemainingInsertEventCountSmoothingIntervalSeconds(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_remaining_insert_event_count_smoothing_interval_seconds",
+ String.valueOf(
+
config.getPipeRemainingInsertEventCountSmoothingIntervalSeconds()))));
config.setPipeStuckRestartMinIntervalMs(
Long.parseLong(
properties.getProperty(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogManager.java
index 8e18fcfa546..49699fdf878 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogManager.java
@@ -29,6 +29,8 @@ public class PipeLogManager {
private final ConcurrentMap<Class<?>, PipeLogStatus> logClass2LogStatusMap =
new ConcurrentHashMap<>();
+ private final ConcurrentMap<Class<?>, ConcurrentMap<Object, PipeLogStatus>>
+ logClass2Key2StatusMap = new ConcurrentHashMap<>();
public Optional<Logger> schedule(
final Class<?> logClass,
@@ -40,4 +42,16 @@ public class PipeLogManager {
logClass, k -> new PipeLogStatus(logClass, maxAverageScale,
maxLogInterval))
.schedule(scale);
}
+
+ public Optional<Logger> schedule(
+ final Class<?> logClass,
+ final Object key,
+ final int maxAverageScale,
+ final int maxLogInterval,
+ final int scale) {
+ return logClass2Key2StatusMap
+ .computeIfAbsent(logClass, k -> new ConcurrentHashMap<>())
+ .computeIfAbsent(key, k -> new PipeLogStatus(logClass,
maxAverageScale, maxLogInterval))
+ .schedule(scale);
+ }
}