This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 161e3a7b175 Pipe: add fsync ops for PipePluginInfo, PipeTaskInfo, 
IoTDBThriftReceiverV1, IoTDBThriftReceiverV1 and ExecutableManager (#11351)
161e3a7b175 is described below

commit 161e3a7b175a23bd09c6b158fa028a6acf044068
Author: Zikun Ma <[email protected]>
AuthorDate: Tue Oct 24 14:33:22 2023 +0800

    Pipe: add fsync ops for PipePluginInfo, PipeTaskInfo, 
IoTDBThriftReceiverV1, IoTDBThriftReceiverV1 and ExecutableManager (#11351)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java    | 1 +
 .../java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java | 1 +
 .../iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java     | 1 +
 .../org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java | 2 ++
 .../java/org/apache/iotdb/commons/executable/ExecutableManager.java     | 1 +
 5 files changed, 6 insertions(+)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index 7854f1df8be..bdc30ddd683 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -254,6 +254,7 @@ public class PipePluginInfo implements SnapshotProcessor {
 
       try (final FileOutputStream fileOutputStream = new 
FileOutputStream(snapshotFile)) {
         pipePluginMetaKeeper.processTakeSnapshot(fileOutputStream);
+        fileOutputStream.getFD().sync();
       }
       return true;
     } finally {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 615f518221d..4fa9de14deb 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -624,6 +624,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
 
       try (final FileOutputStream fileOutputStream = new 
FileOutputStream(snapshotFile)) {
         pipeMetaKeeper.processTakeSnapshot(fileOutputStream);
+        fileOutputStream.getFD().sync();
       }
       return true;
     } finally {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
index 19dc3d51781..4b1c659927f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
@@ -290,6 +290,7 @@ public class IoTDBLegacyPipeReceiverAgent {
       byte[] byteArray = new byte[length];
       buff.get(byteArray);
       randomAccessFile.write(byteArray);
+      randomAccessFile.getFD().sync();
       recordStartIndex(new File(fileDir, fileName), startIndex + length);
       LOGGER.debug("Sync {} start at {} to {} is done.", fileName, startIndex, 
startIndex + length);
     } catch (IOException e) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
index 6d8253e0a40..cb0a0de3bb5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
@@ -307,6 +307,7 @@ public class IoTDBThriftReceiverV1 implements 
IoTDBThriftReceiver {
       }
 
       writingFileWriter.write(req.getFilePiece());
+      writingFileWriter.getFD().sync();
       return PipeTransferFilePieceResp.toTPipeTransferResp(
           RpcUtils.SUCCESS_STATUS, writingFileWriter.length());
     } catch (Exception e) {
@@ -461,6 +462,7 @@ public class IoTDBThriftReceiverV1 implements 
IoTDBThriftReceiver {
       // updateWritingFileIfNeeded#isFileExistedAndNameCorrect, and continue 
to write to the already
       // loaded file. Since the writing file writer has already been closed, 
it will throw a Stream
       // Close exception.
+      writingFileWriter.getFD().sync();
       writingFileWriter.close();
       writingFileWriter = null;
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
index 4a730d2a5fa..912a49f9db4 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
@@ -237,6 +237,7 @@ public class ExecutableManager {
       // already exists.
       try (FileOutputStream outputStream = new FileOutputStream(destination)) {
         outputStream.getChannel().write(byteBuffer);
+        outputStream.getFD().sync();
       }
     } catch (IOException e) {
       LOGGER.warn(

Reply via email to