This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 188daf79f18 Subscription: stream parsing tablets during the poll
process to reduce memory usage (#14101) (#14110)
188daf79f18 is described below
commit 188daf79f180527720aedd06165dae9104fef11e
Author: V_Galaxy <[email protected]>
AuthorDate: Mon Nov 18 16:11:05 2024 +0800
Subscription: stream parsing tablets during the poll process to reduce
memory usage (#14101) (#14110)
---
.../SubscriptionSessionDataSetsHandler.java | 19 +-
.../event/batch/SubscriptionPipeEventBatch.java | 2 +-
.../batch/SubscriptionPipeTabletEventBatch.java | 200 +++++++++++++++------
.../response/SubscriptionEventTabletResponse.java | 73 ++++----
4 files changed, 191 insertions(+), 103 deletions(-)
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionSessionDataSetsHandler.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionSessionDataSetsHandler.java
index 9da0aa3805a..6bca41eb6d9 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionSessionDataSetsHandler.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionSessionDataSetsHandler.java
@@ -23,26 +23,33 @@ import org.apache.iotdb.rpc.subscription.exception.SubscriptionIncompatibleHandl
import org.apache.tsfile.write.record.Tablet;
-import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class SubscriptionSessionDataSetsHandler
implements Iterable<SubscriptionSessionDataSet>,
SubscriptionMessageHandler {
- private final List<SubscriptionSessionDataSet> dataSets;
-
private final List<Tablet> tablets;
public SubscriptionSessionDataSetsHandler(final List<Tablet> tablets) {
- this.dataSets = new ArrayList<>();
this.tablets = tablets;
- tablets.forEach((tablet -> this.dataSets.add(new
SubscriptionSessionDataSet(tablet))));
}
@Override
public Iterator<SubscriptionSessionDataSet> iterator() {
- return dataSets.iterator();
+ return new Iterator<SubscriptionSessionDataSet>() {
+ final Iterator<Tablet> tabletsIterator = tablets.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return tabletsIterator.hasNext();
+ }
+
+ @Override
+ public SubscriptionSessionDataSet next() {
+ return new SubscriptionSessionDataSet(tabletsIterator.next());
+ }
+ };
}
public Iterator<Tablet> tabletIterator() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
index dbc06881cae..58aac3714c1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
@@ -124,7 +124,7 @@ public abstract class SubscriptionPipeEventBatch {
result.put("prefetchingQueue",
prefetchingQueue.coreReportMessage().toString());
result.put("maxDelayInMs", String.valueOf(maxDelayInMs));
result.put("maxBatchSizeInBytes", String.valueOf(maxBatchSizeInBytes));
- // TODO: stringify subscription events?
+ // omit subscription events here
result.put("enrichedEvents", formatEnrichedEvents(enrichedEvents, 4));
return result;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
index 2100d3839de..fe5ffee564f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
@@ -27,58 +27,48 @@ import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
import
org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTabletQueue;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
+import org.apache.iotdb.metrics.core.utils.IoTDBMovingAverage;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import com.codahale.metrics.Clock;
+import com.codahale.metrics.Meter;
import org.apache.tsfile.write.record.Tablet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.LinkedList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch {
+public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
+ implements Iterator<List<Tablet>> {
private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionPipeTabletEventBatch.class);
- private volatile List<Tablet> tablets = new LinkedList<>();
private long firstEventProcessingTime = Long.MIN_VALUE;
private long totalBufferSize = 0;
+ private volatile Iterator<EnrichedEvent> enrichedEventsIterator;
+ private volatile Iterator<TabletInsertionEvent>
currentTabletInsertionEventsIterator;
+
+ private final Meter insertNodeTabletInsertionEventSizeEstimator;
+ private final Meter rawTabletInsertionEventSizeEstimator;
+
public SubscriptionPipeTabletEventBatch(
final int regionId,
final SubscriptionPrefetchingTabletQueue prefetchingQueue,
final int maxDelayInMs,
final long maxBatchSizeInBytes) {
super(regionId, prefetchingQueue, maxDelayInMs, maxBatchSizeInBytes);
- }
- public LinkedList<Tablet> moveTablets() {
- if (Objects.isNull(tablets)) {
- tablets = new ArrayList<>();
- for (final EnrichedEvent enrichedEvent : enrichedEvents) {
- if (enrichedEvent instanceof TsFileInsertionEvent) {
- onTsFileInsertionEvent((TsFileInsertionEvent) enrichedEvent);
- } else if (enrichedEvent instanceof TabletInsertionEvent) {
- onTabletInsertionEvent((TabletInsertionEvent) enrichedEvent);
- } else {
- LOGGER.warn(
- "SubscriptionPipeTabletEventBatch {} ignore EnrichedEvent {}
when moving.",
- this,
- enrichedEvent);
- }
- }
- }
- final LinkedList<Tablet> result = new LinkedList<>(tablets);
- firstEventProcessingTime = Long.MIN_VALUE;
- totalBufferSize = 0;
- tablets = null; // reset to null for gc & subsequent move
- return result;
+ this.insertNodeTabletInsertionEventSizeEstimator =
+ new Meter(new IoTDBMovingAverage(), Clock.defaultClock());
+ this.rawTabletInsertionEventSizeEstimator =
+ new Meter(new IoTDBMovingAverage(), Clock.defaultClock());
}
/////////////////////////////// ack & clean ///////////////////////////////
@@ -97,36 +87,43 @@ public class SubscriptionPipeTabletEventBatch extends SubscriptionPipeEventBatch
enrichedEvent.clearReferenceCount(this.getClass().getName());
}
enrichedEvents.clear();
- if (Objects.nonNull(tablets)) {
- tablets.clear();
- }
+
+ enrichedEventsIterator = null;
+ currentTabletInsertionEventsIterator = null;
}
/////////////////////////////// utility ///////////////////////////////
@Override
protected void onTabletInsertionEvent(final TabletInsertionEvent event) {
- constructBatch(event);
+ // update processing time
if (firstEventProcessingTime == Long.MIN_VALUE) {
firstEventProcessingTime = System.currentTimeMillis();
}
+
+ // update buffer size
+ // TODO: more precise computation
+ if (event instanceof PipeInsertNodeTabletInsertionEvent) {
+ totalBufferSize += getEstimatedInsertNodeTabletInsertionEventSize();
+ } else if (event instanceof PipeRawTabletInsertionEvent) {
+ totalBufferSize += getEstimatedRawTabletInsertionEventSize();
+ }
}
@Override
protected void onTsFileInsertionEvent(final TsFileInsertionEvent event) {
- for (final TabletInsertionEvent tabletInsertionEvent :
- ((PipeTsFileInsertionEvent) event)
-
.toTabletInsertionEvents(SubscriptionAgent.receiver().remainingMs())) {
- onTabletInsertionEvent(tabletInsertionEvent);
+ // update processing time
+ if (firstEventProcessingTime == Long.MIN_VALUE) {
+ firstEventProcessingTime = System.currentTimeMillis();
}
+
+ // update buffer size
+ // TODO: more precise computation
+ totalBufferSize += ((PipeTsFileInsertionEvent) event).getTsFile().length();
}
@Override
protected List<SubscriptionEvent> generateSubscriptionEvents() {
- if (tablets.isEmpty()) {
- return null;
- }
-
return Collections.singletonList(
new SubscriptionEvent(this,
prefetchingQueue.generateSubscriptionCommitContext()));
}
@@ -137,25 +134,21 @@ public class SubscriptionPipeTabletEventBatch extends SubscriptionPipeEventBatch
|| System.currentTimeMillis() - firstEventProcessingTime >=
maxDelayInMs;
}
- private void constructBatch(final TabletInsertionEvent event) {
- final List<Tablet> currentTablets = convertToTablets(event);
- if (currentTablets.isEmpty()) {
- return;
- }
- tablets.addAll(currentTablets);
- totalBufferSize +=
- currentTablets.stream()
- .map(PipeMemoryWeightUtil::calculateTabletSizeInBytes)
- .reduce(Long::sum)
- .orElse(0L);
- }
-
private List<Tablet> convertToTablets(final TabletInsertionEvent
tabletInsertionEvent) {
if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
- return ((PipeInsertNodeTabletInsertionEvent)
tabletInsertionEvent).convertToTablets();
+ final List<Tablet> tablets =
+ ((PipeInsertNodeTabletInsertionEvent)
tabletInsertionEvent).convertToTablets();
+ updateEstimatedInsertNodeTabletInsertionEventSize(
+ tablets.stream()
+ .map(PipeMemoryWeightUtil::calculateTabletSizeInBytes)
+ .reduce(Long::sum)
+ .orElse(0L));
+ return tablets;
} else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
- return Collections.singletonList(
- ((PipeRawTabletInsertionEvent)
tabletInsertionEvent).convertToTablet());
+ final Tablet tablet = ((PipeRawTabletInsertionEvent)
tabletInsertionEvent).convertToTablet();
+ updateEstimatedRawTabletInsertionEventSize(
+ PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet));
+ return Collections.singletonList(tablet);
}
LOGGER.warn(
@@ -165,6 +158,96 @@ public class SubscriptionPipeTabletEventBatch extends SubscriptionPipeEventBatch
return Collections.emptyList();
}
+ /////////////////////////////// estimator ///////////////////////////////
+
+ private long getEstimatedInsertNodeTabletInsertionEventSize() {
+ return Math.max(100L, (long)
insertNodeTabletInsertionEventSizeEstimator.getOneMinuteRate());
+ }
+
+ private void updateEstimatedInsertNodeTabletInsertionEventSize(final long
size) {
+ insertNodeTabletInsertionEventSizeEstimator.mark(size);
+ }
+
+ private long getEstimatedRawTabletInsertionEventSize() {
+ return Math.max(100L, (long)
rawTabletInsertionEventSizeEstimator.getOneMinuteRate());
+ }
+
+ private void updateEstimatedRawTabletInsertionEventSize(final long size) {
+ rawTabletInsertionEventSizeEstimator.mark(size);
+ }
+
+ /////////////////////////////// iterator ///////////////////////////////
+
+ public void resetIterator() {
+ enrichedEventsIterator = enrichedEvents.iterator();
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (Objects.nonNull(currentTabletInsertionEventsIterator)) {
+ if (currentTabletInsertionEventsIterator.hasNext()) {
+ return true;
+ } else {
+ // reset
+ currentTabletInsertionEventsIterator = null;
+ return false;
+ }
+ }
+
+ if (Objects.isNull(enrichedEventsIterator)) {
+ return false;
+ }
+
+ if (enrichedEventsIterator.hasNext()) {
+ return true;
+ } else {
+ // reset
+ enrichedEventsIterator = null;
+ return false;
+ }
+ }
+
+ @Override
+ public List<Tablet> next() {
+ if (Objects.nonNull(currentTabletInsertionEventsIterator)) {
+ if (currentTabletInsertionEventsIterator.hasNext()) {
+ return convertToTablets(currentTabletInsertionEventsIterator.next());
+ } else {
+ currentTabletInsertionEventsIterator = null;
+ }
+ }
+
+ if (Objects.isNull(enrichedEventsIterator)) {
+ return null;
+ }
+
+ if (!enrichedEventsIterator.hasNext()) {
+ return null;
+ }
+
+ final EnrichedEvent enrichedEvent = enrichedEventsIterator.next();
+ if (enrichedEvent instanceof TsFileInsertionEvent) {
+ if (Objects.nonNull(currentTabletInsertionEventsIterator)) {
+ LOGGER.warn(
+ "SubscriptionPipeTabletEventBatch {} override non-null
currentTabletInsertionEventsIterator when iterating (broken invariant).",
+ this);
+ }
+ currentTabletInsertionEventsIterator =
+ ((PipeTsFileInsertionEvent) enrichedEvent)
+
.toTabletInsertionEvents(SubscriptionAgent.receiver().remainingMs())
+ .iterator();
+ return next();
+ } else if (enrichedEvent instanceof TabletInsertionEvent) {
+ return convertToTablets((TabletInsertionEvent) enrichedEvent);
+ } else {
+ LOGGER.warn(
+ "SubscriptionPipeTabletEventBatch {} ignore EnrichedEvent {} when
iterating (broken invariant).",
+ this,
+ enrichedEvent);
+ return null;
+ }
+ }
+
/////////////////////////////// stringify ///////////////////////////////
@Override
@@ -175,11 +258,14 @@ public class SubscriptionPipeTabletEventBatch extends SubscriptionPipeEventBatch
@Override
protected Map<String, String> coreReportMessage() {
final Map<String, String> coreReportMessage = super.coreReportMessage();
- coreReportMessage.put(
- "size of tablets",
- (Objects.nonNull(tablets) ? String.valueOf(tablets.size()) :
"<unknown>"));
coreReportMessage.put("firstEventProcessingTime",
String.valueOf(firstEventProcessingTime));
coreReportMessage.put("totalBufferSize", String.valueOf(totalBufferSize));
+ coreReportMessage.put(
+ "estimatedInsertNodeTabletInsertionEventSize",
+ String.valueOf(getEstimatedInsertNodeTabletInsertionEventSize()));
+ coreReportMessage.put(
+ "estimatedRawTabletInsertionEventSize",
+ String.valueOf(getEstimatedRawTabletInsertionEventSize()));
return coreReportMessage;
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
index e2590f5ff32..61cce24294d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
@@ -33,11 +33,9 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
/**
* The {@code SubscriptionEventTabletResponse} class extends {@link
@@ -56,7 +54,6 @@ public class SubscriptionEventTabletResponse extends SubscriptionEventExtendable
private final SubscriptionPipeTabletEventBatch batch;
private final SubscriptionCommitContext commitContext;
- private volatile LinkedList<Tablet> tablets;
private volatile int tabletsSize;
private final AtomicInteger nextOffset = new AtomicInteger(0);
@@ -65,21 +62,17 @@ public class SubscriptionEventTabletResponse extends SubscriptionEventExtendable
this.batch = batch;
this.commitContext = commitContext;
- init(batch);
+ init();
}
@Override
public void prefetchRemainingResponses() {
- if (hasNoMore) {
- return;
- }
-
- offer(generateNextTabletResponse());
+ // do nothing
}
@Override
public void fetchNextResponse(final long offset /* unused */) {
- prefetchRemainingResponses();
+ offer(generateNextTabletResponse());
if (Objects.isNull(poll())) {
LOGGER.warn(
"SubscriptionEventTabletResponse {} is empty when fetching next
response (broken invariant)",
@@ -94,21 +87,20 @@ public class SubscriptionEventTabletResponse extends SubscriptionEventExtendable
return;
}
cleanUp();
- init(batch);
+ init();
}
@Override
public synchronized void cleanUp() {
super.cleanUp();
- tablets = null;
tabletsSize = 0;
nextOffset.set(0);
}
/////////////////////////////// utility ///////////////////////////////
- private void init(final SubscriptionPipeTabletEventBatch batch) {
+ private void init() {
if (!isEmpty()) {
LOGGER.warn(
"SubscriptionEventTabletResponse {} is not empty when initializing
(broken invariant)",
@@ -116,42 +108,46 @@ public class SubscriptionEventTabletResponse extends SubscriptionEventExtendable
return;
}
- tablets = batch.moveTablets();
- tabletsSize = tablets.size();
+ batch.resetIterator();
offer(generateNextTabletResponse());
}
private synchronized CachedSubscriptionPollResponse
generateNextTabletResponse() {
final List<Tablet> currentTablets = new ArrayList<>();
- final AtomicLong currentTotalBufferSize = new AtomicLong();
+ long currentBufferSize = 0;
+
+ while (batch.hasNext()) {
+ final List<Tablet> tablets = batch.next();
+ if (Objects.isNull(tablets)) {
+ continue;
+ }
+
+ currentTablets.addAll(tablets);
+ final long bufferSize =
+ tablets.stream()
+ .map(PipeMemoryWeightUtil::calculateTabletSizeInBytes)
+ .reduce(Long::sum)
+ .orElse(0L);
+ tabletsSize += tablets.size();
- Tablet currentTablet;
- while (!tablets.isEmpty() && Objects.nonNull(currentTablet =
tablets.removeFirst())) {
- final long bufferSize =
PipeMemoryWeightUtil.calculateTabletSizeInBytes(currentTablet);
if (bufferSize > READ_TABLET_BUFFER_SIZE) {
- LOGGER.warn("Detect large tablet with {} byte(s).", bufferSize);
- tablets.addAll(currentTablets); // re-enqueue previous tablets
- currentTablets.clear();
- currentTotalBufferSize.set(0);
+ // TODO: split tablets
+ LOGGER.warn("Detect large tablets with {} byte(s).", bufferSize);
return new CachedSubscriptionPollResponse(
SubscriptionPollResponseType.TABLETS.getType(),
- new TabletsPayload(
- Collections.singletonList(currentTablet),
nextOffset.incrementAndGet()),
+ new TabletsPayload(new ArrayList<>(currentTablets),
nextOffset.incrementAndGet()),
commitContext);
}
- if (currentTotalBufferSize.get() + bufferSize > READ_TABLET_BUFFER_SIZE)
{
- final CachedSubscriptionPollResponse response =
- new CachedSubscriptionPollResponse(
- SubscriptionPollResponseType.TABLETS.getType(),
- new TabletsPayload(new ArrayList<>(currentTablets),
nextOffset.incrementAndGet()),
- commitContext);
- tablets.add(currentTablet); // re-enqueue current tablet
- currentTablets.clear();
- currentTotalBufferSize.set(0);
- return response;
+
+ if (currentBufferSize + bufferSize > READ_TABLET_BUFFER_SIZE) {
+ // TODO: split tablets
+ return new CachedSubscriptionPollResponse(
+ SubscriptionPollResponseType.TABLETS.getType(),
+ new TabletsPayload(new ArrayList<>(currentTablets),
nextOffset.incrementAndGet()),
+ commitContext);
}
- currentTablets.add(currentTablet);
- currentTotalBufferSize.addAndGet(bufferSize);
+
+ currentBufferSize += bufferSize;
}
final CachedSubscriptionPollResponse response;
@@ -169,8 +165,7 @@ public class SubscriptionEventTabletResponse extends SubscriptionEventExtendable
new TabletsPayload(new ArrayList<>(currentTablets),
nextOffset.incrementAndGet()),
commitContext);
}
- currentTablets.clear();
- currentTotalBufferSize.set(0);
+
return response;
}
}