This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0dc583e9c59 Pipe: Optimize TsFile parsing logic (#16173)
0dc583e9c59 is described below
commit 0dc583e9c59433845d715351b6fa746a92abb5be
Author: Zhenyu Luo <[email protected]>
AuthorDate: Mon Aug 18 14:39:51 2025 +0800
Pipe: Optimize TsFile parsing logic (#16173)
* Pipe: Optimize TsFile parsing logic
* update
---
.../tsfile/parser/TsFileInsertionEventParser.java | 11 +-
.../query/TsFileInsertionEventQueryParser.java | 215 +++++++++---------
...ileInsertionEventQueryParserTabletIterator.java | 14 +-
.../scan/TsFileInsertionEventScanParser.java | 139 ++++++------
.../table/TsFileInsertionEventTableParser.java | 247 +++++++++++----------
...ileInsertionEventTableParserTabletIterator.java | 38 ++--
6 files changed, 354 insertions(+), 310 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
index 57616533b80..358103175fe 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.event.common.tsfile.parser;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
@@ -59,6 +60,8 @@ public abstract class TsFileInsertionEventParser implements
AutoCloseable {
protected TsFileSequenceReader tsFileSequenceReader;
+ protected Iterable<TabletInsertionEvent> tabletInsertionIterable;
+
protected TsFileInsertionEventParser(
final String pipeName,
final long creationTime,
@@ -83,9 +86,10 @@ public abstract class TsFileInsertionEventParser implements
AutoCloseable {
this.pipeTaskMeta = pipeTaskMeta;
this.sourceEvent = sourceEvent;
- // Allocate empty memory block, will be resized later.
this.allocatedMemoryBlockForTablet =
-
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
+ PipeDataNodeResourceManager.memory()
+ .forceAllocateForTabletWithRetry(
+
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
}
/**
@@ -95,6 +99,9 @@ public abstract class TsFileInsertionEventParser implements
AutoCloseable {
@Override
public void close() {
+
+ tabletInsertionIterable = null;
+
try {
if (pipeName != null && !timeUsageReported) {
PipeTsFileToTabletsMetrics.getInstance()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java
index ba07299d66b..d61f7a791ca 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java
@@ -277,111 +277,118 @@ public class TsFileInsertionEventQueryParser extends
TsFileInsertionEventParser
@Override
public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
- return () ->
- new Iterator<TabletInsertionEvent>() {
-
- private TsFileInsertionEventQueryParserTabletIterator tabletIterator
= null;
-
- @Override
- public boolean hasNext() {
- while (tabletIterator == null || !tabletIterator.hasNext()) {
- if (!deviceMeasurementsMapIterator.hasNext()) {
- close();
- return false;
- }
-
- final Map.Entry<IDeviceID, List<String>> entry =
deviceMeasurementsMapIterator.next();
-
- try {
- tabletIterator =
- new TsFileInsertionEventQueryParserTabletIterator(
- tsFileReader,
- measurementDataTypeMap,
- entry.getKey(),
- entry.getValue(),
- timeFilterExpression,
- allocatedMemoryBlockForTablet);
- } catch (final Exception e) {
- close();
- throw new PipeException("failed to create
TsFileInsertionDataTabletIterator", e);
- }
- }
-
- return true;
- }
+ if (tabletInsertionIterable == null) {
+ tabletInsertionIterable =
+ () ->
+ new Iterator<TabletInsertionEvent>() {
+
+ private TsFileInsertionEventQueryParserTabletIterator
tabletIterator = null;
+
+ @Override
+ public boolean hasNext() {
+ while (tabletIterator == null || !tabletIterator.hasNext()) {
+ if (!deviceMeasurementsMapIterator.hasNext()) {
+ close();
+ return false;
+ }
+
+ final Map.Entry<IDeviceID, List<String>> entry =
+ deviceMeasurementsMapIterator.next();
+
+ try {
+ tabletIterator =
+ new TsFileInsertionEventQueryParserTabletIterator(
+ tsFileReader,
+ measurementDataTypeMap,
+ entry.getKey(),
+ entry.getValue(),
+ timeFilterExpression,
+ allocatedMemoryBlockForTablet);
+ } catch (final Exception e) {
+ close();
+ throw new PipeException(
+ "failed to create
TsFileInsertionDataTabletIterator", e);
+ }
+ }
+
+ return true;
+ }
+
+ @Override
+ public TabletInsertionEvent next() {
+ if (!hasNext()) {
+ close();
+ throw new NoSuchElementException();
+ }
+
+ final Tablet tablet = tabletIterator.next();
+ final boolean isAligned =
+ deviceIsAlignedMap.getOrDefault(
+
IDeviceID.Factory.DEFAULT_FACTORY.create(tablet.getDeviceId()), false);
+
+ final TabletInsertionEvent next;
+ if (!hasNext()) {
+ next =
+ sourceEvent == null
+ ? new PipeRawTabletInsertionEvent(
+ null,
+ null,
+ null,
+ null,
+ tablet,
+ isAligned,
+ null,
+ 0,
+ pipeTaskMeta,
+ sourceEvent,
+ true)
+ : new PipeRawTabletInsertionEvent(
+ sourceEvent.getRawIsTableModelEvent(),
+
sourceEvent.getSourceDatabaseNameFromDataRegion(),
+ sourceEvent.getRawTableModelDataBase(),
+ sourceEvent.getRawTreeModelDataBase(),
+ tablet,
+ isAligned,
+ sourceEvent.getPipeName(),
+ sourceEvent.getCreationTime(),
+ pipeTaskMeta,
+ sourceEvent,
+ true);
+ close();
+ } else {
+ next =
+ sourceEvent == null
+ ? new PipeRawTabletInsertionEvent(
+ null,
+ null,
+ null,
+ null,
+ tablet,
+ isAligned,
+ null,
+ 0,
+ pipeTaskMeta,
+ sourceEvent,
+ false)
+ : new PipeRawTabletInsertionEvent(
+ sourceEvent.getRawIsTableModelEvent(),
+
sourceEvent.getSourceDatabaseNameFromDataRegion(),
+ sourceEvent.getRawTableModelDataBase(),
+ sourceEvent.getRawTreeModelDataBase(),
+ tablet,
+ isAligned,
+ sourceEvent.getPipeName(),
+ sourceEvent.getCreationTime(),
+ pipeTaskMeta,
+ sourceEvent,
+ false);
+ }
+ return next;
+ }
+ };
+ }
- @Override
- public TabletInsertionEvent next() {
- if (!hasNext()) {
- close();
- throw new NoSuchElementException();
- }
-
- final Tablet tablet = tabletIterator.next();
- final boolean isAligned =
- deviceIsAlignedMap.getOrDefault(
-
IDeviceID.Factory.DEFAULT_FACTORY.create(tablet.getDeviceId()), false);
-
- final TabletInsertionEvent next;
- if (!hasNext()) {
- next =
- sourceEvent == null
- ? new PipeRawTabletInsertionEvent(
- null,
- null,
- null,
- null,
- tablet,
- isAligned,
- null,
- 0,
- pipeTaskMeta,
- sourceEvent,
- true)
- : new PipeRawTabletInsertionEvent(
- sourceEvent.getRawIsTableModelEvent(),
- sourceEvent.getSourceDatabaseNameFromDataRegion(),
- sourceEvent.getRawTableModelDataBase(),
- sourceEvent.getRawTreeModelDataBase(),
- tablet,
- isAligned,
- sourceEvent.getPipeName(),
- sourceEvent.getCreationTime(),
- pipeTaskMeta,
- sourceEvent,
- true);
- close();
- } else {
- next =
- sourceEvent == null
- ? new PipeRawTabletInsertionEvent(
- null,
- null,
- null,
- null,
- tablet,
- isAligned,
- null,
- 0,
- pipeTaskMeta,
- sourceEvent,
- false)
- : new PipeRawTabletInsertionEvent(
- sourceEvent.getRawIsTableModelEvent(),
- sourceEvent.getSourceDatabaseNameFromDataRegion(),
- sourceEvent.getRawTableModelDataBase(),
- sourceEvent.getRawTreeModelDataBase(),
- tablet,
- isAligned,
- sourceEvent.getPipeName(),
- sourceEvent.getCreationTime(),
- pipeTaskMeta,
- sourceEvent,
- false);
- }
- return next;
- }
- };
+ return tabletInsertionIterable;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParserTabletIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParserTabletIterator.java
index b63fc682a1f..7c32321186e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParserTabletIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParserTabletIterator.java
@@ -62,6 +62,8 @@ public class TsFileInsertionEventQueryParserTabletIterator
implements Iterator<T
private final PipeMemoryBlock allocatedBlockForTablet;
+ private RowRecord rowRecord;
+
TsFileInsertionEventQueryParserTabletIterator(
final TsFileReader tsFileReader,
final Map<String, TSDataType> measurementDataTypeMap,
@@ -135,16 +137,15 @@ public class
TsFileInsertionEventQueryParserTabletIterator implements Iterator<T
// Used for tree model
deviceId.toString(), schemas, 1);
tablet.initBitMaps();
- // Ignore the memory cost of tablet
-
PipeDataNodeResourceManager.memory().forceResize(allocatedBlockForTablet, 0);
return tablet;
}
boolean isFirstRow = true;
while (queryDataSet.hasNext()) {
- final RowRecord rowRecord = queryDataSet.next();
+ final RowRecord rowRecord = this.rowRecord != null ? this.rowRecord :
queryDataSet.next();
if (isFirstRow) {
// Calculate row count and memory size of the tablet based on the
first row
+ this.rowRecord = rowRecord; // Save the first row for later use
Pair<Integer, Integer> rowCountAndMemorySize =
PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(rowRecord);
tablet =
@@ -152,8 +153,11 @@ public class TsFileInsertionEventQueryParserTabletIterator
implements Iterator<T
// Used for tree model
deviceId.toString(), schemas, rowCountAndMemorySize.getLeft());
tablet.initBitMaps();
- PipeDataNodeResourceManager.memory()
- .forceResize(allocatedBlockForTablet,
rowCountAndMemorySize.getRight());
+ if (allocatedBlockForTablet.getMemoryUsageInBytes() <
rowCountAndMemorySize.getRight()) {
+ PipeDataNodeResourceManager.memory()
+ .forceResize(allocatedBlockForTablet,
rowCountAndMemorySize.getRight());
+ }
+ this.rowRecord = null; // Clear the saved first row
isFirstRow = false;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
index 9dcbc01bd72..064a2f22279 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
@@ -108,7 +108,9 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
// Allocate empty memory block, will be resized later.
this.allocatedMemoryBlockForBatchData =
-
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
+ PipeDataNodeResourceManager.memory()
+ .forceAllocateForTabletWithRetry(
+
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
try {
tsFileSequenceReader = new
TsFileSequenceReader(tsFile.getAbsolutePath(), false, false);
@@ -134,61 +136,67 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
@Override
public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
- return () ->
- new Iterator<TabletInsertionEvent>() {
-
- @Override
- public boolean hasNext() {
- return Objects.nonNull(chunkReader);
- }
-
- @Override
- public TabletInsertionEvent next() {
- if (!hasNext()) {
- close();
- throw new NoSuchElementException();
- }
-
- // currentIsAligned is initialized when
TsFileInsertionEventScanParser is constructed.
- // When the getNextTablet function is called, currentIsAligned may
be updated, causing
- // the currentIsAligned information to be inconsistent with the
current Tablet
- // information.
- final boolean isAligned = currentIsAligned;
- final Tablet tablet = getNextTablet();
- final boolean hasNext = hasNext();
- try {
- return sourceEvent == null
- ? new PipeRawTabletInsertionEvent(
- null,
- null,
- null,
- null,
- tablet,
- isAligned,
- null,
- 0,
- pipeTaskMeta,
- sourceEvent,
- !hasNext)
- : new PipeRawTabletInsertionEvent(
- sourceEvent.getRawIsTableModelEvent(),
- sourceEvent.getSourceDatabaseNameFromDataRegion(),
- sourceEvent.getRawTableModelDataBase(),
- sourceEvent.getRawTreeModelDataBase(),
- tablet,
- isAligned,
- sourceEvent.getPipeName(),
- sourceEvent.getCreationTime(),
- pipeTaskMeta,
- sourceEvent,
- !hasNext);
- } finally {
- if (!hasNext) {
- close();
- }
- }
- }
- };
+ if (tabletInsertionIterable == null) {
+ tabletInsertionIterable =
+ () ->
+ new Iterator<TabletInsertionEvent>() {
+
+ @Override
+ public boolean hasNext() {
+ return Objects.nonNull(chunkReader);
+ }
+
+ @Override
+ public TabletInsertionEvent next() {
+ if (!hasNext()) {
+ close();
+ throw new NoSuchElementException();
+ }
+
+ // currentIsAligned is initialized when
TsFileInsertionEventScanParser is
+ // constructed.
+ // When the getNextTablet function is called,
currentIsAligned may be updated,
+ // causing
+ // the currentIsAligned information to be inconsistent with
the current Tablet
+ // information.
+ final boolean isAligned = currentIsAligned;
+ final Tablet tablet = getNextTablet();
+ final boolean hasNext = hasNext();
+ try {
+ return sourceEvent == null
+ ? new PipeRawTabletInsertionEvent(
+ null,
+ null,
+ null,
+ null,
+ tablet,
+ isAligned,
+ null,
+ 0,
+ pipeTaskMeta,
+ sourceEvent,
+ !hasNext)
+ : new PipeRawTabletInsertionEvent(
+ sourceEvent.getRawIsTableModelEvent(),
+ sourceEvent.getSourceDatabaseNameFromDataRegion(),
+ sourceEvent.getRawTableModelDataBase(),
+ sourceEvent.getRawTreeModelDataBase(),
+ tablet,
+ isAligned,
+ sourceEvent.getPipeName(),
+ sourceEvent.getCreationTime(),
+ pipeTaskMeta,
+ sourceEvent,
+ !hasNext);
+ } finally {
+ if (!hasNext) {
+ close();
+ }
+ }
+ }
+ };
+ }
+ return tabletInsertionIterable;
}
public Iterable<Pair<Tablet, Boolean>> toTabletWithIsAligneds() {
@@ -231,8 +239,6 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
if (!data.hasCurrent()) {
tablet = new Tablet(currentDevice.toString(), currentMeasurements, 1);
tablet.initBitMaps();
- // Ignore the memory cost of tablet
-
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForTablet,
0);
return tablet;
}
@@ -248,8 +254,11 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
new Tablet(
currentDevice.toString(), currentMeasurements,
rowCountAndMemorySize.getLeft());
tablet.initBitMaps();
- PipeDataNodeResourceManager.memory()
- .forceResize(allocatedMemoryBlockForTablet,
rowCountAndMemorySize.getRight());
+ if (allocatedMemoryBlockForTablet.getMemoryUsageInBytes()
+ < rowCountAndMemorySize.getRight()) {
+ PipeDataNodeResourceManager.memory()
+ .forceResize(allocatedMemoryBlockForTablet,
rowCountAndMemorySize.getRight());
+ }
isFirstRow = false;
}
@@ -272,8 +281,6 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
if (tablet == null) {
tablet = new Tablet(currentDevice.toString(), currentMeasurements, 1);
tablet.initBitMaps();
- // Ignore the memory cost of tablet
-
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForTablet,
0);
}
// Switch chunk reader iff current chunk is all consumed
@@ -300,10 +307,10 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
do {
data = chunkReader.nextPageData();
- PipeDataNodeResourceManager.memory()
- .forceResize(
- allocatedMemoryBlockForBatchData,
- PipeMemoryWeightUtil.calculateBatchDataRamBytesUsed(data));
+ long size = PipeMemoryWeightUtil.calculateBatchDataRamBytesUsed(data);
+ if (allocatedMemoryBlockForBatchData.getMemoryUsageInBytes() < size) {
+
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForBatchData,
size);
+ }
} while (!data.hasCurrent() && chunkReader.hasNextSatisfiedPage());
} while (!data.hasCurrent());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
index 8011f9c3830..1ed755f5660 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.event.common.tsfile.parser.table;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
@@ -66,14 +67,15 @@ public class TsFileInsertionEventTableParser extends
TsFileInsertionEventParser
super(pipeName, creationTime, null, pattern, startTime, endTime,
pipeTaskMeta, sourceEvent);
try {
+ long tableSize =
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes();
this.allocatedMemoryBlockForChunk =
-
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
+
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(tableSize);
this.allocatedMemoryBlockForBatchData =
-
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
+
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(tableSize);
this.allocatedMemoryBlockForChunkMeta =
-
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
+
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(tableSize);
this.allocatedMemoryBlockForTableSchemas =
-
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
+
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(tableSize);
this.startTime = startTime;
this.endTime = endTime;
@@ -101,122 +103,127 @@ public class TsFileInsertionEventTableParser extends
TsFileInsertionEventParser
@Override
public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
- return () ->
- new Iterator<TabletInsertionEvent>() {
-
- private TsFileInsertionEventTableParserTabletIterator tabletIterator;
-
- @Override
- public boolean hasNext() {
- try {
- if (tabletIterator == null) {
- tabletIterator =
- new TsFileInsertionEventTableParserTabletIterator(
- tsFileSequenceReader,
- entry ->
- (Objects.isNull(tablePattern)
- ||
tablePattern.matchesTable(entry.getKey()))
- && hasTablePrivilege(entry.getKey()),
- allocatedMemoryBlockForTablet,
- allocatedMemoryBlockForBatchData,
- allocatedMemoryBlockForChunk,
- allocatedMemoryBlockForChunkMeta,
- allocatedMemoryBlockForTableSchemas,
- startTime,
- endTime);
- }
- if (!tabletIterator.hasNext()) {
- close();
- return false;
- }
- return true;
- } catch (Exception e) {
- close();
- throw new PipeException("Error while parsing tsfile insertion
event", e);
- }
- }
-
- private boolean hasTablePrivilege(final String tableName) {
- return Objects.isNull(userName)
- || Objects.isNull(sourceEvent)
- || Objects.isNull(sourceEvent.getTableModelDatabaseName())
- || Coordinator.getInstance()
- .getAccessControl()
- .checkCanSelectFromTable4Pipe(
- userName,
- new QualifiedObjectName(
- sourceEvent.getTableModelDatabaseName(),
tableName));
- }
-
- @Override
- public TabletInsertionEvent next() {
- if (!hasNext()) {
- close();
- throw new NoSuchElementException();
- }
-
- final Tablet tablet = tabletIterator.next();
-
- final TabletInsertionEvent next;
- if (!hasNext()) {
- next =
- sourceEvent == null
- ? new PipeRawTabletInsertionEvent(
- Boolean.TRUE,
- null,
- null,
- null,
- tablet,
- true,
- null,
- 0,
- pipeTaskMeta,
- sourceEvent,
- true)
- : new PipeRawTabletInsertionEvent(
- Boolean.TRUE,
- sourceEvent.getSourceDatabaseNameFromDataRegion(),
- sourceEvent.getRawTableModelDataBase(),
- sourceEvent.getRawTreeModelDataBase(),
- tablet,
- true,
- sourceEvent.getPipeName(),
- sourceEvent.getCreationTime(),
- pipeTaskMeta,
- sourceEvent,
- true);
- close();
- } else {
- next =
- sourceEvent == null
- ? new PipeRawTabletInsertionEvent(
- Boolean.TRUE,
- null,
- null,
- null,
- tablet,
- true,
- null,
- 0,
- pipeTaskMeta,
- sourceEvent,
- false)
- : new PipeRawTabletInsertionEvent(
- Boolean.TRUE,
- sourceEvent.getSourceDatabaseNameFromDataRegion(),
- sourceEvent.getRawTableModelDataBase(),
- sourceEvent.getRawTreeModelDataBase(),
- tablet,
- true,
- sourceEvent.getPipeName(),
- sourceEvent.getCreationTime(),
- pipeTaskMeta,
- sourceEvent,
- false);
- }
- return next;
- }
- };
+ if (tabletInsertionIterable == null) {
+ tabletInsertionIterable =
+ () ->
+ new Iterator<TabletInsertionEvent>() {
+
+ private TsFileInsertionEventTableParserTabletIterator
tabletIterator;
+
+ @Override
+ public boolean hasNext() {
+ try {
+ if (tabletIterator == null) {
+ tabletIterator =
+ new TsFileInsertionEventTableParserTabletIterator(
+ tsFileSequenceReader,
+ entry ->
+ (Objects.isNull(tablePattern)
+ ||
tablePattern.matchesTable(entry.getKey()))
+ && hasTablePrivilege(entry.getKey()),
+ allocatedMemoryBlockForTablet,
+ allocatedMemoryBlockForBatchData,
+ allocatedMemoryBlockForChunk,
+ allocatedMemoryBlockForChunkMeta,
+ allocatedMemoryBlockForTableSchemas,
+ startTime,
+ endTime);
+ }
+ if (!tabletIterator.hasNext()) {
+ close();
+ return false;
+ }
+ return true;
+ } catch (Exception e) {
+ close();
+ throw new PipeException("Error while parsing tsfile
insertion event", e);
+ }
+ }
+
+ private boolean hasTablePrivilege(final String tableName) {
+ return Objects.isNull(userName)
+ || Objects.isNull(sourceEvent)
+ ||
Objects.isNull(sourceEvent.getTableModelDatabaseName())
+ || Coordinator.getInstance()
+ .getAccessControl()
+ .checkCanSelectFromTable4Pipe(
+ userName,
+ new QualifiedObjectName(
+ sourceEvent.getTableModelDatabaseName(),
tableName));
+ }
+
+ @Override
+ public TabletInsertionEvent next() {
+ if (!hasNext()) {
+ close();
+ throw new NoSuchElementException();
+ }
+
+ final Tablet tablet = tabletIterator.next();
+
+ final TabletInsertionEvent next;
+ if (!hasNext()) {
+ next =
+ sourceEvent == null
+ ? new PipeRawTabletInsertionEvent(
+ Boolean.TRUE,
+ null,
+ null,
+ null,
+ tablet,
+ true,
+ null,
+ 0,
+ pipeTaskMeta,
+ sourceEvent,
+ true)
+ : new PipeRawTabletInsertionEvent(
+ Boolean.TRUE,
+
sourceEvent.getSourceDatabaseNameFromDataRegion(),
+ sourceEvent.getRawTableModelDataBase(),
+ sourceEvent.getRawTreeModelDataBase(),
+ tablet,
+ true,
+ sourceEvent.getPipeName(),
+ sourceEvent.getCreationTime(),
+ pipeTaskMeta,
+ sourceEvent,
+ true);
+ close();
+ } else {
+ next =
+ sourceEvent == null
+ ? new PipeRawTabletInsertionEvent(
+ Boolean.TRUE,
+ null,
+ null,
+ null,
+ tablet,
+ true,
+ null,
+ 0,
+ pipeTaskMeta,
+ sourceEvent,
+ false)
+ : new PipeRawTabletInsertionEvent(
+ Boolean.TRUE,
+
sourceEvent.getSourceDatabaseNameFromDataRegion(),
+ sourceEvent.getRawTableModelDataBase(),
+ sourceEvent.getRawTreeModelDataBase(),
+ tablet,
+ true,
+ sourceEvent.getPipeName(),
+ sourceEvent.getCreationTime(),
+ pipeTaskMeta,
+ sourceEvent,
+ false);
+ }
+ return next;
+ }
+ };
+ }
+
+ return tabletInsertionIterable;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
index 4f114bb9cbb..91f0f3baa2f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
@@ -138,8 +138,10 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
tableSchemaSize +=
tableSchemaEntry.getKey().length()
+
PipeMemoryWeightUtil.calculateTableSchemaBytesUsed(tableSchemaEntry.getValue());
- PipeDataNodeResourceManager.memory()
- .forceResize(this.allocatedMemoryBlockForTableSchema,
tableSchemaSize);
+ if (tableSchemaSize >
allocatedMemoryBlockForTableSchema.getMemoryUsageInBytes()) {
+ PipeDataNodeResourceManager.memory()
+ .forceResize(this.allocatedMemoryBlockForTableSchema,
tableSchemaSize);
+ }
}
filteredTableSchemaIterator = tableSchemaList.iterator();
@@ -158,10 +160,11 @@ public class
TsFileInsertionEventTableParserTabletIterator implements Iterator<T
case INIT_DATA:
if (chunkReader != null && chunkReader.hasNextSatisfiedPage()) {
batchData = chunkReader.nextPageData();
- PipeDataNodeResourceManager.memory()
- .forceResize(
- allocatedMemoryBlockForBatchData,
-
PipeMemoryWeightUtil.calculateBatchDataRamBytesUsed(batchData));
+ final long size =
PipeMemoryWeightUtil.calculateBatchDataRamBytesUsed(batchData);
+ if (allocatedMemoryBlockForBatchData.getMemoryUsageInBytes() <
size) {
+ PipeDataNodeResourceManager.memory()
+ .forceResize(allocatedMemoryBlockForBatchData, size);
+ }
state = State.CHECK_DATA;
break;
}
@@ -204,8 +207,10 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
size +=
PipeMemoryWeightUtil.calculateAlignedChunkMetaBytesUsed(alignedChunkMetadata);
- PipeDataNodeResourceManager.memory()
- .forceResize(allocatedMemoryBlockForChunkMeta, size);
+ if (allocatedMemoryBlockForChunkMeta.getMemoryUsageInBytes() <
size) {
+ PipeDataNodeResourceManager.memory()
+ .forceResize(allocatedMemoryBlockForChunkMeta, size);
+ }
}
deviceID = pair.getLeft();
@@ -284,8 +289,11 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
// Calculate row count and memory size of the tablet based on the
first row
final Pair<Integer, Integer> rowCountAndMemorySize =
PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(batchData);
- PipeDataNodeResourceManager.memory()
- .forceResize(allocatedMemoryBlockForTablet,
rowCountAndMemorySize.getLeft());
+ if (allocatedMemoryBlockForTablet.getMemoryUsageInBytes()
+ < rowCountAndMemorySize.getRight()) {
+ PipeDataNodeResourceManager.memory()
+ .forceResize(allocatedMemoryBlockForTablet,
rowCountAndMemorySize.getRight());
+ }
tablet =
new Tablet(
@@ -313,7 +321,6 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
}
if (isFirstRow) {
-
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForTablet,
0);
tablet = new Tablet(tableName, measurementList, dataTypeList,
columnTypes, 0);
tablet.initBitMaps();
}
@@ -326,7 +333,10 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
if (Objects.isNull(timeChunk)) {
timeChunk = reader.readMemChunk((ChunkMetadata)
alignedChunkMetadata.getTimeChunkMetadata());
timeChunkSize =
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunk);
-
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk,
timeChunkSize);
+ if (allocatedMemoryBlockForChunk.getMemoryUsageInBytes() <
timeChunkSize) {
+ PipeDataNodeResourceManager.memory()
+ .forceResize(allocatedMemoryBlockForChunk, timeChunkSize);
+ }
}
timeChunk.getData().rewind();
long size = timeChunkSize;
@@ -360,7 +370,9 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
final Chunk chunk = reader.readMemChunk((ChunkMetadata) metadata);
size += PipeMemoryWeightUtil.calculateChunkRamBytesUsed(chunk);
-
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk,
size);
+ if (allocatedMemoryBlockForChunk.getMemoryUsageInBytes() < size) {
+
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk,
size);
+ }
valueChunkList.add(chunk);
}