This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c5a2e4364f4 Pipe: implement AutoCloseable for insert node event and 
raw tablet event & Subscription: add parameter control tablet event batch 
estimate & close memory block in tablet event for iteration snapshot (#14991)
c5a2e4364f4 is described below

commit c5a2e4364f42f5c644c8aa165466dd8cd5449d65
Author: VGalaxies <[email protected]>
AuthorDate: Mon Mar 3 11:14:19 2025 +0800

    Pipe: implement AutoCloseable for insert node event and raw tablet event & 
Subscription: add parameter control tablet event batch estimate & close memory 
block in tablet event for iteration snapshot (#14991)
---
 .../agent/task/connection/PipeEventCollector.java  |  1 +
 .../tablet/PipeInsertNodeTabletInsertionEvent.java | 65 +++++++++++++++-------
 .../common/tablet/PipeRawTabletInsertionEvent.java | 28 ++++++++--
 .../batch/SubscriptionPipeTabletEventBatch.java    | 44 +++++++++------
 .../SubscriptionPipeTabletIterationSnapshot.java   | 48 +++++++++-------
 .../apache/iotdb/commons/conf/CommonConfig.java    | 34 +++++++++++
 .../iotdb/commons/conf/CommonDescriptor.java       | 16 ++++++
 .../subscription/config/SubscriptionConfig.java    | 21 +++++++
 8 files changed, 195 insertions(+), 62 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index f6a6f07c066..af9ed00b4ec 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -100,6 +100,7 @@ public class PipeEventCollector implements EventCollector {
   }
 
   private void parseAndCollectEvent(final PipeInsertNodeTabletInsertionEvent 
sourceEvent) {
+    // TODO: let subscription module fully manage the parsing process of the 
insert node event
     if (sourceEvent.shouldParseTimeOrPattern()) {
       for (final PipeRawTabletInsertionEvent parsedEvent :
           sourceEvent.toRawTabletInsertionEvents()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 730dbd40d5e..d1159776d18 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -65,11 +65,12 @@ import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent
-    implements TabletInsertionEvent, ReferenceTrackableEvent, Accountable {
+    implements TabletInsertionEvent, ReferenceTrackableEvent, Accountable, 
AutoCloseable {
 
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeInsertNodeTabletInsertionEvent.class);
@@ -84,7 +85,8 @@ public class PipeInsertNodeTabletInsertionEvent extends 
PipeInsertionEvent
   private final boolean isAligned;
   private final boolean isGeneratedByPipe;
 
-  private final PipeTabletMemoryBlock allocatedMemoryBlock;
+  private final AtomicReference<PipeTabletMemoryBlock> allocatedMemoryBlock;
+  private volatile List<Tablet> tablets;
 
   private List<TabletInsertionEventParser> eventParsers;
 
@@ -149,9 +151,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
PipeInsertionEvent
     this.isAligned = isAligned;
     this.isGeneratedByPipe = isGeneratedByPipe;
 
-    // Allocate empty memory block, will be resized later.
-    this.allocatedMemoryBlock =
-        
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
+    this.allocatedMemoryBlock = new AtomicReference<>();
   }
 
   public InsertNode getInsertNode() throws WALPipeException {
@@ -205,7 +205,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
PipeInsertionEvent
         eventParsers.clear();
         eventParsers = null;
       }
-      allocatedMemoryBlock.close();
+      close();
       return true;
     } catch (final Exception e) {
       LOGGER.warn(
@@ -377,18 +377,21 @@ public class PipeInsertNodeTabletInsertionEvent extends 
PipeInsertionEvent
   }
 
   // TODO: for table model insertion, we need to get the database name
-  public List<Tablet> convertToTablets() {
-    final List<Tablet> tablets =
-        initEventParsers().stream()
-            .map(TabletInsertionEventParser::convertToTablet)
-            .collect(Collectors.toList());
-    PipeDataNodeResourceManager.memory()
-        .forceResize(
-            allocatedMemoryBlock,
-            tablets.stream()
-                .map(PipeMemoryWeightUtil::calculateTabletSizeInBytes)
-                .reduce(Long::sum)
-                .orElse(0L));
+  public synchronized List<Tablet> convertToTablets() {
+    if (Objects.isNull(tablets)) {
+      tablets =
+          initEventParsers().stream()
+              .map(TabletInsertionEventParser::convertToTablet)
+              .collect(Collectors.toList());
+      allocatedMemoryBlock.compareAndSet(
+          null,
+          PipeDataNodeResourceManager.memory()
+              .forceAllocateForTabletWithRetry(
+                  tablets.stream()
+                      .map(PipeMemoryWeightUtil::calculateTabletSizeInBytes)
+                      .reduce(Long::sum)
+                      .orElse(0L)));
+    }
     return tablets;
   }
 
@@ -529,13 +532,13 @@ public class PipeInsertNodeTabletInsertionEvent extends 
PipeInsertionEvent
   private static class PipeInsertNodeTabletInsertionEventResource extends 
PipeEventResource {
 
     private final WALEntryHandler walEntryHandler;
-    private final PipeTabletMemoryBlock allocatedMemoryBlock;
+    private final AtomicReference<PipeTabletMemoryBlock> allocatedMemoryBlock;
 
     private PipeInsertNodeTabletInsertionEventResource(
         final AtomicBoolean isReleased,
         final AtomicInteger referenceCount,
         final WALEntryHandler walEntryHandler,
-        final PipeTabletMemoryBlock allocatedMemoryBlock) {
+        final AtomicReference<PipeTabletMemoryBlock> allocatedMemoryBlock) {
       super(isReleased, referenceCount);
       this.walEntryHandler = walEntryHandler;
       this.allocatedMemoryBlock = allocatedMemoryBlock;
@@ -545,11 +548,31 @@ public class PipeInsertNodeTabletInsertionEvent extends 
PipeInsertionEvent
     protected void finalizeResource() {
       try {
         PipeDataNodeResourceManager.wal().unpin(walEntryHandler);
-        allocatedMemoryBlock.close();
+        allocatedMemoryBlock.getAndUpdate(
+            memoryBlock -> {
+              if (Objects.nonNull(memoryBlock)) {
+                memoryBlock.close();
+              }
+              return null;
+            });
       } catch (final Exception e) {
         LOGGER.warn(
             "Decrease reference count for memTable {} error.", 
walEntryHandler.getMemTableId(), e);
       }
     }
   }
+
+  /////////////////////////// AutoCloseable ///////////////////////////
+
+  @Override
+  public synchronized void close() {
+    allocatedMemoryBlock.getAndUpdate(
+        memoryBlock -> {
+          if (Objects.nonNull(memoryBlock)) {
+            memoryBlock.close();
+          }
+          return null;
+        });
+    tablets = null;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 531da2abbd4..a366735d6e5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -50,7 +50,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 
 public class PipeRawTabletInsertionEvent extends PipeInsertionEvent
-    implements TabletInsertionEvent, ReferenceTrackableEvent {
+    implements TabletInsertionEvent, ReferenceTrackableEvent, AutoCloseable {
 
   // For better calculation
   private static final long INSTANCE_SIZE =
@@ -62,7 +62,7 @@ public class PipeRawTabletInsertionEvent extends 
PipeInsertionEvent
   private final EnrichedEvent sourceEvent;
   private boolean needToReport;
 
-  private PipeTabletMemoryBlock allocatedMemoryBlock;
+  private final PipeTabletMemoryBlock allocatedMemoryBlock;
 
   private TabletInsertionEventParser eventParser;
 
@@ -100,6 +100,10 @@ public class PipeRawTabletInsertionEvent extends 
PipeInsertionEvent
     this.isAligned = isAligned;
     this.sourceEvent = sourceEvent;
     this.needToReport = needToReport;
+
+    // Allocate empty memory block, will be resized later.
+    this.allocatedMemoryBlock =
+        
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
   }
 
   public PipeRawTabletInsertionEvent(
@@ -183,10 +187,10 @@ public class PipeRawTabletInsertionEvent extends 
PipeInsertionEvent
 
   @Override
   public boolean internallyIncreaseResourceReferenceCount(final String 
holderMessage) {
-    allocatedMemoryBlock =
-        PipeDataNodeResourceManager.memory()
-            .forceAllocateForTabletWithRetry(
-                PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) + 
INSTANCE_SIZE);
+    PipeDataNodeResourceManager.memory()
+        .forceResize(
+            allocatedMemoryBlock,
+            PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) + 
INSTANCE_SIZE);
     if (Objects.nonNull(pipeName)) {
       PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
           .increaseTabletEventCount(pipeName, creationTime);
@@ -431,4 +435,16 @@ public class PipeRawTabletInsertionEvent extends 
PipeInsertionEvent
       allocatedMemoryBlock.close();
     }
   }
+
+  /////////////////////////// AutoCloseable ///////////////////////////
+
+  @Override
+  public void close() {
+    // The semantic of close is to release the memory occupied by parsing, 
this method does nothing
+    // to unify the external close semantic:
+    //   1. PipeRawTabletInsertionEvent: the tablet occupying memory upon 
construction, even when
+    // parsing is involved.
+    //   2. PipeInsertNodeTabletInsertionEvent: the tablet is only constructed 
when it's actually
+    // involved in parsing.
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
index d2cbf72be0e..bc705287993 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.subscription.event.batch;
 
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
@@ -91,10 +92,6 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
     // we decrease the reference count of events if and only if when the whole 
batch is consumed
     if (!hasNext() && referenceCount.get() == 0) {
       for (final EnrichedEvent enrichedEvent : enrichedEvents) {
-        if (enrichedEvent instanceof PipeTsFileInsertionEvent) {
-          // close data container in tsfile event
-          ((PipeTsFileInsertionEvent) enrichedEvent).close();
-        }
         enrichedEvent.decreaseReferenceCount(this.getClass().getName(), true);
       }
     }
@@ -109,10 +106,6 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
 
     // clear the reference count of events
     for (final EnrichedEvent enrichedEvent : enrichedEvents) {
-      if (enrichedEvent instanceof PipeTsFileInsertionEvent) {
-        // close data container in tsfile event
-        ((PipeTsFileInsertionEvent) enrichedEvent).close();
-      }
       enrichedEvent.clearReferenceCount(this.getClass().getName());
     }
     enrichedEvents.clear();
@@ -162,7 +155,11 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
   @Override
   protected boolean shouldEmit() {
     return totalBufferSize >= maxBatchSizeInBytes
-        || System.currentTimeMillis() - firstEventProcessingTime >= 
maxDelayInMs;
+        || System.currentTimeMillis() - firstEventProcessingTime >= 
maxDelayInMs
+        // considering the inaccuracy of the estimation, configure a hard 
limit here to avoid an
+        // excessively large batch
+        || enrichedEvents.size()
+            >= 
SubscriptionConfig.getInstance().getSubscriptionMaxAllowedEventCountInTabletBatch();
   }
 
   private List<Tablet> convertToTablets(final TabletInsertionEvent 
tabletInsertionEvent) {
@@ -192,7 +189,10 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
   /////////////////////////////// estimator ///////////////////////////////
 
   private long getEstimatedInsertNodeTabletInsertionEventSize() {
-    return Math.max(100L, (long) 
insertNodeTabletInsertionEventSizeEstimator.getOneMinuteRate());
+    return Math.max(
+        SubscriptionConfig.getInstance()
+            .getSubscriptionEstimatedInsertNodeTabletInsertionEventSize(),
+        (long) insertNodeTabletInsertionEventSizeEstimator.getOneMinuteRate());
   }
 
   private void updateEstimatedInsertNodeTabletInsertionEventSize(final long 
size) {
@@ -200,7 +200,9 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
   }
 
   private long getEstimatedRawTabletInsertionEventSize() {
-    return Math.max(100L, (long) 
rawTabletInsertionEventSizeEstimator.getOneMinuteRate());
+    return Math.max(
+        
SubscriptionConfig.getInstance().getSubscriptionEstimatedRawTabletInsertionEventSize(),
+        (long) rawTabletInsertionEventSizeEstimator.getOneMinuteRate());
   }
 
   private void updateEstimatedRawTabletInsertionEventSize(final long size) {
@@ -283,13 +285,20 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
       if (currentTabletInsertionEventsIterator.hasNext()) {
         final TabletInsertionEvent tabletInsertionEvent =
             currentTabletInsertionEventsIterator.next();
-        if (!((EnrichedEvent) tabletInsertionEvent)
-            .increaseReferenceCount(this.getClass().getName())) {
+        if (!(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
           LOGGER.warn(
-              "SubscriptionPipeTabletEventBatch: Failed to increase the 
reference count of event {}, skipping it.",
-              ((EnrichedEvent) tabletInsertionEvent).coreReportMessage());
+              "SubscriptionPipeTabletEventBatch: Unexpected tablet insertion 
event {}, skipping it.",
+              tabletInsertionEvent);
         } else {
-          iterationSnapshot.addParsedEnrichedEvent((EnrichedEvent) 
tabletInsertionEvent);
+          if (!((PipeRawTabletInsertionEvent) tabletInsertionEvent)
+              .increaseReferenceCount(this.getClass().getName())) {
+            LOGGER.warn(
+                "SubscriptionPipeTabletEventBatch: Failed to increase the 
reference count of event {}, skipping it.",
+                ((PipeRawTabletInsertionEvent) 
tabletInsertionEvent).coreReportMessage());
+          } else {
+            iterationSnapshot.addParsedEnrichedEvent(
+                (PipeRawTabletInsertionEvent) tabletInsertionEvent);
+          }
         }
         if (!currentTabletInsertionEventsIterator.hasNext()) {
           iterationSnapshot.addIteratedEnrichedEvent((EnrichedEvent) 
currentTsFileInsertionEvent);
@@ -327,6 +336,9 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
               .iterator();
       return next();
     } else if (enrichedEvent instanceof TabletInsertionEvent) {
+      // There are two types of TabletInsertionEvent:
+      //   1. PipeInsertNodeTabletInsertionEvent, which binds memory blocks 
when parsed into tablets
+      //   2. PipeRawTabletInsertionEvent, which is parsed and bound with 
memory blocks upstream
       iterationSnapshot.addIteratedEnrichedEvent(enrichedEvent);
       return convertToTablets((TabletInsertionEvent) enrichedEvent);
     } else {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletIterationSnapshot.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletIterationSnapshot.java
index a7466e9070f..952c37492b1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletIterationSnapshot.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletIterationSnapshot.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.subscription.event.batch;
 
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 
@@ -30,7 +31,7 @@ import java.util.List;
 public class SubscriptionPipeTabletIterationSnapshot {
 
   private final List<EnrichedEvent> iteratedEnrichedEvents = new ArrayList<>();
-  private final List<EnrichedEvent> parsedEnrichedEvents = new ArrayList<>();
+  private final List<PipeRawTabletInsertionEvent> parsedEnrichedEvents = new 
ArrayList<>();
 
   public List<EnrichedEvent> getIteratedEnrichedEvents() {
     return Collections.unmodifiableList(iteratedEnrichedEvents);
@@ -40,39 +41,48 @@ public class SubscriptionPipeTabletIterationSnapshot {
     iteratedEnrichedEvents.add(enrichedEvent);
   }
 
-  public void addParsedEnrichedEvent(final EnrichedEvent enrichedEvent) {
+  public void addParsedEnrichedEvent(final PipeRawTabletInsertionEvent 
enrichedEvent) {
     parsedEnrichedEvents.add(enrichedEvent);
   }
 
   public void ack() {
+    closeIteratedEnrichedEvents();
+    decreaseReferenceCountOfParsedEnrichedEvents();
+  }
+
+  public void cleanUp() {
+    closeIteratedEnrichedEvents();
+    clearReferenceCountOfParsedEnrichedEvents();
+  }
+
+  private void closeIteratedEnrichedEvents() {
+    // TODO: unify close interface
     for (final EnrichedEvent enrichedEvent : iteratedEnrichedEvents) {
+      // close data container in tsfile event
       if (enrichedEvent instanceof PipeTsFileInsertionEvent) {
-        // close data container in tsfile event
         ((PipeTsFileInsertionEvent) enrichedEvent).close();
       }
-    }
-
-    for (final EnrichedEvent enrichedEvent : parsedEnrichedEvents) {
+      // close memory block in tablet event
+      if (enrichedEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+        ((PipeInsertNodeTabletInsertionEvent) enrichedEvent).close();
+      }
       if (enrichedEvent instanceof PipeRawTabletInsertionEvent) {
-        // decrease reference count in raw tablet event
-        enrichedEvent.decreaseReferenceCount(this.getClass().getName(), true);
+        ((PipeRawTabletInsertionEvent) enrichedEvent).close();
       }
     }
   }
 
-  public void cleanUp() {
-    for (final EnrichedEvent enrichedEvent : iteratedEnrichedEvents) {
-      if (enrichedEvent instanceof PipeTsFileInsertionEvent) {
-        // close data container in tsfile event
-        ((PipeTsFileInsertionEvent) enrichedEvent).close();
-      }
+  private void decreaseReferenceCountOfParsedEnrichedEvents() {
+    for (final PipeRawTabletInsertionEvent event : parsedEnrichedEvents) {
+      // decrease reference count in raw tablet event
+      event.decreaseReferenceCount(this.getClass().getName(), true);
     }
+  }
 
-    for (final EnrichedEvent enrichedEvent : parsedEnrichedEvents) {
-      if (enrichedEvent instanceof PipeRawTabletInsertionEvent) {
-        // clear reference count in raw tablet event
-        enrichedEvent.clearReferenceCount(this.getClass().getName());
-      }
+  private void clearReferenceCountOfParsedEnrichedEvents() {
+    for (final PipeRawTabletInsertionEvent event : parsedEnrichedEvents) {
+      // clear reference count in raw tablet event
+      event.clearReferenceCount(this.getClass().getName());
     }
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 3110aa07076..cb69d5db391 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -39,6 +39,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
+import static org.apache.iotdb.commons.conf.IoTDBConstant.KB;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.MB;
 
 public class CommonConfig {
@@ -309,6 +310,9 @@ public class CommonConfig {
   private long subscriptionReadTabletBufferSize = 8 * MB;
   private long subscriptionTsFileDeduplicationWindowSeconds = 120; // 120s
   private volatile long subscriptionCheckMemoryEnoughIntervalMs = 10L;
+  private long subscriptionEstimatedInsertNodeTabletInsertionEventSize = 64 * 
KB;
+  private long subscriptionEstimatedRawTabletInsertionEventSize = 16 * KB;
+  private long subscriptionMaxAllowedEventCountInTabletBatch = 100;
 
   private boolean subscriptionPrefetchEnabled = false;
   private float subscriptionPrefetchMemoryThreshold = 0.5F;
@@ -1442,6 +1446,36 @@ public class CommonConfig {
     this.subscriptionCheckMemoryEnoughIntervalMs = 
subscriptionCheckMemoryEnoughIntervalMs;
   }
 
+  public long getSubscriptionEstimatedInsertNodeTabletInsertionEventSize() {
+    return subscriptionEstimatedInsertNodeTabletInsertionEventSize;
+  }
+
+  public void setSubscriptionEstimatedInsertNodeTabletInsertionEventSize(
+      final long subscriptionEstimatedInsertNodeTabletInsertionEventSize) {
+    this.subscriptionEstimatedInsertNodeTabletInsertionEventSize =
+        subscriptionEstimatedInsertNodeTabletInsertionEventSize;
+  }
+
+  public long getSubscriptionEstimatedRawTabletInsertionEventSize() {
+    return subscriptionEstimatedRawTabletInsertionEventSize;
+  }
+
+  public void setSubscriptionEstimatedRawTabletInsertionEventSize(
+      final long subscriptionEstimatedRawTabletInsertionEventSize) {
+    this.subscriptionEstimatedRawTabletInsertionEventSize =
+        subscriptionEstimatedRawTabletInsertionEventSize;
+  }
+
+  public long getSubscriptionMaxAllowedEventCountInTabletBatch() {
+    return subscriptionMaxAllowedEventCountInTabletBatch;
+  }
+
+  public void setSubscriptionMaxAllowedEventCountInTabletBatch(
+      final long subscriptionMaxAllowedEventCountInTabletBatch) {
+    this.subscriptionMaxAllowedEventCountInTabletBatch =
+        subscriptionMaxAllowedEventCountInTabletBatch;
+  }
+
   public boolean getSubscriptionPrefetchEnabled() {
     return subscriptionPrefetchEnabled;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index e31969bdbb8..2afe9bbdeb4 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -738,6 +738,22 @@ public class CommonDescriptor {
             properties.getProperty(
                 "subscription_check_memory_enough_interval_ms",
                 
String.valueOf(config.getSubscriptionCheckMemoryEnoughIntervalMs()))));
+    config.setSubscriptionEstimatedInsertNodeTabletInsertionEventSize(
+        Long.parseLong(
+            properties.getProperty(
+                
"subscription_estimated_insert_node_tablet_insertion_event_size",
+                String.valueOf(
+                    
config.getSubscriptionEstimatedInsertNodeTabletInsertionEventSize()))));
+    config.setSubscriptionEstimatedRawTabletInsertionEventSize(
+        Long.parseLong(
+            properties.getProperty(
+                "subscription_estimated_raw_tablet_insertion_event_size",
+                
String.valueOf(config.getSubscriptionEstimatedRawTabletInsertionEventSize()))));
+    config.setSubscriptionMaxAllowedEventCountInTabletBatch(
+        Long.parseLong(
+            properties.getProperty(
+                "subscription_max_allowed_event_count_in_tablet_batch",
+                
String.valueOf(config.getSubscriptionMaxAllowedEventCountInTabletBatch()))));
 
     config.setSubscriptionPrefetchEnabled(
         Boolean.parseBoolean(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
index 76e29d77ee0..1ace6e71de8 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
@@ -85,6 +85,18 @@ public class SubscriptionConfig {
     return COMMON_CONFIG.getSubscriptionCheckMemoryEnoughIntervalMs();
   }
 
+  public long getSubscriptionEstimatedInsertNodeTabletInsertionEventSize() {
+    return 
COMMON_CONFIG.getSubscriptionEstimatedInsertNodeTabletInsertionEventSize();
+  }
+
+  public long getSubscriptionEstimatedRawTabletInsertionEventSize() {
+    return COMMON_CONFIG.getSubscriptionEstimatedRawTabletInsertionEventSize();
+  }
+
+  public long getSubscriptionMaxAllowedEventCountInTabletBatch() {
+    return COMMON_CONFIG.getSubscriptionMaxAllowedEventCountInTabletBatch();
+  }
+
   public boolean getSubscriptionPrefetchEnabled() {
     return COMMON_CONFIG.getSubscriptionPrefetchEnabled();
   }
@@ -150,6 +162,15 @@ public class SubscriptionConfig {
     LOGGER.info(
         "SubscriptionCheckMemoryEnoughIntervalMs: {}",
         getSubscriptionCheckMemoryEnoughIntervalMs());
+    LOGGER.info(
+        "SubscriptionEstimatedInsertNodeTabletInsertionEventSize: {}",
+        getSubscriptionEstimatedInsertNodeTabletInsertionEventSize());
+    LOGGER.info(
+        "SubscriptionEstimatedRawTabletInsertionEventSize: {}",
+        getSubscriptionEstimatedRawTabletInsertionEventSize());
+    LOGGER.info(
+        "SubscriptionMaxAllowedEventCountInTabletBatch: {}",
+        getSubscriptionMaxAllowedEventCountInTabletBatch());
 
     LOGGER.info("SubscriptionPrefetchEnabled: {}", 
getSubscriptionPrefetchEnabled());
     LOGGER.info(

Reply via email to