This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch fix_compression_by_type_1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f52f5f0522e4878ef2b563585e52fd70e9510d26
Author: Jiang Tian <[email protected]>
AuthorDate: Wed Aug 20 16:39:02 2025 +0800

    Fix that compression by type is not properly applied (#16211)
---
 .../iotdb/session/it/IoTDBSessionSimpleIT.java     | 180 ++++++++++++++++++++-
 .../persistence/schema/TemplateTable.java          |   2 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  25 +++
 .../analyze/schema/AutoCreateSchemaExecutor.java   |  13 +-
 .../plan/analyze/schema/TemplateSchemaFetcher.java |   2 +-
 .../db/queryengine/plan/parser/ASTVisitor.java     |   8 +-
 .../metrics/IoTDBInternalLocalReporter.java        |   3 +-
 .../FirstBatchCompactionAlignedChunkWriter.java    |   3 +-
 8 files changed, 224 insertions(+), 12 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSimpleIT.java b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSimpleIT.java
index b3a231f3f63..e2ddbcdf259 100644
--- a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSimpleIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSimpleIT.java
@@ -2153,6 +2153,7 @@ public class IoTDBSessionSimpleIT {
   @Test
   public void testAlterDefaultCompression()
       throws IoTDBConnectionException, StatementExecutionException {
+    // auto-create
     try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
       List<TSDataType> types =
           Arrays.asList(
@@ -2219,7 +2220,7 @@ public class IoTDBSessionSimpleIT {
             String.format("SET CONFIGURATION '%s_compressor'='GZIP'", 
configName));
       }
 
-      String device2 = "root.test.d1";
+      String device2 = "root.test.d2";
       session.insertRecord(device2, 0, measurements, types, values);
 
       try (SessionDataSet dataSet =
@@ -2232,5 +2233,182 @@ public class IoTDBSessionSimpleIT {
         }
       }
     }
+
+    // manual create
+    try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      List<TSDataType> types =
+          Arrays.asList(
+              TSDataType.BOOLEAN,
+              TSDataType.INT32,
+              TSDataType.DATE,
+              TSDataType.INT64,
+              TSDataType.TIMESTAMP,
+              TSDataType.FLOAT,
+              TSDataType.DOUBLE,
+              TSDataType.TEXT,
+              TSDataType.STRING,
+              TSDataType.BLOB);
+      List<String> measurements =
+          types.stream().map(dataType -> "__" + 
dataType.toString()).collect(Collectors.toList());
+
+      String device3 = "root.test.d3";
+      for (int i = 0; i < types.size(); i++) {
+        session.executeNonQueryStatement(
+            String.format(
+                "CREATE TIMESERIES %s.%s WITH DATATYPE=%s",
+                device3, measurements.get(i), types.get(i)));
+      }
+
+      try (SessionDataSet dataSet =
+          session.executeQueryStatement("SHOW TIMESERIES root.test.d3.**")) {
+        int compressionIndex = dataSet.getColumnNames().indexOf("Compression");
+        while (dataSet.hasNext()) {
+          RowRecord rec = dataSet.next();
+          Field compressionField = rec.getFields().get(compressionIndex);
+          assertEquals("GZIP", compressionField.getStringValue());
+        }
+      }
+
+      for (TSDataType type : types) {
+        String configName = null;
+        switch (type) {
+          case INT32:
+          case INT64:
+          case FLOAT:
+          case DOUBLE:
+          case TEXT:
+          case BOOLEAN:
+            configName = type.name().toLowerCase();
+            break;
+          case STRING:
+          case BLOB:
+            configName = "text";
+            break;
+          case DATE:
+            configName = "int32";
+            break;
+          case TIMESTAMP:
+            configName = "int64";
+            break;
+        }
+        session.executeNonQueryStatement(
+            String.format("SET CONFIGURATION '%s_compressor'='LZ4'", 
configName));
+      }
+
+      String device4 = "root.test.d4";
+      for (int i = 0; i < types.size(); i++) {
+        session.executeNonQueryStatement(
+            String.format(
+                "CREATE TIMESERIES %s.%s WITH DATATYPE=%s",
+                device4, measurements.get(i), types.get(i)));
+      }
+
+      try (SessionDataSet dataSet =
+          session.executeQueryStatement("SHOW TIMESERIES root.test.d4.**")) {
+        int compressionIndex = dataSet.getColumnNames().indexOf("Compression");
+        while (dataSet.hasNext()) {
+          RowRecord rec = dataSet.next();
+          Field compressionField = rec.getFields().get(compressionIndex);
+          assertEquals("LZ4", compressionField.getStringValue());
+        }
+      }
+    }
+
+    // template
+    try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      List<TSDataType> types =
+          Arrays.asList(
+              TSDataType.BOOLEAN,
+              TSDataType.INT32,
+              TSDataType.DATE,
+              TSDataType.INT64,
+              TSDataType.TIMESTAMP,
+              TSDataType.FLOAT,
+              TSDataType.DOUBLE,
+              TSDataType.TEXT,
+              TSDataType.STRING,
+              TSDataType.BLOB);
+      List<String> measurements =
+          types.stream().map(dataType -> "__" + 
dataType.toString()).collect(Collectors.toList());
+      List<Object> values =
+          Arrays.asList(
+              false,
+              1,
+              LocalDate.of(1000, 1, 1),
+              1L,
+              1L,
+              1.0f,
+              1.0,
+              new Binary("1".getBytes(StandardCharsets.UTF_8)),
+              new Binary("1".getBytes(StandardCharsets.UTF_8)),
+              new Binary("1".getBytes(StandardCharsets.UTF_8)));
+
+      String createTemplateSql = "CREATE DEVICE TEMPLATE t1 (";
+      for (int i = 0; i < types.size(); i++) {
+        createTemplateSql += measurements.get(i) + " " + types.get(i).name();
+        if (i != types.size() - 1) {
+          createTemplateSql += ",";
+        }
+      }
+      createTemplateSql += ")";
+      session.executeNonQueryStatement(createTemplateSql);
+
+      session.executeNonQueryStatement("SET DEVICE TEMPLATE t1 TO root.test.d5");
+      String device5 = "root.test.d5";
+      session.insertRecord(device5, 0, measurements, types, values);
+
+      try (SessionDataSet dataSet =
+          session.executeQueryStatement("SHOW TIMESERIES root.test.d5.**")) {
+        int compressionIndex = dataSet.getColumnNames().indexOf("Compression");
+        while (dataSet.hasNext()) {
+          RowRecord rec = dataSet.next();
+          Field compressionField = rec.getFields().get(compressionIndex);
+          assertEquals("LZ4", compressionField.getStringValue());
+        }
+      }
+
+      for (TSDataType type : types) {
+        String configName = null;
+        switch (type) {
+          case INT32:
+          case INT64:
+          case FLOAT:
+          case DOUBLE:
+          case TEXT:
+          case BOOLEAN:
+            configName = type.name().toLowerCase();
+            break;
+          case STRING:
+          case BLOB:
+            configName = "text";
+            break;
+          case DATE:
+            configName = "int32";
+            break;
+          case TIMESTAMP:
+            configName = "int64";
+            break;
+        }
+        session.executeNonQueryStatement(
+            String.format("SET CONFIGURATION '%s_compressor'='GZIP'", 
configName));
+      }
+
+      createTemplateSql = createTemplateSql.replace("t1", "t2");
+      session.executeNonQueryStatement(createTemplateSql);
+      session.executeNonQueryStatement("SET DEVICE TEMPLATE t2 TO root.test.d6");
+
+      String device6 = "root.test.d6";
+      session.insertRecord(device6, 0, measurements, types, values);
+
+      try (SessionDataSet dataSet =
+          session.executeQueryStatement("SHOW TIMESERIES root.test.d6.**")) {
+        int compressionIndex = dataSet.getColumnNames().indexOf("Compression");
+        while (dataSet.hasNext()) {
+          RowRecord rec = dataSet.next();
+          Field compressionField = rec.getFields().get(compressionIndex);
+          assertEquals("GZIP", compressionField.getStringValue());
+        }
+      }
+    }
   }
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/TemplateTable.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/TemplateTable.java
index 201e6eb4d2e..23cde4e9447 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/TemplateTable.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/TemplateTable.java
@@ -156,7 +156,7 @@ public class TemplateTable {
               dataTypeList.get(i),
               encodingList == null ? getDefaultEncoding(dataTypeList.get(i)) : 
encodingList.get(i),
               compressionTypeList == null
-                  ? TSFileDescriptor.getInstance().getConfig().getCompressor()
+                  ? 
TSFileDescriptor.getInstance().getConfig().getCompressor(dataTypeList.get(i))
                   : compressionTypeList.get(i));
         } else {
           if (!measurementSchema.getType().equals(dataTypeList.get(i))
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 042e7969bf7..b8b29fad99c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1791,6 +1791,31 @@ public class IoTDBDescriptor {
                     "max_tsblock_line_number",
                     ConfigurationFileUtils.getConfigurationDefaultValue(
                         "max_tsblock_line_number"))));
