This is an automated email from the ASF dual-hosted git repository. hxd pushed a commit to branch fix_nativeRestoreTsFileWriter_if_file_is_complete in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 473841039ee85d334fbcee264215abcc6f0e396f Author: xiangdong huang <[email protected]> AuthorDate: Fri May 31 09:56:54 2019 +0800 fix bug when using nativeRestoreIOWriter to open a sealed TsFile with append=true mode --- .../apache/iotdb/tsfile/TsFileSequenceRead.java | 16 ++++++-- .../iotdb/tsfile/read/TsFileSequenceReader.java | 39 ++++++++++++++++++-- .../write/writer/NativeRestorableIOWriter.java | 43 +++++++++++++++++++--- 3 files changed, 86 insertions(+), 12 deletions(-) diff --git a/tsfile/example/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java b/tsfile/example/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java index fb82e91..3bb9042 100644 --- a/tsfile/example/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java +++ b/tsfile/example/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java @@ -22,6 +22,8 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; +import java.util.NavigableMap; +import java.util.TreeMap; import java.util.stream.Collectors; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.encoding.decoder.Decoder; @@ -104,13 +106,21 @@ public class TsFileSequenceRead { List<TsDeviceMetadataIndex> deviceMetadataIndexList = metaData.getDeviceMap().values().stream() .sorted((x, y) -> (int) (x.getOffset() - y.getOffset())).collect(Collectors.toList()); for (TsDeviceMetadataIndex index : deviceMetadataIndexList) { + System.out.println(String + .format("\t[DeviceMetadata List]File Offset: %d, Len %d", + index.getOffset(), index.getLen())); + TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(index); List<ChunkGroupMetaData> chunkGroupMetaDataList = deviceMetadata.getChunkGroupMetaDataList(); + int i = 0; for (ChunkGroupMetaData chunkGroupMetaData : chunkGroupMetaDataList) { System.out.println(String - .format("\t[Device]File Offset: %d, Device %s, Number of Chunk Groups %d", - index.getOffset(), chunkGroupMetaData.getDeviceID(), - chunkGroupMetaDataList.size())); + .format("\t[DeviceMetadata]Data Start Offset: %d, Data End Offset: %d, " + + "Device %s, Number %d/%d of Chunk Groups", + chunkGroupMetaData.getStartOffsetOfChunkGroup(), + chunkGroupMetaData.getEndOffsetOfChunkGroup(), + chunkGroupMetaData.getDeviceID(), + ++i, chunkGroupMetaDataList.size())); for (ChunkMetaData chunkMetadata : chunkGroupMetaData.getChunkMetaDataList()) { System.out.println("\t\tMeasurement:" + chunkMetadata.getMeasurementUid()); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java index 9f7ab47..1000a8a 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java @@ -138,7 +138,14 @@ public class TsFileSequenceReader implements AutoCloseable{ this.fileMetadataSize = fileMetadataSize; } - protected void loadMetadataSize() throws IOException { + /** + * this method can be called only if the tsfile is complete. + * + * init fileMeataSize and Position according to the fileMetadata. + * <br>This method will move the file reader position behind of the magic header.</> + * @throws IOException + */ + public void loadMetadataSize() throws IOException { ByteBuffer metadataSize = ByteBuffer.allocate(Integer.BYTES); tsFileInput.read(metadataSize, tsFileInput.size() - TSFileConfig.MAGIC_STRING.length() - Integer.BYTES); @@ -459,6 +466,29 @@ public class TsFileSequenceReader implements AutoCloseable{ .readAsPossible(tsFileInput, target, position, length); } + public List<ChunkGroupMetaData> readAllChunkGroupMetaData(TsFileMetaData fileMetaData) + throws IOException { + List<ChunkGroupMetaData> result = new ArrayList<>(); + for (TsDeviceMetadataIndex index : fileMetaData.getDeviceMap().values()) { + result.addAll(this.readTsDeviceMetaData(index).getChunkGroupMetaDataList()); + } + return result; + } + + /** + * + * @param fileMetaData + * @return the first tsdevicMetadata location in the file (not include the Marker) + */ + public long getFirstTsDeviceMetadataPosition(TsFileMetaData fileMetaData) { + long position = this.getFileMetadataPos(); + for(TsDeviceMetadataIndex index : fileMetaData.getDeviceMap().values()) { + if (index.getOffset() < position) { + position = index.getOffset(); + } + } + return position; + } /** * Self Check the file and return the position before where the data is safe. * @@ -509,9 +539,12 @@ public class TsFileSequenceReader implements AutoCloseable{ return TsFileCheckStatus.ONLY_MAGIC_HEAD; } else if (readTailMagic().equals(magic)) { loadMetadataSize(); - if (fastFinish) { - return TsFileCheckStatus.COMPLETE_FILE; + if (!fastFinish) { + TsFileMetaData metaData = readFileMetadata(); + newSchema.putAll(metaData.getMeasurementSchema()); + newMetaData.addAll(readAllChunkGroupMetaData(metaData)); } + return TsFileCheckStatus.COMPLETE_FILE; } // not a complete file, we will recover it... diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter.java index 2496d42..4d9c0d5 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter.java @@ -24,8 +24,10 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.iotdb.tsfile.common.conf.TSFileConfig; +import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData; import org.apache.iotdb.tsfile.read.TsFileCheckStatus; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.write.schema.FileSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +54,7 @@ public class NativeRestorableIOWriter extends TsFileIOWriter { /** * @param file a given tsfile path you want to (continue to) write * @param append if true, then the file can support appending data even though the file is complete (i.e., tail magic string exists) + * if false, whether the file can support appending data depends on whether the file is complete. * @throws IOException if write failed, or the file is broken but autoRepair==false. */ public NativeRestorableIOWriter(File file, boolean append) throws IOException { @@ -62,16 +65,16 @@ public class NativeRestorableIOWriter extends TsFileIOWriter { return; } if (file.exists()) { + //TODO try to use a cached reader rather than create a new reader. try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) { - if (reader.isComplete() && !append) { - canWrite = false; - out.close(); + if (reader.isComplete()) { + handleCompleteFile(reader, append); return; } truncatedPosition = reader.selfCheck(knownSchemas, chunkGroupMetaDataList, !append); - if (truncatedPosition == TsFileCheckStatus.COMPLETE_FILE && !append) { - this.canWrite = false; - out.close(); + if (truncatedPosition == TsFileCheckStatus.COMPLETE_FILE ) { + //actually, there is no way to access this code... + handleCompleteFile(reader, append); } else if (truncatedPosition == TsFileCheckStatus.INCOMPATIBLE_FILE) { out.close(); throw new IOException( @@ -86,8 +89,36 @@ public class NativeRestorableIOWriter extends TsFileIOWriter { } } + private void handleCompleteFile(TsFileSequenceReader reader, boolean supportAppend) throws IOException { + if (!supportAppend) { + canWrite = false; + out.close(); + }else { + //remove the fileMeatadata + reader.loadMetadataSize(); + //TODO try to use a cached reader rather than create a new reader. + TsFileMetaData metaData = reader.readFileMetadata(); + knownSchemas = metaData.getMeasurementSchema(); + chunkGroupMetaDataList = reader.readAllChunkGroupMetaData(metaData); + out.truncate(reader.getFirstTsDeviceMetadataPosition(metaData) - 1); + } + } + @Override public Map<String, MeasurementSchema> getKnownSchema() { return knownSchemas; } + + @Override + public void endFile(FileSchema schema) throws IOException { + if (!canWrite) { + return; + }else { + super.endFile(schema); + } + } + + public void endFile() throws IOException { + super.endFile(new FileSchema(knownSchemas)); + } }
