This is an automated email from the ASF dual-hosted git repository. yongzao pushed a commit to branch 9f83245ea9abe in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a97a4eb9782d19cffefaaa92bfaff56ebd3591e0 Author: YongzaoDan <[email protected]> AuthorDate: Tue Jun 27 14:42:08 2023 +0800 Finish --- .../persistence/executor/ConfigPlanExecutor.java | 52 +++++++++++----------- .../persistence/partition/PartitionInfo.java | 6 ++- 2 files changed, 31 insertions(+), 27 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java index a6e7710db84..c2ddd7652d4 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java @@ -476,31 +476,33 @@ public class ConfigPlanExecutor { } AtomicBoolean result = new AtomicBoolean(true); - snapshotProcessorList.forEach( - x -> { - boolean takeSnapshotResult = true; - try { - long startTime = System.currentTimeMillis(); - LOGGER.info( - "[ConfigNodeSnapshot] Start to take snapshot for {} into {}", - x.getClass().getName(), - snapshotDir.getAbsolutePath()); - takeSnapshotResult = x.processTakeSnapshot(snapshotDir); - LOGGER.info( - "[ConfigNodeSnapshot] Finish to take snapshot for {}, time consumption: {} ms", - x.getClass().getName(), - System.currentTimeMillis() - startTime); - } catch (TException | IOException e) { - LOGGER.error("Take snapshot error: {}", e.getMessage()); - takeSnapshotResult = false; - } finally { - // If any snapshot fails, the whole fails - // So this is just going to be false - if (!takeSnapshotResult) { - result.set(false); - } - } - }); + snapshotProcessorList + .parallelStream() + .forEach( + x -> { + boolean takeSnapshotResult = true; + try { + long startTime = System.currentTimeMillis(); + LOGGER.info( + "[ConfigNodeSnapshot] Start to take snapshot for {} into {}", + x.getClass().getName(), + snapshotDir.getAbsolutePath()); + takeSnapshotResult = x.processTakeSnapshot(snapshotDir); + LOGGER.info( + "[ConfigNodeSnapshot] Finish to take snapshot for {}, time consumption: {} ms", + x.getClass().getName(), + System.currentTimeMillis() - startTime); + } catch (TException | IOException e) { + LOGGER.error("Take snapshot error: {}", e.getMessage()); + takeSnapshotResult = false; + } finally { + // If any snapshot fails, the whole fails + // So this is just going to be false + if (!takeSnapshotResult) { + result.set(false); + } + } + }); if (result.get()) { LOGGER.info("[ConfigNodeSnapshot] Task snapshot success, snapshotDir: {}", snapshotDir); } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java index c6a74c1313e..7b3c6629931 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java @@ -72,8 +72,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; import java.nio.file.Files; import java.util.ArrayList; @@ -780,7 +780,9 @@ public class PartitionInfo implements SnapshotProcessor { // snapshot operation. File tmpFile = new File(snapshotFile.getAbsolutePath() + "-" + UUID.randomUUID()); - try (FileOutputStream fileOutputStream = new FileOutputStream(tmpFile); + try (BufferedOutputStream fileOutputStream = + new BufferedOutputStream( + Files.newOutputStream(tmpFile.toPath()), PARTITION_TABLE_BUFFER_SIZE); TIOStreamTransport tioStreamTransport = new TIOStreamTransport(fileOutputStream)) { TProtocol protocol = new TBinaryProtocol(tioStreamTransport);
