This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch iotdb39-autoreadrepair
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 52972491945662deaf93def6a4e4d849726240ab
Author: xiangdong huang <[email protected]>
AuthorDate: Sat Mar 16 13:35:44 2019 +0800

    temporary commit
---
 .../iotdb/tsfile/read/TsFileCheckStatus.java       |  28 ++++
 .../iotdb/tsfile/read/TsFileSequenceReader.java    | 165 ++++++++++++++++++++-
 .../write/writer/NativeRestorableIOWriter2.java    |  97 ++++++++++++
 3 files changed, 289 insertions(+), 1 deletion(-)

diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileCheckStatus.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileCheckStatus.java
new file mode 100644
index 0000000..3738b42
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileCheckStatus.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.read;
+
+public class TsFileCheckStatus {
+  public static final long COMPLETE_FILE = -1;
+  public static final long ONLY_MAGIC_HEAD = -2;
+  public static final long INCOMPATIBLE_FILE = -3;
+  public static final long FILE_NOT_FOUND = -4;
+
+}
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 72f268d..853fcf6 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -18,16 +18,23 @@
  */
 package org.apache.iotdb.tsfile.read;
 
+import static 
org.apache.iotdb.tsfile.write.writer.TsFileIOWriter.magicStringBytes;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.file.MetaMarker;
 import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
 import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
 import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
