This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new bd3a798 [IOTDB-63] Use TsFileInput instead of FileChannel as the
parameter passed to methods (#110)
bd3a798 is described below
commit bd3a7988606760ad47e9000ab83a70ea3b55c106
Author: Da Rui Lei <[email protected]>
AuthorDate: Tue Mar 26 17:25:27 2019 +0800
[IOTDB-63] Use TsFileInput instead of FileChannel as the parameter passed
to methods (#110)
* change channel to TsFileInput for the future use of tsfile-spark-connector
---
.../iotdb/tsfile/file/footer/ChunkGroupFooter.java | 10 +++---
.../iotdb/tsfile/file/header/ChunkHeader.java | 12 +++----
.../iotdb/tsfile/file/header/PageHeader.java | 14 ++++----
.../file/metadata/statistics/BinaryStatistics.java | 6 ++--
.../file/metadata/statistics/Statistics.java | 16 ++++-----
.../iotdb/tsfile/read/TsFileSequenceReader.java | 39 ++++++++++++++++++----
.../iotdb/tsfile/utils/ReadWriteIOUtils.java | 34 +++++--------------
.../iotdb/tsfile/file/header/PageHeaderTest.java | 7 ++--
8 files changed, 72 insertions(+), 66 deletions(-)
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/footer/ChunkGroupFooter.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/footer/ChunkGroupFooter.java
index 4cdcf85..a69468e 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/footer/ChunkGroupFooter.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/footer/ChunkGroupFooter.java
@@ -23,8 +23,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.read.reader.TsFileInput;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
public class ChunkGroupFooter {
@@ -84,11 +84,11 @@ public class ChunkGroupFooter {
}
/**
- * deserialize from FileChannel.
+ * deserialize from TsFileInput.
*
* @param markerRead Whether the marker of the CHUNK_GROUP_FOOTER is read
ahead.
*/
- public static ChunkGroupFooter deserializeFrom(FileChannel channel, long
offset,
+ public static ChunkGroupFooter deserializeFrom(TsFileInput input, long
offset,
boolean markerRead)
throws IOException {
long offsetVar = offset;
@@ -96,12 +96,12 @@ public class ChunkGroupFooter {
offsetVar++;
}
ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
- channel.read(buffer, offsetVar);
+ input.read(buffer, offsetVar);
buffer.flip();
int size = buffer.getInt();
offsetVar += Integer.BYTES;
buffer = ByteBuffer.allocate(getSerializedSize(size));
- ReadWriteIOUtils.readAsPossible(channel, offsetVar, buffer);
+ ReadWriteIOUtils.readAsPossible(input, offsetVar, buffer);
buffer.flip();
String deviceID = ReadWriteIOUtils.readStringWithoutLength(buffer, size);
long dataSize = ReadWriteIOUtils.readLong(buffer);
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
index 2c27256..8af0fc3 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
@@ -23,11 +23,11 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
import org.apache.iotdb.tsfile.file.MetaMarker;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.reader.TsFileInput;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
public class ChunkHeader {
@@ -124,27 +124,27 @@ public class ChunkHeader {
}
/**
- * deserialize from FileChannel.
+ * deserialize from TsFileInput.
*
- * @param channel FileChannel
+ * @param input TsFileInput
* @param offset offset
* @param markerRead read marker (boolean type)
* @return CHUNK_HEADER object
* @throws IOException IOException
*/
- public static ChunkHeader deserializeFrom(FileChannel channel, long offset,
boolean markerRead)
+ public static ChunkHeader deserializeFrom(TsFileInput input, long offset,
boolean markerRead)
throws IOException {
long offsetVar = offset;
if (!markerRead) {
offsetVar++;
}
ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
- channel.read(buffer, offsetVar);
+ input.read(buffer, offsetVar);
buffer.flip();
int size = buffer.getInt();
offsetVar += Integer.BYTES;
buffer = ByteBuffer.allocate(getSerializedSize(size));
- ReadWriteIOUtils.readAsPossible(channel, offsetVar, buffer);
+ ReadWriteIOUtils.readAsPossible(input, offsetVar, buffer);
buffer.flip();
String measurementID = ReadWriteIOUtils.readStringWithoutLength(buffer,
size);
return deserializePartFrom(measurementID, buffer);
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java
index 101a1ac..b516f54 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java
@@ -23,10 +23,10 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.NoStatistics;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.reader.TsFileInput;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
public class PageHeader {
@@ -90,16 +90,16 @@ public class PageHeader {
}
/**
- * deserialize from FileChannel.
+ * deserialize from TsFileInput.
*
* @param dataType data type
- * @param channel FileChannel
+ * @param input TsFileInput
* @param offset offset
* @param markerRead read marker (boolean type)
* @return CHUNK_HEADER object
* @throws IOException IOException
*/
- public static PageHeader deserializeFrom(TSDataType dataType, FileChannel
channel, long offset,
+ public static PageHeader deserializeFrom(TSDataType dataType, TsFileInput
input, long offset,
boolean markerRead)
throws IOException {
long offsetVar = offset;
@@ -110,16 +110,16 @@ public class PageHeader {
if (dataType == TSDataType.TEXT) {
int sizeWithoutStatistics = calculatePageHeaderSizeWithoutStatistics();
ByteBuffer bufferWithoutStatistics =
ByteBuffer.allocate(sizeWithoutStatistics);
- ReadWriteIOUtils.readAsPossible(channel, offsetVar,
bufferWithoutStatistics);
+ ReadWriteIOUtils.readAsPossible(input, offsetVar,
bufferWithoutStatistics);
bufferWithoutStatistics.flip();
offsetVar += sizeWithoutStatistics;
- Statistics statistics = Statistics.deserialize(channel, offsetVar,
dataType);
+ Statistics statistics = Statistics.deserialize(input, offsetVar,
dataType);
return deserializePartFrom(statistics, bufferWithoutStatistics);
} else {
int size = calculatePageHeaderSize(dataType);
ByteBuffer buffer = ByteBuffer.allocate(size);
- ReadWriteIOUtils.readAsPossible(channel, offsetVar, buffer);
+ ReadWriteIOUtils.readAsPossible(input, offsetVar, buffer);
buffer.flip();
return deserializeFrom(buffer, dataType);
}
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java
index 51ea983..c06422c 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.tsfile.file.metadata.statistics;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
+import org.apache.iotdb.tsfile.read.reader.TsFileInput;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -208,10 +208,10 @@ public class BinaryStatistics extends Statistics<Binary> {
}
@Override
- protected void fill(FileChannel channel, long offset) throws IOException {
+ protected void fill(TsFileInput input, long offset) throws IOException {
int size = getSerializedSize();
ByteBuffer buffer = ByteBuffer.allocate(size);
- ReadWriteIOUtils.readAsPossible(channel, offset, buffer);
+ ReadWriteIOUtils.readAsPossible(input, offset, buffer);
buffer.flip();
fill(buffer);
}
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
index f4bc17a..27e76ee 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
@@ -23,13 +23,9 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
import org.apache.iotdb.tsfile.exception.write.UnknownColumnTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.reader.TsFileInput;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
@@ -91,9 +87,9 @@ public abstract class Statistics<T> {
return statistics;
}
- public static Statistics deserialize(FileChannel channel, long offset,
TSDataType dataType) throws IOException {
+ public static Statistics deserialize(TsFileInput input, long offset,
TSDataType dataType) throws IOException {
Statistics statistics = getStatsByType(dataType);
- statistics.fill(channel, offset);
+ statistics.fill(input, offset);
return statistics;
}
@@ -149,7 +145,7 @@ public abstract class Statistics<T> {
String thisClass = this.getClass().toString();
String statsClass = stats.getClass().toString();
LOG.warn("tsfile-file Statistics classes mismatched,no merge: {} v.s.
{}",
- thisClass, statsClass);
+ thisClass, statsClass);
throw new StatisticsClassException(this.getClass(), stats.getClass());
}
@@ -216,10 +212,10 @@ public abstract class Statistics<T> {
abstract void fill(ByteBuffer byteBuffer) throws IOException;
- protected void fill(FileChannel channel, long offset) throws IOException {
+ protected void fill(TsFileInput input, long offset) throws IOException {
int size = getSerializedSize();
ByteBuffer buffer = ByteBuffer.allocate(size);
- ReadWriteIOUtils.readAsPossible(channel, offset, buffer);
+ ReadWriteIOUtils.readAsPossible(input, offset, buffer);
buffer.flip();
fill(buffer);
}
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 7107c8b..deb2e4f 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -94,6 +94,31 @@ public class TsFileSequenceReader implements AutoCloseable{
}
/**
+ * Create a file reader of the given file. The reader will read the tail of
the file to get the
+ * file metadata size. Then the reader will skip the first
TSFileConfig.MAGIC_STRING.length() bytes
+ * of the file for preparing reading real data.
+ *
+ * @param input given input
+ */
+ public TsFileSequenceReader(TsFileInput input) throws IOException {
+ this(input, true);
+ }
+
+ /**
+ * construct function for TsFileSequenceReader.
+ *
+ * @param input the given input
+ * @param loadMetadataSize whether to call loadMetadataSize() during construction
+ */
+ public TsFileSequenceReader(TsFileInput input, boolean loadMetadataSize)
+ throws IOException {
+ this.tsFileInput = input;
+ if (loadMetadataSize) { // NOTE no autoRepair here
+ loadMetadataSize();
+ }
+ }
+
+ /**
* construct function for TsFileSequenceReader.
*
* @param input the input of a tsfile. The current position should be a
markder and then a chunk
@@ -226,7 +251,7 @@ public class TsFileSequenceReader implements AutoCloseable{
*/
public ChunkGroupFooter readChunkGroupFooter(long position, boolean
markerRead)
throws IOException {
- return ChunkGroupFooter.deserializeFrom(tsFileInput.wrapAsFileChannel(),
position, markerRead);
+ return ChunkGroupFooter.deserializeFrom(tsFileInput, position, markerRead);
}
/**
@@ -259,7 +284,7 @@ public class TsFileSequenceReader implements AutoCloseable{
* @param markerRead true if the offset does not contains the marker ,
otherwise false
*/
private ChunkHeader readChunkHeader(long position, boolean markerRead)
throws IOException {
- return ChunkHeader.deserializeFrom(tsFileInput.wrapAsFileChannel(),
position, markerRead);
+ return ChunkHeader.deserializeFrom(tsFileInput, position, markerRead);
}
/**
@@ -323,7 +348,7 @@ public class TsFileSequenceReader implements AutoCloseable{
* @param markerRead true if the offset does not contains the marker ,
otherwise false
*/
private PageHeader readPageHeader(TSDataType dataType, long position,
boolean markerRead) throws IOException {
- return PageHeader.deserializeFrom(dataType,
tsFileInput.wrapAsFileChannel(), position, markerRead);
+ return PageHeader.deserializeFrom(dataType, tsFileInput, position,
markerRead);
}
public long position() throws IOException {
@@ -372,7 +397,7 @@ public class TsFileSequenceReader implements AutoCloseable{
*/
public byte readMarker() throws IOException {
markerBuffer.clear();
- if (ReadWriteIOUtils.readAsPossible(tsFileInput.wrapAsFileChannel(),
markerBuffer) == 0) {
+ if (ReadWriteIOUtils.readAsPossible(tsFileInput, markerBuffer) == 0) {
throw new IOException("reach the end of the file.");
}
markerBuffer.flip();
@@ -409,11 +434,11 @@ public class TsFileSequenceReader implements
AutoCloseable{
private ByteBuffer readData(long position, int size) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(size);
if (position == -1) {
- if (ReadWriteIOUtils.readAsPossible(tsFileInput.wrapAsFileChannel(),
buffer) != size) {
+ if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer) != size) {
throw new IOException("reach the end of the data");
}
} else {
- if (ReadWriteIOUtils.readAsPossible(tsFileInput.wrapAsFileChannel(),
buffer, position, size) != size) {
+ if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer, position, size)
!= size) {
throw new IOException("reach the end of the data");
}
}
@@ -426,7 +451,7 @@ public class TsFileSequenceReader implements AutoCloseable{
*/
public int readRaw(long position, int length, ByteBuffer target) throws
IOException {
return ReadWriteIOUtils
- .readAsPossible(tsFileInput.wrapAsFileChannel(), target, position,
length);
+ .readAsPossible(tsFileInput, target, position, length);
}
/**
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
index 2e289f5..2743691 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
@@ -23,7 +23,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
import java.nio.charset.CharacterCodingException;
import java.util.ArrayList;
import java.util.List;
@@ -31,6 +30,7 @@ import
org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.file.metadata.enums.TSFreqType;
+import org.apache.iotdb.tsfile.read.reader.TsFileInput;
/**
* ConverterUtils is a utility class. It provide conversion between normal
datatype and byte array.
@@ -540,14 +540,14 @@ public class ReadWriteIOUtils {
/**
* read bytes from buffer with offset position to the end of buffer.
*/
- public static int readAsPossible(FileChannel channel, long position,
ByteBuffer buffer)
+ public static int readAsPossible(TsFileInput input, long position,
ByteBuffer buffer)
throws IOException {
int length = 0;
int read;
- while (buffer.hasRemaining() && (read = channel.read(buffer, position)) !=
-1) {
+ while (buffer.hasRemaining() && (read = input.read(buffer, position)) !=
-1) {
length += read;
position += read;
- channel.read(buffer, position);
+ input.read(buffer, position);
}
return length;
}
@@ -555,37 +555,19 @@ public class ReadWriteIOUtils {
/**
* read util to the end of buffer.
*/
- public static int readAsPossible(FileChannel channel, ByteBuffer buffer)
throws IOException {
+ public static int readAsPossible(TsFileInput input, ByteBuffer buffer)
throws IOException {
int length = 0;
int read;
- while (buffer.hasRemaining() && (read = channel.read(buffer)) != -1) {
+ while (buffer.hasRemaining() && (read = input.read(buffer)) != -1) {
length += read;
}
return length;
}
/**
- * read util to the end of buffer or up to len.
- */
- public static int readAsPossible(FileChannel channel, ByteBuffer buffer, int
len)
- throws IOException {
- int length = 0;
- int limit = buffer.limit();
- if (buffer.remaining() > len) {
- buffer.limit(buffer.position() + len);
- }
- int read;
- while (length < len && buffer.hasRemaining() && (read =
channel.read(buffer)) != -1) {
- length += read;
- }
- buffer.limit(limit);
- return length;
- }
-
- /**
* read bytes from buffer with offset position to the end of buffer or up to
len.
*/
- public static int readAsPossible(FileChannel channel, ByteBuffer target,
long offset, int len)
+ public static int readAsPossible(TsFileInput input, ByteBuffer target, long
offset, int len)
throws IOException {
int length = 0;
int limit = target.limit();
@@ -593,7 +575,7 @@ public class ReadWriteIOUtils {
target.limit(target.position() + len);
}
int read;
- while (length < len && target.hasRemaining() && (read =
channel.read(target, offset)) != -1) {
+ while (length < len && target.hasRemaining() && (read = input.read(target,
offset)) != -1) {
length += read;
offset += read;
}
diff --git
a/tsfile/src/test/java/org/apache/iotdb/tsfile/file/header/PageHeaderTest.java
b/tsfile/src/test/java/org/apache/iotdb/tsfile/file/header/PageHeaderTest.java
index 0ae4810..4cd733e 100644
---
a/tsfile/src/test/java/org/apache/iotdb/tsfile/file/header/PageHeaderTest.java
+++
b/tsfile/src/test/java/org/apache/iotdb/tsfile/file/header/PageHeaderTest.java
@@ -24,9 +24,12 @@ import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
+import java.nio.file.Paths;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.utils.TestHelper;
import org.apache.iotdb.tsfile.file.metadata.utils.Utils;
+import org.apache.iotdb.tsfile.read.reader.DefaultTsFileInput;
+import org.apache.iotdb.tsfile.read.reader.TsFileInput;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -119,8 +122,8 @@ public class PageHeaderTest {
FileInputStream fis = null;
PageHeader header = null;
try {
- fis = new FileInputStream(new File(PATH));
- header = PageHeader.deserializeFrom(DATA_TYPE, fis.getChannel(), offset,
true);
+ TsFileInput input = new DefaultTsFileInput(Paths.get(PATH));
+ header = PageHeader.deserializeFrom(DATA_TYPE, input, offset, true);
return header;
} catch (IOException e) {
e.printStackTrace();