This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new af56f90e26a Added fsync operation when persisting files (#11275)
af56f90e26a is described below

commit af56f90e26a4602eb33775738bf5c7753a566f24
Author: Zhijia Cao <[email protected]>
AuthorDate: Fri Oct 13 09:37:02 2023 +0800

    Added fsync operation when persisting files (#11275)
---
 .../apache/iotdb/hadoop/fileSystem/HDFSOutput.java |  5 ++
 .../consensus/iot/IoTConsensusServerImpl.java      | 32 ++++++----
 .../org/apache/iotdb/db/conf/IoTDBStartCheck.java  |  8 ++-
 .../execute/task/CrossSpaceCompactionTask.java     |  2 +
 .../execute/task/InnerSpaceCompactionTask.java     |  3 +-
 .../execute/utils/log/CompactionLogger.java        | 34 +++++-----
 .../dataregion/memtable/TsFileProcessor.java       |  5 ++
 .../io/LocalTextModificationAccessor.java          | 36 ++++++-----
 .../modification/io/ModificationWriter.java        |  3 +
 .../dataregion/snapshot/SnapshotLogger.java        |  6 +-
 .../dataregion/tsfile/TsFileResource.java          | 73 ++++++++++++----------
 .../tsfile/write/writer/LocalTsFileOutput.java     |  6 ++
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  |  7 +++
 .../iotdb/tsfile/write/writer/TsFileOutput.java    |  3 +
 .../tsfile/write/writer/TestTsFileOutput.java      |  5 ++
 15 files changed, 149 insertions(+), 79 deletions(-)

diff --git 
a/iotdb-connector/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
 
b/iotdb-connector/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
index de11c04a84c..aea0f72d792 100644
--- 
a/iotdb-connector/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
+++ 
b/iotdb-connector/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java
@@ -101,4 +101,9 @@ public class HDFSOutput implements TsFileOutput {
       fsDataOutputStream = fs.append(path);
     }
   }
+
+  @Override
+  public void force() throws IOException {
+    // do nothing
+  }
 }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 47aa7756344..66b7ccd976a 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -59,7 +59,6 @@ import 
org.apache.iotdb.consensus.iot.thrift.TWaitSyncLogCompleteReq;
 import org.apache.iotdb.consensus.iot.thrift.TWaitSyncLogCompleteRes;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.thrift.TException;
