This is an automated email from the ASF dual-hosted git repository. qiaojialin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 9f24a93ecb374dcf6c252ea352ba6999c6134e5f Author: JackieTien97 <[email protected]> AuthorDate: Thu Jul 23 15:24:02 2020 +0800 improve recover process --- .../apache/iotdb/hadoop/fileSystem/HDFSOutput.java | 4 +- .../writelog/recover/TsFileRecoverPerformer.java | 12 ++-- .../iotdb/tsfile/read/TsFileCheckStatus.java | 1 - .../iotdb/tsfile/read/TsFileSequenceReader.java | 79 +++++++++++----------- .../tsfile/write/writer/LocalTsFileOutput.java | 4 +- .../write/writer/RestorableTsFileIOWriter.java | 28 +++----- .../iotdb/tsfile/write/writer/TsFileOutput.java | 4 +- .../write/writer/RestorableTsFileIOWriterTest.java | 16 +++-- 8 files changed, 69 insertions(+), 79 deletions(-) diff --git a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java index 8693eb2..aec1a58 100644 --- a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java +++ b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java @@ -86,11 +86,11 @@ public class HDFSOutput implements TsFileOutput { } @Override - public void truncate(long position) throws IOException { + public void truncate(long size) throws IOException { if (fs.exists(path)) { fsDataOutputStream.close(); } - fs.truncate(path, position); + fs.truncate(path, size); if (fs.exists(path)) { fsDataOutputStream = fs.append(path); } diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java index 128eba5..35e3e7e 100644 --- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java +++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java @@ -91,7 +91,7 @@ public class TsFileRecoverPerformer { * data 2. redo the WALs to recover unpersisted data 3. flush and close the file 4. clean WALs * * @return a RestorableTsFileIOWriter and a list of RestorableTsFileIOWriter of vmfiles, if the - * file and the vmfiles are not closed before crush, so these writers can be used to continue + * file and the vmfiles are not closed before crash, so these writers can be used to continue * writing */ public Pair<RestorableTsFileIOWriter, List<List<RestorableTsFileIOWriter>>> recover() @@ -147,13 +147,13 @@ public class TsFileRecoverPerformer { "recover the resource file failed: " + filePath + RESOURCE_SUFFIX + e); } - } else { - // tsfile has crashed - // due to failure, the last ChunkGroup may contain the same data as the WALs, so the time - // map must be updated first to avoid duplicated insertion - recoverResourceFromWriter(restorableTsFileIOWriter, tsFileResource); } + // tsfile has crashed + // due to failure, the last ChunkGroup may contain the same data as the WALs, so the time + // map must be updated first to avoid duplicated insertion + recoverResourceFromWriter(restorableTsFileIOWriter, tsFileResource); + // If the vm is not enable, the walTargetWriter points to the tsfile. // If the vm is enable and flush log exists, the walTargetWriter points to the vm of the flush log // if the vm is enable and flush log does not exist, the walTargetWriter is null. diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileCheckStatus.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileCheckStatus.java index 0217147..fb00e62 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileCheckStatus.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileCheckStatus.java @@ -21,7 +21,6 @@ package org.apache.iotdb.tsfile.read; public class TsFileCheckStatus { public static final long COMPLETE_FILE = -1; - public static final long ONLY_MAGIC_HEAD = -2; public static final long INCOMPATIBLE_FILE = -3; public static final long FILE_NOT_FOUND = -4; diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java index e72230a..4639101 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java @@ -96,7 +96,7 @@ public class TsFileSequenceReader implements AutoCloseable { /** * construct function for TsFileSequenceReader. * - * @param file -given file name + * @param file -given file name * @param loadMetadataSize -whether load meta data size */ public TsFileSequenceReader(String file, boolean loadMetadataSize) throws IOException { @@ -137,7 +137,7 @@ public class TsFileSequenceReader implements AutoCloseable { /** * construct function for TsFileSequenceReader. * - * @param input -given input + * @param input -given input * @param loadMetadataSize -load meta data size */ public TsFileSequenceReader(TsFileInput input, boolean loadMetadataSize) throws IOException { @@ -155,10 +155,10 @@ public class TsFileSequenceReader implements AutoCloseable { /** * construct function for TsFileSequenceReader. * - * @param input the input of a tsfile. The current position should be a markder and then a chunk - * Header, rather than the magic number - * @param fileMetadataPos the position of the file metadata in the TsFileInput from the beginning - * of the input to the current position + * @param input the input of a tsfile. The current position should be a markder and + * then a chunk Header, rather than the magic number + * @param fileMetadataPos the position of the file metadata in the TsFileInput from the beginning + * of the input to the current position * @param fileMetadataSize the byte size of the file metadata in the input */ public TsFileSequenceReader(TsFileInput input, long fileMetadataPos, int fileMetadataSize) { @@ -508,9 +508,9 @@ public class TsFileSequenceReader implements AutoCloseable { /** * Traverse the metadata index from MetadataIndexEntry to get TimeseriesMetadatas * - * @param metadataIndex MetadataIndexEntry - * @param buffer byte buffer - * @param deviceId String + * @param metadataIndex MetadataIndexEntry + * @param buffer byte buffer + * @param deviceId String * @param timeseriesMetadataMap map: deviceId -> timeseriesMetadata list */ private void generateMetadataIndex(MetadataIndexEntry metadataIndex, ByteBuffer buffer, @@ -584,12 +584,13 @@ public class TsFileSequenceReader implements AutoCloseable { * Get target MetadataIndexEntry and its end offset * * @param metadataIndex given MetadataIndexNode - * @param name target device / measurement name - * @param type target MetadataIndexNodeType, either INTERNAL_DEVICE or INTERNAL_MEASUREMENT. When - * searching for a device node, return when it is not INTERNAL_DEVICE. Likewise, when searching - * for a measurement node, return when it is not INTERNAL_MEASUREMENT. This works for the - * situation when the index tree does NOT have the device level and ONLY has the measurement - * level. + * @param name target device / measurement name + * @param type target MetadataIndexNodeType, either INTERNAL_DEVICE or + * INTERNAL_MEASUREMENT. When searching for a device node, return when it is + * not INTERNAL_DEVICE. Likewise, when searching for a measurement node, + * return when it is not INTERNAL_MEASUREMENT. This works for the situation + * when the index tree does NOT have the device level and ONLY has the + * measurement level. * @return target MetadataIndexEntry, endOffset pair */ private Pair<MetadataIndexEntry, Long> getMetadataAndEndOffset(MetadataIndexNode metadataIndex, @@ -616,7 +617,7 @@ public class TsFileSequenceReader implements AutoCloseable { /** * read data from current position of the input, and deserialize it to a CHUNK_GROUP_FOOTER. * - * @param position the offset of the chunk group footer in the file + * @param position the offset of the chunk group footer in the file * @param markerRead true if the offset does not contains the marker , otherwise false * @return a CHUNK_GROUP_FOOTER * @throws IOException io error @@ -649,9 +650,9 @@ public class TsFileSequenceReader implements AutoCloseable { /** * read the chunk's header. * - * @param position the file offset of this chunk's header + * @param position the file offset of this chunk's header * @param chunkHeaderSize the size of chunk's header - * @param markerRead true if the offset does not contains the marker , otherwise false + * @param markerRead true if the offset does not contains the marker , otherwise false */ private ChunkHeader readChunkHeader(long position, int chunkHeaderSize, boolean markerRead) throws IOException { @@ -756,8 +757,8 @@ public class TsFileSequenceReader implements AutoCloseable { * changed. * * @param position the start position of data in the tsFileInput, or the current position if - * position = -1 - * @param size the size of data that want to read + * position = -1 + * @param size the size of data that want to read * @return data that been read. */ private ByteBuffer readData(long position, int size) throws IOException { @@ -783,8 +784,8 @@ public class TsFileSequenceReader implements AutoCloseable { * position. * * @param start the start position of data in the tsFileInput, or the current position if position - * = -1 - * @param end the end position of data that want to read + * = -1 + * @param end the end position of data that want to read * @return data that been read. */ private ByteBuffer readData(long start, long end) throws IOException { @@ -801,11 +802,11 @@ public class TsFileSequenceReader implements AutoCloseable { /** * Self Check the file and return the position before where the data is safe. * - * @param newSchema the schema on each time series in the file + * @param newSchema the schema on each time series in the file * @param chunkGroupMetadataList ChunkGroupMetadata List - * @param versionInfo version pair List - * @param fastFinish if true and the file is complete, then newSchema and chunkGroupMetadataList - * parameter will be not modified. + * @param versionInfo version pair List + * @param fastFinish if true and the file is complete, then newSchema and + * chunkGroupMetadataList parameter will be not modified. * @return the position of the file that is fine. All data after the position in the file should * be truncated. */ @@ -835,15 +836,15 @@ public class TsFileSequenceReader implements AutoCloseable { if (fileSize < headerLength) { return TsFileCheckStatus.INCOMPATIBLE_FILE; } - String magic = readHeadMagic(); - tsFileInput.position(headerLength); - if (!magic.equals(TSFileConfig.MAGIC_STRING)) { + if (!TSFileConfig.MAGIC_STRING.equals(readHeadMagic()) || !TSFileConfig.VERSION_NUMBER + .equals(readTailMagic())) { return TsFileCheckStatus.INCOMPATIBLE_FILE; } + tsFileInput.position(headerLength); if (fileSize == headerLength) { - return TsFileCheckStatus.ONLY_MAGIC_HEAD; - } else if (readTailMagic().equals(magic)) { + return headerLength; + } else if (isComplete()) { loadMetadataSize(); if (fastFinish) { return TsFileCheckStatus.COMPLETE_FILE; @@ -851,7 +852,7 @@ public class TsFileSequenceReader implements AutoCloseable { } boolean newChunkGroup = true; // not a complete file, we will recover it... - long truncatedPosition = headerLength; + long truncatedSize = headerLength; byte marker; int chunkCnt = 0; List<MeasurementSchema> measurementSchemaList = new ArrayList<>(); @@ -900,7 +901,7 @@ public class TsFileSequenceReader implements AutoCloseable { } chunkGroupMetadataList.add(new ChunkGroupMetadata(deviceID, chunkMetadataList)); newChunkGroup = true; - truncatedPosition = this.position(); + truncatedSize = this.position(); totalChunkNum += chunkCnt; chunkCnt = 0; @@ -909,7 +910,7 @@ public class TsFileSequenceReader implements AutoCloseable { case MetaMarker.VERSION: long version = readVersion(); versionInfo.add(new Pair<>(position(), version)); - truncatedPosition = this.position(); + truncatedSize = this.position(); break; default: // the disk file is corrupted, using this file may be dangerous @@ -918,14 +919,14 @@ public class TsFileSequenceReader implements AutoCloseable { } // now we read the tail of the data section, so we are sure that the last // ChunkGroupFooter is complete. - truncatedPosition = this.position() - 1; + truncatedSize = this.position() - 1; } catch (Exception e) { logger.info("TsFile {} self-check cannot proceed at position {} " + "recovered, because : {}", file, this.position(), e.getMessage()); } // Despite the completeness of the data section, we will discard current FileMetadata // so that we can continue to write data into this tsfile. - return truncatedPosition; + return truncatedSize; } public int getTotalChunkNum() { @@ -992,7 +993,7 @@ public class TsFileSequenceReader implements AutoCloseable { * get device names which has valid chunks in [start, end) * * @param start start of the partition - * @param end end of the partition + * @param end end of the partition * @return device names in range */ public List<String> getDeviceNameInRange(long start, long end) throws IOException { @@ -1010,8 +1011,8 @@ public class TsFileSequenceReader implements AutoCloseable { * Check if the device has at least one Chunk in this partition * * @param seriesMetadataMap chunkMetaDataList of each measurement - * @param start the start position of the space partition - * @param end the end position of the space partition + * @param start the start position of the space partition + * @param end the end position of the space partition */ private boolean hasDataInPartition(Map<String, List<ChunkMetadata>> seriesMetadataMap, long start, long end) { diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java index 1e6e105..8423dfb 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java @@ -79,8 +79,8 @@ public class LocalTsFileOutput implements TsFileOutput { } @Override - public void truncate(long position) throws IOException { - outputStream.getChannel().truncate(position); + public void truncate(long size) throws IOException { + outputStream.getChannel().truncate(size); } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java index 3586f93..52052d4 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java @@ -28,7 +28,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.iotdb.tsfile.common.conf.TSFileConfig; import org.apache.iotdb.tsfile.exception.NotCompatibleTsFileException; import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; @@ -57,7 +56,7 @@ import org.slf4j.LoggerFactory; public class RestorableTsFileIOWriter extends TsFileIOWriter { private static final Logger logger = LoggerFactory.getLogger("FileMonitor"); - private long truncatedPosition = -1; + private long truncatedSize = -1; private Map<Path, MeasurementSchema> knownSchemas = new HashMap<>(); private int lastFlushedChunkGroupIndex = 0; @@ -91,32 +90,21 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter { if (file.exists()) { try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) { - // this tsfile is complete - if (reader.isComplete()) { + truncatedSize = reader.selfCheck(knownSchemas, chunkGroupMetadataList, versionInfo, true); + totalChunkNum = reader.getTotalChunkNum(); + if (truncatedSize == TsFileCheckStatus.COMPLETE_FILE) { crashed = false; canWrite = false; out.close(); - return; - } - - // uncompleted file - truncatedPosition = reader.selfCheck(knownSchemas, chunkGroupMetadataList, versionInfo, true); - totalChunkNum = reader.getTotalChunkNum(); - if (truncatedPosition == TsFileCheckStatus.INCOMPATIBLE_FILE) { + } else if (truncatedSize == TsFileCheckStatus.INCOMPATIBLE_FILE) { out.close(); throw new NotCompatibleTsFileException( String.format("%s is not in TsFile format.", file.getAbsolutePath())); - } else if (truncatedPosition == TsFileCheckStatus.ONLY_MAGIC_HEAD) { - crashed = true; - canWrite = true; - out.truncate( - (long) TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.VERSION_NUMBER - .getBytes().length); } else { crashed = true; canWrite = true; // remove broken data - out.truncate(truncatedPosition); + out.truncate(truncatedSize); } } } @@ -153,8 +141,8 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter { return new RestorableTsFileIOWriter(file); } - long getTruncatedPosition() { - return truncatedPosition; + long getTruncatedSize() { + return truncatedSize; } public Map<Path, MeasurementSchema> getKnownSchema() { diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java index 5b9fab1..77ce3e2 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java @@ -78,8 +78,8 @@ public interface TsFileOutput { /** * The same with {@link java.nio.channels.FileChannel#truncate(long)}. * - * @param position -position + * @param size size The new size, a non-negative byte count */ - void truncate(long position) throws IOException; + void truncate(long size) throws IOException; } diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java index 98def4c..28fee6d 100644 --- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java @@ -29,6 +29,7 @@ import java.io.File; import java.io.FileWriter; import java.util.ArrayList; import java.util.List; +import org.apache.iotdb.tsfile.common.conf.TSFileConfig; import org.apache.iotdb.tsfile.constant.TestConstant; import org.apache.iotdb.tsfile.exception.NotCompatibleTsFileException; import org.apache.iotdb.tsfile.file.MetaMarker; @@ -81,10 +82,11 @@ public class RestorableTsFileIOWriterTest { RestorableTsFileIOWriter rWriter = new RestorableTsFileIOWriter(file); writer = new TsFileWriter(rWriter); writer.close(); - assertEquals(TsFileCheckStatus.ONLY_MAGIC_HEAD, rWriter.getTruncatedPosition()); + assertEquals(TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.VERSION_NUMBER + .getBytes().length, rWriter.getTruncatedSize()); rWriter = new RestorableTsFileIOWriter(file); - assertEquals(TsFileCheckStatus.COMPLETE_FILE, rWriter.getTruncatedPosition()); + assertEquals(TsFileCheckStatus.COMPLETE_FILE, rWriter.getTruncatedSize()); assertFalse(rWriter.canWrite()); rWriter.close(); assertTrue(file.delete()); @@ -101,7 +103,7 @@ public class RestorableTsFileIOWriterTest { writer = new TsFileWriter(rWriter); writer.close(); assertEquals(TsFileIOWriter.magicStringBytes.length + TsFileIOWriter.versionNumberBytes.length, - rWriter.getTruncatedPosition()); + rWriter.getTruncatedSize()); assertTrue(file.delete()); } @@ -115,7 +117,7 @@ public class RestorableTsFileIOWriterTest { TsFileWriter writer = new TsFileWriter(rWriter); writer.close(); assertEquals(TsFileIOWriter.magicStringBytes.length + TsFileIOWriter.versionNumberBytes.length, - rWriter.getTruncatedPosition()); + rWriter.getTruncatedSize()); assertTrue(file.delete()); } @@ -133,7 +135,7 @@ public class RestorableTsFileIOWriterTest { writer = new TsFileWriter(rWriter); writer.close(); assertEquals(TsFileIOWriter.magicStringBytes.length + TsFileIOWriter.versionNumberBytes.length, - rWriter.getTruncatedPosition()); + rWriter.getTruncatedSize()); assertTrue(file.delete()); } @@ -159,7 +161,7 @@ public class RestorableTsFileIOWriterTest { writer = new TsFileWriter(rWriter); writer.close(); // truncate version marker and version - assertEquals(pos - 1 - Long.BYTES, rWriter.getTruncatedPosition()); + assertEquals(pos - 1 - Long.BYTES, rWriter.getTruncatedSize()); assertTrue(file.delete()); } @@ -219,7 +221,7 @@ public class RestorableTsFileIOWriterTest { RestorableTsFileIOWriter rWriter = new RestorableTsFileIOWriter(file); writer = new TsFileWriter(rWriter); writer.close(); - assertNotEquals(TsFileIOWriter.magicStringBytes.length, rWriter.getTruncatedPosition()); + assertNotEquals(TsFileIOWriter.magicStringBytes.length, rWriter.getTruncatedSize()); TsFileSequenceReader reader = new TsFileSequenceReader(FILE_NAME); List<ChunkMetadata> chunkMetadataList = reader.getChunkMetadataList(new Path("d1.s1")); assertNotNull(chunkMetadataList);
