This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a3a0c6193f8 Pipe: Converted source database / table name pattern to lower case (#15285)
a3a0c6193f8 is described below

commit a3a0c6193f8a31574c43bff7c8dcced4402342c0
Author: Caideyipi <[email protected]>
AuthorDate: Thu Apr 10 11:38:50 2025 +0800

    Pipe: Converted source database / table name pattern to lower case (#15285)
---
 .../tablemodel/manual/basic/IoTDBPipeAlterIT.java  |  6 ++--
 .../api/customizer/parameter/PipeParameters.java   | 18 ++++++++++-
 .../execution/config/TableConfigTaskVisitor.java   | 35 ++++++++++++++++++++++
 3 files changed, 56 insertions(+), 3 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeAlterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeAlterIT.java
index 72806ccbb2a..61b29c30cc2 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeAlterIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeAlterIT.java
@@ -59,9 +59,10 @@ public class IoTDBPipeAlterIT extends AbstractPipeTableModelDualManualIT {
     final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
 
     // Create pipe
+    // The database & table name will be converted to lower case
     final String sql =
         String.format(
-            "create pipe a2b with source ('source'='iotdb-source', 'database-name'='test', 'table-name'='test1', 'mode.streaming'='true') with processor ('processor'='do-nothing-processor') with sink ('node-urls'='%s')",
+            "create pipe a2b with source ('source'='iotdb-source', 'database-name'='Test', 'table-name'='Test1', 'mode.streaming'='true') with processor ('processor'='do-nothing-processor') with sink ('node-urls'='%s')",
             receiverDataNode.getIpAndPortString());
     try (final Connection connection = senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
         final Statement statement = connection.createStatement()) {
@@ -114,10 +115,11 @@ public class IoTDBPipeAlterIT extends AbstractPipeTableModelDualManualIT {
     }
 
     // Alter pipe (modify)
+    // The database & table name will be converted to lower case
     try (final Connection connection = senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
         final Statement statement = connection.createStatement()) {
       statement.execute(
-          "alter pipe a2b modify source ('table-name'='test1','database-name'='test1')");
+          "alter pipe a2b modify source ('table-name'='Test1','database-name'='Test1')");
     } catch (SQLException e) {
       fail(e.getMessage());
     }
diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index 3dcb2d19b0a..afee5f5abba 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
+++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -34,6 +34,7 @@ import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 
 /**
@@ -71,10 +72,25 @@ public class PipeParameters {
     return false;
   }
 
-  public void addAttribute(final String key, String values) {
+  public void addAttribute(final String key, final String values) {
     attributes.put(KeyReducer.reduce(key), values);
   }
 
+  public void computeAttributeIfExists(
+      final BiFunction<String, String, String> valueComputer, final String... keys) {
+    for (String key : keys) {
+      if (attributes.containsKey(key)) {
+        attributes.compute(key, valueComputer);
+        return;
+      }
+      key = KeyReducer.reduce(key);
+      if (attributes.containsKey(key)) {
+        attributes.compute(key, valueComputer);
+        return;
+      }
+    }
+  }
+
   public String getString(final String key) {
     final String value = attributes.get(key);
     return value != null ? value : attributes.get(KeyReducer.reduce(key));
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
index 5291fbd4b67..4bcbca3102c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
@@ -909,6 +909,8 @@ public class TableConfigTaskVisitor extends AstVisitor<IConfigTask, MPPQueryCont
     checkAndEnrichSourceUserName(pipeName, extractorAttributes, userName, false);
     checkAndEnrichSinkUserName(pipeName, node.getConnectorAttributes(), userName, false);
 
+    mayChangeSourcePattern(extractorAttributes);
+
     return new CreatePipeTask(node);
   }
 
@@ -947,6 +949,38 @@ public class TableConfigTaskVisitor extends AstVisitor<IConfigTask, MPPQueryCont
     }
   }
 
+  private static void mayChangeSourcePattern(final Map<String, String> extractorAttributes) {
+    final PipeParameters extractorParameters = new PipeParameters(extractorAttributes);
+
+    final String pluginName =
+        extractorParameters
+            .getStringOrDefault(
+                Arrays.asList(
+                    PipeExtractorConstant.EXTRACTOR_KEY, PipeExtractorConstant.SOURCE_KEY),
+                BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
+            .toLowerCase();
+
+    if (!pluginName.equals(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
+        && !pluginName.equals(BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName())) {
+      return;
+    }
+
+    // Use lower case because database + table name are all in lower cases
+    extractorParameters.computeAttributeIfExists(
+        (k, v) -> v.toLowerCase(Locale.ENGLISH),
+        PipeExtractorConstant.EXTRACTOR_DATABASE_KEY,
+        PipeExtractorConstant.SOURCE_DATABASE_KEY,
+        PipeExtractorConstant.EXTRACTOR_DATABASE_NAME_KEY,
+        PipeExtractorConstant.SOURCE_DATABASE_NAME_KEY);
+
+    extractorParameters.computeAttributeIfExists(
+        (k, v) -> v.toLowerCase(Locale.ENGLISH),
+        PipeExtractorConstant.EXTRACTOR_TABLE_KEY,
+        PipeExtractorConstant.SOURCE_TABLE_KEY,
+        PipeExtractorConstant.EXTRACTOR_TABLE_NAME_KEY,
+        PipeExtractorConstant.SOURCE_TABLE_NAME_KEY);
+  }
+
   public static void checkAndEnrichSinkUserName(
       final String pipeName,
       final Map<String, String> connectorAttributes,
@@ -1005,6 +1039,7 @@ public class TableConfigTaskVisitor extends AstVisitor<IConfigTask, MPPQueryCont
           SystemConstant.SQL_DIALECT_KEY, SystemConstant.SQL_DIALECT_TABLE_VALUE);
       checkAndEnrichSourceUserName(pipeName, extractorAttributes, userName, true);
     }
+    mayChangeSourcePattern(extractorAttributes);
 
     if (node.isReplaceAllConnectorAttributes()) {
       checkAndEnrichSinkUserName(pipeName, node.getConnectorAttributes(), userName, true);

Reply via email to