This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch opc-IT-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/opc-IT-fix by this push:
     new 798e656a6d1 fix
798e656a6d1 is described below

commit 798e656a6d12b14a71b3682de0e3668775a34eea
Author: Caideyipi <[email protected]>
AuthorDate: Wed Dec 31 10:25:17 2025 +0800

    fix
---
 .../iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java     | 98 +++++++++++++++-------
 .../sink/protocol/opcua/client/ClientRunner.java   |  4 +-
 2 files changed, 69 insertions(+), 33 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
index 4c04b793197..2cb9bebbe0a 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
@@ -50,6 +50,7 @@ import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
 import java.io.File;
+import java.net.ConnectException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -75,40 +76,75 @@ public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
       sinkAttributes.put("opcua.model", "client-server");
       sinkAttributes.put("security-policy", "None");
 
-      final int[] ports = EnvUtils.searchAvailablePorts();
-      final int tcpPort = ports[0];
-      final int httpsPort = ports[1];
-      sinkAttributes.put("tcp.port", Integer.toString(tcpPort));
-      sinkAttributes.put("https.port", Integer.toString(httpsPort));
+      OpcUaClient opcUaClient;
+      DataValue value;
+      while (true) {
+        final int[] ports = EnvUtils.searchAvailablePorts();
+        final int tcpPort = ports[0];
+        final int httpsPort = ports[1];
+        sinkAttributes.put("tcp.port", Integer.toString(tcpPort));
+        sinkAttributes.put("https.port", Integer.toString(httpsPort));
 
-      Assert.assertEquals(
-          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
-          client
-              .createPipe(
-                  new TCreatePipeReq("testPipe", sinkAttributes)
-                      .setExtractorAttributes(Collections.singletonMap("user", "root"))
-                      .setProcessorAttributes(Collections.emptyMap()))
-              .getCode());
+        Assert.assertEquals(
+            TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+            client
+                .createPipe(
+                    new TCreatePipeReq("testPipe", sinkAttributes)
+                        .setExtractorAttributes(Collections.singletonMap("user", "root"))
+                        .setProcessorAttributes(Collections.emptyMap()))
+                .getCode());
 
-      final OpcUaClient opcUaClient =
-          getOpcUaClient(
-              "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", SecurityPolicy.None, "root", "root");
-      DataValue value =
-          opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/d1/s1")).get();
-      Assert.assertEquals(new Variant(1.0), value.getValue());
-      Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());
+        try {
+          opcUaClient =
+              getOpcUaClient(
+                  "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", SecurityPolicy.None, "root", "root");
+        } catch (final PipeException e) {
+          if (e.getCause() instanceof ConnectException) {
+            continue;
+          } else {
+            throw e;
+          }
+        }
+        value =
+            opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/d1/s1")).get();
+        Assert.assertEquals(new Variant(1.0), value.getValue());
+        Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());
+        opcUaClient.disconnect().get();
+        break;
+      }
 
-      Assert.assertEquals(
-          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
-          client
-              .alterPipe(
-                  new TAlterPipeReq()
-                      .setPipeName("testPipe")
-                      .setIsReplaceAllConnectorAttributes(false)
-                      .setConnectorAttributes(Collections.singletonMap("with-quality", "true"))
-                      .setProcessorAttributes(Collections.emptyMap())
-                      .setExtractorAttributes(Collections.emptyMap()))
-              .getCode());
+      while (true) {
+        final int[] ports = EnvUtils.searchAvailablePorts();
+        final int tcpPort = ports[0];
+        final int httpsPort = ports[1];
+        sinkAttributes.put("tcp.port", Integer.toString(tcpPort));
+        sinkAttributes.put("https.port", Integer.toString(httpsPort));
+        sinkAttributes.put("with-quality", "true");
+
+        Assert.assertEquals(
+            TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+            client
+                .alterPipe(
+                    new TAlterPipeReq()
+                        .setPipeName("testPipe")
+                        .setIsReplaceAllConnectorAttributes(true)
+                        .setConnectorAttributes(sinkAttributes)
+                        .setProcessorAttributes(Collections.emptyMap())
+                        .setExtractorAttributes(Collections.emptyMap()))
+                .getCode());
+        try {
+          opcUaClient =
+              getOpcUaClient(
+                  "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", SecurityPolicy.None, "root", "root");
+        } catch (final PipeException e) {
+          if (e.getCause() instanceof ConnectException) {
+            continue;
+          } else {
+            throw e;
+          }
+        }
+        break;
+      }
 
       TestUtils.executeNonQuery(
           env,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
index 725ecbbae98..402091598f1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
@@ -102,11 +102,11 @@ public class ClientRunner {
         configurableUaClient.run(client);
       } catch (final Exception e) {
         throw new PipeException(
-            "Error running opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage());
+            "Error running opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage(), e);
       }
     } catch (final Exception e) {
       throw new PipeException(
-          "Error getting opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage());
+          "Error getting opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage(), e);
     }
   }
 }

Reply via email to