This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch tsFile_v4
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/tsFile_v4 by this push:
new 1cb41188 pass the first test
1cb41188 is described below
commit 1cb41188c5638da5b95c5f43ed71c603d41d94fd
Author: jt2594838 <[email protected]>
AuthorDate: Fri Apr 12 15:29:15 2024 +0800
pass the first test
---
.../tsfile/file/header/ChunkGroupHeader.java | 9 +-
.../file/metadata/DeviceMetadataIndexEntry.java | 4 +-
.../org/apache/tsfile/file/metadata/IDeviceID.java | 5 +-
.../apache/tsfile/file/metadata/PlainDeviceID.java | 4 +-
.../tsfile/file/metadata/StringArrayDeviceID.java | 37 ++-
.../apache/tsfile/read/TsFileSequenceReader.java | 325 +++++++++++++--------
.../java/org/apache/tsfile/read/common/Path.java | 5 +-
.../read/controller/MetadataQuerierByFileImpl.java | 2 +-
.../reader/block/SingleDeviceTsBlockReader.java | 190 +++++++++---
.../reader/series/AbstractFileSeriesReader.java | 6 +
.../read/reader/series/FileSeriesReader.java | 3 +
.../java/org/apache/tsfile/write/TsFileWriter.java | 30 +-
.../write/chunk/AlignedChunkGroupWriterImpl.java | 3 +-
.../chunk/NonAlignedChunkGroupWriterImpl.java | 3 +-
.../org/apache/tsfile/write/record/TSRecord.java | 3 +-
.../org/apache/tsfile/write/record/Tablet.java | 38 ++-
.../org/apache/tsfile/write/schema/Schema.java | 21 ++
.../apache/tsfile/write/writer/TsFileIOWriter.java | 1 -
.../write/writer/TsFileIOWriterEndFileTest.java | 4 +-
.../write/writer/tsmiterator/TSMIterator.java | 5 +-
.../org/apache/tsfile/read/GetAllDevicesTest.java | 3 +-
...easurementChunkMetadataListMapIteratorTest.java | 3 +-
.../org/apache/tsfile/read/TsFileReaderTest.java | 6 +-
.../tsfile/read/TsFileSequenceReaderTest.java | 4 +-
.../tsfile/write/MetadataIndexConstructorTest.java | 12 +-
.../apache/tsfile/write/TableViewWriteTest.java | 11 +-
.../apache/tsfile/write/TsFileIOWriterTest.java | 4 +-
.../apache/tsfile/write/TsFileWriteApiTest.java | 7 +-
.../write/writer/RestorableTsFileIOWriterTest.java | 3 +-
.../writer/TsFileIOWriterMemoryControlTest.java | 2 +-
30 files changed, 516 insertions(+), 237 deletions(-)
diff --git
a/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkGroupHeader.java
b/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkGroupHeader.java
index 05c98a26..fd9588ea 100644
--- a/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkGroupHeader.java
+++ b/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkGroupHeader.java
@@ -21,7 +21,6 @@ package org.apache.tsfile.file.header;
import org.apache.tsfile.file.MetaMarker;
import org.apache.tsfile.file.metadata.IDeviceID;
-import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.read.reader.TsFileInput;
import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
import org.apache.tsfile.utils.ReadWriteIOUtils;
@@ -75,7 +74,7 @@ public class ChunkGroupHeader {
}
// TODO: add an interface in IDeviceID
- final IDeviceID deviceID =
IDeviceID.DEFAULT_DESERIALIZER.deserializeFrom(inputStream);
+ final IDeviceID deviceID =
IDeviceID.Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(inputStream);
return new ChunkGroupHeader(deviceID);
}
@@ -97,7 +96,7 @@ public class ChunkGroupHeader {
if (skipped != offsetVar) {
throw new IOException("Skipped " + skipped + " of " + offsetVar);
}
- final IDeviceID deviceID =
IDeviceID.DEFAULT_DESERIALIZER.deserializeFrom(inputStream);
+ final IDeviceID deviceID =
IDeviceID.Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(inputStream);
return new ChunkGroupHeader(deviceID);
}
@@ -115,7 +114,7 @@ public class ChunkGroupHeader {
public int serializeTo(OutputStream outputStream) throws IOException {
int length = 0;
length += ReadWriteIOUtils.write(MARKER, outputStream);
- length += ReadWriteIOUtils.writeVar(((PlainDeviceID)
deviceID).toStringID(), outputStream);
+ length += deviceID.serialize(outputStream);
return length;
}
@@ -123,7 +122,7 @@ public class ChunkGroupHeader {
public String toString() {
return "ChunkGroupHeader{"
+ "deviceID='"
- + ((PlainDeviceID) deviceID).toStringID()
+ + deviceID
+ '\''
+ ", serializedSize="
+ serializedSize
diff --git
a/tsfile/src/main/java/org/apache/tsfile/file/metadata/DeviceMetadataIndexEntry.java
b/tsfile/src/main/java/org/apache/tsfile/file/metadata/DeviceMetadataIndexEntry.java
index 7d572d6a..59296db5 100644
---
a/tsfile/src/main/java/org/apache/tsfile/file/metadata/DeviceMetadataIndexEntry.java
+++
b/tsfile/src/main/java/org/apache/tsfile/file/metadata/DeviceMetadataIndexEntry.java
@@ -88,14 +88,14 @@ public class DeviceMetadataIndexEntry implements
IMetadataIndexEntry {
}
public static DeviceMetadataIndexEntry deserializeFrom(ByteBuffer buffer) {
- IDeviceID device = IDeviceID.DEFAULT_DESERIALIZER.deserializeFrom(buffer);
+ IDeviceID device =
IDeviceID.Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(buffer);
long offset = ReadWriteIOUtils.readLong(buffer);
return new DeviceMetadataIndexEntry(device, offset);
}
public static DeviceMetadataIndexEntry deserializeFrom(InputStream
inputStream)
throws IOException {
- IDeviceID device =
IDeviceID.DEFAULT_DESERIALIZER.deserializeFrom(inputStream);
+ IDeviceID device =
IDeviceID.Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(inputStream);
long offset = ReadWriteIOUtils.readLong(inputStream);
return new DeviceMetadataIndexEntry(device, offset);
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/file/metadata/IDeviceID.java
b/tsfile/src/main/java/org/apache/tsfile/file/metadata/IDeviceID.java
index 9e13b6da..256e3fd2 100644
--- a/tsfile/src/main/java/org/apache/tsfile/file/metadata/IDeviceID.java
+++ b/tsfile/src/main/java/org/apache/tsfile/file/metadata/IDeviceID.java
@@ -34,8 +34,6 @@ import java.nio.ByteBuffer;
public interface IDeviceID extends Comparable<IDeviceID>, Accountable {
Logger LOGGER = LoggerFactory.getLogger(IDeviceID.class);
- Deserializer DEFAULT_DESERIALIZER = StringArrayDeviceID.DESERIALIZER;
- Factory DEFAULT_FACTORY = StringArrayDeviceID.FACTORY;
int serialize(ByteBuffer byteBuffer);
@@ -80,11 +78,12 @@ public interface IDeviceID extends Comparable<IDeviceID>,
Accountable {
interface Deserializer {
IDeviceID deserializeFrom(ByteBuffer byteBuffer);
-
IDeviceID deserializeFrom(InputStream inputStream) throws IOException;
+ Deserializer DEFAULT_DESERIALIZER = StringArrayDeviceID.getDESERIALIZER();
}
interface Factory {
IDeviceID create(String deviceIdString);
+ Factory DEFAULT_FACTORY = StringArrayDeviceID.getFACTORY();
}
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/file/metadata/PlainDeviceID.java
b/tsfile/src/main/java/org/apache/tsfile/file/metadata/PlainDeviceID.java
index a983b2f0..0cf77051 100644
--- a/tsfile/src/main/java/org/apache/tsfile/file/metadata/PlainDeviceID.java
+++ b/tsfile/src/main/java/org/apache/tsfile/file/metadata/PlainDeviceID.java
@@ -88,11 +88,11 @@ public class PlainDeviceID implements IDeviceID {
}
public String toString() {
- return "PlainDeviceID{" + "deviceID='" + deviceID + '\'' + '}';
+ return deviceID;
}
public String toStringID() {
- return deviceID;
+ return toString();
}
@Override
diff --git
a/tsfile/src/main/java/org/apache/tsfile/file/metadata/StringArrayDeviceID.java
b/tsfile/src/main/java/org/apache/tsfile/file/metadata/StringArrayDeviceID.java
index 40f16aff..1270962b 100644
---
a/tsfile/src/main/java/org/apache/tsfile/file/metadata/StringArrayDeviceID.java
+++
b/tsfile/src/main/java/org/apache/tsfile/file/metadata/StringArrayDeviceID.java
@@ -19,6 +19,7 @@
package org.apache.tsfile.file.metadata;
+import java.util.Arrays;
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.exception.TsFileRuntimeException;
@@ -35,7 +36,7 @@ import java.util.Objects;
public class StringArrayDeviceID implements IDeviceID {
- public static final Deserializer DESERIALIZER =
+ private static final Deserializer DESERIALIZER =
new Deserializer() {
@Override
public IDeviceID deserializeFrom(ByteBuffer byteBuffer) {
@@ -48,7 +49,7 @@ public class StringArrayDeviceID implements IDeviceID {
}
};
- public static final Factory FACTORY =
+ private static final Factory FACTORY =
new Factory() {
@Override
public IDeviceID create(String deviceIdString) {
@@ -56,6 +57,8 @@ public class StringArrayDeviceID implements IDeviceID {
}
};
+
+
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(StringArrayDeviceID.class);
@@ -71,6 +74,14 @@ public class StringArrayDeviceID implements IDeviceID {
this.segments =
deviceIdString.split(TsFileConstant.PATH_SEPARATER_NO_REGEX);
}
+ public static Deserializer getDESERIALIZER() {
+ return DESERIALIZER;
+ }
+
+ public static Factory getFACTORY() {
+ return FACTORY;
+ }
+
@Override
public int serialize(ByteBuffer byteBuffer) {
int cnt = 0;
@@ -191,4 +202,26 @@ public class StringArrayDeviceID implements IDeviceID {
}
return cnt;
}
+
+ @Override
+ public String toString() {
+ return String.join(".", segments);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ StringArrayDeviceID deviceID = (StringArrayDeviceID) o;
+ return Objects.deepEquals(segments, deviceID.segments);
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(segments);
+ }
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
b/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
index 4efdd504..68f65290 100644
--- a/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
@@ -43,7 +43,6 @@ import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.ITimeSeriesMetadata;
import org.apache.tsfile.file.metadata.MeasurementMetadataIndexEntry;
import org.apache.tsfile.file.metadata.MetadataIndexNode;
-import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.tsfile.file.metadata.TsFileMetadata;
import org.apache.tsfile.file.metadata.enums.CompressionType;
@@ -198,9 +197,9 @@ public class TsFileSequenceReader implements AutoCloseable {
* construct function for TsFileSequenceReader.
*
* @param input the input of a tsfile. The current position should be a
marker and then a chunk
- * Header, rather than the magic number
+ * Header, rather than the magic number
* @param fileMetadataPos the position of the file metadata in the
TsFileInput from the beginning
- * of the input to the current position
+ * of the input to the current position
* @param fileMetadataSize the byte size of the file metadata in the input
*/
public TsFileSequenceReader(TsFileInput input, long fileMetadataPos, int
fileMetadataSize) {
@@ -234,7 +233,9 @@ public class TsFileSequenceReader implements AutoCloseable {
return fileMetadataSize;
}
- /** Return the tsfile meta data size of this tsfile. */
+ /**
+ * Return the tsfile meta data size of this tsfile.
+ */
public long getFileMetadataSize() throws IOException {
return tsFileInput.size() - getFileMetadataPos();
}
@@ -250,7 +251,9 @@ public class TsFileSequenceReader implements AutoCloseable {
return tsFileInput.size() - tsFileMetaData.getMetaOffset();
}
- /** this function does not modify the position of the file reader. */
+ /**
+ * this function does not modify the position of the file reader.
+ */
public String readTailMagic() throws IOException {
long totalSize = tsFileInput.size();
ByteBuffer magicStringBytes =
ByteBuffer.allocate(TSFileConfig.MAGIC_STRING.getBytes().length);
@@ -259,7 +262,9 @@ public class TsFileSequenceReader implements AutoCloseable {
return new String(magicStringBytes.array());
}
- /** whether the file is a complete TsFile: only if the head magic and tail
magic string exists. */
+ /**
+ * whether the file is a complete TsFile: only if the head magic and tail
magic string exists.
+ */
public boolean isComplete() throws IOException {
long size = tsFileInput.size();
// TSFileConfig.MAGIC_STRING.getBytes().length * 2 for two magic string
@@ -273,7 +278,9 @@ public class TsFileSequenceReader implements AutoCloseable {
}
}
- /** this function does not modify the position of the file reader. */
+ /**
+ * this function does not modify the position of the file reader.
+ */
public String readHeadMagic() throws IOException {
ByteBuffer magicStringBytes =
ByteBuffer.allocate(TSFileConfig.MAGIC_STRING.getBytes().length);
tsFileInput.read(magicStringBytes, 0);
@@ -281,7 +288,9 @@ public class TsFileSequenceReader implements AutoCloseable {
return new String(magicStringBytes.array());
}
- /** this function reads version number and checks compatibility of TsFile. */
+ /**
+ * this function reads version number and checks compatibility of TsFile.
+ */
public byte readVersionNumber() throws IOException {
ByteBuffer versionNumberByte = ByteBuffer.allocate(Byte.BYTES);
tsFileInput.read(versionNumberByte,
TSFileConfig.MAGIC_STRING.getBytes().length);
@@ -458,7 +467,7 @@ public class TsFileSequenceReader implements AutoCloseable {
logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file);
throw e;
}
- firstTimeseriesMetadata =
tryToGetFirstTimeseriesMetadata(metadataIndexNode);
+ firstTimeseriesMetadata = getTimeColumnMetadata(metadataIndexNode);
metadataIndexPair =
getMetadataAndEndOffsetOfMeasurementNode(metadataIndexNode,
path.getMeasurement(), false);
@@ -568,89 +577,175 @@ public class TsFileSequenceReader implements
AutoCloseable {
return metadataIndexPair;
}
- // This method is only used for TsFile
- public List<ITimeSeriesMetadata> readITimeseriesMetadata(
- IDeviceID device, Set<String> measurements, MetadataIndexNode root)
throws IOException {
+ /**
+ * Searching from the start node and try to find the root node of the
deviceID.
+ *
+ * @param deviceID desired device
+ * @param startNode start of the search, if not provided, start from the
table root
+ * @return MetadataIndexNode which is the root of deviceID
+ */
+ private MetadataIndexNode getDeviceRootNode(IDeviceID deviceID,
MetadataIndexNode startNode)
+ throws IOException {
readFileMetadata();
- MetadataIndexNode deviceMetadataIndexNode =
- root != null
- ? root
- :
tsFileMetaData.getTableMetadataIndexNodeMap().get(device.getTableName());
- Pair<IMetadataIndexEntry, Long> metadataIndexPair =
- getMetadataAndEndOffsetOfDeviceNode(deviceMetadataIndexNode, device,
false);
- if (metadataIndexPair == null) {
+ startNode =
+ startNode != null
+ ? startNode
+ :
tsFileMetaData.getTableMetadataIndexNodeMap().get(deviceID.getTableName());
+
+ MetadataIndexNode measurementMetadataIndexNode;
+ ByteBuffer buffer;
+ if (startNode.isDeviceLevel()) {
+ Pair<IMetadataIndexEntry, Long> metadataIndexPair =
+ getMetadataAndEndOffsetOfDeviceNode(startNode, deviceID, false);
+ if (metadataIndexPair == null) {
+ return null;
+ }
+
+ // the content of next Layer MeasurementNode of the specific device's
DeviceNode
+ buffer = readData(metadataIndexPair.left.getOffset(),
metadataIndexPair.right);
+ // next layer MeasurementNode of the specific DeviceNode
+ try {
+ measurementMetadataIndexNode =
MetadataIndexNode.deserializeFrom(buffer, false);
+ } catch (Exception e) {
+ logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file);
+ throw e;
+ }
+ } else {
+ measurementMetadataIndexNode = startNode;
+ }
+ return measurementMetadataIndexNode;
+ }
+
+ /**
+ * Read the TimeseriesMetadata of the given measurement and its successors
from the index node
+ * into the list.
+ *
+ * @param timeseriesMetadataList result holder
+ * @param node index node to be read from
+ * @param measurement the desired measurement
+ * @return true if the measurement exists
+ * @throws IOException when read fails
+ */
+ public boolean readITimeseriesMetadata(
+ List<TimeseriesMetadata> timeseriesMetadataList,
+ MetadataIndexNode node,
+ String measurement) throws IOException {
+ Pair<IMetadataIndexEntry, Long> measurementMetadataIndexPair =
+ getMetadataAndEndOffsetOfMeasurementNode(
+ node, measurement, false);
+
+ if (measurementMetadataIndexPair == null) {
+ return false;
+ }
+ // the content of TimeseriesNode of the specific MeasurementLeafNode
+ ByteBuffer buffer =
+ readData(
+ measurementMetadataIndexPair.left.getOffset(),
measurementMetadataIndexPair.right);
+ while (buffer.hasRemaining()) {
+ try {
+ timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer,
true));
+ } catch (Exception e) {
+ logger.error(
+ "Something error happened while deserializing TimeseriesMetadata
of file {}", file);
+ throw e;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Read TimeSeriesMetadata of the given device. This method is only used for
TsFile.
+ *
+ * @param device deviceId to be read
+ * @param measurements measurements to be read
+ * @param root search start node, if not provided, use the root node of the
table of the device
+ * @param mergeAlignedSeries see @return
+ * @return when the device is not aligned, or mergeAlignedSeries is false,
each result correspond
+ * to one series in the provided measurements (if exists); otherwise, all
columns in the aligned
+ * device will be merged into one AlignedTimeSeriesMetadata.
+ * @throws IOException if read fails
+ */
+ public List<ITimeSeriesMetadata> readITimeseriesMetadata(
+ IDeviceID device, Set<String> measurements, MetadataIndexNode root,
+ boolean mergeAlignedSeries) throws IOException {
+ // find the index node associated with the device
+ final MetadataIndexNode measurementMetadataIndexNode =
getDeviceRootNode(device, root);
+ if (measurementMetadataIndexNode == null) {
return Collections.emptyList();
}
+
+ // Get the time column metadata if the device is aligned
+ TimeseriesMetadata timeColumnMetadata =
+ getTimeColumnMetadata(measurementMetadataIndexNode);
+ List<TimeseriesMetadata> valueTimeseriesMetadataList =
+ timeColumnMetadata != null ? new ArrayList<>() : null;
+
List<ITimeSeriesMetadata> resultTimeseriesMetadataList = new ArrayList<>();
List<String> measurementList = new ArrayList<>(measurements);
- Set<String> measurementsHadFound = new HashSet<>();
- // the content of next Layer MeasurementNode of the specific device's
DeviceNode
- ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(),
metadataIndexPair.right);
- Pair<IMetadataIndexEntry, Long> measurementMetadataIndexPair =
metadataIndexPair;
- List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
-
- // next layer MeasurementNode of the specific DeviceNode
- MetadataIndexNode measurementMetadataIndexNode;
- try {
- measurementMetadataIndexNode = MetadataIndexNode.deserializeFrom(buffer,
false);
- } catch (Exception e) {
- logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file);
- throw e;
- }
- // Get the first timeseriesMetadata of the device
- TimeseriesMetadata firstTimeseriesMetadata =
- tryToGetFirstTimeseriesMetadata(measurementMetadataIndexNode);
+ measurementList.sort(null);
+ boolean[] measurementFound = new boolean[measurements.size()];
+ int measurementFoundCnt = 0;
- for (int i = 0; i < measurementList.size(); i++) {
- if (measurementsHadFound.contains(measurementList.get(i))) {
+ List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
+ for (int i = 0; i < measurementList.size() && measurementFoundCnt <
measurementList.size();
+ i++) {
+ final String measurementName = measurementList.get(i);
+ timeseriesMetadataList.clear();
+ // read the leaf node that may contain the i-th measurement into a list
+ if (measurementFound[i]
+ || !readITimeseriesMetadata(timeseriesMetadataList,
measurementMetadataIndexNode,
+ measurementName)) {
continue;
}
- timeseriesMetadataList.clear();
- measurementMetadataIndexPair =
- getMetadataAndEndOffsetOfMeasurementNode(
- measurementMetadataIndexNode, measurementList.get(i), false);
+ // in the list, search for the all measurements that are not found
+ measurementFoundCnt += searchInTimeseriesList(measurementList, i,
measurementFound,
+ timeseriesMetadataList,
+ resultTimeseriesMetadataList, timeColumnMetadata,
valueTimeseriesMetadataList,
+ mergeAlignedSeries);
+ }
+ if (valueTimeseriesMetadataList != null &&
!valueTimeseriesMetadataList.isEmpty()) {
+ resultTimeseriesMetadataList.add(
+ new AlignedTimeSeriesMetadata(
+ timeColumnMetadata, valueTimeseriesMetadataList));
+ }
+ return resultTimeseriesMetadataList;
+ }
- if (measurementMetadataIndexPair == null) {
+ private int searchInTimeseriesList(List<String> measurementList, int
startIndex,
+ boolean[] measurementFound, List<TimeseriesMetadata>
timeseriesMetadataList,
+ List<ITimeSeriesMetadata> resultTimeseriesMetadataList,
+ TimeseriesMetadata timeColumnMetadata,
+ List<TimeseriesMetadata> valueTimeseriesMetadataList,
+ boolean mergeAlignedSeries) {
+ int numOfFoundMeasurements = 0;
+ for (int j = startIndex; j < measurementList.size(); j++) {
+ int searchResult;
+ if (measurementFound[j] ||
+ (searchResult =
binarySearchInTimeseriesMetadataList(timeseriesMetadataList,
+ measurementList.get(j))) < 0
+ ) {
continue;
}
- // the content of TimeseriesNode of the specific MeasurementLeafNode
- buffer =
- readData(
- measurementMetadataIndexPair.left.getOffset(),
measurementMetadataIndexPair.right);
- while (buffer.hasRemaining()) {
- try {
-
timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer, true));
- } catch (Exception e) {
- logger.error(
- "Something error happened while deserializing TimeseriesMetadata
of file {}", file);
- throw e;
- }
- }
- for (int j = i; j < measurementList.size(); j++) {
- String current = measurementList.get(j);
- if (!measurementsHadFound.contains(current)) {
- int searchResult =
binarySearchInTimeseriesMetadataList(timeseriesMetadataList, current);
- if (searchResult >= 0) {
- if (firstTimeseriesMetadata != null) {
- List<TimeseriesMetadata> valueTimeseriesMetadataList = new
ArrayList<>();
-
valueTimeseriesMetadataList.add(timeseriesMetadataList.get(searchResult));
- resultTimeseriesMetadataList.add(
- new AlignedTimeSeriesMetadata(
- firstTimeseriesMetadata, valueTimeseriesMetadataList));
- } else {
-
resultTimeseriesMetadataList.add(timeseriesMetadataList.get(searchResult));
- }
- measurementsHadFound.add(current);
- }
- }
- if (measurementsHadFound.size() == measurements.size()) {
- return resultTimeseriesMetadataList;
+
+ final TimeseriesMetadata valueColumnMetadata =
timeseriesMetadataList.get(searchResult);
+ if (timeColumnMetadata != null) {
+ if (!mergeAlignedSeries) {
+ resultTimeseriesMetadataList.add(
+ new AlignedTimeSeriesMetadata(
+ timeColumnMetadata,
Collections.singletonList(valueColumnMetadata)));
+ } else if (valueTimeseriesMetadataList != null) {
+ valueTimeseriesMetadataList.add(valueColumnMetadata);
}
+ } else {
+ resultTimeseriesMetadataList.add(valueColumnMetadata);
}
+ measurementFound[j] = true;
+ numOfFoundMeasurements++;
}
- return resultTimeseriesMetadataList;
+ return numOfFoundMeasurements;
}
+
protected int binarySearchInTimeseriesMetadataList(
List<TimeseriesMetadata> timeseriesMetadataList, String key) {
int low = 0;
@@ -710,8 +805,8 @@ public class TsFileSequenceReader implements AutoCloseable {
/**
* @return an iterator of "device, isAligned" list, in which names of
devices are ordered in
- * dictionary order, and isAligned represents whether the device is
aligned. Only read devices
- * on one device leaf node each time to save memory.
+ * dictionary order, and isAligned represents whether the device is aligned.
Only read devices on
+ * one device leaf node each time to save memory.
*/
public TsFileDeviceIterator getAllDevicesIteratorWithIsAligned() throws
IOException {
readFileMetadata();
@@ -866,7 +961,7 @@ public class TsFileSequenceReader implements AutoCloseable {
/**
* @return an iterator of timeseries list, in which names of timeseries are
ordered in dictionary
- * order
+ * order
* @throws IOException io error
*/
public Iterator<List<Path>> getPathsIterator() throws IOException {
@@ -969,7 +1064,7 @@ public class TsFileSequenceReader implements AutoCloseable
{
return "".equals(((MeasurementMetadataIndexEntry) entry).getName());
}
- TimeseriesMetadata tryToGetFirstTimeseriesMetadata(MetadataIndexNode
measurementNode)
+ TimeseriesMetadata getTimeColumnMetadata(MetadataIndexNode measurementNode)
throws IOException {
// Not aligned timeseries
if (!isAlignedDevice(measurementNode)) {
@@ -996,7 +1091,7 @@ public class TsFileSequenceReader implements AutoCloseable
{
measurementNode.getChildren().get(0).getOffset(),
measurementNode.getChildren().get(1).getOffset());
MetadataIndexNode metadataIndexNode =
MetadataIndexNode.deserializeFrom(buffer, false);
- return tryToGetFirstTimeseriesMetadata(metadataIndexNode);
+ return getTimeColumnMetadata(metadataIndexNode);
}
return null;
}
@@ -1007,16 +1102,16 @@ public class TsFileSequenceReader implements
AutoCloseable {
*
* @param measurementNode first measurement node of the device
* @param excludedMeasurementIds do not deserialize chunk metadatas whose
measurementId is in the
- * set. Notice: It only takes effect when the needChunkMetadata
parameter is true.
+ * set. Notice: It only takes effect when the needChunkMetadata parameter is
true.
* @param needChunkMetadata need to deserialize chunk metadatas or not
* @return measurement -> chunk metadata list -> timeseries metadata
<startOffset, endOffset>
*/
public Map<String, Pair<List<IChunkMetadata>, Pair<Long, Long>>>
- getTimeseriesMetadataOffsetByDevice(
- MetadataIndexNode measurementNode,
- Set<String> excludedMeasurementIds,
- boolean needChunkMetadata)
- throws IOException {
+ getTimeseriesMetadataOffsetByDevice(
+ MetadataIndexNode measurementNode,
+ Set<String> excludedMeasurementIds,
+ boolean needChunkMetadata)
+ throws IOException {
Map<String, Pair<List<IChunkMetadata>, Pair<Long, Long>>>
timeseriesMetadataOffsetMap =
new LinkedHashMap<>();
List<IMetadataIndexEntry> childrenEntryList =
measurementNode.getChildren();
@@ -1336,7 +1431,7 @@ public class TsFileSequenceReader implements
AutoCloseable {
* @param metadataIndex given MetadataIndexNode
* @param deviceID target device
* @param exactSearch whether is in exact search mode, return null when
there is no entry with
- * name; or else return the nearest MetadataIndexEntry before it (for
deeper search)
+ * name; or else return the nearest MetadataIndexEntry before it (for deeper
search)
* @return target MetadataIndexEntry, endOffset pair
*/
protected Pair<IMetadataIndexEntry, Long>
getMetadataAndEndOffsetOfDeviceNode(
@@ -1367,7 +1462,7 @@ public class TsFileSequenceReader implements
AutoCloseable {
* @param metadataIndex given MetadataIndexNode
* @param measurement target measurement
* @param exactSearch whether is in exact search mode, return null when
there is no entry with
- * name; or else return the nearest MetadataIndexEntry before it (for
deeper search)
+ * name; or else return the nearest MetadataIndexEntry before it (for deeper
search)
* @return target MetadataIndexEntry, endOffset pair
*/
protected Pair<IMetadataIndexEntry, Long>
getMetadataAndEndOffsetOfMeasurementNode(
@@ -1434,8 +1529,8 @@ public class TsFileSequenceReader implements
AutoCloseable {
}
/**
- * read data from current position of the input, and deserialize it to a
CHUNK_HEADER. <br>
- * This method is not threadsafe.
+ * read data from current position of the input, and deserialize it to a
CHUNK_HEADER. <br> This
+ * method is not threadsafe.
*
* @return a CHUNK_HEADER
* @throws IOException io error
@@ -1546,7 +1641,6 @@ public class TsFileSequenceReader implements
AutoCloseable {
*
* @param timeseriesMetadata timeseries' metadata
* @return a pair of {@link CompressionType} and {@link TSEncoding} of given
timeseries.
- * @throws IOException
*/
public Pair<CompressionType, TSEncoding>
readTimeseriesCompressionTypeAndEncoding(
TimeseriesMetadata timeseriesMetadata) throws IOException {
@@ -1561,7 +1655,9 @@ public class TsFileSequenceReader implements
AutoCloseable {
return
ChunkHeader.deserializeCompressionTypeAndEncoding(tsFileInput.wrapAsInputStream());
}
- /** Get measurement schema by chunkMetadatas. */
+ /**
+ * Get measurement schema by chunkMetadatas.
+ */
public MeasurementSchema getMeasurementSchema(List<IChunkMetadata>
chunkMetadataList)
throws IOException {
if (chunkMetadataList.isEmpty()) {
@@ -1621,8 +1717,7 @@ public class TsFileSequenceReader implements
AutoCloseable {
}
/**
- * read one byte from the input. <br>
- * this method is not thread safe
+ * read one byte from the input. <br> this method is not thread safe
*/
public byte readMarker() throws IOException {
markerBuffer.clear();
@@ -1651,12 +1746,12 @@ public class TsFileSequenceReader implements
AutoCloseable {
/**
* read data from tsFileInput, from the current position (if position = -1),
or the given
- * position. <br>
- * if position = -1, the tsFileInput's position will be changed to the
current position + real
- * data size that been read. Other wise, the tsFileInput's position is not
changed.
+ * position. <br> if position = -1, the tsFileInput's position will be
changed to the current
+ * position + real data size that been read. Other wise, the tsFileInput's
position is not
+ * changed.
*
* @param position the start position of data in the tsFileInput, or the
current position if
- * position = -1
+ * position = -1
* @param totalSize the size of data that want to read
* @return data that been read.
*/
@@ -1697,7 +1792,7 @@ public class TsFileSequenceReader implements
AutoCloseable {
* position.
*
* @param start the start position of data in the tsFileInput, or the
current position if position
- * = -1
+ * = -1
* @param end the end position of data that want to read
* @return data that been read.
*/
@@ -1712,7 +1807,9 @@ public class TsFileSequenceReader implements
AutoCloseable {
}
}
- /** notice, the target bytebuffer are not flipped. */
+ /**
+ * notice, the target bytebuffer are not flipped.
+ */
public int readRaw(long position, int length, ByteBuffer target) throws
IOException {
return ReadWriteIOUtils.readAsPossible(tsFileInput, target, position,
length);
}
@@ -1723,9 +1820,9 @@ public class TsFileSequenceReader implements
AutoCloseable {
* @param newSchema the schema on each time series in the file
* @param chunkGroupMetadataList ChunkGroupMetadata List
* @param fastFinish if true and the file is complete, then newSchema and
chunkGroupMetadataList
- * parameter will be not modified.
+ * parameter will be not modified.
* @return the position of the file that is fine. All data after the
position in the file should
- * be truncated.
+ * be truncated.
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
public long selfCheck(
@@ -1806,7 +1903,7 @@ public class TsFileSequenceReader implements
AutoCloseable {
if (dataSize > 0) {
if (((byte) (chunkHeader.getChunkType() & 0x3F))
== MetaMarker
- .CHUNK_HEADER) { // more than one page, we could use
page statistics to
+ .CHUNK_HEADER) { // more than one page, we could use page
statistics to
// generate chunk statistic
while (dataSize > 0) {
// a new Page
@@ -2009,7 +2106,7 @@ public class TsFileSequenceReader implements
AutoCloseable {
*
* @param filename the path of file
* @param fastFinish if true, the method will only check the format of head
(Magic String TsFile,
- * Version Number) and tail (Magic String TsFile) of TsFile.
+ * Version Number) and tail (Magic String TsFile) of TsFile.
* @return the status of TsFile
*/
public long selfCheckWithInfo(
@@ -2174,7 +2271,7 @@ public class TsFileSequenceReader implements
AutoCloseable {
public List<IChunkMetadata> getIChunkMetadataList(IDeviceID deviceID, String
measurementName)
throws IOException {
List<ITimeSeriesMetadata> timeseriesMetaData =
- readITimeseriesMetadata(deviceID,
Collections.singleton(measurementName), null);
+ readITimeseriesMetadata(deviceID,
Collections.singleton(measurementName), null, false);
if (timeseriesMetaData == null || timeseriesMetaData.isEmpty()) {
return Collections.emptyList();
}
@@ -2186,7 +2283,7 @@ public class TsFileSequenceReader implements
AutoCloseable {
public List<List<IChunkMetadata>> getIChunkMetadataList(
IDeviceID deviceID, Set<String> measurementNames, MetadataIndexNode
root) throws IOException {
List<ITimeSeriesMetadata> timeseriesMetaData =
- readITimeseriesMetadata(deviceID, measurementNames, root);
+ readITimeseriesMetadata(deviceID, measurementNames, root, true);
if (timeseriesMetaData == null || timeseriesMetaData.isEmpty()) {
return Collections.emptyList();
}
@@ -2194,7 +2291,7 @@ public class TsFileSequenceReader implements
AutoCloseable {
for (ITimeSeriesMetadata timeseriesMetaDatum : timeseriesMetaData) {
List<IChunkMetadata> chunkMetadataList =
readIChunkMetaDataList(timeseriesMetaDatum);
chunkMetadataList.sort(
-
Comparator.comparingLong(org.apache.tsfile.file.metadata.IChunkMetadata::getStartTime));
+ Comparator.comparingLong(IChunkMetadata::getStartTime));
results.add(chunkMetadataList);
}
return results;
@@ -2229,7 +2326,7 @@ public class TsFileSequenceReader implements
AutoCloseable {
logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file);
throw e;
}
- firstTimeseriesMetadata =
tryToGetFirstTimeseriesMetadata(metadataIndexNode);
+ firstTimeseriesMetadata = getTimeColumnMetadata(metadataIndexNode);
if (firstTimeseriesMetadata == null) {
throw new IOException("Timeseries of device {" + device + "} are not
aligned");
}
@@ -2337,7 +2434,7 @@ public class TsFileSequenceReader implements
AutoCloseable {
Map<String, TimeseriesMetadata> timeseriesMetadataMap =
readDeviceMetadata(device);
for (TimeseriesMetadata timeseriesMetadata :
timeseriesMetadataMap.values()) {
result.put(
- ((PlainDeviceID) device).toStringID()
+ device.toString()
+ TsFileConstant.PATH_SEPARATOR
+ timeseriesMetadata.getMeasurementId(),
timeseriesMetadata.getTsDataType());
@@ -2435,10 +2532,10 @@ public class TsFileSequenceReader implements
AutoCloseable {
/**
* @return An iterator of linked hashmaps ( measurement -> chunk metadata
list ). When traversing
- * the linked hashmap, you will get chunk metadata lists according to
the lexicographic order
- * of the measurements. The first measurement of the linked hashmap of
each iteration is
- * always larger than the last measurement of the linked hashmap of the
previous iteration in
- * lexicographic order.
+ * the linked hashmap, you will get chunk metadata lists according to the
lexicographic order of
+ * the measurements. The first measurement of the linked hashmap of each
iteration is always
+ * larger than the last measurement of the linked hashmap of the previous
iteration in
+ * lexicographic order.
*/
public Iterator<Map<String, List<ChunkMetadata>>>
getMeasurementChunkMetadataListMapIterator(
IDeviceID device) throws IOException {
diff --git a/tsfile/src/main/java/org/apache/tsfile/read/common/Path.java
b/tsfile/src/main/java/org/apache/tsfile/read/common/Path.java
index b89866b3..e7d531f6 100644
--- a/tsfile/src/main/java/org/apache/tsfile/read/common/Path.java
+++ b/tsfile/src/main/java/org/apache/tsfile/read/common/Path.java
@@ -22,7 +22,6 @@ package org.apache.tsfile.read.common;
import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.exception.PathParseException;
import org.apache.tsfile.file.metadata.IDeviceID;
-import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.file.metadata.StringArrayDeviceID;
import org.apache.tsfile.read.common.parser.PathNodesGenerator;
import org.apache.tsfile.utils.PublicBAOS;
@@ -55,7 +54,7 @@ public class Path implements Serializable, Comparable<Path> {
// Only used for test
public Path(IDeviceID deviceID) {
- this(((PlainDeviceID) deviceID).toStringID());
+ this(deviceID.toString());
}
/**
@@ -98,7 +97,7 @@ public class Path implements Serializable, Comparable<Path> {
}
public Path(IDeviceID device, String measurement, boolean needCheck) {
- this(((PlainDeviceID) device).toStringID(), measurement, needCheck);
+ this(device.toString(), measurement, needCheck);
}
/**
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/controller/MetadataQuerierByFileImpl.java
b/tsfile/src/main/java/org/apache/tsfile/read/controller/MetadataQuerierByFileImpl.java
index fd04df6b..c38e8505 100644
---
a/tsfile/src/main/java/org/apache/tsfile/read/controller/MetadataQuerierByFileImpl.java
+++
b/tsfile/src/main/java/org/apache/tsfile/read/controller/MetadataQuerierByFileImpl.java
@@ -159,7 +159,7 @@ public class MetadataQuerierByFileImpl implements
IMetadataQuerier {
}
List<ITimeSeriesMetadata> timeseriesMetaDataList =
- tsFileReader.readITimeseriesMetadata(selectedDevice,
selectedMeasurements, null);
+ tsFileReader.readITimeseriesMetadata(selectedDevice,
selectedMeasurements, null, false);
for (ITimeSeriesMetadata timeseriesMetadata : timeseriesMetaDataList) {
List<IChunkMetadata> chunkMetadataList =
tsFileReader.readIChunkMetaDataList(timeseriesMetadata);
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/reader/block/SingleDeviceTsBlockReader.java
b/tsfile/src/main/java/org/apache/tsfile/read/reader/block/SingleDeviceTsBlockReader.java
index f9b635c5..8099a6ee 100644
---
a/tsfile/src/main/java/org/apache/tsfile/read/reader/block/SingleDeviceTsBlockReader.java
+++
b/tsfile/src/main/java/org/apache/tsfile/read/reader/block/SingleDeviceTsBlockReader.java
@@ -1,6 +1,7 @@
package org.apache.tsfile.read.reader.block;
import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.read.common.BatchData;
import org.apache.tsfile.read.common.block.TsBlock;
@@ -13,6 +14,7 @@ import
org.apache.tsfile.read.reader.series.AbstractFileSeriesReader;
import org.apache.tsfile.read.reader.series.FileSeriesReader;
import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.TsPrimitiveType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,20 +67,7 @@ public class SingleDeviceTsBlockReader implements
TsBlockReader {
Filter timeFilter = timeExpression == null ? null :
timeExpression.toFilter();
for (List<IChunkMetadata> chunkMetadataList : chunkMetadataLists) {
- if (!chunkMetadataList.isEmpty()) {
- final String measurementUid =
chunkMetadataList.get(0).getMeasurementUid();
- AbstractFileSeriesReader seriesReader =
- new FileSeriesReader(chunkLoader, chunkMetadataList, timeFilter);
- if (seriesReader.hasNextBatch()) {
- measureColumnContextMap.put(
- measurementUid,
- new MeasurementColumnContext(
- measurementUid,
- task.getColumnMapping().getColumnPos(measurementUid),
- seriesReader.nextBatch(),
- seriesReader));
- }
- }
+ constructColumnContext(chunkMetadataList, chunkLoader, timeFilter);
}
for (String idColumn : task.getColumnMapping().getIdColumns()) {
@@ -88,6 +77,38 @@ public class SingleDeviceTsBlockReader implements
TsBlockReader {
}
}
+ private void constructColumnContext(List<IChunkMetadata> chunkMetadataList,
+ IChunkLoader chunkLoader, Filter timeFilter) throws IOException {
+ if (chunkMetadataList.isEmpty()) {
+ return;
+ }
+ final IChunkMetadata chunkMetadata = chunkMetadataList.get(0);
+ AbstractFileSeriesReader seriesReader =
+ new FileSeriesReader(chunkLoader, chunkMetadataList, timeFilter);
+ if (seriesReader.hasNextBatch()) {
+ if (chunkMetadata instanceof AlignedChunkMetadata) {
+ final List<String> currentChunkMeasurementNames =
seriesReader.getCurrentChunkMeasurementNames();
+ List<List<Integer>> posInResult = new ArrayList<>();
+ for (String currentChunkMeasurementName :
currentChunkMeasurementNames) {
+
posInResult.add(task.getColumnMapping().getColumnPos(currentChunkMeasurementName));
+ }
+ measureColumnContextMap.put("",
+ new VectorMeasurementColumnContext(posInResult,
+ seriesReader.nextBatch(), seriesReader
+ ));
+ } else {
+ final String measurementUid = chunkMetadata.getMeasurementUid();
+ measureColumnContextMap.put(
+ measurementUid,
+ new SingleMeasurementColumnContext(
+ measurementUid,
+ task.getColumnMapping().getColumnPos(measurementUid),
+ seriesReader.nextBatch(),
+ seriesReader));
+ }
+ }
+ }
+
@Override
public boolean hasNext() {
if (!lastBlockReturned) {
@@ -100,7 +121,7 @@ public class SingleDeviceTsBlockReader implements
TsBlockReader {
currentBlock.reset();
nextTime = Long.MAX_VALUE;
- List<MeasurementColumnContext> alignedColumns = new ArrayList<>();
+ List<MeasurementColumnContext> minTimeColumns = new ArrayList<>();
while (currentBlock.getPositionCount() < blockSize) {
// find the minimum time among the batches and the associated columns
@@ -109,14 +130,15 @@ public class SingleDeviceTsBlockReader implements
TsBlockReader {
final long currentTime = batchData.currentTime();
if (nextTime > currentTime) {
nextTime = currentTime;
- alignedColumns.clear();
+ minTimeColumns.clear();
+ minTimeColumns.add(entry.getValue());
} else if (nextTime == currentTime) {
- alignedColumns.add(entry.getValue());
+ minTimeColumns.add(entry.getValue());
}
}
try {
- fillMeasurements(alignedColumns);
+ fillMeasurements(minTimeColumns);
} catch (IOException e) {
LOGGER.error("Cannot fill measurements", e);
return false;
@@ -152,32 +174,36 @@ public class SingleDeviceTsBlockReader implements
TsBlockReader {
}
}
- private void fillMeasurements(List<MeasurementColumnContext> alignedColumns)
throws IOException {
+ private void fillMeasurements(List<MeasurementColumnContext> minTimeColumns)
throws IOException {
if (measurementExpression == null || measurementExpression.satisfy(this)) {
// use the time to fill the block
final int positionCount = currentBlock.getPositionCount();
currentBlock.getTimeColumn().getTimes()[positionCount] = nextTime;
// project the value columns to the result
- for (final MeasurementColumnContext columnContext : alignedColumns) {
+ for (final MeasurementColumnContext columnContext : minTimeColumns) {
+ columnContext.fillInto(currentBlock, positionCount);
+ advanceColumn(columnContext.currentBatch, columnContext);
+ }
+ currentBlock.setPositionCount(positionCount + 1);
+ } else {
+ for (final MeasurementColumnContext columnContext : minTimeColumns) {
final BatchData batchData = columnContext.currentBatch;
- final List<Integer> posInResult = columnContext.posInResult;
- for (Integer pos : posInResult) {
- final Column column = currentBlock.getColumn(pos);
- fillMeasurementColumn(column, batchData, positionCount);
- }
+ advanceColumn(batchData, columnContext);
+ }
+ }
+ }
- batchData.next();
- if (!batchData.hasCurrent()) {
- // get next batch of the column
- if (columnContext.seriesReader.hasNextBatch()) {
- columnContext.currentBatch =
columnContext.seriesReader.nextBatch();
- } else {
- // no more data in this column
- measureColumnContextMap.remove(columnContext.columnName);
- }
- }
+ private void advanceColumn(BatchData batchData, MeasurementColumnContext
columnContext)
+ throws IOException {
+ batchData.next();
+ if (!batchData.hasCurrent()) {
+ // get next batch of the column
+ if (columnContext.seriesReader.hasNextBatch()) {
+ columnContext.currentBatch = columnContext.seriesReader.nextBatch();
+ } else {
+ // no more data in this column
+ columnContext.removeFrom(measureColumnContextMap);
}
- currentBlock.setPositionCount(positionCount + 1);
}
}
@@ -210,7 +236,7 @@ public class SingleDeviceTsBlockReader implements
TsBlockReader {
column.setPositionCount(endPos);
}
- private void fillMeasurementColumn(Column column, BatchData batchData, int
pos) {
+ private static void fillSingleMeasurementColumn(Column column, BatchData
batchData, int pos) {
switch (batchData.getDataType()) {
case BOOLEAN:
column.getBooleans()[pos] = batchData.getBoolean();
@@ -250,26 +276,104 @@ public class SingleDeviceTsBlockReader implements
TsBlockReader {
// nothing to be done
}
+ public abstract static class MeasurementColumnContext {
+
+ protected BatchData currentBatch;
+ protected final AbstractFileSeriesReader seriesReader;
+
+ protected MeasurementColumnContext(AbstractFileSeriesReader seriesReader,
+ BatchData currentBatch) {
+ this.seriesReader = seriesReader;
+ this.currentBatch = currentBatch;
+ }
+
+ abstract void removeFrom(Map<String, MeasurementColumnContext>
columnContextMap);
+
+ abstract void fillInto(TsBlock block, int position);
+ }
+
// gather necessary fields in this class to avoid redundant map access
- public static class MeasurementColumnContext {
+ public static class SingleMeasurementColumnContext extends
MeasurementColumnContext {
private final String columnName;
private final List<Integer> posInResult;
- private BatchData currentBatch;
- private final AbstractFileSeriesReader seriesReader;
- public MeasurementColumnContext(
+ public SingleMeasurementColumnContext(
String columnName,
List<Integer> posInResult,
BatchData currentBatch,
AbstractFileSeriesReader seriesReader) {
+ super(seriesReader, currentBatch);
this.columnName = columnName;
this.posInResult = posInResult;
- this.currentBatch = currentBatch;
- this.seriesReader = seriesReader;
+ }
+
+ @Override
+ void removeFrom(Map<String, MeasurementColumnContext> columnContextMap) {
+ columnContextMap.remove(columnName);
+ }
+
+ @Override
+ void fillInto(TsBlock block, int position) {
+ for (Integer pos : posInResult) {
+ final Column column = block.getColumn(pos);
+ fillSingleMeasurementColumn(column, currentBatch, position);
+ }
}
}
+ public static class VectorMeasurementColumnContext extends
MeasurementColumnContext {
+
+ private final List<List<Integer>> posInResult;
+
+ public VectorMeasurementColumnContext(
+ List<List<Integer>> posInResult,
+ BatchData currentBatch,
+ AbstractFileSeriesReader seriesReader) {
+ super(seriesReader, currentBatch);
+ this.posInResult = posInResult;
+ }
+
+ @Override
+ void removeFrom(Map<String, MeasurementColumnContext> columnContextMap) {
+ columnContextMap.remove("");
+ }
+
+ @Override
+ void fillInto(TsBlock block, int blockRowNum) {
+ final TsPrimitiveType[] vector = currentBatch.getVector();
+ for (int i = 0; i < vector.length; i++) {
+ final TsPrimitiveType value = vector[i];
+ final List<Integer> columnPositions = posInResult.get(i);
+ for (Integer pos : columnPositions) {
+ switch (value.getDataType()) {
+ case TEXT:
+ block.getColumn(pos).getBinaries()[blockRowNum] =
value.getBinary();
+ break;
+ case INT32:
+ block.getColumn(pos).getInts()[blockRowNum] = value.getInt();
+ break;
+ case INT64:
+ block.getColumn(pos).getLongs()[blockRowNum] = value.getLong();
+ break;
+ case BOOLEAN:
+ block.getColumn(pos).getBooleans()[blockRowNum] =
value.getBoolean();
+ break;
+ case FLOAT:
+ block.getColumn(pos).getFloats()[blockRowNum] = value.getFloat();
+ break;
+ case DOUBLE:
+ block.getColumn(pos).getDoubles()[blockRowNum] =
value.getDouble();
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported data type: " +
value.getDataType());
+ }
+ }
+ }
+ }
+ }
+
+
public static class IdColumnContext {
private final List<Integer> posInResult;
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/reader/series/AbstractFileSeriesReader.java
b/tsfile/src/main/java/org/apache/tsfile/read/reader/series/AbstractFileSeriesReader.java
index edb8c05e..0b6a1493 100644
---
a/tsfile/src/main/java/org/apache/tsfile/read/reader/series/AbstractFileSeriesReader.java
+++
b/tsfile/src/main/java/org/apache/tsfile/read/reader/series/AbstractFileSeriesReader.java
@@ -19,6 +19,7 @@
package org.apache.tsfile.read.reader.series;
+import java.util.ArrayList;
import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.read.common.BatchData;
import org.apache.tsfile.read.controller.IChunkLoader;
@@ -35,6 +36,7 @@ public abstract class AbstractFileSeriesReader implements
IBatchReader {
protected IChunkLoader chunkLoader;
protected List<IChunkMetadata> chunkMetadataList;
protected IChunkReader chunkReader;
+ protected List<String> currentChunkMeasurementNames = new ArrayList<>();
private int chunkToRead;
protected Filter filter;
@@ -91,4 +93,8 @@ public abstract class AbstractFileSeriesReader implements
IBatchReader {
private IChunkMetadata nextChunkMeta() {
return chunkMetadataList.get(chunkToRead++);
}
+
+ public List<String> getCurrentChunkMeasurementNames() {
+ return currentChunkMeasurementNames;
+ }
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/reader/series/FileSeriesReader.java
b/tsfile/src/main/java/org/apache/tsfile/read/reader/series/FileSeriesReader.java
index 1704d553..c6cb00d9 100644
---
a/tsfile/src/main/java/org/apache/tsfile/read/reader/series/FileSeriesReader.java
+++
b/tsfile/src/main/java/org/apache/tsfile/read/reader/series/FileSeriesReader.java
@@ -45,9 +45,11 @@ public class FileSeriesReader extends
AbstractFileSeriesReader {
@Override
protected void initChunkReader(IChunkMetadata chunkMetaData) throws
IOException {
+ currentChunkMeasurementNames.clear();
if (chunkMetaData instanceof ChunkMetadata) {
Chunk chunk = chunkLoader.loadChunk((ChunkMetadata) chunkMetaData);
this.chunkReader = new ChunkReader(chunk, filter);
+ currentChunkMeasurementNames.add(chunkMetaData.getMeasurementUid());
} else {
AlignedChunkMetadata alignedChunkMetadata = (AlignedChunkMetadata)
chunkMetaData;
Chunk timeChunk =
@@ -55,6 +57,7 @@ public class FileSeriesReader extends
AbstractFileSeriesReader {
List<Chunk> valueChunkList = new ArrayList<>();
for (IChunkMetadata metadata :
alignedChunkMetadata.getValueChunkMetadataList()) {
valueChunkList.add(chunkLoader.loadChunk((ChunkMetadata) metadata));
+ currentChunkMeasurementNames.add(metadata.getMeasurementUid());
}
this.chunkReader = new AlignedChunkReader(timeChunk, valueChunkList,
filter);
}
diff --git a/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
b/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
index b9c5d0f3..de8d442c 100644
--- a/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
@@ -25,7 +25,6 @@ import
org.apache.tsfile.exception.write.NoMeasurementException;
import org.apache.tsfile.exception.write.NoTableException;
import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.file.metadata.IDeviceID;
-import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.read.common.Path;
import org.apache.tsfile.utils.MeasurementGroup;
@@ -167,7 +166,7 @@ public class TsFileWriter implements AutoCloseable {
for (Map.Entry<Path, IMeasurementSchema> entry : schemaMap.entrySet()) {
IMeasurementSchema measurementSchema = entry.getValue();
if (measurementSchema instanceof VectorMeasurementSchema) {
- final IDeviceID deviceID =
IDeviceID.DEFAULT_FACTORY.create(entry.getKey().getDevice());
+ final IDeviceID deviceID =
IDeviceID.Factory.DEFAULT_FACTORY.create(entry.getKey().getDevice());
MeasurementGroup group =
measurementGroupMap.getOrDefault(deviceID, new
MeasurementGroup(true));
List<String> measurementList =
measurementSchema.getSubMeasurementsList();
@@ -183,7 +182,7 @@ public class TsFileWriter implements AutoCloseable {
}
measurementGroupMap.put(deviceID, group);
} else {
- final IDeviceID deviceID =
IDeviceID.DEFAULT_FACTORY.create(entry.getKey().getDevice());
+ final IDeviceID deviceID =
IDeviceID.Factory.DEFAULT_FACTORY.create(entry.getKey().getDevice());
MeasurementGroup group =
measurementGroupMap.getOrDefault(deviceID, new
MeasurementGroup(false));
group
@@ -219,7 +218,7 @@ public class TsFileWriter implements AutoCloseable {
*/
public void registerDevice(String deviceIdString, String templateName)
throws WriteProcessException {
- IDeviceID deviceID = IDeviceID.DEFAULT_FACTORY.create(deviceIdString);
+ IDeviceID deviceID =
IDeviceID.Factory.DEFAULT_FACTORY.create(deviceIdString);
if (!getSchema().getSchemaTemplates().containsKey(templateName)) {
throw new WriteProcessException("given template is not existed! " +
templateName);
}
@@ -232,6 +231,12 @@ public class TsFileWriter implements AutoCloseable {
getSchema().registerDevice(deviceID, templateName);
}
+ @Deprecated
+ public void registerTimeseries(Path devicePath, MeasurementSchema
measurementSchema)
+ throws WriteProcessException {
+
registerTimeseries(IDeviceID.Factory.DEFAULT_FACTORY.create(devicePath.getDevice()),
measurementSchema);
+ }
+
/** Register nonAligned timeseries by single. */
public void registerTimeseries(IDeviceID deviceID, MeasurementSchema
measurementSchema)
throws WriteProcessException {
@@ -258,11 +263,12 @@ public class TsFileWriter implements AutoCloseable {
getSchema().registerMeasurementGroup(deviceID, measurementGroup);
}
+ @Deprecated
/** Register nonAligned timeseries by groups. */
public void registerTimeseries(Path devicePath, List<MeasurementSchema>
measurementSchemas) {
for (MeasurementSchema schema : measurementSchemas) {
try {
-
registerTimeseries(IDeviceID.DEFAULT_FACTORY.create(devicePath.getDevice()),
schema);
+
registerTimeseries(IDeviceID.Factory.DEFAULT_FACTORY.create(devicePath.getDevice()),
schema);
} catch (WriteProcessException e) {
LOG.warn(e.getMessage());
}
@@ -272,7 +278,7 @@ public class TsFileWriter implements AutoCloseable {
public void registerAlignedTimeseries(Path devicePath,
List<MeasurementSchema> measurementSchemas)
throws WriteProcessException {
registerAlignedTimeseries(
- IDeviceID.DEFAULT_FACTORY.create(devicePath.getDevice()),
measurementSchemas);
+ IDeviceID.Factory.DEFAULT_FACTORY.create(devicePath.getDevice()),
measurementSchemas);
}
/**
* Register aligned timeseries. Once the device is registered for aligned
timeseries, it cannot be
@@ -304,7 +310,7 @@ public class TsFileWriter implements AutoCloseable {
private boolean checkIsTimeseriesExist(TSRecord record, boolean isAligned)
throws WriteProcessException, IOException {
// initial ChunkGroupWriter of this device in the TSRecord
- final IDeviceID deviceID =
IDeviceID.DEFAULT_FACTORY.create(record.deviceId);
+ final IDeviceID deviceID =
IDeviceID.Factory.DEFAULT_FACTORY.create(record.deviceId);
IChunkGroupWriter groupWriter = tryToInitialGroupWriter(deviceID,
isAligned);
// initial all SeriesWriters of measurements in this TSRecord
@@ -363,7 +369,7 @@ public class TsFileWriter implements AutoCloseable {
private void checkIsTimeseriesExist(Tablet tablet, boolean isAligned)
throws WriteProcessException, IOException {
- final IDeviceID deviceID =
IDeviceID.DEFAULT_FACTORY.create(tablet.insertTargetName);
+ final IDeviceID deviceID =
IDeviceID.Factory.DEFAULT_FACTORY.create(tablet.insertTargetName);
IChunkGroupWriter groupWriter = tryToInitialGroupWriter(deviceID,
isAligned);
List<MeasurementSchema> schemas = tablet.getSchemas();
@@ -488,7 +494,7 @@ public class TsFileWriter implements AutoCloseable {
checkIsTimeseriesExist(record, false);
recordCount +=
groupWriters
- .get(new PlainDeviceID(record.deviceId))
+ .get(IDeviceID.Factory.DEFAULT_FACTORY.create(record.deviceId))
.write(record.time, record.dataPointList);
return checkMemorySizeAndMayFlushChunks();
}
@@ -497,7 +503,7 @@ public class TsFileWriter implements AutoCloseable {
checkIsTimeseriesExist(record, true);
recordCount +=
groupWriters
- .get(new PlainDeviceID(record.deviceId))
+ .get(IDeviceID.Factory.DEFAULT_FACTORY.create(record.deviceId))
.write(record.time, record.dataPointList);
return checkMemorySizeAndMayFlushChunks();
}
@@ -513,7 +519,7 @@ public class TsFileWriter implements AutoCloseable {
// make sure the ChunkGroupWriter for this Tablet exist
checkIsTimeseriesExist(tablet, false);
// get corresponding ChunkGroupWriter and write this Tablet
- recordCount += groupWriters.get(new
PlainDeviceID(tablet.insertTargetName)).write(tablet);
+ recordCount +=
groupWriters.get(IDeviceID.Factory.DEFAULT_FACTORY.create(tablet.insertTargetName)).write(tablet);
return checkMemorySizeAndMayFlushChunks();
}
@@ -521,7 +527,7 @@ public class TsFileWriter implements AutoCloseable {
// make sure the ChunkGroupWriter for this Tablet exist
checkIsTimeseriesExist(tablet, true);
// get corresponding ChunkGroupWriter and write this Tablet
- recordCount += groupWriters.get(new
PlainDeviceID(tablet.insertTargetName)).write(tablet);
+ recordCount +=
groupWriters.get(IDeviceID.Factory.DEFAULT_FACTORY.create(tablet.insertTargetName)).write(tablet);
return checkMemorySizeAndMayFlushChunks();
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
b/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
index ecf669f5..08054c22 100644
---
a/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
+++
b/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
@@ -25,7 +25,6 @@ import org.apache.tsfile.encoding.encoder.TSEncodingBuilder;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.file.metadata.IDeviceID;
-import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.utils.Binary;
@@ -346,7 +345,7 @@ public class AlignedChunkGroupWriterImpl implements
IChunkGroupWriter {
if (time <= lastTime) {
throw new WriteProcessException(
"Not allowed to write out-of-order data in timeseries "
- + ((PlainDeviceID) deviceId).toStringID()
+ + deviceId
+ TsFileConstant.PATH_SEPARATOR
+ ""
+ ", time should later than "
diff --git
a/tsfile/src/main/java/org/apache/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
b/tsfile/src/main/java/org/apache/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
index da5a635f..3512b8ce 100644
---
a/tsfile/src/main/java/org/apache/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
+++
b/tsfile/src/main/java/org/apache/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
@@ -22,7 +22,6 @@ import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.file.metadata.IDeviceID;
-import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.write.UnSupportedDataTypeException;
import org.apache.tsfile.write.record.Tablet;
@@ -191,7 +190,7 @@ public class NonAlignedChunkGroupWriterImpl implements
IChunkGroupWriter {
if (time <= lastTimeMap.getOrDefault(measurementId, -1L)) {
throw new WriteProcessException(
"Not allowed to write out-of-order data in timeseries "
- + ((PlainDeviceID) deviceId).toStringID()
+ + deviceId
+ TsFileConstant.PATH_SEPARATOR
+ measurementId
+ ", time should later than "
diff --git a/tsfile/src/main/java/org/apache/tsfile/write/record/TSRecord.java
b/tsfile/src/main/java/org/apache/tsfile/write/record/TSRecord.java
index c1139d79..aef03fb5 100644
--- a/tsfile/src/main/java/org/apache/tsfile/write/record/TSRecord.java
+++ b/tsfile/src/main/java/org/apache/tsfile/write/record/TSRecord.java
@@ -19,7 +19,6 @@
package org.apache.tsfile.write.record;
import org.apache.tsfile.file.metadata.IDeviceID;
-import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.utils.StringContainer;
import org.apache.tsfile.write.record.datapoint.DataPoint;
@@ -52,7 +51,7 @@ public class TSRecord {
public TSRecord(long timestamp, IDeviceID deviceId) {
this.time = timestamp;
- this.deviceId = ((PlainDeviceID) deviceId).toStringID();
+ this.deviceId = deviceId.toString();
}
public void setTime(long timestamp) {
diff --git a/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java
b/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java
index 0e007bb7..65850cbf 100644
--- a/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java
+++ b/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java
@@ -95,20 +95,31 @@ public class Tablet {
this(insertTargetName, schemas, DEFAULT_SIZE);
}
- /**
- * Return a {@link Tablet} with the specified number of rows (maxBatchSize).
Only call this
- * constructor directly for testing purposes. {@link Tablet} should normally
always be default
- * size.
- *
- * @param insertTargetName the name of the device specified to be written in
- * @param schemas the list of {@link MeasurementSchema}s for creating the
row batch, only
- * measurementId and type take effects
- * @param maxRowNumber the maximum number of rows for this tablet
- */
public Tablet(String insertTargetName, List<MeasurementSchema> schemas, int
maxRowNumber) {
+ this(insertTargetName, schemas, ColumnType.nCopy(ColumnType.MEASUREMENT,
schemas.size()),
+ maxRowNumber);
+ }
+
+ public Tablet(String insertTargetName, List<MeasurementSchema> schemas,
+ List<ColumnType> columnTypes) {
+ this(insertTargetName, schemas, columnTypes, DEFAULT_SIZE);
+ }
+
+ /**
+ * Return a {@link Tablet} with the specified number of rows
(maxBatchSize). Only call this
+ * constructor directly for testing purposes. {@link Tablet} should
normally always be default
+ * size.
+ *
+ * @param insertTargetName the name of the device specified to be written
in
+ * @param schemas the list of {@link MeasurementSchema}s for creating the
row batch, only
+ * measurementId and type take effects
+ * @param maxRowNumber the maximum number of rows for this tablet
+ */
+ public Tablet(String insertTargetName, List<MeasurementSchema> schemas,
+ List<ColumnType> columnTypes, int maxRowNumber) {
this.insertTargetName = insertTargetName;
this.schemas = new ArrayList<>(schemas);
- setColumnTypes(ColumnType.nCopy(ColumnType.MEASUREMENT, schemas.size()));
+ setColumnTypes(columnTypes);
this.maxRowNumber = maxRowNumber;
measurementIndex = new HashMap<>();
constructMeasurementIndexMap();
@@ -858,10 +869,11 @@ public class Tablet {
* @return the IDeviceID of the i-th row.
*/
public IDeviceID getDeviceID(int i) {
- String[] idArray = new String[idColumnRange];
+ String[] idArray = new String[idColumnRange + 1];
+ idArray[0] = insertTargetName;
for (int j = 0; j < idColumnRange; j++) {
final Object value = getValue(i, j);
- idArray[j] = value != null ? value.toString() : null;
+ idArray[j + 1] = value != null ? value.toString() : null;
}
return new StringArrayDeviceID(idArray);
}
diff --git a/tsfile/src/main/java/org/apache/tsfile/write/schema/Schema.java
b/tsfile/src/main/java/org/apache/tsfile/write/schema/Schema.java
index 9a4689b6..900b287b 100644
--- a/tsfile/src/main/java/org/apache/tsfile/write/schema/Schema.java
+++ b/tsfile/src/main/java/org/apache/tsfile/write/schema/Schema.java
@@ -22,6 +22,7 @@ import org.apache.tsfile.file.metadata.ChunkGroupMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.LogicalTableSchema;
import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.read.common.Path;
import org.apache.tsfile.utils.MeasurementGroup;
import java.io.Serializable;
@@ -54,6 +55,11 @@ public class Schema implements Serializable {
this.registeredTimeseries = knownSchema;
}
+ @Deprecated
+ public void registerTimeseries(Path devicePath, MeasurementSchema
measurementSchema) {
+
registerTimeseries(IDeviceID.Factory.DEFAULT_FACTORY.create(devicePath.getDevice()),
+ measurementSchema);
+ }
// This method can only register nonAligned timeseries.
public void registerTimeseries(IDeviceID deviceID, MeasurementSchema
measurementSchema) {
MeasurementGroup group =
@@ -62,6 +68,12 @@ public class Schema implements Serializable {
this.registeredTimeseries.put(deviceID, group);
}
+ @Deprecated
+ public void registerMeasurementGroup(Path devicePath, MeasurementGroup
measurementGroup) {
+
this.registeredTimeseries.put(IDeviceID.Factory.DEFAULT_FACTORY.create(devicePath.getDevice()),
+ measurementGroup);
+ }
+
public void registerMeasurementGroup(IDeviceID deviceID, MeasurementGroup
measurementGroup) {
this.registeredTimeseries.put(deviceID, measurementGroup);
}
@@ -89,6 +101,10 @@ public class Schema implements Serializable {
this.schemaTemplates.put(templateName, measurementGroup);
}
+  public void registerDevice(String deviceIdString, String templateName) {
+    registerDevice(IDeviceID.Factory.DEFAULT_FACTORY.create(deviceIdString), templateName);
+  }
+
public void registerDevice(IDeviceID deviceId, String templateName) {
if (!schemaTemplates.containsKey(templateName)) {
return;
@@ -99,6 +115,11 @@ public class Schema implements Serializable {
registerMeasurementGroup(deviceId, new MeasurementGroup(isAligned,
template));
}
+  @Deprecated
+  public MeasurementGroup getSeriesSchema(Path devicePath) {
+    return registeredTimeseries.get(IDeviceID.Factory.DEFAULT_FACTORY.create(devicePath.getDevice()));
+  }
+
public MeasurementGroup getSeriesSchema(IDeviceID devicePath) {
return registeredTimeseries.get(devicePath);
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
b/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
index 289c92b9..01f03004 100644
--- a/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
@@ -411,7 +411,6 @@ public class TsFileIOWriter implements AutoCloseable {
// build bloom filter
filter.add(currentPath.getFullPath());
// construct the index tree node for the series
-
currentDevice = currentPath.getIDeviceID();
if (!currentDevice.equals(prevDevice)) {
if (prevDevice != null) {
diff --git
a/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriterEndFileTest.java
b/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriterEndFileTest.java
index 27e9e104..f10d82ed 100644
---
a/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriterEndFileTest.java
+++
b/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriterEndFileTest.java
@@ -20,7 +20,7 @@
package org.apache.tsfile.write.writer;
import org.apache.tsfile.enums.TSDataType;
-import org.apache.tsfile.file.metadata.PlainDeviceID;
+import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.write.chunk.ChunkWriterImpl;
@@ -32,7 +32,7 @@ public class TsFileIOWriterEndFileTest {
public static void main(String[] args) throws Exception {
try (TsFileIOWriter writer = new TsFileIOWriter(new File("test.tsfile"))) {
for (int deviceIndex = 0; deviceIndex < 1000; deviceIndex++) {
- writer.startChunkGroup(new PlainDeviceID("root.sg.d" + deviceIndex));
+        writer.startChunkGroup(IDeviceID.Factory.DEFAULT_FACTORY.create("root.sg.d" + deviceIndex));
for (int seriesIndex = 0; seriesIndex < 1000; seriesIndex++) {
ChunkWriterImpl chunkWriter =
new ChunkWriterImpl(
diff --git
a/tsfile/src/main/java/org/apache/tsfile/write/writer/tsmiterator/TSMIterator.java
b/tsfile/src/main/java/org/apache/tsfile/write/writer/tsmiterator/TSMIterator.java
index 123618fb..530c8581 100644
---
a/tsfile/src/main/java/org/apache/tsfile/write/writer/tsmiterator/TSMIterator.java
+++
b/tsfile/src/main/java/org/apache/tsfile/write/writer/tsmiterator/TSMIterator.java
@@ -23,7 +23,6 @@ import org.apache.tsfile.file.metadata.ChunkGroupMetadata;
import org.apache.tsfile.file.metadata.ChunkMetadata;
import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
-import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.tsfile.file.metadata.statistics.Statistics;
import org.apache.tsfile.read.common.Path;
@@ -122,7 +121,7 @@ public class TSMIterator {
.get(chunkGroupMetadata.getDevice())
.computeIfAbsent(
new Path(
- ((PlainDeviceID)
chunkGroupMetadata.getDevice()).toStringID(),
+ chunkGroupMetadata.getDevice(),
chunkMetadata.getMeasurementUid(),
false),
x -> new ArrayList<>())
@@ -135,7 +134,7 @@ public class TSMIterator {
.computeIfAbsent(currentDevice, x -> new TreeMap<>())
.computeIfAbsent(
new Path(
- ((PlainDeviceID) currentDevice).toStringID(),
+ currentDevice,
chunkMetadata.getMeasurementUid(),
false),
x -> new ArrayList<>())
diff --git a/tsfile/src/test/java/org/apache/tsfile/read/GetAllDevicesTest.java
b/tsfile/src/test/java/org/apache/tsfile/read/GetAllDevicesTest.java
index 15660d45..49c0ddb0 100644
--- a/tsfile/src/test/java/org/apache/tsfile/read/GetAllDevicesTest.java
+++ b/tsfile/src/test/java/org/apache/tsfile/read/GetAllDevicesTest.java
@@ -22,7 +22,6 @@ package org.apache.tsfile.read;
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.file.metadata.IDeviceID;
-import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.utils.FileGenerator;
import org.junit.After;
@@ -79,7 +78,7 @@ public class GetAllDevicesTest {
Assert.assertEquals(deviceNum, devices.size());
for (int i = 0; i < deviceNum; i++) {
Assert.assertEquals(
-          new PlainDeviceID("d" + FileGenerator.generateIndexString(i, deviceNum)),
+          IDeviceID.Factory.DEFAULT_FACTORY.create("d" + FileGenerator.generateIndexString(i, deviceNum)),
devices.get(i));
}
diff --git
a/tsfile/src/test/java/org/apache/tsfile/read/MeasurementChunkMetadataListMapIteratorTest.java
b/tsfile/src/test/java/org/apache/tsfile/read/MeasurementChunkMetadataListMapIteratorTest.java
index 42a98d5d..d4fc4e0e 100644
---
a/tsfile/src/test/java/org/apache/tsfile/read/MeasurementChunkMetadataListMapIteratorTest.java
+++
b/tsfile/src/test/java/org/apache/tsfile/read/MeasurementChunkMetadataListMapIteratorTest.java
@@ -24,7 +24,6 @@ import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.file.metadata.ChunkMetadata;
import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
-import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.read.common.Path;
import org.apache.tsfile.utils.FileGenerator;
@@ -158,7 +157,7 @@ public class MeasurementChunkMetadataListMapIteratorTest {
// test not exist device
Iterator<Map<String, List<ChunkMetadata>>> iterator =
-        fileReader.getMeasurementChunkMetadataListMapIterator(new PlainDeviceID("dd"));
+        fileReader.getMeasurementChunkMetadataListMapIterator(IDeviceID.Factory.DEFAULT_FACTORY.create("dd"));
Assert.assertFalse(iterator.hasNext());
}
diff --git a/tsfile/src/test/java/org/apache/tsfile/read/TsFileReaderTest.java
b/tsfile/src/test/java/org/apache/tsfile/read/TsFileReaderTest.java
index 19e48a0c..b6298665 100644
--- a/tsfile/src/test/java/org/apache/tsfile/read/TsFileReaderTest.java
+++ b/tsfile/src/test/java/org/apache/tsfile/read/TsFileReaderTest.java
@@ -489,7 +489,7 @@ public class TsFileReaderTest {
String filePath = TsFileGeneratorForTest.alignedOutputDataFile;
try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath)) {
// query for non-exist device
- IDeviceID d3 = new PlainDeviceID("d3");
+ IDeviceID d3 = IDeviceID.Factory.DEFAULT_FACTORY.create("d3");
try {
reader.getAlignedChunkMetadata(d3);
} catch (IOException e) {
@@ -497,7 +497,7 @@ public class TsFileReaderTest {
}
// query for non-aligned device
- IDeviceID d2 = new PlainDeviceID("d2");
+ IDeviceID d2 = IDeviceID.Factory.DEFAULT_FACTORY.create("d2");
try {
reader.getAlignedChunkMetadata(d2);
} catch (IOException e) {
@@ -507,7 +507,7 @@ public class TsFileReaderTest {
String[] expected = new String[] {"s1", "s2", "s3", "s4"};
List<AlignedChunkMetadata> chunkMetadataList =
- reader.getAlignedChunkMetadata(new PlainDeviceID("d1"));
+        reader.getAlignedChunkMetadata(IDeviceID.Factory.DEFAULT_FACTORY.create("d1"));
AlignedChunkMetadata alignedChunkMetadata = chunkMetadataList.get(0);
Assert.assertEquals("",
alignedChunkMetadata.getTimeChunkMetadata().getMeasurementUid());
int i = 0;
diff --git
a/tsfile/src/test/java/org/apache/tsfile/read/TsFileSequenceReaderTest.java
b/tsfile/src/test/java/org/apache/tsfile/read/TsFileSequenceReaderTest.java
index 0e4f5957..c9fe2663 100644
--- a/tsfile/src/test/java/org/apache/tsfile/read/TsFileSequenceReaderTest.java
+++ b/tsfile/src/test/java/org/apache/tsfile/read/TsFileSequenceReaderTest.java
@@ -119,7 +119,7 @@ public class TsFileSequenceReaderTest {
// test for exist device "d2"
Map<String, List<ChunkMetadata>> chunkMetadataMap =
- reader.readChunkMetadataInDevice(new PlainDeviceID("d2"));
+        reader.readChunkMetadataInDevice(IDeviceID.Factory.DEFAULT_FACTORY.create("d2"));
int[] res = new int[] {20, 75, 100, 13};
Assert.assertEquals(4, chunkMetadataMap.size());
@@ -134,7 +134,7 @@ public class TsFileSequenceReaderTest {
}
// test for non-exist device "d3"
-    Assert.assertTrue(reader.readChunkMetadataInDevice(new PlainDeviceID("d3")).isEmpty());
+    Assert.assertTrue(
+        reader.readChunkMetadataInDevice(IDeviceID.Factory.DEFAULT_FACTORY.create("d3")).isEmpty());
reader.close();
}
diff --git
a/tsfile/src/test/java/org/apache/tsfile/write/MetadataIndexConstructorTest.java
b/tsfile/src/test/java/org/apache/tsfile/write/MetadataIndexConstructorTest.java
index c286c73e..95b206e8 100644
---
a/tsfile/src/test/java/org/apache/tsfile/write/MetadataIndexConstructorTest.java
+++
b/tsfile/src/test/java/org/apache/tsfile/write/MetadataIndexConstructorTest.java
@@ -101,7 +101,7 @@ public class MetadataIndexConstructorTest {
int[][] vectorMeasurement = new int[deviceNum][];
String[][] singleMeasurement = new String[deviceNum][];
for (int i = 0; i < deviceNum; i++) {
- devices[i] = new PlainDeviceID("d" + i);
+ devices[i] = IDeviceID.Factory.DEFAULT_FACTORY.create("d" + i);
vectorMeasurement[i] = new int[0];
singleMeasurement[i] = new String[measurementNum];
for (int j = 0; j < measurementNum; j++) {
@@ -120,7 +120,7 @@ public class MetadataIndexConstructorTest {
int[][] vectorMeasurement = new int[deviceNum][];
String[][] singleMeasurement = new String[deviceNum][];
for (int i = 0; i < deviceNum; i++) {
- devices[i] = new PlainDeviceID("d" + i);
+ devices[i] = IDeviceID.Factory.DEFAULT_FACTORY.create("d" + i);
vectorMeasurement[i] = new int[0];
singleMeasurement[i] = new String[measurementNum];
for (int j = 0; j < measurementNum; j++) {
@@ -139,7 +139,7 @@ public class MetadataIndexConstructorTest {
int[][] vectorMeasurement = new int[deviceNum][];
String[][] singleMeasurement = new String[deviceNum][];
for (int i = 0; i < deviceNum; i++) {
- devices[i] = new PlainDeviceID("d" + generateIndexString(i, deviceNum));
+      devices[i] = IDeviceID.Factory.DEFAULT_FACTORY.create("d" + generateIndexString(i, deviceNum));
vectorMeasurement[i] = new int[0];
singleMeasurement[i] = new String[measurementNum];
for (int j = 0; j < measurementNum; j++) {
@@ -158,7 +158,7 @@ public class MetadataIndexConstructorTest {
int[][] vectorMeasurement = new int[deviceNum][];
String[][] singleMeasurement = new String[deviceNum][];
for (int i = 0; i < deviceNum; i++) {
- devices[i] = new PlainDeviceID("d" + generateIndexString(i, deviceNum));
+      devices[i] = IDeviceID.Factory.DEFAULT_FACTORY.create("d" + generateIndexString(i, deviceNum));
vectorMeasurement[i] = new int[0];
singleMeasurement[i] = new String[measurementNum];
for (int j = 0; j < measurementNum; j++) {
@@ -171,7 +171,7 @@ public class MetadataIndexConstructorTest {
/** Example 5: 1 entities with 1 vector containing 9 measurements */
@Test
public void vectorIndexTest() {
- IDeviceID[] devices = {new PlainDeviceID("d0")};
+ IDeviceID[] devices = {IDeviceID.Factory.DEFAULT_FACTORY.create("d0")};
int[][] vectorMeasurement = {{9}};
test(devices, vectorMeasurement, null);
}
@@ -183,7 +183,7 @@ public class MetadataIndexConstructorTest {
*/
@Test
public void compositeIndexTest() {
- IDeviceID[] devices = {new PlainDeviceID("d0"), new PlainDeviceID("d1")};
+    IDeviceID[] devices = {IDeviceID.Factory.DEFAULT_FACTORY.create("d0"), IDeviceID.Factory.DEFAULT_FACTORY.create("d1")};
int[][] vectorMeasurement = {{}, {4}};
String[][] singleMeasurement = {
{"s0", "s1", "s2", "s3", "s4", "z0", "z1", "z2", "z3"},
diff --git
a/tsfile/src/test/java/org/apache/tsfile/write/TableViewWriteTest.java
b/tsfile/src/test/java/org/apache/tsfile/write/TableViewWriteTest.java
index fb2db4bc..a9133af1 100644
--- a/tsfile/src/test/java/org/apache/tsfile/write/TableViewWriteTest.java
+++ b/tsfile/src/test/java/org/apache/tsfile/write/TableViewWriteTest.java
@@ -93,12 +93,17 @@ public class TableViewWriteTest {
final TsBlockReader reader =
tableQueryExecutor.query(testTableSchema.getTableName(), columns,
null, null, null);
assertTrue(reader.hasNext());
- final TsBlock result = reader.next();
- assertEquals(100, result.getPositionCount());
+ int cnt = 0;
+ while (reader.hasNext()) {
+ final TsBlock result = reader.next();
+ cnt += result.getPositionCount();
+ }
+ assertEquals(100, cnt);
}
private Tablet genTablet(TableSchema tableSchema, int offset, int num) {
-    Tablet tablet = new Tablet(tableSchema.getTableName(), tableSchema.getColumnSchemas());
+    Tablet tablet = new Tablet(tableSchema.getTableName(), tableSchema.getColumnSchemas(),
+        tableSchema.getColumnTypes());
for (int i = 0; i < num; i++) {
tablet.addTimestamp(i, offset + i);
for (MeasurementSchema columnSchema : tableSchema.getColumnSchemas()) {
diff --git
a/tsfile/src/test/java/org/apache/tsfile/write/TsFileIOWriterTest.java
b/tsfile/src/test/java/org/apache/tsfile/write/TsFileIOWriterTest.java
index e43839eb..8c81554e 100644
--- a/tsfile/src/test/java/org/apache/tsfile/write/TsFileIOWriterTest.java
+++ b/tsfile/src/test/java/org/apache/tsfile/write/TsFileIOWriterTest.java
@@ -57,8 +57,8 @@ public class TsFileIOWriterTest {
private static final String FILE_PATH =
TestConstant.BASE_OUTPUT_PATH.concat("TsFileIOWriterTest.tsfile");
- private static final IDeviceID DEVICE_1 = new PlainDeviceID("device1");
- private static final IDeviceID DEVICE_2 = new PlainDeviceID("device2");
+  private static final IDeviceID DEVICE_1 = IDeviceID.Factory.DEFAULT_FACTORY.create("device1");
+  private static final IDeviceID DEVICE_2 = IDeviceID.Factory.DEFAULT_FACTORY.create("device2");
private static final String SENSOR_1 = "sensor1";
private static final int CHUNK_GROUP_NUM = 2;
diff --git
a/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
b/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
index 4e5607a6..2bbcf95b 100644
--- a/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
+++ b/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
@@ -26,6 +26,7 @@ import org.apache.tsfile.file.MetaMarker;
import org.apache.tsfile.file.header.ChunkHeader;
import org.apache.tsfile.file.header.PageHeader;
import org.apache.tsfile.file.metadata.ChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.fileSystem.FSFactoryProducer;
@@ -551,7 +552,7 @@ public class TsFileWriteApiTest {
writeMeasurementScheams.add(alignedMeasurementSchemas.get(3));
TsFileIOWriter tsFileIOWriter = tsFileWriter.getIOWriter();
-    tsFileIOWriter.startChunkGroup(new PlainDeviceID(deviceId));
+    tsFileIOWriter.startChunkGroup(IDeviceID.Factory.DEFAULT_FACTORY.create(deviceId));
AlignedChunkWriterImpl alignedChunkWriter =
new AlignedChunkWriterImpl(writeMeasurementScheams);
@@ -630,9 +631,9 @@ public class TsFileWriteApiTest {
File file = FSFactoryProducer.getFSFactory().getFile("test.tsfile");
try (TsFileSequenceReader reader = new
TsFileSequenceReader(f.getAbsolutePath());
TsFileIOWriter tsFileIOWriter = new TsFileIOWriter(file)) {
-      tsFileIOWriter.startChunkGroup(new PlainDeviceID(deviceId));
+      tsFileIOWriter.startChunkGroup(IDeviceID.Factory.DEFAULT_FACTORY.create(deviceId));
       for (List<ChunkMetadata> chunkMetadatas :
-          reader.readChunkMetadataInDevice(new PlainDeviceID(deviceId)).values()) {
+          reader.readChunkMetadataInDevice(IDeviceID.Factory.DEFAULT_FACTORY.create(deviceId)).values()) {
for (ChunkMetadata chunkMetadata : chunkMetadatas) {
Chunk chunk = reader.readMemChunk(chunkMetadata);
ByteBuffer chunkDataBuffer = chunk.getData();
diff --git
a/tsfile/src/test/java/org/apache/tsfile/write/writer/RestorableTsFileIOWriterTest.java
b/tsfile/src/test/java/org/apache/tsfile/write/writer/RestorableTsFileIOWriterTest.java
index 5752d564..6b12b38e 100644
---
a/tsfile/src/test/java/org/apache/tsfile/write/writer/RestorableTsFileIOWriterTest.java
+++
b/tsfile/src/test/java/org/apache/tsfile/write/writer/RestorableTsFileIOWriterTest.java
@@ -24,6 +24,7 @@ import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.NotCompatibleTsFileException;
import org.apache.tsfile.file.MetaMarker;
import org.apache.tsfile.file.metadata.ChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
@@ -132,7 +133,7 @@ public class RestorableTsFileIOWriterTest {
@Test
public void testOnlyOneChunkHeader() throws Exception {
TsFileWriter writer = new TsFileWriter(file);
-    writer.getIOWriter().startChunkGroup(new PlainDeviceID("root.sg1.d1"));
+    writer.getIOWriter().startChunkGroup(IDeviceID.Factory.DEFAULT_FACTORY.create("root.sg1.d1"));
writer
.getIOWriter()
.startFlushChunk(
diff --git
a/tsfile/src/test/java/org/apache/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java
b/tsfile/src/test/java/org/apache/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java
index 4b161ff8..128e9f36 100644
---
a/tsfile/src/test/java/org/apache/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java
+++
b/tsfile/src/test/java/org/apache/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java
@@ -72,7 +72,7 @@ public class TsFileIOWriterMemoryControlTest {
init = true;
for (int i = 0; i < 2048; ++i) {
sortedSeriesId.add("s" + i);
-      sortedDeviceId.add(new PlainDeviceID("root.sg.d" + i));
+      sortedDeviceId.add(IDeviceID.Factory.DEFAULT_FACTORY.create("root.sg.d" + i));
}
sortedSeriesId.sort((String::compareTo));
sortedDeviceId.sort((IDeviceID::compareTo));