This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-receiver-fsync-file in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 207258190cade2548139923bb933ab09652f52b6 Author: Steve Yurong Su <[email protected]> AuthorDate: Mon Nov 4 11:57:41 2024 +0800 Update IoTDBFileReceiver.java --- .../commons/pipe/receiver/IoTDBFileReceiver.java | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java index 59bcc06c45c..5b7a8c22105 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java @@ -337,7 +337,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { fileName, writingFile == null ? "null" : writingFile.getPath()); - closeCurrentWritingFileWriter(); + closeCurrentWritingFileWriter(!isSingleFile); // If there are multiple files we can not delete the current file // instead they will be deleted after seal request if (writingFile != null && isSingleFile) { @@ -372,10 +372,13 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { return writingFile != null && writingFile.getName().equals(fileName); } - private void closeCurrentWritingFileWriter() { + private void closeCurrentWritingFileWriter(final boolean fsyncAfterClose) { if (writingFileWriter != null) { try { writingFileWriter.close(); + if (fsyncAfterClose) { + writingFileWriter.getFD().sync(); + } LOGGER.info( "Receiver id = {}: Current writing file writer {} was closed.", receiverId.get(), @@ -476,6 +479,10 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { // loaded file. Since the writing file writer has already been closed, it will throw a Stream // Close exception. writingFileWriter.close(); + // Sync here is necessary to ensure that the data is written to the disk. Or data region may + // load the file before the data is written to the disk and cause unexpected behavior after + // system restart. (e.g., empty file in data region's data directory) + writingFileWriter.getFD().sync(); writingFileWriter = null; // writingFile will be deleted after load if no exception occurs @@ -508,7 +515,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { // If the writing file is not sealed successfully, the writing file will be deleted. // All pieces of the writing file and its mod (if exists) should be retransmitted by the // sender. - closeCurrentWritingFileWriter(); + closeCurrentWritingFileWriter(false); deleteCurrentWritingFile(); } } @@ -552,6 +559,10 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { // loaded file. Since the writing file writer has already been closed, it will throw a Stream // Close exception. writingFileWriter.close(); + // Sync here is necessary to ensure that the data is written to the disk. Or data region may + // load the file before the data is written to the disk and cause unexpected behavior after + // system restart. (e.g., empty file in data region's data directory) + writingFileWriter.getFD().sync(); writingFileWriter = null; // WritingFile will be deleted after load if no exception occurs @@ -583,7 +594,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { // If the writing file is not sealed successfully, the writing file will be deleted. // All pieces of the writing file and its mod(if exists) should be retransmitted by the // sender. - closeCurrentWritingFileWriter(); + closeCurrentWritingFileWriter(false); // Clear the directory instead of only deleting the referenced files in seal request // to avoid previously undeleted file being redundant when transferring multi files IoTDBReceiverAgent.cleanPipeReceiverDir(receiverFileDirWithIdSuffix.get());
