This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch ger-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ger-fix by this push:
new 3235551dd98 f
3235551dd98 is described below
commit 3235551dd98317e8ef9864e4aec32232ec407299
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jan 22 17:00:53 2026 +0800
f
---
.../apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java | 6 +++---
.../pipe/api/customizer/parameter/PipeParameters.java | 19 ++++++++++++++++++-
.../sink/protocol/opcua/client/IoTDBOpcUaClient.java | 4 +++-
3 files changed, 24 insertions(+), 5 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
index 67060c6002b..1ae4ec03fb2 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
@@ -94,8 +94,8 @@ public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
final Map<String, String> sinkAttributes = new HashMap<>();
sinkAttributes.put("sink", "opc-ua-sink");
- sinkAttributes.put("opcua.model", "client-server");
- sinkAttributes.put("security-policy", "None");
+ sinkAttributes.put("model", "client-server");
+ sinkAttributes.put("opcua.security-policy", "None");
OpcUaClient opcUaClient;
DataValue value;
@@ -103,7 +103,7 @@ public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
final int[] ports = EnvUtils.searchAvailablePorts();
tcpPort = ports[0];
httpsPort = ports[1];
- sinkAttributes.put("tcp.port", Integer.toString(tcpPort));
+ sinkAttributes.put("opcua.tcp.port", Integer.toString(tcpPort));
sinkAttributes.put("https.port", Integer.toString(httpsPort));
Assert.assertEquals(
diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index 989590f40a6..09b70db1f02 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
+++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -92,7 +92,11 @@ public class PipeParameters {
}
public String getString(final String key) {
- final String value = attributes.get(key);
+ String value = attributes.get(key);
+ if (Objects.nonNull(value)) {
+ return value;
+ }
+ value = attributes.get(KeyReducer.shallowReduce(key));
return value != null ? value : attributes.get(KeyReducer.reduce(key));
}
@@ -380,6 +384,19 @@ public class PipeParameters {
SECOND_PREFIXES.add("opcua.");
}
+ static String shallowReduce(String key) {
+ if (key == null) {
+ return null;
+ }
+ final String lowerCaseKey = key.toLowerCase();
+ for (final String prefix : FIRST_PREFIXES) {
+ if (lowerCaseKey.startsWith(prefix)) {
+ return key.substring(prefix.length());
+ }
+ }
+ return key;
+ }
+
static String reduce(String key) {
if (key == null) {
return null;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
index bf96d988180..c6d8da47878 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
@@ -242,7 +242,9 @@ public class IoTDBOpcUaClient {
}
public void disconnect() throws Exception {
- client.disconnect().get();
+ if (Objects.nonNull(client)) {
+ client.disconnect().get();
+ }
}
/////////////////////////////// Getter ///////////////////////////////