This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 188daf79f18 Subscription: stream parsing tablets during the poll process to reduce memory usage (#14101) (#14110)
188daf79f18 is described below

commit 188daf79f180527720aedd06165dae9104fef11e
Author: V_Galaxy <[email protected]>
AuthorDate: Mon Nov 18 16:11:05 2024 +0800

    Subscription: stream parsing tablets during the poll process to reduce memory usage (#14101) (#14110)
---
 .../SubscriptionSessionDataSetsHandler.java        |  19 +-
 .../event/batch/SubscriptionPipeEventBatch.java    |   2 +-
 .../batch/SubscriptionPipeTabletEventBatch.java    | 200 +++++++++++++++------
 .../response/SubscriptionEventTabletResponse.java  |  73 ++++----
 4 files changed, 191 insertions(+), 103 deletions(-)

diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionSessionDataSetsHandler.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionSessionDataSetsHandler.java
index 9da0aa3805a..6bca41eb6d9 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionSessionDataSetsHandler.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionSessionDataSetsHandler.java
@@ -23,26 +23,33 @@ import 
org.apache.iotdb.rpc.subscription.exception.SubscriptionIncompatibleHandl
 
 import org.apache.tsfile.write.record.Tablet;
 
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
 public class SubscriptionSessionDataSetsHandler
     implements Iterable<SubscriptionSessionDataSet>, 
SubscriptionMessageHandler {
 
-  private final List<SubscriptionSessionDataSet> dataSets;
-
   private final List<Tablet> tablets;
 
   public SubscriptionSessionDataSetsHandler(final List<Tablet> tablets) {
-    this.dataSets = new ArrayList<>();
     this.tablets = tablets;
-    tablets.forEach((tablet -> this.dataSets.add(new 
SubscriptionSessionDataSet(tablet))));
   }
 
   @Override
   public Iterator<SubscriptionSessionDataSet> iterator() {
-    return dataSets.iterator();
+    return new Iterator<SubscriptionSessionDataSet>() {
+      final Iterator<Tablet> tabletsIterator = tablets.iterator();
+
+      @Override
+      public boolean hasNext() {
+        return tabletsIterator.hasNext();
+      }
+
+      @Override
+      public SubscriptionSessionDataSet next() {
+        return new SubscriptionSessionDataSet(tabletsIterator.next());
+      }
+    };
   }
 
   public Iterator<Tablet> tabletIterator() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
index dbc06881cae..58aac3714c1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
@@ -124,7 +124,7 @@ public abstract class SubscriptionPipeEventBatch {
     result.put("prefetchingQueue", 
prefetchingQueue.coreReportMessage().toString());
     result.put("maxDelayInMs", String.valueOf(maxDelayInMs));
     result.put("maxBatchSizeInBytes", String.valueOf(maxBatchSizeInBytes));
-    // TODO: stringify subscription events?
+    // omit subscription events here
     result.put("enrichedEvents", formatEnrichedEvents(enrichedEvents, 4));
     return result;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
index 2100d3839de..fe5ffee564f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
@@ -27,58 +27,48 @@ import 
org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
 import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
 import 
org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTabletQueue;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
+import org.apache.iotdb.metrics.core.utils.IoTDBMovingAverage;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
+import com.codahale.metrics.Clock;
+import com.codahale.metrics.Meter;
 import org.apache.tsfile.write.record.Tablet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.LinkedList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
-public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch {
+public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
+    implements Iterator<List<Tablet>> {
 
   private static final Logger LOGGER =
       LoggerFactory.getLogger(SubscriptionPipeTabletEventBatch.class);
 
-  private volatile List<Tablet> tablets = new LinkedList<>();
   private long firstEventProcessingTime = Long.MIN_VALUE;
   private long totalBufferSize = 0;
 
+  private volatile Iterator<EnrichedEvent> enrichedEventsIterator;
+  private volatile Iterator<TabletInsertionEvent> 
currentTabletInsertionEventsIterator;
+
+  private final Meter insertNodeTabletInsertionEventSizeEstimator;
+  private final Meter rawTabletInsertionEventSizeEstimator;
+
   public SubscriptionPipeTabletEventBatch(
       final int regionId,
       final SubscriptionPrefetchingTabletQueue prefetchingQueue,
       final int maxDelayInMs,
       final long maxBatchSizeInBytes) {
     super(regionId, prefetchingQueue, maxDelayInMs, maxBatchSizeInBytes);
-  }
 
-  public LinkedList<Tablet> moveTablets() {
-    if (Objects.isNull(tablets)) {
-      tablets = new ArrayList<>();
-      for (final EnrichedEvent enrichedEvent : enrichedEvents) {
-        if (enrichedEvent instanceof TsFileInsertionEvent) {
-          onTsFileInsertionEvent((TsFileInsertionEvent) enrichedEvent);
-        } else if (enrichedEvent instanceof TabletInsertionEvent) {
-          onTabletInsertionEvent((TabletInsertionEvent) enrichedEvent);
-        } else {
-          LOGGER.warn(
-              "SubscriptionPipeTabletEventBatch {} ignore EnrichedEvent {} 
when moving.",
-              this,
-              enrichedEvent);
-        }
-      }
-    }
-    final LinkedList<Tablet> result = new LinkedList<>(tablets);
-    firstEventProcessingTime = Long.MIN_VALUE;
-    totalBufferSize = 0;
-    tablets = null; // reset to null for gc & subsequent move
-    return result;
+    this.insertNodeTabletInsertionEventSizeEstimator =
+        new Meter(new IoTDBMovingAverage(), Clock.defaultClock());
+    this.rawTabletInsertionEventSizeEstimator =
+        new Meter(new IoTDBMovingAverage(), Clock.defaultClock());
   }
 
   /////////////////////////////// ack & clean ///////////////////////////////
@@ -97,36 +87,43 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
       enrichedEvent.clearReferenceCount(this.getClass().getName());
     }
     enrichedEvents.clear();
-    if (Objects.nonNull(tablets)) {
-      tablets.clear();
-    }
+
+    enrichedEventsIterator = null;
+    currentTabletInsertionEventsIterator = null;
   }
 
   /////////////////////////////// utility ///////////////////////////////
 
   @Override
   protected void onTabletInsertionEvent(final TabletInsertionEvent event) {
-    constructBatch(event);
+    // update processing time
     if (firstEventProcessingTime == Long.MIN_VALUE) {
       firstEventProcessingTime = System.currentTimeMillis();
     }
+
+    // update buffer size
+    // TODO: more precise computation
+    if (event instanceof PipeInsertNodeTabletInsertionEvent) {
+      totalBufferSize += getEstimatedInsertNodeTabletInsertionEventSize();
+    } else if (event instanceof PipeRawTabletInsertionEvent) {
+      totalBufferSize += getEstimatedRawTabletInsertionEventSize();
+    }
   }
 
   @Override
   protected void onTsFileInsertionEvent(final TsFileInsertionEvent event) {
-    for (final TabletInsertionEvent tabletInsertionEvent :
-        ((PipeTsFileInsertionEvent) event)
-            
.toTabletInsertionEvents(SubscriptionAgent.receiver().remainingMs())) {
-      onTabletInsertionEvent(tabletInsertionEvent);
+    // update processing time
+    if (firstEventProcessingTime == Long.MIN_VALUE) {
+      firstEventProcessingTime = System.currentTimeMillis();
     }
+
+    // update buffer size
+    // TODO: more precise computation
+    totalBufferSize += ((PipeTsFileInsertionEvent) event).getTsFile().length();
   }
 
   @Override
   protected List<SubscriptionEvent> generateSubscriptionEvents() {
-    if (tablets.isEmpty()) {
-      return null;
-    }
-
     return Collections.singletonList(
         new SubscriptionEvent(this, 
prefetchingQueue.generateSubscriptionCommitContext()));
   }
@@ -137,25 +134,21 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
         || System.currentTimeMillis() - firstEventProcessingTime >= 
maxDelayInMs;
   }
 
-  private void constructBatch(final TabletInsertionEvent event) {
-    final List<Tablet> currentTablets = convertToTablets(event);
-    if (currentTablets.isEmpty()) {
-      return;
-    }
-    tablets.addAll(currentTablets);
-    totalBufferSize +=
-        currentTablets.stream()
-            .map(PipeMemoryWeightUtil::calculateTabletSizeInBytes)
-            .reduce(Long::sum)
-            .orElse(0L);
-  }
-
   private List<Tablet> convertToTablets(final TabletInsertionEvent 
tabletInsertionEvent) {
     if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
-      return ((PipeInsertNodeTabletInsertionEvent) 
tabletInsertionEvent).convertToTablets();
+      final List<Tablet> tablets =
+          ((PipeInsertNodeTabletInsertionEvent) 
tabletInsertionEvent).convertToTablets();
+      updateEstimatedInsertNodeTabletInsertionEventSize(
+          tablets.stream()
+              .map(PipeMemoryWeightUtil::calculateTabletSizeInBytes)
+              .reduce(Long::sum)
+              .orElse(0L));
+      return tablets;
     } else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
-      return Collections.singletonList(
-          ((PipeRawTabletInsertionEvent) 
tabletInsertionEvent).convertToTablet());
+      final Tablet tablet = ((PipeRawTabletInsertionEvent) 
tabletInsertionEvent).convertToTablet();
+      updateEstimatedRawTabletInsertionEventSize(
+          PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet));
+      return Collections.singletonList(tablet);
     }
 
     LOGGER.warn(
@@ -165,6 +158,96 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
     return Collections.emptyList();
   }
 
+  /////////////////////////////// estimator ///////////////////////////////
+
+  private long getEstimatedInsertNodeTabletInsertionEventSize() {
+    return Math.max(100L, (long) 
insertNodeTabletInsertionEventSizeEstimator.getOneMinuteRate());
+  }
+
+  private void updateEstimatedInsertNodeTabletInsertionEventSize(final long 
size) {
+    insertNodeTabletInsertionEventSizeEstimator.mark(size);
+  }
+
+  private long getEstimatedRawTabletInsertionEventSize() {
+    return Math.max(100L, (long) 
rawTabletInsertionEventSizeEstimator.getOneMinuteRate());
+  }
+
+  private void updateEstimatedRawTabletInsertionEventSize(final long size) {
+    rawTabletInsertionEventSizeEstimator.mark(size);
+  }
+
+  /////////////////////////////// iterator ///////////////////////////////
+
+  public void resetIterator() {
+    enrichedEventsIterator = enrichedEvents.iterator();
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (Objects.nonNull(currentTabletInsertionEventsIterator)) {
+      if (currentTabletInsertionEventsIterator.hasNext()) {
+        return true;
+      } else {
+        // reset
+        currentTabletInsertionEventsIterator = null;
+        return false;
+      }
+    }
+
+    if (Objects.isNull(enrichedEventsIterator)) {
+      return false;
+    }
+
+    if (enrichedEventsIterator.hasNext()) {
+      return true;
+    } else {
+      // reset
+      enrichedEventsIterator = null;
+      return false;
+    }
+  }
+
+  @Override
+  public List<Tablet> next() {
+    if (Objects.nonNull(currentTabletInsertionEventsIterator)) {
+      if (currentTabletInsertionEventsIterator.hasNext()) {
+        return convertToTablets(currentTabletInsertionEventsIterator.next());
+      } else {
+        currentTabletInsertionEventsIterator = null;
+      }
+    }
+
+    if (Objects.isNull(enrichedEventsIterator)) {
+      return null;
+    }
+
+    if (!enrichedEventsIterator.hasNext()) {
+      return null;
+    }
+
+    final EnrichedEvent enrichedEvent = enrichedEventsIterator.next();
+    if (enrichedEvent instanceof TsFileInsertionEvent) {
+      if (Objects.nonNull(currentTabletInsertionEventsIterator)) {
+        LOGGER.warn(
+            "SubscriptionPipeTabletEventBatch {} override non-null 
currentTabletInsertionEventsIterator when iterating (broken invariant).",
+            this);
+      }
+      currentTabletInsertionEventsIterator =
+          ((PipeTsFileInsertionEvent) enrichedEvent)
+              
.toTabletInsertionEvents(SubscriptionAgent.receiver().remainingMs())
+              .iterator();
+      return next();
+    } else if (enrichedEvent instanceof TabletInsertionEvent) {
+      return convertToTablets((TabletInsertionEvent) enrichedEvent);
+    } else {
+      LOGGER.warn(
+          "SubscriptionPipeTabletEventBatch {} ignore EnrichedEvent {} when 
iterating (broken invariant).",
+          this,
+          enrichedEvent);
+      return null;
+    }
+  }
+
   /////////////////////////////// stringify ///////////////////////////////
 
   @Override
@@ -175,11 +258,14 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
   @Override
   protected Map<String, String> coreReportMessage() {
     final Map<String, String> coreReportMessage = super.coreReportMessage();
-    coreReportMessage.put(
-        "size of tablets",
-        (Objects.nonNull(tablets) ? String.valueOf(tablets.size()) : 
"<unknown>"));
     coreReportMessage.put("firstEventProcessingTime", 
String.valueOf(firstEventProcessingTime));
     coreReportMessage.put("totalBufferSize", String.valueOf(totalBufferSize));
+    coreReportMessage.put(
+        "estimatedInsertNodeTabletInsertionEventSize",
+        String.valueOf(getEstimatedInsertNodeTabletInsertionEventSize()));
+    coreReportMessage.put(
+        "estimatedRawTabletInsertionEventSize",
+        String.valueOf(getEstimatedRawTabletInsertionEventSize()));
     return coreReportMessage;
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
index e2590f5ff32..61cce24294d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
@@ -33,11 +33,9 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * The {@code SubscriptionEventTabletResponse} class extends {@link
@@ -56,7 +54,6 @@ public class SubscriptionEventTabletResponse extends 
SubscriptionEventExtendable
   private final SubscriptionPipeTabletEventBatch batch;
   private final SubscriptionCommitContext commitContext;
 
-  private volatile LinkedList<Tablet> tablets;
   private volatile int tabletsSize;
   private final AtomicInteger nextOffset = new AtomicInteger(0);
 
@@ -65,21 +62,17 @@ public class SubscriptionEventTabletResponse extends 
SubscriptionEventExtendable
     this.batch = batch;
     this.commitContext = commitContext;
 
-    init(batch);
+    init();
   }
 
   @Override
   public void prefetchRemainingResponses() {
-    if (hasNoMore) {
-      return;
-    }
-
-    offer(generateNextTabletResponse());
+    // do nothing
   }
 
   @Override
   public void fetchNextResponse(final long offset /* unused */) {
-    prefetchRemainingResponses();
+    offer(generateNextTabletResponse());
     if (Objects.isNull(poll())) {
       LOGGER.warn(
           "SubscriptionEventTabletResponse {} is empty when fetching next 
response (broken invariant)",
@@ -94,21 +87,20 @@ public class SubscriptionEventTabletResponse extends 
SubscriptionEventExtendable
       return;
     }
     cleanUp();
-    init(batch);
+    init();
   }
 
   @Override
   public synchronized void cleanUp() {
     super.cleanUp();
 
-    tablets = null;
     tabletsSize = 0;
     nextOffset.set(0);
   }
 
   /////////////////////////////// utility ///////////////////////////////
 
-  private void init(final SubscriptionPipeTabletEventBatch batch) {
+  private void init() {
     if (!isEmpty()) {
       LOGGER.warn(
           "SubscriptionEventTabletResponse {} is not empty when initializing 
(broken invariant)",
@@ -116,42 +108,46 @@ public class SubscriptionEventTabletResponse extends 
SubscriptionEventExtendable
       return;
     }
 
-    tablets = batch.moveTablets();
-    tabletsSize = tablets.size();
+    batch.resetIterator();
     offer(generateNextTabletResponse());
   }
 
   private synchronized CachedSubscriptionPollResponse 
generateNextTabletResponse() {
     final List<Tablet> currentTablets = new ArrayList<>();
-    final AtomicLong currentTotalBufferSize = new AtomicLong();
+    long currentBufferSize = 0;
+
+    while (batch.hasNext()) {
+      final List<Tablet> tablets = batch.next();
+      if (Objects.isNull(tablets)) {
+        continue;
+      }
+
+      currentTablets.addAll(tablets);
+      final long bufferSize =
+          tablets.stream()
+              .map(PipeMemoryWeightUtil::calculateTabletSizeInBytes)
+              .reduce(Long::sum)
+              .orElse(0L);
+      tabletsSize += tablets.size();
 
-    Tablet currentTablet;
-    while (!tablets.isEmpty() && Objects.nonNull(currentTablet = 
tablets.removeFirst())) {
-      final long bufferSize = 
PipeMemoryWeightUtil.calculateTabletSizeInBytes(currentTablet);
       if (bufferSize > READ_TABLET_BUFFER_SIZE) {
-        LOGGER.warn("Detect large tablet with {} byte(s).", bufferSize);
-        tablets.addAll(currentTablets); // re-enqueue previous tablets
-        currentTablets.clear();
-        currentTotalBufferSize.set(0);
+        // TODO: split tablets
+        LOGGER.warn("Detect large tablets with {} byte(s).", bufferSize);
         return new CachedSubscriptionPollResponse(
             SubscriptionPollResponseType.TABLETS.getType(),
-            new TabletsPayload(
-                Collections.singletonList(currentTablet), 
nextOffset.incrementAndGet()),
+            new TabletsPayload(new ArrayList<>(currentTablets), 
nextOffset.incrementAndGet()),
             commitContext);
       }
-      if (currentTotalBufferSize.get() + bufferSize > READ_TABLET_BUFFER_SIZE) 
{
-        final CachedSubscriptionPollResponse response =
-            new CachedSubscriptionPollResponse(
-                SubscriptionPollResponseType.TABLETS.getType(),
-                new TabletsPayload(new ArrayList<>(currentTablets), 
nextOffset.incrementAndGet()),
-                commitContext);
-        tablets.add(currentTablet); // re-enqueue current tablet
-        currentTablets.clear();
-        currentTotalBufferSize.set(0);
-        return response;
+
+      if (currentBufferSize + bufferSize > READ_TABLET_BUFFER_SIZE) {
+        // TODO: split tablets
+        return new CachedSubscriptionPollResponse(
+            SubscriptionPollResponseType.TABLETS.getType(),
+            new TabletsPayload(new ArrayList<>(currentTablets), 
nextOffset.incrementAndGet()),
+            commitContext);
       }
-      currentTablets.add(currentTablet);
-      currentTotalBufferSize.addAndGet(bufferSize);
+
+      currentBufferSize += bufferSize;
     }
 
     final CachedSubscriptionPollResponse response;
@@ -169,8 +165,7 @@ public class SubscriptionEventTabletResponse extends 
SubscriptionEventExtendable
               new TabletsPayload(new ArrayList<>(currentTablets), 
nextOffset.incrementAndGet()),
               commitContext);
     }
-    currentTablets.clear();
-    currentTotalBufferSize.set(0);
+
     return response;
   }
 }

Reply via email to