This is an automated email from the ASF dual-hosted git repository. hxd pushed a commit to branch iotdb39-autoreadrepair in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 52972491945662deaf93def6a4e4d849726240ab Author: xiangdong huang <[email protected]> AuthorDate: Sat Mar 16 13:35:44 2019 +0800 temporary commit --- .../iotdb/tsfile/read/TsFileCheckStatus.java | 28 ++++ .../iotdb/tsfile/read/TsFileSequenceReader.java | 165 ++++++++++++++++++++- .../write/writer/NativeRestorableIOWriter2.java | 97 ++++++++++++ 3 files changed, 289 insertions(+), 1 deletion(-) diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileCheckStatus.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileCheckStatus.java new file mode 100644 index 0000000..3738b42 --- /dev/null +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileCheckStatus.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.tsfile.read; + +public class TsFileCheckStatus { + public static final long COMPLETE_FILE = -1; + public static final long ONLY_MAGIC_HEAD = -2; + public static final long INCOMPATIBLE_FILE = -3; + public static final long FILE_NOT_FOUND = -4; + +} diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java index 72f268d..853fcf6 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java @@ -18,16 +18,23 @@ */ package org.apache.iotdb.tsfile.read; +import static org.apache.iotdb.tsfile.write.writer.TsFileIOWriter.magicStringBytes; + import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import org.apache.iotdb.tsfile.common.conf.TSFileConfig; import org.apache.iotdb.tsfile.compress.IUnCompressor; +import org.apache.iotdb.tsfile.file.MetaMarker; import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter; import org.apache.iotdb.tsfile.file.header.ChunkHeader; import org.apache.iotdb.tsfile.file.header.PageHeader; +import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData; import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata; import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex; @@ -39,11 +46,12 @@ import org.apache.iotdb.tsfile.read.reader.DefaultTsFileInput; import org.apache.iotdb.tsfile.read.reader.TsFileInput; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import org.apache.iotdb.tsfile.write.TsFileWriter; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.iotdb.tsfile.write.writer.NativeRestorableIOWriter; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; -public class TsFileSequenceReader { +public class TsFileSequenceReader implements AutoCloseable{ private static final Logger LOGGER = LoggerFactory.getLogger(TsFileSequenceReader.class); @@ -427,4 +435,159 @@ public class TsFileSequenceReader { .readAsPossible(tsFileInput.wrapAsFileChannel(), target, position, length); } + /** + * Self Check the file and return the position before where the data is safe. + * + * @param newMetaData @OUT can not be null, the chunk group metadata in the file will be added into + * this parameter. If the file is complete, then this parameter will not be modified. + * @return the position of the file that is fine. All data after the position in the file should + * be truncated. + */ + public long selfCheck(List<ChunkGroupMetaData> newMetaData) throws IOException { + return selfCheck(null, newMetaData); + } + + /** + * Self Check the file and return the position before where the data is safe. + * + * @param newSchema @OUT. the measurement schema in the file will be added into + * this parameter. If the file is complete, then this parameter will not be modified. + * @param newMetaData @OUT can not be null, the chunk group metadata in the file will be added into + * this parameter. If the file is complete, then this parameter will not be modified. + * @return the position of the file that is fine. All data after the position in the file should + * be truncated. 
+ */ + public long selfCheck(Map<String, MeasurementSchema> newSchema, + List<ChunkGroupMetaData> newMetaData) throws IOException { + File checkFile = new File(this.file); + long fileSize; + if (!checkFile.exists()) { + return TsFileCheckStatus.FILE_NOT_FOUND; + } else { + fileSize = checkFile.length(); + } + ChunkMetaData currentChunk; + String measurementID; + TSDataType dataType; + long fileOffsetOfChunk; + long startTimeOfChunk = 0; + long endTimeOfChunk = 0; + long numOfPoints = 0; + + ChunkGroupMetaData currentChunkGroup; + List<ChunkMetaData> chunks = null; + String deviceID; + long startOffsetOfChunkGroup = 0; + long endOffsetOfChunkGroup; + long versionOfChunkGroup = 0; + boolean haveReadAnUnverifiedGroupFooter = false; + boolean newGroup = true; + + if (fileSize < TSFileConfig.MAGIC_STRING.length()) { + return TsFileCheckStatus.INCOMPATIBLE_FILE; + } + String magic = readHeadMagic(true); + if (!magic.equals(TSFileConfig.MAGIC_STRING)) { + return TsFileCheckStatus.INCOMPATIBLE_FILE; + } + + if (fileSize == TSFileConfig.MAGIC_STRING.length()) { + return TsFileCheckStatus.ONLY_MAGIC_HEAD; + } else if (readTailMagic().equals(magic)) { + return TsFileCheckStatus.COMPLETE_FILE; + } + + // not a complete file, we will recover it... + long truncatedPosition = magicStringBytes.length; + boolean goon = true; + byte marker; + try { + while (goon && (marker = this.readMarker()) != MetaMarker.SEPARATOR) { + switch (marker) { + case MetaMarker.CHUNK_HEADER: + //this is a chunk. + if (haveReadAnUnverifiedGroupFooter) { + //now we are sure that the last ChunkGroupFooter is complete. 
+ haveReadAnUnverifiedGroupFooter = false; + truncatedPosition = this.position() - 1; + newGroup = true; + } + if (newGroup) { + chunks = new ArrayList<>(); + startOffsetOfChunkGroup = this.position() - 1; + newGroup = false; + } + //if there is something wrong with a chunk, we will drop this part of data + // (the whole ChunkGroup) + ChunkHeader header = this.readChunkHeader(); + measurementID = header.getMeasurementID(); + if (newSchema != null) { + newSchema.putIfAbsent(measurementID, + new MeasurementSchema(measurementID, header.getDataType(), + header.getEncodingType(), header.getCompressionType())); + } + dataType = header.getDataType(); + fileOffsetOfChunk = this.position() - 1; + if (header.getNumOfPages() > 0) { + PageHeader pageHeader = this.readPageHeader(header.getDataType()); + numOfPoints += pageHeader.getNumOfValues(); + startTimeOfChunk = pageHeader.getMinTimestamp(); + endTimeOfChunk = pageHeader.getMaxTimestamp(); + this.skipPageData(pageHeader); + } + for (int j = 1; j < header.getNumOfPages() - 1; j++) { + //a new Page + PageHeader pageHeader = this.readPageHeader(header.getDataType()); + this.skipPageData(pageHeader); + } + if (header.getNumOfPages() > 1) { + PageHeader pageHeader = this.readPageHeader(header.getDataType()); + endTimeOfChunk = pageHeader.getMaxTimestamp(); + this.skipPageData(pageHeader); + } + currentChunk = new ChunkMetaData(measurementID, dataType, fileOffsetOfChunk, + startTimeOfChunk, endTimeOfChunk); + currentChunk.setNumOfPoints(numOfPoints); + chunks.add(currentChunk); + numOfPoints = 0; + break; + case MetaMarker.CHUNK_GROUP_FOOTER: + //this is a chunk group + //if there is something wrong with the chunkGroup Footer, we will drop this part of data + //because we can not guarantee the correctness of the deviceId. 
+ ChunkGroupFooter chunkGroupFooter = this.readChunkGroupFooter(); + deviceID = chunkGroupFooter.getDeviceID(); + endOffsetOfChunkGroup = this.position(); + currentChunkGroup = new ChunkGroupMetaData(deviceID, chunks, startOffsetOfChunkGroup); + currentChunkGroup.setEndOffsetOfChunkGroup(endOffsetOfChunkGroup); + currentChunkGroup.setVersion(versionOfChunkGroup++); + newMetaData.add(currentChunkGroup); + // though we have read the current ChunkMetaData from Disk, it may be incomplete. + // because if the file only loses one byte, the ChunkMetaData.deserialize() returns ok, + // while the last field of the ChunkMetaData is incorrect. + // So, only reading the next MASK, can make sure that this ChunkMetaData is complete. + haveReadAnUnverifiedGroupFooter = true; + break; + + default: + // it is impossible that we read incorrect data. + MetaMarker.handleUnexpectedMarker(marker); + goon = false; + } + } + //now we read the tail of the file, so we are sure that the last ChunkGroupFooter is complete. + truncatedPosition = this.position() - 1; + } catch (IOException e2) { + //if it is the end of the file, and we read an unverifiedGroupFooter, we must remove this ChunkGroup + if (haveReadAnUnverifiedGroupFooter && !newMetaData.isEmpty()) { + newMetaData.remove(newMetaData.size() - 1); + } + } finally { + //something wrong or all data is complete. We will discard current FileMetadata + // so that we can continue to write data into this tsfile. + return truncatedPosition; + } + } + + } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter2.java new file mode 100644 index 0000000..b26047f --- /dev/null +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter2.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.tsfile.write.writer; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.iotdb.tsfile.file.MetaMarker; +import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter; +import org.apache.iotdb.tsfile.file.header.ChunkHeader; +import org.apache.iotdb.tsfile.file.header.PageHeader; +import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.TsFileCheckStatus; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * a restorable tsfile which does not depend on a restore file. 
+ */ +public class NativeRestorableIOWriter2 extends TsFileIOWriter { + + private static final Logger LOGGER = LoggerFactory + .getLogger(NativeRestorableIOWriter2.class); + + private long truncatedPosition = -1; + private Map<String, MeasurementSchema> knownSchemas = new HashMap<>(); + + long getTruncatedPosition() { + return truncatedPosition; + } + + public NativeRestorableIOWriter2(File file) throws IOException { + this(file, true); + } + + /** + * @param file a given tsfile path you want to (continue to) write + * @param append whether append to write data in this file + * @throws IOException if write failed, or the file is broken but autoRepair==false. + */ + public NativeRestorableIOWriter2(File file, boolean append) throws IOException { + super(file, true); + try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath())){ + truncatedPosition = reader.selfCheck(knownSchemas, chunkGroupMetaDataList); + if (truncatedPosition == TsFileCheckStatus.FILE_NOT_FOUND) { + //it is ok.. because this is a new file. + } else if (truncatedPosition == TsFileCheckStatus.COMPLETE_FILE) { + if (!append) { + throw new IOException(String + .format("%s is a complete file but not in the append mode.", file.getAbsolutePath())); + } else { + //TODO remove filemetadata and then keep to write.. + } + } else if (truncatedPosition == TsFileCheckStatus.ONLY_MAGIC_HEAD) { + if (!append) { + throw new IOException(String + .format("%s is not complete and has nothing valuable data.", file.getAbsolutePath())); + } else { + //TODO keep to write + } + } else if (truncatedPosition == TsFileCheckStatus.INCOMPATIBLE_FILE) { + throw new IOException(String.format("%s is not in TsFile format.", file.getAbsolutePath())); + } else { + out.truncate(truncatedPosition); + } + } + } + + @Override + public Map<String, MeasurementSchema> getKnownSchema() { + return knownSchemas; + } +}
