This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new 27af40628 Add tsblock tsfile writer (#753)
27af40628 is described below
commit 27af4062824c81164cbf0c7701b1f758e5a8e8eb
Author: shuwenwei <[email protected]>
AuthorDate: Fri Mar 27 11:27:16 2026 +0800
Add tsblock tsfile writer (#753)
---
.../write/chunk/AlignedChunkGroupWriterImpl.java | 24 +-
.../tsfile/write/v4/TableTsBlock2TsFileWriter.java | 270 +++++++++++++++++++++
.../writer/TableTsBlock2TsFileWriterTest.java | 210 ++++++++++++++++
3 files changed, 492 insertions(+), 12 deletions(-)
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
index 5fd9dde7d..87464239f 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
@@ -51,20 +51,20 @@ import java.util.Set;
import java.util.stream.Collectors;
public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
- private static final Logger LOG =
LoggerFactory.getLogger(AlignedChunkGroupWriterImpl.class);
+ protected static final Logger LOG =
LoggerFactory.getLogger(AlignedChunkGroupWriterImpl.class);
- private final IDeviceID deviceId;
+ protected final IDeviceID deviceId;
// measurementID -> ValueChunkWriter
- private final Map<String, ValueChunkWriter> valueChunkWriterMap = new
LinkedHashMap<>();
+ protected final Map<String, ValueChunkWriter> valueChunkWriterMap = new
LinkedHashMap<>();
- private final TimeChunkWriter timeChunkWriter;
+ protected final TimeChunkWriter timeChunkWriter;
- private final EncryptParameter encryprParam;
+ protected final EncryptParameter encryprParam;
- private long lastTime = Long.MIN_VALUE;
- private boolean isInitLastTime = false;
- private boolean convertColumnNameToLowerCase = false;
+ protected long lastTime = Long.MIN_VALUE;
+ protected boolean isInitLastTime = false;
+ protected boolean convertColumnNameToLowerCase = false;
public AlignedChunkGroupWriterImpl(IDeviceID deviceId) {
this.deviceId = deviceId;
@@ -392,7 +392,7 @@ public class AlignedChunkGroupWriterImpl implements
IChunkGroupWriter {
* check occupied memory size, if it exceeds the PageSize threshold,
construct a page and put it
* to pageBuffer
*/
- private boolean checkPageSizeAndMayOpenANewPage() {
+ protected boolean checkPageSizeAndMayOpenANewPage() {
if (timeChunkWriter.checkPageSizeAndMayOpenANewPage()) {
return true;
}
@@ -404,21 +404,21 @@ public class AlignedChunkGroupWriterImpl implements
IChunkGroupWriter {
return false;
}
- private void writePageToPageBuffer() {
+ protected void writePageToPageBuffer() {
timeChunkWriter.writePageToPageBuffer();
for (ValueChunkWriter valueChunkWriter : valueChunkWriterMap.values()) {
valueChunkWriter.writePageToPageBuffer();
}
}
- private void sealAllChunks() {
+ protected void sealAllChunks() {
timeChunkWriter.sealCurrentPage();
for (ValueChunkWriter valueChunkWriter : valueChunkWriterMap.values()) {
valueChunkWriter.sealCurrentPage();
}
}
- private void checkIsHistoryData(long time) throws WriteProcessException {
+ protected void checkIsHistoryData(long time) throws WriteProcessException {
if (isInitLastTime && time <= lastTime) {
throw new WriteProcessException(
"Not allowed to write out-of-order data in timeseries "
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/write/v4/TableTsBlock2TsFileWriter.java
b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/TableTsBlock2TsFileWriter.java
new file mode 100644
index 000000000..17c610e8d
--- /dev/null
+++
b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/TableTsBlock2TsFileWriter.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.write.v4;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.column.TimeColumn;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+import org.apache.tsfile.write.chunk.AlignedChunkGroupWriterImpl;
+import org.apache.tsfile.write.chunk.IChunkGroupWriter;
+import org.apache.tsfile.write.chunk.TableChunkGroupWriterImpl;
+import org.apache.tsfile.write.chunk.ValueChunkWriter;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Converts TsBlocks (table model) into TsFile format. Core responsibilities:
+ *
+ * <ol>
+ *   <li>Split each TsBlock by device (based on the tag columns).
+ *   <li>Optionally generate a new time column per device.
+ *   <li>Dispatch rows to the corresponding ChunkGroupWriter.
+ * </ol>
+ */
+public class TableTsBlock2TsFileWriter extends DeviceTableModelWriter {
+
+ private final boolean generateNewTimeColumn;
+ private final int timeColumnIndexInTsBlock;
+ private final int[] tagColumnIndexInTsBlock;
+ private final int[] fieldColumnIndexInTsBlock;
+ private final IMeasurementSchema[] fieldColumnSchemas;
+
+ private final String tableName;
+ private final Map<IDeviceID, Long> deviceRowCountMap;
+
+ private int rowCount = 0;
+
+ /**
+  * Creates a writer that converts table-model TsBlocks into the given TsFile.
+  *
+  * @param file target file, forwarded to the parent writer
+  * @param tableSchema table schema, forwarded to the parent writer; its table name becomes the
+  *     first segment of every device id built by this writer
+  * @param memoryThreshold memory threshold, forwarded to the parent writer
+  * @param generateNewTimeColumn when {@code true}, timestamps are synthesized from a per-device
+  *     row counter and {@code timeColumnIndexInTsBlock} is not read
+  * @param timeColumnIndexInTsBlock index of the TsBlock column holding the timestamps; only used
+  *     when {@code generateNewTimeColumn} is {@code false}
+  * @param tagColumnIndexesInTsBlock indexes of the TsBlock columns used to build the device id
+  * @param fieldColumnIndexesInTsBlock indexes of the TsBlock columns written as values
+  * @param fieldColumnSchemas schemas of the value columns, matched by position with {@code
+  *     fieldColumnIndexesInTsBlock}
+  * @throws IOException if the parent constructor fails
+  */
+ public TableTsBlock2TsFileWriter(
+     File file,
+     TableSchema tableSchema,
+     long memoryThreshold,
+     boolean generateNewTimeColumn,
+     int timeColumnIndexInTsBlock,
+     int[] tagColumnIndexesInTsBlock,
+     int[] fieldColumnIndexesInTsBlock,
+     IMeasurementSchema[] fieldColumnSchemas)
+     throws IOException {
+   super(file, tableSchema, memoryThreshold);
+   this.tableName = tableSchema.getTableName();
+
+   // Column layout of the incoming TsBlocks.
+   this.timeColumnIndexInTsBlock = timeColumnIndexInTsBlock;
+   this.tagColumnIndexInTsBlock = tagColumnIndexesInTsBlock;
+   this.fieldColumnIndexInTsBlock = fieldColumnIndexesInTsBlock;
+   this.fieldColumnSchemas = fieldColumnSchemas;
+
+   // The per-device row counter is only needed when timestamps are synthesized.
+   this.generateNewTimeColumn = generateNewTimeColumn;
+   if (generateNewTimeColumn) {
+     this.deviceRowCountMap = new HashMap<>();
+   } else {
+     this.deviceRowCountMap = null;
+   }
+ }
+
+ /**
+  * Writes all rows of a TsBlock. The block is split into runs of consecutive rows belonging to
+  * the same device, and each run is handed to that device's chunk group writer. A {@code null}
+  * or empty block is ignored.
+  *
+  * @param tsBlock rows to write
+  * @throws IOException if obtaining a chunk group writer or flushing fails
+  * @throws WriteProcessException if a row is rejected by the chunk group writer
+  */
+ public void write(TsBlock tsBlock) throws IOException, WriteProcessException {
+   if (tsBlock == null || tsBlock.isEmpty()) {
+     return;
+   }
+
+   // Device segmentation plus the time column to use (original or synthesized).
+   Pair<Column, List<Pair<IDeviceID, Integer>>> splitResult =
+       splitTsBlockByDeviceAndGetTimeColumn(tsBlock);
+   Column timeColumn = splitResult.left;
+
+   // Pick the value columns in the order given by the field index mapping.
+   Column[] valueColumns = new Column[fieldColumnIndexInTsBlock.length];
+   int target = 0;
+   for (int sourceColumnIndex : fieldColumnIndexInTsBlock) {
+     valueColumns[target++] = tsBlock.getColumn(sourceColumnIndex);
+   }
+
+   List<Pair<IDeviceID, Integer>> deviceSegments = splitResult.right;
+   int segmentStart = 0;
+   for (Pair<IDeviceID, Integer> segment : deviceSegments) {
+     TableTsBlockChunkGroupWriterImpl deviceWriter =
+         (TableTsBlockChunkGroupWriterImpl) tryToInitialGroupWriter(segment.left, true, true);
+     // segment.right is the exclusive end row of this device's run.
+     int written = deviceWriter.write(timeColumn, valueColumns, segmentStart, segment.right);
+     rowCount += written;
+     recordCount += written;
+     segmentStart = segment.right;
+   }
+
+   this.checkMemorySizeAndMayFlushChunks();
+ }
+
+ /**
+ * Split TsBlock by device boundary. If generateNewTimeColumn is true,
generate a monotonically
+ * increasing time column per device using deviceRowCountMap.
+ *
+ * @return Pair of (time column, device -> end index list)
+ */
+ private Pair<Column, List<Pair<IDeviceID, Integer>>>
splitTsBlockByDeviceAndGetTimeColumn(
+ TsBlock tsBlock) {
+ long[] timestamps = null;
+ if (generateNewTimeColumn) {
+ timestamps = new long[tsBlock.getPositionCount()];
+ }
+ List<Pair<IDeviceID, Integer>> deviceSplitResult = new ArrayList<>();
+ IDeviceID lastDeviceID = null;
+ long lastDeviceCount = 0;
+
+ // Iterate rows and detect device boundary changes
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ IDeviceID currDeviceID = getDeviceId(tsBlock, i);
+ // Device changed, flush previous segment
+ if (!currDeviceID.equals(lastDeviceID)) {
+ if (lastDeviceID != null) {
+ deviceSplitResult.add(new Pair(lastDeviceID, i));
+ if (generateNewTimeColumn) {
+ deviceRowCountMap.put(lastDeviceID, lastDeviceCount);
+ }
+ }
+ lastDeviceID = currDeviceID;
+ if (generateNewTimeColumn) {
+ lastDeviceCount = deviceRowCountMap.getOrDefault(lastDeviceID, 0L);
+ }
+ }
+ // Generate synthetic time if required
+ if (generateNewTimeColumn) {
+ timestamps[i] = lastDeviceCount++;
+ }
+ }
+
+ deviceSplitResult.add(new Pair(lastDeviceID, tsBlock.getPositionCount()));
+ if (generateNewTimeColumn) {
+ deviceRowCountMap.put(lastDeviceID, lastDeviceCount);
+ return new Pair<>(new TimeColumn(timestamps.length, timestamps),
deviceSplitResult);
+ } else {
+ return new Pair<>(tsBlock.getColumn(timeColumnIndexInTsBlock),
deviceSplitResult);
+ }
+ }
+
+ private IDeviceID getDeviceId(TsBlock tsBlock, int rowIdx) {
+ String[] segments = new String[tagColumnIndexInTsBlock.length + 1];
+ segments[0] = tableName;
+ for (int i = 0; i < tagColumnIndexInTsBlock.length; i++) {
+ Column tagColumn = tsBlock.getColumn(tagColumnIndexInTsBlock[i]);
+ if (tagColumn.isNull(rowIdx)) {
+ segments[i + 1] = null;
+ } else {
+ segments[i + 1] =
tagColumn.getBinary(rowIdx).getStringValue(TSFileConfig.STRING_CHARSET);
+ }
+ }
+ return new StringArrayDeviceID(segments);
+ }
+
+ /**
+  * Returns the chunk group writer registered for the device, creating and registering a {@code
+  * TableTsBlockChunkGroupWriterImpl} on first use. A new writer is seeded with the value stored
+  * for the device in {@code alignedDeviceLastTimeMap}. The {@code isAligned} and {@code
+  * isTableModel} flags are not read by this override.
+  */
+ @Override
+ protected IChunkGroupWriter tryToInitialGroupWriter(
+     IDeviceID deviceId, boolean isAligned, boolean isTableModel) throws IOException {
+   IChunkGroupWriter existing = groupWriters.get(deviceId);
+   if (existing != null) {
+     return existing;
+   }
+
+   AlignedChunkGroupWriterImpl created = new TableTsBlockChunkGroupWriterImpl(deviceId);
+   created.setLastTime(alignedDeviceLastTimeMap.get(deviceId));
+   groupWriters.put(deviceId, created);
+   return created;
+ }
+
+ /**
+  * Returns the number of entries in {@code alignedDeviceLastTimeMap}.
+  *
+  * <p>NOTE(review): presumably that map gains an entry per device when the device's data is
+  * flushed by the parent writer, so the value may lag behind {@link #write} calls -- confirm.
+  */
+ public int getDeviceCount() {
+ return alignedDeviceLastTimeMap.size();
+ }
+
+ /** Returns the total number of rows written through {@code write(TsBlock)} so far. */
+ public int getRowCount() {
+ return rowCount;
+ }
+
+ /**
+ * ChunkGroup writer for a single device. Responsible for writing time
column and multiple value
+ * columns.
+ */
+ private class TableTsBlockChunkGroupWriterImpl extends
TableChunkGroupWriterImpl {
+ private final ValueChunkWriter[] valueChunkWriters;
+
+ public TableTsBlockChunkGroupWriterImpl(IDeviceID deviceId) throws
IOException {
+ super(deviceId);
+ // Initialize ValueChunkWriter for each measurement
+ this.valueChunkWriters = new ValueChunkWriter[fieldColumnSchemas.length];
+ for (int i = 0; i < fieldColumnSchemas.length; i++) {
+ valueChunkWriters[i] =
tryToAddSeriesWriterInternal(fieldColumnSchemas[i]);
+ }
+ }
+
+ /**
+ * Write a range of rows into chunk group.
+ *
+ * @param startRowIndex inclusive
+ * @param endRowIndex exclusive
+ */
+ public int write(Column timeColumn, Column[] valueColumns, int
startRowIndex, int endRowIndex)
+ throws WriteProcessException {
+ int pointCount = 0;
+ for (int rowIndex = startRowIndex; rowIndex < endRowIndex; rowIndex++) {
+ if (timeColumn.isNull(rowIndex)) {
+ throw new WriteProcessException("All values in time column should
not be null");
+ }
+ long time = timeColumn.getLong(rowIndex);
+ checkIsHistoryData(time);
+ for (int valueColumnIndex = 0; valueColumnIndex < valueColumns.length;
valueColumnIndex++) {
+ Column valueColumn = valueColumns[valueColumnIndex];
+ ValueChunkWriter valueChunkWriter =
valueChunkWriters[valueColumnIndex];
+ boolean isNull = valueColumn.isNull(rowIndex);
+ switch (valueChunkWriter.getDataType()) {
+ case BOOLEAN:
+ valueChunkWriter.write(
+ time, isNull ? false : valueColumn.getBoolean(rowIndex),
isNull);
+ break;
+ case INT32:
+ case DATE:
+ valueChunkWriter.write(time, isNull ? 0 :
valueColumn.getInt(rowIndex), isNull);
+ break;
+ case INT64:
+ case TIMESTAMP:
+ valueChunkWriter.write(time, isNull ? 0 :
valueColumn.getLong(rowIndex), isNull);
+ break;
+ case FLOAT:
+ valueChunkWriter.write(time, isNull ? 0 :
valueColumn.getFloat(rowIndex), isNull);
+ break;
+ case DOUBLE:
+ valueChunkWriter.write(time, isNull ? 0 :
valueColumn.getDouble(rowIndex), isNull);
+ break;
+ case TEXT:
+ case BLOB:
+ case STRING:
+ valueChunkWriter.write(time, isNull ? null :
valueColumn.getBinary(rowIndex), isNull);
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format(
+ "Data type %s is not supported.",
valueChunkWriter.getDataType().getType()));
+ }
+ }
+ timeChunkWriter.write(time);
+ lastTime = time;
+ isInitLastTime = true;
+ if (checkPageSizeAndMayOpenANewPage()) {
+ writePageToPageBuffer();
+ }
+ pointCount++;
+ }
+ return pointCount;
+ }
+ }
+}
diff --git
a/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TableTsBlock2TsFileWriterTest.java
b/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TableTsBlock2TsFileWriterTest.java
new file mode 100644
index 000000000..80320d13d
--- /dev/null
+++
b/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TableTsBlock2TsFileWriterTest.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.write.writer;
+
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.enums.ColumnCategory;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.TsFileGeneratorForTest;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.v4.TableTsBlock2TsFileWriter;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class TableTsBlock2TsFileWriterTest {
+
+ private String filePath = TsFileGeneratorForTest.getTestTsFilePath("db", 0,
0, 0);
+
+ /** Deletes the TsFile produced by a test, if it exists, after each test method. */
+ @After
+ public void tearDown() throws Exception {
+ Files.deleteIfExists(Paths.get(filePath));
+ }
+
+ @Test
+ public void testWriteWithExistingTimeColumnAndTagColumns()
+ throws IOException, WriteProcessException {
+ TableSchema tableSchema = getTableSchema();
+ TsBlock tsBlock = getTsBlock();
+ TableTsBlock2TsFileWriter writer =
+ new TableTsBlock2TsFileWriter(
+ new File(filePath),
+ tableSchema,
+ 32 * 1024 * 1024,
+ false,
+ 0,
+ new int[] {1},
+ new int[] {2, 3, 4},
+ new IMeasurementSchema[] {
+ new MeasurementSchema("s1", TSDataType.INT32),
+ new MeasurementSchema("s2", TSDataType.INT32),
+ new MeasurementSchema("s3", TSDataType.INT32),
+ });
+ writer.write(tsBlock);
+ writer.close();
+
+ Assert.assertEquals(100, writer.getRowCount());
+ Assert.assertEquals(10, writer.getDeviceCount());
+
+ try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath)) {
+ Assert.assertEquals(tableSchema, reader.getTableSchemaMap().get("t1"));
+ Map<IDeviceID, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap =
+ reader.getAllTimeseriesMetadata(false);
+ Assert.assertEquals(10, deviceTimeseriesMetadataMap.size());
+ for (Map.Entry<IDeviceID, List<TimeseriesMetadata>> entry :
+ deviceTimeseriesMetadataMap.entrySet()) {
+ Assert.assertEquals(4, entry.getValue().size());
+ Assert.assertEquals(0,
entry.getValue().get(0).getStatistics().getStartTime());
+ Assert.assertEquals(9,
entry.getValue().get(1).getStatistics().getEndTime());
+ }
+ }
+ }
+
+ @Test
+ public void testWriteWithExistingTagColumns() throws IOException,
WriteProcessException {
+ TableSchema tableSchema = getTableSchema();
+ TableTsBlock2TsFileWriter writer =
+ new TableTsBlock2TsFileWriter(
+ new File(filePath),
+ tableSchema,
+ 32 * 1024 * 1024,
+ true,
+ -1,
+ new int[] {1},
+ new int[] {0, 2, 3, 4},
+ new IMeasurementSchema[] {
+ new MeasurementSchema("t", TSDataType.TIMESTAMP),
+ new MeasurementSchema("s1", TSDataType.INT32),
+ new MeasurementSchema("s2", TSDataType.INT32),
+ new MeasurementSchema("s3", TSDataType.INT32),
+ });
+ writer.write(getTsBlock());
+ writer.close();
+
+ Assert.assertEquals(100, writer.getRowCount());
+ Assert.assertEquals(10, writer.getDeviceCount());
+
+ try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath)) {
+ Assert.assertEquals(tableSchema, reader.getTableSchemaMap().get("t1"));
+ Map<IDeviceID, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap =
+ reader.getAllTimeseriesMetadata(false);
+ Assert.assertEquals(10, deviceTimeseriesMetadataMap.size());
+ for (Map.Entry<IDeviceID, List<TimeseriesMetadata>> entry :
+ deviceTimeseriesMetadataMap.entrySet()) {
+ Assert.assertEquals(5, entry.getValue().size());
+ Assert.assertEquals(0,
entry.getValue().get(0).getStatistics().getStartTime());
+ Assert.assertEquals(9,
entry.getValue().get(1).getStatistics().getEndTime());
+ }
+ }
+ }
+
+ @Test
+ public void testWriteWithoutTimeColumnAndTagColumns() throws IOException,
WriteProcessException {
+ TableSchema tableSchema = getTableSchema();
+ TableTsBlock2TsFileWriter writer =
+ new TableTsBlock2TsFileWriter(
+ new File(filePath),
+ tableSchema,
+ 32 * 1024 * 1024,
+ true,
+ -1,
+ new int[0],
+ new int[] {0, 1, 2, 3, 4},
+ new IMeasurementSchema[] {
+ new MeasurementSchema("t", TSDataType.TIMESTAMP),
+ new MeasurementSchema("device", TSDataType.STRING),
+ new MeasurementSchema("s1", TSDataType.INT32),
+ new MeasurementSchema("s2", TSDataType.INT32),
+ new MeasurementSchema("s3", TSDataType.INT32),
+ });
+ writer.write(getTsBlock());
+ writer.close();
+
+ Assert.assertEquals(100, writer.getRowCount());
+ Assert.assertEquals(1, writer.getDeviceCount());
+
+ try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath)) {
+ Assert.assertEquals(tableSchema, reader.getTableSchemaMap().get("t1"));
+ Map<IDeviceID, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap =
+ reader.getAllTimeseriesMetadata(false);
+ Assert.assertEquals(1, deviceTimeseriesMetadataMap.size());
+ for (Map.Entry<IDeviceID, List<TimeseriesMetadata>> entry :
+ deviceTimeseriesMetadataMap.entrySet()) {
+ Assert.assertEquals(6, entry.getValue().size());
+ Assert.assertEquals(0,
entry.getValue().get(0).getStatistics().getStartTime());
+ Assert.assertEquals(99,
entry.getValue().get(1).getStatistics().getEndTime());
+ }
+ }
+ }
+
+ /**
+  * Builds the schema of table "t1" shared by all tests: one STRING tag column "device" followed
+  * by three INT32 field columns "s1", "s2" and "s3".
+  */
+ private TableSchema getTableSchema() {
+ return new TableSchema(
+ "t1",
+ Arrays.asList(
+ new MeasurementSchema("device", TSDataType.STRING),
+ new MeasurementSchema("s1", TSDataType.INT32),
+ new MeasurementSchema("s2", TSDataType.INT32),
+ new MeasurementSchema("s3", TSDataType.INT32)),
+ // Categories are listed in the same order as the column schemas above.
+ Arrays.asList(
+ ColumnCategory.TAG, ColumnCategory.FIELD, ColumnCategory.FIELD,
ColumnCategory.FIELD));
+ }
+
+ private TsBlock getTsBlock() {
+ TsBlockBuilder tsBlockBuilder =
+ new TsBlockBuilder(
+ Arrays.asList(
+ TSDataType.TIMESTAMP,
+ TSDataType.STRING,
+ TSDataType.INT32,
+ TSDataType.INT32,
+ TSDataType.INT32));
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 10; j++) {
+ tsBlockBuilder.getTimeColumnBuilder().writeLong(0);
+ tsBlockBuilder.getValueColumnBuilders()[0].writeLong(j);
+ tsBlockBuilder.getValueColumnBuilders()[1].writeBinary(
+ new Binary("device" + i, TSFileConfig.STRING_CHARSET));
+ tsBlockBuilder.getValueColumnBuilders()[2].writeInt(j);
+ tsBlockBuilder.getValueColumnBuilders()[3].writeInt(j);
+ tsBlockBuilder.getValueColumnBuilders()[4].writeInt(j);
+ tsBlockBuilder.declarePosition();
+ }
+ }
+ return tsBlockBuilder.build();
+ }
+}