This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch improve/recover in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit a6282dcc9e1b46fa78eb5632e03763f59403c219 Author: JackieTien97 <[email protected]> AuthorDate: Thu Jul 23 14:50:47 2020 +0800 refactor recover process --- .../apache/iotdb/hadoop/fileSystem/HDFSOutput.java | 4 +- .../writelog/recover/TsFileRecoverPerformer.java | 24 +++---- .../iotdb/tsfile/read/TsFileSequenceReader.java | 79 +++++++++++----------- .../tsfile/write/writer/LocalTsFileOutput.java | 4 +- .../write/writer/RestorableTsFileIOWriter.java | 42 +++++------- .../iotdb/tsfile/write/writer/TsFileOutput.java | 4 +- .../write/writer/RestorableTsFileIOWriterTest.java | 14 ++-- 7 files changed, 83 insertions(+), 88 deletions(-) diff --git a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java index 8693eb2..aec1a58 100644 --- a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java +++ b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSOutput.java @@ -86,11 +86,11 @@ public class HDFSOutput implements TsFileOutput { } @Override - public void truncate(long position) throws IOException { + public void truncate(long size) throws IOException { if (fs.exists(path)) { fsDataOutputStream.close(); } - fs.truncate(path, position); + fs.truncate(path, size); if (fs.exists(path)) { fsDataOutputStream = fs.append(path); } diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java index 22d2904..38ab3c2 100644 --- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java +++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java @@ -127,20 +127,20 @@ public class TsFileRecoverPerformer { // due to failure, the last ChunkGroup may contain the same data as the WALs, so the time // map must be updated first to avoid duplicated insertion recoverResourceFromWriter(restorableTsFileIOWriter); - } - // redo logs - redoLogs(restorableTsFileIOWriter); + // redo logs + redoLogs(restorableTsFileIOWriter); - // clean logs - try { - MultiFileLogNodeManager.getInstance() - .deleteNode(logNodePrefix + SystemFileFactory.INSTANCE.getFile(filePath).getName()); - } catch (IOException e) { - throw new StorageGroupProcessorException(e); - } + // clean logs + try { + MultiFileLogNodeManager.getInstance() + .deleteNode(logNodePrefix + SystemFileFactory.INSTANCE.getFile(filePath).getName()); + } catch (IOException e) { + throw new StorageGroupProcessorException(e); + } - return restorableTsFileIOWriter; + return restorableTsFileIOWriter; + } } private void recoverResourceFromFile() throws IOException { @@ -206,10 +206,10 @@ public class TsFileRecoverPerformer { // end the file if it is not the last file or it is closed before crush restorableTsFileIOWriter.endFile(); resource.cleanCloseFlag(); + resource.serialize(); } // otherwise this file is not closed before crush, do nothing so we can continue writing // into it - resource.serialize(); } catch (IOException | InterruptedException | ExecutionException e) { throw new StorageGroupProcessorException(e); } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java index d259f1d..1cb124c 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java @@ -96,7 +96,7 @@ public class TsFileSequenceReader implements AutoCloseable { /** * construct function for TsFileSequenceReader. * - * @param file -given file name + * @param file -given file name * @param loadMetadataSize -whether load meta data size */ public TsFileSequenceReader(String file, boolean loadMetadataSize) throws IOException { @@ -137,7 +137,7 @@ public class TsFileSequenceReader implements AutoCloseable { /** * construct function for TsFileSequenceReader. * - * @param input -given input + * @param input -given input * @param loadMetadataSize -load meta data size */ public TsFileSequenceReader(TsFileInput input, boolean loadMetadataSize) throws IOException { @@ -155,10 +155,10 @@ public class TsFileSequenceReader implements AutoCloseable { /** * construct function for TsFileSequenceReader. * - * @param input the input of a tsfile. The current position should be a markder and then a chunk - * Header, rather than the magic number - * @param fileMetadataPos the position of the file metadata in the TsFileInput from the beginning - * of the input to the current position + * @param input the input of a tsfile. The current position should be a markder and + * then a chunk Header, rather than the magic number + * @param fileMetadataPos the position of the file metadata in the TsFileInput from the beginning + * of the input to the current position * @param fileMetadataSize the byte size of the file metadata in the input */ public TsFileSequenceReader(TsFileInput input, long fileMetadataPos, int fileMetadataSize) { @@ -508,9 +508,9 @@ public class TsFileSequenceReader implements AutoCloseable { /** * Traverse the metadata index from MetadataIndexEntry to get TimeseriesMetadatas * - * @param metadataIndex MetadataIndexEntry - * @param buffer byte buffer - * @param deviceId String + * @param metadataIndex MetadataIndexEntry + * @param buffer byte buffer + * @param deviceId String * @param timeseriesMetadataMap map: deviceId -> timeseriesMetadata list */ private void generateMetadataIndex(MetadataIndexEntry metadataIndex, ByteBuffer buffer, @@ -584,12 +584,13 @@ public class TsFileSequenceReader implements AutoCloseable { * Get target MetadataIndexEntry and its end offset * * @param metadataIndex given MetadataIndexNode - * @param name target device / measurement name - * @param type target MetadataIndexNodeType, either INTERNAL_DEVICE or INTERNAL_MEASUREMENT. When - * searching for a device node, return when it is not INTERNAL_DEVICE. Likewise, when searching - * for a measurement node, return when it is not INTERNAL_MEASUREMENT. This works for the - * situation when the index tree does NOT have the device level and ONLY has the measurement - * level. + * @param name target device / measurement name + * @param type target MetadataIndexNodeType, either INTERNAL_DEVICE or + * INTERNAL_MEASUREMENT. When searching for a device node, return when it is + * not INTERNAL_DEVICE. Likewise, when searching for a measurement node, + * return when it is not INTERNAL_MEASUREMENT. This works for the situation + * when the index tree does NOT have the device level and ONLY has the + * measurement level. * @return target MetadataIndexEntry, endOffset pair */ private Pair<MetadataIndexEntry, Long> getMetadataAndEndOffset(MetadataIndexNode metadataIndex, @@ -616,7 +617,7 @@ public class TsFileSequenceReader implements AutoCloseable { /** * read data from current position of the input, and deserialize it to a CHUNK_GROUP_FOOTER. * - * @param position the offset of the chunk group footer in the file + * @param position the offset of the chunk group footer in the file * @param markerRead true if the offset does not contains the marker , otherwise false * @return a CHUNK_GROUP_FOOTER * @throws IOException io error @@ -649,9 +650,9 @@ public class TsFileSequenceReader implements AutoCloseable { /** * read the chunk's header. * - * @param position the file offset of this chunk's header + * @param position the file offset of this chunk's header * @param chunkHeaderSize the size of chunk's header - * @param markerRead true if the offset does not contains the marker , otherwise false + * @param markerRead true if the offset does not contains the marker , otherwise false */ private ChunkHeader readChunkHeader(long position, int chunkHeaderSize, boolean markerRead) throws IOException { @@ -756,8 +757,8 @@ public class TsFileSequenceReader implements AutoCloseable { * changed. * * @param position the start position of data in the tsFileInput, or the current position if - * position = -1 - * @param size the size of data that want to read + * position = -1 + * @param size the size of data that want to read * @return data that been read. */ private ByteBuffer readData(long position, int size) throws IOException { @@ -783,8 +784,8 @@ public class TsFileSequenceReader implements AutoCloseable { * position. * * @param start the start position of data in the tsFileInput, or the current position if position - * = -1 - * @param end the end position of data that want to read + * = -1 + * @param end the end position of data that want to read * @return data that been read. */ private ByteBuffer readData(long start, long end) throws IOException { @@ -801,11 +802,11 @@ public class TsFileSequenceReader implements AutoCloseable { /** * Self Check the file and return the position before where the data is safe. * - * @param newSchema the schema on each time series in the file + * @param newSchema the schema on each time series in the file * @param chunkGroupMetadataList ChunkGroupMetadata List - * @param versionInfo version pair List - * @param fastFinish if true and the file is complete, then newSchema and chunkGroupMetadataList - * parameter will be not modified. + * @param versionInfo version pair List + * @param fastFinish if true and the file is complete, then newSchema and + * chunkGroupMetadataList parameter will be not modified. * @return the position of the file that is fine. All data after the position in the file should * be truncated. */ @@ -835,15 +836,15 @@ public class TsFileSequenceReader implements AutoCloseable { if (fileSize < headerLength) { return TsFileCheckStatus.INCOMPATIBLE_FILE; } - String magic = readHeadMagic(); - tsFileInput.position(headerLength); - if (!magic.equals(TSFileConfig.MAGIC_STRING)) { + if (!TSFileConfig.MAGIC_STRING.equals(readHeadMagic()) || !TSFileConfig.VERSION_NUMBER + .equals(readTailMagic())) { return TsFileCheckStatus.INCOMPATIBLE_FILE; } + tsFileInput.position(headerLength); if (fileSize == headerLength) { - return TsFileCheckStatus.ONLY_MAGIC_HEAD; - } else if (readTailMagic().equals(magic)) { + return headerLength; + } else if (isComplete()) { loadMetadataSize(); if (fastFinish) { return TsFileCheckStatus.COMPLETE_FILE; @@ -851,7 +852,7 @@ public class TsFileSequenceReader implements AutoCloseable { } boolean newChunkGroup = true; // not a complete file, we will recover it... - long truncatedPosition = headerLength; + long truncatedSize = headerLength; byte marker; int chunkCnt = 0; List<MeasurementSchema> measurementSchemaList = new ArrayList<>(); @@ -900,7 +901,7 @@ public class TsFileSequenceReader implements AutoCloseable { } chunkGroupMetadataList.add(new ChunkGroupMetadata(deviceID, chunkMetadataList)); newChunkGroup = true; - truncatedPosition = this.position(); + truncatedSize = this.position(); totalChunkNum += chunkCnt; chunkCnt = 0; @@ -909,7 +910,7 @@ public class TsFileSequenceReader implements AutoCloseable { case MetaMarker.VERSION: long version = readVersion(); versionInfo.add(new Pair<>(position(), version)); - truncatedPosition = this.position(); + truncatedSize = this.position(); break; default: // the disk file is corrupted, using this file may be dangerous @@ -918,14 +919,14 @@ public class TsFileSequenceReader implements AutoCloseable { } // now we read the tail of the data section, so we are sure that the last // ChunkGroupFooter is complete. - truncatedPosition = this.position() - 1; + truncatedSize = this.position() - 1; } catch (Exception e) { logger.info("TsFile {} self-check cannot proceed at position {} " + "recovered, because : {}", file, this.position(), e.getMessage()); } // Despite the completeness of the data section, we will discard current FileMetadata // so that we can continue to write data into this tsfile. - return truncatedPosition; + return truncatedSize; } public int getTotalChunkNum() { @@ -992,7 +993,7 @@ public class TsFileSequenceReader implements AutoCloseable { * get device names which has valid chunks in [start, end) * * @param start start of the partition - * @param end end of the partition + * @param end end of the partition * @return device names in range */ public List<String> getDeviceNameInRange(long start, long end) throws IOException { @@ -1010,8 +1011,8 @@ public class TsFileSequenceReader implements AutoCloseable { * Check if the device has at least one Chunk in this partition * * @param seriesMetadataMap chunkMetaDataList of each measurement - * @param start the start position of the space partition - * @param end the end position of the space partition + * @param start the start position of the space partition + * @param end the end position of the space partition */ private boolean hasDataInPartition(Map<String, List<ChunkMetadata>> seriesMetadataMap, long start, long end) { diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java index 1e6e105..8423dfb 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java @@ -79,8 +79,8 @@ public class LocalTsFileOutput implements TsFileOutput { } @Override - public void truncate(long position) throws IOException { - outputStream.getChannel().truncate(position); + public void truncate(long size) throws IOException { + outputStream.getChannel().truncate(size); } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java index 8a12b05..536f887 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java @@ -28,7 +28,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.iotdb.tsfile.common.conf.TSFileConfig; import org.apache.iotdb.tsfile.exception.NotCompatibleTsFileException; import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; @@ -48,7 +47,7 @@ import org.slf4j.LoggerFactory; public class RestorableTsFileIOWriter extends TsFileIOWriter { private static final Logger logger = LoggerFactory.getLogger("FileMonitor"); - private long truncatedPosition = -1; + private long truncatedSize = -1; private Map<Path, MeasurementSchema> knownSchemas = new HashMap<>(); private int lastFlushedChunkGroupIndex = 0; @@ -74,36 +73,30 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter { // file doesn't exist if (file.length() == 0) { startFile(); + canWrite = true; + crashed = true; return; } if (file.exists()) { try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) { - // this tsfile is complete - if (reader.isComplete()) { + truncatedSize = reader + .selfCheck(knownSchemas, chunkGroupMetadataList, versionInfo, true); + totalChunkNum = reader.getTotalChunkNum(); + if (truncatedSize == TsFileCheckStatus.COMPLETE_FILE) { crashed = false; canWrite = false; out.close(); - return; - } - - // uncompleted file - truncatedPosition = reader.selfCheck(knownSchemas, chunkGroupMetadataList, versionInfo, true); - totalChunkNum = reader.getTotalChunkNum(); - if (truncatedPosition == TsFileCheckStatus.INCOMPATIBLE_FILE) { + } else if (truncatedSize == TsFileCheckStatus.INCOMPATIBLE_FILE) { out.close(); throw new NotCompatibleTsFileException( String.format("%s is not in TsFile format.", file.getAbsolutePath())); - } else if (truncatedPosition == TsFileCheckStatus.ONLY_MAGIC_HEAD) { - crashed = true; - out.truncate( - (long) TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.VERSION_NUMBER - .getBytes().length); } else { crashed = true; + canWrite = true; // remove broken data - out.truncate(truncatedPosition); + out.truncate(truncatedSize); } } } @@ -140,8 +133,8 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter { return new RestorableTsFileIOWriter(file); } - long getTruncatedPosition() { - return truncatedPosition; + long getTruncatedSize() { + return truncatedSize; } public Map<Path, MeasurementSchema> getKnownSchema() { @@ -162,7 +155,8 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter { public List<ChunkMetadata> getVisibleMetadataList(String deviceId, String measurementId, TSDataType dataType) { List<ChunkMetadata> chunkMetadataList = new ArrayList<>(); - if (metadatasForQuery.containsKey(deviceId) && metadatasForQuery.get(deviceId).containsKey(measurementId)) { + if (metadatasForQuery.containsKey(deviceId) && metadatasForQuery.get(deviceId) + .containsKey(measurementId)) { for (ChunkMetadata chunkMetaData : metadatasForQuery.get(deviceId).get(measurementId)) { // filter: if a device'measurement is defined as float type, and data has been persistent. // Then someone deletes the timeseries and recreate it with Int type. We have to ignore @@ -180,8 +174,8 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter { } /** - * add all appendChunkMetadatas into memory. After calling this method, other classes can - * read these metadata. + * add all appendChunkMetadatas into memory. After calling this method, other classes can read + * these metadata. */ public void makeMetadataVisible() { @@ -212,8 +206,8 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter { } /** - * get all the chunk's metadata which are appended after the last calling of this method, or - * after the class instance is initialized if this is the first time to call the method. + * get all the chunk's metadata which are appended after the last calling of this method, or after + * the class instance is initialized if this is the first time to call the method. * * @return a list of Device ChunkMetadataList Pair */ diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java index 5b9fab1..8da1f17 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java @@ -78,8 +78,8 @@ public interface TsFileOutput { /** * The same with {@link java.nio.channels.FileChannel#truncate(long)}. * - * @param position -position + * @param size The new size, a non-negative byte count */ - void truncate(long position) throws IOException; + void truncate(long size) throws IOException; } diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java index 0c777c3..a11296f 100644 --- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java @@ -81,10 +81,10 @@ public class RestorableTsFileIOWriterTest { RestorableTsFileIOWriter rWriter = new RestorableTsFileIOWriter(file); writer = new TsFileWriter(rWriter); writer.close(); - assertEquals(TsFileCheckStatus.ONLY_MAGIC_HEAD, rWriter.getTruncatedPosition()); + assertEquals(TsFileCheckStatus.ONLY_MAGIC_HEAD, rWriter.getTruncatedSize()); rWriter = new RestorableTsFileIOWriter(file); - assertEquals(TsFileCheckStatus.COMPLETE_FILE, rWriter.getTruncatedPosition()); + assertEquals(TsFileCheckStatus.COMPLETE_FILE, rWriter.getTruncatedSize()); assertFalse(rWriter.canWrite()); rWriter.close(); assertTrue(file.delete()); @@ -101,7 +101,7 @@ public class RestorableTsFileIOWriterTest { writer = new TsFileWriter(rWriter); writer.close(); assertEquals(TsFileIOWriter.magicStringBytes.length + TsFileIOWriter.versionNumberBytes.length, - rWriter.getTruncatedPosition()); + rWriter.getTruncatedSize()); assertTrue(file.delete()); } @@ -115,7 +115,7 @@ public class RestorableTsFileIOWriterTest { TsFileWriter writer = new TsFileWriter(rWriter); writer.close(); assertEquals(TsFileIOWriter.magicStringBytes.length + TsFileIOWriter.versionNumberBytes.length, - rWriter.getTruncatedPosition()); + rWriter.getTruncatedSize()); assertTrue(file.delete()); } @@ -133,7 +133,7 @@ public class RestorableTsFileIOWriterTest { writer = new TsFileWriter(rWriter); writer.close(); assertEquals(TsFileIOWriter.magicStringBytes.length + TsFileIOWriter.versionNumberBytes.length, - rWriter.getTruncatedPosition()); + rWriter.getTruncatedSize()); assertTrue(file.delete()); } @@ -159,7 +159,7 @@ public class RestorableTsFileIOWriterTest { writer = new TsFileWriter(rWriter); writer.close(); // truncate version marker and version - assertEquals(pos - 1 - Long.BYTES, rWriter.getTruncatedPosition()); + assertEquals(pos - 1 - Long.BYTES, rWriter.getTruncatedSize()); assertTrue(file.delete()); } @@ -221,7 +221,7 @@ public class RestorableTsFileIOWriterTest { writer.close(); assertNotEquals( TsFileIOWriter.magicStringBytes.length + TsFileIOWriter.versionNumberBytes.length, - rWriter.getTruncatedPosition()); + rWriter.getTruncatedSize()); TsFileSequenceReader reader = new TsFileSequenceReader(FILE_NAME); List<ChunkMetadata> chunkMetadataList = reader.getChunkMetadataList(new Path("d1.s1")); assertNotNull(chunkMetadataList);
