This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b04f8192231 Pipe: Fixed the unstable OPC UA IT (#16972)
b04f8192231 is described below

commit b04f81922319296d2e49370834a94e84a3345fa3
Author: Caideyipi <[email protected]>
AuthorDate: Wed Dec 31 13:56:02 2025 +0800

    Pipe: Fixed the unstable OPC UA IT (#16972)
---
 .../iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java     | 135 +++++++++++++++------
 .../sink/protocol/opcua/client/ClientRunner.java   |   4 +-
 2 files changed, 103 insertions(+), 36 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
index 4c04b793197..d929f747ebb 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.db.it.utils.TestUtils;
 import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.ClientRunner;
 import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.IoTDBOpcUaClient;
+import org.apache.iotdb.it.env.MultiEnvFactory;
 import org.apache.iotdb.it.env.cluster.EnvUtils;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT1;
@@ -45,11 +46,14 @@ import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
 import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
 import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
 import java.io.File;
+import java.net.ConnectException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -62,8 +66,25 @@ import static org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT1.class})
 public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
+
+  @Before
+  public void setUp() {
+    MultiEnvFactory.createEnv(1);
+    env = MultiEnvFactory.getEnv(0);
+    env.getConfig()
+        .getCommonConfig()
+        .setAutoCreateSchemaEnabled(true)
+        .setPipeMemoryManagementEnabled(false)
+        .setDataReplicationFactor(1)
+        .setSchemaReplicationFactor(1)
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(false);
+    env.initClusterEnvironment(1, 1);
+  }
+
   @Test
   public void testOPCUAServerSink() throws Exception {
+    int tcpPort = -1;
     try (final SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) env.getLeaderConfigNodeConnection()) {
 
@@ -75,44 +96,82 @@ public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
       sinkAttributes.put("opcua.model", "client-server");
       sinkAttributes.put("security-policy", "None");
 
-      final int[] ports = EnvUtils.searchAvailablePorts();
-      final int tcpPort = ports[0];
-      final int httpsPort = ports[1];
-      sinkAttributes.put("tcp.port", Integer.toString(tcpPort));
-      sinkAttributes.put("https.port", Integer.toString(httpsPort));
+      OpcUaClient opcUaClient;
+      DataValue value;
+      while (true) {
+        final int[] ports = EnvUtils.searchAvailablePorts();
+        tcpPort = ports[0];
+        final int httpsPort = ports[1];
+        sinkAttributes.put("tcp.port", Integer.toString(tcpPort));
+        sinkAttributes.put("https.port", Integer.toString(httpsPort));
 
-      Assert.assertEquals(
-          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
-          client
-              .createPipe(
-                  new TCreatePipeReq("testPipe", sinkAttributes)
-                      .setExtractorAttributes(Collections.singletonMap("user", 
"root"))
-                      .setProcessorAttributes(Collections.emptyMap()))
-              .getCode());
+        Assert.assertEquals(
+            TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+            client
+                .createPipe(
+                    new TCreatePipeReq("testPipe", sinkAttributes)
+                        
.setExtractorAttributes(Collections.singletonMap("user", "root"))
+                        .setProcessorAttributes(Collections.emptyMap()))
+                .getCode());
 
-      final OpcUaClient opcUaClient =
-          getOpcUaClient(
-              "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", 
SecurityPolicy.None, "root", "root");
-      DataValue value =
-          opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, 
"root/db/d1/s1")).get();
-      Assert.assertEquals(new Variant(1.0), value.getValue());
-      Assert.assertEquals(new DateTime(timestampToUtc(1)), 
value.getSourceTime());
+        try {
+          opcUaClient =
+              getOpcUaClient(
+                  "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", 
SecurityPolicy.None, "root", "root");
+        } catch (final PipeException e) {
+          if (e.getCause() instanceof ConnectException) {
+            continue;
+          } else {
+            throw e;
+          }
+        }
+        value =
+            opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, 
"root/db/d1/s1")).get();
+        Assert.assertEquals(new Variant(1.0), value.getValue());
+        Assert.assertEquals(new DateTime(timestampToUtc(1)), 
value.getSourceTime());
+        opcUaClient.disconnect().get();
+        break;
+      }
 
