This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-receiver-fsync-file in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 96383eacfb663ae6ce858dd04dc70e068e09f308 Author: Steve Yurong Su <[email protected]> AuthorDate: Mon Nov 4 12:39:59 2024 +0800 Update PipeConsensusReceiver.java --- .../protocol/pipeconsensus/PipeConsensusReceiver.java | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java index 0a8b04dbef7..3721c0277a9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java @@ -487,7 +487,7 @@ public class PipeConsensusReceiver { // If the writing file is not sealed successfully, the writing file will be deleted. // All pieces of the writing file and its mod (if exists) should be retransmitted by the // sender. - closeCurrentWritingFileWriter(tsFileWriter); + closeCurrentWritingFileWriter(tsFileWriter, false); deleteCurrentWritingFile(tsFileWriter); } } @@ -550,6 +550,10 @@ public class PipeConsensusReceiver { // loaded file. Since the writing file writer has already been closed, it will throw a Stream // Close exception. writingFileWriter.close(); + // Sync here is necessary to ensure that the data is written to the disk. Or data region may + // load the file before the data is written to the disk and cause unexpected behavior after + // system restart. (e.g., empty file in data region's data directory) + writingFileWriter.getFD().sync(); tsFileWriter.setWritingFileWriter(null); // WritingFile will be deleted after load if no exception occurs @@ -599,7 +603,7 @@ public class PipeConsensusReceiver { // If the writing file is not sealed successfully, the writing file will be deleted. // All pieces of the writing file and its mod(if exists) should be retransmitted by the // sender. - closeCurrentWritingFileWriter(tsFileWriter); + closeCurrentWritingFileWriter(tsFileWriter, false); // Clear the directory instead of only deleting the referenced files in seal request // to avoid previously undeleted file being redundant when transferring multi files IoTDBReceiverAgent.cleanPipeReceiverDir(receiverFileDirWithIdSuffix.get()); @@ -794,10 +798,14 @@ public class PipeConsensusReceiver { return !offsetCorrect; } - private void closeCurrentWritingFileWriter(PipeConsensusTsFileWriter tsFileWriter) { + private void closeCurrentWritingFileWriter( + PipeConsensusTsFileWriter tsFileWriter, boolean fsyncAfterClose) { if (tsFileWriter.getWritingFileWriter() != null) { try { tsFileWriter.getWritingFileWriter().close(); + if (fsyncAfterClose) { + tsFileWriter.getWritingFileWriter().getFD().sync(); + } LOGGER.info( "PipeConsensus-PipeName-{}: Current writing file writer {} was closed.", consensusPipeName, @@ -878,7 +886,7 @@ public class PipeConsensusReceiver { fileName, tsFileWriter.getWritingFile() == null ? "null" : tsFileWriter.getWritingFile().getPath()); - closeCurrentWritingFileWriter(tsFileWriter); + closeCurrentWritingFileWriter(tsFileWriter, !isSingleFile); // If there are multiple files we can not delete the current file // instead they will be deleted after seal request if (tsFileWriter.getWritingFile() != null && isSingleFile) {
