This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch fix_compression_by_type_1.3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f52f5f0522e4878ef2b563585e52fd70e9510d26 Author: Jiang Tian <[email protected]> AuthorDate: Wed Aug 20 16:39:02 2025 +0800 Fix that compression by type is not properly applied (#16211) --- .../iotdb/session/it/IoTDBSessionSimpleIT.java | 180 ++++++++++++++++++++- .../persistence/schema/TemplateTable.java | 2 +- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 25 +++ .../analyze/schema/AutoCreateSchemaExecutor.java | 13 +- .../plan/analyze/schema/TemplateSchemaFetcher.java | 2 +- .../db/queryengine/plan/parser/ASTVisitor.java | 8 +- .../metrics/IoTDBInternalLocalReporter.java | 3 +- .../FirstBatchCompactionAlignedChunkWriter.java | 3 +- 8 files changed, 224 insertions(+), 12 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSimpleIT.java b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSimpleIT.java index b3a231f3f63..e2ddbcdf259 100644 --- a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSimpleIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSimpleIT.java @@ -2153,6 +2153,7 @@ public class IoTDBSessionSimpleIT { @Test public void testAlterDefaultCompression() throws IoTDBConnectionException, StatementExecutionException { + // auto-create try (ISession session = EnvFactory.getEnv().getSessionConnection()) { List<TSDataType> types = Arrays.asList( @@ -2219,7 +2220,7 @@ public class IoTDBSessionSimpleIT { String.format("SET CONFIGURATION '%s_compressor'='GZIP'", configName)); } - String device2 = "root.test.d1"; + String device2 = "root.test.d2"; session.insertRecord(device2, 0, measurements, types, values); try (SessionDataSet dataSet = @@ -2232,5 +2233,182 @@ public class IoTDBSessionSimpleIT { } } } + + // manual create + try (ISession session = EnvFactory.getEnv().getSessionConnection()) { + List<TSDataType> types = + Arrays.asList( + TSDataType.BOOLEAN, + TSDataType.INT32, + TSDataType.DATE, + TSDataType.INT64, + TSDataType.TIMESTAMP, + TSDataType.FLOAT, + TSDataType.DOUBLE, + TSDataType.TEXT, + TSDataType.STRING, + TSDataType.BLOB); + List<String> measurements = + types.stream().map(dataType -> "__" + dataType.toString()).collect(Collectors.toList()); + + String device3 = "root.test.d3"; + for (int i = 0; i < types.size(); i++) { + session.executeNonQueryStatement( + String.format( + "CREATE TIMESERIES %s.%s WITH DATATYPE=%s", + device3, measurements.get(i), types.get(i))); + } + + try (SessionDataSet dataSet = + session.executeQueryStatement("SHOW TIMESERIES root.test.d3.**")) { + int compressionIndex = dataSet.getColumnNames().indexOf("Compression"); + while (dataSet.hasNext()) { + RowRecord rec = dataSet.next(); + Field compressionField = rec.getFields().get(compressionIndex); + assertEquals("GZIP", compressionField.getStringValue()); + } + } + + for (TSDataType type : types) { + String configName = null; + switch (type) { + case INT32: + case INT64: + case FLOAT: + case DOUBLE: + case TEXT: + case BOOLEAN: + configName = type.name().toLowerCase(); + break; + case STRING: + case BLOB: + configName = "text"; + break; + case DATE: + configName = "int32"; + break; + case TIMESTAMP: + configName = "int64"; + break; + } + session.executeNonQueryStatement( + String.format("SET CONFIGURATION '%s_compressor'='LZ4'", configName)); + } + + String device4 = "root.test.d4"; + for (int i = 0; i < types.size(); i++) { + session.executeNonQueryStatement( + String.format( + "CREATE TIMESERIES %s.%s WITH DATATYPE=%s", + device4, measurements.get(i), types.get(i))); + } + + try (SessionDataSet dataSet = + session.executeQueryStatement("SHOW TIMESERIES root.test.d4.**")) { + int compressionIndex = dataSet.getColumnNames().indexOf("Compression"); + while (dataSet.hasNext()) { + RowRecord rec = dataSet.next(); + Field compressionField = rec.getFields().get(compressionIndex); + assertEquals("LZ4", compressionField.getStringValue()); + } + } + } + + // template + try (ISession session = EnvFactory.getEnv().getSessionConnection()) { + List<TSDataType> types = + Arrays.asList( + TSDataType.BOOLEAN, + TSDataType.INT32, + TSDataType.DATE, + TSDataType.INT64, + TSDataType.TIMESTAMP, + TSDataType.FLOAT, + TSDataType.DOUBLE, + TSDataType.TEXT, + TSDataType.STRING, + TSDataType.BLOB); + List<String> measurements = + types.stream().map(dataType -> "__" + dataType.toString()).collect(Collectors.toList()); + List<Object> values = + Arrays.asList( + false, + 1, + LocalDate.of(1000, 1, 1), + 1L, + 1L, + 1.0f, + 1.0, + new Binary("1".getBytes(StandardCharsets.UTF_8)), + new Binary("1".getBytes(StandardCharsets.UTF_8)), + new Binary("1".getBytes(StandardCharsets.UTF_8))); + + String createTemplateSql = "CREATE DEVICE TEMPLATE t1 ("; + for (int i = 0; i < types.size(); i++) { + createTemplateSql += measurements.get(i) + " " + types.get(i).name(); + if (i != types.size() - 1) { + createTemplateSql += ","; + } + } + createTemplateSql += ")"; + session.executeNonQueryStatement(createTemplateSql); + + session.executeNonQueryStatement("SET DEVICE TEMPLATE t1 TO root.test.d5"); + String device5 = "root.test.d5"; + session.insertRecord(device5, 0, measurements, types, values); + + try (SessionDataSet dataSet = + session.executeQueryStatement("SHOW TIMESERIES root.test.d5.**")) { + int compressionIndex = dataSet.getColumnNames().indexOf("Compression"); + while (dataSet.hasNext()) { + RowRecord rec = dataSet.next(); + Field compressionField = rec.getFields().get(compressionIndex); + assertEquals("LZ4", compressionField.getStringValue()); + } + } + + for (TSDataType type : types) { + String configName = null; + switch (type) { + case INT32: + case INT64: + case FLOAT: + case DOUBLE: + case TEXT: + case BOOLEAN: + configName = type.name().toLowerCase(); + break; + case STRING: + case BLOB: + configName = "text"; + break; + case DATE: + configName = "int32"; + break; + case TIMESTAMP: + configName = "int64"; + break; + } + session.executeNonQueryStatement( + String.format("SET CONFIGURATION '%s_compressor'='GZIP'", configName)); + } + + createTemplateSql = createTemplateSql.replace("t1", "t2"); + session.executeNonQueryStatement(createTemplateSql); + session.executeNonQueryStatement("SET DEVICE TEMPLATE t2 TO root.test.d6"); + + String device6 = "root.test.d6"; + session.insertRecord(device6, 0, measurements, types, values); + + try (SessionDataSet dataSet = + session.executeQueryStatement("SHOW TIMESERIES root.test.d6.**")) { + int compressionIndex = dataSet.getColumnNames().indexOf("Compression"); + while (dataSet.hasNext()) { + RowRecord rec = dataSet.next(); + Field compressionField = rec.getFields().get(compressionIndex); + assertEquals("GZIP", compressionField.getStringValue()); + } + } + } } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/TemplateTable.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/TemplateTable.java index 201e6eb4d2e..23cde4e9447 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/TemplateTable.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/TemplateTable.java @@ -156,7 +156,7 @@ public class TemplateTable { dataTypeList.get(i), encodingList == null ? getDefaultEncoding(dataTypeList.get(i)) : encodingList.get(i), compressionTypeList == null - ? TSFileDescriptor.getInstance().getConfig().getCompressor() + ? TSFileDescriptor.getInstance().getConfig().getCompressor(dataTypeList.get(i)) : compressionTypeList.get(i)); } else { if (!measurementSchema.getType().equals(dataTypeList.get(i)) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 042e7969bf7..b8b29fad99c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -1791,6 +1791,31 @@ public class IoTDBDescriptor { "max_tsblock_line_number", ConfigurationFileUtils.getConfigurationDefaultValue( "max_tsblock_line_number")))); + + String booleanCompressor = properties.getProperty("boolean_compressor"); + if (booleanCompressor != null) { + TSFileDescriptor.getInstance().getConfig().setBooleanCompression(booleanCompressor); + } + String int32Compressor = properties.getProperty("int32_compressor"); + if (int32Compressor != null) { + TSFileDescriptor.getInstance().getConfig().setInt32Compression(int32Compressor); + } + String int64Compressor = properties.getProperty("int64_compressor"); + if (int64Compressor != null) { + TSFileDescriptor.getInstance().getConfig().setInt64Compression(int64Compressor); + } + String floatCompressor = properties.getProperty("float_compressor"); + if (floatCompressor != null) { + TSFileDescriptor.getInstance().getConfig().setFloatCompression(floatCompressor); + } + String doubleCompressor = properties.getProperty("double_compressor"); + if (doubleCompressor != null) { + TSFileDescriptor.getInstance().getConfig().setDoubleCompression(doubleCompressor); + } + String textCompressor = properties.getProperty("text_compressor"); + if (textCompressor != null) { + TSFileDescriptor.getInstance().getConfig().setTextCompression(textCompressor); + } } // Mqtt related diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java index 38d0f8942e5..0cfa03dbda4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java @@ -124,7 +124,7 @@ class AutoCreateSchemaExecutor { dataTypesOfMissingMeasurement.add(tsDataType); encodingsOfMissingMeasurement.add(getDefaultEncoding(tsDataType)); compressionTypesOfMissingMeasurement.add( - TSFileDescriptor.getInstance().getConfig().getCompressor()); + TSFileDescriptor.getInstance().getConfig().getCompressor(tsDataType)); } }); @@ -181,7 +181,9 @@ class AutoCreateSchemaExecutor { measurements[measurementIndex], tsDataTypes[measurementIndex], getDefaultEncoding(tsDataTypes[measurementIndex]), - TSFileDescriptor.getInstance().getConfig().getCompressor()); + TSFileDescriptor.getInstance() + .getConfig() + .getCompressor(tsDataTypes[measurementIndex])); } return v; }); @@ -348,7 +350,9 @@ class AutoCreateSchemaExecutor { ? getDefaultEncoding(tsDataTypes[measurementIndex]) : encodings[measurementIndex], compressionTypes == null - ? TSFileDescriptor.getInstance().getConfig().getCompressor() + ? TSFileDescriptor.getInstance() + .getConfig() + .getCompressor(tsDataTypes[measurementIndex]) : compressionTypes[measurementIndex]); } return v; @@ -392,7 +396,8 @@ class AutoCreateSchemaExecutor { && compressionTypesList.get(finalDeviceIndex1) != null) { compressionType = compressionTypesList.get(finalDeviceIndex1)[index]; } else { - compressionType = TSFileDescriptor.getInstance().getConfig().getCompressor(); + compressionType = + TSFileDescriptor.getInstance().getConfig().getCompressor(dataType); } templateExtendInfo.addMeasurement( measurement, dataType, encoding, compressionType); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/TemplateSchemaFetcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/TemplateSchemaFetcher.java index 351bb52b67a..5967afceada 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/TemplateSchemaFetcher.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/TemplateSchemaFetcher.java @@ -155,7 +155,7 @@ class TemplateSchemaFetcher { measurements[j], dataType, getDefaultEncoding(dataType), - TSFileDescriptor.getInstance().getConfig().getCompressor()); + TSFileDescriptor.getInstance().getConfig().getCompressor(dataType)); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java index d33eaa953d3..aa59bc78659 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java @@ -429,7 +429,9 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> { } createTimeSeriesStatement.setCompressor( - TSFileDescriptor.getInstance().getConfig().getCompressor()); + TSFileDescriptor.getInstance() + .getConfig() + .getCompressor(createTimeSeriesStatement.getDataType())); if (props != null && props.containsKey(IoTDBConstant.COLUMN_TIMESERIES_COMPRESSION.toLowerCase())) { String compressionString = @@ -491,7 +493,7 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> { createAlignedTimeSeriesStatement.addEncoding(encoding); } - CompressionType compressor = TSFileDescriptor.getInstance().getConfig().getCompressor(); + CompressionType compressor = TSFileDescriptor.getInstance().getConfig().getCompressor(dataType); if (props.containsKey(IoTDBConstant.COLUMN_TIMESERIES_COMPRESSOR.toLowerCase())) { String compressorString = props.get(IoTDBConstant.COLUMN_TIMESERIES_COMPRESSOR.toLowerCase()).toUpperCase(); @@ -3637,7 +3639,7 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> { encodings.add(encoding); } - CompressionType compressor = TSFileDescriptor.getInstance().getConfig().getCompressor(); + CompressionType compressor = TSFileDescriptor.getInstance().getConfig().getCompressor(dataType); if (props.containsKey(IoTDBConstant.COLUMN_TIMESERIES_COMPRESSOR.toLowerCase())) { String compressorString = props.get(IoTDBConstant.COLUMN_TIMESERIES_COMPRESSOR.toLowerCase()).toUpperCase(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java index 2dc54e564f2..d31b7d11244 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java @@ -223,7 +223,8 @@ public class IoTDBInternalLocalReporter extends IoTDBInternalReporter { TSDataType type = inferType(entry.getValue()); types.add(type.ordinal()); encodings.add((int) getDefaultEncoding(type).serialize()); - compressors.add((int) TSFileDescriptor.getInstance().getConfig().getCompressor().serialize()); + compressors.add( + (int) TSFileDescriptor.getInstance().getConfig().getCompressor(type).serialize()); } request.setPaths(paths); request.setDataTypes(types); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/FirstBatchCompactionAlignedChunkWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/FirstBatchCompactionAlignedChunkWriter.java index c3b6ae31747..58080a83613 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/FirstBatchCompactionAlignedChunkWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/FirstBatchCompactionAlignedChunkWriter.java @@ -101,7 +101,8 @@ public class FirstBatchCompactionAlignedChunkWriter extends AlignedChunkWriterIm TSEncoding timeEncoding = TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()); TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType(); - CompressionType timeCompression = TSFileDescriptor.getInstance().getConfig().getCompressor(); + CompressionType timeCompression = + TSFileDescriptor.getInstance().getConfig().getCompressor(timeType); timeChunkWriter = new FirstBatchCompactionTimeChunkWriter( "",