-      Assert.assertEquals(
-          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
-          client
-              .alterPipe(
-                  new TAlterPipeReq()
-                      .setPipeName("testPipe")
-                      .setIsReplaceAllConnectorAttributes(false)
-                      
.setConnectorAttributes(Collections.singletonMap("with-quality", "true"))
-                      .setProcessorAttributes(Collections.emptyMap())
-                      .setExtractorAttributes(Collections.emptyMap()))
-              .getCode());
+      while (true) {
+        final int[] ports = EnvUtils.searchAvailablePorts();
+        tcpPort = ports[0];
+        final int httpsPort = ports[1];
+        sinkAttributes.put("tcp.port", Integer.toString(tcpPort));
+        sinkAttributes.put("https.port", Integer.toString(httpsPort));
+        sinkAttributes.put("with-quality", "true");
 
-      TestUtils.executeNonQuery(
+        Assert.assertEquals(
+            TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+            client
+                .alterPipe(
+                    new TAlterPipeReq()
+                        .setPipeName("testPipe")
+                        .setIsReplaceAllConnectorAttributes(true)
+                        .setConnectorAttributes(sinkAttributes)
+                        .setProcessorAttributes(Collections.emptyMap())
+                        .setExtractorAttributes(Collections.emptyMap()))
+                .getCode());
+        try {
+          opcUaClient =
+              getOpcUaClient(
+                  "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", 
SecurityPolicy.None, "root", "root");
+        } catch (final PipeException e) {
+          if (e.getCause() instanceof ConnectException) {
+            continue;
+          } else {
+            throw e;
+          }
+        }
+        break;
+      }
+
+      // Create aligned timeSeries to avoid tsFile parsing
+      TestUtils.executeNonQueries(
           env,
-          "insert into root.db.opc(time, value, quality, other) values (1, 1, 
false, 1)",
+          Arrays.asList(
+              "create aligned timeSeries root.db.opc(value double, quality 
boolean, other int32)",
+              "insert into root.db.opc(time, value, quality, other) values (1, 
1, false, 1)"),
           null);
 
       long startTime = System.currentTimeMillis();
@@ -168,11 +227,12 @@ public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
               .getCode());
 
       // Banned none, only allows basic256sha256
+      final int finalTcpPort = tcpPort;
       Assert.assertThrows(
           PipeException.class,
           () ->
               getOpcUaClient(
-                  "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb",
+                  "opc.tcp://127.0.0.1:" + finalTcpPort + "/iotdb",
                   SecurityPolicy.None,
                   "root",
                   "root"));
@@ -188,6 +248,13 @@ public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
             "org.apache.iotdb.jdbc.IoTDBSQLException: 1107: The existing 
server with tcp port 12686 and https port 8443's password **** conflicts to the 
new password ****, reject reusing.",
             e.getMessage());
       }
+    } finally {
+      if (tcpPort >= 0) {
+        final String lockPath = EnvUtils.getLockFilePath(tcpPort);
+        if (!new File(lockPath).delete()) {
+          System.out.printf("Delete lock file %s failed%n", lockPath);
+        }
+      }
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
index 725ecbbae98..402091598f1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
@@ -102,11 +102,11 @@ public class ClientRunner {
         configurableUaClient.run(client);
       } catch (final Exception e) {
         throw new PipeException(
-            "Error running opc client: " + e.getClass().getSimpleName() + ": " 
+ e.getMessage());
+            "Error running opc client: " + e.getClass().getSimpleName() + ": " 
+ e.getMessage(), e);
       }
     } catch (final Exception e) {
       throw new PipeException(
-          "Error getting opc client: " + e.getClass().getSimpleName() + ": " + 
e.getMessage());
+          "Error getting opc client: " + e.getClass().getSimpleName() + ": " + 
e.getMessage(), e);
     }
   }
 }

Reply via email to