This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new 0b37a9de calculate table size map during write process (#717)
0b37a9de is described below
commit 0b37a9de41f071eaad0a8e775376f39ae4edbe70
Author: shuwenwei <[email protected]>
AuthorDate: Tue Feb 3 20:06:05 2026 +0800
calculate table size map during write process (#717)
---
.../apache/tsfile/read/TsFileSequenceReader.java | 95 ++++++++++++++++++++++
.../apache/tsfile/write/writer/TsFileIOWriter.java | 60 ++++++++++++++
.../apache/tsfile/write/TsFileWriteApiTest.java | 43 ++++++++++
3 files changed, 198 insertions(+)
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
index 35a39dc9..481a4d6c 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
@@ -158,6 +158,101 @@ public class TsFileSequenceReader implements
AutoCloseable {
this(file, true, null);
}
+ public Map<IDeviceID, Integer> countChunksPerChunkGroup() throws IOException
{
+ Map<IDeviceID, Integer> result = new LinkedHashMap<>();
+
+ File checkFile = FSFactoryProducer.getFSFactory().getFile(this.file);
+ if (!checkFile.exists()) {
+ return result;
+ }
+
+ int headerLength = TSFileConfig.MAGIC_STRING.getBytes().length +
Byte.BYTES;
+ if (checkFile.length() < headerLength) {
+ return result;
+ }
+
+ if (!TSFileConfig.MAGIC_STRING.equals(readHeadMagic())) {
+ return result;
+ }
+
+ readVersionNumber();
+ checkFileVersion();
+
+ tsFileInput.position(headerLength);
+
+ IDeviceID currentDevice = null;
+ int currentChunkCount = 0;
+
+ try {
+ byte marker;
+ while ((marker = readMarker()) != MetaMarker.SEPARATOR) {
+ switch (marker) {
+ case MetaMarker.CHUNK_GROUP_HEADER:
+ // finish last chunk group
+ if (currentDevice != null) {
+ result.put(currentDevice, currentChunkCount);
+ }
+
+ // start new chunk group
+ ChunkGroupHeader chunkGroupHeader = readChunkGroupHeader();
+ currentDevice = chunkGroupHeader.getDeviceID();
+ currentChunkCount = 0;
+ break;
+
+ case MetaMarker.CHUNK_HEADER:
+ case MetaMarker.TIME_CHUNK_HEADER:
+ case MetaMarker.VALUE_CHUNK_HEADER:
+ case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
+ case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
+ case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
+ // count chunk
+ currentChunkCount++;
+
+ // skip chunk content safely
+ ChunkHeader chunkHeader = readChunkHeader(marker);
+ skipChunkData(chunkHeader, marker);
+ break;
+
+ case MetaMarker.OPERATION_INDEX_RANGE:
+ readPlanIndex();
+ break;
+
+ default:
+ throw new IOException("Unexpected marker " + marker);
+ }
+ }
+
+ // last chunk group
+ if (currentDevice != null) {
+ result.put(currentDevice, currentChunkCount);
+ }
+ } catch (Exception e) {
+ }
+
+ return result;
+ }
+
+ /**
+  * Moves the reader past the pages of a chunk whose header has just been read.
+  *
+  * <p>A chunk that reports a non-positive data size is left untouched. A chunk whose type (low six
+  * bits) equals {@code MetaMarker.CHUNK_HEADER} is treated as multi-page and its pages are skipped
+  * one by one until the declared data size is used up; any other chunk is treated as holding a
+  * single page.
+  *
+  * @param chunkHeader header of the chunk to skip
+  * @param marker marker byte that introduced the chunk; not consulted by this method
+  * @throws IOException if reading a page header or skipping page data fails
+  */
+ private void skipChunkData(ChunkHeader chunkHeader, byte marker) throws IOException {
+   int remainingBytes = chunkHeader.getDataSize();
+   if (remainingBytes <= 0) {
+     return;
+   }
+
+   boolean multiPage = ((byte) (chunkHeader.getChunkType() & 0x3F)) == MetaMarker.CHUNK_HEADER;
+   if (!multiPage) {
+     // single-page chunk
+     PageHeader onlyPage = readPageHeader(chunkHeader.getDataType(), false);
+     skipPageData(onlyPage);
+     return;
+   }
+
+   // multi-page chunk: remainingBytes is known to be positive here, so at least one page is read
+   do {
+     PageHeader nextPage = readPageHeader(chunkHeader.getDataType(), true);
+     skipPageData(nextPage);
+     remainingBytes -= nextPage.getSerializedPageSize();
+   } while (remainingBytes > 0);
+ }
+
public TsFileSequenceReader(String file, EncryptParameter firstEncryptParam)
throws IOException {
this(file, true, null);
this.firstEncryptParam = firstEncryptParam;
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
index 377ef90f..d75fb572 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
@@ -136,6 +136,12 @@ public class TsFileIOWriter implements AutoCloseable {
private final List<FlushChunkMetadataListener> flushListeners = new
ArrayList<>();
+ /**
+  * Name of the table whose chunk groups are currently being written, as tracked by
+  * updateTableSize; {@code null} when no table is being tracked.
+  */
+ protected String currentTable;
+
+ /** Output position recorded when {@link #currentTable} was last switched. */
+ protected long currentTableStartOffset;
+
+ /**
+  * Accumulated size in bytes per table name. Filled while chunk groups are started, when the
+  * file is ended, and while the metadata index tree is built.
+  */
+ protected Map<String, Long> tableSizeMap = new HashMap<>();
+
/** empty construct function. */
protected TsFileIOWriter() {
setEncryptParam(
@@ -260,6 +266,7 @@ public class TsFileIOWriter implements AutoCloseable {
}
public int startChunkGroup(IDeviceID deviceId) throws IOException {
+ updateTableSize(deviceId);
this.currentChunkGroupDeviceId = deviceId;
if (logger.isDebugEnabled()) {
logger.debug("start chunk group:{}, file position {}", deviceId,
out.getPosition());
@@ -427,6 +434,7 @@ public class TsFileIOWriter implements AutoCloseable {
if (!canWrite) {
return;
}
+ updateTableSize(null);
checkInMemoryPathCount();
readChunkMetadataAndConstructIndexTree();
@@ -474,6 +482,8 @@ public class TsFileIOWriter implements AutoCloseable {
TSMIterator tsmIterator = getTSMIterator();
Map<IDeviceID, MetadataIndexNode> deviceMetadataIndexMap = new TreeMap<>();
Queue<MetadataIndexNode> measurementMetadataIndexQueue = new
ArrayDeque<>();
+ String prevTableName = null;
+ long prevTableMetadataStartOffset = metaOffset;
IDeviceID currentDevice = null;
IDeviceID prevDevice = null;
Path currentPath = null;
@@ -495,6 +505,7 @@ public class TsFileIOWriter implements AutoCloseable {
filter.add(currentPath);
// construct the index tree node for the series
currentDevice = currentPath.getIDeviceID();
+ boolean isTableModel =
schema.getTableSchemaMap().containsKey(currentDevice.getTableName());
if (!currentDevice.equals(prevDevice)) {
if (prevDevice != null) {
addCurrentIndexNodeToQueue(currentIndexNode,
measurementMetadataIndexQueue, out);
@@ -503,6 +514,16 @@ public class TsFileIOWriter implements AutoCloseable {
generateRootNode(
measurementMetadataIndexQueue, out,
MetadataIndexNodeType.INTERNAL_MEASUREMENT));
currentIndexNode = new
MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+ String currentTableName = isTableModel ?
currentDevice.getTableName() : null;
+ if (!Objects.equals(currentTableName, prevTableName)) {
+ if (prevTableName != null) {
+ long currentTableSize = out.getPosition() -
prevTableMetadataStartOffset;
+ tableSizeMap.compute(
+ prevTableName, (k, v) -> v == null ? currentTableSize : v +
currentTableSize);
+ }
+ prevTableName = currentTableName;
+ prevTableMetadataStartOffset = out.getPosition();
+ }
}
measurementMetadataIndexQueue = new ArrayDeque<>();
seriesIdxForCurrDevice = 0;
@@ -533,6 +554,15 @@ public class TsFileIOWriter implements AutoCloseable {
prevDevice,
generateRootNode(
measurementMetadataIndexQueue, out,
MetadataIndexNodeType.INTERNAL_MEASUREMENT));
+ prevTableName =
+ schema.getTableSchemaMap().containsKey(prevDevice.getTableName())
+ ? prevDevice.getTableName()
+ : null;
+ if (prevTableName != null) {
+ long currentTableSize = out.getPosition() -
prevTableMetadataStartOffset;
+ tableSizeMap.compute(
+ prevTableName, (k, v) -> v == null ? currentTableSize : v +
currentTableSize);
+ }
}
Map<String, Map<IDeviceID, MetadataIndexNode>> tableDeviceNodesMap =
@@ -541,7 +571,14 @@ public class TsFileIOWriter implements AutoCloseable {
// build an index root for each table
Map<String, MetadataIndexNode> tableNodesMap = new TreeMap<>();
for (Entry<String, Map<IDeviceID, MetadataIndexNode>> entry :
tableDeviceNodesMap.entrySet()) {
+ long tableDeviceMetadataNodeStartOffset = out.getPosition();
tableNodesMap.put(entry.getKey(),
checkAndBuildLevelIndex(entry.getValue(), out));
+ long tableDeviceMetadataNodeSize = out.getPosition() -
tableDeviceMetadataNodeStartOffset;
+ if (schema.getTableSchemaMap().containsKey(entry.getKey())) {
+ tableSizeMap.compute(
+ entry.getKey(),
+ (k, v) -> v == null ? tableDeviceMetadataNodeSize : v +
tableDeviceMetadataNodeSize);
+ }
}
TsFileMetadata tsFileMetadata = new TsFileMetadata();
@@ -863,4 +900,27 @@ public class TsFileIOWriter implements AutoCloseable {
public void setGenerateTableSchema(boolean generateTableSchema) {
this.generateTableSchema = generateTableSchema;
}
+
+ /**
+  * Returns the per-table size accounting collected by this writer.
+  *
+  * <p>The returned map is the writer's own instance, not a copy, so entries added later by the
+  * writer are visible through a reference obtained earlier.
+  *
+  * @return table name mapped to accumulated size in bytes
+  */
+ public Map<String, Long> getTableSizeMap() {
+   return this.tableSizeMap;
+ }
+
+ /**
+  * Closes the size accounting of the table currently being written whenever the writer moves on
+  * to a different table, to a device that is not table-model, or to the end of the file.
+  *
+  * <p>The bytes written since {@code currentTableStartOffset} are added to the entry of {@code
+  * currentTable} in {@code tableSizeMap}. Starting another chunk group of the same table changes
+  * nothing, so consecutive chunk groups of one table are measured as a single span.
+  *
+  * <p>A device that is not table-model ends the span of the current table, so its bytes are not
+  * attributed to that table. This mirrors the metadata index accounting, which also closes the
+  * previous table when the table name changes to {@code null}.
+  *
+  * @param currentStartChunkGroupDeviceId device of the chunk group about to start, or {@code
+  *     null} when the file is being ended
+  * @throws IOException if the current output position cannot be read
+  */
+ private void updateTableSize(IDeviceID currentStartChunkGroupDeviceId) throws IOException {
+   long currentPosition = out.getPosition();
+   boolean endFile = currentStartChunkGroupDeviceId == null;
+
+   // table the upcoming chunk group belongs to; null at end of file and for non-table devices
+   String nextTable =
+       !endFile && currentStartChunkGroupDeviceId.isTableModel()
+           ? currentStartChunkGroupDeviceId.getTableName()
+           : null;
+
+   if (!endFile && Objects.equals(nextTable, currentTable)) {
+     // still inside the same table (or still outside any table): keep the span open
+     return;
+   }
+
+   if (currentTable != null) {
+     tableSizeMap.merge(currentTable, currentPosition - currentTableStartOffset, Long::sum);
+   }
+   currentTableStartOffset = currentPosition;
+   currentTable = nextTable;
+ }
}
diff --git
a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
index 6449b52d..dd6e74bc 100644
--- a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
+++ b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
@@ -1151,6 +1151,49 @@ public class TsFileWriteApiTest {
}
}
+ @Test
+ public void calculateTableSize() throws IOException, WriteProcessException {
+ TableSchema tableSchema1 =
+ new TableSchema(
+ "table1",
+ Arrays.asList(
+ new ColumnSchema("device", TSDataType.STRING,
ColumnCategory.TAG),
+ new ColumnSchema("s1", TSDataType.BLOB,
ColumnCategory.FIELD)));
+ TableSchema tableSchema2 =
+ new TableSchema(
+ "table2",
+ Arrays.asList(
+ new ColumnSchema("device", TSDataType.STRING,
ColumnCategory.TAG),
+ new ColumnSchema("s1", TSDataType.BLOB,
ColumnCategory.FIELD)));
+ Tablet tablet1 =
+ new Tablet(
+ "table1",
+
IMeasurementSchema.getMeasurementNameList(tableSchema1.getColumnSchemas()),
+
IMeasurementSchema.getDataTypeList(tableSchema1.getColumnSchemas()),
+ tableSchema1.getColumnTypes());
+ tablet1.addTimestamp(0, 0);
+ tablet1.addValue(0, 0, new byte[1024]);
+ Tablet tablet2 =
+ new Tablet(
+ "table2",
+
IMeasurementSchema.getMeasurementNameList(tableSchema2.getColumnSchemas()),
+
IMeasurementSchema.getDataTypeList(tableSchema2.getColumnSchemas()),
+ tableSchema2.getColumnTypes());
+ tablet2.addTimestamp(0, 0);
+ tablet2.addValue(0, 0, new byte[1024 * 1024]);
+ Map<String, Long> tableSizeMap = null;
+ try (TsFileWriter writer = new TsFileWriter(f)) {
+ writer.registerTableSchema(tableSchema1);
+ writer.registerTableSchema(tableSchema2);
+ writer.writeTable(tablet1);
+ writer.writeTable(tablet2);
+ tableSizeMap = writer.getIOWriter().getTableSizeMap();
+ }
+ Assert.assertTrue(tableSizeMap.get("table1") < 1024 * 1024);
+ Assert.assertTrue(tableSizeMap.get("table1") > 1024);
+ Assert.assertTrue(tableSizeMap.get("table2") >= 1024 * 1024);
+ }
+
@Test
public void writeRecord() throws IOException, WriteProcessException,
ReadProcessException {
setEnv(100 * 1024 * 1024, 10 * 1024);