This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch pipe-dynamic-memory-allocation in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b8a28f54586309e3d5fa09ac77672be0f0a4815a Author: Caideyipi <[email protected]> AuthorDate: Mon Jun 29 12:33:11 2026 +0800 Improve pipe runtime memory allocation --- .../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 5 +- .../tsfile/parser/TsFileInsertionEventParser.java | 5 +- .../scan/TsFileInsertionEventScanParser.java | 8 +- .../table/TsFileInsertionEventTableParser.java | 21 +--- .../evolvable/batch/PipeTabletEventBatch.java | 70 ++++++++++--- .../evolvable/batch/PipeTabletEventPlainBatch.java | 51 ++++++--- .../batch/PipeTabletEventTsFileBatch.java | 30 +++--- .../protocol/airgap/IoTDBDataRegionAirGapSink.java | 18 +++- .../iotconsensusv2/IoTConsensusV2SyncSink.java | 20 +++- .../IoTConsensusV2TsFileInsertionEventHandler.java | 34 +++++- .../IoTConsensusV2TransferBatchReqBuilder.java | 116 +++++++++++++-------- .../sink/protocol/legacy/IoTDBLegacyPipeSink.java | 17 ++- .../async/handler/PipeTransferTsFileHandler.java | 42 +++++--- .../thrift/sync/IoTDBDataRegionSyncSink.java | 88 ++++++++++++++++ 14 files changed, 378 insertions(+), 147 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 3509e6b29ce..21fc71da969 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -794,9 +794,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { long needMemory = 0; - needMemory += calculateTsFileParserMemory(sourceParameters, sinkParameters); - needMemory += calculateSinkBatchMemory(sinkParameters); - needMemory += calculateSendTsFileReadBufferMemory(sourceParameters, sinkParameters); + // TsFile parser, sink batch, and TsFile read buffer memory are allocated dynamically + // from PipeMemoryManager only while they are active. needMemory += calculateAssignerMemory(sourceParameters); PipeMemoryManager pipeMemoryManager = PipeDataNodeResourceManager.memory(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java index 627731fa7ed..e53e0a94827 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java @@ -24,7 +24,6 @@ import org.apache.iotdb.commons.path.PatternTreeMap; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; -import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.table.TsFileInsertionEventTableParser; @@ -109,9 +108,7 @@ public abstract class TsFileInsertionEventParser implements AutoCloseable { this.sourceEvent = sourceEvent; this.allocatedMemoryBlockForTablet = - PipeDataNodeResourceManager.memory() - .forceAllocateForTabletWithRetry( - IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes()); + PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0); LOGGER.debug( DataNodePipeMessages.TSFILE_HAS_INITIALIZED_PIPENAME_CREATION_TIME_PATTERN, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java index e3af5aaa0c1..9f6ccf728d9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java @@ -29,7 +29,6 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.db.auth.AuthorityChecker; -import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; @@ -144,12 +143,9 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { filter = Objects.nonNull(timeFilterExpression) ? timeFilterExpression.getFilter() : null; this.allocatedMemoryBlockForBatchData = - PipeDataNodeResourceManager.memory() - .forceAllocateForTabletWithRetry( - IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes()); + PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0); this.allocatedMemoryBlockForChunk = - PipeDataNodeResourceManager.memory() - .forceAllocateForTabletWithRetry(PipeConfig.getInstance().getPipeMaxReaderChunkSize()); + PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0); try { currentModifications = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java index 8ecdcc0cec5..7c80e44cc1e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java @@ -22,11 +22,9 @@ package org.apache.iotdb.db.pipe.event.common.tsfile.parser.table; import org.apache.iotdb.commons.audit.IAuditEntity; import org.apache.iotdb.commons.exception.auth.AccessDeniedException; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; -import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.queryengine.plan.relational.metadata.QualifiedObjectName; import org.apache.iotdb.db.auth.AuthorityChecker; -import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; @@ -94,25 +92,14 @@ public class TsFileInsertionEventTableParser extends TsFileInsertionEventParser allocatedMemoryBlockForModifications = PipeDataNodeResourceManager.memory() .forceAllocateForTabletWithRetry(currentModifications.ramBytesUsed()); - long tableSize = - Math.min( - IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(), - IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize()); - this.allocatedMemoryBlockForChunk = - PipeDataNodeResourceManager.memory() - .forceAllocateForTabletWithRetry( - PipeConfig.getInstance().getPipeMaxReaderChunkSize()); + PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0); this.allocatedMemoryBlockForBatchData = - PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(tableSize); + PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0); this.allocatedMemoryBlockForChunkMeta = - PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(tableSize); + PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0); this.allocatedMemoryBlockForTableSchemas = - PipeDataNodeResourceManager.memory() - .forceAllocateForTabletWithRetry( - IoTDBDescriptor.getInstance() - .getConfig() - .getPipeDataStructureTabletSizeInBytes()); + PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0); this.startTime = startTime; this.endTime = endTime; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java index aede0e994d9..a751f798a0c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java @@ -61,8 +61,7 @@ public abstract class PipeTabletEventBatch implements AutoCloseable { // limit in buffer size this.maxBatchSizeInBytes = requestMaxBatchSizeInBytes; - this.allocatedMemoryBlock = - PipeDataNodeResourceManager.memory().forceAllocate(requestMaxBatchSizeInBytes); + this.allocatedMemoryBlock = PipeDataNodeResourceManager.memory().forceAllocate(0); if (recordMetric != null) { this.recordMetric = recordMetric; } else { @@ -97,6 +96,10 @@ public abstract class PipeTabletEventBatch implements AutoCloseable { events.add((EnrichedEvent) event); } } catch (final Exception e) { + if (events.isEmpty()) { + clearBatchData(); + resetMemoryUsage(); + } // If the event is not added to the batch, we need to decrease the reference count. ((EnrichedEvent) event) .decreaseReferenceCount(PipeTransferBatchReqBuilder.class.getName(), false); @@ -126,7 +129,28 @@ public abstract class PipeTabletEventBatch implements AutoCloseable { protected abstract boolean constructBatch(final TabletInsertionEvent event) throws WALPipeException, IOException; + protected void increaseTotalBufferSizeAndUpdateMemoryBlock(final long bufferSize) { + if (bufferSize <= 0) { + return; + } + + final long newTotalBufferSize = totalBufferSize + bufferSize; + PipeDataNodeResourceManager.memory() + .forceResize(allocatedMemoryBlock, Math.min(newTotalBufferSize, maxBatchSizeInBytes)); + totalBufferSize = newTotalBufferSize; + } + + protected void releaseAllocatedMemoryBlock() { + PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlock, 0); + } + + protected void clearBatchData() {} + public boolean shouldEmit() { + if (events.isEmpty()) { + return false; + } + final long diff = System.currentTimeMillis() - firstEventProcessingTime; if (totalBufferSize >= maxBatchSizeInBytes || diff >= maxDelayInMs) { recordMetric.accept(diff, totalBufferSize, events.size()); @@ -138,23 +162,26 @@ public abstract class PipeTabletEventBatch implements AutoCloseable { public synchronized void onSuccess() { events.clear(); - totalBufferSize = 0; - - firstEventProcessingTime = Long.MIN_VALUE; + resetMemoryUsage(); } @Override public synchronized void close() { + if (isClosed) { + return; + } isClosed = true; clearEventsReferenceCount(PipeTabletEventBatch.class.getName()); events.clear(); + clearBatchData(); + resetMemoryUsage(); allocatedMemoryBlock.close(); } /** - * Discard all events of the given pipe. This method only clears the reference count of the events - * and discard them, but do not modify other objects (such as buffers) for simplicity. + * Discard all events of the given pipe. This method only clears the reference count of the + * events. If some events remain, cached batch data is kept unchanged for simplicity. */ public synchronized void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { @@ -162,14 +189,27 @@ public abstract class PipeTabletEventBatch implements AutoCloseable { } public synchronized void discardEventsOfPipe(final CommitterKey committerKey) { - events.removeIf( - event -> { - if (isEventFromPipe(event, committerKey)) { - event.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName()); - return true; - } - return false; - }); + final boolean hasDiscardedEvents = + events.removeIf( + event -> { + if (isEventFromPipe(event, committerKey)) { + event.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName()); + return true; + } + return false; + }); + if (hasDiscardedEvents && events.isEmpty()) { + clearBatchData(); + resetMemoryUsage(); + } + } + + private void resetMemoryUsage() { + totalBufferSize = 0; + + releaseAllocatedMemoryBlock(); + + firstEventProcessingTime = Long.MIN_VALUE; } private static boolean isEventFromPipe( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java index b32479e2f1a..05b348f3237 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java @@ -74,7 +74,6 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch { @Override protected boolean constructBatch(final TabletInsertionEvent event) throws IOException { final long bufferSize = buildTabletInsertionBuffer(event); - totalBufferSize += bufferSize; pipe2BytesAccumulated.compute( new Pair<>( ((EnrichedEvent) event).getPipeName(), ((EnrichedEvent) event).getCreationTime()), @@ -85,8 +84,13 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch { @Override public synchronized void onSuccess() { + clearBatchData(); + super.onSuccess(); + } + @Override + protected void clearBatchData() { insertNodeBuffers.clear(); tabletBuffers.clear(); @@ -161,24 +165,21 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch { final InsertNode insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNode(); if (!(insertNode instanceof RelationalInsertTabletNode)) { buffer = insertNode.serializeToByteBuffer(); + final String databaseName = + pipeInsertNodeTabletInsertionEvent.isTableModelEvent() + ? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName() + : pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName(); + estimateSize = RamUsageEstimator.sizeOf(databaseName) + buffer.limit(); + increaseTotalBufferSizeAndUpdateMemoryBlock(estimateSize); insertNodeBuffers.add(buffer); - if (pipeInsertNodeTabletInsertionEvent.isTableModelEvent()) { - final String databaseName = - pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName(); - estimateSize = RamUsageEstimator.sizeOf(databaseName); - insertNodeDataBases.add(databaseName); - } else { - final String databaseName = pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName(); - estimateSize = RamUsageEstimator.sizeOf(databaseName); - insertNodeDataBases.add(databaseName); - } - estimateSize += buffer.limit(); + insertNodeDataBases.add(databaseName); } else { - for (final Tablet tablet : - ((PipeInsertNodeTabletInsertionEvent) event).convertToTablets()) { - estimateSize += - constructTabletBatch( - tablet, pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()); + final List<Tablet> tablets = pipeInsertNodeTabletInsertionEvent.convertToTablets(); + estimateSize = calculateTabletsSizeInBytes(tablets); + increaseTotalBufferSizeAndUpdateMemoryBlock(estimateSize); + for (final Tablet tablet : tablets) { + constructTabletBatchWithoutMemoryReservation( + tablet, pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()); } } } else { @@ -198,6 +199,7 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch { } final String databaseName = pipeRawTabletInsertionEvent.getTreeModelDatabaseName(); estimateSize = RamUsageEstimator.sizeOf(databaseName) + buffer.limit(); + increaseTotalBufferSizeAndUpdateMemoryBlock(estimateSize); tabletBuffers.add(buffer); tabletDataBases.add(databaseName); } @@ -207,12 +209,27 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch { } private long constructTabletBatch(final Tablet tablet, final String databaseName) { + final long estimateSize = calculateTabletSizeInBytes(tablet); + increaseTotalBufferSizeAndUpdateMemoryBlock(estimateSize); + constructTabletBatchWithoutMemoryReservation(tablet, databaseName); + return estimateSize; + } + + private void constructTabletBatchWithoutMemoryReservation( + final Tablet tablet, final String databaseName) { final Pair<Integer, List<Tablet>> currentBatch = tableModelTabletMap .computeIfAbsent(databaseName, k -> new HashMap<>()) .computeIfAbsent(tablet.getTableName(), k -> new Pair<>(0, new ArrayList<>())); currentBatch.setLeft(currentBatch.getLeft() + tablet.getRowSize()); currentBatch.getRight().add(tablet); + } + + private long calculateTabletsSizeInBytes(final List<Tablet> tablets) { + return tablets.stream().mapToLong(PipeTabletEventPlainBatch::calculateTabletSizeInBytes).sum(); + } + + private static long calculateTabletSizeInBytes(final Tablet tablet) { return PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) + 4; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java index 7b511e23fc6..053b42b2c78 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java @@ -86,6 +86,7 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch { (PipeInsertNodeTabletInsertionEvent) event; final boolean isTableModel = insertNodeTabletInsertionEvent.isTableModelEvent(); final List<Tablet> tablets = insertNodeTabletInsertionEvent.convertToTablets(); + increaseTotalBufferSizeAndUpdateMemoryBlock(calculateTabletsSizeInBytes(tablets)); for (int i = 0; i < tablets.size(); ++i) { final Tablet tablet = tablets.get(i); if (isTabletEmpty(tablet)) { @@ -114,6 +115,7 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch { if (isTabletEmpty(tablet)) { return true; } + increaseTotalBufferSizeAndUpdateMemoryBlock(calculateTabletSizeInBytes(tablet)); if (rawTabletInsertionEvent.isTableModelEvent()) { // table Model bufferTableModelTablet( @@ -139,6 +141,17 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch { return true; } + private long calculateTabletsSizeInBytes(final List<Tablet> tablets) { + return tablets.stream() + .filter(tablet -> !isTabletEmpty(tablet)) + .mapToLong(PipeTabletEventTsFileBatch::calculateTabletSizeInBytes) + .sum(); + } + + private static long calculateTabletSizeInBytes(final Tablet tablet) { + return PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) * 2; + } + private void bufferTreeModelTablet( final String pipeName, final long creationTime, @@ -146,11 +159,6 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch { final boolean isAligned) { new PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary(); - // TODO: Currently, PipeTreeModelTsFileBuilderV2 still uses PipeTreeModelTsFileBuilder as a - // fallback builder, so memory table writing and storing temporary tablets require double the - // memory. - totalBufferSize += PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) * 2; - pipeName2WeightMap.compute( new Pair<>(pipeName, creationTime), (pipe, weight) -> Objects.nonNull(weight) ? ++weight : 1); @@ -162,11 +170,6 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch { final String pipeName, final long creationTime, final Tablet tablet, final String dataBase) { new PipeTableModelTabletEventSorter(tablet).sortAndDeduplicateByDevIdTimestamp(); - // TODO: Currently, PipeTableModelTsFileBuilderV2 still uses PipeTableModelTsFileBuilder as a - // fallback builder, so memory table writing and storing temporary tablets require double the - // memory. - totalBufferSize += PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) * 2; - pipeName2WeightMap.compute( new Pair<>(pipeName, creationTime), (pipe, weight) -> Objects.nonNull(weight) ? ++weight : 1); @@ -209,8 +212,13 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch { @Override public synchronized void onSuccess() { + clearBatchData(); + super.onSuccess(); + } + @Override + protected void clearBatchData() { pipeName2WeightMap.clear(); tableModeTsFileBuilder.onSuccess(); treeModeTsFileBuilder.onSuccess(); @@ -220,8 +228,6 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch { public synchronized void close() { super.close(); - pipeName2WeightMap.clear(); - tableModeTsFileBuilder.close(); treeModeTsFileBuilder.close(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java index 4f2dab1bfa8..92a8c731fbe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java @@ -33,6 +33,8 @@ import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics; import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics; +import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; +import org.apache.iotdb.db.pipe.resource.memory.PipeTsFileMemoryBlock; import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventBatch; import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventPlainBatch; import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventTsFileBatch; @@ -497,10 +499,13 @@ public class IoTDBDataRegionAirGapSink extends IoTDBDataNodeAirGapSink { final AirGapSocket socket, final boolean isMultiFile) throws PipeException, IOException { - final int readFileBufferSize = PIPE_CONFIG.getPipeSinkReadFileBufferSize(); - final byte[] readBuffer = new byte[readFileBufferSize]; - long position = 0; - try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) { + final int readFileBufferSize = getReadFileBufferSize(file); + try (final PipeTsFileMemoryBlock ignored = + PipeDataNodeResourceManager.memory() + .forceAllocateForTsFileWithRetry(readFileBufferSize); + final RandomAccessFile reader = new RandomAccessFile(file, "r")) { + final byte[] readBuffer = new byte[readFileBufferSize]; + long position = 0; while (true) { mayLimitRateAndRecordIO(readFileBufferSize); final int readLength = reader.read(readBuffer); @@ -532,6 +537,11 @@ public class IoTDBDataRegionAirGapSink extends IoTDBDataNodeAirGapSink { } } + private int getReadFileBufferSize(final File file) { + return (int) + Math.min((long) PIPE_CONFIG.getPipeSinkReadFileBufferSize(), Math.max(file.length(), 1L)); + } + private boolean sendBatch( final AirGapSocket socket, byte[] bytes, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java index 481e340a739..9c3104c0ad1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java @@ -39,6 +39,8 @@ import org.apache.iotdb.db.pipe.consensus.metric.IoTConsensusV2SinkMetrics; import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; +import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; +import org.apache.iotdb.db.pipe.resource.memory.PipeTsFileMemoryBlock; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.builder.IoTConsensusV2SyncBatchReqBuilder; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2DeleteNodeReq; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TabletInsertNodeReq; @@ -435,10 +437,13 @@ public class IoTConsensusV2SyncSink extends IoTDBSink { final TCommitId tCommitId, final TConsensusGroupId tConsensusGroupId) throws PipeException, IOException { - final int readFileBufferSize = PipeConfig.getInstance().getPipeSinkReadFileBufferSize(); - final byte[] readBuffer = new byte[readFileBufferSize]; - long position = 0; - try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) { + final int readFileBufferSize = getReadFileBufferSize(file); + try (final PipeTsFileMemoryBlock ignored = + PipeDataNodeResourceManager.memory() + .forceAllocateForTsFileWithRetry(readFileBufferSize); + final RandomAccessFile reader = new RandomAccessFile(file, "r")) { + final byte[] readBuffer = new byte[readFileBufferSize]; + long position = 0; while (true) { final int readLength = reader.read(readBuffer); if (readLength == -1) { @@ -501,6 +506,13 @@ public class IoTConsensusV2SyncSink extends IoTDBSink { } } + private int getReadFileBufferSize(final File file) { + return (int) + Math.min( + (long) PipeConfig.getInstance().getPipeSinkReadFileBufferSize(), + Math.max(file.length(), 1L)); + } + private TEndPoint getFollowerUrl() { // In current iotConsensusV2 design, one connector corresponds to one follower, so the peers is // actually a singleton list diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java index 4e269aaa7e8..52815e645bf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java @@ -31,6 +31,8 @@ import org.apache.iotdb.consensus.iotconsensusv2.thrift.TIoTConsensusV2TransferR import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.consensus.metric.IoTConsensusV2SinkMetrics; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; +import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; +import org.apache.iotdb.db.pipe.resource.memory.PipeTsFileMemoryBlock; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.IoTConsensusV2AsyncSink; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TsFilePieceReq; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TsFilePieceWithModReq; @@ -70,7 +72,8 @@ public class IoTConsensusV2TsFileInsertionEventHandler private final boolean transferMod; private final int readFileBufferSize; - private final byte[] readBuffer; + private PipeTsFileMemoryBlock memoryBlock; + private byte[] readBuffer; private long position; private RandomAccessFile reader; @@ -106,8 +109,15 @@ public class IoTConsensusV2TsFileInsertionEventHandler transferMod = event.isWithMod(); currentFile = transferMod ? modFile : tsFile; - readFileBufferSize = PipeConfig.getInstance().getPipeSinkReadFileBufferSize(); - readBuffer = new byte[readFileBufferSize]; + final long maxFileLength = + transferMod && Objects.nonNull(modFile) + ? Math.max(tsFile.length(), modFile.length()) + : tsFile.length(); + readFileBufferSize = + (int) + Math.min( + (long) PipeConfig.getInstance().getPipeSinkReadFileBufferSize(), + Math.max(maxFileLength, 1L)); position = 0; reader = @@ -128,6 +138,12 @@ public class IoTConsensusV2TsFileInsertionEventHandler this.client = client; client.setShouldReturnSelf(false); + if (readBuffer == null) { + memoryBlock = + PipeDataNodeResourceManager.memory().forceAllocateForTsFileWithRetry(readFileBufferSize); + readBuffer = new byte[readFileBufferSize]; + } + final int readLength = reader.read(readBuffer); if (readLength == -1) { if (currentFile == modFile) { @@ -246,6 +262,8 @@ public class IoTConsensusV2TsFileInsertionEventHandler client.returnSelf(); } + releaseReadBufferMemoryBlock(); + long duration = System.nanoTime() - createTime; metric.recordConnectorTsFileTransferTimer(duration); } @@ -330,10 +348,20 @@ public class IoTConsensusV2TsFileInsertionEventHandler connector.addFailureEventToRetryQueue(event); metric.recordRetryCounter(); + releaseReadBufferMemoryBlock(); + if (client != null) { client.setShouldReturnSelf(true); client.returnSelf(); } } } + + private void releaseReadBufferMemoryBlock() { + if (memoryBlock != null) { + memoryBlock.close(); + memoryBlock = null; + readBuffer = null; + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/builder/IoTConsensusV2TransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/builder/IoTConsensusV2TransferBatchReqBuilder.java index 677c77e0540..b55cb1233f5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/builder/IoTConsensusV2TransferBatchReqBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/builder/IoTConsensusV2TransferBatchReqBuilder.java @@ -70,6 +70,7 @@ public abstract class IoTConsensusV2TransferBatchReqBuilder implements AutoClose protected long firstEventProcessingTime = Long.MIN_VALUE; // limit in buffer size + protected final long maxBatchSizeInBytes; protected final PipeMemoryBlock allocatedMemoryBlock; protected long totalBufferSize = 0; @@ -92,37 +93,12 @@ public abstract class IoTConsensusV2TransferBatchReqBuilder implements AutoClose this.consensusGroupId = consensusGroupId; this.thisDataNodeId = thisDataNodeId; - final long requestMaxBatchSizeInBytes = + maxBatchSizeInBytes = parameters.getLongOrDefault( Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, SINK_IOTDB_BATCH_SIZE_KEY), CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE); - allocatedMemoryBlock = - PipeDataNodeResourceManager.memory() - .tryAllocate(requestMaxBatchSizeInBytes) - .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 0)) - .setShrinkCallback( - (oldMemory, newMemory) -> - LOGGER.info( - DataNodePipeMessages.THE_BATCH_SIZE_LIMIT_HAS_SHRUNK_FROM, - oldMemory, - newMemory)) - .setExpandMethod( - oldMemory -> Math.min(Math.max(oldMemory, 1) * 2, requestMaxBatchSizeInBytes)) - .setExpandCallback( - (oldMemory, newMemory) -> - LOGGER.info( - DataNodePipeMessages.THE_BATCH_SIZE_LIMIT_HAS_EXPANDED_FROM, - oldMemory, - newMemory)); - - if (getMaxBatchSizeInBytes() != requestMaxBatchSizeInBytes) { - LOGGER.info( - "IoTConsensusV2TransferBatchReqBuilder: the max batch size is adjusted from {} to {} due to the " - + "memory restriction", - requestMaxBatchSizeInBytes, - getMaxBatchSizeInBytes()); - } + allocatedMemoryBlock = PipeDataNodeResourceManager.memory().forceAllocate(0); } /** @@ -137,27 +113,80 @@ public abstract class IoTConsensusV2TransferBatchReqBuilder implements AutoClose return false; } - final long requestCommitId = ((EnrichedEvent) event).getReplicateIndexForIoTV2(); + final EnrichedEvent enrichedEvent = (EnrichedEvent) event; + final long requestCommitId = enrichedEvent.getReplicateIndexForIoTV2(); // The deduplication logic here is to avoid the accumulation of the same event in a batch when // retrying. if ((events.isEmpty() || !events.get(events.size() - 1).equals(event))) { - events.add((EnrichedEvent) event); - requestCommitIds.add(requestCommitId); - final int bufferSize = buildTabletInsertionBuffer(event); - - ((EnrichedEvent) event) - .increaseReferenceCount(IoTConsensusV2TransferBatchReqBuilder.class.getName()); + if (!enrichedEvent.increaseReferenceCount( + IoTConsensusV2TransferBatchReqBuilder.class.getName())) { + LOGGER.warn(DataNodePipeMessages.CANNOT_INCREASE_REFERENCE_COUNT_FOR_EVENT_IGNORE, event); + return shouldEmit(); + } - if (firstEventProcessingTime == Long.MIN_VALUE) { - firstEventProcessingTime = System.currentTimeMillis(); + final int previousEventsSize = events.size(); + final int previousRequestCommitIdsSize = requestCommitIds.size(); + final int previousBatchReqsSize = batchReqs.size(); + try { + events.add(enrichedEvent); + requestCommitIds.add(requestCommitId); + final int bufferSize = buildTabletInsertionBuffer(event); + increaseTotalBufferSizeAndUpdateMemoryBlock(bufferSize); + + if (firstEventProcessingTime == Long.MIN_VALUE) { + firstEventProcessingTime = System.currentTimeMillis(); + } + } catch (final Exception e) { + rollbackTo(previousEventsSize, previousRequestCommitIdsSize, previousBatchReqsSize); + if (events.isEmpty()) { + resetMemoryUsage(); + } + enrichedEvent.decreaseReferenceCount( + IoTConsensusV2TransferBatchReqBuilder.class.getName(), false); + throw e; } + } + + return shouldEmit(); + } - totalBufferSize += bufferSize; + private boolean shouldEmit() { + return !events.isEmpty() + && (totalBufferSize >= getMaxBatchSizeInBytes() + || System.currentTimeMillis() - firstEventProcessingTime >= maxDelayInMs); + } + + private void increaseTotalBufferSizeAndUpdateMemoryBlock(final long bufferSize) { + if (bufferSize <= 0) { + return; } - return totalBufferSize >= getMaxBatchSizeInBytes() - || System.currentTimeMillis() - firstEventProcessingTime >= maxDelayInMs; + final long newTotalBufferSize = totalBufferSize + bufferSize; + PipeDataNodeResourceManager.memory() + .forceResize(allocatedMemoryBlock, Math.min(newTotalBufferSize, getMaxBatchSizeInBytes())); + totalBufferSize = newTotalBufferSize; + } + + private void rollbackTo( + final int previousEventsSize, + final int previousRequestCommitIdsSize, + final int previousBatchReqsSize) { + while (events.size() > previousEventsSize) { + events.remove(events.size() - 1); + } + while (requestCommitIds.size() > previousRequestCommitIdsSize) { + requestCommitIds.remove(requestCommitIds.size() - 1); + } + while (batchReqs.size() > previousBatchReqsSize) { + batchReqs.remove(batchReqs.size() - 1); + } + } + + private void resetMemoryUsage() { + firstEventProcessingTime = Long.MIN_VALUE; + totalBufferSize = 0; + PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlock, 0); } public synchronized void onSuccess() { @@ -166,9 +195,7 @@ public abstract class IoTConsensusV2TransferBatchReqBuilder implements AutoClose events.clear(); requestCommitIds.clear(); - firstEventProcessingTime = Long.MIN_VALUE; - - totalBufferSize = 0; + resetMemoryUsage(); } public IoTConsensusV2TabletBatchReq toTIoTConsensusV2BatchTransferReq() throws IOException { @@ -176,7 +203,7 @@ public abstract class IoTConsensusV2TransferBatchReqBuilder implements AutoClose } protected long getMaxBatchSizeInBytes() { - return allocatedMemoryBlock.getMemoryUsageInBytes(); + return maxBatchSizeInBytes; } public boolean isEmpty() { @@ -220,6 +247,9 @@ public abstract class IoTConsensusV2TransferBatchReqBuilder implements AutoClose ((EnrichedEvent) event).clearReferenceCount(this.getClass().getName()); } } + batchReqs.clear(); + events.clear(); + requestCommitIds.clear(); allocatedMemoryBlock.close(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java index c0a9eb6a79d..735e6c48dbb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java @@ -37,6 +37,8 @@ import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; +import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; +import org.apache.iotdb.db.pipe.resource.memory.PipeTsFileMemoryBlock; import org.apache.iotdb.db.pipe.sink.payload.legacy.TsFilePipeData; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; @@ -417,8 +419,12 @@ public class IoTDBLegacyPipeSink implements PipeConnector { long position = 0; // Try small piece to rebase the file position. - final byte[] buffer = new byte[PipeConfig.getInstance().getPipeSinkReadFileBufferSize()]; - try (final RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) { + final int readFileBufferSize = getReadFileBufferSize(file); + try (final PipeTsFileMemoryBlock ignored = + PipeDataNodeResourceManager.memory() + .forceAllocateForTsFileWithRetry(readFileBufferSize); + final RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) { + final byte[] buffer = new byte[readFileBufferSize]; while (true) { final int dataLength = randomAccessFile.read(buffer); if (dataLength == -1) { @@ -456,6 +462,13 @@ public class IoTDBLegacyPipeSink implements PipeConnector { } } + private int getReadFileBufferSize(final File file) { + return (int) + Math.min( + (long) PipeConfig.getInstance().getPipeSinkReadFileBufferSize(), + Math.max(file.length(), 1L)); + } + @Override public void close() throws Exception { if (client != null) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java index 2467c3ce143..27cb81d6cda 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java @@ -124,11 +124,15 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { // the memory of the TsFile event is not released, so the memory is not enough for slicing. This // will cause a deadlock. waitForResourceEnough4Slicing((long) ((1 + Math.random()) * 20 * 1000)); // 20 - 40 seconds + final long maxFileLength = + transferMod && Objects.nonNull(modFile) + ? Math.max(tsFile.length(), modFile.length()) + : tsFile.length(); readFileBufferSize = (int) Math.min( - PipeConfig.getInstance().getPipeSinkReadFileBufferSize(), - transferMod ? Math.max(tsFile.length(), modFile.length()) : tsFile.length()); + (long) PipeConfig.getInstance().getPipeSinkReadFileBufferSize(), + Math.max(maxFileLength, 1L)); position = 0; isSealSignalSent = new AtomicBoolean(false); @@ -142,21 +146,6 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { final IoTDBDataNodeAsyncClientManager clientManager, final AsyncPipeDataTransferServiceClient client) throws TException, IOException { - // Delay creation of resources to avoid OOM or too many open files - if (readBuffer == null) { - memoryBlock = - PipeDataNodeResourceManager.memory() - .forceAllocateForTsFileWithRetry( - PipeConfig.getInstance().isPipeSinkReadFileBufferMemoryControlEnabled() - ? readFileBufferSize - : 0); - readBuffer = new byte[readFileBufferSize]; - } - - if (reader == null) { - reader = transferMod ? new RandomAccessFile(modFile, "r") : new RandomAccessFile(tsFile, "r"); - } - this.clientManager = clientManager; this.client = client; @@ -173,6 +162,17 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { return; } + // Delay creation of resources to avoid OOM or too many open files + if (readBuffer == null) { + memoryBlock = + PipeDataNodeResourceManager.memory().forceAllocateForTsFileWithRetry(readFileBufferSize); + readBuffer = new byte[readFileBufferSize]; + } + + if (reader == null) { + reader = transferMod ? new RandomAccessFile(modFile, "r") : new RandomAccessFile(tsFile, "r"); + } + client.setShouldReturnSelf(false); client.setTimeoutDynamically(clientManager.getConnectionTimeout()); @@ -256,6 +256,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { super.onComplete(response); } finally { if (sink.isClosed()) { + releaseReadBufferMemoryBlock(); returnClientIfNecessary(); } } @@ -319,6 +320,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { referenceCount); } + releaseReadBufferMemoryBlock(); returnClientIfNecessary(); } @@ -361,6 +363,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { try { super.onError(exception); } finally { + releaseReadBufferMemoryBlock(); returnClientIfNecessary(); } } @@ -412,6 +415,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { LOGGER.warn(DataNodePipeMessages.FAILED_TO_CLOSE_FILE_READER_OR_DELETE, e); } finally { try { + releaseReadBufferMemoryBlock(); returnClientIfNecessary(); } finally { if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) { @@ -473,10 +477,14 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { @Override public void close() { super.close(); + releaseReadBufferMemoryBlock(); + } + private void releaseReadBufferMemoryBlock() { if (memoryBlock != null) { memoryBlock.close(); memoryBlock = null; + readBuffer = null; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java index eb7d39864c4..7775092c270 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java @@ -22,10 +22,12 @@ package org.apache.iotdb.db.pipe.sink.protocol.thrift.sync; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; +import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient; import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFilePieceReq; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.response.PipeTransferFilePieceResp; import org.apache.iotdb.commons.utils.RetryUtils; import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent; @@ -36,6 +38,8 @@ import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics; import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics; +import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; +import org.apache.iotdb.db.pipe.resource.memory.PipeTsFileMemoryBlock; import org.apache.iotdb.db.pipe.sink.client.IoTDBDataNodeSyncClientManager; import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventBatch; import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventPlainBatch; @@ -71,6 +75,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.io.RandomAccessFile; import java.nio.file.NoSuchFileException; import java.util.Arrays; import java.util.Collections; @@ -593,6 +598,89 @@ public class IoTDBDataRegionSyncSink extends IoTDBDataNodeSyncSink { LOGGER.info(DataNodePipeMessages.SUCCESSFULLY_TRANSFERRED_FILE, tsFile); } + @Override + protected void transferFilePieces( + final Map<Pair<String, Long>, Double> pipe2WeightMap, + final File file, + final Pair<IoTDBSyncClient, Boolean> clientAndStatus, + final boolean isMultiFile) + throws PipeException, IOException { + final int readFileBufferSize = getReadFileBufferSize(file); + try (final PipeTsFileMemoryBlock ignored = + PipeDataNodeResourceManager.memory() + .forceAllocateForTsFileWithRetry(readFileBufferSize); + final RandomAccessFile reader = new RandomAccessFile(file, "r")) { + final byte[] readBuffer = new byte[readFileBufferSize]; + long position = 0; + while (true) { + mayLimitRateAndRecordIO(readFileBufferSize); + final int readLength = reader.read(readBuffer); + if (readLength == -1) { + break; + } + + final byte[] payLoad = + readLength == readFileBufferSize + ? readBuffer + : Arrays.copyOfRange(readBuffer, 0, readLength); + final PipeTransferFilePieceResp resp; + try { + final TPipeTransferReq req = + compressIfNeeded( + isMultiFile + ? getTransferMultiFilePieceReq(file.getName(), position, payLoad) + : getTransferSingleFilePieceReq(file.getName(), position, payLoad)); + pipe2WeightMap.forEach( + (namePair, weight) -> + rateLimitIfNeeded( + namePair.getLeft(), + namePair.getRight(), + clientAndStatus.getLeft().getEndPoint(), + (long) (req.getBody().length * weight))); + resp = + PipeTransferFilePieceResp.fromTPipeTransferResp( + clientAndStatus.getLeft().pipeTransfer(req)); + } catch (final Exception e) { + clientAndStatus.setRight(false); + throw new PipeConnectionException( + String.format( + "Network error when transfer file %s, because %s.", file, e.getMessage()), + e); + } + + position += readLength; + + final TSStatus status = resp.getStatus(); + if (status.getCode() == TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET.getStatusCode()) { + position = resp.getEndWritingOffset(); + reader.seek(position); + LOGGER.info(DataNodePipeMessages.REDIRECT_FILE_POSITION_TO, position); + continue; + } + + if (status.getCode() + == TSStatusCode.PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED.getStatusCode()) { + getClientManager().sendHandshakeReq(clientAndStatus); + } + + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() + && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { + receiverStatusHandler.handle( + resp.getStatus(), + String.format("Transfer file %s error, result status %s.", file, resp.getStatus()), + file.getName()); + } + } + } + } + + private int getReadFileBufferSize(final File file) { + return (int) + Math.min( + (long) PipeConfig.getInstance().getPipeSinkReadFileBufferSize(), + Math.max(file.length(), 1L)); + } + @Override public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws IOException { if (Objects.isNull(compressionTimer) && Objects.nonNull(sinkTaskId)) {
