This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a3a0c6193f8 Pipe: Converted source database / table name pattern to lower case (#15285)
a3a0c6193f8 is described below
commit a3a0c6193f8a31574c43bff7c8dcced4402342c0
Author: Caideyipi <[email protected]>
AuthorDate: Thu Apr 10 11:38:50 2025 +0800
Pipe: Converted source database / table name pattern to lower case (#15285)
---
.../tablemodel/manual/basic/IoTDBPipeAlterIT.java | 6 ++--
.../api/customizer/parameter/PipeParameters.java | 18 ++++++++++-
.../execution/config/TableConfigTaskVisitor.java | 35 ++++++++++++++++++++++
3 files changed, 56 insertions(+), 3 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeAlterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeAlterIT.java
index 72806ccbb2a..61b29c30cc2 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeAlterIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeAlterIT.java
@@ -59,9 +59,10 @@ public class IoTDBPipeAlterIT extends
AbstractPipeTableModelDualManualIT {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
// Create pipe
+ // The database & table name will be converted to lower case
final String sql =
String.format(
- "create pipe a2b with source ('source'='iotdb-source',
'database-name'='test', 'table-name'='test1', 'mode.streaming'='true') with
processor ('processor'='do-nothing-processor') with sink ('node-urls'='%s')",
+ "create pipe a2b with source ('source'='iotdb-source',
'database-name'='Test', 'table-name'='Test1', 'mode.streaming'='true') with
processor ('processor'='do-nothing-processor') with sink ('node-urls'='%s')",
receiverDataNode.getIpAndPortString());
try (final Connection connection =
senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
@@ -114,10 +115,11 @@ public class IoTDBPipeAlterIT extends
AbstractPipeTableModelDualManualIT {
}
// Alter pipe (modify)
+ // The database & table name will be converted to lower case
try (final Connection connection =
senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
statement.execute(
- "alter pipe a2b modify source
('table-name'='test1','database-name'='test1')");
+ "alter pipe a2b modify source
('table-name'='Test1','database-name'='Test1')");
} catch (SQLException e) {
fail(e.getMessage());
}
diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index 3dcb2d19b0a..afee5f5abba 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
+++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -34,6 +34,7 @@ import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
+import java.util.function.BiFunction;
import java.util.stream.Collectors;
/**
@@ -71,10 +72,25 @@ public class PipeParameters {
return false;
}
- public void addAttribute(final String key, String values) {
+ public void addAttribute(final String key, final String values) {
attributes.put(KeyReducer.reduce(key), values);
}
+ public void computeAttributeIfExists(
+ final BiFunction<String, String, String> valueComputer, final String...
keys) {
+ for (String key : keys) {
+ if (attributes.containsKey(key)) {
+ attributes.compute(key, valueComputer);
+ return;
+ }
+ key = KeyReducer.reduce(key);
+ if (attributes.containsKey(key)) {
+ attributes.compute(key, valueComputer);
+ return;
+ }
+ }
+ }
+
public String getString(final String key) {
final String value = attributes.get(key);
return value != null ? value : attributes.get(KeyReducer.reduce(key));
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
index 5291fbd4b67..4bcbca3102c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
@@ -909,6 +909,8 @@ public class TableConfigTaskVisitor extends
AstVisitor<IConfigTask, MPPQueryCont
checkAndEnrichSourceUserName(pipeName, extractorAttributes, userName,
false);
checkAndEnrichSinkUserName(pipeName, node.getConnectorAttributes(),
userName, false);
+ mayChangeSourcePattern(extractorAttributes);
+
return new CreatePipeTask(node);
}
@@ -947,6 +949,38 @@ public class TableConfigTaskVisitor extends
AstVisitor<IConfigTask, MPPQueryCont
}
}
+ private static void mayChangeSourcePattern(final Map<String, String>
extractorAttributes) {
+ final PipeParameters extractorParameters = new
PipeParameters(extractorAttributes);
+
+ final String pluginName =
+ extractorParameters
+ .getStringOrDefault(
+ Arrays.asList(
+ PipeExtractorConstant.EXTRACTOR_KEY,
PipeExtractorConstant.SOURCE_KEY),
+ BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
+ .toLowerCase();
+
+ if
(!pluginName.equals(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
+ &&
!pluginName.equals(BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName())) {
+ return;
+ }
+
+ // Use lower case because database + table name are all in lower cases
+ extractorParameters.computeAttributeIfExists(
+ (k, v) -> v.toLowerCase(Locale.ENGLISH),
+ PipeExtractorConstant.EXTRACTOR_DATABASE_KEY,
+ PipeExtractorConstant.SOURCE_DATABASE_KEY,
+ PipeExtractorConstant.EXTRACTOR_DATABASE_NAME_KEY,
+ PipeExtractorConstant.SOURCE_DATABASE_NAME_KEY);
+
+ extractorParameters.computeAttributeIfExists(
+ (k, v) -> v.toLowerCase(Locale.ENGLISH),
+ PipeExtractorConstant.EXTRACTOR_TABLE_KEY,
+ PipeExtractorConstant.SOURCE_TABLE_KEY,
+ PipeExtractorConstant.EXTRACTOR_TABLE_NAME_KEY,
+ PipeExtractorConstant.SOURCE_TABLE_NAME_KEY);
+ }
+
public static void checkAndEnrichSinkUserName(
final String pipeName,
final Map<String, String> connectorAttributes,
@@ -1005,6 +1039,7 @@ public class TableConfigTaskVisitor extends
AstVisitor<IConfigTask, MPPQueryCont
SystemConstant.SQL_DIALECT_KEY,
SystemConstant.SQL_DIALECT_TABLE_VALUE);
checkAndEnrichSourceUserName(pipeName, extractorAttributes, userName,
true);
}
+ mayChangeSourcePattern(extractorAttributes);
if (node.isReplaceAllConnectorAttributes()) {
checkAndEnrichSinkUserName(pipeName, node.getConnectorAttributes(),
userName, true);