This is an automated email from the ASF dual-hosted git repository. hxd pushed a commit to branch feature_read_data_from_unclosed_tsfile in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 5cbfd492f229f521b9392119d5cbda07d5501ef8 Author: xiangdong huang <[email protected]> AuthorDate: Tue Mar 5 01:04:02 2019 +0800 add a restorable tsfile writer without restore file --- .../engine/bufferwrite/BufferWriteProcessor.java | 1 + .../iotdb/db/integration/IoTDBCompleteIT.java | 2 +- .../iotdb/tsfile/read/TsFileSequenceReader.java | 32 +++- .../writer/NativeRestorableTsFileIOWriter.java | 164 +++++++++++++++++++++ .../write/writer}/RestorableTsFileIOWriter.java | 2 +- .../iotdb/tsfile/write/writer/TsFileIOWriter.java | 6 + .../writer}/RestorableTsFileIOWriterTest.java | 75 +++++----- 7 files changed, 240 insertions(+), 42 deletions(-) diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java index dc56312..ecdcaa3 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java @@ -53,6 +53,7 @@ import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.record.TSRecord; import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; import org.apache.iotdb.tsfile.write.schema.FileSchema; +import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBCompleteIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBCompleteIT.java index 34bc509..52340a8 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBCompleteIT.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBCompleteIT.java @@ -157,7 +157,7 @@ public class IoTDBCompleteIT { + " DataType: DOUBLE,\n" + " Encoding: RLE,\n" + " Compressor: UNCOMPRESSED,\n" - + " args: {max_point_number=10},\n" + + " args: {max_point_number=101},\n" + " StorageGroup: root.vehicle\n" + " }\n" + " }\n" diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java index b66fe38..d50af27 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java @@ -267,6 +267,22 @@ public class TsFileSequenceReader { return tsFileInput.position(); } + public void skipPageData(PageHeader header) throws IOException { + tsFileInput.position(tsFileInput.position() + header.getCompressedSize()); + } + + /** + * + * @param header + * @param position + * @return + * @throws IOException + */ + public long skipPageData(PageHeader header, long position) throws IOException { + return position + header.getCompressedSize(); + } + + public ByteBuffer readPage(PageHeader header, CompressionType type) throws IOException { return readPage(header, type, -1); } @@ -293,7 +309,9 @@ public class TsFileSequenceReader { */ public byte readMarker() throws IOException { markerBuffer.clear(); - ReadWriteIOUtils.readAsPossible(tsFileInput.wrapAsFileChannel(), markerBuffer); + if (ReadWriteIOUtils.readAsPossible(tsFileInput.wrapAsFileChannel(), markerBuffer) == 0) { + throw new IOException("reach the end of the file."); + } markerBuffer.flip(); return markerBuffer.get(); } @@ -310,6 +328,10 @@ public class TsFileSequenceReader { return this.file; } + public long fileSize() throws IOException { + return tsFileInput.size(); + } + /** * read data from tsFileInput, from the current position (if position = -1), or the given * position. <br> if position = -1, the tsFileInput's position will be changed to the current @@ -324,9 +346,13 @@ public class TsFileSequenceReader { private ByteBuffer readData(long position, int size) throws IOException { ByteBuffer buffer = ByteBuffer.allocate(size); if (position == -1) { - ReadWriteIOUtils.readAsPossible(tsFileInput.wrapAsFileChannel(), buffer); + if (ReadWriteIOUtils.readAsPossible(tsFileInput.wrapAsFileChannel(), buffer) != size) { + throw new IOException("reach the end of the data"); + } } else { - ReadWriteIOUtils.readAsPossible(tsFileInput.wrapAsFileChannel(), buffer, position, size); + if (ReadWriteIOUtils.readAsPossible(tsFileInput.wrapAsFileChannel(), buffer, position, size) != size) { + throw new IOException("reach the end of the data"); + } } buffer.flip(); return buffer; diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableTsFileIOWriter.java new file mode 100644 index 0000000..19f98a6 --- /dev/null +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableTsFileIOWriter.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.tsfile.write.writer; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.iotdb.tsfile.file.MetaMarker; +import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter; +import org.apache.iotdb.tsfile.file.header.ChunkHeader; +import org.apache.iotdb.tsfile.file.header.PageHeader; +import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * a restorable tsfile which do not depend on a restore file. + */ +public class NativeRestorableTsFileIOWriter extends TsFileIOWriter { + + private static final Logger LOGGER = LoggerFactory + .getLogger(NativeRestorableTsFileIOWriter.class); + + + public NativeRestorableTsFileIOWriter(String insertFilePath) throws IOException { + super(); + File insertFile = new File(insertFilePath); + + if (!insertFile.exists()) { + this.out = new DefaultTsFileOutput(insertFile); + startFile(); + return; + } + + //we need to read data to recover TsFileIOWriter.chunkGroupMetaDataList + //and remove broken data if exists. + List<ChunkGroupMetaData> metadatas = this.getChunkGroupMetaDatas(); + + ChunkMetaData currentChunk; + String measurementID; + TSDataType dataType; + long fileOffsetOfChunk; + long startTimeOfChunk = 0; + long endTimeOfChunk; + + ChunkGroupMetaData currentChunkGroup; + List<ChunkMetaData> chunks = null; + String deviceID; + long startOffsetOfChunkGroup = 0; + long endOffsetOfChunkGroup; + long versionOfChunkGroup = 0; + boolean newGroup = true; + + TsFileSequenceReader reader = new TsFileSequenceReader(insertFilePath); + if (reader.fileSize() <= 4) { + LOGGER.debug("{} does not worth to recover, will create a new one.", insertFilePath); + reader.close(); + this.out.truncate(0); + startFile(); + return; + } + if (reader.readTailMagic().equals(reader.readHeadMagic())) { + LOGGER.debug("{} is an complete TsFile.", insertFilePath); + complete = true; + return; + } + + // not a complete file, we will recover it... + long pos = magicStringBytes.length; + boolean goon = true; + byte marker; + try { + while (goon && (marker = reader.readMarker()) != MetaMarker.SEPARATOR) { + switch (marker) { + case MetaMarker.CHUNK_HEADER: + //this is a chunk. + if (newGroup) { + chunks = new ArrayList<>(); + startOffsetOfChunkGroup = reader.position() - 1; + newGroup = false; + } + //if there is something wrong with a chunk, we will drop this part of data + // (the whole ChunkGroup) + try { + ChunkHeader header = reader.readChunkHeader(); + measurementID = header.getMeasurementID(); + dataType = header.getDataType(); + fileOffsetOfChunk = reader.position() - 1; + if (header.getNumOfPages() > 0) { + PageHeader pageHeader = reader.readPageHeader(header.getDataType()); + startTimeOfChunk = pageHeader.getMinTimestamp(); + reader.skipPageData(pageHeader); + } + for (int j = 1; j < header.getNumOfPages() - 1; j++) { + //a new Page + PageHeader pageHeader = reader.readPageHeader(header.getDataType()); + reader.skipPageData(pageHeader); + } + if (header.getNumOfPages() > 1) { + PageHeader pageHeader = reader.readPageHeader(header.getDataType()); + endTimeOfChunk = pageHeader.getMinTimestamp(); + reader.skipPageData(pageHeader); + } else { + endTimeOfChunk = startTimeOfChunk; + } + currentChunk = new ChunkMetaData(measurementID, dataType, fileOffsetOfChunk, + startTimeOfChunk, endTimeOfChunk); + chunks.add(currentChunk); + } catch (IOException e) { + goon = false; + } + break; + case MetaMarker.CHUNK_GROUP_FOOTER: + //this is a chunk group + //if there is something wrong with the chunkGroup Footer, we will drop this part of data + //because we can not guarantee the correction of the deviceId. + try { + ChunkGroupFooter chunkGroupFooter = reader.readChunkGroupFooter(); + deviceID = chunkGroupFooter.getDeviceID(); + endOffsetOfChunkGroup = reader.position(); + currentChunkGroup = new ChunkGroupMetaData(deviceID, chunks, startOffsetOfChunkGroup); + currentChunkGroup.setEndOffsetOfChunkGroup(endOffsetOfChunkGroup); + currentChunkGroup.setVersion(versionOfChunkGroup++);//TODO is this OK? + metadatas.add(currentChunkGroup); + newGroup = true; + //we get a complete chunk group now. + pos = getPos(); + } catch (IOException e) { + goon = false; + } + break; + default: + //no data else + goon = false; + } + } + } catch (IOException e2) { + } finally { + //something wrong or all data is complete. We will discard current FileMetadata + out.truncate(pos); + } + + + } + + +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java similarity index 99% rename from iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriter.java rename to tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java index 7331104..83b627a 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.engine.bufferwrite; +package org.apache.iotdb.tsfile.write.writer; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java index 02f9fee..ae896dc 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java @@ -67,6 +67,7 @@ public class TsFileIOWriter { protected List<ChunkGroupMetaData> chunkGroupMetaDataList = new ArrayList<>(); private ChunkGroupMetaData currentChunkGroupMetaData; private ChunkMetaData currentChunkMetaData; + protected boolean complete = false; /** * empty construct function. @@ -236,6 +237,7 @@ public class TsFileIOWriter { // close file out.close(); + complete = true; LOG.info("output stream is closed"); } @@ -315,4 +317,8 @@ public class TsFileIOWriter { return chunkGroupMetaDataList; } + public boolean isComplete() { + return complete; + } + } diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java similarity index 80% rename from iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java rename to tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java index 3cdc9f1..eadcaa5 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.engine.bufferwrite; +package org.apache.iotdb.tsfile.write.writer; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -46,6 +46,7 @@ import org.apache.iotdb.tsfile.utils.BytesUtils; import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.schema.FileSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; import org.junit.After; import org.junit.Before; @@ -74,13 +75,13 @@ public class RestorableTsFileIOWriterTest { writer = new RestorableTsFileIOWriter(processorName, insertPath); Pair<Long, List<ChunkGroupMetaData>> pair = writer.readRestoreInfo(); - assertEquals(true, new File(restorePath).exists()); + Assert.assertEquals(true, new File(restorePath).exists()); - assertEquals(TsFileIOWriter.magicStringBytes.length, (long) pair.left); - assertEquals(0, pair.right.size()); + Assert.assertEquals(TsFileIOWriter.magicStringBytes.length, (long) pair.left); + Assert.assertEquals(0, pair.right.size()); writer.endFile(new FileSchema()); deleteInsertFile(); - assertEquals(false, new File(restorePath).exists()); + Assert.assertEquals(false, new File(restorePath).exists()); } @Test @@ -92,9 +93,9 @@ public class RestorableTsFileIOWriterTest { // mkdir fileOutputStream.write(new byte[400]); fileOutputStream.close(); - assertEquals(true, insertFile.exists()); - assertEquals(true, restoreFile.exists()); - assertEquals(400, insertFile.length()); + Assert.assertEquals(true, insertFile.exists()); + Assert.assertEquals(true, restoreFile.exists()); + Assert.assertEquals(400, insertFile.length()); writer.endFile(new FileSchema()); FileOutputStream out = new FileOutputStream(new File(restorePath)); @@ -106,10 +107,10 @@ public class RestorableTsFileIOWriterTest { out.close(); writer = new RestorableTsFileIOWriter(processorName, insertPath); - assertEquals(true, insertFile.exists()); - assertEquals(200, insertFile.length()); - assertEquals(insertPath, writer.getInsertFilePath()); - assertEquals(restorePath, writer.getRestoreFilePath()); + Assert.assertEquals(true, insertFile.exists()); + Assert.assertEquals(200, insertFile.length()); + Assert.assertEquals(insertPath, writer.getInsertFilePath()); + Assert.assertEquals(restorePath, writer.getRestoreFilePath()); writer.endFile(new FileSchema()); deleteInsertFile(); } @@ -133,16 +134,16 @@ public class RestorableTsFileIOWriterTest { writer = new RestorableTsFileIOWriter(processorName, insertPath); // writer.endFile(new FileSchema()); - assertEquals(true, insertFile.exists()); - assertEquals(true, restoreFile.exists()); + Assert.assertEquals(true, insertFile.exists()); + Assert.assertEquals(true, restoreFile.exists()); RestorableTsFileIOWriter tempbufferwriteResource = new RestorableTsFileIOWriter(processorName, insertPath); - assertEquals(true, insertFile.exists()); - assertEquals(200, insertFile.length()); - assertEquals(insertPath, tempbufferwriteResource.getInsertFilePath()); - assertEquals(restorePath, tempbufferwriteResource.getRestoreFilePath()); + Assert.assertEquals(true, insertFile.exists()); + Assert.assertEquals(200, insertFile.length()); + Assert.assertEquals(insertPath, tempbufferwriteResource.getInsertFilePath()); + Assert.assertEquals(restorePath, tempbufferwriteResource.getRestoreFilePath()); tempbufferwriteResource.endFile(new FileSchema()); writer.endFile(new FileSchema()); @@ -175,31 +176,31 @@ public class RestorableTsFileIOWriterTest { TsFileSequenceReader reader = new TsFileSequenceReader(insertPath); TsFileMetaData metaData = reader.readFileMetadata(); - assertEquals(2, metaData.getDeviceMap().size()); + Assert.assertEquals(2, metaData.getDeviceMap().size()); List<ChunkGroupMetaData> chunkGroups = reader .readTsDeviceMetaData(metaData.getDeviceMap().get("d1")) .getChunkGroupMetaDataList(); - assertEquals(1, chunkGroups.size()); + Assert.assertEquals(1, chunkGroups.size()); List<ChunkMetaData> chunks = chunkGroups.get(0).getChunkMetaDataList(); - assertEquals(2, chunks.size()); + Assert.assertEquals(2, chunks.size()); // d1.s1 - assertEquals(chunks.get(0).getStartTime(), 1); - assertEquals(chunks.get(0).getEndTime(), 2); - assertEquals(chunks.get(0).getNumOfPoints(), 2); + Assert.assertEquals(chunks.get(0).getStartTime(), 1); + Assert.assertEquals(chunks.get(0).getEndTime(), 2); + Assert.assertEquals(chunks.get(0).getNumOfPoints(), 2); // d1.s2 - assertEquals(chunks.get(1).getStartTime(), 1); - assertEquals(chunks.get(1).getEndTime(), 3); - assertEquals(chunks.get(1).getNumOfPoints(), 2); + Assert.assertEquals(chunks.get(1).getStartTime(), 1); + Assert.assertEquals(chunks.get(1).getEndTime(), 3); + Assert.assertEquals(chunks.get(1).getNumOfPoints(), 2); chunkGroups = reader.readTsDeviceMetaData(metaData.getDeviceMap().get("d2")).getChunkGroupMetaDataList(); - assertEquals(1, chunkGroups.size()); + Assert.assertEquals(1, chunkGroups.size()); chunks = chunkGroups.get(0).getChunkMetaDataList(); - assertEquals(1, chunks.size()); + Assert.assertEquals(1, chunks.size()); // da.s2 - assertEquals(chunks.get(0).getStartTime(), 2); - assertEquals(chunks.get(0).getEndTime(), 4); - assertEquals(chunks.get(0).getNumOfPoints(), 2); + Assert.assertEquals(chunks.get(0).getStartTime(), 2); + Assert.assertEquals(chunks.get(0).getEndTime(), 4); + Assert.assertEquals(chunks.get(0).getNumOfPoints(), 2); reader.close(); } @@ -208,7 +209,7 @@ public class RestorableTsFileIOWriterTest { public void testFlushAndGetMetadata() throws IOException { writer = new RestorableTsFileIOWriter(processorName, insertPath); - assertEquals(0, + Assert.assertEquals(0, writer.getMetadatas(MemTableTestUtils.deviceId0, MemTableTestUtils.measurementId0, MemTableTestUtils.dataType0).size()); @@ -220,18 +221,18 @@ public class RestorableTsFileIOWriterTest { MemTableFlushUtil.flushMemTable(MemTableTestUtils.getFileSchema(), writer, memTable, 0); writer.flush(); - assertEquals(0, + Assert.assertEquals(0, writer.getMetadatas(MemTableTestUtils.deviceId0, MemTableTestUtils.measurementId0, MemTableTestUtils.dataType0).size()); writer.appendMetadata(); - assertEquals(1, + Assert.assertEquals(1, writer.getMetadatas(MemTableTestUtils.deviceId0, MemTableTestUtils.measurementId0, MemTableTestUtils.dataType0).size()); MemTableTestUtils.produceData(memTable, 200, 300, MemTableTestUtils.deviceId0, MemTableTestUtils.measurementId0, MemTableTestUtils.dataType0); writer.appendMetadata(); - assertEquals(1, + Assert.assertEquals(1, writer.getMetadatas(MemTableTestUtils.deviceId0, MemTableTestUtils.measurementId0, MemTableTestUtils.dataType0).size()); @@ -259,7 +260,7 @@ public class RestorableTsFileIOWriterTest { try { Files.delete(Paths.get(insertPath)); } catch (IOException e) { - fail(e.getMessage()); + Assert.fail(e.getMessage()); } } }
