This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-receiver-fsync-file
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 96383eacfb663ae6ce858dd04dc70e068e09f308
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Nov 4 12:39:59 2024 +0800

    Update PipeConsensusReceiver.java
---
 .../protocol/pipeconsensus/PipeConsensusReceiver.java    | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index 0a8b04dbef7..3721c0277a9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -487,7 +487,7 @@ public class PipeConsensusReceiver {
       // If the writing file is not sealed successfully, the writing file will 
be deleted.
       // All pieces of the writing file and its mod (if exists) should be 
retransmitted by the
       // sender.
-      closeCurrentWritingFileWriter(tsFileWriter);
+      closeCurrentWritingFileWriter(tsFileWriter, false);
       deleteCurrentWritingFile(tsFileWriter);
     }
   }
@@ -550,6 +550,10 @@ public class PipeConsensusReceiver {
       // loaded file. Since the writing file writer has already been closed, 
it will throw a Stream
       // Close exception.
       writingFileWriter.close();
+      // Sync here is necessary to ensure that the data is written to the 
disk. Otherwise, the data region may
+      // load the file before the data is written to the disk, causing 
unexpected behavior after
+      // a system restart (e.g., an empty file in the data region's data directory).
+      writingFileWriter.getFD().sync();
       tsFileWriter.setWritingFileWriter(null);
 
       // WritingFile will be deleted after load if no exception occurs
@@ -599,7 +603,7 @@ public class PipeConsensusReceiver {
       // If the writing file is not sealed successfully, the writing file will 
be deleted.
       // All pieces of the writing file and its mod (if exists) should be 
retransmitted by the
       // sender.
-      closeCurrentWritingFileWriter(tsFileWriter);
+      closeCurrentWritingFileWriter(tsFileWriter, false);
       // Clear the directory instead of only deleting the referenced files in 
seal request
       // to avoid previously undeleted file being redundant when transferring 
multi files
       
IoTDBReceiverAgent.cleanPipeReceiverDir(receiverFileDirWithIdSuffix.get());
@@ -794,10 +798,14 @@ public class PipeConsensusReceiver {
     return !offsetCorrect;
   }
 
-  private void closeCurrentWritingFileWriter(PipeConsensusTsFileWriter 
tsFileWriter) {
+  private void closeCurrentWritingFileWriter(
+      PipeConsensusTsFileWriter tsFileWriter, boolean fsyncAfterClose) {
     if (tsFileWriter.getWritingFileWriter() != null) {
       try {
         tsFileWriter.getWritingFileWriter().close();
+        if (fsyncAfterClose) {
+          tsFileWriter.getWritingFileWriter().getFD().sync();
+        }
         LOGGER.info(
             "PipeConsensus-PipeName-{}: Current writing file writer {} was 
closed.",
             consensusPipeName,
@@ -878,7 +886,7 @@ public class PipeConsensusReceiver {
         fileName,
         tsFileWriter.getWritingFile() == null ? "null" : 
tsFileWriter.getWritingFile().getPath());
 
-    closeCurrentWritingFileWriter(tsFileWriter);
+    closeCurrentWritingFileWriter(tsFileWriter, !isSingleFile);
     // If there are multiple files we can not delete the current file
     // instead they will be deleted after seal request
     if (tsFileWriter.getWritingFile() != null && isSingleFile) {

Reply via email to