This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch opc-IT-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/opc-IT-fix by this push:
new 798e656a6d1 fix
798e656a6d1 is described below
commit 798e656a6d12b14a71b3682de0e3668775a34eea
Author: Caideyipi <[email protected]>
AuthorDate: Wed Dec 31 10:25:17 2025 +0800
fix
---
.../iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java | 98 +++++++++++++++-------
.../sink/protocol/opcua/client/ClientRunner.java | 4 +-
2 files changed, 69 insertions(+), 33 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
index 4c04b793197..2cb9bebbe0a 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
@@ -50,6 +50,7 @@ import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import java.io.File;
+import java.net.ConnectException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -75,40 +76,75 @@ public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
sinkAttributes.put("opcua.model", "client-server");
sinkAttributes.put("security-policy", "None");
- final int[] ports = EnvUtils.searchAvailablePorts();
- final int tcpPort = ports[0];
- final int httpsPort = ports[1];
- sinkAttributes.put("tcp.port", Integer.toString(tcpPort));
- sinkAttributes.put("https.port", Integer.toString(httpsPort));
+ OpcUaClient opcUaClient;
+ DataValue value;
+ while (true) {
+ final int[] ports = EnvUtils.searchAvailablePorts();
+ final int tcpPort = ports[0];
+ final int httpsPort = ports[1];
+ sinkAttributes.put("tcp.port", Integer.toString(tcpPort));
+ sinkAttributes.put("https.port", Integer.toString(httpsPort));
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(),
- client
- .createPipe(
- new TCreatePipeReq("testPipe", sinkAttributes)
- .setExtractorAttributes(Collections.singletonMap("user", "root"))
- .setProcessorAttributes(Collections.emptyMap()))
- .getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ client
+ .createPipe(
+ new TCreatePipeReq("testPipe", sinkAttributes)
+ .setExtractorAttributes(Collections.singletonMap("user", "root"))
+ .setProcessorAttributes(Collections.emptyMap()))
+ .getCode());
- final OpcUaClient opcUaClient =
- getOpcUaClient(
- "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", SecurityPolicy.None, "root", "root");
- DataValue value =
- opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/d1/s1")).get();
- Assert.assertEquals(new Variant(1.0), value.getValue());
- Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());
+ try {
+ opcUaClient =
+ getOpcUaClient(
+ "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", SecurityPolicy.None, "root", "root");
+ } catch (final PipeException e) {
+ if (e.getCause() instanceof ConnectException) {
+ continue;
+ } else {
+ throw e;
+ }
+ }
+ value =
+ opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/d1/s1")).get();
+ Assert.assertEquals(new Variant(1.0), value.getValue());
+ Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());
+ opcUaClient.disconnect().get();
+ break;
+ }
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(),
- client
- .alterPipe(
- new TAlterPipeReq()
- .setPipeName("testPipe")
- .setIsReplaceAllConnectorAttributes(false)
- .setConnectorAttributes(Collections.singletonMap("with-quality", "true"))
- .setProcessorAttributes(Collections.emptyMap())
- .setExtractorAttributes(Collections.emptyMap()))
- .getCode());
+ while (true) {
+ final int[] ports = EnvUtils.searchAvailablePorts();
+ final int tcpPort = ports[0];
+ final int httpsPort = ports[1];
+ sinkAttributes.put("tcp.port", Integer.toString(tcpPort));
+ sinkAttributes.put("https.port", Integer.toString(httpsPort));
+ sinkAttributes.put("with-quality", "true");
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ client
+ .alterPipe(
+ new TAlterPipeReq()
+ .setPipeName("testPipe")
+ .setIsReplaceAllConnectorAttributes(true)
+ .setConnectorAttributes(sinkAttributes)
+ .setProcessorAttributes(Collections.emptyMap())
+ .setExtractorAttributes(Collections.emptyMap()))
+ .getCode());
+ try {
+ opcUaClient =
+ getOpcUaClient(
+ "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", SecurityPolicy.None, "root", "root");
+ } catch (final PipeException e) {
+ if (e.getCause() instanceof ConnectException) {
+ continue;
+ } else {
+ throw e;
+ }
+ }
+ break;
+ }
TestUtils.executeNonQuery(
env,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
index 725ecbbae98..402091598f1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
@@ -102,11 +102,11 @@ public class ClientRunner {
configurableUaClient.run(client);
} catch (final Exception e) {
throw new PipeException(
- "Error running opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage());
+ "Error running opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage(), e);
}
} catch (final Exception e) {
throw new PipeException(
- "Error getting opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage());
+ "Error getting opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage(), e);
}
}
}