This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-receiver-fsync-file in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 491448c90ae6302421ca1d2c6281bca1d87b50cd Author: Steve Yurong Su <[email protected]> AuthorDate: Mon Nov 4 12:51:22 2024 +0800 refactor --- .../java/org/apache/iotdb/commons/conf/CommonConfig.java | 10 ++++++++++ .../org/apache/iotdb/commons/conf/CommonDescriptor.java | 5 +++++ .../org/apache/iotdb/commons/pipe/config/PipeConfig.java | 5 +++++ .../iotdb/commons/pipe/receiver/IoTDBFileReceiver.java | 13 ++++++++++--- 4 files changed, 30 insertions(+), 3 deletions(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 9f5721d18c3..cf33d9e98f1 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -193,6 +193,8 @@ public class CommonConfig { private boolean pipeHardLinkWALEnabled = false; + private boolean pipeFileReceiverFsyncEnabled = true; + private int pipeRealTimeQueuePollHistoryThreshold = 100; /** The maximum number of threads that can be used to execute subtasks in PipeSubtaskExecutor. */ @@ -674,6 +676,14 @@ public class CommonConfig { this.pipeHardLinkWALEnabled = pipeHardLinkWALEnabled; } + public boolean getPipeFileReceiverFsyncEnabled() { + return pipeFileReceiverFsyncEnabled; + } + + public void setPipeFileReceiverFsyncEnabled(boolean pipeFileReceiverFsyncEnabled) { + this.pipeFileReceiverFsyncEnabled = pipeFileReceiverFsyncEnabled; + } + public int getPipeDataStructureTabletRowSize() { return pipeDataStructureTabletRowSize; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index 872fad33550..c1294b91e02 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -266,6 +266,11 @@ public class CommonDescriptor { properties.getProperty( "pipe_hardlink_wal_enabled", Boolean.toString(config.getPipeHardLinkWALEnabled())))); + config.setPipeFileReceiverFsyncEnabled( + Boolean.parseBoolean( + properties.getProperty( + "pipe_file_receiver_fsync_enabled", + Boolean.toString(config.getPipeFileReceiverFsyncEnabled())))); config.setPipeDataStructureTabletRowSize( Integer.parseInt( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index 373b21a5b1c..4f1d6da436d 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -54,6 +54,10 @@ public class PipeConfig { return COMMON_CONFIG.getPipeHardLinkWALEnabled(); } + public boolean getPipeFileReceiverFsyncEnabled() { + return COMMON_CONFIG.getPipeFileReceiverFsyncEnabled(); + } + /////////////////////////////// Tablet /////////////////////////////// public int getPipeDataStructureTabletRowSize() { @@ -335,6 +339,7 @@ public class PipeConfig { LOGGER.info("PipeHardlinkTsFileDirName: {}", getPipeHardlinkTsFileDirName()); LOGGER.info("PipeHardlinkWALDirName: {}", getPipeHardlinkWALDirName()); LOGGER.info("PipeHardLinkWALEnabled: {}", getPipeHardLinkWALEnabled()); + LOGGER.info("PipeFileReceiverFsyncEnabled: {}", getPipeFileReceiverFsyncEnabled()); LOGGER.info("PipeDataStructureTabletRowSize: {}", getPipeDataStructureTabletRowSize()); LOGGER.info("PipeDataStructureTabletSizeInBytes: {}", getPipeDataStructureTabletSizeInBytes()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java index 5b7a8c22105..7ceac6c9c6e 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java @@ -22,6 +22,7 @@ package org.apache.iotdb.commons.pipe.receiver; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant; import org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransferHandshakeConstant; import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion; @@ -72,6 +73,8 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { protected String username = CONNECTOR_IOTDB_USER_DEFAULT_VALUE; protected String password = CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE; + private static final boolean IS_FSYNC_ENABLED = + PipeConfig.getInstance().getPipeFileReceiverFsyncEnabled(); private File writingFile; private RandomAccessFile writingFileWriter; @@ -376,7 +379,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { if (writingFileWriter != null) { try { writingFileWriter.close(); - if (fsyncAfterClose) { + if (IS_FSYNC_ENABLED && fsyncAfterClose) { writingFileWriter.getFD().sync(); } LOGGER.info( @@ -482,7 +485,9 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { // Sync here is necessary to ensure that the data is written to the disk. Or data region may // load the file before the data is written to the disk and cause unexpected behavior after // system restart. (e.g., empty file in data region's data directory) - writingFileWriter.getFD().sync(); + if (IS_FSYNC_ENABLED) { + writingFileWriter.getFD().sync(); + } writingFileWriter = null; // writingFile will be deleted after load if no exception occurs @@ -562,7 +567,9 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { // Sync here is necessary to ensure that the data is written to the disk. Or data region may // load the file before the data is written to the disk and cause unexpected behavior after // system restart. (e.g., empty file in data region's data directory) - writingFileWriter.getFD().sync(); + if (IS_FSYNC_ENABLED) { + writingFileWriter.getFD().sync(); + } writingFileWriter = null; // WritingFile will be deleted after load if no exception occurs
