This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c5a2e4364f4 Pipe: implement AutoCloseable for insert node event and
raw tablet event & Subscription: add parameter control tablet event batch
estimate & close memory block in tablet event for iteration snapshot (#14991)
c5a2e4364f4 is described below
commit c5a2e4364f42f5c644c8aa165466dd8cd5449d65
Author: VGalaxies <[email protected]>
AuthorDate: Mon Mar 3 11:14:19 2025 +0800
Pipe: implement AutoCloseable for insert node event and raw tablet event &
Subscription: add parameter control tablet event batch estimate & close memory
block in tablet event for iteration snapshot (#14991)
---
.../agent/task/connection/PipeEventCollector.java | 1 +
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 65 +++++++++++++++-------
.../common/tablet/PipeRawTabletInsertionEvent.java | 28 ++++++++--
.../batch/SubscriptionPipeTabletEventBatch.java | 44 +++++++++------
.../SubscriptionPipeTabletIterationSnapshot.java | 48 +++++++++-------
.../apache/iotdb/commons/conf/CommonConfig.java | 34 +++++++++++
.../iotdb/commons/conf/CommonDescriptor.java | 16 ++++++
.../subscription/config/SubscriptionConfig.java | 21 +++++++
8 files changed, 195 insertions(+), 62 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index f6a6f07c066..af9ed00b4ec 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -100,6 +100,7 @@ public class PipeEventCollector implements EventCollector {
}
private void parseAndCollectEvent(final PipeInsertNodeTabletInsertionEvent
sourceEvent) {
+ // TODO: let subscription module fully manage the parsing process of the
insert node event
if (sourceEvent.shouldParseTimeOrPattern()) {
for (final PipeRawTabletInsertionEvent parsedEvent :
sourceEvent.toRawTabletInsertionEvents()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 730dbd40d5e..d1159776d18 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -65,11 +65,12 @@ import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent
- implements TabletInsertionEvent, ReferenceTrackableEvent, Accountable {
+ implements TabletInsertionEvent, ReferenceTrackableEvent, Accountable,
AutoCloseable {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeInsertNodeTabletInsertionEvent.class);
@@ -84,7 +85,8 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
private final boolean isAligned;
private final boolean isGeneratedByPipe;
- private final PipeTabletMemoryBlock allocatedMemoryBlock;
+ private final AtomicReference<PipeTabletMemoryBlock> allocatedMemoryBlock;
+ private volatile List<Tablet> tablets;
private List<TabletInsertionEventParser> eventParsers;
@@ -149,9 +151,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
this.isAligned = isAligned;
this.isGeneratedByPipe = isGeneratedByPipe;
- // Allocate empty memory block, will be resized later.
- this.allocatedMemoryBlock =
-
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
+ this.allocatedMemoryBlock = new AtomicReference<>();
}
public InsertNode getInsertNode() throws WALPipeException {
@@ -205,7 +205,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
eventParsers.clear();
eventParsers = null;
}
- allocatedMemoryBlock.close();
+ close();
return true;
} catch (final Exception e) {
LOGGER.warn(
@@ -377,18 +377,21 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
}
// TODO: for table model insertion, we need to get the database name
- public List<Tablet> convertToTablets() {
- final List<Tablet> tablets =
- initEventParsers().stream()
- .map(TabletInsertionEventParser::convertToTablet)
- .collect(Collectors.toList());
- PipeDataNodeResourceManager.memory()
- .forceResize(
- allocatedMemoryBlock,
- tablets.stream()
- .map(PipeMemoryWeightUtil::calculateTabletSizeInBytes)
- .reduce(Long::sum)
- .orElse(0L));
+ public synchronized List<Tablet> convertToTablets() {
+ if (Objects.isNull(tablets)) {
+ tablets =
+ initEventParsers().stream()
+ .map(TabletInsertionEventParser::convertToTablet)
+ .collect(Collectors.toList());
+ allocatedMemoryBlock.compareAndSet(
+ null,
+ PipeDataNodeResourceManager.memory()
+ .forceAllocateForTabletWithRetry(
+ tablets.stream()
+ .map(PipeMemoryWeightUtil::calculateTabletSizeInBytes)
+ .reduce(Long::sum)
+ .orElse(0L)));
+ }
return tablets;
}
@@ -529,13 +532,13 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
private static class PipeInsertNodeTabletInsertionEventResource extends
PipeEventResource {
private final WALEntryHandler walEntryHandler;
- private final PipeTabletMemoryBlock allocatedMemoryBlock;
+ private final AtomicReference<PipeTabletMemoryBlock> allocatedMemoryBlock;
private PipeInsertNodeTabletInsertionEventResource(
final AtomicBoolean isReleased,
final AtomicInteger referenceCount,
final WALEntryHandler walEntryHandler,
- final PipeTabletMemoryBlock allocatedMemoryBlock) {
+ final AtomicReference<PipeTabletMemoryBlock> allocatedMemoryBlock) {
super(isReleased, referenceCount);
this.walEntryHandler = walEntryHandler;
this.allocatedMemoryBlock = allocatedMemoryBlock;
@@ -545,11 +548,31 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
protected void finalizeResource() {
try {
PipeDataNodeResourceManager.wal().unpin(walEntryHandler);
- allocatedMemoryBlock.close();
+ allocatedMemoryBlock.getAndUpdate(
+ memoryBlock -> {
+ if (Objects.nonNull(memoryBlock)) {
+ memoryBlock.close();
+ }
+ return null;
+ });
} catch (final Exception e) {
LOGGER.warn(
"Decrease reference count for memTable {} error.",
walEntryHandler.getMemTableId(), e);
}
}
}
+
+ /////////////////////////// AutoCloseable ///////////////////////////
+
+ @Override
+ public synchronized void close() {
+ allocatedMemoryBlock.getAndUpdate(
+ memoryBlock -> {
+ if (Objects.nonNull(memoryBlock)) {
+ memoryBlock.close();
+ }
+ return null;
+ });
+ tablets = null;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 531da2abbd4..a366735d6e5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -50,7 +50,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
public class PipeRawTabletInsertionEvent extends PipeInsertionEvent
- implements TabletInsertionEvent, ReferenceTrackableEvent {
+ implements TabletInsertionEvent, ReferenceTrackableEvent, AutoCloseable {
// For better calculation
private static final long INSTANCE_SIZE =
@@ -62,7 +62,7 @@ public class PipeRawTabletInsertionEvent extends
PipeInsertionEvent
private final EnrichedEvent sourceEvent;
private boolean needToReport;
- private PipeTabletMemoryBlock allocatedMemoryBlock;
+ private final PipeTabletMemoryBlock allocatedMemoryBlock;
private TabletInsertionEventParser eventParser;
@@ -100,6 +100,10 @@ public class PipeRawTabletInsertionEvent extends
PipeInsertionEvent
this.isAligned = isAligned;
this.sourceEvent = sourceEvent;
this.needToReport = needToReport;
+
+ // Allocate empty memory block, will be resized later.
+ this.allocatedMemoryBlock =
+
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
}
public PipeRawTabletInsertionEvent(
@@ -183,10 +187,10 @@ public class PipeRawTabletInsertionEvent extends
PipeInsertionEvent
@Override
public boolean internallyIncreaseResourceReferenceCount(final String
holderMessage) {
- allocatedMemoryBlock =
- PipeDataNodeResourceManager.memory()
- .forceAllocateForTabletWithRetry(
- PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) +
INSTANCE_SIZE);
+ PipeDataNodeResourceManager.memory()
+ .forceResize(
+ allocatedMemoryBlock,
+ PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) +
INSTANCE_SIZE);
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.increaseTabletEventCount(pipeName, creationTime);
@@ -431,4 +435,16 @@ public class PipeRawTabletInsertionEvent extends
PipeInsertionEvent
allocatedMemoryBlock.close();
}
}
+
+ /////////////////////////// AutoCloseable ///////////////////////////
+
+ @Override
+ public void close() {
+ // The semantic of close is to release the memory occupied by parsing,
this method does nothing
+ // to unify the external close semantic:
+ // 1. PipeRawTabletInsertionEvent: the tablet occupying memory upon
construction, even when
+ // parsing is involved.
+ // 2. PipeInsertNodeTabletInsertionEvent: the tablet is only constructed
when it's actually
+ // involved in parsing.
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
index d2cbf72be0e..bc705287993 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.subscription.event.batch;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
@@ -91,10 +92,6 @@ public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
// we decrease the reference count of events if and only if when the whole
batch is consumed
if (!hasNext() && referenceCount.get() == 0) {
for (final EnrichedEvent enrichedEvent : enrichedEvents) {
- if (enrichedEvent instanceof PipeTsFileInsertionEvent) {
- // close data container in tsfile event
- ((PipeTsFileInsertionEvent) enrichedEvent).close();
- }
enrichedEvent.decreaseReferenceCount(this.getClass().getName(), true);
}
}
@@ -109,10 +106,6 @@ public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
// clear the reference count of events
for (final EnrichedEvent enrichedEvent : enrichedEvents) {
- if (enrichedEvent instanceof PipeTsFileInsertionEvent) {
- // close data container in tsfile event
- ((PipeTsFileInsertionEvent) enrichedEvent).close();
- }
enrichedEvent.clearReferenceCount(this.getClass().getName());
}
enrichedEvents.clear();
@@ -162,7 +155,11 @@ public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
@Override
protected boolean shouldEmit() {
return totalBufferSize >= maxBatchSizeInBytes
- || System.currentTimeMillis() - firstEventProcessingTime >=
maxDelayInMs;
+ || System.currentTimeMillis() - firstEventProcessingTime >=
maxDelayInMs
+ // considering the inaccuracy of the estimation, configure a hard
limit here to avoid an
+ // excessively large batch
+ || enrichedEvents.size()
+ >=
SubscriptionConfig.getInstance().getSubscriptionMaxAllowedEventCountInTabletBatch();
}
private List<Tablet> convertToTablets(final TabletInsertionEvent
tabletInsertionEvent) {
@@ -192,7 +189,10 @@ public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
/////////////////////////////// estimator ///////////////////////////////
private long getEstimatedInsertNodeTabletInsertionEventSize() {
- return Math.max(100L, (long)
insertNodeTabletInsertionEventSizeEstimator.getOneMinuteRate());
+ return Math.max(
+ SubscriptionConfig.getInstance()
+ .getSubscriptionEstimatedInsertNodeTabletInsertionEventSize(),
+ (long) insertNodeTabletInsertionEventSizeEstimator.getOneMinuteRate());
}
private void updateEstimatedInsertNodeTabletInsertionEventSize(final long
size) {
@@ -200,7 +200,9 @@ public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
}
private long getEstimatedRawTabletInsertionEventSize() {
- return Math.max(100L, (long)
rawTabletInsertionEventSizeEstimator.getOneMinuteRate());
+ return Math.max(
+
SubscriptionConfig.getInstance().getSubscriptionEstimatedRawTabletInsertionEventSize(),
+ (long) rawTabletInsertionEventSizeEstimator.getOneMinuteRate());
}
private void updateEstimatedRawTabletInsertionEventSize(final long size) {
@@ -283,13 +285,20 @@ public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
if (currentTabletInsertionEventsIterator.hasNext()) {
final TabletInsertionEvent tabletInsertionEvent =
currentTabletInsertionEventsIterator.next();
- if (!((EnrichedEvent) tabletInsertionEvent)
- .increaseReferenceCount(this.getClass().getName())) {
+ if (!(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
LOGGER.warn(
- "SubscriptionPipeTabletEventBatch: Failed to increase the
reference count of event {}, skipping it.",
- ((EnrichedEvent) tabletInsertionEvent).coreReportMessage());
+ "SubscriptionPipeTabletEventBatch: Unexpected tablet insertion
event {}, skipping it.",
+ tabletInsertionEvent);
} else {
- iterationSnapshot.addParsedEnrichedEvent((EnrichedEvent)
tabletInsertionEvent);
+ if (!((PipeRawTabletInsertionEvent) tabletInsertionEvent)
+ .increaseReferenceCount(this.getClass().getName())) {
+ LOGGER.warn(
+ "SubscriptionPipeTabletEventBatch: Failed to increase the
reference count of event {}, skipping it.",
+ ((PipeRawTabletInsertionEvent)
tabletInsertionEvent).coreReportMessage());
+ } else {
+ iterationSnapshot.addParsedEnrichedEvent(
+ (PipeRawTabletInsertionEvent) tabletInsertionEvent);
+ }
}
if (!currentTabletInsertionEventsIterator.hasNext()) {
iterationSnapshot.addIteratedEnrichedEvent((EnrichedEvent)
currentTsFileInsertionEvent);
@@ -327,6 +336,9 @@ public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
.iterator();
return next();
} else if (enrichedEvent instanceof TabletInsertionEvent) {
+ // There are two types of TabletInsertionEvent:
+ // 1. PipeInsertNodeTabletInsertionEvent, which binds memory blocks
when parsed into tablets
+ // 2. PipeRawTabletInsertionEvent, which is parsed and bound with
memory blocks upstream
iterationSnapshot.addIteratedEnrichedEvent(enrichedEvent);
return convertToTablets((TabletInsertionEvent) enrichedEvent);
} else {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletIterationSnapshot.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletIterationSnapshot.java
index a7466e9070f..952c37492b1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletIterationSnapshot.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletIterationSnapshot.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.subscription.event.batch;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
@@ -30,7 +31,7 @@ import java.util.List;
public class SubscriptionPipeTabletIterationSnapshot {
private final List<EnrichedEvent> iteratedEnrichedEvents = new ArrayList<>();
- private final List<EnrichedEvent> parsedEnrichedEvents = new ArrayList<>();
+ private final List<PipeRawTabletInsertionEvent> parsedEnrichedEvents = new
ArrayList<>();
public List<EnrichedEvent> getIteratedEnrichedEvents() {
return Collections.unmodifiableList(iteratedEnrichedEvents);
@@ -40,39 +41,48 @@ public class SubscriptionPipeTabletIterationSnapshot {
iteratedEnrichedEvents.add(enrichedEvent);
}
- public void addParsedEnrichedEvent(final EnrichedEvent enrichedEvent) {
+ public void addParsedEnrichedEvent(final PipeRawTabletInsertionEvent
enrichedEvent) {
parsedEnrichedEvents.add(enrichedEvent);
}
public void ack() {
+ closeIteratedEnrichedEvents();
+ decreaseReferenceCountOfParsedEnrichedEvents();
+ }
+
+ public void cleanUp() {
+ closeIteratedEnrichedEvents();
+ clearReferenceCountOfParsedEnrichedEvents();
+ }
+
+ private void closeIteratedEnrichedEvents() {
+ // TODO: unify close interface
for (final EnrichedEvent enrichedEvent : iteratedEnrichedEvents) {
+ // close data container in tsfile event
if (enrichedEvent instanceof PipeTsFileInsertionEvent) {
- // close data container in tsfile event
((PipeTsFileInsertionEvent) enrichedEvent).close();
}
- }
-
- for (final EnrichedEvent enrichedEvent : parsedEnrichedEvents) {
+ // close memory block in tablet event
+ if (enrichedEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+ ((PipeInsertNodeTabletInsertionEvent) enrichedEvent).close();
+ }
if (enrichedEvent instanceof PipeRawTabletInsertionEvent) {
- // decrease reference count in raw tablet event
- enrichedEvent.decreaseReferenceCount(this.getClass().getName(), true);
+ ((PipeRawTabletInsertionEvent) enrichedEvent).close();
}
}
}
- public void cleanUp() {
- for (final EnrichedEvent enrichedEvent : iteratedEnrichedEvents) {
- if (enrichedEvent instanceof PipeTsFileInsertionEvent) {
- // close data container in tsfile event
- ((PipeTsFileInsertionEvent) enrichedEvent).close();
- }
+ private void decreaseReferenceCountOfParsedEnrichedEvents() {
+ for (final PipeRawTabletInsertionEvent event : parsedEnrichedEvents) {
+ // decrease reference count in raw tablet event
+ event.decreaseReferenceCount(this.getClass().getName(), true);
}
+ }
- for (final EnrichedEvent enrichedEvent : parsedEnrichedEvents) {
- if (enrichedEvent instanceof PipeRawTabletInsertionEvent) {
- // clear reference count in raw tablet event
- enrichedEvent.clearReferenceCount(this.getClass().getName());
- }
+ private void clearReferenceCountOfParsedEnrichedEvents() {
+ for (final PipeRawTabletInsertionEvent event : parsedEnrichedEvents) {
+ // clear reference count in raw tablet event
+ event.clearReferenceCount(this.getClass().getName());
}
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 3110aa07076..cb69d5db391 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -39,6 +39,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.KB;
import static org.apache.iotdb.commons.conf.IoTDBConstant.MB;
public class CommonConfig {
@@ -309,6 +310,9 @@ public class CommonConfig {
private long subscriptionReadTabletBufferSize = 8 * MB;
private long subscriptionTsFileDeduplicationWindowSeconds = 120; // 120s
private volatile long subscriptionCheckMemoryEnoughIntervalMs = 10L;
+ private long subscriptionEstimatedInsertNodeTabletInsertionEventSize = 64 *
KB;
+ private long subscriptionEstimatedRawTabletInsertionEventSize = 16 * KB;
+ private long subscriptionMaxAllowedEventCountInTabletBatch = 100;
private boolean subscriptionPrefetchEnabled = false;
private float subscriptionPrefetchMemoryThreshold = 0.5F;
@@ -1442,6 +1446,36 @@ public class CommonConfig {
this.subscriptionCheckMemoryEnoughIntervalMs =
subscriptionCheckMemoryEnoughIntervalMs;
}
+ public long getSubscriptionEstimatedInsertNodeTabletInsertionEventSize() {
+ return subscriptionEstimatedInsertNodeTabletInsertionEventSize;
+ }
+
+ public void setSubscriptionEstimatedInsertNodeTabletInsertionEventSize(
+ final long subscriptionEstimatedInsertNodeTabletInsertionEventSize) {
+ this.subscriptionEstimatedInsertNodeTabletInsertionEventSize =
+ subscriptionEstimatedInsertNodeTabletInsertionEventSize;
+ }
+
+ public long getSubscriptionEstimatedRawTabletInsertionEventSize() {
+ return subscriptionEstimatedRawTabletInsertionEventSize;
+ }
+
+ public void setSubscriptionEstimatedRawTabletInsertionEventSize(
+ final long subscriptionEstimatedRawTabletInsertionEventSize) {
+ this.subscriptionEstimatedRawTabletInsertionEventSize =
+ subscriptionEstimatedRawTabletInsertionEventSize;
+ }
+
+ public long getSubscriptionMaxAllowedEventCountInTabletBatch() {
+ return subscriptionMaxAllowedEventCountInTabletBatch;
+ }
+
+ public void setSubscriptionMaxAllowedEventCountInTabletBatch(
+ final long subscriptionMaxAllowedEventCountInTabletBatch) {
+ this.subscriptionMaxAllowedEventCountInTabletBatch =
+ subscriptionMaxAllowedEventCountInTabletBatch;
+ }
+
public boolean getSubscriptionPrefetchEnabled() {
return subscriptionPrefetchEnabled;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index e31969bdbb8..2afe9bbdeb4 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -738,6 +738,22 @@ public class CommonDescriptor {
properties.getProperty(
"subscription_check_memory_enough_interval_ms",
String.valueOf(config.getSubscriptionCheckMemoryEnoughIntervalMs()))));
+ config.setSubscriptionEstimatedInsertNodeTabletInsertionEventSize(
+ Long.parseLong(
+ properties.getProperty(
+
"subscription_estimated_insert_node_tablet_insertion_event_size",
+ String.valueOf(
+
config.getSubscriptionEstimatedInsertNodeTabletInsertionEventSize()))));
+ config.setSubscriptionEstimatedRawTabletInsertionEventSize(
+ Long.parseLong(
+ properties.getProperty(
+ "subscription_estimated_raw_tablet_insertion_event_size",
+
String.valueOf(config.getSubscriptionEstimatedRawTabletInsertionEventSize()))));
+ config.setSubscriptionMaxAllowedEventCountInTabletBatch(
+ Long.parseLong(
+ properties.getProperty(
+ "subscription_max_allowed_event_count_in_tablet_batch",
+
String.valueOf(config.getSubscriptionMaxAllowedEventCountInTabletBatch()))));
config.setSubscriptionPrefetchEnabled(
Boolean.parseBoolean(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
index 76e29d77ee0..1ace6e71de8 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
@@ -85,6 +85,18 @@ public class SubscriptionConfig {
return COMMON_CONFIG.getSubscriptionCheckMemoryEnoughIntervalMs();
}
+ public long getSubscriptionEstimatedInsertNodeTabletInsertionEventSize() {
+ return
COMMON_CONFIG.getSubscriptionEstimatedInsertNodeTabletInsertionEventSize();
+ }
+
+ public long getSubscriptionEstimatedRawTabletInsertionEventSize() {
+ return COMMON_CONFIG.getSubscriptionEstimatedRawTabletInsertionEventSize();
+ }
+
+ public long getSubscriptionMaxAllowedEventCountInTabletBatch() {
+ return COMMON_CONFIG.getSubscriptionMaxAllowedEventCountInTabletBatch();
+ }
+
public boolean getSubscriptionPrefetchEnabled() {
return COMMON_CONFIG.getSubscriptionPrefetchEnabled();
}
@@ -150,6 +162,15 @@ public class SubscriptionConfig {
LOGGER.info(
"SubscriptionCheckMemoryEnoughIntervalMs: {}",
getSubscriptionCheckMemoryEnoughIntervalMs());
+ LOGGER.info(
+ "SubscriptionEstimatedInsertNodeTabletInsertionEventSize: {}",
+ getSubscriptionEstimatedInsertNodeTabletInsertionEventSize());
+ LOGGER.info(
+ "SubscriptionEstimatedRawTabletInsertionEventSize: {}",
+ getSubscriptionEstimatedRawTabletInsertionEventSize());
+ LOGGER.info(
+ "SubscriptionMaxAllowedEventCountInTabletBatch: {}",
+ getSubscriptionMaxAllowedEventCountInTabletBatch());
LOGGER.info("SubscriptionPrefetchEnabled: {}",
getSubscriptionPrefetchEnabled());
LOGGER.info(