This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/2.0.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1244ac1ccbf36ed6c5a92f73c1531d2be7dc094e Author: Caideyipi <[email protected]> AuthorDate: Thu Jul 31 20:00:21 2025 +0800 Pipe: Optimized the table model writing latency by batching & Fixed the NPE caused by tablet event sorting (cherry picked from commit 6fd3870b90a0b92487b6bdd59dacdc7db59e87a8) --- .../evolvable/batch/PipeTabletEventPlainBatch.java | 130 ++++++++++++++------- .../request/PipeTransferTabletBatchReqV2.java | 48 +++++++- .../sink/util/sorter/PipeTabletEventSorter.java | 2 +- 3 files changed, 137 insertions(+), 43 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java index 31b10736499..a68c5ae9602 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java @@ -22,14 +22,17 @@ package org.apache.iotdb.db.pipe.sink.payload.evolvable.batch; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBatchReqV2; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; -import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.PublicBAOS; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.apache.tsfile.write.record.Tablet; import java.io.DataOutputStream; import java.io.IOException; @@ -38,7 +41,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; public class PipeTabletEventPlainBatch extends PipeTabletEventBatch { @@ -51,13 +54,13 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch { private final List<String> insertNodeDataBases = new ArrayList<>(); private final List<String> tabletDataBases = new ArrayList<>(); + // database -> tableName -> Pair<size, tablets to batch> + private final Map<String, Map<String, Pair<Integer, List<Tablet>>>> tableModelTabletMap = + new HashMap<>(); + // Used to rate limit when transferring data private final Map<Pair<String, Long>, Long> pipe2BytesAccumulated = new HashMap<>(); - PipeTabletEventPlainBatch(final int maxDelayInMs, final long requestMaxBatchSizeInBytes) { - super(maxDelayInMs, requestMaxBatchSizeInBytes, null); - } - PipeTabletEventPlainBatch( final int maxDelayInMs, final long requestMaxBatchSizeInBytes, @@ -66,9 +69,8 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch { } @Override - protected boolean constructBatch(final TabletInsertionEvent event) - throws WALPipeException, IOException { - final int bufferSize = buildTabletInsertionBuffer(event); + protected boolean constructBatch(final TabletInsertionEvent event) throws IOException { + final long bufferSize = buildTabletInsertionBuffer(event); totalBufferSize += bufferSize; pipe2BytesAccumulated.compute( new Pair<>( @@ -89,11 +91,45 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch { binaryDataBases.clear(); insertNodeDataBases.clear(); tabletDataBases.clear(); + tableModelTabletMap.clear(); pipe2BytesAccumulated.clear(); } public PipeTransferTabletBatchReqV2 toTPipeTransferReq() throws IOException { + for (final Map.Entry<String, Map<String, Pair<Integer, List<Tablet>>>> insertTablets : + tableModelTabletMap.entrySet()) { + final String databaseName = insertTablets.getKey(); + for (final Map.Entry<String, Pair<Integer, List<Tablet>>> tabletEntry : + insertTablets.getValue().entrySet()) { + final List<Tablet> batchTablets = new ArrayList<>(); + for (final Tablet tablet : tabletEntry.getValue().getRight()) { + boolean success = false; + for (final Tablet batchTablet : batchTablets) { + if (batchTablet.append(tablet, tabletEntry.getValue().getLeft())) { + success = true; + break; + } + } + if (!success) { + batchTablets.add(tablet); + } + } + for (final Tablet batchTablet : batchTablets) { + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + batchTablet.serialize(outputStream); + ReadWriteIOUtils.write(true, outputStream); + tabletBuffers.add( + ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size())); + } + tabletDataBases.add(databaseName); + } + } + } + + tableModelTabletMap.clear(); + return PipeTransferTabletBatchReqV2.toTPipeTransferReq( binaryBuffers, insertNodeBuffers, @@ -111,57 +147,71 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch { return pipe2BytesAccumulated; } - private int buildTabletInsertionBuffer(final TabletInsertionEvent event) - throws IOException, WALPipeException { - int databaseEstimateSize = 0; + private long buildTabletInsertionBuffer(final TabletInsertionEvent event) throws IOException { + long estimateSize = 0; final ByteBuffer buffer; if (event instanceof PipeInsertNodeTabletInsertionEvent) { final PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent = (PipeInsertNodeTabletInsertionEvent) event; - // Read the bytebuffer from the wal file and transfer it directly without serializing or - // deserializing if possible final InsertNode insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNode(); - if (Objects.isNull(insertNode)) { - buffer = pipeInsertNodeTabletInsertionEvent.getByteBuffer(); - binaryBuffers.add(buffer); - if (pipeInsertNodeTabletInsertionEvent.isTableModelEvent()) { - databaseEstimateSize = - pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName().length(); - binaryDataBases.add(pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()); - } else { - databaseEstimateSize = 4; - binaryDataBases.add(TREE_MODEL_DATABASE_PLACEHOLDER); - } - } else { + if (!(insertNode instanceof RelationalInsertTabletNode)) { buffer = insertNode.serializeToByteBuffer(); insertNodeBuffers.add(buffer); if (pipeInsertNodeTabletInsertionEvent.isTableModelEvent()) { - databaseEstimateSize = - pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName().length(); + estimateSize = + RamUsageEstimator.sizeOf( + pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()); insertNodeDataBases.add(pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()); } else { - databaseEstimateSize = 4; + estimateSize = 4; insertNodeDataBases.add(TREE_MODEL_DATABASE_PLACEHOLDER); } + estimateSize += buffer.limit(); + } else { + for (final Tablet tablet : + ((PipeInsertNodeTabletInsertionEvent) event).convertToTablets()) { + estimateSize += + constructTabletBatch( + tablet, pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()); + } } } else { final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent = (PipeRawTabletInsertionEvent) event; - try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); - final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { - pipeRawTabletInsertionEvent.convertToTablet().serialize(outputStream); - ReadWriteIOUtils.write(pipeRawTabletInsertionEvent.isAligned(), outputStream); - buffer = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); - } - tabletBuffers.add(buffer); if (pipeRawTabletInsertionEvent.isTableModelEvent()) { - databaseEstimateSize = pipeRawTabletInsertionEvent.getTableModelDatabaseName().length(); - tabletDataBases.add(pipeRawTabletInsertionEvent.getTableModelDatabaseName()); + estimateSize = + constructTabletBatch( + pipeRawTabletInsertionEvent.convertToTablet(), + pipeRawTabletInsertionEvent.getTableModelDatabaseName()); } else { - databaseEstimateSize = 4; + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + pipeRawTabletInsertionEvent.convertToTablet().serialize(outputStream); + ReadWriteIOUtils.write(pipeRawTabletInsertionEvent.isAligned(), outputStream); + buffer = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + } + estimateSize = 4 + buffer.limit(); + tabletBuffers.add(buffer); tabletDataBases.add(TREE_MODEL_DATABASE_PLACEHOLDER); } } - return buffer.limit() + databaseEstimateSize; + + return estimateSize; + } + + private long constructTabletBatch(final Tablet tablet, final String databaseName) { + final AtomicLong size = new AtomicLong(0); + final Pair<Integer, List<Tablet>> currentBatch = + tableModelTabletMap + .computeIfAbsent( + databaseName, + k -> { + size.addAndGet(RamUsageEstimator.sizeOf(databaseName)); + return new HashMap<>(); + }) + .computeIfAbsent(tablet.getTableName(), k -> new Pair<>(0, new ArrayList<>())); + currentBatch.setLeft(currentBatch.getLeft() + tablet.getRowSize()); + currentBatch.getRight().add(tablet); + return PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) + 4; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java index b7d7d44db85..d9c3fabfae8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java @@ -39,7 +39,9 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; public class PipeTransferTabletBatchReqV2 extends TPipeTransferReq { @@ -62,6 +64,8 @@ public class PipeTransferTabletBatchReqV2 extends TPipeTransferReq { final List<InsertRowStatement> insertRowStatementList = new ArrayList<>(); final List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>(); + final Map<String, List<InsertRowStatement>> tableModelDatabaseInsertRowStatementMap = + new HashMap<>(); for (final PipeTransferTabletBinaryReqV2 binaryReq : binaryReqs) { final InsertBaseStatement statement = binaryReq.constructStatement(); @@ -69,7 +73,22 @@ public class PipeTransferTabletBatchReqV2 extends TPipeTransferReq { continue; } if (statement.isWriteToTable()) { - statements.add(statement); + if (statement instanceof InsertRowStatement) { + tableModelDatabaseInsertRowStatementMap + .computeIfAbsent(statement.getDatabaseName().get(), k -> new ArrayList<>()) + .add((InsertRowStatement) statement); + } else if (statement instanceof InsertTabletStatement) { + statements.add(statement); + } else if (statement instanceof InsertRowsStatement) { + tableModelDatabaseInsertRowStatementMap + .computeIfAbsent(statement.getDatabaseName().get(), k -> new ArrayList<>()) + .addAll(((InsertRowsStatement) statement).getInsertRowStatementList()); + } else { + throw new UnsupportedOperationException( + String.format( + "unknown InsertBaseStatement %s constructed from PipeTransferTabletBinaryReqV2.", + binaryReq)); + } continue; } if (statement instanceof InsertRowStatement) { @@ -93,7 +112,22 @@ public class PipeTransferTabletBatchReqV2 extends TPipeTransferReq { continue; } if (statement.isWriteToTable()) { - statements.add(statement); + if (statement instanceof InsertRowStatement) { + tableModelDatabaseInsertRowStatementMap + .computeIfAbsent(statement.getDatabaseName().get(), k -> new ArrayList<>()) + .add((InsertRowStatement) statement); + } else if (statement instanceof InsertTabletStatement) { + statements.add(statement); + } else if (statement instanceof InsertRowsStatement) { + tableModelDatabaseInsertRowStatementMap + .computeIfAbsent(statement.getDatabaseName().get(), k -> new ArrayList<>()) + .addAll(((InsertRowsStatement) statement).getInsertRowStatementList()); + } else { + throw new UnsupportedOperationException( + String.format( + "unknown InsertBaseStatement %s constructed from PipeTransferTabletBinaryReqV2.", + insertNodeReq)); + } continue; } if (statement instanceof InsertRowStatement) { @@ -131,6 +165,16 @@ public class PipeTransferTabletBatchReqV2 extends TPipeTransferReq { if (!insertMultiTabletsStatement.isEmpty()) { statements.add(insertMultiTabletsStatement); } + + for (final Map.Entry<String, List<InsertRowStatement>> insertRows : + tableModelDatabaseInsertRowStatementMap.entrySet()) { + final InsertRowsStatement statement = new InsertRowsStatement(); + statement.setWriteToTable(true); + statement.setDatabaseName(insertRows.getKey()); + statement.setInsertRowStatementList(insertRows.getValue()); + statements.add(statement); + } + return statements; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java index 20847173ffc..c9857c9eaba 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java @@ -153,7 +153,7 @@ public class PipeTabletEventSorter { private int getLastNonnullIndex( final int i, final BitMap originalBitMap, final BitMap deDuplicatedBitMap) { if (deDuplicatedIndex == null) { - if (originalBitMap.isMarked(index[i])) { + if (originalBitMap != null && originalBitMap.isMarked(index[i])) { deDuplicatedBitMap.mark(i); } return index[i];
