This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ced3908f07e Reduce IoTConsensus snapshot transmission memory and log overhead (#18019)
ced3908f07e is described below

commit ced3908f07e54dd4800d98947f22a421b14bb627
Author: Hongzhi Gao <[email protected]>
AuthorDate: Thu Jun 25 14:53:00 2026 +0800

    Reduce IoTConsensus snapshot transmission memory and log overhead (#18019)
    
    - Reuse a single buffer across all files instead of allocating a 10MB
      buffer per file in SnapshotFragmentReader.
    - Only build/emit the full file list at debug level, and throttle the
      per-file progress log via a configurable interval
      (data_region_iot_snapshot_transmission_progress_log_interval_ms).
    - Add SnapshotFragmentReaderTest covering shared-buffer reuse.
---
 .../iotdb/consensus/config/IoTConsensusConfig.java | 23 ++++-
 .../consensus/iot/IoTConsensusServerImpl.java      | 63 ++++++++------
 .../iot/snapshot/SnapshotFragmentReader.java       | 14 +++-
 .../iot/snapshot/SnapshotFragmentReaderTest.java   | 97 ++++++++++++++++++++++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 13 +++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  6 ++
 .../db/consensus/DataRegionConsensusImpl.java      |  2 +
 .../conf/iotdb-system.properties.template          |  7 ++
 8 files changed, 195 insertions(+), 30 deletions(-)

diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
index 7720cf59f05..c6ca9a7e0dd 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
@@ -325,6 +325,7 @@ public class IoTConsensusConfig {
     private final long regionMigrationSpeedLimitBytesPerSecond;
     private final long subscriptionWalRetentionSizeInBytes;
     private final long subscriptionWalRetentionTimeMs;
+    private final long snapshotTransmissionProgressLogIntervalMs;
 
     private Replication(
         int maxLogEntriesNumPerBatch,
@@ -342,7 +343,8 @@ public class IoTConsensusConfig {
         double maxMemoryRatioForQueue,
         long regionMigrationSpeedLimitBytesPerSecond,
         long subscriptionWalRetentionSizeInBytes,
-        long subscriptionWalRetentionTimeMs) {
+        long subscriptionWalRetentionTimeMs,
+        long snapshotTransmissionProgressLogIntervalMs) {
       this.maxLogEntriesNumPerBatch = maxLogEntriesNumPerBatch;
       this.maxSizePerBatch = maxSizePerBatch;
       this.maxPendingBatchesNum = maxPendingBatchesNum;
@@ -359,6 +361,7 @@ public class IoTConsensusConfig {
       this.regionMigrationSpeedLimitBytesPerSecond = 
regionMigrationSpeedLimitBytesPerSecond;
       this.subscriptionWalRetentionSizeInBytes = 
subscriptionWalRetentionSizeInBytes;
       this.subscriptionWalRetentionTimeMs = subscriptionWalRetentionTimeMs;
+      this.snapshotTransmissionProgressLogIntervalMs = 
snapshotTransmissionProgressLogIntervalMs;
     }
 
     public int getMaxLogEntriesNumPerBatch() {
@@ -425,6 +428,10 @@ public class IoTConsensusConfig {
       return subscriptionWalRetentionTimeMs;
     }
 
+    /**
+     * Returns the minimum interval, in milliseconds, between two per-file progress logs emitted
+     * while transmitting a snapshot. A value {@code <= 0} means every file is logged.
+     */
+    public long getSnapshotTransmissionProgressLogIntervalMs() {
+      return this.snapshotTransmissionProgressLogIntervalMs;
+    }
+
     public static Replication.Builder newBuilder() {
       return new Replication.Builder();
     }
@@ -450,6 +457,11 @@ public class IoTConsensusConfig {
       private long regionMigrationSpeedLimitBytesPerSecond = 32 * 1024 * 1024L;
       private long subscriptionWalRetentionSizeInBytes = 0;
       private long subscriptionWalRetentionTimeMs = -1L;
+      // Throttle the per-file snapshot-transmission progress log to at most once per this
+      // interval: a snapshot may contain hundreds of thousands of files, so emitting one INFO line
+      // per file is itself a heavy IO/string-building cost.
+      // A value <= 0 logs every file.
+      private long snapshotTransmissionProgressLogIntervalMs = 5000L;
 
       public Replication.Builder setMaxLogEntriesNumPerBatch(int 
maxLogEntriesNumPerBatch) {
         this.maxLogEntriesNumPerBatch = maxLogEntriesNumPerBatch;
@@ -535,6 +547,12 @@ public class IoTConsensusConfig {
         return this;
       }
 
+      /**
+       * Sets the minimum interval, in milliseconds, between two per-file progress logs emitted
+       * while transmitting a snapshot; a value {@code <= 0} logs every file.
+       *
+       * @param snapshotTransmissionProgressLogIntervalMs throttling interval in milliseconds
+       * @return this builder, for call chaining
+       */
+      public Replication.Builder setSnapshotTransmissionProgressLogIntervalMs(
+          long snapshotTransmissionProgressLogIntervalMs) {
+        this.snapshotTransmissionProgressLogIntervalMs = snapshotTransmissionProgressLogIntervalMs;
+        return this;
+      }
+
       public Replication build() {
         return new Replication(
             maxLogEntriesNumPerBatch,
@@ -552,7 +570,8 @@ public class IoTConsensusConfig {
             maxMemoryRatioForQueue,
             regionMigrationSpeedLimitBytesPerSecond,
             subscriptionWalRetentionSizeInBytes,
-            subscriptionWalRetentionTimeMs);
+            subscriptionWalRetentionTimeMs,
+            snapshotTransmissionProgressLogIntervalMs);
       }
     }
   }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 91b06a4f3f4..07d1f2593b2 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -382,33 +382,42 @@ public class IoTConsensusServerImpl {
   public void transmitSnapshot(Peer targetPeer) throws 
ConsensusGroupModifyPeerException {
     File snapshotDir = new File(storageDir, newSnapshotDirName);
     List<File> snapshotPaths = stateMachine.getSnapshotFiles(snapshotDir);
-    AtomicLong snapshotSizeSumAtomic = new AtomicLong();
-    StringBuilder allFilesStr = new StringBuilder();
-    snapshotPaths.forEach(
-        file -> {
-          long fileSize = file.length();
-          snapshotSizeSumAtomic.addAndGet(fileSize);
-          allFilesStr
-              .append("\n")
-              .append(file.getName())
-              .append(" ")
-              .append(humanReadableByteCountSI(fileSize));
-        });
-    final long snapshotSizeSum = snapshotSizeSumAtomic.get();
+    long snapshotSizeSum = 0;
+    for (File file : snapshotPaths) {
+      snapshotSizeSum += file.length();
+    }
     long transitedSnapshotSizeSum = 0;
     long transitedFilesNum = 0;
     long startTime = System.nanoTime();
+    long lastProgressLogTime = startTime;
+    // Throttle the per-file progress log to at most once per this interval; a 
snapshot may contain
+    // hundreds of thousands of files, so one INFO line per file is itself a 
heavy cost.
+    long progressLogIntervalNs =
+        TimeUnit.MILLISECONDS.toNanos(
+            
config.getReplication().getSnapshotTransmissionProgressLogIntervalMs());
     logger.info(
         IoTConsensusMessages.SNAPSHOT_TRANSMISSION_START,
         snapshotPaths.size(),
         humanReadableByteCountSI(snapshotSizeSum),
         snapshotDir);
-    logger.info(IoTConsensusMessages.SNAPSHOT_TRANSMISSION_ALL_FILES, 
allFilesStr);
+    if (logger.isDebugEnabled()) {
+      StringBuilder allFilesStr = new StringBuilder();
+      for (File file : snapshotPaths) {
+        allFilesStr
+            .append("\n")
+            .append(file.getName())
+            .append(" ")
+            .append(humanReadableByteCountSI(file.length()));
+      }
+      logger.debug(IoTConsensusMessages.SNAPSHOT_TRANSMISSION_ALL_FILES, 
allFilesStr);
+    }
+    ByteBuffer fragmentBuffer =
+        ByteBuffer.allocate(SnapshotFragmentReader.DEFAULT_FILE_FRAGMENT_SIZE);
     try (SyncIoTConsensusServiceClient client =
         syncClientManager.borrowClient(targetPeer.getEndpoint())) {
       for (File file : snapshotPaths) {
         SnapshotFragmentReader reader =
-            new SnapshotFragmentReader(newSnapshotDirName, file.toPath());
+            new SnapshotFragmentReader(newSnapshotDirName, file.toPath(), 
fragmentBuffer);
         try {
           while (reader.hasNext()) {
             // TODO: zero copy ?
@@ -423,16 +432,20 @@ public class IoTConsensusServerImpl {
           }
           transitedSnapshotSizeSum += reader.getTotalReadSize();
           transitedFilesNum++;
-          logger.info(
-              IoTConsensusMessages.SNAPSHOT_TRANSMISSION_PROGRESS,
-              newSnapshotDirName,
-              transitedFilesNum,
-              snapshotPaths.size(),
-              humanReadableByteCountSI(transitedSnapshotSizeSum),
-              humanReadableByteCountSI(snapshotSizeSum),
-              CommonDateTimeUtils.convertMillisecondToDurationStr(
-                  (System.nanoTime() - startTime) / 1_000_000),
-              file);
+          long now = System.nanoTime();
+          if (now - lastProgressLogTime >= progressLogIntervalNs
+              || transitedFilesNum == snapshotPaths.size()) {
+            lastProgressLogTime = now;
+            logger.info(
+                IoTConsensusMessages.SNAPSHOT_TRANSMISSION_PROGRESS,
+                newSnapshotDirName,
+                transitedFilesNum,
+                snapshotPaths.size(),
+                humanReadableByteCountSI(transitedSnapshotSizeSum),
+                humanReadableByteCountSI(snapshotSizeSum),
+                CommonDateTimeUtils.convertMillisecondToDurationStr((now - 
startTime) / 1_000_000),
+                file);
+          }
         } finally {
           reader.close();
         }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java
index 3331829d243..f9c905ee2c0 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java
@@ -27,7 +27,7 @@ import java.nio.file.Path;
 
 public class SnapshotFragmentReader {
 
-  private static final int DEFAULT_FILE_FRAGMENT_SIZE = 10 * 1024 * 1024;
+  public static final int DEFAULT_FILE_FRAGMENT_SIZE = 10 * 1024 * 1024;
   private final String snapshotId;
   private final String filePath;
   private final SeekableByteChannel fileChannel;
@@ -36,12 +36,20 @@ public class SnapshotFragmentReader {
   private long totalReadSize;
   private SnapshotFragment cachedSnapshotFragment;
 
-  public SnapshotFragmentReader(String snapshotId, Path path) throws IOException {
+  /**
+   * Creates a reader that streams the file at {@code path} as snapshot fragments, reading into a
+   * caller-supplied buffer.
+   *
+   * <p>The {@code buf} is supplied (and owned) by the caller so that one buffer can be reused
+   * across every file of a snapshot transmission. Allocating a fresh {@link
+   * #DEFAULT_FILE_FRAGMENT_SIZE}-byte (10MB) buffer per file is extremely wasteful when a snapshot
+   * contains hundreds of thousands of tiny files, multiplying GC pressure and allocation cost.
+   *
+   * <p>Sharing the buffer across files (and across readers) is only safe when each fragment is
+   * fully consumed before this or another reader reads into the buffer again, and when the readers
+   * sharing it are used from one thread at a time.
+   *
+   * @param snapshotId identifier of the snapshot this file belongs to
+   * @param path the snapshot file to read
+   * @param buf non-null read buffer, reused for every fragment this reader produces; its capacity
+   *     bounds the size of each fragment
+   * @throws IOException if the file size cannot be read or the file cannot be opened
+   * @throws NullPointerException if {@code buf} is {@code null}
+   */
+  public SnapshotFragmentReader(String snapshotId, Path path, ByteBuffer buf) throws IOException {
+    // Fail fast, before opening the channel, so a bad argument never leaves a file handle open
+    // and the error surfaces at construction instead of on the first read.
+    if (buf == null) {
+      throw new NullPointerException("buf must not be null");
+    }
     this.snapshotId = snapshotId;
     this.filePath = path.toAbsolutePath().toString();
     this.fileSize = Files.size(path);
     this.fileChannel = Files.newByteChannel(path);
-    this.buf = ByteBuffer.allocate(DEFAULT_FILE_FRAGMENT_SIZE);
+    this.buf = buf;
   }
 
   public boolean hasNext() throws IOException {
diff --git 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReaderTest.java
 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReaderTest.java
new file mode 100644
index 00000000000..36048d4c9ed
--- /dev/null
+++ 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReaderTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.iot.snapshot;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Random;
+
+/**
+ * Regression test for the snapshot-transmission read path. The sending side reuses a single,
+ * caller-supplied {@link ByteBuffer} across every file of a snapshot (instead of allocating a fresh
+ * 10MB buffer per file), so this verifies that (1) a single buffer reused across many readers still
+ * reconstructs every file byte-for-byte, and (2) the buffer is genuinely shared rather than
+ * re-allocated per file.
+ */
+public class SnapshotFragmentReaderTest {
+
+  @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Test
+  public void reusedBufferReadsEveryFileCorrectly() throws IOException {
+    final Random random = new Random(42);
+    // Deliberately use a tiny buffer so files span multiple fragments, and use file sizes that are
+    // not multiples of the buffer size to exercise the partial-final-fragment path.
+    final int bufferSize = 16;
+    final int[] fileSizes = {0, 1, bufferSize, bufferSize + 1, 5 * bufferSize + 7};
+
+    byte[][] contents = new byte[fileSizes.length][];
+    Path[] paths = new Path[fileSizes.length];
+    for (int i = 0; i < fileSizes.length; i++) {
+      contents[i] = new byte[fileSizes[i]];
+      random.nextBytes(contents[i]);
+      paths[i] = temporaryFolder.newFile("file-" + i).toPath();
+      Files.write(paths[i], contents[i]);
+    }
+
+    final ByteBuffer sharedBuffer = ByteBuffer.allocate(bufferSize);
+
+    for (int i = 0; i < paths.length; i++) {
+      SnapshotFragmentReader reader = new SnapshotFragmentReader("snap", paths[i], sharedBuffer);
+      try {
+        ByteArrayOutputStream reconstructed = new ByteArrayOutputStream();
+        while (reader.hasNext()) {
+          ByteBuffer chunk = reader.next().getFileChunk();
+          // Every fragment of every file must be backed by the one shared buffer, proving it is
+          // reused and not re-allocated per file. Identity with sharedBuffer already implies all
+          // fragments share one instance, so no separate "first chunk" bookkeeping is needed.
+          Assert.assertSame(sharedBuffer, chunk);
+
+          // Drain the fragment immediately, mirroring how the sender serializes each fragment
+          // synchronously before the next read overwrites the shared buffer.
+          byte[] bytes = new byte[chunk.remaining()];
+          chunk.get(bytes);
+          reconstructed.write(bytes);
+        }
+        Assert.assertArrayEquals(
+            "File " + i + " was not reconstructed correctly",
+            contents[i],
+            reconstructed.toByteArray());
+        Assert.assertEquals(contents[i].length, reader.getTotalReadSize());
+      } finally {
+        reader.close();
+      }
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 44ac962aaea..429efe52d65 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1101,6 +1101,9 @@ public class IoTDBConfig {
   private int maxPendingBatchesNum = 5;
   private double maxMemoryRatioForQueue = 0.6;
   private long regionMigrationSpeedLimitBytesPerSecond = 48 * 1024 * 1024L;
+  // Throttle the per-file snapshot-transmission progress log in IoTConsensus 
to at most once per
+  // this interval (ms). A value <= 0 logs every file.
+  private long dataRegionIotSnapshotTransmissionProgressLogIntervalMs = 5000L;
 
   // IoTConsensusV2 Config
   private int iotConsensusV2PipelineSize = 5;
@@ -1275,6 +1278,16 @@ public class IoTDBConfig {
     this.regionMigrationSpeedLimitBytesPerSecond = 
regionMigrationSpeedLimitBytesPerSecond;
   }
 
+  /**
+   * Returns the minimum interval (ms) between two per-file IoTConsensus snapshot-transmission
+   * progress logs; a value {@code <= 0} logs every file.
+   */
+  public long getDataRegionIotSnapshotTransmissionProgressLogIntervalMs() {
+    return this.dataRegionIotSnapshotTransmissionProgressLogIntervalMs;
+  }
+
+  /**
+   * Sets the minimum interval (ms) between two per-file IoTConsensus snapshot-transmission progress
+   * logs.
+   *
+   * @param dataRegionIotSnapshotTransmissionProgressLogIntervalMs interval in milliseconds; a value
+   *     {@code <= 0} logs every file
+   */
+  public void setDataRegionIotSnapshotTransmissionProgressLogIntervalMs(
+      long dataRegionIotSnapshotTransmissionProgressLogIntervalMs) {
+    this.dataRegionIotSnapshotTransmissionProgressLogIntervalMs =
+        dataRegionIotSnapshotTransmissionProgressLogIntervalMs;
+  }
+
   public int getIotConsensusV2PipelineSize() {
     return iotConsensusV2PipelineSize;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 7acda29f959..147c3340bb3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1286,6 +1286,12 @@ public class IoTDBDescriptor {
                 "region_migration_speed_limit_bytes_per_second",
                 ConfigurationFileUtils.getConfigurationDefaultValue(
                     "region_migration_speed_limit_bytes_per_second"))));
+    conf.setDataRegionIotSnapshotTransmissionProgressLogIntervalMs(
+        Long.parseLong(
+            properties.getProperty(
+                
"data_region_iot_snapshot_transmission_progress_log_interval_ms",
+                ConfigurationFileUtils.getConfigurationDefaultValue(
+                    
"data_region_iot_snapshot_transmission_progress_log_interval_ms"))));
     conf.setKeepSameDiskWhenLoadingSnapshot(
         Boolean.parseBoolean(
             properties.getProperty(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index 22bab53a8d2..a4c8e00f1ad 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -178,6 +178,8 @@ public class DataRegionConsensusImpl {
                               
COMMON_CONF.getSubscriptionConsensusWalRetentionSizeInBytes())
                           .setSubscriptionWalRetentionTimeMs(
                               
COMMON_CONF.getSubscriptionConsensusWalRetentionTimeMs())
+                          .setSnapshotTransmissionProgressLogIntervalMs(
+                              
CONF.getDataRegionIotSnapshotTransmissionProgressLogIntervalMs())
                           .build())
                   .build())
           .setIoTConsensusV2Config(
diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 1e808d7d75a..facf07fb85f 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -1723,6 +1723,13 @@ data_region_iot_max_memory_ratio_for_queue = 0.6
 # Datatype: long
 region_migration_speed_limit_bytes_per_second = 50331648
 
+# The minimum interval (in ms) between two per-file progress logs while transmitting a snapshot in
+# IoTConsensus. A snapshot may contain a huge number of files, so logging one line per file is
+# costly; this throttles it to at most once per interval. A value <= 0 logs every file.
+# effectiveMode: hot_reload
+# Datatype: long
+data_region_iot_snapshot_transmission_progress_log_interval_ms = 5000
+
 # When loading snapshot, try keeping TsFiles in the same disk as the snapshot 
dir.
 # This may reduce file copies but may also result in a worse disk load-balance
 # effectiveMode: hot_reload

Reply via email to