+
+    String booleanCompressor = properties.getProperty("boolean_compressor");
+    if (booleanCompressor != null) {
+      
TSFileDescriptor.getInstance().getConfig().setBooleanCompression(booleanCompressor);
+    }
+    String int32Compressor = properties.getProperty("int32_compressor");
+    if (int32Compressor != null) {
+      
TSFileDescriptor.getInstance().getConfig().setInt32Compression(int32Compressor);
+    }
+    String int64Compressor = properties.getProperty("int64_compressor");
+    if (int64Compressor != null) {
+      
TSFileDescriptor.getInstance().getConfig().setInt64Compression(int64Compressor);
+    }
+    String floatCompressor = properties.getProperty("float_compressor");
+    if (floatCompressor != null) {
+      
TSFileDescriptor.getInstance().getConfig().setFloatCompression(floatCompressor);
+    }
+    String doubleCompressor = properties.getProperty("double_compressor");
+    if (doubleCompressor != null) {
+      
TSFileDescriptor.getInstance().getConfig().setDoubleCompression(doubleCompressor);
+    }
+    String textCompressor = properties.getProperty("text_compressor");
+    if (textCompressor != null) {
+      
TSFileDescriptor.getInstance().getConfig().setTextCompression(textCompressor);
+    }
   }
 
   // Mqtt related
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
index 38d0f8942e5..0cfa03dbda4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
@@ -124,7 +124,7 @@ class AutoCreateSchemaExecutor {
             dataTypesOfMissingMeasurement.add(tsDataType);
             encodingsOfMissingMeasurement.add(getDefaultEncoding(tsDataType));
             compressionTypesOfMissingMeasurement.add(
-                TSFileDescriptor.getInstance().getConfig().getCompressor());
+                
TSFileDescriptor.getInstance().getConfig().getCompressor(tsDataType));
           }
         });
 
