This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git


The following commit(s) were added to refs/heads/develop by this push:
     new 27af40628 Add tsblock tsfile writer (#753)
27af40628 is described below

commit 27af4062824c81164cbf0c7701b1f758e5a8e8eb
Author: shuwenwei <[email protected]>
AuthorDate: Fri Mar 27 11:27:16 2026 +0800

    Add tsblock tsfile writer (#753)
---
 .../write/chunk/AlignedChunkGroupWriterImpl.java   |  24 +-
 .../tsfile/write/v4/TableTsBlock2TsFileWriter.java | 270 +++++++++++++++++++++
 .../writer/TableTsBlock2TsFileWriterTest.java      | 210 ++++++++++++++++
 3 files changed, 492 insertions(+), 12 deletions(-)

diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
index 5fd9dde7d..87464239f 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
@@ -51,20 +51,20 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
-  private static final Logger LOG = 
LoggerFactory.getLogger(AlignedChunkGroupWriterImpl.class);
+  protected static final Logger LOG = 
LoggerFactory.getLogger(AlignedChunkGroupWriterImpl.class);
 
-  private final IDeviceID deviceId;
+  protected final IDeviceID deviceId;
 
   // measurementID -> ValueChunkWriter
-  private final Map<String, ValueChunkWriter> valueChunkWriterMap = new 
LinkedHashMap<>();
+  protected final Map<String, ValueChunkWriter> valueChunkWriterMap = new 
LinkedHashMap<>();
 
-  private final TimeChunkWriter timeChunkWriter;
+  protected final TimeChunkWriter timeChunkWriter;
 
-  private final EncryptParameter encryprParam;
+  protected final EncryptParameter encryprParam;
 
-  private long lastTime = Long.MIN_VALUE;
-  private boolean isInitLastTime = false;
-  private boolean convertColumnNameToLowerCase = false;
+  protected long lastTime = Long.MIN_VALUE;
+  protected boolean isInitLastTime = false;
+  protected boolean convertColumnNameToLowerCase = false;
 
   public AlignedChunkGroupWriterImpl(IDeviceID deviceId) {
     this.deviceId = deviceId;
@@ -392,7 +392,7 @@ public class AlignedChunkGroupWriterImpl implements 
IChunkGroupWriter {
    * check occupied memory size, if it exceeds the PageSize threshold, 
construct a page and put it
    * to pageBuffer
    */
-  private boolean checkPageSizeAndMayOpenANewPage() {
+  protected boolean checkPageSizeAndMayOpenANewPage() {
     if (timeChunkWriter.checkPageSizeAndMayOpenANewPage()) {
       return true;
     }
@@ -404,21 +404,21 @@ public class AlignedChunkGroupWriterImpl implements 
IChunkGroupWriter {
     return false;
   }
 
-  private void writePageToPageBuffer() {
+  protected void writePageToPageBuffer() {
     timeChunkWriter.writePageToPageBuffer();
     for (ValueChunkWriter valueChunkWriter : valueChunkWriterMap.values()) {
       valueChunkWriter.writePageToPageBuffer();
     }
   }
 
-  private void sealAllChunks() {
+  protected void sealAllChunks() {
     timeChunkWriter.sealCurrentPage();
     for (ValueChunkWriter valueChunkWriter : valueChunkWriterMap.values()) {
       valueChunkWriter.sealCurrentPage();
     }
   }
 
-  private void checkIsHistoryData(long time) throws WriteProcessException {
+  protected void checkIsHistoryData(long time) throws WriteProcessException {
     if (isInitLastTime && time <= lastTime) {
       throw new WriteProcessException(
           "Not allowed to write out-of-order data in timeseries "
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/write/v4/TableTsBlock2TsFileWriter.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/TableTsBlock2TsFileWriter.java
new file mode 100644
index 000000000..17c610e8d
--- /dev/null
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/TableTsBlock2TsFileWriter.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.write.v4;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.column.TimeColumn;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+import org.apache.tsfile.write.chunk.AlignedChunkGroupWriterImpl;
+import org.apache.tsfile.write.chunk.IChunkGroupWriter;
+import org.apache.tsfile.write.chunk.TableChunkGroupWriterImpl;
+import org.apache.tsfile.write.chunk.ValueChunkWriter;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Convert TsBlock (table model) into TsFile format. Core responsibilities: 1. 
Split TsBlock by
+ * device (based on tag columns) 2. Optionally generate a new time column per 
device 3. Dispatch
+ * rows to corresponding ChunkGroupWriter
+ */
+public class TableTsBlock2TsFileWriter extends DeviceTableModelWriter {
+
+  private final boolean generateNewTimeColumn;
+  private final int timeColumnIndexInTsBlock;
+  private final int[] tagColumnIndexInTsBlock;
+  private final int[] fieldColumnIndexInTsBlock;
+  private final IMeasurementSchema[] fieldColumnSchemas;
+
+  private final String tableName;
+  private final Map<IDeviceID, Long> deviceRowCountMap;
+
+  private int rowCount = 0;
+
+  public TableTsBlock2TsFileWriter(
+      File file,
+      TableSchema tableSchema,
+      long memoryThreshold,
+      boolean generateNewTimeColumn,
+      int timeColumnIndexInTsBlock,
+      int[] tagColumnIndexesInTsBlock,
+      int[] fieldColumnIndexesInTsBlock,
+      IMeasurementSchema[] fieldColumnSchemas)
+      throws IOException {
+    super(file, tableSchema, memoryThreshold);
+    this.tableName = tableSchema.getTableName();
+    this.generateNewTimeColumn = generateNewTimeColumn;
+    this.timeColumnIndexInTsBlock = timeColumnIndexInTsBlock;
+    this.tagColumnIndexInTsBlock = tagColumnIndexesInTsBlock;
+    this.fieldColumnIndexInTsBlock = fieldColumnIndexesInTsBlock;
+    this.deviceRowCountMap = generateNewTimeColumn ? new HashMap<>() : null;
+    this.fieldColumnSchemas = fieldColumnSchemas;
+  }
+
+  public void write(TsBlock tsBlock) throws IOException, WriteProcessException 
{
+    if (tsBlock == null || tsBlock.isEmpty()) {
+      return;
+    }
+    // Split TsBlock into device partitions and prepare time column
+    Pair<Column, List<Pair<IDeviceID, Integer>>> 
timeColumnAndDeviceIdEndIndexPairs =
+        splitTsBlockByDeviceAndGetTimeColumn(tsBlock);
+    Column timeColumn = timeColumnAndDeviceIdEndIndexPairs.left;
+    // Extract value columns according to schema mapping
+    Column[] valueColumns = new Column[fieldColumnIndexInTsBlock.length];
+    for (int i = 0; i < valueColumns.length; i++) {
+      valueColumns[i] = tsBlock.getColumn(fieldColumnIndexInTsBlock[i]);
+    }
+    List<Pair<IDeviceID, Integer>> deviceIdEndIndexPairs = 
timeColumnAndDeviceIdEndIndexPairs.right;
+    int startIndex = 0;
+
+    // Iterate each device segment and write data into its ChunkGroup
+    for (Pair<IDeviceID, Integer> pair : deviceIdEndIndexPairs) {
+      TableTsBlockChunkGroupWriterImpl chunkGroupWriter =
+          (TableTsBlockChunkGroupWriterImpl) 
tryToInitialGroupWriter(pair.left, true, true);
+      int writeCount = chunkGroupWriter.write(timeColumn, valueColumns, 
startIndex, pair.right);
+      rowCount += writeCount;
+      recordCount += writeCount;
+      startIndex = pair.right;
+    }
+
+    this.checkMemorySizeAndMayFlushChunks();
+  }
+
+  /**
+   * Split TsBlock by device boundary. If generateNewTimeColumn is true, 
generate a monotonically
+   * increasing time column per device using deviceRowCountMap.
+   *
+   * @return Pair of (time column, device -> end index list)
+   */
+  private Pair<Column, List<Pair<IDeviceID, Integer>>> 
splitTsBlockByDeviceAndGetTimeColumn(
+      TsBlock tsBlock) {
+    long[] timestamps = null;
+    if (generateNewTimeColumn) {
+      timestamps = new long[tsBlock.getPositionCount()];
+    }
+    List<Pair<IDeviceID, Integer>> deviceSplitResult = new ArrayList<>();
+    IDeviceID lastDeviceID = null;
+    long lastDeviceCount = 0;
+
+    // Iterate rows and detect device boundary changes
+    for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+      IDeviceID currDeviceID = getDeviceId(tsBlock, i);
+      // Device changed, flush previous segment
+      if (!currDeviceID.equals(lastDeviceID)) {
+        if (lastDeviceID != null) {
+          deviceSplitResult.add(new Pair(lastDeviceID, i));
+          if (generateNewTimeColumn) {
+            deviceRowCountMap.put(lastDeviceID, lastDeviceCount);
+          }
+        }
+        lastDeviceID = currDeviceID;
+        if (generateNewTimeColumn) {
+          lastDeviceCount = deviceRowCountMap.getOrDefault(lastDeviceID, 0L);
+        }
+      }
+      // Generate synthetic time if required
+      if (generateNewTimeColumn) {
+        timestamps[i] = lastDeviceCount++;
+      }
+    }
+
+    deviceSplitResult.add(new Pair(lastDeviceID, tsBlock.getPositionCount()));
+    if (generateNewTimeColumn) {
+      deviceRowCountMap.put(lastDeviceID, lastDeviceCount);
+      return new Pair<>(new TimeColumn(timestamps.length, timestamps), 
deviceSplitResult);
+    } else {
+      return new Pair<>(tsBlock.getColumn(timeColumnIndexInTsBlock), 
deviceSplitResult);
+    }
+  }
+
+  private IDeviceID getDeviceId(TsBlock tsBlock, int rowIdx) {
+    String[] segments = new String[tagColumnIndexInTsBlock.length + 1];
+    segments[0] = tableName;
+    for (int i = 0; i < tagColumnIndexInTsBlock.length; i++) {
+      Column tagColumn = tsBlock.getColumn(tagColumnIndexInTsBlock[i]);
+      if (tagColumn.isNull(rowIdx)) {
+        segments[i + 1] = null;
+      } else {
+        segments[i + 1] = 
tagColumn.getBinary(rowIdx).getStringValue(TSFileConfig.STRING_CHARSET);
+      }
+    }
+    return new StringArrayDeviceID(segments);
+  }
+
+  @Override
+  protected IChunkGroupWriter tryToInitialGroupWriter(
+      IDeviceID deviceId, boolean isAligned, boolean isTableModel) throws 
IOException {
+    IChunkGroupWriter groupWriter = groupWriters.get(deviceId);
+    if (groupWriter == null) {
+      groupWriter = new TableTsBlockChunkGroupWriterImpl(deviceId);
+      ((AlignedChunkGroupWriterImpl) groupWriter)
+          .setLastTime(alignedDeviceLastTimeMap.get(deviceId));
+      groupWriters.put(deviceId, groupWriter);
+    }
+    return groupWriter;
+  }
+
+  public int getDeviceCount() {
+    return alignedDeviceLastTimeMap.size();
+  }
+
+  public int getRowCount() {
+    return rowCount;
+  }
+
+  /**
+   * ChunkGroup writer for a single device. Responsible for writing time 
column and multiple value
+   * columns.
+   */
+  private class TableTsBlockChunkGroupWriterImpl extends 
TableChunkGroupWriterImpl {
+    private final ValueChunkWriter[] valueChunkWriters;
+
+    public TableTsBlockChunkGroupWriterImpl(IDeviceID deviceId) throws 
IOException {
+      super(deviceId);
+      // Initialize ValueChunkWriter for each measurement
+      this.valueChunkWriters = new ValueChunkWriter[fieldColumnSchemas.length];
+      for (int i = 0; i < fieldColumnSchemas.length; i++) {
+        valueChunkWriters[i] = 
tryToAddSeriesWriterInternal(fieldColumnSchemas[i]);
+      }
+    }
+
+    /**
+     * Write a range of rows into chunk group.
+     *
+     * @param startRowIndex inclusive
+     * @param endRowIndex exclusive
+     */
+    public int write(Column timeColumn, Column[] valueColumns, int 
startRowIndex, int endRowIndex)
+        throws WriteProcessException {
+      int pointCount = 0;
+      for (int rowIndex = startRowIndex; rowIndex < endRowIndex; rowIndex++) {
+        if (timeColumn.isNull(rowIndex)) {
+          throw new WriteProcessException("All values in time column should 
not be null");
+        }
+        long time = timeColumn.getLong(rowIndex);
+        checkIsHistoryData(time);
+        for (int valueColumnIndex = 0; valueColumnIndex < valueColumns.length; 
valueColumnIndex++) {
+          Column valueColumn = valueColumns[valueColumnIndex];
+          ValueChunkWriter valueChunkWriter = 
valueChunkWriters[valueColumnIndex];
+          boolean isNull = valueColumn.isNull(rowIndex);
+          switch (valueChunkWriter.getDataType()) {
+            case BOOLEAN:
+              valueChunkWriter.write(
+                  time, isNull ? false : valueColumn.getBoolean(rowIndex), 
isNull);
+              break;
+            case INT32:
+            case DATE:
+              valueChunkWriter.write(time, isNull ? 0 : 
valueColumn.getInt(rowIndex), isNull);
+              break;
+            case INT64:
+            case TIMESTAMP:
+              valueChunkWriter.write(time, isNull ? 0 : 
valueColumn.getLong(rowIndex), isNull);
+              break;
+            case FLOAT:
+              valueChunkWriter.write(time, isNull ? 0 : 
valueColumn.getFloat(rowIndex), isNull);
+              break;
+            case DOUBLE:
+              valueChunkWriter.write(time, isNull ? 0 : 
valueColumn.getDouble(rowIndex), isNull);
+              break;
+            case TEXT:
+            case BLOB:
+            case STRING:
+              valueChunkWriter.write(time, isNull ? null : 
valueColumn.getBinary(rowIndex), isNull);
+              break;
+            default:
+              throw new UnSupportedDataTypeException(
+                  String.format(
+                      "Data type %s is not supported.", 
valueChunkWriter.getDataType().getType()));
+          }
+        }
+        timeChunkWriter.write(time);
+        lastTime = time;
+        isInitLastTime = true;
+        if (checkPageSizeAndMayOpenANewPage()) {
+          writePageToPageBuffer();
+        }
+        pointCount++;
+      }
+      return pointCount;
+    }
+  }
+}
diff --git 
a/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TableTsBlock2TsFileWriterTest.java
 
b/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TableTsBlock2TsFileWriterTest.java
new file mode 100644
index 000000000..80320d13d
--- /dev/null
+++ 
b/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TableTsBlock2TsFileWriterTest.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.write.writer;
+
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.enums.ColumnCategory;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.TsFileGeneratorForTest;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.v4.TableTsBlock2TsFileWriter;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class TableTsBlock2TsFileWriterTest {
+
+  private String filePath = TsFileGeneratorForTest.getTestTsFilePath("db", 0, 
0, 0);
+
+  @After
+  public void tearDown() throws Exception {
+    Files.deleteIfExists(Paths.get(filePath));
+  }
+
+  @Test
+  public void testWriteWithExistingTimeColumnAndTagColumns()
+      throws IOException, WriteProcessException {
+    TableSchema tableSchema = getTableSchema();
+    TsBlock tsBlock = getTsBlock();
+    TableTsBlock2TsFileWriter writer =
+        new TableTsBlock2TsFileWriter(
+            new File(filePath),
+            tableSchema,
+            32 * 1024 * 1024,
+            false,
+            0,
+            new int[] {1},
+            new int[] {2, 3, 4},
+            new IMeasurementSchema[] {
+              new MeasurementSchema("s1", TSDataType.INT32),
+              new MeasurementSchema("s2", TSDataType.INT32),
+              new MeasurementSchema("s3", TSDataType.INT32),
+            });
+    writer.write(tsBlock);
+    writer.close();
+
+    Assert.assertEquals(100, writer.getRowCount());
+    Assert.assertEquals(10, writer.getDeviceCount());
+
+    try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath)) {
+      Assert.assertEquals(tableSchema, reader.getTableSchemaMap().get("t1"));
+      Map<IDeviceID, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap =
+          reader.getAllTimeseriesMetadata(false);
+      Assert.assertEquals(10, deviceTimeseriesMetadataMap.size());
+      for (Map.Entry<IDeviceID, List<TimeseriesMetadata>> entry :
+          deviceTimeseriesMetadataMap.entrySet()) {
+        Assert.assertEquals(4, entry.getValue().size());
+        Assert.assertEquals(0, 
entry.getValue().get(0).getStatistics().getStartTime());
+        Assert.assertEquals(9, 
entry.getValue().get(1).getStatistics().getEndTime());
+      }
+    }
+  }
+
+  @Test
+  public void testWriteWithExistingTagColumns() throws IOException, 
WriteProcessException {
+    TableSchema tableSchema = getTableSchema();
+    TableTsBlock2TsFileWriter writer =
+        new TableTsBlock2TsFileWriter(
+            new File(filePath),
+            tableSchema,
+            32 * 1024 * 1024,
+            true,
+            -1,
+            new int[] {1},
+            new int[] {0, 2, 3, 4},
+            new IMeasurementSchema[] {
+              new MeasurementSchema("t", TSDataType.TIMESTAMP),
+              new MeasurementSchema("s1", TSDataType.INT32),
+              new MeasurementSchema("s2", TSDataType.INT32),
+              new MeasurementSchema("s3", TSDataType.INT32),
+            });
+    writer.write(getTsBlock());
+    writer.close();
+
+    Assert.assertEquals(100, writer.getRowCount());
+    Assert.assertEquals(10, writer.getDeviceCount());
+
+    try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath)) {
+      Assert.assertEquals(tableSchema, reader.getTableSchemaMap().get("t1"));
+      Map<IDeviceID, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap =
+          reader.getAllTimeseriesMetadata(false);
+      Assert.assertEquals(10, deviceTimeseriesMetadataMap.size());
+      for (Map.Entry<IDeviceID, List<TimeseriesMetadata>> entry :
+          deviceTimeseriesMetadataMap.entrySet()) {
+        Assert.assertEquals(5, entry.getValue().size());
+        Assert.assertEquals(0, 
entry.getValue().get(0).getStatistics().getStartTime());
+        Assert.assertEquals(9, 
entry.getValue().get(1).getStatistics().getEndTime());
+      }
+    }
+  }
+
+  @Test
+  public void testWriteWithoutTimeColumnAndTagColumns() throws IOException, 
WriteProcessException {
+    TableSchema tableSchema = getTableSchema();
+    TableTsBlock2TsFileWriter writer =
+        new TableTsBlock2TsFileWriter(
+            new File(filePath),
+            tableSchema,
+            32 * 1024 * 1024,
+            true,
+            -1,
+            new int[0],
+            new int[] {0, 1, 2, 3, 4},
+            new IMeasurementSchema[] {
+              new MeasurementSchema("t", TSDataType.TIMESTAMP),
+              new MeasurementSchema("device", TSDataType.STRING),
+              new MeasurementSchema("s1", TSDataType.INT32),
+              new MeasurementSchema("s2", TSDataType.INT32),
+              new MeasurementSchema("s3", TSDataType.INT32),
+            });
+    writer.write(getTsBlock());
+    writer.close();
+
+    Assert.assertEquals(100, writer.getRowCount());
+    Assert.assertEquals(1, writer.getDeviceCount());
+
+    try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath)) {
+      Assert.assertEquals(tableSchema, reader.getTableSchemaMap().get("t1"));
+      Map<IDeviceID, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap =
+          reader.getAllTimeseriesMetadata(false);
+      Assert.assertEquals(1, deviceTimeseriesMetadataMap.size());
+      for (Map.Entry<IDeviceID, List<TimeseriesMetadata>> entry :
+          deviceTimeseriesMetadataMap.entrySet()) {
+        Assert.assertEquals(6, entry.getValue().size());
+        Assert.assertEquals(0, 
entry.getValue().get(0).getStatistics().getStartTime());
+        Assert.assertEquals(99, 
entry.getValue().get(1).getStatistics().getEndTime());
+      }
+    }
+  }
+
+  private TableSchema getTableSchema() {
+    return new TableSchema(
+        "t1",
+        Arrays.asList(
+            new MeasurementSchema("device", TSDataType.STRING),
+            new MeasurementSchema("s1", TSDataType.INT32),
+            new MeasurementSchema("s2", TSDataType.INT32),
+            new MeasurementSchema("s3", TSDataType.INT32)),
+        Arrays.asList(
+            ColumnCategory.TAG, ColumnCategory.FIELD, ColumnCategory.FIELD, 
ColumnCategory.FIELD));
+  }
+
+  private TsBlock getTsBlock() {
+    TsBlockBuilder tsBlockBuilder =
+        new TsBlockBuilder(
+            Arrays.asList(
+                TSDataType.TIMESTAMP,
+                TSDataType.STRING,
+                TSDataType.INT32,
+                TSDataType.INT32,
+                TSDataType.INT32));
+    for (int i = 0; i < 10; i++) {
+      for (int j = 0; j < 10; j++) {
+        tsBlockBuilder.getTimeColumnBuilder().writeLong(0);
+        tsBlockBuilder.getValueColumnBuilders()[0].writeLong(j);
+        tsBlockBuilder.getValueColumnBuilders()[1].writeBinary(
+            new Binary("device" + i, TSFileConfig.STRING_CHARSET));
+        tsBlockBuilder.getValueColumnBuilders()[2].writeInt(j);
+        tsBlockBuilder.getValueColumnBuilders()[3].writeInt(j);
+        tsBlockBuilder.getValueColumnBuilders()[4].writeInt(j);
+        tsBlockBuilder.declarePosition();
+      }
+    }
+    return tsBlockBuilder.build();
+  }
+}

Reply via email to