This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 463b99c [IOTDB-36]Enable recover data from a incomplete TsFile and
continue to write (#87)
463b99c is described below
commit 463b99c21a7262bc7e403b83b54e5582373db68a
Author: Xiangdong Huang <[email protected]>
AuthorDate: Sat Mar 9 14:37:26 2019 +0800
[IOTDB-36]Enable recover data from a incomplete TsFile and continue to
write (#87)
* add a restorable tsfile writer without restore file
---
.../bufferwrite/RestorableTsFileIOWriterTest.java | 1 +
.../iotdb/tsfile/common/conf/TSFileDescriptor.java | 4 +
.../iotdb/tsfile/read/TsFileSequenceReader.java | 50 +++-
.../apache/iotdb/tsfile/write/TsFileWriter.java | 40 ++-
.../apache/iotdb/tsfile/write/record/TSRecord.java | 3 +-
.../iotdb/tsfile/write/schema/FileSchema.java | 2 +-
.../tsfile/write/writer/DefaultTsFileOutput.java | 6 +-
.../write/writer/NativeRestorableIOWriter.java | 206 ++++++++++++++
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 28 ++
.../iotdb/tsfile/write/TsFileReadWriteTest.java | 16 +-
.../write/writer/NativeRestorableIOWriterTest.java | 301 +++++++++++++++++++++
11 files changed, 638 insertions(+), 19 deletions(-)
diff --git
a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java
b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java
index 3cdc9f1..f2c393a 100644
---
a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java
+++
b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java
@@ -51,6 +51,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+
public class RestorableTsFileIOWriterTest {
private RestorableTsFileIOWriter writer;
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileDescriptor.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileDescriptor.java
index 83852e0..b4189fa 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileDescriptor.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileDescriptor.java
@@ -113,6 +113,10 @@ public class TSFileDescriptor {
TSFileConfig.pageSizeInByte = Integer
.parseInt(properties
.getProperty("page_size_in_byte",
Integer.toString(TSFileConfig.pageSizeInByte)));
+ if (TSFileConfig.pageSizeInByte > TSFileConfig.groupSizeInByte) {
+ LOGGER.warn("page_size is greater than group size, will set it as the
same with group size");
+ TSFileConfig.pageSizeInByte = TSFileConfig.groupSizeInByte;
+ }
TSFileConfig.maxNumberOfPointsInPage = Integer.parseInt(
properties
.getProperty("max_number_of_points_in_page",
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index b66fe38..be14bf9 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -122,8 +122,23 @@ public class TsFileSequenceReader {
* this function does not modify the position of the file reader.
*/
public String readHeadMagic() throws IOException {
+ return readHeadMagic(false);
+ }
+
+ /**
+ * this function does not modify the position of the file reader.
+ *
+ * @param movePosition whether to move the position of the file reader after
reading the magic header
+ * to the end of the magic head string.
+ */
+ public String readHeadMagic(boolean movePosition) throws IOException {
ByteBuffer magicStringBytes =
ByteBuffer.allocate(TSFileConfig.MAGIC_STRING.length());
- tsFileInput.read(magicStringBytes, 0);
+ if (movePosition) {
+ tsFileInput.position(0);
+ tsFileInput.read(magicStringBytes);
+ } else {
+ tsFileInput.read(magicStringBytes, 0);
+ }
magicStringBytes.flip();
return new String(magicStringBytes.array());
}
@@ -267,6 +282,22 @@ public class TsFileSequenceReader {
return tsFileInput.position();
}
+ public void skipPageData(PageHeader header) throws IOException {
+ tsFileInput.position(tsFileInput.position() + header.getCompressedSize());
+ }
+
+ /**
+ *
+ * @param header
+ * @param position
+ * @return
+ * @throws IOException
+ */
+ public long skipPageData(PageHeader header, long position) throws
IOException {
+ return position + header.getCompressedSize();
+ }
+
+
public ByteBuffer readPage(PageHeader header, CompressionType type) throws
IOException {
return readPage(header, type, -1);
}
@@ -293,7 +324,9 @@ public class TsFileSequenceReader {
*/
public byte readMarker() throws IOException {
markerBuffer.clear();
- ReadWriteIOUtils.readAsPossible(tsFileInput.wrapAsFileChannel(),
markerBuffer);
+ if (ReadWriteIOUtils.readAsPossible(tsFileInput.wrapAsFileChannel(),
markerBuffer) == 0) {
+ throw new IOException("reach the end of the file.");
+ }
markerBuffer.flip();
return markerBuffer.get();
}
@@ -310,6 +343,10 @@ public class TsFileSequenceReader {
return this.file;
}
+ public long fileSize() throws IOException {
+ return tsFileInput.size();
+ }
+
/**
* read data from tsFileInput, from the current position (if position = -1),
or the given
* position. <br> if position = -1, the tsFileInput's position will be
changed to the current
@@ -324,9 +361,13 @@ public class TsFileSequenceReader {
private ByteBuffer readData(long position, int size) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(size);
if (position == -1) {
- ReadWriteIOUtils.readAsPossible(tsFileInput.wrapAsFileChannel(), buffer);
+ if (ReadWriteIOUtils.readAsPossible(tsFileInput.wrapAsFileChannel(),
buffer) != size) {
+ throw new IOException("reach the end of the data");
+ }
} else {
- ReadWriteIOUtils.readAsPossible(tsFileInput.wrapAsFileChannel(), buffer,
position, size);
+ if (ReadWriteIOUtils.readAsPossible(tsFileInput.wrapAsFileChannel(),
buffer, position, size) != size) {
+ throw new IOException("reach the end of the data");
+ }
}
buffer.flip();
return buffer;
@@ -339,4 +380,5 @@ public class TsFileSequenceReader {
return ReadWriteIOUtils
.readAsPossible(tsFileInput.wrapAsFileChannel(), target, position,
length);
}
+
}
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
index 0a291db..497f67d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
@@ -22,12 +22,12 @@ import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
-
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.write.NoMeasurementException;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.write.chunk.ChunkGroupWriterImpl;
import org.apache.iotdb.tsfile.write.chunk.IChunkGroupWriter;
import org.apache.iotdb.tsfile.write.record.TSRecord;
@@ -91,6 +91,15 @@ public class TsFileWriter {
/**
* init this TsFileWriter.
*
+ * @param fileWriter the io writer of this TsFile
+ */
+ public TsFileWriter(TsFileIOWriter fileWriter) throws IOException {
+ this(fileWriter, new FileSchema(),
TSFileDescriptor.getInstance().getConfig());
+ }
+
+ /**
+ * init this TsFileWriter.
+ *
* @param file the File to be written by this TsFileWriter
* @param schema the schema of this TsFile
*/
@@ -126,11 +135,22 @@ public class TsFileWriter {
* @param schema the schema of this TsFile
* @param conf the configuration of this TsFile
*/
- protected TsFileWriter(TsFileIOWriter fileWriter, FileSchema schema,
TSFileConfig conf) {
+ protected TsFileWriter(TsFileIOWriter fileWriter, FileSchema schema,
TSFileConfig conf)
+ throws IOException {
+ if (!fileWriter.canWrite()) {
+ throw new IOException(
+ "the given file Writer does not support writing any more. Maybe it
is an complete TsFile");
+ }
this.fileWriter = fileWriter;
this.schema = schema;
+ this.schema.registerMeasurements(fileWriter.getKnownSchema());
this.pageSize = TSFileConfig.pageSizeInByte;
this.chunkGroupSizeThreshold = TSFileConfig.groupSizeInByte;
+ if (this.pageSize >= chunkGroupSizeThreshold) {
+ LOG.warn(
+ "TsFile's page size {} is greater than chunk group size {}, please
enlarge the chunk group"
+ + " size or decrease page size. ", pageSize,
chunkGroupSizeThreshold);
+ }
}
/**
@@ -290,4 +310,20 @@ public class TsFileWriter {
flushAllChunkGroups();
fileWriter.endFile(this.schema);
}
+
+ /**
+ * this function is only for Test.
+ * @return
+ */
+ public TsFileIOWriter getIOWriter() {
+ return this.fileWriter;
+ }
+
+ /**
+ * this function is only for Test
+ * @throws IOException
+ */
+ public void flushForTest() throws IOException {
+ flushAllChunkGroups();
+ }
}
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/TSRecord.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/TSRecord.java
index b30a483..9ce98a7 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/TSRecord.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/TSRecord.java
@@ -64,8 +64,9 @@ public class TSRecord {
*
* @param tuple data point to be added
*/
- public void addTuple(DataPoint tuple) {
+ public TSRecord addTuple(DataPoint tuple) {
this.dataPointList.add(tuple);
+ return this;
}
/**
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/FileSchema.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/FileSchema.java
index 349fa50..2aa4d4a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/FileSchema.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/FileSchema.java
@@ -104,7 +104,7 @@ public class FileSchema {
/**
* register all measurementSchemas in measurements.
*/
- private void registerMeasurements(Map<String, MeasurementSchema>
measurements) {
+ public void registerMeasurements(Map<String, MeasurementSchema>
measurements) {
measurements.forEach((id, md) -> registerMeasurement(md));
}
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/DefaultTsFileOutput.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/DefaultTsFileOutput.java
index 4ea8be8..0ab5e21 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/DefaultTsFileOutput.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/DefaultTsFileOutput.java
@@ -31,12 +31,16 @@ import java.nio.ByteBuffer;
*/
public class DefaultTsFileOutput implements TsFileOutput {
- FileOutputStream outputStream;
+ private FileOutputStream outputStream;
public DefaultTsFileOutput(File file) throws FileNotFoundException {
this.outputStream = new FileOutputStream(file);
}
+ public DefaultTsFileOutput(File file, boolean append) throws
FileNotFoundException {
+ this.outputStream = new FileOutputStream(file, append);
+ }
+
public DefaultTsFileOutput(FileOutputStream outputStream) {
this.outputStream = outputStream;
}
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter.java
new file mode 100644
index 0000000..a63ca84
--- /dev/null
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.write.writer;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * a restorable tsfile which does not depend on a restore file.
+ */
+public class NativeRestorableIOWriter extends TsFileIOWriter {
+
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(NativeRestorableIOWriter.class);
+
+ private long truncatedPosition = -1;
+ private Map<String, MeasurementSchema> knownSchemas = new HashMap<>();
+
+ long getTruncatedPosition() {
+ return truncatedPosition;
+ }
+
+ public NativeRestorableIOWriter(File file) throws IOException {
+ super();
+ long fileSize;
+ if (!file.exists()) {
+ this.out = new DefaultTsFileOutput(file, true);
+ startFile();
+ return;
+ } else {
+ fileSize = file.length();
+ this.out = new DefaultTsFileOutput(file, true);
+ }
+
+ //we need to read data to recover TsFileIOWriter.chunkGroupMetaDataList
+ //and remove broken data if exists.
+
+ ChunkMetaData currentChunk;
+ String measurementID;
+ TSDataType dataType;
+ long fileOffsetOfChunk;
+ long startTimeOfChunk = 0;
+ long endTimeOfChunk = 0;
+ long numOfPoints = 0;
+
+ ChunkGroupMetaData currentChunkGroup;
+ List<ChunkMetaData> chunks = null;
+ String deviceID;
+ long startOffsetOfChunkGroup = 0;
+ long endOffsetOfChunkGroup;
+ long versionOfChunkGroup = 0;
+ boolean haveReadAnUnverifiedGroupFooter = false;
+ boolean newGroup = true;
+
+ TsFileSequenceReader reader = new
TsFileSequenceReader(file.getAbsolutePath(), false);
+ if (fileSize <= magicStringBytes.length) {
+ LOGGER.debug("{} only has magic header, does not worth to recover.",
file.getAbsolutePath());
+ reader.close();
+ this.out.truncate(0);
+ startFile();
+ truncatedPosition = magicStringBytes.length;
+ return;
+ }
+ String magic = reader.readHeadMagic(true);
+ if (!magic.equals(new String(magicStringBytes))) {
+ throw new IOException(String
+ .format("%s is not using TsFile format, and will be ignored...",
file.getAbsolutePath()));
+ }
+ if (reader.readTailMagic().equals(magic)) {
+ LOGGER.debug("{} is an complete TsFile.", file.getAbsolutePath());
+ canWrite = false;
+ reader.close();
+ out.close();
+ return;
+ }
+
+ // not a complete file, we will recover it...
+ truncatedPosition = magicStringBytes.length;
+ boolean goon = true;
+ byte marker;
+ try {
+ while (goon && (marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
+ switch (marker) {
+ case MetaMarker.CHUNK_HEADER:
+ //this is a chunk.
+ if (haveReadAnUnverifiedGroupFooter) {
+ //now we are sure that the last ChunkGroupFooter is complete.
+ haveReadAnUnverifiedGroupFooter = false;
+ truncatedPosition = reader.position() - 1;
+ newGroup = true;
+ }
+ if (newGroup) {
+ chunks = new ArrayList<>();
+ startOffsetOfChunkGroup = reader.position() - 1;
+ newGroup = false;
+ }
+ //if there is something wrong with a chunk, we will drop this part
of data
+ // (the whole ChunkGroup)
+ ChunkHeader header = reader.readChunkHeader();
+ measurementID = header.getMeasurementID();
+ knownSchemas.putIfAbsent(measurementID,
+ new MeasurementSchema(measurementID, header.getDataType(),
+ header.getEncodingType(), header.getCompressionType()));
+ dataType = header.getDataType();
+ fileOffsetOfChunk = reader.position() - 1;
+ if (header.getNumOfPages() > 0) {
+ PageHeader pageHeader =
reader.readPageHeader(header.getDataType());
+ numOfPoints += pageHeader.getNumOfValues();
+ startTimeOfChunk = pageHeader.getMinTimestamp();
+ endTimeOfChunk = pageHeader.getMaxTimestamp();
+ reader.skipPageData(pageHeader);
+ }
+ for (int j = 1; j < header.getNumOfPages() - 1; j++) {
+ //a new Page
+ PageHeader pageHeader =
reader.readPageHeader(header.getDataType());
+ reader.skipPageData(pageHeader);
+ }
+ if (header.getNumOfPages() > 1) {
+ PageHeader pageHeader =
reader.readPageHeader(header.getDataType());
+ endTimeOfChunk = pageHeader.getMaxTimestamp();
+ reader.skipPageData(pageHeader);
+ }
+ currentChunk = new ChunkMetaData(measurementID, dataType,
fileOffsetOfChunk,
+ startTimeOfChunk, endTimeOfChunk);
+ currentChunk.setNumOfPoints(numOfPoints);
+ chunks.add(currentChunk);
+ numOfPoints = 0;
+ break;
+ case MetaMarker.CHUNK_GROUP_FOOTER:
+ //this is a chunk group
+ //if there is something wrong with the chunkGroup Footer, we will
drop this part of data
+ //because we cannot guarantee the correctness of the deviceId.
+ ChunkGroupFooter chunkGroupFooter = reader.readChunkGroupFooter();
+ deviceID = chunkGroupFooter.getDeviceID();
+ endOffsetOfChunkGroup = reader.position();
+ currentChunkGroup = new ChunkGroupMetaData(deviceID, chunks,
startOffsetOfChunkGroup);
+ currentChunkGroup.setEndOffsetOfChunkGroup(endOffsetOfChunkGroup);
+ currentChunkGroup.setVersion(versionOfChunkGroup++);
+ chunkGroupMetaDataList.add(currentChunkGroup);
+ // though we have read the current ChunkMetaData from Disk, it may
be incomplete.
+ // because if the file only loses one byte, the
ChunkMetaData.deserialize() returns ok,
+ // while the last field of the ChunkMetaData is incorrect.
+ // So, only reading the next MASK, can make sure that this
ChunkMetaData is complete.
+ haveReadAnUnverifiedGroupFooter = true;
+ break;
+
+ default:
+ // it is impossible that we read incorrect data.
+ MetaMarker.handleUnexpectedMarker(marker);
+ goon = false;
+ }
+ }
+ //now we read the tail of the file, so we are sure that the last
ChunkGroupFooter is complete.
+ truncatedPosition = reader.position() - 1;
+ } catch (IOException e2) {
+ //if it is the end of the file, and we read an unverifiedGroupFooter, we
must remove this ChunkGroup
+ if (haveReadAnUnverifiedGroupFooter &&
!chunkGroupMetaDataList.isEmpty()) {
+ chunkGroupMetaDataList.remove(chunkGroupMetaDataList.size() - 1);
+ }
+ } finally {
+ //something wrong or all data is complete. We will discard current
FileMetadata
+ // so that we can continue to write data into this tsfile.
+ LOGGER.info("File {} has {} bytes, and will be truncated from {}.",
+ file.getAbsolutePath(), file.length(), truncatedPosition);
+ out.truncate(truncatedPosition);
+ reader.close();
+ }
+ }
+
+ @Override
+ public Map<String, MeasurementSchema> getKnownSchema() {
+ return knownSchemas;
+ }
+}
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 02f9fee..c339f8a 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -22,6 +22,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -67,6 +68,7 @@ public class TsFileIOWriter {
protected List<ChunkGroupMetaData> chunkGroupMetaDataList = new
ArrayList<>();
private ChunkGroupMetaData currentChunkGroupMetaData;
private ChunkMetaData currentChunkMetaData;
+ protected boolean canWrite = true;
/**
* empty construct function.
@@ -236,6 +238,7 @@ public class TsFileIOWriter {
// close file
out.close();
+ canWrite = false;
LOG.info("output stream is closed");
}
@@ -315,4 +318,29 @@ public class TsFileIOWriter {
return chunkGroupMetaDataList;
}
+ public boolean canWrite() {
+ return canWrite;
+ }
+
+ /**
+ * close the output stream or file channel by force. This is just used for
testing.
+ */
+ void forceClose() throws IOException {
+ out.close();
+ }
+
+ void writeSeparatorMaskForTest() throws IOException {
+ out.write(new byte[]{MetaMarker.SEPARATOR});
+ }
+ void writeChunkMaskForTest() throws IOException {
+ out.write(new byte[]{MetaMarker.CHUNK_HEADER});
+ }
+
+ /**
+ * @return all Schema that this ioWriter know. By default implementation
(TsFileIOWriter.class),
+ * it is empty
+ */
+ public Map<String, MeasurementSchema> getKnownSchema() {
+ return Collections.emptyMap();
+ }
}
diff --git
a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileReadWriteTest.java
b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileReadWriteTest.java
index 3cb2a84..f5e76fc 100644
---
a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileReadWriteTest.java
+++
b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileReadWriteTest.java
@@ -50,28 +50,27 @@ public class TsFileReadWriteTest {
private final double delta = 0.0000001;
private String path = "read_write_rle.tsfile";
private File f;
- private TsFileWriter tsFileWriter;
@Before
public void setUp() throws Exception {
f = new File(path);
if (f.exists()) {
- f.delete();
+ assertTrue(f.delete());
}
- tsFileWriter = new TsFileWriter(f);
}
@After
public void tearDown() throws Exception {
f = new File(path);
if (f.exists()) {
- f.delete();
+ assertTrue(f.delete());;
}
}
@Test
public void intTest() throws IOException, WriteProcessException {
int floatCount = 1024 * 1024 * 13 + 1023;
+ TsFileWriter tsFileWriter = new TsFileWriter(f);
// add measurements into file schema
tsFileWriter
.addMeasurement(new MeasurementSchema("sensor_1", TSDataType.INT32,
TSEncoding.RLE));
@@ -110,6 +109,7 @@ public class TsFileReadWriteTest {
public void longTest() throws IOException, WriteProcessException {
int floatCount = 1024 * 1024 * 13 + 1023;
// add measurements into file schema
+ TsFileWriter tsFileWriter = new TsFileWriter(f);
tsFileWriter
.addMeasurement(new MeasurementSchema("sensor_1", TSDataType.INT64,
TSEncoding.RLE));
for (long i = 1; i < floatCount; i++) {
@@ -147,6 +147,7 @@ public class TsFileReadWriteTest {
public void floatTest() throws IOException, WriteProcessException {
int floatCount = 1024 * 1024 * 13 + 1023;
// add measurements into file schema
+ TsFileWriter tsFileWriter = new TsFileWriter(f);
tsFileWriter
.addMeasurement(new MeasurementSchema("sensor_1", TSDataType.FLOAT,
TSEncoding.RLE));
for (long i = 1; i < floatCount; i++) {
@@ -185,6 +186,7 @@ public class TsFileReadWriteTest {
public void doubleTest() throws IOException, WriteProcessException {
int floatCount = 1024 * 1024 * 13 + 1023;
// add measurements into file schema
+ TsFileWriter tsFileWriter = new TsFileWriter(f);
tsFileWriter
.addMeasurement(new MeasurementSchema("sensor_1", TSDataType.DOUBLE,
TSEncoding.RLE));
for (long i = 1; i < floatCount; i++) {
@@ -220,13 +222,7 @@ public class TsFileReadWriteTest {
@Test
public void readEmptyMeasurementTest() throws IOException,
WriteProcessException {
- String path = "test.tsfile";
- File f = new File(path);
- if (f.exists()) {
- assertTrue(f.delete());
- }
TsFileWriter tsFileWriter = new TsFileWriter(f);
-
// add measurements into file schema
tsFileWriter
.addMeasurement(new MeasurementSchema("sensor_1", TSDataType.FLOAT,
TSEncoding.RLE));
diff --git
a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriterTest.java
b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriterTest.java
new file mode 100644
index 0000000..370c526
--- /dev/null
+++
b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriterTest.java
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.write.writer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.file.metadata.statistics.FloatStatistics;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.FloatDataPoint;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class NativeRestorableIOWriterTest {
+
+ private static final String FILE_NAME = "test.ts";
+
+ @Test
+ public void testOnlyHeadMagic() throws Exception {
+ File file = new File(FILE_NAME);
+ TsFileWriter writer = new TsFileWriter(file);
+ writer.getIOWriter().forceClose();
+ NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
+ writer = new TsFileWriter(rWriter);
+ writer.close();
+ assertEquals(TsFileIOWriter.magicStringBytes.length,
rWriter.getTruncatedPosition());
+ assertTrue(file.delete());
+
+ }
+
+ @Test
+ public void testOnlyFirstMask() throws Exception {
+ File file = new File(FILE_NAME);
+ TsFileWriter writer = new TsFileWriter(file);
+ //we have to flush using inner API.
+ writer.getIOWriter().out.write(new byte[] {MetaMarker.CHUNK_HEADER});
+ writer.getIOWriter().forceClose();
+ assertEquals(TsFileIOWriter.magicStringBytes.length + 1, file.length());
+ NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
+ writer = new TsFileWriter(rWriter);
+ writer.close();
+ assertEquals(TsFileIOWriter.magicStringBytes.length,
rWriter.getTruncatedPosition());
+ assertTrue(file.delete());
+ }
+
+ @Test
+ public void testOnlyOneIncompleteChunkHeader() throws Exception {
+ File file = new File(FILE_NAME);
+ TsFileWriter writer = new TsFileWriter(file);
+
+ ChunkHeader header = new ChunkHeader("s1", 100, TSDataType.FLOAT,
CompressionType.SNAPPY,
+ TSEncoding.PLAIN, 5);
+ ByteBuffer buffer = ByteBuffer.allocate(header.getSerializedSize());
+ header.serializeTo(buffer);
+ buffer.flip();
+ byte[] data = new byte[3];
+ buffer.get(data, 0, 3);
+ writer.getIOWriter().out.write(data);
+ writer.getIOWriter().forceClose();
+ NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
+ writer = new TsFileWriter(rWriter);
+ writer.close();
+ assertEquals(TsFileIOWriter.magicStringBytes.length,
rWriter.getTruncatedPosition());
+ assertTrue(file.delete());
+ }
+
+ @Test
+ public void testOnlyOneChunkHeader() throws Exception {
+ File file = new File(FILE_NAME);
+ TsFileWriter writer = new TsFileWriter(file);
+ writer.getIOWriter()
+ .startFlushChunk(new MeasurementSchema("s1", TSDataType.FLOAT,
TSEncoding.PLAIN),
+ CompressionType.SNAPPY, TSDataType.FLOAT,
+ TSEncoding.PLAIN, new FloatStatistics(), 100, 50, 100, 10);
+ writer.getIOWriter().forceClose();
+
+ NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
+ writer = new TsFileWriter(rWriter);
+ writer.close();
+ assertEquals(TsFileIOWriter.magicStringBytes.length,
rWriter.getTruncatedPosition());
+ assertTrue(file.delete());
+ }
+
+ @Test
+ public void testOnlyOneChunkHeaderAndSomePage() throws Exception {
+ File file = new File(FILE_NAME);
+ TsFileWriter writer = new TsFileWriter(file);
+ writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT,
TSEncoding.RLE));
+ writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT,
TSEncoding.RLE));
+ writer.write(new TSRecord(1, "d1").addTuple(new FloatDataPoint("s1", 5))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.write(new TSRecord(2, "d1").addTuple(new FloatDataPoint("s1", 5))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.flushForTest();
+ long pos = writer.getIOWriter().getPos();
+ //let's delete one byte.
+ writer.getIOWriter().out.truncate(pos - 1);
+ writer.getIOWriter().forceClose();
+ NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
+ writer = new TsFileWriter(rWriter);
+ writer.close();
+ assertEquals(TsFileIOWriter.magicStringBytes.length,
rWriter.getTruncatedPosition());
+ assertTrue(file.delete());
+ }
+
+
+ @Test
+ public void testOnlyOneChunkGroup() throws Exception {
+ File file = new File(FILE_NAME);
+ TsFileWriter writer = new TsFileWriter(file);
+ writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT,
TSEncoding.RLE));
+ writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT,
TSEncoding.RLE));
+ writer.write(new TSRecord(1, "d1").addTuple(new FloatDataPoint("s1", 5))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.write(new TSRecord(2, "d1").addTuple(new FloatDataPoint("s1", 5))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.flushForTest();
+ writer.getIOWriter().forceClose();
+ NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
+ writer = new TsFileWriter(rWriter);
+ writer.close();
+ assertEquals(TsFileIOWriter.magicStringBytes.length,
rWriter.getTruncatedPosition());
+ assertTrue(file.delete());
+ }
+
+ @Test
+ public void testOnlyOneChunkGroupAndOneMask() throws Exception {
+ File file = new File(FILE_NAME);
+ TsFileWriter writer = new TsFileWriter(file);
+ writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT,
TSEncoding.RLE));
+ writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT,
TSEncoding.RLE));
+ writer.write(new TSRecord(1, "d1").addTuple(new FloatDataPoint("s1", 5))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.write(new TSRecord(2, "d1").addTuple(new FloatDataPoint("s1", 5))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.flushForTest();
+ writer.getIOWriter().writeChunkMaskForTest();
+ writer.getIOWriter().forceClose();
+ NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
+ writer = new TsFileWriter(rWriter);
+ writer.close();
+ assertNotEquals(TsFileIOWriter.magicStringBytes.length,
rWriter.getTruncatedPosition());
+ TsFileSequenceReader reader = new TsFileSequenceReader(FILE_NAME);
+ TsDeviceMetadataIndex index =
reader.readFileMetadata().getDeviceMap().get("d1");
+ assertEquals(1,
reader.readTsDeviceMetaData(index).getChunkGroupMetaDataList().size());
+ reader.close();
+ assertTrue(file.delete());
+ }
+
+
+ @Test
+ public void testTwoChunkGroupAndMore() throws Exception {
+ File file = new File(FILE_NAME);
+ TsFileWriter writer = new TsFileWriter(file);
+ writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT,
TSEncoding.RLE));
+ writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT,
TSEncoding.RLE));
+ writer.write(new TSRecord(1, "d1").addTuple(new FloatDataPoint("s1", 5))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.write(new TSRecord(2, "d1").addTuple(new FloatDataPoint("s1", 5))
+ .addTuple(new FloatDataPoint("s2", 4)));
+
+ writer.write(new TSRecord(1, "d2").addTuple(new FloatDataPoint("s1", 6))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.write(new TSRecord(2, "d2").addTuple(new FloatDataPoint("s1", 6))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.flushForTest();
+ writer.getIOWriter().forceClose();
+ NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
+ writer = new TsFileWriter(rWriter);
+ writer.close();
+ TsFileSequenceReader reader = new TsFileSequenceReader(FILE_NAME);
+ TsDeviceMetadataIndex index =
reader.readFileMetadata().getDeviceMap().get("d1");
+ assertEquals(1,
reader.readTsDeviceMetaData(index).getChunkGroupMetaDataList().size());
+ reader.close();
+ assertTrue(file.delete());
+ }
+
+ @Test
+ public void testNoSeperatorMask() throws Exception {
+ File file = new File(FILE_NAME);
+ TsFileWriter writer = new TsFileWriter(file);
+ writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT,
TSEncoding.RLE));
+ writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT,
TSEncoding.RLE));
+ writer.write(new TSRecord(1, "d1").addTuple(new FloatDataPoint("s1", 5))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.write(new TSRecord(2, "d1").addTuple(new FloatDataPoint("s1", 5))
+ .addTuple(new FloatDataPoint("s2", 4)));
+
+ writer.write(new TSRecord(1, "d2").addTuple(new FloatDataPoint("s1", 6))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.write(new TSRecord(2, "d2").addTuple(new FloatDataPoint("s1", 6))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.flushForTest();
+ writer.getIOWriter().writeSeparatorMaskForTest();
+ writer.getIOWriter().forceClose();
+ NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
+ writer = new TsFileWriter(rWriter);
+ writer.close();
+ TsFileSequenceReader reader = new TsFileSequenceReader(FILE_NAME);
+ TsDeviceMetadataIndex index =
reader.readFileMetadata().getDeviceMap().get("d1");
+ assertEquals(1,
reader.readTsDeviceMetaData(index).getChunkGroupMetaDataList().size());
+ index = reader.readFileMetadata().getDeviceMap().get("d2");
+ assertEquals(1,
reader.readTsDeviceMetaData(index).getChunkGroupMetaDataList().size());
+ reader.close();
+ assertTrue(file.delete());
+ }
+
+
+ @Test
+ public void testHavingSomeFileMetadata() throws Exception {
+ File file = new File(FILE_NAME);
+ TsFileWriter writer = new TsFileWriter(file);
+ writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT,
TSEncoding.RLE));
+ writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT,
TSEncoding.RLE));
+ writer.write(new TSRecord(1, "d1").addTuple(new FloatDataPoint("s1", 5))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.write(new TSRecord(2, "d1").addTuple(new FloatDataPoint("s1", 5))
+ .addTuple(new FloatDataPoint("s2", 4)));
+
+ writer.write(new TSRecord(1, "d2").addTuple(new FloatDataPoint("s1", 6))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.write(new TSRecord(2, "d2").addTuple(new FloatDataPoint("s1", 6))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.flushForTest();
+ writer.getIOWriter().writeSeparatorMaskForTest();
+ writer.getIOWriter().writeSeparatorMaskForTest();
+ writer.getIOWriter().forceClose();
+ NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
+ writer = new TsFileWriter(rWriter);
+ writer.close();
+ TsFileSequenceReader reader = new TsFileSequenceReader(FILE_NAME);
+ TsDeviceMetadataIndex index =
reader.readFileMetadata().getDeviceMap().get("d1");
+ assertEquals(1,
reader.readTsDeviceMetaData(index).getChunkGroupMetaDataList().size());
+ index = reader.readFileMetadata().getDeviceMap().get("d2");
+ assertEquals(1,
reader.readTsDeviceMetaData(index).getChunkGroupMetaDataList().size());
+ reader.close();
+ assertTrue(file.delete());
+ }
+
+ @Test
+ public void testOpenCompleteFile() throws Exception {
+ File file = new File(FILE_NAME);
+ TsFileWriter writer = new TsFileWriter(file);
+ writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT,
TSEncoding.RLE));
+ writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT,
TSEncoding.RLE));
+ writer.write(new TSRecord(1, "d1").addTuple(new FloatDataPoint("s1", 5))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.write(new TSRecord(2, "d1").addTuple(new FloatDataPoint("s1", 5))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.close();
+
+ NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
+ assertFalse(rWriter.canWrite());
+ assertTrue(file.delete());
+ }
+}
\ No newline at end of file