@@ -181,7 +181,9 @@ class AutoCreateSchemaExecutor {
                   measurements[measurementIndex],
                   tsDataTypes[measurementIndex],
                   getDefaultEncoding(tsDataTypes[measurementIndex]),
-                  TSFileDescriptor.getInstance().getConfig().getCompressor());
+                  TSFileDescriptor.getInstance()
+                      .getConfig()
+                      .getCompressor(tsDataTypes[measurementIndex]));
             }
             return v;
           });
@@ -348,7 +350,9 @@ class AutoCreateSchemaExecutor {
                         ? getDefaultEncoding(tsDataTypes[measurementIndex])
                         : encodings[measurementIndex],
                     compressionTypes == null
-                        ? 
TSFileDescriptor.getInstance().getConfig().getCompressor()
+                        ? TSFileDescriptor.getInstance()
+                            .getConfig()
+                            .getCompressor(tsDataTypes[measurementIndex])
                         : compressionTypes[measurementIndex]);
               }
               return v;
@@ -392,7 +396,8 @@ class AutoCreateSchemaExecutor {
                       && compressionTypesList.get(finalDeviceIndex1) != null) {
                     compressionType = 
compressionTypesList.get(finalDeviceIndex1)[index];
                   } else {
-                    compressionType = 
TSFileDescriptor.getInstance().getConfig().getCompressor();
+                    compressionType =
+                        
TSFileDescriptor.getInstance().getConfig().getCompressor(dataType);
                   }
                   templateExtendInfo.addMeasurement(
                       measurement, dataType, encoding, compressionType);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/TemplateSchemaFetcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/TemplateSchemaFetcher.java
