This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new d3eb61a4 Init all series writer for AlignedChunkGroupWriter
d3eb61a4 is described below
commit d3eb61a43ece95f52240903a2f8d4b8fd4edbf42
Author: shuwenwei <[email protected]>
AuthorDate: Fri Jul 18 11:12:12 2025 +0800
Init all series writer for AlignedChunkGroupWriter
---
.../java/org/apache/tsfile/write/TsFileWriter.java | 23 ++-
.../write/v4/AbstractTableModelTsFileWriter.java | 6 +-
.../tsfile/write/v4/DeviceTableModelWriter.java | 9 ++
.../read/TsFileV4ReadWriteInterfacesTest.java | 102 ++++++++++++
.../apache/tsfile/write/TsFileWriteApiTest.java | 179 +++++++++++++++++++++
5 files changed, 317 insertions(+), 2 deletions(-)
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
index d1a13e7c..ff37612e 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
@@ -233,6 +233,10 @@ public class TsFileWriter implements AutoCloseable {
}
}
+ public void setChunkGroupSizeThreshold(long chunkGroupSizeThreshold) {
+ this.chunkGroupSizeThreshold = chunkGroupSizeThreshold;
+ }
+
public void registerSchemaTemplate(
String templateName, Map<String, IMeasurementSchema> template, boolean
isAligned) {
getSchema().registerSchemaTemplate(templateName, new
MeasurementGroup(isAligned, template));
@@ -501,7 +505,7 @@ public class TsFileWriter implements AutoCloseable {
}
private IChunkGroupWriter tryToInitialGroupWriter(
- IDeviceID deviceId, boolean isAligned, boolean isTableModel) {
+ IDeviceID deviceId, boolean isAligned, boolean isTableModel) throws IOException {
IChunkGroupWriter groupWriter = groupWriters.get(deviceId);
if (groupWriter == null) {
if (isAligned) {
@@ -509,6 +513,8 @@ public class TsFileWriter implements AutoCloseable {
isTableModel
? new TableChunkGroupWriterImpl(deviceId, encryptParam)
: new AlignedChunkGroupWriterImpl(deviceId, encryptParam);
+ initAllSeriesWriterForAlignedSeries(
+ (AlignedChunkGroupWriterImpl) groupWriter, deviceId, isTableModel);
if (!isUnseq) { // Sequence File
((AlignedChunkGroupWriterImpl) groupWriter)
.setLastTime(alignedDeviceLastTimeMap.get(deviceId));
@@ -526,6 +532,21 @@ public class TsFileWriter implements AutoCloseable {
return groupWriter;
}
+ private void initAllSeriesWriterForAlignedSeries(
+ AlignedChunkGroupWriterImpl alignedChunkGroupWriter, IDeviceID deviceID, boolean isTableModel)
+ throws IOException {
+ Schema schema = getSchema();
+ if (isTableModel) {
+ alignedChunkGroupWriter.tryToAddSeriesWriter(
+ schema.getTableSchemaMap().get(deviceID.getTableName()).getColumnSchemas());
+ } else {
+ MeasurementGroup deviceSchema = schema.getSeriesSchema(deviceID);
+ for (IMeasurementSchema measurementSchema : deviceSchema.getMeasurementSchemaMap().values()) {
+ alignedChunkGroupWriter.tryToAddSeriesWriterInternal(measurementSchema);
+ }
+ }
+ }
+
/**
* write a record in type of T.
*
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/v4/AbstractTableModelTsFileWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/AbstractTableModelTsFileWriter.java
index 92f4c102..3120bb40 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/write/v4/AbstractTableModelTsFileWriter.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/AbstractTableModelTsFileWriter.java
@@ -146,7 +146,7 @@ abstract class AbstractTableModelTsFileWriter implements ITsFileWriter {
}
protected IChunkGroupWriter tryToInitialGroupWriter(
- IDeviceID deviceId, boolean isAligned, boolean isTableModel) {
+ IDeviceID deviceId, boolean isAligned, boolean isTableModel) throws IOException {
IChunkGroupWriter groupWriter = groupWriters.get(deviceId);
if (groupWriter == null) {
if (isAligned) {
@@ -156,6 +156,7 @@ abstract class AbstractTableModelTsFileWriter implements ITsFileWriter {
: new AlignedChunkGroupWriterImpl(deviceId, encryptParam);
((AlignedChunkGroupWriterImpl) groupWriter)
.setLastTime(alignedDeviceLastTimeMap.get(deviceId));
+ initAllSeriesWriterForAlignedSeries((AlignedChunkGroupWriterImpl) groupWriter);
} else {
groupWriter = new NonAlignedChunkGroupWriterImpl(deviceId,
encryptParam);
((NonAlignedChunkGroupWriterImpl) groupWriter)
@@ -167,6 +168,9 @@ abstract class AbstractTableModelTsFileWriter implements ITsFileWriter {
return groupWriter;
}
+ protected abstract void initAllSeriesWriterForAlignedSeries(
+ AlignedChunkGroupWriterImpl alignedChunkGroupWriter) throws IOException;
+
/**
* calculate total memory size occupied by all ChunkGroupWriter instances
currently.
*
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/v4/DeviceTableModelWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/DeviceTableModelWriter.java
index 66fca2cf..f64f285f 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/write/v4/DeviceTableModelWriter.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/DeviceTableModelWriter.java
@@ -29,6 +29,7 @@ import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.WriteUtils;
+import org.apache.tsfile.write.chunk.AlignedChunkGroupWriterImpl;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.IMeasurementSchema;
@@ -40,6 +41,7 @@ import java.util.List;
public class DeviceTableModelWriter extends AbstractTableModelTsFileWriter {
private String tableName;
+ private TableSchema tableSchema;
private boolean isTableWriteAligned = true;
public DeviceTableModelWriter(File file, TableSchema tableSchema, long
memoryThreshold)
@@ -74,6 +76,12 @@ public class DeviceTableModelWriter extends AbstractTableModelTsFileWriter {
checkMemorySizeAndMayFlushChunks();
}
+ @Override
+ protected void initAllSeriesWriterForAlignedSeries(
+ AlignedChunkGroupWriterImpl alignedChunkGroupWriter) throws IOException {
+ alignedChunkGroupWriter.tryToAddSeriesWriter(tableSchema.getColumnSchemas());
+ }
+
private void checkIsTableExistAndSetColumnCategoryList(Tablet tablet)
throws WriteProcessException {
String tabletTableName = tablet.getTableName();
@@ -102,6 +110,7 @@ public class DeviceTableModelWriter extends AbstractTableModelTsFileWriter {
private void registerTableSchema(TableSchema tableSchema) {
this.tableName = tableSchema.getTableName();
+ this.tableSchema = tableSchema;
getSchema().registerTableSchema(tableSchema);
}
}
diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileV4ReadWriteInterfacesTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileV4ReadWriteInterfacesTest.java
index d9b7147f..51aa64ce 100644
--- a/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileV4ReadWriteInterfacesTest.java
+++ b/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileV4ReadWriteInterfacesTest.java
@@ -21,10 +21,13 @@ package org.apache.tsfile.read;
import org.apache.tsfile.enums.ColumnCategory;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.StringArrayDeviceID;
import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.read.v4.DeviceTableModelReader;
+import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.TsFileGeneratorForTest;
import org.apache.tsfile.utils.TsFileGeneratorUtils;
import org.apache.tsfile.write.record.Tablet;
@@ -37,6 +40,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.io.File;
+import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
@@ -45,6 +49,104 @@ import java.util.List;
public class TsFileV4ReadWriteInterfacesTest {
+ @Test
+ public void testWriteSomeColumns() throws IOException, WriteProcessException
{
+ String filePath = TsFileGeneratorForTest.getTestTsFilePath("db", 0, 0, 0);
+
+ TableSchema tableSchema =
+ new TableSchema(
+ "t1",
+ Arrays.asList(
+ new MeasurementSchema("device", TSDataType.STRING),
+ new MeasurementSchema("s1", TSDataType.INT32),
+ new MeasurementSchema("s2", TSDataType.INT32),
+ new MeasurementSchema("s3", TSDataType.INT32)),
+ Arrays.asList(
+ ColumnCategory.TAG,
+ ColumnCategory.FIELD,
+ ColumnCategory.FIELD,
+ ColumnCategory.FIELD));
+ Tablet tablet1 =
+ new Tablet(
+ tableSchema.getTableName(),
+ Arrays.asList("device", "s1"),
+ Arrays.asList(TSDataType.STRING, TSDataType.INT32),
+ Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD));
+ for (int i = 0; i < 1000; i++) {
+ tablet1.addTimestamp(i, i);
+ tablet1.addValue("device", i, "d1");
+ tablet1.addValue("s1", i, 0);
+ }
+ Tablet tablet2 =
+ new Tablet(
+ tableSchema.getTableName(),
+ IMeasurementSchema.getMeasurementNameList(tableSchema.getColumnSchemas()),
+ IMeasurementSchema.getDataTypeList(tableSchema.getColumnSchemas()),
+ tableSchema.getColumnTypes());
+ for (int i = 0; i < 1000; i++) {
+ tablet2.addTimestamp(i, 1005 + i);
+ tablet2.addValue("device", i, "d1");
+ tablet2.addValue("s1", i, 1);
+ tablet2.addValue("s2", i, 1);
+ tablet2.addValue("s3", i, 1);
+ }
+ try (ITsFileWriter writer =
+ new TsFileWriterBuilder()
+ .file(new File(filePath))
+ .tableSchema(tableSchema)
+ .memoryThreshold(1)
+ .build()) {
+ writer.write(tablet1);
+ writer.write(tablet2);
+ }
+ try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath)) {
+ TsFileDeviceIterator deviceIterator =
reader.getAllDevicesIteratorWithIsAligned();
+ while (deviceIterator.hasNext()) {
+ Pair<IDeviceID, Boolean> pair = deviceIterator.next();
+ List<AbstractAlignedChunkMetadata> alignedChunkMetadataList =
+ reader.getAlignedChunkMetadataByMetadataIndexNode(
+ pair.getLeft(),
deviceIterator.getFirstMeasurementNodeOfCurrentDevice(), false);
+ Assert.assertFalse(alignedChunkMetadataList.isEmpty());
+ Assert.assertEquals(3,
alignedChunkMetadataList.get(0).getValueChunkMetadataList().size());
+ Assert.assertEquals(
+ 1000,
+ alignedChunkMetadataList
+ .get(0)
+ .getValueChunkMetadataList()
+ .get(0)
+ .getStatistics()
+ .getCount());
+ Assert.assertNull(alignedChunkMetadataList.get(0).getValueChunkMetadataList().get(1));
+ Assert.assertNull(alignedChunkMetadataList.get(0).getValueChunkMetadataList().get(2));
+ Assert.assertEquals(3,
alignedChunkMetadataList.get(1).getValueChunkMetadataList().size());
+ Assert.assertEquals(
+ 1000,
+ alignedChunkMetadataList
+ .get(1)
+ .getValueChunkMetadataList()
+ .get(0)
+ .getStatistics()
+ .getCount());
+ Assert.assertEquals(
+ 1000,
+ alignedChunkMetadataList
+ .get(1)
+ .getValueChunkMetadataList()
+ .get(1)
+ .getStatistics()
+ .getCount());
+ Assert.assertEquals(
+ 1000,
+ alignedChunkMetadataList
+ .get(1)
+ .getValueChunkMetadataList()
+ .get(2)
+ .getStatistics()
+ .getCount());
+ }
+ }
+ }
+
@Test
public void testGetTableDeviceMethods() throws Exception {
String filePath = TsFileGeneratorForTest.getTestTsFilePath("root.testsg",
0, 0, 0);
diff --git a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
index 17a0d0f1..264bf9e5 100644
--- a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
+++ b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
@@ -26,12 +26,15 @@ import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.file.MetaMarker;
import org.apache.tsfile.file.header.ChunkHeader;
import org.apache.tsfile.file.header.PageHeader;
+import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata;
import org.apache.tsfile.file.metadata.ChunkMetadata;
import org.apache.tsfile.file.metadata.ColumnSchema;
import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.tsfile.read.TsFileDeviceIterator;
import org.apache.tsfile.read.TsFileReader;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.read.common.Chunk;
@@ -41,6 +44,7 @@ import org.apache.tsfile.read.query.dataset.QueryDataSet;
import org.apache.tsfile.read.query.dataset.ResultSet;
import org.apache.tsfile.read.v4.ITsFileReader;
import org.apache.tsfile.read.v4.TsFileReaderBuilder;
+import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.TsFileGeneratorUtils;
import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
import org.apache.tsfile.write.chunk.ChunkWriterImpl;
@@ -899,6 +903,181 @@ public class TsFileWriteApiTest {
}
}
+ @Test
+ public void testWriteSomeColumnsOfTree() throws IOException,
WriteProcessException {
+ List<IMeasurementSchema> fullMeasurementSchemas =
+ Arrays.asList(
+ new MeasurementSchema("s1", TSDataType.INT32),
+ new MeasurementSchema("s2", TSDataType.INT32),
+ new MeasurementSchema("s3", TSDataType.INT32));
+ List<IMeasurementSchema> measurementSchemas1 =
+ Arrays.asList(new MeasurementSchema("s1", TSDataType.INT32));
+ IDeviceID device = new StringArrayDeviceID("root.test.d1");
+ Tablet tablet1 =
+ new Tablet(
+ device,
+ IMeasurementSchema.getMeasurementNameList(fullMeasurementSchemas),
+ IMeasurementSchema.getDataTypeList(fullMeasurementSchemas));
+ Tablet tablet2 =
+ new Tablet(
+ device,
+ IMeasurementSchema.getMeasurementNameList(measurementSchemas1),
+ IMeasurementSchema.getDataTypeList(measurementSchemas1));
+ for (int i = 0; i < 1000; i++) {
+ tablet1.addTimestamp(i, i);
+ tablet1.addValue("s1", i, 1);
+ tablet1.addValue("s2", i, 1);
+ tablet1.addValue("s3", i, 1);
+ }
+ for (int i = 0; i < 1000; i++) {
+ tablet2.addTimestamp(i, i + 1005);
+ tablet2.addValue("s1", i, 0);
+ }
+ try (TsFileWriter writer = new TsFileWriter(f)) {
+ writer.registerAlignedTimeseries(device, fullMeasurementSchemas);
+ writer.setChunkGroupSizeThreshold(1);
+ writer.writeTree(tablet1);
+ writer.writeTree(tablet2);
+ }
+ try (TsFileSequenceReader reader = new TsFileSequenceReader(f.getPath())) {
+ TsFileDeviceIterator deviceIterator =
reader.getAllDevicesIteratorWithIsAligned();
+ while (deviceIterator.hasNext()) {
+ Pair<IDeviceID, Boolean> pair = deviceIterator.next();
+ List<AbstractAlignedChunkMetadata> alignedChunkMetadataList =
+ reader.getAlignedChunkMetadataByMetadataIndexNode(
+ pair.getLeft(),
deviceIterator.getFirstMeasurementNodeOfCurrentDevice(), false);
+ Assert.assertFalse(alignedChunkMetadataList.isEmpty());
+ Assert.assertEquals(3,
alignedChunkMetadataList.get(0).getValueChunkMetadataList().size());
+ Assert.assertEquals(
+ 1000,
+ alignedChunkMetadataList
+ .get(0)
+ .getValueChunkMetadataList()
+ .get(0)
+ .getStatistics()
+ .getCount());
+ Assert.assertEquals(
+ 1000,
+ alignedChunkMetadataList
+ .get(0)
+ .getValueChunkMetadataList()
+ .get(1)
+ .getStatistics()
+ .getCount());
+ Assert.assertEquals(
+ 1000,
+ alignedChunkMetadataList
+ .get(0)
+ .getValueChunkMetadataList()
+ .get(2)
+ .getStatistics()
+ .getCount());
+ Assert.assertEquals(3,
alignedChunkMetadataList.get(1).getValueChunkMetadataList().size());
+ Assert.assertEquals(
+ 1000,
+ alignedChunkMetadataList
+ .get(1)
+ .getValueChunkMetadataList()
+ .get(0)
+ .getStatistics()
+ .getCount());
+ Assert.assertNull(alignedChunkMetadataList.get(1).getValueChunkMetadataList().get(1));
+ Assert.assertNull(alignedChunkMetadataList.get(1).getValueChunkMetadataList().get(2));
+ }
+ }
+ }
+
+ @Test
+ public void testWriteSomeColumnsOfTable() throws IOException,
WriteProcessException {
+ TableSchema tableSchema =
+ new TableSchema(
+ "t1",
+ Arrays.asList(
+ new MeasurementSchema("device", TSDataType.STRING),
+ new MeasurementSchema("s1", TSDataType.INT32),
+ new MeasurementSchema("s2", TSDataType.INT32),
+ new MeasurementSchema("s3", TSDataType.INT32)),
+ Arrays.asList(
+ ColumnCategory.TAG,
+ ColumnCategory.FIELD,
+ ColumnCategory.FIELD,
+ ColumnCategory.FIELD));
+ Tablet tablet1 =
+ new Tablet(
+ tableSchema.getTableName(),
+ Arrays.asList("device", "s1"),
+ Arrays.asList(TSDataType.STRING, TSDataType.INT32),
+ Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD));
+ for (int i = 0; i < 1000; i++) {
+ tablet1.addTimestamp(i, i);
+ tablet1.addValue("s1", i, 0);
+ }
+ Tablet tablet2 =
+ new Tablet(
+ tableSchema.getTableName(),
+ IMeasurementSchema.getMeasurementNameList(tableSchema.getColumnSchemas()),
+ IMeasurementSchema.getDataTypeList(tableSchema.getColumnSchemas()),
+ tableSchema.getColumnTypes());
+ for (int i = 0; i < 1000; i++) {
+ tablet2.addTimestamp(i, 1005 + i);
+ tablet2.addValue("s1", i, 1);
+ tablet2.addValue("s2", i, 1);
+ tablet2.addValue("s3", i, 1);
+ }
+ try (TsFileWriter writer = new TsFileWriter(f)) {
+ writer.registerTableSchema(tableSchema);
+ writer.setChunkGroupSizeThreshold(1);
+ writer.writeTable(tablet1);
+ writer.writeTable(tablet2);
+ }
+ try (TsFileSequenceReader reader = new TsFileSequenceReader(f.getPath())) {
+ TsFileDeviceIterator deviceIterator =
reader.getAllDevicesIteratorWithIsAligned();
+ while (deviceIterator.hasNext()) {
+ Pair<IDeviceID, Boolean> pair = deviceIterator.next();
+ List<AbstractAlignedChunkMetadata> alignedChunkMetadataList =
+ reader.getAlignedChunkMetadataByMetadataIndexNode(
+ pair.getLeft(),
deviceIterator.getFirstMeasurementNodeOfCurrentDevice(), false);
+ Assert.assertFalse(alignedChunkMetadataList.isEmpty());
+ Assert.assertEquals(3,
alignedChunkMetadataList.get(0).getValueChunkMetadataList().size());
+ Assert.assertEquals(
+ 1000,
+ alignedChunkMetadataList
+ .get(0)
+ .getValueChunkMetadataList()
+ .get(0)
+ .getStatistics()
+ .getCount());
+ Assert.assertNull(alignedChunkMetadataList.get(0).getValueChunkMetadataList().get(1));
+ Assert.assertNull(alignedChunkMetadataList.get(0).getValueChunkMetadataList().get(2));
+ Assert.assertEquals(3,
alignedChunkMetadataList.get(1).getValueChunkMetadataList().size());
+ Assert.assertEquals(
+ 1000,
+ alignedChunkMetadataList
+ .get(1)
+ .getValueChunkMetadataList()
+ .get(0)
+ .getStatistics()
+ .getCount());
+ Assert.assertEquals(
+ 1000,
+ alignedChunkMetadataList
+ .get(1)
+ .getValueChunkMetadataList()
+ .get(1)
+ .getStatistics()
+ .getCount());
+ Assert.assertEquals(
+ 1000,
+ alignedChunkMetadataList
+ .get(1)
+ .getValueChunkMetadataList()
+ .get(2)
+ .getStatistics()
+ .getCount());
+ }
+ }
+ }
+
@Test
public void writeTableTsFileWithUpperCaseColumns() throws IOException,
WriteProcessException {
setEnv(100 * 1024 * 1024, 10 * 1024);