This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new af56f90e26a Added fsync operation when persisting files (#11275)
af56f90e26a is described below
commit af56f90e26a4602eb33775738bf5c7753a566f24
Author: Zhijia Cao <[email protected]>
AuthorDate: Fri Oct 13 09:37:02 2023 +0800
Added fsync operation when persisting files (#11275)
---
.../apache/iotdb/hadoop/fileSystem/HDFSOutput.java | 5 ++
.../consensus/iot/IoTConsensusServerImpl.java | 32 ++++++----
.../org/apache/iotdb/db/conf/IoTDBStartCheck.java | 8 ++-
.../execute/task/CrossSpaceCompactionTask.java | 2 +
.../execute/task/InnerSpaceCompactionTask.java | 3 +-
.../execute/utils/log/CompactionLogger.java | 34 +++++-----
.../dataregion/memtable/TsFileProcessor.java | 5 ++
.../io/LocalTextModificationAccessor.java | 36 ++++++-----
.../modification/io/ModificationWriter.java | 3 +
.../dataregion/snapshot/SnapshotLogger.java | 6 +-
.../dataregion/tsfile/TsFileResource.java | 73 ++++++++++++----------
.../tsfile/write/writer/LocalTsFileOutput.java | 6 ++
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 7 +++
.../iotdb/tsfile/write/writer/TsFileOutput.java | 3 +
.../tsfile/write/writer/TestTsFileOutput.java | 5 ++
15 files changed, 149 insertions(+), 79 deletions(-)
diff --git
a/iotdb-connector/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
b/iotdb-connector/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
index de11c04a84c..aea0f72d792 100644
---
a/iotdb-connector/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
+++
b/iotdb-connector/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
@@ -101,4 +101,9 @@ public class HDFSOutput implements TsFileOutput {
fsDataOutputStream = fs.append(path);
}
}
+
+ @Override
+ public void force() throws IOException {
+ // do nothing
+ }
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 47aa7756344..66b7ccd976a 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -59,7 +59,6 @@ import
org.apache.iotdb.consensus.iot.thrift.TWaitSyncLogCompleteReq;
import org.apache.iotdb.consensus.iot.thrift.TWaitSyncLogCompleteRes;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.commons.io.FileUtils;
import org.apache.thrift.TException;
@@ -68,6 +67,7 @@ import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
@@ -593,12 +593,8 @@ public class IoTConsensusServerImpl {
}
public void persistConfiguration() {
- try (PublicBAOS publicBAOS = new PublicBAOS();
- DataOutputStream outputStream = new DataOutputStream(publicBAOS)) {
- serializeConfigurationTo(outputStream);
- Files.write(
- Paths.get(new File(storageDir,
CONFIGURATION_FILE_NAME).getAbsolutePath()),
- publicBAOS.getBuf());
+ try {
+ serializeConfigurationAndFsyncToDisk(CONFIGURATION_FILE_NAME);
} catch (IOException e) {
// TODO: (xingtanzjr) need to handle the IOException because the
IoTConsensus won't
// work expectedly
@@ -608,15 +604,13 @@ public class IoTConsensusServerImpl {
}
public void persistConfigurationUpdate() throws
ConsensusGroupModifyPeerException {
- try (PublicBAOS publicBAOS = new PublicBAOS();
- DataOutputStream outputStream = new DataOutputStream(publicBAOS)) {
- serializeConfigurationTo(outputStream);
+ try {
+ serializeConfigurationAndFsyncToDisk(CONFIGURATION_TMP_FILE_NAME);
Path tmpConfigurationPath =
Paths.get(new File(storageDir,
CONFIGURATION_TMP_FILE_NAME).getAbsolutePath());
Path configurationPath =
Paths.get(new File(storageDir,
CONFIGURATION_FILE_NAME).getAbsolutePath());
- Files.write(tmpConfigurationPath, publicBAOS.getBuf());
- Files.delete(configurationPath);
+ Files.deleteIfExists(configurationPath);
Files.move(tmpConfigurationPath, configurationPath);
} catch (IOException e) {
throw new ConsensusGroupModifyPeerException(
@@ -819,6 +813,20 @@ public class IoTConsensusServerImpl {
return consensusGroupId;
}
+ private void serializeConfigurationAndFsyncToDisk(String
configurationFileName)
+ throws IOException {
+ FileOutputStream fileOutputStream =
+ new FileOutputStream(new File(storageDir, configurationFileName));
+ DataOutputStream outputStream = new DataOutputStream(fileOutputStream);
+ try {
+ serializeConfigurationTo(outputStream);
+ } finally {
+ fileOutputStream.flush();
+ fileOutputStream.getFD().sync();
+ outputStream.close();
+ }
+ }
+
/**
* This method is used for write of IoTConsensus SyncLog. By this method, we
can keep write order
* in follower the same as the leader. And besides order insurance, we can
make the
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
index d8281bab5f5..053bfe68c3b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
@@ -373,8 +373,8 @@ public class IoTDBStartCheck {
}
reloadProperties();
-
- try (FileOutputStream tmpFOS = new
FileOutputStream(tmpPropertiesFile.toString())) {
+ FileOutputStream tmpFOS = new
FileOutputStream(tmpPropertiesFile.toString());
+ try {
properties.setProperty(IoTDBConstant.CLUSTER_NAME, clusterName);
properties.setProperty(DATA_NODE_ID, String.valueOf(dataNodeId));
properties.store(tmpFOS, SYSTEM_PROPERTIES_STRING);
@@ -382,6 +382,10 @@ public class IoTDBStartCheck {
if (propertiesFile.exists()) {
Files.delete(propertiesFile.toPath());
}
+ } finally {
+ tmpFOS.flush();
+ tmpFOS.getFD().sync();
+ tmpFOS.close();
}
// rename system.properties.tmp to system.properties
FileUtils.moveFile(tmpPropertiesFile, propertiesFile);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
index c2e13051c77..4f9e23614ce 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
@@ -145,6 +145,7 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
compactionLogger.logFiles(selectedSequenceFiles,
CompactionLogger.STR_SOURCE_FILES);
compactionLogger.logFiles(selectedUnsequenceFiles,
CompactionLogger.STR_SOURCE_FILES);
compactionLogger.logFiles(targetTsfileResourceList,
CompactionLogger.STR_TARGET_FILES);
+ compactionLogger.force();
performer.setSourceFiles(selectedSequenceFiles,
selectedUnsequenceFiles);
performer.setTargetFiles(targetTsfileResourceList);
@@ -170,6 +171,7 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
for (TsFileResource targetResource : targetTsfileResourceList) {
if (targetResource.isDeleted()) {
compactionLogger.logFile(targetResource,
CompactionLogger.STR_DELETED_TARGET_FILES);
+ compactionLogger.force();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
index d3bd3dbb746..ccde5bf6623 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -164,7 +164,7 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
targetTsFileList = new
ArrayList<>(Collections.singletonList(targetTsFileResource));
compactionLogger.logFiles(selectedTsFileResourceList,
CompactionLogger.STR_SOURCE_FILES);
compactionLogger.logFiles(targetTsFileList,
CompactionLogger.STR_TARGET_FILES);
-
+ compactionLogger.force();
LOGGER.info(
"{}-{} [Compaction] compaction with {}",
storageGroupName,
@@ -216,6 +216,7 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
if (targetTsFileResource.isDeleted()) {
compactionLogger.logFile(targetTsFileResource,
CompactionLogger.STR_DELETED_TARGET_FILES);
+ compactionLogger.force();
}
CompactionValidator validator = CompactionValidator.getInstance();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogger.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogger.java
index a8eb3970b7e..9d02418b8ea 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogger.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogger.java
@@ -21,9 +21,8 @@ package
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.lo
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-import java.io.BufferedWriter;
import java.io.File;
-import java.io.FileWriter;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;
@@ -48,10 +47,10 @@ public class CompactionLogger implements AutoCloseable {
public static final String UNSEQUENCE_NAME_FROM_OLD = "unsequence";
public static final String STR_MERGE_START_FROM_OLD = "merge start";
- private BufferedWriter logStream;
+ private FileOutputStream logStream;
public CompactionLogger(File logFile) throws IOException {
- logStream = new BufferedWriter(new FileWriter(logFile, true));
+ logStream = new FileOutputStream(logFile, true);
}
@Override
@@ -61,24 +60,17 @@ public class CompactionLogger implements AutoCloseable {
public void logFiles(List<TsFileResource> tsFiles, String flag) throws
IOException {
for (TsFileResource tsFileResource : tsFiles) {
- logStream.write(
- flag
- + TsFileIdentifier.INFO_SEPARATOR
- + TsFileIdentifier.getFileIdentifierFromFilePath(
- tsFileResource.getTsFile().getAbsolutePath())
- .toString());
- logStream.newLine();
+ logFile(tsFileResource, flag);
}
- logStream.flush();
}
public void logFile(TsFileResource tsFile, String flag) throws IOException {
- logStream.write(
+ String log =
flag
+ TsFileIdentifier.INFO_SEPARATOR
- +
TsFileIdentifier.getFileIdentifierFromFilePath(tsFile.getTsFile().getAbsolutePath())
- .toString());
- logStream.newLine();
+ +
TsFileIdentifier.getFileIdentifierFromFilePath(tsFile.getTsFile().getAbsolutePath());
+ logStream.write(log.getBytes());
+ logStream.write(System.lineSeparator().getBytes());
logStream.flush();
}
@@ -92,4 +84,14 @@ public class CompactionLogger implements AutoCloseable {
return new File[0];
}
}
+
+ /**
+ * call system fsync function, make sure data flush to disk
+ *
+ * @throws IOException
+ */
+ public void force() throws IOException {
+ logStream.flush();
+ logStream.getFD().sync();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index eaf05b2313d..a1517fadace 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -1245,6 +1245,11 @@ public class TsFileProcessor {
// for sync flush
syncReleaseFlushedMemTable(memTableToFlush);
+ try {
+ writer.getTsFileOutput().force();
+ } catch (IOException e) {
+ logger.error("fsync memTable data to disk error,", e);
+ }
// call flushed listener after memtable is released safely
for (FlushListener flushListener : flushListeners) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessor.java
index 242c6fd21e9..75c32adf5e8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessor.java
@@ -30,7 +30,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
-import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
@@ -57,7 +56,7 @@ public class LocalTextModificationAccessor
"No modification has been written to this file[{}]";
private final String filePath;
- private BufferedWriter writer;
+ private FileOutputStream fos;
/**
* Construct a LocalTextModificationAccessor using a file specified by
filePath.
@@ -150,37 +149,44 @@ public class LocalTextModificationAccessor
@Override
public void close() throws IOException {
- if (writer != null) {
- writer.close();
- writer = null;
+ if (fos != null) {
+ fos.close();
+ fos = null;
}
}
+ @Override
+ public void force() throws IOException {
+ fos.flush();
+ fos.getFD().sync();
+ }
+
@Override
public void write(Modification mod) throws IOException {
- if (writer == null) {
- writer = FSFactoryProducer.getFSFactory().getBufferedWriter(filePath,
true);
+ if (fos == null) {
+ fos = new FileOutputStream(filePath, true);
}
- writer.write(encodeModification(mod));
- writer.newLine();
- writer.flush();
+ fos.write(encodeModification(mod).getBytes());
+ fos.write(System.lineSeparator().getBytes());
+ force();
}
@TestOnly
public void writeInComplete(Modification mod) throws IOException {
- if (writer == null) {
- writer = FSFactoryProducer.getFSFactory().getBufferedWriter(filePath,
true);
+ if (fos == null) {
+ fos = new FileOutputStream(filePath, true);
}
String line = encodeModification(mod);
if (line != null) {
- writer.write(line.substring(0, 2));
+ fos.write(line.substring(0, 2).getBytes());
+ force();
}
}
@TestOnly
public void writeMeetException(Modification mod) throws IOException {
- if (writer == null) {
- writer = FSFactoryProducer.getFSFactory().getBufferedWriter(filePath,
true);
+ if (fos == null) {
+ fos = new FileOutputStream(filePath, true);
}
writeInComplete(mod);
throw new IOException();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationWriter.java
index d2213d7d8ca..314405c633c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationWriter.java
@@ -43,4 +43,7 @@ public interface ModificationWriter {
/** Release resources like streams. */
void close() throws IOException;
+
+ /** Make sure that the data in the buffer is flushed to disk */
+ void force() throws IOException;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLogger.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLogger.java
index 05d5dead519..3c07e0926a6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLogger.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLogger.java
@@ -35,6 +35,7 @@ public class SnapshotLogger implements AutoCloseable {
private File logFile;
private BufferedOutputStream os;
+ private FileOutputStream fos;
public SnapshotLogger(File logFile) throws IOException {
this.logFile = logFile;
@@ -44,11 +45,14 @@ public class SnapshotLogger implements AutoCloseable {
if (!this.logFile.createNewFile()) {
throw new IOException("Cannot create file " + logFile.getAbsolutePath());
}
- os = new BufferedOutputStream(new FileOutputStream(logFile));
+ fos = new FileOutputStream(logFile);
+ os = new BufferedOutputStream(fos);
}
@Override
public void close() throws Exception {
+ fos.flush();
+ fos.getFD().sync();
os.close();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index ac29a91e959..aceec04bdc3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -52,10 +52,11 @@ import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.BufferedOutputStream;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Paths;
@@ -230,44 +231,52 @@ public class TsFileResource {
}
public synchronized void serialize() throws IOException {
- try (OutputStream outputStream =
- fsFactory.getBufferedOutputStream(file + RESOURCE_SUFFIX +
TEMP_SUFFIX)) {
- ReadWriteIOUtils.write(VERSION_NUMBER, outputStream);
- timeIndex.serialize(outputStream);
-
- ReadWriteIOUtils.write(maxPlanIndex, outputStream);
- ReadWriteIOUtils.write(minPlanIndex, outputStream);
-
- if (modFile != null && modFile.exists()) {
- String modFileName = new File(modFile.getFilePath()).getName();
- ReadWriteIOUtils.write(modFileName, outputStream);
- } else {
- // make the first "inputStream.available() > 0" in deserialize() happy.
- //
- // if modFile not exist, write null (-1). the first
"inputStream.available() > 0" in
- // deserialize() and deserializeFromOldFile() detect -1 and
deserialize modFileName as null
- // and skip the modFile deserialize.
- //
- // this make sure the first and the second "inputStream.available() >
0" in deserialize()
- // will always be called... which is a bit ugly but allows the
following variable
- // maxProgressIndex to be deserialized correctly.
- ReadWriteIOUtils.write((String) null, outputStream);
- }
-
- if (maxProgressIndex != null) {
- ReadWriteIOUtils.write(true, outputStream);
- maxProgressIndex.serialize(outputStream);
- } else {
- ReadWriteIOUtils.write(false, outputStream);
- }
+ FileOutputStream fileOutputStream = new FileOutputStream(file +
RESOURCE_SUFFIX + TEMP_SUFFIX);
+ BufferedOutputStream outputStream = new
BufferedOutputStream(fileOutputStream);
+ try {
+ serializeTo(outputStream);
+ } finally {
+ outputStream.flush();
+ fileOutputStream.getFD().sync();
+ outputStream.close();
}
-
File src = fsFactory.getFile(file + RESOURCE_SUFFIX + TEMP_SUFFIX);
File dest = fsFactory.getFile(file + RESOURCE_SUFFIX);
fsFactory.deleteIfExists(dest);
fsFactory.moveFile(src, dest);
}
+ private void serializeTo(BufferedOutputStream outputStream) throws
IOException {
+ ReadWriteIOUtils.write(VERSION_NUMBER, outputStream);
+ timeIndex.serialize(outputStream);
+
+ ReadWriteIOUtils.write(maxPlanIndex, outputStream);
+ ReadWriteIOUtils.write(minPlanIndex, outputStream);
+
+ if (modFile != null && modFile.exists()) {
+ String modFileName = new File(modFile.getFilePath()).getName();
+ ReadWriteIOUtils.write(modFileName, outputStream);
+ } else {
+ // make the first "inputStream.available() > 0" in deserialize() happy.
+ //
+ // if modFile not exist, write null (-1). the first
"inputStream.available() > 0" in
+ // deserialize() and deserializeFromOldFile() detect -1 and deserialize
modFileName as null
+ // and skip the modFile deserialize.
+ //
+ // this make sure the first and the second "inputStream.available() > 0"
in deserialize()
+ // will always be called... which is a bit ugly but allows the following
variable
+ // maxProgressIndex to be deserialized correctly.
+ ReadWriteIOUtils.write((String) null, outputStream);
+ }
+
+ if (maxProgressIndex != null) {
+ ReadWriteIOUtils.write(true, outputStream);
+ maxProgressIndex.serialize(outputStream);
+ } else {
+ ReadWriteIOUtils.write(false, outputStream);
+ }
+ }
+
/** deserialize from disk */
public void deserialize() throws IOException {
try (InputStream inputStream = fsFactory.getBufferedInputStream(file +
RESOURCE_SUFFIX)) {
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java
index e69915fabed..af0b19bb9aa 100644
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java
+++
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java
@@ -97,4 +97,10 @@ public class LocalTsFileOutput extends OutputStream
implements TsFileOutput {
outputStream.getChannel().truncate(size);
position = outputStream.getChannel().position();
}
+
+ @Override
+ public void force() throws IOException {
+ flush();
+ outputStream.getFD().sync();
+ }
}
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 6a6a29221c2..777f481d568 100644
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -333,8 +333,11 @@ public class TsFileIOWriter implements AutoCloseable {
// write magic string
out.write(MAGIC_STRING_BYTES);
+ // flush page cache data to disk
+ out.force();
// close file
out.close();
+
if (resourceLogger.isDebugEnabled() && file != null) {
resourceLogger.debug("{} writer is closed.", file.getName());
}
@@ -733,4 +736,8 @@ public class TsFileIOWriter implements AutoCloseable {
public void flush() throws IOException {
out.flush();
}
+
+ public TsFileOutput getTsFileOutput() {
+ return this.out;
+ }
}
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java
index 5e0b43349ae..5088e0c5344 100644
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java
+++
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java
@@ -88,4 +88,7 @@ public interface TsFileOutput {
* @param size size The new size, a non-negative byte count
*/
void truncate(long size) throws IOException;
+
+ /** Make sure that the data in the buffer is flushed to disk */
+ void force() throws IOException;
}
diff --git
a/iotdb-core/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TestTsFileOutput.java
b/iotdb-core/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TestTsFileOutput.java
index f61541e42b1..0b342206c3d 100644
---
a/iotdb-core/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TestTsFileOutput.java
+++
b/iotdb-core/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TestTsFileOutput.java
@@ -67,4 +67,9 @@ public class TestTsFileOutput implements TsFileOutput {
public void truncate(long size) {
publicBAOS.truncate((int) size);
}
+
+ @Override
+ public void force() {
+ // do nothing
+ }
}