@@ -68,6 +67,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.DataOutputStream;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
@@ -593,12 +593,8 @@ public class IoTConsensusServerImpl {
   }
 
   public void persistConfiguration() {
-    try (PublicBAOS publicBAOS = new PublicBAOS();
-        DataOutputStream outputStream = new DataOutputStream(publicBAOS)) {
-      serializeConfigurationTo(outputStream);
-      Files.write(
-          Paths.get(new File(storageDir, 
CONFIGURATION_FILE_NAME).getAbsolutePath()),
-          publicBAOS.getBuf());
+    try {
+      serializeConfigurationAndFsyncToDisk(CONFIGURATION_FILE_NAME);
     } catch (IOException e) {
       // TODO: (xingtanzjr) need to handle the IOException because the 
IoTConsensus won't
       // work expectedly
@@ -608,15 +604,13 @@ public class IoTConsensusServerImpl {
   }
 
   public void persistConfigurationUpdate() throws 
ConsensusGroupModifyPeerException {
-    try (PublicBAOS publicBAOS = new PublicBAOS();
-        DataOutputStream outputStream = new DataOutputStream(publicBAOS)) {
-      serializeConfigurationTo(outputStream);
+    try {
+      serializeConfigurationAndFsyncToDisk(CONFIGURATION_TMP_FILE_NAME);
       Path tmpConfigurationPath =
           Paths.get(new File(storageDir, 
CONFIGURATION_TMP_FILE_NAME).getAbsolutePath());
       Path configurationPath =
           Paths.get(new File(storageDir, 
CONFIGURATION_FILE_NAME).getAbsolutePath());
-      Files.write(tmpConfigurationPath, publicBAOS.getBuf());
-      Files.delete(configurationPath);
+      Files.deleteIfExists(configurationPath);
       Files.move(tmpConfigurationPath, configurationPath);
     } catch (IOException e) {
       throw new ConsensusGroupModifyPeerException(
@@ -819,6 +813,20 @@ public class IoTConsensusServerImpl {
     return consensusGroupId;
   }
 
+  private void serializeConfigurationAndFsyncToDisk(String 
configurationFileName)
+      throws IOException {
+    FileOutputStream fileOutputStream =
+        new FileOutputStream(new File(storageDir, configurationFileName));
+    DataOutputStream outputStream = new DataOutputStream(fileOutputStream);
+    try {
+      serializeConfigurationTo(outputStream);
+    } finally {
+      fileOutputStream.flush();
+      fileOutputStream.getFD().sync();
+      outputStream.close();
+    }
+  }
+
   /**
    * This method is used for write of IoTConsensus SyncLog. By this method, we 
can keep write order
    * in follower the same as the leader. And besides order insurance, we can 
make the
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
index d8281bab5f5..053bfe68c3b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
@@ -373,8 +373,8 @@ public class IoTDBStartCheck {
     }
 
     reloadProperties();
-
-    try (FileOutputStream tmpFOS = new 
FileOutputStream(tmpPropertiesFile.toString())) {
+    FileOutputStream tmpFOS = new 
FileOutputStream(tmpPropertiesFile.toString());
+    try {
       properties.setProperty(IoTDBConstant.CLUSTER_NAME, clusterName);
       properties.setProperty(DATA_NODE_ID, String.valueOf(dataNodeId));
       properties.store(tmpFOS, SYSTEM_PROPERTIES_STRING);
@@ -382,6 +382,10 @@ public class IoTDBStartCheck {
       if (propertiesFile.exists()) {
         Files.delete(propertiesFile.toPath());
       }
+    } finally {
+      tmpFOS.flush();
+      tmpFOS.getFD().sync();
+      tmpFOS.close();
     }
     // rename system.properties.tmp to system.properties
     FileUtils.moveFile(tmpPropertiesFile, propertiesFile);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
index c2e13051c77..4f9e23614ce 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
@@ -145,6 +145,7 @@ public class CrossSpaceCompactionTask extends 
AbstractCompactionTask {
         compactionLogger.logFiles(selectedSequenceFiles, 
CompactionLogger.STR_SOURCE_FILES);
         compactionLogger.logFiles(selectedUnsequenceFiles, 
CompactionLogger.STR_SOURCE_FILES);
         compactionLogger.logFiles(targetTsfileResourceList, 
CompactionLogger.STR_TARGET_FILES);
+        compactionLogger.force();
 
         performer.setSourceFiles(selectedSequenceFiles, 
selectedUnsequenceFiles);
         performer.setTargetFiles(targetTsfileResourceList);
@@ -170,6 +171,7 @@ public class CrossSpaceCompactionTask extends 
AbstractCompactionTask {
         for (TsFileResource targetResource : targetTsfileResourceList) {
           if (targetResource.isDeleted()) {
             compactionLogger.logFile(targetResource, 
CompactionLogger.STR_DELETED_TARGET_FILES);
+            compactionLogger.force();
           }
         }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
index d3bd3dbb746..ccde5bf6623 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -164,7 +164,7 @@ public class InnerSpaceCompactionTask extends 
AbstractCompactionTask {
         targetTsFileList = new 
ArrayList<>(Collections.singletonList(targetTsFileResource));
         compactionLogger.logFiles(selectedTsFileResourceList, 
CompactionLogger.STR_SOURCE_FILES);
         compactionLogger.logFiles(targetTsFileList, 
CompactionLogger.STR_TARGET_FILES);
-
+        compactionLogger.force();
         LOGGER.info(
             "{}-{} [Compaction] compaction with {}",
             storageGroupName,
@@ -216,6 +216,7 @@ public class InnerSpaceCompactionTask extends 
AbstractCompactionTask {
 
         if (targetTsFileResource.isDeleted()) {
           compactionLogger.logFile(targetTsFileResource, 
CompactionLogger.STR_DELETED_TARGET_FILES);
+          compactionLogger.force();
         }
 
         CompactionValidator validator = CompactionValidator.getInstance();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogger.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogger.java
index a8eb3970b7e..9d02418b8ea 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogger.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/CompactionLogger.java
@@ -21,9 +21,8 @@ package 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.lo
 
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 
-import java.io.BufferedWriter;
 import java.io.File;
-import java.io.FileWriter;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.List;
 
@@ -48,10 +47,10 @@ public class CompactionLogger implements AutoCloseable {
   public static final String UNSEQUENCE_NAME_FROM_OLD = "unsequence";
   public static final String STR_MERGE_START_FROM_OLD = "merge start";
 
-  private BufferedWriter logStream;
+  private FileOutputStream logStream;
 
   public CompactionLogger(File logFile) throws IOException {
-    logStream = new BufferedWriter(new FileWriter(logFile, true));
+    logStream = new FileOutputStream(logFile, true);
   }
 
   @Override
@@ -61,24 +60,17 @@ public class CompactionLogger implements AutoCloseable {
 
   public void logFiles(List<TsFileResource> tsFiles, String flag) throws 
IOException {
     for (TsFileResource tsFileResource : tsFiles) {
-      logStream.write(
-          flag
-              + TsFileIdentifier.INFO_SEPARATOR
-              + TsFileIdentifier.getFileIdentifierFromFilePath(
-                      tsFileResource.getTsFile().getAbsolutePath())
-                  .toString());
-      logStream.newLine();
+      logFile(tsFileResource, flag);
     }
-    logStream.flush();
   }
 
   public void logFile(TsFileResource tsFile, String flag) throws IOException {
-    logStream.write(
+    String log =
         flag
             + TsFileIdentifier.INFO_SEPARATOR
-            + 
TsFileIdentifier.getFileIdentifierFromFilePath(tsFile.getTsFile().getAbsolutePath())
-                .toString());
-    logStream.newLine();
+            + 
TsFileIdentifier.getFileIdentifierFromFilePath(tsFile.getTsFile().getAbsolutePath());
+    logStream.write(log.getBytes());
+    logStream.write(System.lineSeparator().getBytes());
     logStream.flush();
   }
 
@@ -92,4 +84,14 @@ public class CompactionLogger implements AutoCloseable {
       return new File[0];
     }
   }
+
+  /**
+   * call system fsync function, make sure data flush to disk
+   *
+   * @throws IOException
+   */
+  public void force() throws IOException {
+    logStream.flush();
+    logStream.getFD().sync();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index eaf05b2313d..a1517fadace 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -1245,6 +1245,11 @@ public class TsFileProcessor {
 
     // for sync flush
     syncReleaseFlushedMemTable(memTableToFlush);
+    try {
+      writer.getTsFileOutput().force();
+    } catch (IOException e) {
+      logger.error("fsync memTable data to disk error,", e);
+    }
 
     // call flushed listener after memtable is released safely
     for (FlushListener flushListener : flushListeners) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessor.java
index 242c6fd21e9..75c32adf5e8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessor.java
@@ -30,7 +30,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
-import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
@@ -57,7 +56,7 @@ public class LocalTextModificationAccessor
       "No modification has been written to this file[{}]";
 
   private final String filePath;
-  private BufferedWriter writer;
+  private FileOutputStream fos;
 
   /**
    * Construct a LocalTextModificationAccessor using a file specified by 
filePath.
@@ -150,37 +149,44 @@ public class LocalTextModificationAccessor
 
   @Override
   public void close() throws IOException {
-    if (writer != null) {
-      writer.close();
-      writer = null;
+    if (fos != null) {
+      fos.close();
+      fos = null;
     }
   }
 
+  @Override
+  public void force() throws IOException {
+    fos.flush();
+    fos.getFD().sync();
+  }
+
   @Override
   public void write(Modification mod) throws IOException {
-    if (writer == null) {
-      writer = FSFactoryProducer.getFSFactory().getBufferedWriter(filePath, 
true);
+    if (fos == null) {
+      fos = new FileOutputStream(filePath, true);
     }
-    writer.write(encodeModification(mod));
-    writer.newLine();
-    writer.flush();
+    fos.write(encodeModification(mod).getBytes());
+    fos.write(System.lineSeparator().getBytes());
+    force();
   }
 
   @TestOnly
   public void writeInComplete(Modification mod) throws IOException {
-    if (writer == null) {
-      writer = FSFactoryProducer.getFSFactory().getBufferedWriter(filePath, 
true);
+    if (fos == null) {
+      fos = new FileOutputStream(filePath, true);
     }
     String line = encodeModification(mod);
     if (line != null) {
-      writer.write(line.substring(0, 2));
+      fos.write(line.substring(0, 2).getBytes());
+      force();
     }
   }
 
   @TestOnly
   public void writeMeetException(Modification mod) throws IOException {
-    if (writer == null) {
-      writer = FSFactoryProducer.getFSFactory().getBufferedWriter(filePath, 
true);
+    if (fos == null) {
+      fos = new FileOutputStream(filePath, true);
     }
     writeInComplete(mod);
     throw new IOException();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationWriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationWriter.java
index d2213d7d8ca..314405c633c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationWriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationWriter.java
@@ -43,4 +43,7 @@ public interface ModificationWriter {
 
   /** Release resources like streams. */
   void close() throws IOException;
+
+  /** Make sure that the data in the buffer is flushed to disk */
+  void force() throws IOException;
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLogger.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLogger.java
index 05d5dead519..3c07e0926a6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLogger.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLogger.java
@@ -35,6 +35,7 @@ public class SnapshotLogger implements AutoCloseable {
 
   private File logFile;
   private BufferedOutputStream os;
+  private FileOutputStream fos;
 
   public SnapshotLogger(File logFile) throws IOException {
     this.logFile = logFile;
@@ -44,11 +45,14 @@ public class SnapshotLogger implements AutoCloseable {
     if (!this.logFile.createNewFile()) {
       throw new IOException("Cannot create file " + logFile.getAbsolutePath());
     }
-    os = new BufferedOutputStream(new FileOutputStream(logFile));
+    fos = new FileOutputStream(logFile);
+    os = new BufferedOutputStream(fos);
   }
 
   @Override
   public void close() throws Exception {
+    fos.flush();
+    fos.getFD().sync();
     os.close();
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index ac29a91e959..aceec04bdc3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -52,10 +52,11 @@ import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedOutputStream;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
@@ -230,44 +231,52 @@ public class TsFileResource {
   }
 
   public synchronized void serialize() throws IOException {
-    try (OutputStream outputStream =
-        fsFactory.getBufferedOutputStream(file + RESOURCE_SUFFIX + 
TEMP_SUFFIX)) {
-      ReadWriteIOUtils.write(VERSION_NUMBER, outputStream);
-      timeIndex.serialize(outputStream);
-
-      ReadWriteIOUtils.write(maxPlanIndex, outputStream);
-      ReadWriteIOUtils.write(minPlanIndex, outputStream);
-
-      if (modFile != null && modFile.exists()) {
-        String modFileName = new File(modFile.getFilePath()).getName();
-        ReadWriteIOUtils.write(modFileName, outputStream);
-      } else {
-        // make the first "inputStream.available() > 0" in deserialize() happy.
-        //
-        // if modFile not exist, write null (-1). the first 
"inputStream.available() > 0" in
-        // deserialize() and deserializeFromOldFile() detect -1 and 
deserialize modFileName as null
-        // and skip the modFile deserialize.
-        //
-        // this make sure the first and the second "inputStream.available() > 
0" in deserialize()
-        // will always be called... which is a bit ugly but allows the 
following variable
-        // maxProgressIndex to be deserialized correctly.
-        ReadWriteIOUtils.write((String) null, outputStream);
-      }
-
-      if (maxProgressIndex != null) {
-        ReadWriteIOUtils.write(true, outputStream);
-        maxProgressIndex.serialize(outputStream);
-      } else {
-        ReadWriteIOUtils.write(false, outputStream);
-      }
+    FileOutputStream fileOutputStream = new FileOutputStream(file + 
RESOURCE_SUFFIX + TEMP_SUFFIX);
+    BufferedOutputStream outputStream = new 
BufferedOutputStream(fileOutputStream);
+    try {
+      serializeTo(outputStream);
+    } finally {
+      outputStream.flush();
+      fileOutputStream.getFD().sync();
+      outputStream.close();
     }
-
     File src = fsFactory.getFile(file + RESOURCE_SUFFIX + TEMP_SUFFIX);
     File dest = fsFactory.getFile(file + RESOURCE_SUFFIX);
     fsFactory.deleteIfExists(dest);
     fsFactory.moveFile(src, dest);
   }
 
+  private void serializeTo(BufferedOutputStream outputStream) throws 
IOException {
+    ReadWriteIOUtils.write(VERSION_NUMBER, outputStream);
+    timeIndex.serialize(outputStream);
+
+    ReadWriteIOUtils.write(maxPlanIndex, outputStream);
+    ReadWriteIOUtils.write(minPlanIndex, outputStream);
+
+    if (modFile != null && modFile.exists()) {
+      String modFileName = new File(modFile.getFilePath()).getName();
+      ReadWriteIOUtils.write(modFileName, outputStream);
+    } else {
+      // make the first "inputStream.available() > 0" in deserialize() happy.
+      //
+      // if modFile not exist, write null (-1). the first 
"inputStream.available() > 0" in
+      // deserialize() and deserializeFromOldFile() detect -1 and deserialize 
modFileName as null
+      // and skip the modFile deserialize.
+      //
+      // this make sure the first and the second "inputStream.available() > 0" 
in deserialize()
+      // will always be called... which is a bit ugly but allows the following 
variable
+      // maxProgressIndex to be deserialized correctly.
+      ReadWriteIOUtils.write((String) null, outputStream);
+    }
+
+    if (maxProgressIndex != null) {
+      ReadWriteIOUtils.write(true, outputStream);
+      maxProgressIndex.serialize(outputStream);
+    } else {
+      ReadWriteIOUtils.write(false, outputStream);
+    }
+  }
+
   /** deserialize from disk */
   public void deserialize() throws IOException {
     try (InputStream inputStream = fsFactory.getBufferedInputStream(file + 
RESOURCE_SUFFIX)) {
diff --git 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java
 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java
index e69915fabed..af0b19bb9aa 100644
--- 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java
+++ 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java
@@ -97,4 +97,10 @@ public class LocalTsFileOutput extends OutputStream 
implements TsFileOutput {
     outputStream.getChannel().truncate(size);
     position = outputStream.getChannel().position();
   }
+
+  @Override
+  public void force() throws IOException {
+    flush();
+    outputStream.getFD().sync();
+  }
 }
diff --git 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 6a6a29221c2..777f481d568 100644
--- 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -333,8 +333,11 @@ public class TsFileIOWriter implements AutoCloseable {
     // write magic string
     out.write(MAGIC_STRING_BYTES);
 
+    // flush page cache data to disk
+    out.force();
     // close file
     out.close();
+
     if (resourceLogger.isDebugEnabled() && file != null) {
       resourceLogger.debug("{} writer is closed.", file.getName());
     }
@@ -733,4 +736,8 @@ public class TsFileIOWriter implements AutoCloseable {
   public void flush() throws IOException {
     out.flush();
   }
+
+  public TsFileOutput getTsFileOutput() {
+    return this.out;
+  }
 }
diff --git 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java
 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java
index 5e0b43349ae..5088e0c5344 100644
--- 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java
+++ 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java
@@ -88,4 +88,7 @@ public interface TsFileOutput {
    * @param size size The new size, a non-negative byte count
    */
   void truncate(long size) throws IOException;
+
+  /** Make sure that the data in the buffer is flushed to disk */
+  void force() throws IOException;
 }
diff --git 
a/iotdb-core/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TestTsFileOutput.java
 
b/iotdb-core/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TestTsFileOutput.java
index f61541e42b1..0b342206c3d 100644
--- 
a/iotdb-core/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TestTsFileOutput.java
+++ 
b/iotdb-core/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TestTsFileOutput.java
@@ -67,4 +67,9 @@ public class TestTsFileOutput implements TsFileOutput {
   public void truncate(long size) {
     publicBAOS.truncate((int) size);
   }
+
+  @Override
+  public void force() {
+    // do nothing
+  }
 }

Reply via email to