This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch test in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 78f6605de5dbcf16d071cebe65f0557a07d082c9 Author: Caideyipi <[email protected]> AuthorDate: Tue May 12 14:53:59 2026 +0800 Update TsFileInsertionEventParserTest.java --- .../pipe/event/TsFileInsertionEventParserTest.java | 224 +++++++++++++++++++++ 1 file changed, 224 insertions(+) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java index 8d02cc8a998..6d48cab4d09 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java @@ -51,6 +51,7 @@ import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.After; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; @@ -64,6 +65,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -80,6 +82,8 @@ public class TsFileInsertionEventParserTest { private static final String PREFIX_FORMAT = "prefix"; private static final String IOTDB_FORMAT = "iotdb"; + private static final String MANUAL_SCAN_PARSER_PERFORMANCE_TEST = + "iotdb.scan.parser.performance.enabled"; private File alignedTsFile; private File nonalignedTsFile; @@ -162,6 +166,106 @@ public class TsFileInsertionEventParserTest { } } + @Test + public void manualTestScanParserSplitPerformance() throws Exception { + Assume.assumeTrue( + "Set -D" + MANUAL_SCAN_PARSER_PERFORMANCE_TEST + "=true to run this manual test.", + Boolean.getBoolean(MANUAL_SCAN_PARSER_PERFORMANCE_TEST)); + + final int deviceCount = + getManualPerformanceIntProperty("iotdb.scan.parser.performance.device.count", 1); + final int measurementCount = + getManualPerformanceIntProperty("iotdb.scan.parser.performance.measurement.count", 256); + final int rowCountPerDevice = + getManualPerformanceIntProperty("iotdb.scan.parser.performance.row.count", 200_000); + final int tabletRowCount = + getManualPerformanceIntProperty("iotdb.scan.parser.performance.tablet.row.count", 1024); + final long pipeMaxReaderChunkSize = + getManualPerformanceLongProperty( + "iotdb.scan.parser.performance.reader.chunk.size", 1024 * 1024L); + final long expectedPointCount = (long) deviceCount * measurementCount * rowCountPerDevice; + final long originalPipeMaxReaderChunkSize = + PipeConfig.getInstance().getPipeMaxReaderChunkSize(); + + CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(pipeMaxReaderChunkSize); + + alignedTsFile = new File("scan-parser-split-performance.tsfile"); + try { + final List<IMeasurementSchema> schemaList = new ArrayList<>(); + for (int i = 0; i < measurementCount; ++i) { + schemaList.add( + new MeasurementSchema( + "s" + i, TSDataType.INT64, TSEncoding.PLAIN, CompressionType.LZ4)); + } + + final long writeStartTime = System.nanoTime(); + generateLargeAlignedTsFile( + alignedTsFile, schemaList, deviceCount, rowCountPerDevice, tabletRowCount); + final long writeElapsedNanos = System.nanoTime() - writeStartTime; + + long pointCount = 0; + long tabletRowCountSum = 0; + int tabletCount = 0; + int alignedTabletCount = 0; + int minMeasurementCountInTablet = Integer.MAX_VALUE; + int maxMeasurementCountInTablet = 0; + + final long parseStartTime = System.nanoTime(); + try (final TsFileInsertionEventScanParser parser = + new TsFileInsertionEventScanParser( + alignedTsFile, + new PrefixTreePattern("root"), + Long.MIN_VALUE, + Long.MAX_VALUE, + null, + null, + false)) { + for (final Pair<Tablet, Boolean> tabletWithIsAligned : parser.toTabletWithIsAligneds()) { + final Tablet tablet = tabletWithIsAligned.getLeft(); + ++tabletCount; + if (tabletWithIsAligned.getRight()) { + ++alignedTabletCount; + } + tabletRowCountSum += tablet.getRowSize(); + pointCount += getNonNullSize(tablet); + minMeasurementCountInTablet = + Math.min(minMeasurementCountInTablet, tablet.getSchemas().size()); + maxMeasurementCountInTablet = + Math.max(maxMeasurementCountInTablet, tablet.getSchemas().size()); + } + } + final long parseElapsedNanos = System.nanoTime() - parseStartTime; + + Assert.assertEquals(expectedPointCount, pointCount); + Assert.assertTrue( + "Expected TsFileInsertionEventScanParser to split tablets.", tabletCount > 1); + Assert.assertTrue( + "Expected measurement split by pipe max reader chunk size.", + maxMeasurementCountInTablet < measurementCount); + + printScanParserPerformanceResult( + alignedTsFile.length(), + deviceCount, + measurementCount, + rowCountPerDevice, + tabletRowCount, + pipeMaxReaderChunkSize, + expectedPointCount, + writeElapsedNanos, + parseElapsedNanos, + tabletCount, + alignedTabletCount, + tabletRowCountSum, + pointCount, + minMeasurementCountInTablet, + maxMeasurementCountInTablet); + } finally { + CommonDescriptor.getInstance() + .getConfig() + .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize); + } + } + public void testToTabletInsertionEvents(final boolean isQuery) throws Exception { // Test empty chunk testMixedTsFileWithEmptyChunk(isQuery); @@ -602,6 +706,126 @@ public class TsFileInsertionEventParserTest { alignedTsFile, new PrefixTreePattern("root"), Long.MIN_VALUE, Long.MAX_VALUE, isQuery, 4); } + private void generateLargeAlignedTsFile( + final File tsFile, + final List<IMeasurementSchema> schemaList, + final int deviceCount, + final int rowCountPerDevice, + final int tabletRowCount) + throws Exception { + if (tsFile.exists()) { + Assert.assertTrue(tsFile.delete()); + } + + try (final TsFileWriter writer = new TsFileWriter(tsFile)) { + for (int deviceIndex = 0; deviceIndex < deviceCount; ++deviceIndex) { + final String device = "root.sg.performance.d" + deviceIndex; + writer.registerAlignedTimeseries(new PartialPath(device), schemaList); + + final Tablet tablet = new Tablet(device, schemaList, tabletRowCount); + for (int row = 0; row < rowCountPerDevice; ++row) { + int rowIndex = tablet.getRowSize(); + if (rowIndex == tablet.getMaxRowNumber()) { + writer.writeAligned(tablet); + tablet.reset(); + rowIndex = 0; + } + + tablet.addTimestamp(rowIndex, row); + for (int measurementIndex = 0; measurementIndex < schemaList.size(); ++measurementIndex) { + tablet.addValue( + rowIndex, + measurementIndex, + ((long) deviceIndex << 48) + (long) row * schemaList.size() + measurementIndex); + } + } + + if (tablet.getRowSize() > 0) { + writer.writeAligned(tablet); + } + } + } + } + + private int getManualPerformanceIntProperty(final String propertyName, final int defaultValue) { + final int value = Integer.getInteger(propertyName, defaultValue); + Assert.assertTrue(propertyName + " should be positive.", value > 0); + return value; + } + + private long getManualPerformanceLongProperty( + final String propertyName, final long defaultValue) { + final Long value = Long.getLong(propertyName, defaultValue); + Assert.assertTrue(propertyName + " should be positive.", value > 0); + return value; + } + + private void printScanParserPerformanceResult( + final long tsFileSizeInBytes, + final int deviceCount, + final int measurementCount, + final int rowCountPerDevice, + final int inputTabletRowCount, + final long pipeMaxReaderChunkSize, + final long expectedPointCount, + final long writeElapsedNanos, + final long parseElapsedNanos, + final int tabletCount, + final int alignedTabletCount, + final long parsedTabletRowCount, + final long pointCount, + final int minMeasurementCountInTablet, + final int maxMeasurementCountInTablet) { + final double writeElapsedSeconds = nanosToSeconds(writeElapsedNanos); + final double parseElapsedSeconds = nanosToSeconds(parseElapsedNanos); + final double pointThroughput = pointCount / Math.max(parseElapsedSeconds, 1.0e-9); + final double fileThroughputInMiBPerSecond = + tsFileSizeInBytes / 1024.0 / 1024.0 / Math.max(parseElapsedSeconds, 1.0e-9); + + System.out.printf( + Locale.ROOT, + "%nTsFileInsertionEventScanParser split performance:%n" + + " fileSize=%s%n" + + " deviceCount=%d, measurementCount=%d, rowCountPerDevice=%d, expectedPoints=%d%n" + + " inputTabletRowCount=%d, pipeMaxReaderChunkSize=%s%n" + + " writeTime=%.3fs, parseTime=%.3fs%n" + + " tablets=%d, alignedTablets=%d, parsedTabletRows=%d, points=%d%n" + + " measurementCountInTablet[min=%d, max=%d]%n" + + " pointThroughput=%.2f points/s, fileThroughput=%.2f MiB/s%n", + formatBytes(tsFileSizeInBytes), + deviceCount, + measurementCount, + rowCountPerDevice, + expectedPointCount, + inputTabletRowCount, + formatBytes(pipeMaxReaderChunkSize), + writeElapsedSeconds, + parseElapsedSeconds, + tabletCount, + alignedTabletCount, + parsedTabletRowCount, + pointCount, + minMeasurementCountInTablet, + maxMeasurementCountInTablet, + pointThroughput, + fileThroughputInMiBPerSecond); + } + + private double nanosToSeconds(final long nanos) { + return nanos / 1_000_000_000.0; + } + + private String formatBytes(final long bytes) { + double value = bytes; + final String[] units = {"B", "KiB", "MiB", "GiB"}; + int unitIndex = 0; + while (value >= 1024 && unitIndex < units.length - 1) { + value /= 1024; + ++unitIndex; + } + return String.format(Locale.ROOT, "%.2f %s", value, units[unitIndex]); + } + private void testTsFilePointNum( final File tsFile, final TreePattern pattern,