@@ -39,11 +46,12 @@ import 
org.apache.iotdb.tsfile.read.reader.DefaultTsFileInput;
 import org.apache.iotdb.tsfile.read.reader.TsFileInput;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.NativeRestorableIOWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class TsFileSequenceReader {
+public class TsFileSequenceReader implements AutoCloseable{
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TsFileSequenceReader.class);
 
@@ -427,4 +435,159 @@ public class TsFileSequenceReader {
         .readAsPossible(tsFileInput.wrapAsFileChannel(), target, position, 
length);
   }
 
+  /**
+   * Self-check the file and return the position up to which the data is safe.
+   *
+   * @param newMetaData @OUT cannot be null; the chunk group metadata in the file will be added into
+   * this parameter. If the file is complete, then this parameter will not be modified.
+   * @return the position of the file that is fine. All data after the 
position in the file should
+   * be truncated.
+   */
+  public long selfCheck(List<ChunkGroupMetaData> newMetaData) throws 
IOException {
+    return selfCheck(null, newMetaData);
+  }
+
+  /**
+   * Self-check the file and return the position up to which the data is safe.
+   *
+   * @param newSchema @OUT the measurement schemas in the file will be added into
+   * this parameter. If the file is complete, then this parameter will not be modified.
+   * @param newMetaData @OUT cannot be null; the chunk group metadata in the file will be added into
+   * this parameter. If the file is complete, then this parameter will not be modified.
+   * @return the position of the file that is fine. All data after the 
position in the file should
+   * be truncated.
+   */
+  public long selfCheck(Map<String, MeasurementSchema> newSchema,
+      List<ChunkGroupMetaData> newMetaData) throws IOException {
+    File checkFile = new File(this.file);
+    long fileSize;
+    if (!checkFile.exists()) {
+      return TsFileCheckStatus.FILE_NOT_FOUND;
+    } else {
+      fileSize = checkFile.length();
+    }
+    ChunkMetaData currentChunk;
+    String measurementID;
+    TSDataType dataType;
+    long fileOffsetOfChunk;
+    long startTimeOfChunk = 0;
+    long endTimeOfChunk = 0;
+    long numOfPoints = 0;
+
+    ChunkGroupMetaData currentChunkGroup;
+    List<ChunkMetaData> chunks = null;
+    String deviceID;
+    long startOffsetOfChunkGroup = 0;
+    long endOffsetOfChunkGroup;
+    long versionOfChunkGroup = 0;
+    boolean haveReadAnUnverifiedGroupFooter = false;
+    boolean newGroup = true;
+
+    if (fileSize < TSFileConfig.MAGIC_STRING.length()) {
+      return TsFileCheckStatus.INCOMPATIBLE_FILE;
+    }
+    String magic = readHeadMagic(true);
+    if (!magic.equals(TSFileConfig.MAGIC_STRING)) {
+      return TsFileCheckStatus.INCOMPATIBLE_FILE;
+    }
+
+    if (fileSize == TSFileConfig.MAGIC_STRING.length()) {
+      return TsFileCheckStatus.ONLY_MAGIC_HEAD;
+    } else if (readTailMagic().equals(magic)) {
+      return TsFileCheckStatus.COMPLETE_FILE;
+    }
+
+    // not a complete file, we will recover it...
+    long truncatedPosition = magicStringBytes.length;
+    boolean goon = true;
+    byte marker;
+    try {
+      while (goon && (marker = this.readMarker()) != MetaMarker.SEPARATOR) {
+        switch (marker) {
+          case MetaMarker.CHUNK_HEADER:
+            //this is a chunk.
+            if (haveReadAnUnverifiedGroupFooter) {
+              //now we are sure that the last ChunkGroupFooter is complete.
+              haveReadAnUnverifiedGroupFooter = false;
+              truncatedPosition = this.position() - 1;
+              newGroup = true;
+            }
+            if (newGroup) {
+              chunks = new ArrayList<>();
+              startOffsetOfChunkGroup = this.position() - 1;
+              newGroup = false;
+            }
+            //if there is something wrong with a chunk, we will drop this part 
of data
+            // (the whole ChunkGroup)
+            ChunkHeader header = this.readChunkHeader();
+            measurementID = header.getMeasurementID();
+            if (newSchema != null) {
+              newSchema.putIfAbsent(measurementID,
+                  new MeasurementSchema(measurementID, header.getDataType(),
+                      header.getEncodingType(), header.getCompressionType()));
+            }
+            dataType = header.getDataType();
+            fileOffsetOfChunk = this.position() - 1;
+            if (header.getNumOfPages() > 0) {
+              PageHeader pageHeader = 
this.readPageHeader(header.getDataType());
+              numOfPoints += pageHeader.getNumOfValues();
+              startTimeOfChunk = pageHeader.getMinTimestamp();
+              endTimeOfChunk = pageHeader.getMaxTimestamp();
+              this.skipPageData(pageHeader);
+            }
+            for (int j = 1; j < header.getNumOfPages() - 1; j++) {
+              //a new Page
+              PageHeader pageHeader = 
this.readPageHeader(header.getDataType());
+              this.skipPageData(pageHeader);
+            }
+            if (header.getNumOfPages() > 1) {
+              PageHeader pageHeader = 
this.readPageHeader(header.getDataType());
+              endTimeOfChunk = pageHeader.getMaxTimestamp();
+              this.skipPageData(pageHeader);
+            }
+            currentChunk = new ChunkMetaData(measurementID, dataType, 
fileOffsetOfChunk,
+                startTimeOfChunk, endTimeOfChunk);
+            currentChunk.setNumOfPoints(numOfPoints);
+            chunks.add(currentChunk);
+            numOfPoints = 0;
+            break;
+          case MetaMarker.CHUNK_GROUP_FOOTER:
+            //this is a chunk group
+            //if there is something wrong with the ChunkGroupFooter, we will drop this part of data
+            //because we cannot guarantee the correctness of the deviceID.
+            ChunkGroupFooter chunkGroupFooter = this.readChunkGroupFooter();
+            deviceID = chunkGroupFooter.getDeviceID();
+            endOffsetOfChunkGroup = this.position();
+            currentChunkGroup = new ChunkGroupMetaData(deviceID, chunks, 
startOffsetOfChunkGroup);
+            currentChunkGroup.setEndOffsetOfChunkGroup(endOffsetOfChunkGroup);
+            currentChunkGroup.setVersion(versionOfChunkGroup++);
+            newMetaData.add(currentChunkGroup);
+            // though we have read the current ChunkMetaData from disk, it may be incomplete,
+            // because if the file loses only one byte, ChunkMetaData.deserialize() still returns ok
+            // while the last field of the ChunkMetaData is incorrect.
+            // So, only reading the next marker can make sure that this ChunkMetaData is complete.
+            haveReadAnUnverifiedGroupFooter = true;
+            break;
+
+          default:
+            // it is impossible that we read an incorrect data.
+            MetaMarker.handleUnexpectedMarker(marker);
+            goon = false;
+        }
+      }
+      //now we read the tail of the file, so we are sure that the last 
ChunkGroupFooter is complete.
+      truncatedPosition = this.position() - 1;
+    } catch (IOException e2) {
+      //if it is the end of the file, and we read an unverifiedGroupFooter, we 
must remove this ChunkGroup
+      if (haveReadAnUnverifiedGroupFooter && !newMetaData.isEmpty()) {
+        newMetaData.remove(newMetaData.size() - 1);
+      }
+    } finally {
+      //something wrong or all data is complete. We will discard current 
FileMetadata
+      // so that we can continue to write data into this tsfile.
+      return truncatedPosition;
+    }
+  }
+
+
 }
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter2.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter2.java
new file mode 100644
index 0000000..b26047f
--- /dev/null
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter2.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.write.writer;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TsFileCheckStatus;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A restorable TsFile writer which does not depend on a restore file.
+ */
+public class NativeRestorableIOWriter2 extends TsFileIOWriter {
+
+  private static final Logger LOGGER = LoggerFactory
+      .getLogger(NativeRestorableIOWriter2.class);
+
+  private long truncatedPosition = -1;
+  private Map<String, MeasurementSchema> knownSchemas = new HashMap<>();
+
+  long getTruncatedPosition() {
+    return truncatedPosition;
+  }
+
+  public NativeRestorableIOWriter2(File file) throws IOException {
+    this(file, true);
+  }
+
+  /**
+   * @param file a given tsfile path you want to (continue to) write
+   * @param append whether to append data to this file
+   * @throws IOException if writing failed, or the file is broken but append==false.
+   */
+  public NativeRestorableIOWriter2(File file, boolean append) throws 
IOException {
+    super(file, true);
+    try (TsFileSequenceReader reader = new 
TsFileSequenceReader(file.getAbsolutePath())){
+      truncatedPosition = reader.selfCheck(knownSchemas, 
chunkGroupMetaDataList);
+      if (truncatedPosition == TsFileCheckStatus.FILE_NOT_FOUND) {
+        //it is ok.. because this is a new file.
+      } else if (truncatedPosition == TsFileCheckStatus.COMPLETE_FILE) {
+        if (!append) {
+          throw new IOException(String
+              .format("%s is a complete file but not in the append mode.", 
file.getAbsolutePath()));
+        } else {
+          //TODO remove filemetadata and then keep to write..
+        }
+      } else if (truncatedPosition == TsFileCheckStatus.ONLY_MAGIC_HEAD) {
+        if (!append) {
+          throw new IOException(String
+              .format("%s is not complete and has nothing valuable data.", 
file.getAbsolutePath()));
+        } else {
+          //TODO keep to write
+        }
+      } else if (truncatedPosition == TsFileCheckStatus.INCOMPATIBLE_FILE) {
+        throw new IOException(String.format("%s is not in TsFile format.", 
file.getAbsolutePath()));
+      } else {
+        out.truncate(truncatedPosition);
+      }
+    }
+  }
+
+  @Override
+  public Map<String, MeasurementSchema> getKnownSchema() {
+    return knownSchemas;
+  }
+}

Reply via email to