This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-receiver-fsync-file
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 491448c90ae6302421ca1d2c6281bca1d87b50cd
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Nov 4 12:51:22 2024 +0800

    refactor
---
 .../java/org/apache/iotdb/commons/conf/CommonConfig.java    | 10 ++++++++++
 .../org/apache/iotdb/commons/conf/CommonDescriptor.java     |  5 +++++
 .../org/apache/iotdb/commons/pipe/config/PipeConfig.java    |  5 +++++
 .../iotdb/commons/pipe/receiver/IoTDBFileReceiver.java      | 13 ++++++++++---
 4 files changed, 30 insertions(+), 3 deletions(-)

diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 9f5721d18c3..cf33d9e98f1 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -193,6 +193,8 @@ public class CommonConfig {
 
   private boolean pipeHardLinkWALEnabled = false;
 
+  private boolean pipeFileReceiverFsyncEnabled = true;
+
   private int pipeRealTimeQueuePollHistoryThreshold = 100;
 
   /** The maximum number of threads that can be used to execute subtasks in 
PipeSubtaskExecutor. */
@@ -674,6 +676,14 @@ public class CommonConfig {
     this.pipeHardLinkWALEnabled = pipeHardLinkWALEnabled;
   }
 
+  public boolean getPipeFileReceiverFsyncEnabled() {
+    return pipeFileReceiverFsyncEnabled;
+  }
+
+  public void setPipeFileReceiverFsyncEnabled(boolean 
pipeFileReceiverFsyncEnabled) {
+    this.pipeFileReceiverFsyncEnabled = pipeFileReceiverFsyncEnabled;
+  }
+
   public int getPipeDataStructureTabletRowSize() {
     return pipeDataStructureTabletRowSize;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 872fad33550..c1294b91e02 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -266,6 +266,11 @@ public class CommonDescriptor {
             properties.getProperty(
                 "pipe_hardlink_wal_enabled",
                 Boolean.toString(config.getPipeHardLinkWALEnabled()))));
+    config.setPipeFileReceiverFsyncEnabled(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "pipe_file_receiver_fsync_enabled",
+                Boolean.toString(config.getPipeFileReceiverFsyncEnabled()))));
 
     config.setPipeDataStructureTabletRowSize(
         Integer.parseInt(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 373b21a5b1c..4f1d6da436d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -54,6 +54,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeHardLinkWALEnabled();
   }
 
+  public boolean getPipeFileReceiverFsyncEnabled() {
+    return COMMON_CONFIG.getPipeFileReceiverFsyncEnabled();
+  }
+
   /////////////////////////////// Tablet ///////////////////////////////
 
   public int getPipeDataStructureTabletRowSize() {
@@ -335,6 +339,7 @@ public class PipeConfig {
     LOGGER.info("PipeHardlinkTsFileDirName: {}", 
getPipeHardlinkTsFileDirName());
     LOGGER.info("PipeHardlinkWALDirName: {}", getPipeHardlinkWALDirName());
     LOGGER.info("PipeHardLinkWALEnabled: {}", getPipeHardLinkWALEnabled());
+    LOGGER.info("PipeFileReceiverFsyncEnabled: {}", 
getPipeFileReceiverFsyncEnabled());
 
     LOGGER.info("PipeDataStructureTabletRowSize: {}", 
getPipeDataStructureTabletRowSize());
     LOGGER.info("PipeDataStructureTabletSizeInBytes: {}", 
getPipeDataStructureTabletSizeInBytes());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 5b7a8c22105..7ceac6c9c6e 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.pipe.receiver;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransferHandshakeConstant;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
@@ -72,6 +73,8 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
   protected String username = CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
   protected String password = CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
 
+  private static final boolean IS_FSYNC_ENABLED =
+      PipeConfig.getInstance().getPipeFileReceiverFsyncEnabled();
   private File writingFile;
   private RandomAccessFile writingFileWriter;
 
@@ -376,7 +379,7 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
     if (writingFileWriter != null) {
       try {
         writingFileWriter.close();
-        if (fsyncAfterClose) {
+        if (IS_FSYNC_ENABLED && fsyncAfterClose) {
           writingFileWriter.getFD().sync();
         }
         LOGGER.info(
@@ -482,7 +485,9 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
       // Sync here is necessary to ensure that the data is written to the 
disk. Or data region may
       // load the file before the data is written to the disk and cause 
unexpected behavior after
       // system restart. (e.g., empty file in data region's data directory)
-      writingFileWriter.getFD().sync();
+      if (IS_FSYNC_ENABLED) {
+        writingFileWriter.getFD().sync();
+      }
       writingFileWriter = null;
 
       // writingFile will be deleted after load if no exception occurs
@@ -562,7 +567,9 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
       // Sync here is necessary to ensure that the data is written to the 
disk. Or data region may
       // load the file before the data is written to the disk and cause 
unexpected behavior after
       // system restart. (e.g., empty file in data region's data directory)
-      writingFileWriter.getFD().sync();
+      if (IS_FSYNC_ENABLED) {
+        writingFileWriter.getFD().sync();
+      }
       writingFileWriter = null;
 
       // WritingFile will be deleted after load if no exception occurs

Reply via email to