index 351bb52b67a..5967afceada 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/TemplateSchemaFetcher.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/TemplateSchemaFetcher.java
@@ -155,7 +155,7 @@ class TemplateSchemaFetcher {
                   measurements[j],
                   dataType,
                   getDefaultEncoding(dataType),
-                  TSFileDescriptor.getInstance().getConfig().getCompressor());
+                  
TSFileDescriptor.getInstance().getConfig().getCompressor(dataType));
         }
       }
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
index d33eaa953d3..aa59bc78659 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
@@ -429,7 +429,9 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
     }
 
     createTimeSeriesStatement.setCompressor(
-        TSFileDescriptor.getInstance().getConfig().getCompressor());
+        TSFileDescriptor.getInstance()
+            .getConfig()
+            .getCompressor(createTimeSeriesStatement.getDataType()));
     if (props != null
         && 
props.containsKey(IoTDBConstant.COLUMN_TIMESERIES_COMPRESSION.toLowerCase())) {
       String compressionString =
@@ -491,7 +493,7 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
       createAlignedTimeSeriesStatement.addEncoding(encoding);
     }
 
-    CompressionType compressor = 
TSFileDescriptor.getInstance().getConfig().getCompressor();
+    CompressionType compressor = 
TSFileDescriptor.getInstance().getConfig().getCompressor(dataType);
     if 
(props.containsKey(IoTDBConstant.COLUMN_TIMESERIES_COMPRESSOR.toLowerCase())) {
       String compressorString =
           
props.get(IoTDBConstant.COLUMN_TIMESERIES_COMPRESSOR.toLowerCase()).toUpperCase();
@@ -3637,7 +3639,7 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
       encodings.add(encoding);
     }
 
-    CompressionType compressor = 
TSFileDescriptor.getInstance().getConfig().getCompressor();
+    CompressionType compressor = 
TSFileDescriptor.getInstance().getConfig().getCompressor(dataType);
     if 
(props.containsKey(IoTDBConstant.COLUMN_TIMESERIES_COMPRESSOR.toLowerCase())) {
       String compressorString =
           
props.get(IoTDBConstant.COLUMN_TIMESERIES_COMPRESSOR.toLowerCase()).toUpperCase();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java
index 2dc54e564f2..d31b7d11244 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java
@@ -223,7 +223,8 @@ public class IoTDBInternalLocalReporter extends IoTDBInternalReporter {
       TSDataType type = inferType(entry.getValue());
       types.add(type.ordinal());
       encodings.add((int) getDefaultEncoding(type).serialize());
-      compressors.add((int) 
TSFileDescriptor.getInstance().getConfig().getCompressor().serialize());
+      compressors.add(
+          (int) 
TSFileDescriptor.getInstance().getConfig().getCompressor(type).serialize());
     }
     request.setPaths(paths);
     request.setDataTypes(types);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/FirstBatchCompactionAlignedChunkWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/FirstBatchCompactionAlignedChunkWriter.java
index c3b6ae31747..58080a83613 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/FirstBatchCompactionAlignedChunkWriter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/FirstBatchCompactionAlignedChunkWriter.java
@@ -101,7 +101,8 @@ public class FirstBatchCompactionAlignedChunkWriter extends AlignedChunkWriterIm
     TSEncoding timeEncoding =
         
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
     TSDataType timeType = 
TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
-    CompressionType timeCompression = 
TSFileDescriptor.getInstance().getConfig().getCompressor();
+    CompressionType timeCompression =
+        TSFileDescriptor.getInstance().getConfig().getCompressor(timeType);
     timeChunkWriter =
         new FirstBatchCompactionTimeChunkWriter(
             "",

Reply via email to