This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ced3908f07e Reduce IoTConsensus snapshot transmission memory and log
overhead (#18019)
ced3908f07e is described below
commit ced3908f07e54dd4800d98947f22a421b14bb627
Author: Hongzhi Gao <[email protected]>
AuthorDate: Thu Jun 25 14:53:00 2026 +0800
Reduce IoTConsensus snapshot transmission memory and log overhead (#18019)
- Reuse a single buffer across all files instead of allocating a 10MB
buffer per file in SnapshotFragmentReader.
- Only build/emit the full file list at debug level, and throttle the
per-file progress log via a configurable interval
(data_region_iot_snapshot_transmission_progress_log_interval_ms).
- Add SnapshotFragmentReaderTest covering shared-buffer reuse.
---
.../iotdb/consensus/config/IoTConsensusConfig.java | 23 ++++-
.../consensus/iot/IoTConsensusServerImpl.java | 63 ++++++++------
.../iot/snapshot/SnapshotFragmentReader.java | 14 +++-
.../iot/snapshot/SnapshotFragmentReaderTest.java | 97 ++++++++++++++++++++++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 13 +++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 6 ++
.../db/consensus/DataRegionConsensusImpl.java | 2 +
.../conf/iotdb-system.properties.template | 7 ++
8 files changed, 195 insertions(+), 30 deletions(-)
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
index 7720cf59f05..c6ca9a7e0dd 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
@@ -325,6 +325,7 @@ public class IoTConsensusConfig {
private final long regionMigrationSpeedLimitBytesPerSecond;
private final long subscriptionWalRetentionSizeInBytes;
private final long subscriptionWalRetentionTimeMs;
+ private final long snapshotTransmissionProgressLogIntervalMs;
private Replication(
int maxLogEntriesNumPerBatch,
@@ -342,7 +343,8 @@ public class IoTConsensusConfig {
double maxMemoryRatioForQueue,
long regionMigrationSpeedLimitBytesPerSecond,
long subscriptionWalRetentionSizeInBytes,
- long subscriptionWalRetentionTimeMs) {
+ long subscriptionWalRetentionTimeMs,
+ long snapshotTransmissionProgressLogIntervalMs) {
this.maxLogEntriesNumPerBatch = maxLogEntriesNumPerBatch;
this.maxSizePerBatch = maxSizePerBatch;
this.maxPendingBatchesNum = maxPendingBatchesNum;
@@ -359,6 +361,7 @@ public class IoTConsensusConfig {
this.regionMigrationSpeedLimitBytesPerSecond =
regionMigrationSpeedLimitBytesPerSecond;
this.subscriptionWalRetentionSizeInBytes =
subscriptionWalRetentionSizeInBytes;
this.subscriptionWalRetentionTimeMs = subscriptionWalRetentionTimeMs;
+ this.snapshotTransmissionProgressLogIntervalMs =
snapshotTransmissionProgressLogIntervalMs;
}
public int getMaxLogEntriesNumPerBatch() {
@@ -425,6 +428,10 @@ public class IoTConsensusConfig {
return subscriptionWalRetentionTimeMs;
}
+ public long getSnapshotTransmissionProgressLogIntervalMs() {
+ return snapshotTransmissionProgressLogIntervalMs;
+ }
+
public static Replication.Builder newBuilder() {
return new Replication.Builder();
}
@@ -450,6 +457,11 @@ public class IoTConsensusConfig {
private long regionMigrationSpeedLimitBytesPerSecond = 32 * 1024 * 1024L;
private long subscriptionWalRetentionSizeInBytes = 0;
private long subscriptionWalRetentionTimeMs = -1L;
+ // Throttle the per-file snapshot-transmission progress log to at most once per this interval.
+ // A snapshot may contain hundreds of thousands of files, so one INFO line per file is
+ // itself a heavy IO/string-building cost.
+ // A value <= 0 logs every file.
+ private long snapshotTransmissionProgressLogIntervalMs = 5000L;
public Replication.Builder setMaxLogEntriesNumPerBatch(int
maxLogEntriesNumPerBatch) {
this.maxLogEntriesNumPerBatch = maxLogEntriesNumPerBatch;
@@ -535,6 +547,12 @@ public class IoTConsensusConfig {
return this;
}
+ public Builder setSnapshotTransmissionProgressLogIntervalMs(
+ long snapshotTransmissionProgressLogIntervalMs) {
+ this.snapshotTransmissionProgressLogIntervalMs =
snapshotTransmissionProgressLogIntervalMs;
+ return this;
+ }
+
public Replication build() {
return new Replication(
maxLogEntriesNumPerBatch,
@@ -552,7 +570,8 @@ public class IoTConsensusConfig {
maxMemoryRatioForQueue,
regionMigrationSpeedLimitBytesPerSecond,
subscriptionWalRetentionSizeInBytes,
- subscriptionWalRetentionTimeMs);
+ subscriptionWalRetentionTimeMs,
+ snapshotTransmissionProgressLogIntervalMs);
}
}
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 91b06a4f3f4..07d1f2593b2 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -382,33 +382,42 @@ public class IoTConsensusServerImpl {
public void transmitSnapshot(Peer targetPeer) throws
ConsensusGroupModifyPeerException {
File snapshotDir = new File(storageDir, newSnapshotDirName);
List<File> snapshotPaths = stateMachine.getSnapshotFiles(snapshotDir);
- AtomicLong snapshotSizeSumAtomic = new AtomicLong();
- StringBuilder allFilesStr = new StringBuilder();
- snapshotPaths.forEach(
- file -> {
- long fileSize = file.length();
- snapshotSizeSumAtomic.addAndGet(fileSize);
- allFilesStr
- .append("\n")
- .append(file.getName())
- .append(" ")
- .append(humanReadableByteCountSI(fileSize));
- });
- final long snapshotSizeSum = snapshotSizeSumAtomic.get();
+ long snapshotSizeSum = 0;
+ for (File file : snapshotPaths) {
+ snapshotSizeSum += file.length();
+ }
long transitedSnapshotSizeSum = 0;
long transitedFilesNum = 0;
long startTime = System.nanoTime();
+ long lastProgressLogTime = startTime;
+ // Throttle the per-file progress log to at most once per this interval; a snapshot may contain
+ // hundreds of thousands of files, so one INFO line per file is itself a heavy cost.
+ long progressLogIntervalNs =
+ TimeUnit.MILLISECONDS.toNanos(
+
config.getReplication().getSnapshotTransmissionProgressLogIntervalMs());
logger.info(
IoTConsensusMessages.SNAPSHOT_TRANSMISSION_START,
snapshotPaths.size(),
humanReadableByteCountSI(snapshotSizeSum),
snapshotDir);
- logger.info(IoTConsensusMessages.SNAPSHOT_TRANSMISSION_ALL_FILES,
allFilesStr);
+ if (logger.isDebugEnabled()) {
+ StringBuilder allFilesStr = new StringBuilder();
+ for (File file : snapshotPaths) {
+ allFilesStr
+ .append("\n")
+ .append(file.getName())
+ .append(" ")
+ .append(humanReadableByteCountSI(file.length()));
+ }
+ logger.debug(IoTConsensusMessages.SNAPSHOT_TRANSMISSION_ALL_FILES,
allFilesStr);
+ }
+ ByteBuffer fragmentBuffer =
+ ByteBuffer.allocate(SnapshotFragmentReader.DEFAULT_FILE_FRAGMENT_SIZE);
try (SyncIoTConsensusServiceClient client =
syncClientManager.borrowClient(targetPeer.getEndpoint())) {
for (File file : snapshotPaths) {
SnapshotFragmentReader reader =
- new SnapshotFragmentReader(newSnapshotDirName, file.toPath());
+ new SnapshotFragmentReader(newSnapshotDirName, file.toPath(),
fragmentBuffer);
try {
while (reader.hasNext()) {
// TODO: zero copy ?
@@ -423,16 +432,20 @@ public class IoTConsensusServerImpl {
}
transitedSnapshotSizeSum += reader.getTotalReadSize();
transitedFilesNum++;
- logger.info(
- IoTConsensusMessages.SNAPSHOT_TRANSMISSION_PROGRESS,
- newSnapshotDirName,
- transitedFilesNum,
- snapshotPaths.size(),
- humanReadableByteCountSI(transitedSnapshotSizeSum),
- humanReadableByteCountSI(snapshotSizeSum),
- CommonDateTimeUtils.convertMillisecondToDurationStr(
- (System.nanoTime() - startTime) / 1_000_000),
- file);
+ long now = System.nanoTime();
+ if (now - lastProgressLogTime >= progressLogIntervalNs
+ || transitedFilesNum == snapshotPaths.size()) {
+ lastProgressLogTime = now;
+ logger.info(
+ IoTConsensusMessages.SNAPSHOT_TRANSMISSION_PROGRESS,
+ newSnapshotDirName,
+ transitedFilesNum,
+ snapshotPaths.size(),
+ humanReadableByteCountSI(transitedSnapshotSizeSum),
+ humanReadableByteCountSI(snapshotSizeSum),
+ CommonDateTimeUtils.convertMillisecondToDurationStr((now -
startTime) / 1_000_000),
+ file);
+ }
} finally {
reader.close();
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java
index 3331829d243..f9c905ee2c0 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java
@@ -27,7 +27,7 @@ import java.nio.file.Path;
public class SnapshotFragmentReader {
- private static final int DEFAULT_FILE_FRAGMENT_SIZE = 10 * 1024 * 1024;
+ public static final int DEFAULT_FILE_FRAGMENT_SIZE = 10 * 1024 * 1024;
private final String snapshotId;
private final String filePath;
private final SeekableByteChannel fileChannel;
@@ -36,12 +36,20 @@ public class SnapshotFragmentReader {
private long totalReadSize;
private SnapshotFragment cachedSnapshotFragment;
- public SnapshotFragmentReader(String snapshotId, Path path) throws
IOException {
+ /**
+ * The {@code buf} is supplied (and owned) by the caller so a single 10MB buffer can be reused
+ * across every file of a snapshot transmission. Allocating a fresh 10MB buffer per file is
+ * extremely wasteful when a snapshot contains hundreds of thousands of tiny files, multiplying GC
+ * pressure and allocation cost. The buffer is fully reset via {@link ByteBuffer#clear()} on each
+ * {@link #hasNext()} call, and each fragment is serialized synchronously before the next read, so
+ * sharing it across files (and across readers used one after another) is safe.
+ */
+ public SnapshotFragmentReader(String snapshotId, Path path, ByteBuffer buf)
throws IOException {
this.snapshotId = snapshotId;
this.filePath = path.toAbsolutePath().toString();
this.fileSize = Files.size(path);
this.fileChannel = Files.newByteChannel(path);
- this.buf = ByteBuffer.allocate(DEFAULT_FILE_FRAGMENT_SIZE);
+ this.buf = buf;
}
public boolean hasNext() throws IOException {
diff --git
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReaderTest.java
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReaderTest.java
new file mode 100644
index 00000000000..36048d4c9ed
--- /dev/null
+++
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReaderTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.iot.snapshot;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Random;
+
+/**
+ * Regression test for the snapshot-transmission read path. The sending side reuses a single,
+ * caller-supplied {@link ByteBuffer} across every file of a snapshot (instead of allocating a fresh
+ * 10MB buffer per file), so this verifies that (1) a single buffer reused across many readers still
+ * reconstructs every file byte-for-byte, and (2) the buffer is genuinely shared rather than
+ * re-allocated per file.
+ */
+public class SnapshotFragmentReaderTest {
+
+ @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @Test
+ public void reusedBufferReadsEveryFileCorrectly() throws IOException {
+ final Random random = new Random(42);
+ // Deliberately use a tiny buffer so files span multiple fragments, and use file sizes that are
+ // not multiples of the buffer size to exercise the partial-final-fragment path.
+ final int bufferSize = 16;
+ final int[] fileSizes = {0, 1, bufferSize, bufferSize + 1, 5 * bufferSize
+ 7};
+
+ byte[][] contents = new byte[fileSizes.length][];
+ Path[] paths = new Path[fileSizes.length];
+ for (int i = 0; i < fileSizes.length; i++) {
+ contents[i] = new byte[fileSizes[i]];
+ random.nextBytes(contents[i]);
+ paths[i] = temporaryFolder.newFile("file-" + i).toPath();
+ Files.write(paths[i], contents[i]);
+ }
+
+ final ByteBuffer sharedBuffer = ByteBuffer.allocate(bufferSize);
+ ByteBuffer firstReusedChunk = null;
+
+ for (int i = 0; i < paths.length; i++) {
+ SnapshotFragmentReader reader = new SnapshotFragmentReader("snap",
paths[i], sharedBuffer);
+ try {
+ ByteArrayOutputStream reconstructed = new ByteArrayOutputStream();
+ while (reader.hasNext()) {
+ SnapshotFragment fragment = reader.next();
+ // Every fragment must be backed by the one shared buffer, proving
it is reused and not
+ // re-allocated per file.
+ Assert.assertSame(sharedBuffer, fragment.getFileChunk());
+ if (firstReusedChunk == null) {
+ firstReusedChunk = fragment.getFileChunk();
+ } else {
+ Assert.assertSame(firstReusedChunk, fragment.getFileChunk());
+ }
+
+ // Drain the fragment immediately, mirroring how the sender
serializes each fragment
+ // synchronously before the next read overwrites the shared buffer.
+ ByteBuffer chunk = fragment.getFileChunk();
+ byte[] bytes = new byte[chunk.remaining()];
+ chunk.get(bytes);
+ reconstructed.write(bytes);
+ }
+ Assert.assertArrayEquals(
+ "File " + i + " was not reconstructed correctly",
+ contents[i],
+ reconstructed.toByteArray());
+ Assert.assertEquals(contents[i].length, reader.getTotalReadSize());
+ } finally {
+ reader.close();
+ }
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 44ac962aaea..429efe52d65 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1101,6 +1101,9 @@ public class IoTDBConfig {
private int maxPendingBatchesNum = 5;
private double maxMemoryRatioForQueue = 0.6;
private long regionMigrationSpeedLimitBytesPerSecond = 48 * 1024 * 1024L;
+ // Throttle the per-file snapshot-transmission progress log in IoTConsensus to at most once per
+ // this interval (ms). A value <= 0 logs every file.
+ private long dataRegionIotSnapshotTransmissionProgressLogIntervalMs = 5000L;
// IoTConsensusV2 Config
private int iotConsensusV2PipelineSize = 5;
@@ -1275,6 +1278,16 @@ public class IoTDBConfig {
this.regionMigrationSpeedLimitBytesPerSecond =
regionMigrationSpeedLimitBytesPerSecond;
}
+ public long getDataRegionIotSnapshotTransmissionProgressLogIntervalMs() {
+ return dataRegionIotSnapshotTransmissionProgressLogIntervalMs;
+ }
+
+ public void setDataRegionIotSnapshotTransmissionProgressLogIntervalMs(
+ long dataRegionIotSnapshotTransmissionProgressLogIntervalMs) {
+ this.dataRegionIotSnapshotTransmissionProgressLogIntervalMs =
+ dataRegionIotSnapshotTransmissionProgressLogIntervalMs;
+ }
+
public int getIotConsensusV2PipelineSize() {
return iotConsensusV2PipelineSize;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 7acda29f959..147c3340bb3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1286,6 +1286,12 @@ public class IoTDBDescriptor {
"region_migration_speed_limit_bytes_per_second",
ConfigurationFileUtils.getConfigurationDefaultValue(
"region_migration_speed_limit_bytes_per_second"))));
+ conf.setDataRegionIotSnapshotTransmissionProgressLogIntervalMs(
+ Long.parseLong(
+ properties.getProperty(
+
"data_region_iot_snapshot_transmission_progress_log_interval_ms",
+ ConfigurationFileUtils.getConfigurationDefaultValue(
+
"data_region_iot_snapshot_transmission_progress_log_interval_ms"))));
conf.setKeepSameDiskWhenLoadingSnapshot(
Boolean.parseBoolean(
properties.getProperty(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index 22bab53a8d2..a4c8e00f1ad 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -178,6 +178,8 @@ public class DataRegionConsensusImpl {
COMMON_CONF.getSubscriptionConsensusWalRetentionSizeInBytes())
.setSubscriptionWalRetentionTimeMs(
COMMON_CONF.getSubscriptionConsensusWalRetentionTimeMs())
+ .setSnapshotTransmissionProgressLogIntervalMs(
+
CONF.getDataRegionIotSnapshotTransmissionProgressLogIntervalMs())
.build())
.build())
.setIoTConsensusV2Config(
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 1e808d7d75a..facf07fb85f 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -1723,6 +1723,13 @@ data_region_iot_max_memory_ratio_for_queue = 0.6
# Datatype: long
region_migration_speed_limit_bytes_per_second = 50331648
+# The minimum interval (in ms) between two per-file progress logs while transmitting a snapshot in
+# IoTConsensus. A snapshot may contain a huge number of files, so logging one line per file is
+# costly; this throttles it to at most once per interval. A value <= 0 logs every file.
+# effectiveMode: hot_reload
+# Datatype: long
+data_region_iot_snapshot_transmission_progress_log_interval_ms = 5000
+
# When loading snapshot, try keeping TsFiles in the same disk as the snapshot
dir.
# This may reduce file copies but may also result in a worse disk load-balance
# effectiveMode: hot_reload