This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch feature_read_data_from_unclosed_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_read_data_from_unclosed_tsfile by this push:
new 374fe55 restoredIOWriter pass UTs
374fe55 is described below
commit 374fe55dc32a3e5905f50b5fd2317c7b99950996
Author: xiangdong huang <[email protected]>
AuthorDate: Wed Mar 6 00:17:59 2019 +0800
restoredIOWriter pass UTs
---
.../iotdb/tsfile/read/TsFileSequenceReader.java | 24 +-
.../apache/iotdb/tsfile/write/TsFileWriter.java | 10 +-
.../iotdb/tsfile/write/schema/FileSchema.java | 2 +-
...IOWriter.java => NativeRestorableIOWriter.java} | 123 +++++----
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 25 +-
.../write/writer/NativeRestorableIOWriterTest.java | 282 +++++++++++++++++++++
.../writer/NativeRestorableTsFileIOWriterTest.java | 260 -------------------
7 files changed, 399 insertions(+), 327 deletions(-)
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 4460098..be14bf9 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -122,8 +122,23 @@ public class TsFileSequenceReader {
* this function does not modify the position of the file reader.
*/
public String readHeadMagic() throws IOException {
+ return readHeadMagic(false);
+ }
+
+ /**
+ * this function modifies the position of the file reader only when movePosition is true.
+ *
+ * @param movePosition whether to move the position of the file reader to the end of the magic
+ *        head string after reading the magic header
+ */
+ public String readHeadMagic(boolean movePosition) throws IOException {
ByteBuffer magicStringBytes = ByteBuffer.allocate(TSFileConfig.MAGIC_STRING.length());
- tsFileInput.read(magicStringBytes, 0);
+ if (movePosition) {
+ tsFileInput.position(0);
+ tsFileInput.read(magicStringBytes);
+ } else {
+ tsFileInput.read(magicStringBytes, 0);
+ }
magicStringBytes.flip();
return new String(magicStringBytes.array());
}
@@ -366,11 +381,4 @@ public class TsFileSequenceReader {
.readAsPossible(tsFileInput.wrapAsFileChannel(), target, position, length);
}
- /**
- *
- * @param pos the position of the file you want to move to
- */
- public void setPosition(long pos) throws IOException {
- tsFileInput.position(pos);
- }
}
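
A minimal sketch of how the new readHeadMagic(boolean) overload can be driven during recovery; the file path here is hypothetical, and the two-argument reader constructor is the one used later in this commit:

    // Hypothetical usage; "/tmp/demo.tsfile" is an assumed path.
    TsFileSequenceReader reader = new TsFileSequenceReader("/tmp/demo.tsfile", false);
    // readHeadMagic(true) leaves the reader positioned just past the magic head,
    // so a recovery loop can start reading markers immediately; the removed
    // setPosition(...) call above is no longer needed for this.
    String head = reader.readHeadMagic(true);
    if (head.equals(reader.readTailMagic())) {
      // both magic strings are present: the file is complete, nothing to recover
    }
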
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
index cb7194c..497f67d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
@@ -93,7 +93,7 @@ public class TsFileWriter {
*
* @param fileWriter the io writer of this TsFile
*/
- public TsFileWriter(TsFileIOWriter fileWriter) {
+ public TsFileWriter(TsFileIOWriter fileWriter) throws IOException {
this(fileWriter, new FileSchema(),
TSFileDescriptor.getInstance().getConfig());
}
@@ -135,9 +135,15 @@ public class TsFileWriter {
* @param schema the schema of this TsFile
* @param conf the configuration of this TsFile
*/
- protected TsFileWriter(TsFileIOWriter fileWriter, FileSchema schema, TSFileConfig conf) {
+ protected TsFileWriter(TsFileIOWriter fileWriter, FileSchema schema, TSFileConfig conf)
+ throws IOException {
+ if (!fileWriter.canWrite()) {
+ throw new IOException(
+ "the given file writer does not support writing any more. Maybe it is a complete TsFile");
+ }
this.fileWriter = fileWriter;
this.schema = schema;
+ this.schema.registerMeasurements(fileWriter.getKnownSchema());
this.pageSize = TSFileConfig.pageSizeInByte;
this.chunkGroupSizeThreshold = TSFileConfig.groupSizeInByte;
if (this.pageSize >= chunkGroupSizeThreshold) {
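
A sketch of the new constructor contract from a caller's perspective, under this commit's assumptions (the path is made up):

    // The constructor now rejects writers over complete files and pre-registers
    // the schemas the writer already knows (hypothetical path).
    NativeRestorableIOWriter ioWriter = new NativeRestorableIOWriter(new File("/tmp/unclosed.tsfile"));
    TsFileWriter writer = new TsFileWriter(ioWriter); // throws IOException if the TsFile is already complete
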
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/FileSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/FileSchema.java
index 349fa50..2aa4d4a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/FileSchema.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/FileSchema.java
@@ -104,7 +104,7 @@ public class FileSchema {
/**
* register all measurementSchemas in measurements.
*/
- private void registerMeasurements(Map<String, MeasurementSchema> measurements) {
+ public void registerMeasurements(Map<String, MeasurementSchema> measurements) {
measurements.forEach((id, md) -> registerMeasurement(md));
}
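
Making registerMeasurements public is what lets the TsFileWriter constructor above push a writer's recovered schemas back into the schema object; a small sketch reusing names from this commit:

    FileSchema schema = new FileSchema();
    // previously private; now callers can register a writer's known schemas in bulk
    schema.registerMeasurements(ioWriter.getKnownSchema());
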
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter.java
similarity index 52%
rename from tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableTsFileIOWriter.java
rename to tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter.java
index 35842c3..5eb3014 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter.java
@@ -18,7 +18,9 @@ package org.apache.iotdb.tsfile.write.writer;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.iotdb.tsfile.file.MetaMarker;
import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
@@ -27,24 +29,26 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* a restorable tsfile writer which does not depend on a restore file.
*/
-public class NativeRestorableTsFileIOWriter extends TsFileIOWriter {
+public class NativeRestorableIOWriter extends TsFileIOWriter {
private static final Logger LOGGER = LoggerFactory
- .getLogger(NativeRestorableTsFileIOWriter.class);
+ .getLogger(NativeRestorableIOWriter.class);
private long truncatedPosition = -1;
+ private Map<String, MeasurementSchema> knownSchemas = new HashMap<>();
public long getTruncatedPosition() {
return truncatedPosition;
}
- public NativeRestorableTsFileIOWriter(File file) throws IOException {
+ public NativeRestorableIOWriter(File file) throws IOException {
super();
long fileSize;
if (!file.exists()) {
@@ -58,14 +62,14 @@ public class NativeRestorableTsFileIOWriter extends TsFileIOWriter {
//we need to read data to recover TsFileIOWriter.chunkGroupMetaDataList
//and remove broken data if exists.
- List<ChunkGroupMetaData> metadatas = this.getChunkGroupMetaDatas();
ChunkMetaData currentChunk;
String measurementID;
TSDataType dataType;
long fileOffsetOfChunk;
long startTimeOfChunk = 0;
- long endTimeOfChunk;
+ long endTimeOfChunk = 0;
+ long numOfPoints = 0;
ChunkGroupMetaData currentChunkGroup;
List<ChunkMetaData> chunks = null;
@@ -73,6 +77,7 @@ public class NativeRestorableTsFileIOWriter extends TsFileIOWriter {
long startOffsetOfChunkGroup = 0;
long endOffsetOfChunkGroup;
long versionOfChunkGroup = 0;
+ boolean haveReadAnUnverifiedGroupFooter = false;
boolean newGroup = true;
TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false);
@@ -84,15 +89,14 @@ public class NativeRestorableTsFileIOWriter extends TsFileIOWriter {
truncatedPosition = magicStringBytes.length;
return;
}
- if (reader.readTailMagic().equals(reader.readHeadMagic())) {
+ if (reader.readTailMagic().equals(reader.readHeadMagic(true))) {
LOGGER.debug("{} is an complete TsFile.", file.getAbsolutePath());
- complete = true;
+ canWrite = false;
return;
}
// not a complete file, we will recover it...
truncatedPosition = magicStringBytes.length;
- reader.setPosition(truncatedPosition);
boolean goon = true;
byte marker;
try {
@@ -100,6 +104,12 @@ public class NativeRestorableTsFileIOWriter extends TsFileIOWriter {
switch (marker) {
case MetaMarker.CHUNK_HEADER:
//this is a chunk.
+ if (haveReadAnUnverifiedGroupFooter) {
+ //now we are sure that the last ChunkGroupFooter is complete.
+ haveReadAnUnverifiedGroupFooter = false;
+ truncatedPosition = reader.position() - 1;
+ newGroup = true;
+ }
if (newGroup) {
chunks = new ArrayList<>();
startOffsetOfChunkGroup = reader.position() - 1;
@@ -107,67 +117,78 @@ public class NativeRestorableTsFileIOWriter extends TsFileIOWriter {
}
//if there is something wrong with a chunk, we will drop this part of data
// (the whole ChunkGroup)
- try {
- ChunkHeader header = reader.readChunkHeader();
- measurementID = header.getMeasurementID();
- dataType = header.getDataType();
- fileOffsetOfChunk = reader.position() - 1;
- if (header.getNumOfPages() > 0) {
- PageHeader pageHeader = reader.readPageHeader(header.getDataType());
- startTimeOfChunk = pageHeader.getMinTimestamp();
- reader.skipPageData(pageHeader);
- }
- for (int j = 1; j < header.getNumOfPages() - 1; j++) {
- //a new Page
- PageHeader pageHeader = reader.readPageHeader(header.getDataType());
- reader.skipPageData(pageHeader);
- }
- if (header.getNumOfPages() > 1) {
- PageHeader pageHeader = reader.readPageHeader(header.getDataType());
- endTimeOfChunk = pageHeader.getMinTimestamp();
- reader.skipPageData(pageHeader);
- } else {
- endTimeOfChunk = startTimeOfChunk;
- }
- currentChunk = new ChunkMetaData(measurementID, dataType, fileOffsetOfChunk,
- startTimeOfChunk, endTimeOfChunk);
- chunks.add(currentChunk);
- } catch (IOException e) {
- goon = false;
+ ChunkHeader header = reader.readChunkHeader();
+ measurementID = header.getMeasurementID();
+ knownSchemas.putIfAbsent(measurementID,
+ new MeasurementSchema(measurementID, header.getDataType(),
+ header.getEncodingType(), header.getCompressionType()));
+ dataType = header.getDataType();
+ fileOffsetOfChunk = reader.position() - 1;
+ if (header.getNumOfPages() > 0) {
+ PageHeader pageHeader = reader.readPageHeader(header.getDataType());
+ numOfPoints += pageHeader.getNumOfValues();
+ startTimeOfChunk = pageHeader.getMinTimestamp();
+ endTimeOfChunk = pageHeader.getMaxTimestamp();
+ reader.skipPageData(pageHeader);
+ }
+ for (int j = 1; j < header.getNumOfPages() - 1; j++) {
+ //a new Page
+ PageHeader pageHeader = reader.readPageHeader(header.getDataType());
+ reader.skipPageData(pageHeader);
}
+ if (header.getNumOfPages() > 1) {
+ PageHeader pageHeader = reader.readPageHeader(header.getDataType());
+ endTimeOfChunk = pageHeader.getMaxTimestamp();
+ reader.skipPageData(pageHeader);
+ }
+ currentChunk = new ChunkMetaData(measurementID, dataType, fileOffsetOfChunk,
+ startTimeOfChunk, endTimeOfChunk);
+ currentChunk.setNumOfPoints(numOfPoints);
+ chunks.add(currentChunk);
+ numOfPoints = 0;
break;
case MetaMarker.CHUNK_GROUP_FOOTER:
//this is a chunk group
//if there is something wrong with the chunkGroup footer, we will drop this part of data
//because we can not guarantee the correctness of the deviceId.
- try {
- ChunkGroupFooter chunkGroupFooter = reader.readChunkGroupFooter();
- deviceID = chunkGroupFooter.getDeviceID();
- endOffsetOfChunkGroup = reader.position();
- currentChunkGroup = new ChunkGroupMetaData(deviceID, chunks, startOffsetOfChunkGroup);
- currentChunkGroup.setEndOffsetOfChunkGroup(endOffsetOfChunkGroup);
- currentChunkGroup.setVersion(versionOfChunkGroup++);//TODO is this OK?
- metadatas.add(currentChunkGroup);
- newGroup = true;
- //we get a complete chunk group now.
- truncatedPosition = getPos();
- } catch (IOException e) {
- goon = false;
- }
+ ChunkGroupFooter chunkGroupFooter = reader.readChunkGroupFooter();
+ deviceID = chunkGroupFooter.getDeviceID();
+ endOffsetOfChunkGroup = reader.position();
+ currentChunkGroup = new ChunkGroupMetaData(deviceID, chunks, startOffsetOfChunkGroup);
+ currentChunkGroup.setEndOffsetOfChunkGroup(endOffsetOfChunkGroup);
+ currentChunkGroup.setVersion(versionOfChunkGroup++);
+ chunkGroupMetaDataList.add(currentChunkGroup);
+ // though we have read the current ChunkMetaData from disk, it may be incomplete,
+ // because if the file loses even one byte, ChunkMetaData.deserialize() still returns ok,
+ // while the last field of the ChunkMetaData is incorrect.
+ // So, only reading the next MASK can make sure that this ChunkMetaData is complete.
+ haveReadAnUnverifiedGroupFooter = true;
break;
+
default:
- //no data else
+ // it is impossible that we read incorrect data here.
+ MetaMarker.handleUnexpectedMarker(marker);
goon = false;
}
}
+ //now we have read to the tail of the file, so we are sure that the last ChunkGroupFooter is complete.
+ truncatedPosition = reader.position() - 1;
} catch (IOException e2) {
+ //if it is the end of the file, and we read an unverifiedGroupFooter, we must remove this ChunkGroup
+ if (haveReadAnUnverifiedGroupFooter && !chunkGroupMetaDataList.isEmpty()) {
+ chunkGroupMetaDataList.remove(chunkGroupMetaDataList.size() - 1);
+ }
} finally {
//something went wrong or all data is complete. We will discard the current FileMetadata
+ // so that we can continue to write data into this tsfile.
LOGGER.info("File {} has {} bytes, and will be truncated from {}.",
file.getAbsolutePath(), file.length(), truncatedPosition);
out.truncate(truncatedPosition);
}
}
-
+ @Override
+ public Map<String, MeasurementSchema> getKnownSchema() {
+ return knownSchemas;
+ }
}
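
Putting the recovery contract together, a hedged sketch of what a caller can rely on after construction (the file name is assumed; the tests below are authoritative):

    File f = new File("/tmp/unclosed.tsfile"); // assumed path
    NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(f);
    if (rWriter.canWrite()) {
      // everything after this offset was broken and has been truncated away
      long safeLength = rWriter.getTruncatedPosition();
      // schemas rebuilt from the surviving chunk headers
      Map<String, MeasurementSchema> recovered = rWriter.getKnownSchema();
      // writing can now resume through a TsFileWriter built on rWriter
    }
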
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 83616fc..c339f8a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -22,6 +22,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -67,7 +68,7 @@ public class TsFileIOWriter {
protected List<ChunkGroupMetaData> chunkGroupMetaDataList = new ArrayList<>();
private ChunkGroupMetaData currentChunkGroupMetaData;
private ChunkMetaData currentChunkMetaData;
- protected boolean complete = false;
+ protected boolean canWrite = true;
/**
* empty construct function.
@@ -237,7 +238,7 @@ public class TsFileIOWriter {
// close file
out.close();
- complete = true;
+ canWrite = false;
LOG.info("output stream is closed");
}
@@ -317,15 +318,29 @@ public class TsFileIOWriter {
return chunkGroupMetaDataList;
}
- public boolean isComplete() {
- return complete;
+ public boolean canWrite() {
+ return canWrite;
}
/**
* close the inputstream or file channel in force. This is just used for testing.
*/
- public void forceClose() throws IOException {
+ void forceClose() throws IOException {
out.close();
}
+ void writeSeparatorMaskForTest() throws IOException {
+ out.write(new byte[]{MetaMarker.SEPARATOR});
+ }
+ void writeChunkMaskForTest() throws IOException {
+ out.write(new byte[]{MetaMarker.CHUNK_HEADER});
+ }
+
+ /**
+ * @return all schemas that this ioWriter knows. In the default implementation (TsFileIOWriter.class),
+ * it is empty
+ */
+ public Map<String, MeasurementSchema> getKnownSchema() {
+ return Collections.emptyMap();
+ }
}
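
The default getKnownSchema() keeps the base writer's behavior unchanged while letting subclasses such as NativeRestorableIOWriter contribute recovered schemas; a hypothetical subclass illustrating the override point:

    // Hypothetical subclass, for illustration only.
    class PreloadedIOWriter extends TsFileIOWriter {
      private final Map<String, MeasurementSchema> schemas = new HashMap<>();

      @Override
      public Map<String, MeasurementSchema> getKnownSchema() {
        return schemas; // the base class returns Collections.emptyMap()
      }
    }
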
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriterTest.java
new file mode 100644
index 0000000..b80c038
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriterTest.java
@@ -0,0 +1,282 @@
+package org.apache.iotdb.tsfile.write.writer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.file.metadata.statistics.FloatStatistics;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.FloatDataPoint;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class NativeRestorableIOWriterTest {
+
+ private static final String FILE_NAME = "test.ts";
+
+ @Test
+ public void testOnlyHeadMagic() throws Exception {
+ File file = new File(FILE_NAME);
+ TsFileWriter writer = new TsFileWriter(file);
+ writer.getIOWriter().forceClose();
+ NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
+ writer = new TsFileWriter(rWriter);
+ writer.close();
+ assertEquals(TsFileIOWriter.magicStringBytes.length, rWriter.getTruncatedPosition());
+ assertTrue(file.delete());
+
+ }
+
+ @Test
+ public void testOnlyFirstMask() throws Exception {
+ File file = new File(FILE_NAME);
+ TsFileWriter writer = new TsFileWriter(file);
+ //we have to flush using inner API.
+ writer.getIOWriter().out.write(new byte[] {MetaMarker.CHUNK_HEADER});
+ writer.getIOWriter().forceClose();
+ assertEquals(TsFileIOWriter.magicStringBytes.length + 1, file.length());
+ NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
+ writer = new TsFileWriter(rWriter);
+ writer.close();
+ assertEquals(TsFileIOWriter.magicStringBytes.length, rWriter.getTruncatedPosition());
+ assertTrue(file.delete());
+ }
+
+ @Test
+ public void testOnlyOneIncompleteChunkHeader() throws Exception {
+ File file = new File(FILE_NAME);
+ TsFileWriter writer = new TsFileWriter(file);
+
+ ChunkHeader header = new ChunkHeader("s1", 100, TSDataType.FLOAT,
CompressionType.SNAPPY,
+ TSEncoding.PLAIN, 5);
+ ByteBuffer buffer = ByteBuffer.allocate(header.getSerializedSize());
+ header.serializeTo(buffer);
+ buffer.flip();
+ byte[] data = new byte[3];
+ buffer.get(data, 0, 3);
+ writer.getIOWriter().out.write(data);
+ writer.getIOWriter().forceClose();
+ NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
+ writer = new TsFileWriter(rWriter);
+ writer.close();
+ assertEquals(TsFileIOWriter.magicStringBytes.length, rWriter.getTruncatedPosition());
+ assertTrue(file.delete());
+ }
+
+ @Test
+ public void testOnlyOneChunkHeader() throws Exception {
+ File file = new File(FILE_NAME);
+ TsFileWriter writer = new TsFileWriter(file);
+ writer.getIOWriter()
+ .startFlushChunk(new MeasurementSchema("s1", TSDataType.FLOAT,
TSEncoding.PLAIN),
+ CompressionType.SNAPPY, TSDataType.FLOAT,
+ TSEncoding.PLAIN, new FloatStatistics(), 100, 50, 100, 10);
+ writer.getIOWriter().forceClose();
+
+ NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
+ writer = new TsFileWriter(rWriter);
+ writer.close();
+ assertEquals(TsFileIOWriter.magicStringBytes.length, rWriter.getTruncatedPosition());
+ assertTrue(file.delete());
+ }
+
+ @Test
+ public void testOnlyOneChunkHeaderAndSomePage() throws Exception {
+ File file = new File(FILE_NAME);
+ TsFileWriter writer = new TsFileWriter(file);
+ writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT,
TSEncoding.RLE));
+ writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT,
TSEncoding.RLE));
+ writer.write(new TSRecord(1, "d1").addTuple(new FloatDataPoint("s1", 5))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.write(new TSRecord(2, "d1").addTuple(new FloatDataPoint("s1", 5))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.flushForTest();
+ long pos = writer.getIOWriter().getPos();
+ //let's delete one byte.
+ writer.getIOWriter().out.truncate(pos - 1);
+ writer.getIOWriter().forceClose();
+ NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
+ writer = new TsFileWriter(rWriter);
+ writer.close();
+ assertEquals(TsFileIOWriter.magicStringBytes.length, rWriter.getTruncatedPosition());
+ assertTrue(file.delete());
+ }
+
+
+ @Test
+ public void testOnlyOneChunkGroup() throws Exception {
+ File file = new File(FILE_NAME);
+ TsFileWriter writer = new TsFileWriter(file);
+ writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT,
TSEncoding.RLE));
+ writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT,
TSEncoding.RLE));
+ writer.write(new TSRecord(1, "d1").addTuple(new FloatDataPoint("s1", 5))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.write(new TSRecord(2, "d1").addTuple(new FloatDataPoint("s1", 5))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.flushForTest();
+ writer.getIOWriter().forceClose();
+ NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
+ writer = new TsFileWriter(rWriter);
+ writer.close();
+ assertEquals(TsFileIOWriter.magicStringBytes.length, rWriter.getTruncatedPosition());
+ assertTrue(file.delete());
+ }
+
+ @Test
+ public void testOnlyOneChunkGroupAndOneMask() throws Exception {
+ File file = new File(FILE_NAME);
+ TsFileWriter writer = new TsFileWriter(file);
+ writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT,
TSEncoding.RLE));
+ writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT,
TSEncoding.RLE));
+ writer.write(new TSRecord(1, "d1").addTuple(new FloatDataPoint("s1", 5))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.write(new TSRecord(2, "d1").addTuple(new FloatDataPoint("s1", 5))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.flushForTest();
+ writer.getIOWriter().writeChunkMaskForTest();
+ writer.getIOWriter().forceClose();
+ NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
+ writer = new TsFileWriter(rWriter);
+ writer.close();
+ assertNotEquals(TsFileIOWriter.magicStringBytes.length, rWriter.getTruncatedPosition());
+ TsFileSequenceReader reader = new TsFileSequenceReader(FILE_NAME);
+ TsDeviceMetadataIndex index = reader.readFileMetadata().getDeviceMap().get("d1");
+ assertEquals(1, reader.readTsDeviceMetaData(index).getChunkGroupMetaDataList().size());
+ reader.close();
+ assertTrue(file.delete());
+ }
+
+
+ @Test
+ public void testTwoChunkGroupAndMore() throws Exception {
+ File file = new File(FILE_NAME);
+ TsFileWriter writer = new TsFileWriter(file);
+ writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT,
TSEncoding.RLE));
+ writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT,
TSEncoding.RLE));
+ writer.write(new TSRecord(1, "d1").addTuple(new FloatDataPoint("s1", 5))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.write(new TSRecord(2, "d1").addTuple(new FloatDataPoint("s1", 5))
+ .addTuple(new FloatDataPoint("s2", 4)));
+
+ writer.write(new TSRecord(1, "d2").addTuple(new FloatDataPoint("s1", 6))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.write(new TSRecord(2, "d2").addTuple(new FloatDataPoint("s1", 6))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.flushForTest();
+ writer.getIOWriter().forceClose();
+ NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
+ writer = new TsFileWriter(rWriter);
+ writer.close();
+ TsFileSequenceReader reader = new TsFileSequenceReader(FILE_NAME);
+ TsDeviceMetadataIndex index = reader.readFileMetadata().getDeviceMap().get("d1");
+ assertEquals(1, reader.readTsDeviceMetaData(index).getChunkGroupMetaDataList().size());
+ reader.close();
+ assertTrue(file.delete());
+ }
+
+ @Test
+ public void testNoSeperatorMask() throws Exception {
+ File file = new File(FILE_NAME);
+ TsFileWriter writer = new TsFileWriter(file);
+ writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT,
TSEncoding.RLE));
+ writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT,
TSEncoding.RLE));
+ writer.write(new TSRecord(1, "d1").addTuple(new FloatDataPoint("s1", 5))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.write(new TSRecord(2, "d1").addTuple(new FloatDataPoint("s1", 5))
+ .addTuple(new FloatDataPoint("s2", 4)));
+
+ writer.write(new TSRecord(1, "d2").addTuple(new FloatDataPoint("s1", 6))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.write(new TSRecord(2, "d2").addTuple(new FloatDataPoint("s1", 6))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.flushForTest();
+ writer.getIOWriter().writeSeparatorMaskForTest();
+ writer.getIOWriter().forceClose();
+ NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
+ writer = new TsFileWriter(rWriter);
+ writer.close();
+ TsFileSequenceReader reader = new TsFileSequenceReader(FILE_NAME);
+ TsDeviceMetadataIndex index = reader.readFileMetadata().getDeviceMap().get("d1");
+ assertEquals(1, reader.readTsDeviceMetaData(index).getChunkGroupMetaDataList().size());
+ index = reader.readFileMetadata().getDeviceMap().get("d2");
+ assertEquals(1, reader.readTsDeviceMetaData(index).getChunkGroupMetaDataList().size());
+ reader.close();
+ assertTrue(file.delete());
+ }
+
+
+ @Test
+ public void testHavingSomeFileMetadata() throws Exception {
+ File file = new File(FILE_NAME);
+ TsFileWriter writer = new TsFileWriter(file);
+ writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT,
TSEncoding.RLE));
+ writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT,
TSEncoding.RLE));
+ writer.write(new TSRecord(1, "d1").addTuple(new FloatDataPoint("s1", 5))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.write(new TSRecord(2, "d1").addTuple(new FloatDataPoint("s1", 5))
+ .addTuple(new FloatDataPoint("s2", 4)));
+
+ writer.write(new TSRecord(1, "d2").addTuple(new FloatDataPoint("s1", 6))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.write(new TSRecord(2, "d2").addTuple(new FloatDataPoint("s1", 6))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.flushForTest();
+ writer.getIOWriter().writeSeparatorMaskForTest();
+ writer.getIOWriter().writeSeparatorMaskForTest();
+ writer.getIOWriter().forceClose();
+ NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
+ writer = new TsFileWriter(rWriter);
+ writer.close();
+ TsFileSequenceReader reader = new TsFileSequenceReader(FILE_NAME);
+ TsDeviceMetadataIndex index = reader.readFileMetadata().getDeviceMap().get("d1");
+ assertEquals(1, reader.readTsDeviceMetaData(index).getChunkGroupMetaDataList().size());
+ index = reader.readFileMetadata().getDeviceMap().get("d2");
+ assertEquals(1, reader.readTsDeviceMetaData(index).getChunkGroupMetaDataList().size());
+ reader.close();
+ assertTrue(file.delete());
+ }
+
+ @Test
+ public void testOpenCompleteFile() throws Exception {
+ File file = new File(FILE_NAME);
+ TsFileWriter writer = new TsFileWriter(file);
+ writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT,
TSEncoding.RLE));
+ writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT,
TSEncoding.RLE));
+ writer.write(new TSRecord(1, "d1").addTuple(new FloatDataPoint("s1", 5))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.write(new TSRecord(2, "d1").addTuple(new FloatDataPoint("s1", 5))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.close();
+
+ NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
+ assertFalse(rWriter.canWrite());
+ assertTrue(file.delete());
+ }
+}
\ No newline at end of file
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableTsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableTsFileIOWriterTest.java
deleted file mode 100644
index 10bcda0..0000000
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableTsFileIOWriterTest.java
+++ /dev/null
@@ -1,260 +0,0 @@
-package org.apache.iotdb.tsfile.write.writer;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.stream.Collectors;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
-import org.apache.iotdb.tsfile.file.MetaMarker;
-import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
-import org.apache.iotdb.tsfile.file.header.ChunkHeader;
-import org.apache.iotdb.tsfile.file.header.PageHeader;
-import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
-import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
-import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.file.metadata.statistics.FloatStatistics;
-import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.reader.page.PageReader;
-import org.apache.iotdb.tsfile.write.TsFileWriter;
-import org.apache.iotdb.tsfile.write.record.TSRecord;
-import org.apache.iotdb.tsfile.write.record.datapoint.FloatDataPoint;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class NativeRestorableTsFileIOWriterTest {
- private int bkChunkGroupSize;
-
- private static final String FILE_NAME = "test.ts";
- @Before
- public void setUp() throws Exception {
- bkChunkGroupSize = TSFileConfig.groupSizeInByte;
- TSFileConfig.groupSizeInByte = 20;
- TSFileConfig.pageSizeInByte = 5;
- }
-
- @After
- public void tearDown() throws Exception {
- TSFileConfig.groupSizeInByte = bkChunkGroupSize;
- }
-
- @Test
- public void testOnlyHeadMagic() throws Exception {
- File file = new File(FILE_NAME);
- TsFileWriter writer = new TsFileWriter(file);
- writer.getIOWriter().forceClose();
- NativeRestorableTsFileIOWriter rWriter = new NativeRestorableTsFileIOWriter(file);
- writer = new TsFileWriter(rWriter);
- writer.close();
- assertEquals(TsFileIOWriter.magicStringBytes.length, rWriter.getTruncatedPosition());
- printFile(FILE_NAME);
-
- assertTrue(file.delete());
-
- }
-
- @Test
- public void testOnlyFirstMask() throws Exception {
- File file = new File(FILE_NAME);
- TsFileWriter writer = new TsFileWriter(file);
- //we have to flush using inner API.
- writer.getIOWriter().out.write(new byte[] {MetaMarker.CHUNK_HEADER});
- writer.getIOWriter().forceClose();
- assertEquals(TsFileIOWriter.magicStringBytes.length + 1, file.length());
- NativeRestorableTsFileIOWriter rWriter = new NativeRestorableTsFileIOWriter(file);
- writer = new TsFileWriter(rWriter);
- writer.close();
- printFile(FILE_NAME);
- assertEquals(TsFileIOWriter.magicStringBytes.length, rWriter.getTruncatedPosition());
- assertTrue(file.delete());
- }
-
- @Test
- public void testOnlyOneIncompleteChunkHeader() throws Exception {
- File file = new File(FILE_NAME);
- TsFileWriter writer = new TsFileWriter(file);
-
- ChunkHeader header = new ChunkHeader("s1", 100, TSDataType.FLOAT,
CompressionType.SNAPPY,
- TSEncoding.PLAIN, 5);
- ByteBuffer buffer = ByteBuffer.allocate(header.getSerializedSize());
- header.serializeTo(buffer);
- buffer.flip();
- byte[] data = new byte[3];
- buffer.get(data, 0, 3);
- writer.getIOWriter().out.write(data);
- writer.getIOWriter().forceClose();
- NativeRestorableTsFileIOWriter rWriter = new NativeRestorableTsFileIOWriter(file);
- writer = new TsFileWriter(rWriter);
- writer.close();
- assertEquals(TsFileIOWriter.magicStringBytes.length, rWriter.getTruncatedPosition());
- assertTrue(file.delete());
- }
-
- @Test
- public void testOnlyOneChunkHeader() throws Exception {
- File file = new File(FILE_NAME);
- TsFileWriter writer = new TsFileWriter(file);
- writer.getIOWriter()
- .startFlushChunk(new MeasurementSchema("s1", TSDataType.FLOAT,
TSEncoding.PLAIN),
- CompressionType.SNAPPY, TSDataType.FLOAT,
- TSEncoding.PLAIN, new FloatStatistics(), 100, 50, 100, 10);
- writer.getIOWriter().forceClose();
-
- NativeRestorableTsFileIOWriter rWriter = new NativeRestorableTsFileIOWriter(file);
- writer = new TsFileWriter(rWriter);
- writer.close();
- assertEquals(TsFileIOWriter.magicStringBytes.length, rWriter.getTruncatedPosition());
- assertTrue(file.delete());
- }
-
- @Test
- public void testOnlyOneChunkHeaderAndSomePage() throws Exception {
-
- }
-
- @Test
- public void testOnlyOneCompleteChunk() throws Exception {
-
- }
-
- @Test
- public void testOnlyOneChunkAndMore() throws Exception {
-
- }
-
- @Test
- public void testOnlyOneChunkGroup() throws Exception {
- File file = new File(FILE_NAME);
- TsFileWriter writer = new TsFileWriter(file);
- writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT,
TSEncoding.RLE));
- writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT,
TSEncoding.RLE));
- writer.write(new TSRecord(1, "d1").addTuple(new FloatDataPoint("s1", 5))
- .addTuple(new FloatDataPoint("s2", 4)));
- writer.write(new TSRecord(2, "d1").addTuple(new FloatDataPoint("s1", 5))
- .addTuple(new FloatDataPoint("s2", 4)));
- writer.flushForTest();
-
- NativeRestorableTsFileIOWriter rWriter = new NativeRestorableTsFileIOWriter(file);
- writer = new TsFileWriter(rWriter);
- writer.close();
- assertEquals(TsFileIOWriter.magicStringBytes.length, rWriter.getTruncatedPosition());
- assertTrue(file.delete());
- }
-
- @Test
- public void testOneChunkGroupAndMore() throws Exception {
-
- }
-
- @Test
- public void testNoSeperatorMask() throws Exception {
-
- }
-
- @Test
- public void testHavingSeperatorMask() throws Exception {
-
- }
-
- @Test
- public void testHavingSomeFileMetadata() throws Exception {
-
- }
-
- @Test
- public void testOpenCompleteFile() throws Exception {
-
- }
-
- public void printFile(String fileName) throws IOException {
- TsFileSequenceReader reader = new TsFileSequenceReader(fileName);
- System.out.println("file length: " + new File(fileName).length());
- System.out.println("file magic head: " + reader.readHeadMagic());
- System.out.println("file magic tail: " + reader.readTailMagic());
- System.out.println("Level 1 metadata position: " +
reader.getFileMetadataPos());
- System.out.println("Level 1 metadata size: " +
reader.getFileMetadataPos());
- TsFileMetaData metaData = reader.readFileMetadata();
- // Sequential reading of one ChunkGroup now follows this order:
- // first SeriesChunks (headers and data) in one ChunkGroup, then the CHUNK_GROUP_FOOTER
- // Because we do not know how many chunks a ChunkGroup may have, we should read one byte (the marker) ahead and
- // judge accordingly.
- System.out.println("[Chunk Group]");
- System.out.println("position: " + reader.position());
- byte marker;
- while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
- switch (marker) {
- case MetaMarker.CHUNK_HEADER:
- System.out.println("\t[Chunk]");
- System.out.println("\tposition: " + reader.position());
- ChunkHeader header = reader.readChunkHeader();
- System.out.println("\tMeasurement: " + header.getMeasurementID());
- Decoder defaultTimeDecoder = Decoder.getDecoderByType(
- TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().timeSeriesEncoder),
- TSDataType.INT64);
- Decoder valueDecoder = Decoder
- .getDecoderByType(header.getEncodingType(), header.getDataType());
- for (int j = 0; j < header.getNumOfPages(); j++) {
- System.out.println("\t\t[Page]\n \t\tPage head position: " +
reader.position());
- PageHeader pageHeader =
reader.readPageHeader(header.getDataType());
- System.out.println("\t\tPage data position: " + reader.position());
- System.out.println("\t\tpoints in the page: " +
pageHeader.getNumOfValues());
- ByteBuffer pageData = reader.readPage(pageHeader,
header.getCompressionType());
- System.out
- .println("\t\tUncompressed page data size: " +
pageHeader.getUncompressedSize());
- PageReader reader1 = new PageReader(pageData,
header.getDataType(), valueDecoder,
- defaultTimeDecoder);
- while (reader1.hasNextBatch()) {
- BatchData batchData = reader1.nextBatch();
- while (batchData.hasNext()) {
- System.out.println(
- "\t\t\ttime, value: " + batchData.currentTime() + ", " +
batchData
- .currentValue());
- batchData.next();
- }
- }
- }
- break;
- case MetaMarker.CHUNK_GROUP_FOOTER:
- System.out.println("Chunk Group Footer position: " +
reader.position());
- ChunkGroupFooter chunkGroupFooter = reader.readChunkGroupFooter();
- System.out.println("device: " + chunkGroupFooter.getDeviceID());
- break;
- default:
- MetaMarker.handleUnexpectedMarker(marker);
- }
- }
- System.out.println("[Metadata]");
- List<TsDeviceMetadataIndex> deviceMetadataIndexList = metaData.getDeviceMap().values().stream()
- .sorted((x, y) -> (int) (x.getOffset() - y.getOffset())).collect(Collectors.toList());
- for (TsDeviceMetadataIndex index : deviceMetadataIndexList) {
- TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(index);
- List<ChunkGroupMetaData> chunkGroupMetaDataList = deviceMetadata.getChunkGroupMetaDataList();
- for (ChunkGroupMetaData chunkGroupMetaData : chunkGroupMetaDataList) {
- System.out.println(String
- .format("\t[Device]File Offset: %d, Device %s, Number of Chunk
Groups %d",
- index.getOffset(), chunkGroupMetaData.getDeviceID(),
- chunkGroupMetaDataList.size()));
-
- for (ChunkMetaData chunkMetadata : chunkGroupMetaData.getChunkMetaDataList()) {
- System.out.println("\t\tMeasurement:" + chunkMetadata.getMeasurementUid());
- System.out.println("\t\tFile offset:" + chunkMetadata.getOffsetOfChunkHeader());
- }
- }
- }
- reader.close();
- }
-
-}
\ No newline at end of file