This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 161e3a7b175 Pipe: add fsync ops for PipePluginInfo, PipeTaskInfo,
IoTDBLegacyPipeReceiverAgent, IoTDBThriftReceiverV1 and ExecutableManager (#11351)
161e3a7b175 is described below
commit 161e3a7b175a23bd09c6b158fa028a6acf044068
Author: Zikun Ma <[email protected]>
AuthorDate: Tue Oct 24 14:33:22 2023 +0800
Pipe: add fsync ops for PipePluginInfo, PipeTaskInfo,
IoTDBLegacyPipeReceiverAgent, IoTDBThriftReceiverV1 and ExecutableManager (#11351)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java | 1 +
.../java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java | 1 +
.../iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java | 1 +
.../org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java | 2 ++
.../java/org/apache/iotdb/commons/executable/ExecutableManager.java | 1 +
5 files changed, 6 insertions(+)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index 7854f1df8be..bdc30ddd683 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -254,6 +254,7 @@ public class PipePluginInfo implements SnapshotProcessor {
try (final FileOutputStream fileOutputStream = new
FileOutputStream(snapshotFile)) {
pipePluginMetaKeeper.processTakeSnapshot(fileOutputStream);
+ fileOutputStream.getFD().sync();
}
return true;
} finally {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 615f518221d..4fa9de14deb 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -624,6 +624,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
try (final FileOutputStream fileOutputStream = new
FileOutputStream(snapshotFile)) {
pipeMetaKeeper.processTakeSnapshot(fileOutputStream);
+ fileOutputStream.getFD().sync();
}
return true;
} finally {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
index 19dc3d51781..4b1c659927f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
@@ -290,6 +290,7 @@ public class IoTDBLegacyPipeReceiverAgent {
byte[] byteArray = new byte[length];
buff.get(byteArray);
randomAccessFile.write(byteArray);
+ randomAccessFile.getFD().sync();
recordStartIndex(new File(fileDir, fileName), startIndex + length);
LOGGER.debug("Sync {} start at {} to {} is done.", fileName, startIndex,
startIndex + length);
} catch (IOException e) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
index 6d8253e0a40..cb0a0de3bb5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
@@ -307,6 +307,7 @@ public class IoTDBThriftReceiverV1 implements
IoTDBThriftReceiver {
}
writingFileWriter.write(req.getFilePiece());
+ writingFileWriter.getFD().sync();
return PipeTransferFilePieceResp.toTPipeTransferResp(
RpcUtils.SUCCESS_STATUS, writingFileWriter.length());
} catch (Exception e) {
@@ -461,6 +462,7 @@ public class IoTDBThriftReceiverV1 implements
IoTDBThriftReceiver {
// updateWritingFileIfNeeded#isFileExistedAndNameCorrect, and continue
to write to the already
// loaded file. Since the writing file writer has already been closed,
it will throw a Stream
// Close exception.
+ writingFileWriter.getFD().sync();
writingFileWriter.close();
writingFileWriter = null;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
index 4a730d2a5fa..912a49f9db4 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
@@ -237,6 +237,7 @@ public class ExecutableManager {
// already exists.
try (FileOutputStream outputStream = new FileOutputStream(destination)) {
outputStream.getChannel().write(byteBuffer);
+ outputStream.getFD().sync();
}
} catch (IOException e) {
LOGGER.warn(