This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 463b99c  [IOTDB-36]Enable recover data from a incomplete TsFile and 
continue to write (#87)
463b99c is described below

commit 463b99c21a7262bc7e403b83b54e5582373db68a
Author: Xiangdong Huang <[email protected]>
AuthorDate: Sat Mar 9 14:37:26 2019 +0800

    [IOTDB-36]Enable recover data from a incomplete TsFile and continue to 
write (#87)
    
    * add a restorable tsfile writer without restore file
---
 .../bufferwrite/RestorableTsFileIOWriterTest.java  |   1 +
 .../iotdb/tsfile/common/conf/TSFileDescriptor.java |   4 +
 .../iotdb/tsfile/read/TsFileSequenceReader.java    |  50 +++-
 .../apache/iotdb/tsfile/write/TsFileWriter.java    |  40 ++-
 .../apache/iotdb/tsfile/write/record/TSRecord.java |   3 +-
 .../iotdb/tsfile/write/schema/FileSchema.java      |   2 +-
 .../tsfile/write/writer/DefaultTsFileOutput.java   |   6 +-
 .../write/writer/NativeRestorableIOWriter.java     | 206 ++++++++++++++
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  |  28 ++
 .../iotdb/tsfile/write/TsFileReadWriteTest.java    |  16 +-
 .../write/writer/NativeRestorableIOWriterTest.java | 301 +++++++++++++++++++++
 11 files changed, 638 insertions(+), 19 deletions(-)

diff --git 
a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java
 
b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java
index 3cdc9f1..f2c393a 100644
--- 
a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java
+++ 
b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java
@@ -51,6 +51,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+
 public class RestorableTsFileIOWriterTest {
 
   private RestorableTsFileIOWriter writer;
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileDescriptor.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileDescriptor.java
index 83852e0..b4189fa 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileDescriptor.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileDescriptor.java
@@ -113,6 +113,10 @@ public class TSFileDescriptor {
       TSFileConfig.pageSizeInByte = Integer
           .parseInt(properties
               .getProperty("page_size_in_byte", 
Integer.toString(TSFileConfig.pageSizeInByte)));
+      if (TSFileConfig.pageSizeInByte > TSFileConfig.groupSizeInByte) {
+        LOGGER.warn("page_size is greater than group size, will set it as the 
same with group size");
+        TSFileConfig.pageSizeInByte = TSFileConfig.groupSizeInByte;
+      }
       TSFileConfig.maxNumberOfPointsInPage = Integer.parseInt(
           properties
               .getProperty("max_number_of_points_in_page",
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index b66fe38..be14bf9 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -122,8 +122,23 @@ public class TsFileSequenceReader {
    * this function does not modify the position of the file reader.
    */
   public String readHeadMagic() throws IOException {
+    return readHeadMagic(false);
+  }
+
+  /**
+   * this function does not modify the position of the file reader.
+   *
+   * @param movePosition whether move the position of the file reader after 
reading the magic header
+   * to the end of the magic head string.
+   */
+  public String readHeadMagic(boolean movePosition) throws IOException {
     ByteBuffer magicStringBytes = 
ByteBuffer.allocate(TSFileConfig.MAGIC_STRING.length());
-    tsFileInput.read(magicStringBytes, 0);
+    if (movePosition) {
+      tsFileInput.position(0);
+      tsFileInput.read(magicStringBytes);
+    } else {
+      tsFileInput.read(magicStringBytes, 0);
+    }
     magicStringBytes.flip();
     return new String(magicStringBytes.array());
   }
@@ -267,6 +282,22 @@ public class TsFileSequenceReader {
     return tsFileInput.position();
   }
 
+  public void skipPageData(PageHeader header) throws IOException {
+    tsFileInput.position(tsFileInput.position() + header.getCompressedSize());
+  }
+
+  /**
+   *
+   * @param header
+   * @param position
+   * @return
+   * @throws IOException
+   */
+  public long skipPageData(PageHeader header, long position) throws 
IOException {
+    return position + header.getCompressedSize();
+  }
+
+
   public ByteBuffer readPage(PageHeader header, CompressionType type) throws 
IOException {
     return readPage(header, type, -1);
   }
@@ -293,7 +324,9 @@ public class TsFileSequenceReader {
    */
   public byte readMarker() throws IOException {
     markerBuffer.clear();
-    ReadWriteIOUtils.readAsPossible(tsFileInput.wrapAsFileChannel(), 
markerBuffer);
+    if (ReadWriteIOUtils.readAsPossible(tsFileInput.wrapAsFileChannel(), 
markerBuffer) == 0) {
+      throw new IOException("reach the end of the file.");
+    }
     markerBuffer.flip();
     return markerBuffer.get();
   }
@@ -310,6 +343,10 @@ public class TsFileSequenceReader {
     return this.file;
   }
 
+  public long fileSize() throws IOException {
+    return tsFileInput.size();
+  }
+
   /**
    * read data from tsFileInput, from the current position (if position = -1), 
or the given
    * position. <br> if position = -1, the tsFileInput's position will be 
changed to the current
@@ -324,9 +361,13 @@ public class TsFileSequenceReader {
   private ByteBuffer readData(long position, int size) throws IOException {
     ByteBuffer buffer = ByteBuffer.allocate(size);
     if (position == -1) {
-      ReadWriteIOUtils.readAsPossible(tsFileInput.wrapAsFileChannel(), buffer);
+      if (ReadWriteIOUtils.readAsPossible(tsFileInput.wrapAsFileChannel(), 
buffer) != size) {
+        throw new IOException("reach the end of the data");
+      }
     } else {
-      ReadWriteIOUtils.readAsPossible(tsFileInput.wrapAsFileChannel(), buffer, 
position, size);
+      if (ReadWriteIOUtils.readAsPossible(tsFileInput.wrapAsFileChannel(), 
buffer, position, size) != size) {
+        throw new IOException("reach the end of the data");
+      }
     }
     buffer.flip();
     return buffer;
@@ -339,4 +380,5 @@ public class TsFileSequenceReader {
     return ReadWriteIOUtils
         .readAsPossible(tsFileInput.wrapAsFileChannel(), target, position, 
length);
   }
+
 }
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
index 0a291db..497f67d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
@@ -22,12 +22,12 @@ import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
-
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.exception.write.NoMeasurementException;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
 import org.apache.iotdb.tsfile.write.chunk.ChunkGroupWriterImpl;
 import org.apache.iotdb.tsfile.write.chunk.IChunkGroupWriter;
 import org.apache.iotdb.tsfile.write.record.TSRecord;
@@ -91,6 +91,15 @@ public class TsFileWriter {
   /**
    * init this TsFileWriter.
    *
+   * @param fileWriter the io writer of this TsFile
+   */
+  public TsFileWriter(TsFileIOWriter fileWriter) throws IOException {
+    this(fileWriter, new FileSchema(), 
TSFileDescriptor.getInstance().getConfig());
+  }
+
+  /**
+   * init this TsFileWriter.
+   *
    * @param file the File to be written by this TsFileWriter
    * @param schema the schema of this TsFile
    */
@@ -126,11 +135,22 @@ public class TsFileWriter {
    * @param schema the schema of this TsFile
    * @param conf the configuration of this TsFile
    */
-  protected TsFileWriter(TsFileIOWriter fileWriter, FileSchema schema, 
TSFileConfig conf) {
+  protected TsFileWriter(TsFileIOWriter fileWriter, FileSchema schema, 
TSFileConfig conf)
+      throws IOException {
+    if (!fileWriter.canWrite()) {
+      throw new IOException(
+          "the given file Writer does not support writing any more. Maybe it 
is an complete TsFile");
+    }
     this.fileWriter = fileWriter;
     this.schema = schema;
+    this.schema.registerMeasurements(fileWriter.getKnownSchema());
     this.pageSize = TSFileConfig.pageSizeInByte;
     this.chunkGroupSizeThreshold = TSFileConfig.groupSizeInByte;
+    if (this.pageSize >= chunkGroupSizeThreshold) {
+      LOG.warn(
+          "TsFile's page size {} is greater than chunk group size {}, please 
enlarge the chunk group"
+              + " size or decrease page size. ", pageSize, 
chunkGroupSizeThreshold);
+    }
   }
 
   /**
@@ -290,4 +310,20 @@ public class TsFileWriter {
     flushAllChunkGroups();
     fileWriter.endFile(this.schema);
   }
+
+  /**
+   * this function is only for Test.
+   * @return
+   */
+   public TsFileIOWriter getIOWriter() {
+    return this.fileWriter;
+  }
+
+  /**
+   * this function is only for Test
+   * @throws IOException
+   */
+  public void flushForTest() throws IOException {
+    flushAllChunkGroups();
+  }
 }
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/TSRecord.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/TSRecord.java
index b30a483..9ce98a7 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/TSRecord.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/TSRecord.java
@@ -64,8 +64,9 @@ public class TSRecord {
    *
    * @param tuple data point to be added
    */
-  public void addTuple(DataPoint tuple) {
+  public TSRecord addTuple(DataPoint tuple) {
     this.dataPointList.add(tuple);
+    return this;
   }
 
   /**
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/FileSchema.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/FileSchema.java
index 349fa50..2aa4d4a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/FileSchema.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/FileSchema.java
@@ -104,7 +104,7 @@ public class FileSchema {
   /**
    * register all measurementSchemas in measurements.
    */
-  private void registerMeasurements(Map<String, MeasurementSchema> 
measurements) {
+  public void registerMeasurements(Map<String, MeasurementSchema> 
measurements) {
     measurements.forEach((id, md) -> registerMeasurement(md));
   }
 
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/DefaultTsFileOutput.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/DefaultTsFileOutput.java
index 4ea8be8..0ab5e21 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/DefaultTsFileOutput.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/DefaultTsFileOutput.java
@@ -31,12 +31,16 @@ import java.nio.ByteBuffer;
  */
 public class DefaultTsFileOutput implements TsFileOutput {
 
-  FileOutputStream outputStream;
+  private FileOutputStream outputStream;
 
   public DefaultTsFileOutput(File file) throws FileNotFoundException {
     this.outputStream = new FileOutputStream(file);
   }
 
+  public DefaultTsFileOutput(File file, boolean append) throws 
FileNotFoundException {
+    this.outputStream = new FileOutputStream(file, append);
+  }
+
   public DefaultTsFileOutput(FileOutputStream outputStream) {
     this.outputStream = outputStream;
   }
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter.java
new file mode 100644
index 0000000..a63ca84
--- /dev/null
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.write.writer;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * a restorable tsfile which do not depend on a restore file.
+ */
+public class NativeRestorableIOWriter extends TsFileIOWriter {
+
+  private static final Logger LOGGER = LoggerFactory
+      .getLogger(NativeRestorableIOWriter.class);
+
+  private long truncatedPosition = -1;
+  private Map<String, MeasurementSchema> knownSchemas = new HashMap<>();
+
+  long getTruncatedPosition() {
+    return truncatedPosition;
+  }
+
+  public NativeRestorableIOWriter(File file) throws IOException {
+    super();
+    long fileSize;
+    if (!file.exists()) {
+      this.out = new DefaultTsFileOutput(file, true);
+      startFile();
+      return;
+    } else {
+      fileSize = file.length();
+      this.out = new DefaultTsFileOutput(file, true);
+    }
+
+    //we need to read data to recover TsFileIOWriter.chunkGroupMetaDataList
+    //and remove broken data if exists.
+
+    ChunkMetaData currentChunk;
+    String measurementID;
+    TSDataType dataType;
+    long fileOffsetOfChunk;
+    long startTimeOfChunk = 0;
+    long endTimeOfChunk = 0;
+    long numOfPoints = 0;
+
+    ChunkGroupMetaData currentChunkGroup;
+    List<ChunkMetaData> chunks = null;
+    String deviceID;
+    long startOffsetOfChunkGroup = 0;
+    long endOffsetOfChunkGroup;
+    long versionOfChunkGroup = 0;
+    boolean haveReadAnUnverifiedGroupFooter = false;
+    boolean newGroup = true;
+
+    TsFileSequenceReader reader = new 
TsFileSequenceReader(file.getAbsolutePath(), false);
+    if (fileSize <= magicStringBytes.length) {
+      LOGGER.debug("{} only has magic header, does not worth to recover.", 
file.getAbsolutePath());
+      reader.close();
+      this.out.truncate(0);
+      startFile();
+      truncatedPosition = magicStringBytes.length;
+      return;
+    }
+    String magic = reader.readHeadMagic(true);
+    if (!magic.equals(new String(magicStringBytes))) {
+      throw new IOException(String
+          .format("%s is not using TsFile format, and will be ignored...", 
file.getAbsolutePath()));
+    }
+    if (reader.readTailMagic().equals(magic)) {
+      LOGGER.debug("{} is an complete TsFile.", file.getAbsolutePath());
+      canWrite = false;
+      reader.close();
+      out.close();
+      return;
+    }
+
+    // not a complete file, we will recover it...
+    truncatedPosition = magicStringBytes.length;
+    boolean goon = true;
+    byte marker;
+    try {
+      while (goon && (marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
+        switch (marker) {
+          case MetaMarker.CHUNK_HEADER:
+            //this is a chunk.
+            if (haveReadAnUnverifiedGroupFooter) {
+              //now we are sure that the last ChunkGroupFooter is complete.
+              haveReadAnUnverifiedGroupFooter = false;
+              truncatedPosition = reader.position() - 1;
+              newGroup = true;
+            }
+            if (newGroup) {
+              chunks = new ArrayList<>();
+              startOffsetOfChunkGroup = reader.position() - 1;
+              newGroup = false;
+            }
+            //if there is something wrong with a chunk, we will drop this part 
of data
+            // (the whole ChunkGroup)
+            ChunkHeader header = reader.readChunkHeader();
+            measurementID = header.getMeasurementID();
+            knownSchemas.putIfAbsent(measurementID,
+                new MeasurementSchema(measurementID, header.getDataType(),
+                    header.getEncodingType(), header.getCompressionType()));
+            dataType = header.getDataType();
+            fileOffsetOfChunk = reader.position() - 1;
+            if (header.getNumOfPages() > 0) {
+              PageHeader pageHeader = 
reader.readPageHeader(header.getDataType());
+              numOfPoints += pageHeader.getNumOfValues();
+              startTimeOfChunk = pageHeader.getMinTimestamp();
+              endTimeOfChunk = pageHeader.getMaxTimestamp();
+              reader.skipPageData(pageHeader);
+            }
+            for (int j = 1; j < header.getNumOfPages() - 1; j++) {
+              //a new Page
+              PageHeader pageHeader = 
reader.readPageHeader(header.getDataType());
+              reader.skipPageData(pageHeader);
+            }
+            if (header.getNumOfPages() > 1) {
+              PageHeader pageHeader = 
reader.readPageHeader(header.getDataType());
+              endTimeOfChunk = pageHeader.getMaxTimestamp();
+              reader.skipPageData(pageHeader);
+            }
+            currentChunk = new ChunkMetaData(measurementID, dataType, 
fileOffsetOfChunk,
+                startTimeOfChunk, endTimeOfChunk);
+            currentChunk.setNumOfPoints(numOfPoints);
+            chunks.add(currentChunk);
+            numOfPoints = 0;
+            break;
+          case MetaMarker.CHUNK_GROUP_FOOTER:
+            //this is a chunk group
+            //if there is something wrong with the chunkGroup Footer, we will 
drop this part of data
+            //because we can not guarantee the correction of the deviceId.
+            ChunkGroupFooter chunkGroupFooter = reader.readChunkGroupFooter();
+            deviceID = chunkGroupFooter.getDeviceID();
+            endOffsetOfChunkGroup = reader.position();
+            currentChunkGroup = new ChunkGroupMetaData(deviceID, chunks, 
startOffsetOfChunkGroup);
+            currentChunkGroup.setEndOffsetOfChunkGroup(endOffsetOfChunkGroup);
+            currentChunkGroup.setVersion(versionOfChunkGroup++);
+            chunkGroupMetaDataList.add(currentChunkGroup);
+            // though we have read the current ChunkMetaData from Disk, it may 
be incomplete.
+            // because if the file only loses one byte, the 
ChunkMetaData.deserialize() returns ok,
+            // while the last filed of the ChunkMetaData is incorrect.
+            // So, only reading the next MASK, can make sure that this 
ChunkMetaData is complete.
+            haveReadAnUnverifiedGroupFooter = true;
+            break;
+
+          default:
+            // it is impossible that we read an incorrect data.
+            MetaMarker.handleUnexpectedMarker(marker);
+            goon = false;
+        }
+      }
+      //now we read the tail of the file, so we are sure that the last 
ChunkGroupFooter is complete.
+      truncatedPosition = reader.position() - 1;
+    } catch (IOException e2) {
+      //if it is the end of the file, and we read an unverifiedGroupFooter, we 
must remove this ChunkGroup
+      if (haveReadAnUnverifiedGroupFooter && 
!chunkGroupMetaDataList.isEmpty()) {
+        chunkGroupMetaDataList.remove(chunkGroupMetaDataList.size() - 1);
+      }
+    } finally {
+      //something wrong or all data is complete. We will discard current 
FileMetadata
+      // so that we can continue to write data into this tsfile.
+      LOGGER.info("File {} has {} bytes, and will be truncated from {}.",
+          file.getAbsolutePath(), file.length(), truncatedPosition);
+      out.truncate(truncatedPosition);
+      reader.close();
+    }
+  }
+
+  @Override
+  public Map<String, MeasurementSchema> getKnownSchema() {
+    return knownSchemas;
+  }
+}
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 02f9fee..c339f8a 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -67,6 +68,7 @@ public class TsFileIOWriter {
   protected List<ChunkGroupMetaData> chunkGroupMetaDataList = new 
ArrayList<>();
   private ChunkGroupMetaData currentChunkGroupMetaData;
   private ChunkMetaData currentChunkMetaData;
+  protected boolean canWrite = true;
 
   /**
    * empty construct function.
@@ -236,6 +238,7 @@ public class TsFileIOWriter {
 
     // close file
     out.close();
+    canWrite = false;
     LOG.info("output stream is closed");
   }
 
@@ -315,4 +318,29 @@ public class TsFileIOWriter {
     return chunkGroupMetaDataList;
   }
 
+  public boolean canWrite() {
+    return canWrite;
+  }
+
+  /**
+   * close the inputstream or file channel in force. This is just used for 
Testing.
+   */
+  void forceClose() throws IOException {
+    out.close();
+  }
+
+  void writeSeparatorMaskForTest() throws IOException {
+    out.write(new byte[]{MetaMarker.SEPARATOR});
+  }
+  void writeChunkMaskForTest() throws IOException {
+    out.write(new byte[]{MetaMarker.CHUNK_HEADER});
+  }
+
+  /**
+   * @return all Schema that this ioWriter know. By default implementation 
(TsFileIOWriter.class),
+   * it is empty
+   */
+  public Map<String, MeasurementSchema> getKnownSchema() {
+    return Collections.emptyMap();
+  }
 }
diff --git 
a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileReadWriteTest.java 
b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileReadWriteTest.java
index 3cb2a84..f5e76fc 100644
--- 
a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileReadWriteTest.java
+++ 
b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileReadWriteTest.java
@@ -50,28 +50,27 @@ public class TsFileReadWriteTest {
   private final double delta = 0.0000001;
   private String path = "read_write_rle.tsfile";
   private File f;
-  private TsFileWriter tsFileWriter;
 
   @Before
   public void setUp() throws Exception {
     f = new File(path);
     if (f.exists()) {
-      f.delete();
+      assertTrue(f.delete());
     }
-    tsFileWriter = new TsFileWriter(f);
   }
 
   @After
   public void tearDown() throws Exception {
     f = new File(path);
     if (f.exists()) {
-      f.delete();
+      assertTrue(f.delete());;
     }
   }
 
   @Test
   public void intTest() throws IOException, WriteProcessException {
     int floatCount = 1024 * 1024 * 13 + 1023;
+    TsFileWriter tsFileWriter = new TsFileWriter(f);
     // add measurements into file schema
     tsFileWriter
         .addMeasurement(new MeasurementSchema("sensor_1", TSDataType.INT32, 
TSEncoding.RLE));
@@ -110,6 +109,7 @@ public class TsFileReadWriteTest {
   public void longTest() throws IOException, WriteProcessException {
     int floatCount = 1024 * 1024 * 13 + 1023;
     // add measurements into file schema
+    TsFileWriter tsFileWriter = new TsFileWriter(f);
     tsFileWriter
         .addMeasurement(new MeasurementSchema("sensor_1", TSDataType.INT64, 
TSEncoding.RLE));
     for (long i = 1; i < floatCount; i++) {
@@ -147,6 +147,7 @@ public class TsFileReadWriteTest {
   public void floatTest() throws IOException, WriteProcessException {
     int floatCount = 1024 * 1024 * 13 + 1023;
     // add measurements into file schema
+    TsFileWriter tsFileWriter = new TsFileWriter(f);
     tsFileWriter
         .addMeasurement(new MeasurementSchema("sensor_1", TSDataType.FLOAT, 
TSEncoding.RLE));
     for (long i = 1; i < floatCount; i++) {
@@ -185,6 +186,7 @@ public class TsFileReadWriteTest {
   public void doubleTest() throws IOException, WriteProcessException {
     int floatCount = 1024 * 1024 * 13 + 1023;
     // add measurements into file schema
+    TsFileWriter tsFileWriter = new TsFileWriter(f);
     tsFileWriter
         .addMeasurement(new MeasurementSchema("sensor_1", TSDataType.DOUBLE, 
TSEncoding.RLE));
     for (long i = 1; i < floatCount; i++) {
@@ -220,13 +222,7 @@ public class TsFileReadWriteTest {
 
   @Test
   public void readEmptyMeasurementTest() throws IOException, 
WriteProcessException {
-    String path = "test.tsfile";
-    File f = new File(path);
-    if (f.exists()) {
-      assertTrue(f.delete());
-    }
     TsFileWriter tsFileWriter = new TsFileWriter(f);
-
     // add measurements into file schema
     tsFileWriter
         .addMeasurement(new MeasurementSchema("sensor_1", TSDataType.FLOAT, 
TSEncoding.RLE));
diff --git 
a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriterTest.java
 
b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriterTest.java
new file mode 100644
index 0000000..370c526
--- /dev/null
+++ 
b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriterTest.java
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.write.writer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.file.metadata.statistics.FloatStatistics;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.FloatDataPoint;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class NativeRestorableIOWriterTest {
+
+  private static final String FILE_NAME = "test.ts";
+
+  @Test
+  public void testOnlyHeadMagic() throws Exception {
+    File file = new File(FILE_NAME);
+    TsFileWriter writer = new TsFileWriter(file);
+    writer.getIOWriter().forceClose();
+    NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
+    writer = new TsFileWriter(rWriter);
+    writer.close();
+    assertEquals(TsFileIOWriter.magicStringBytes.length, 
rWriter.getTruncatedPosition());
+    assertTrue(file.delete());
+
+  }
+
+  @Test
+  public void testOnlyFirstMask() throws Exception {
+    File file = new File(FILE_NAME);
+    TsFileWriter writer = new TsFileWriter(file);
+    //we have to flush using inner API.
+    writer.getIOWriter().out.write(new byte[] {MetaMarker.CHUNK_HEADER});
+    writer.getIOWriter().forceClose();
+    assertEquals(TsFileIOWriter.magicStringBytes.length + 1, file.length());
+    NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
+    writer = new TsFileWriter(rWriter);
+    writer.close();
+    assertEquals(TsFileIOWriter.magicStringBytes.length, 
rWriter.getTruncatedPosition());
+    assertTrue(file.delete());
+  }
+
+  @Test
+  public void testOnlyOneIncompleteChunkHeader() throws Exception {
+    File file = new File(FILE_NAME);
+    TsFileWriter writer = new TsFileWriter(file);
+
+    ChunkHeader header = new ChunkHeader("s1", 100, TSDataType.FLOAT, 
CompressionType.SNAPPY,
+        TSEncoding.PLAIN, 5);
+    ByteBuffer buffer = ByteBuffer.allocate(header.getSerializedSize());
+    header.serializeTo(buffer);
+    buffer.flip();
+    byte[] data = new byte[3];
+    buffer.get(data, 0, 3);
+    writer.getIOWriter().out.write(data);
+    writer.getIOWriter().forceClose();
+    NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
+    writer = new TsFileWriter(rWriter);
+    writer.close();
+    assertEquals(TsFileIOWriter.magicStringBytes.length, 
rWriter.getTruncatedPosition());
+    assertTrue(file.delete());
+  }
+
+  @Test
+  public void testOnlyOneChunkHeader() throws Exception {
+    File file = new File(FILE_NAME);
+    TsFileWriter writer = new TsFileWriter(file);
+    writer.getIOWriter()
+        .startFlushChunk(new MeasurementSchema("s1", TSDataType.FLOAT, 
TSEncoding.PLAIN),
+            CompressionType.SNAPPY, TSDataType.FLOAT,
+            TSEncoding.PLAIN, new FloatStatistics(), 100, 50, 100, 10);
+    writer.getIOWriter().forceClose();
+
+    NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
+    writer = new TsFileWriter(rWriter);
+    writer.close();
+    assertEquals(TsFileIOWriter.magicStringBytes.length, 
rWriter.getTruncatedPosition());
+    assertTrue(file.delete());
+  }
+
+  @Test
+  public void testOnlyOneChunkHeaderAndSomePage() throws Exception {
+    File file = new File(FILE_NAME);
+    TsFileWriter writer = new TsFileWriter(file);
+    writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT, 
TSEncoding.RLE));
+    writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT, 
TSEncoding.RLE));
+    writer.write(new TSRecord(1, "d1").addTuple(new FloatDataPoint("s1", 5))
+        .addTuple(new FloatDataPoint("s2", 4)));
+    writer.write(new TSRecord(2, "d1").addTuple(new FloatDataPoint("s1", 5))
+        .addTuple(new FloatDataPoint("s2", 4)));
+    writer.flushForTest();
+    long pos = writer.getIOWriter().getPos();
+    //let's delete one byte.
+    writer.getIOWriter().out.truncate(pos - 1);
+    writer.getIOWriter().forceClose();
+    NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
+    writer = new TsFileWriter(rWriter);
+    writer.close();
+    assertEquals(TsFileIOWriter.magicStringBytes.length, 
rWriter.getTruncatedPosition());
+    assertTrue(file.delete());
+  }
+
+
+  @Test
+  public void testOnlyOneChunkGroup() throws Exception {
+    File file = new File(FILE_NAME);
+    TsFileWriter writer = new TsFileWriter(file);
+    writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT, 
TSEncoding.RLE));
+    writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT, 
TSEncoding.RLE));
+    writer.write(new TSRecord(1, "d1").addTuple(new FloatDataPoint("s1", 5))
+        .addTuple(new FloatDataPoint("s2", 4)));
+    writer.write(new TSRecord(2, "d1").addTuple(new FloatDataPoint("s1", 5))
+        .addTuple(new FloatDataPoint("s2", 4)));
+    writer.flushForTest();
+    writer.getIOWriter().forceClose();
+    NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
+    writer = new TsFileWriter(rWriter);
+    writer.close();
+    assertEquals(TsFileIOWriter.magicStringBytes.length, 
rWriter.getTruncatedPosition());
+    assertTrue(file.delete());
+  }
+
+  @Test
+  public void testOnlyOneChunkGroupAndOneMask() throws Exception {
+    File file = new File(FILE_NAME);
+    TsFileWriter writer = new TsFileWriter(file);
+    writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT, 
TSEncoding.RLE));
+    writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT, 
TSEncoding.RLE));
+    writer.write(new TSRecord(1, "d1").addTuple(new FloatDataPoint("s1", 5))
+        .addTuple(new FloatDataPoint("s2", 4)));
+    writer.write(new TSRecord(2, "d1").addTuple(new FloatDataPoint("s1", 5))
+        .addTuple(new FloatDataPoint("s2", 4)));
+    writer.flushForTest();
+    writer.getIOWriter().writeChunkMaskForTest();
+    writer.getIOWriter().forceClose();
+    NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
+    writer = new TsFileWriter(rWriter);
+    writer.close();
+    assertNotEquals(TsFileIOWriter.magicStringBytes.length, 
rWriter.getTruncatedPosition());
+    TsFileSequenceReader reader = new TsFileSequenceReader(FILE_NAME);
+    TsDeviceMetadataIndex index = 
reader.readFileMetadata().getDeviceMap().get("d1");
+    assertEquals(1, 
reader.readTsDeviceMetaData(index).getChunkGroupMetaDataList().size());
+    reader.close();
+    assertTrue(file.delete());
+  }
+
+
+  @Test
+  public void testTwoChunkGroupAndMore() throws Exception {
+    File file = new File(FILE_NAME);
+    TsFileWriter writer = new TsFileWriter(file);
+    writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT, 
TSEncoding.RLE));
+    writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT, 
TSEncoding.RLE));
+    writer.write(new TSRecord(1, "d1").addTuple(new FloatDataPoint("s1", 5))
+        .addTuple(new FloatDataPoint("s2", 4)));
+    writer.write(new TSRecord(2, "d1").addTuple(new FloatDataPoint("s1", 5))
+        .addTuple(new FloatDataPoint("s2", 4)));
+
+    writer.write(new TSRecord(1, "d2").addTuple(new FloatDataPoint("s1", 6))
+        .addTuple(new FloatDataPoint("s2", 4)));
+    writer.write(new TSRecord(2, "d2").addTuple(new FloatDataPoint("s1", 6))
+        .addTuple(new FloatDataPoint("s2", 4)));
+    writer.flushForTest();
+    writer.getIOWriter().forceClose();
+    NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
+    writer = new TsFileWriter(rWriter);
+    writer.close();
+    TsFileSequenceReader reader = new TsFileSequenceReader(FILE_NAME);
+    TsDeviceMetadataIndex index = 
reader.readFileMetadata().getDeviceMap().get("d1");
+    assertEquals(1, 
reader.readTsDeviceMetaData(index).getChunkGroupMetaDataList().size());
+    reader.close();
+    assertTrue(file.delete());
+  }
+
+  @Test
+  public void testNoSeperatorMask() throws Exception {
+    File file = new File(FILE_NAME);
+    TsFileWriter writer = new TsFileWriter(file);
+    writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT, 
TSEncoding.RLE));
+    writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT, 
TSEncoding.RLE));
+    writer.write(new TSRecord(1, "d1").addTuple(new FloatDataPoint("s1", 5))
+        .addTuple(new FloatDataPoint("s2", 4)));
+    writer.write(new TSRecord(2, "d1").addTuple(new FloatDataPoint("s1", 5))
+        .addTuple(new FloatDataPoint("s2", 4)));
+
+    writer.write(new TSRecord(1, "d2").addTuple(new FloatDataPoint("s1", 6))
+        .addTuple(new FloatDataPoint("s2", 4)));
+    writer.write(new TSRecord(2, "d2").addTuple(new FloatDataPoint("s1", 6))
+        .addTuple(new FloatDataPoint("s2", 4)));
+    writer.flushForTest();
+    writer.getIOWriter().writeSeparatorMaskForTest();
+    writer.getIOWriter().forceClose();
+    NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
+    writer = new TsFileWriter(rWriter);
+    writer.close();
+    TsFileSequenceReader reader = new TsFileSequenceReader(FILE_NAME);
+    TsDeviceMetadataIndex index = 
reader.readFileMetadata().getDeviceMap().get("d1");
+    assertEquals(1, 
reader.readTsDeviceMetaData(index).getChunkGroupMetaDataList().size());
+    index = reader.readFileMetadata().getDeviceMap().get("d2");
+    assertEquals(1, 
reader.readTsDeviceMetaData(index).getChunkGroupMetaDataList().size());
+    reader.close();
+    assertTrue(file.delete());
+  }
+
+
+  @Test
+  public void testHavingSomeFileMetadata() throws Exception {
+    File file = new File(FILE_NAME);
+    TsFileWriter writer = new TsFileWriter(file);
+    writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT, 
TSEncoding.RLE));
+    writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT, 
TSEncoding.RLE));
+    writer.write(new TSRecord(1, "d1").addTuple(new FloatDataPoint("s1", 5))
+        .addTuple(new FloatDataPoint("s2", 4)));
+    writer.write(new TSRecord(2, "d1").addTuple(new FloatDataPoint("s1", 5))
+        .addTuple(new FloatDataPoint("s2", 4)));
+
+    writer.write(new TSRecord(1, "d2").addTuple(new FloatDataPoint("s1", 6))
+        .addTuple(new FloatDataPoint("s2", 4)));
+    writer.write(new TSRecord(2, "d2").addTuple(new FloatDataPoint("s1", 6))
+        .addTuple(new FloatDataPoint("s2", 4)));
+    writer.flushForTest();
+    writer.getIOWriter().writeSeparatorMaskForTest();
+    writer.getIOWriter().writeSeparatorMaskForTest();
+    writer.getIOWriter().forceClose();
+    NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
+    writer = new TsFileWriter(rWriter);
+    writer.close();
+    TsFileSequenceReader reader = new TsFileSequenceReader(FILE_NAME);
+    TsDeviceMetadataIndex index = 
reader.readFileMetadata().getDeviceMap().get("d1");
+    assertEquals(1, 
reader.readTsDeviceMetaData(index).getChunkGroupMetaDataList().size());
+    index = reader.readFileMetadata().getDeviceMap().get("d2");
+    assertEquals(1, 
reader.readTsDeviceMetaData(index).getChunkGroupMetaDataList().size());
+    reader.close();
+    assertTrue(file.delete());
+  }
+
+  @Test
+  public void testOpenCompleteFile() throws Exception {
+    File file = new File(FILE_NAME);
+    TsFileWriter writer = new TsFileWriter(file);
+    writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT, 
TSEncoding.RLE));
+    writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT, 
TSEncoding.RLE));
+    writer.write(new TSRecord(1, "d1").addTuple(new FloatDataPoint("s1", 5))
+        .addTuple(new FloatDataPoint("s2", 4)));
+    writer.write(new TSRecord(2, "d1").addTuple(new FloatDataPoint("s1", 5))
+        .addTuple(new FloatDataPoint("s2", 4)));
+    writer.close();
+
+    NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
+    assertFalse(rWriter.canWrite());
+    assertTrue(file.delete());
+  }
+}
\ No newline at end of file

Reply via email to