This is an automated email from the ASF dual-hosted git repository. yongzao pushed a commit to branch 4cee8227e75 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d8c0b8250ceed708f1ccff638af3fdfbceff6695 Author: Yongzao <[email protected]> AuthorDate: Thu Oct 12 18:40:36 2023 +0800 [IOTDB-6182] Add fsync in ConfigNode persistence modules (#11264) --- .../confignode/conf/SystemPropertiesUtils.java | 1 + .../confignode/persistence/node/NodeInfo.java | 7 +++--- .../persistence/partition/PartitionInfo.java | 29 +++++++++++++--------- .../confignode/procedure/store/ProcedureWAL.java | 3 +++ .../iotdb/confignode/persistence/NodeInfoTest.java | 2 +- .../confignode/persistence/PartitionInfoTest.java | 2 +- 6 files changed, 27 insertions(+), 17 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java index 45d7ba0d672..0f7b9e4f85f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java @@ -404,6 +404,7 @@ public class SystemPropertiesUtils { systemProperties.store( fileOutputStream, " THIS FILE IS AUTOMATICALLY GENERATED. PLEASE DO NOT MODIFY THIS FILE !!!"); + fileOutputStream.getFD().sync(); } } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java index aa5e0ed29c3..c241d88a901 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java @@ -458,12 +458,13 @@ public class NodeInfo implements SnapshotProcessor { serializeVersionInfo(fileOutputStream); - fileOutputStream.flush(); + tioStreamTransport.flush(); + fileOutputStream.getFD().sync(); - fileOutputStream.close(); + // The tmpFile can be renamed only after the stream is closed + tioStreamTransport.close(); return tmpFile.renameTo(snapshotFile); - } finally { versionInfoReadWriteLock.readLock().unlock(); dataNodeInfoReadWriteLock.readLock().unlock(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java index d61396a4c7a..710c4b91ebf 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java @@ -74,6 +74,7 @@ import org.slf4j.LoggerFactory; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.nio.file.Files; import java.util.ArrayList; @@ -849,32 +850,36 @@ public class PartitionInfo implements SnapshotProcessor { // snapshot operation. File tmpFile = new File(snapshotFile.getAbsolutePath() + "-" + UUID.randomUUID()); - try (BufferedOutputStream fileOutputStream = - new BufferedOutputStream( - Files.newOutputStream(tmpFile.toPath()), PARTITION_TABLE_BUFFER_SIZE); - TIOStreamTransport tioStreamTransport = new TIOStreamTransport(fileOutputStream)) { + try (FileOutputStream fileOutputStream = new FileOutputStream(tmpFile); + BufferedOutputStream bufferedOutputStream = + new BufferedOutputStream(fileOutputStream, PARTITION_TABLE_BUFFER_SIZE); + TIOStreamTransport tioStreamTransport = new TIOStreamTransport(bufferedOutputStream)) { TProtocol protocol = new TBinaryProtocol(tioStreamTransport); // serialize nextRegionGroupId - ReadWriteIOUtils.write(nextRegionGroupId.get(), fileOutputStream); + ReadWriteIOUtils.write(nextRegionGroupId.get(), bufferedOutputStream); // serialize StorageGroupPartitionTable - ReadWriteIOUtils.write(databasePartitionTables.size(), fileOutputStream); + ReadWriteIOUtils.write(databasePartitionTables.size(), bufferedOutputStream); for (Map.Entry<String, DatabasePartitionTable> storageGroupPartitionTableEntry : databasePartitionTables.entrySet()) { - ReadWriteIOUtils.write(storageGroupPartitionTableEntry.getKey(), fileOutputStream); - storageGroupPartitionTableEntry.getValue().serialize(fileOutputStream, protocol); + ReadWriteIOUtils.write(storageGroupPartitionTableEntry.getKey(), bufferedOutputStream); + storageGroupPartitionTableEntry.getValue().serialize(bufferedOutputStream, protocol); } // serialize regionCleanList - ReadWriteIOUtils.write(regionMaintainTaskList.size(), fileOutputStream); + ReadWriteIOUtils.write(regionMaintainTaskList.size(), bufferedOutputStream); for (RegionMaintainTask task : regionMaintainTaskList) { - task.serialize(fileOutputStream, protocol); + task.serialize(bufferedOutputStream, protocol); } // write to file - fileOutputStream.flush(); - fileOutputStream.close(); + tioStreamTransport.flush(); + fileOutputStream.getFD().sync(); + + // The tmpFile can be renamed only after the stream is closed + tioStreamTransport.close(); + // rename file return tmpFile.renameTo(snapshotFile); } finally { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureWAL.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureWAL.java index 3f1b4eba687..d4638dd0b23 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureWAL.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureWAL.java @@ -67,6 +67,9 @@ public class ProcedureWAL { DataOutputStream dataOutputStream = new DataOutputStream(publicBAOS)) { procedure.serialize(dataOutputStream); channel.write(ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size())); + + channel.force(true); + fos.getFD().sync(); } Files.deleteIfExists(walFilePath); Files.move(walTmpPath, walFilePath); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java index dbeba88bd0f..959eeba114d 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java @@ -65,7 +65,7 @@ public class NodeInfoTest { public void testSnapshot() throws TException, IOException { registerConfigNodes(); registerDataNodes(); - nodeInfo.processTakeSnapshot(snapshotDir); + Assert.assertTrue(nodeInfo.processTakeSnapshot(snapshotDir)); NodeInfo nodeInfo1 = new NodeInfo(); nodeInfo1.processLoadSnapshot(snapshotDir); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java index 73b90d8fc60..f39dbae6a63 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java @@ -144,7 +144,7 @@ public class PartitionInfoTest { partitionInfo.offerRegionMaintainTasks(generateOfferRegionMaintainTasksPlan()); - partitionInfo.processTakeSnapshot(snapshotDir); + Assert.assertTrue(partitionInfo.processTakeSnapshot(snapshotDir)); PartitionInfo partitionInfo1 = new PartitionInfo(); partitionInfo1.processLoadSnapshot(snapshotDir);
