This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.2 by this push:
     new 4715659fdde [IOTDB-6182] Add fsync in ConfigNode persistence modules (#11264) (#11296)
4715659fdde is described below

commit 4715659fdde209b79d1b9c76c8104b4160b1986b
Author: Yongzao <[email protected]>
AuthorDate: Fri Oct 13 08:55:42 2023 +0800

    [IOTDB-6182] Add fsync in ConfigNode persistence modules (#11264) (#11296)
---
 .../confignode/conf/SystemPropertiesUtils.java     |  1 +
 .../confignode/persistence/node/NodeInfo.java      |  7 +++---
 .../persistence/partition/PartitionInfo.java       | 29 +++++++++++++---------
 .../confignode/procedure/store/ProcedureWAL.java   |  3 +++
 .../iotdb/confignode/persistence/NodeInfoTest.java |  2 +-
 .../confignode/persistence/PartitionInfoTest.java  |  2 +-
 6 files changed, 27 insertions(+), 17 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
index 45d7ba0d672..0f7b9e4f85f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
@@ -404,6 +404,7 @@ public class SystemPropertiesUtils {
       systemProperties.store(
           fileOutputStream,
           " THIS FILE IS AUTOMATICALLY GENERATED. PLEASE DO NOT MODIFY THIS FILE !!!");
+      fileOutputStream.getFD().sync();
     }
   }
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
index aa5e0ed29c3..c241d88a901 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
@@ -458,12 +458,13 @@ public class NodeInfo implements SnapshotProcessor {
 
       serializeVersionInfo(fileOutputStream);
 
-      fileOutputStream.flush();
+      tioStreamTransport.flush();
+      fileOutputStream.getFD().sync();
 
-      fileOutputStream.close();
+      // The tmpFile can be renamed only after the stream is closed
+      tioStreamTransport.close();
 
       return tmpFile.renameTo(snapshotFile);
-
     } finally {
       versionInfoReadWriteLock.readLock().unlock();
       dataNodeInfoReadWriteLock.readLock().unlock();
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index d61396a4c7a..710c4b91ebf 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -74,6 +74,7 @@ import org.slf4j.LoggerFactory;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.util.ArrayList;
@@ -849,32 +850,36 @@ public class PartitionInfo implements SnapshotProcessor {
     // snapshot operation.
     File tmpFile = new File(snapshotFile.getAbsolutePath() + "-" + UUID.randomUUID());
 
-    try (BufferedOutputStream fileOutputStream =
-            new BufferedOutputStream(
-                Files.newOutputStream(tmpFile.toPath()), PARTITION_TABLE_BUFFER_SIZE);
-        TIOStreamTransport tioStreamTransport = new TIOStreamTransport(fileOutputStream)) {
+    try (FileOutputStream fileOutputStream = new FileOutputStream(tmpFile);
+        BufferedOutputStream bufferedOutputStream =
+            new BufferedOutputStream(fileOutputStream, PARTITION_TABLE_BUFFER_SIZE);
+        TIOStreamTransport tioStreamTransport = new TIOStreamTransport(bufferedOutputStream)) {
       TProtocol protocol = new TBinaryProtocol(tioStreamTransport);
 
       // serialize nextRegionGroupId
-      ReadWriteIOUtils.write(nextRegionGroupId.get(), fileOutputStream);
+      ReadWriteIOUtils.write(nextRegionGroupId.get(), bufferedOutputStream);
 
       // serialize StorageGroupPartitionTable
-      ReadWriteIOUtils.write(databasePartitionTables.size(), fileOutputStream);
+      ReadWriteIOUtils.write(databasePartitionTables.size(), bufferedOutputStream);
       for (Map.Entry<String, DatabasePartitionTable> storageGroupPartitionTableEntry :
           databasePartitionTables.entrySet()) {
-        ReadWriteIOUtils.write(storageGroupPartitionTableEntry.getKey(), fileOutputStream);
-        storageGroupPartitionTableEntry.getValue().serialize(fileOutputStream, protocol);
+        ReadWriteIOUtils.write(storageGroupPartitionTableEntry.getKey(), bufferedOutputStream);
+        storageGroupPartitionTableEntry.getValue().serialize(bufferedOutputStream, protocol);
       }
 
       // serialize regionCleanList
-      ReadWriteIOUtils.write(regionMaintainTaskList.size(), fileOutputStream);
+      ReadWriteIOUtils.write(regionMaintainTaskList.size(), bufferedOutputStream);
       for (RegionMaintainTask task : regionMaintainTaskList) {
-        task.serialize(fileOutputStream, protocol);
+        task.serialize(bufferedOutputStream, protocol);
       }
 
       // write to file
-      fileOutputStream.flush();
-      fileOutputStream.close();
+      tioStreamTransport.flush();
+      fileOutputStream.getFD().sync();
+
+      // The tmpFile can be renamed only after the stream is closed
+      tioStreamTransport.close();
+
       // rename file
       return tmpFile.renameTo(snapshotFile);
     } finally {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureWAL.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureWAL.java
index 3f1b4eba687..d4638dd0b23 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureWAL.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureWAL.java
@@ -67,6 +67,9 @@ public class ProcedureWAL {
         DataOutputStream dataOutputStream = new DataOutputStream(publicBAOS)) {
       procedure.serialize(dataOutputStream);
       channel.write(ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size()));
+
+      channel.force(true);
+      fos.getFD().sync();
     }
     Files.deleteIfExists(walFilePath);
     Files.move(walTmpPath, walFilePath);
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
index dbeba88bd0f..959eeba114d 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
@@ -65,7 +65,7 @@ public class NodeInfoTest {
   public void testSnapshot() throws TException, IOException {
     registerConfigNodes();
     registerDataNodes();
-    nodeInfo.processTakeSnapshot(snapshotDir);
+    Assert.assertTrue(nodeInfo.processTakeSnapshot(snapshotDir));
 
     NodeInfo nodeInfo1 = new NodeInfo();
     nodeInfo1.processLoadSnapshot(snapshotDir);
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
index 73b90d8fc60..f39dbae6a63 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
@@ -144,7 +144,7 @@ public class PartitionInfoTest {
 
     partitionInfo.offerRegionMaintainTasks(generateOfferRegionMaintainTasksPlan());
 
-    partitionInfo.processTakeSnapshot(snapshotDir);
+    Assert.assertTrue(partitionInfo.processTakeSnapshot(snapshotDir));
 
     PartitionInfo partitionInfo1 = new PartitionInfo();
     partitionInfo1.processLoadSnapshot(snapshotDir);

Reply via email to