This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b04f8192231 Pipe: Fixed the unstable OPC UA IT (#16972)
b04f8192231 is described below
commit b04f81922319296d2e49370834a94e84a3345fa3
Author: Caideyipi <[email protected]>
AuthorDate: Wed Dec 31 13:56:02 2025 +0800
Pipe: Fixed the unstable OPC UA IT (#16972)
---
.../iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java | 135 +++++++++++++++------
.../sink/protocol/opcua/client/ClientRunner.java | 4 +-
2 files changed, 103 insertions(+), 36 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
index 4c04b793197..d929f747ebb 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.ClientRunner;
import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.IoTDBOpcUaClient;
+import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.it.env.cluster.EnvUtils;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT1;
@@ -45,11 +46,14 @@ import
org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import java.io.File;
+import java.net.ConnectException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -62,8 +66,25 @@ import static
org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT1.class})
public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
+
+ @Before
+ public void setUp() {
+ MultiEnvFactory.createEnv(1);
+ env = MultiEnvFactory.getEnv(0);
+ env.getConfig()
+ .getCommonConfig()
+ .setAutoCreateSchemaEnabled(true)
+ .setPipeMemoryManagementEnabled(false)
+ .setDataReplicationFactor(1)
+ .setSchemaReplicationFactor(1)
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(false);
+ env.initClusterEnvironment(1, 1);
+ }
+
@Test
public void testOPCUAServerSink() throws Exception {
+ int tcpPort = -1;
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) env.getLeaderConfigNodeConnection()) {
@@ -75,44 +96,82 @@ public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
sinkAttributes.put("opcua.model", "client-server");
sinkAttributes.put("security-policy", "None");
- final int[] ports = EnvUtils.searchAvailablePorts();
- final int tcpPort = ports[0];
- final int httpsPort = ports[1];
- sinkAttributes.put("tcp.port", Integer.toString(tcpPort));
- sinkAttributes.put("https.port", Integer.toString(httpsPort));
+ OpcUaClient opcUaClient;
+ DataValue value;
+ while (true) {
+ final int[] ports = EnvUtils.searchAvailablePorts();
+ tcpPort = ports[0];
+ final int httpsPort = ports[1];
+ sinkAttributes.put("tcp.port", Integer.toString(tcpPort));
+ sinkAttributes.put("https.port", Integer.toString(httpsPort));
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(),
- client
- .createPipe(
- new TCreatePipeReq("testPipe", sinkAttributes)
- .setExtractorAttributes(Collections.singletonMap("user",
"root"))
- .setProcessorAttributes(Collections.emptyMap()))
- .getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ client
+ .createPipe(
+ new TCreatePipeReq("testPipe", sinkAttributes)
+
.setExtractorAttributes(Collections.singletonMap("user", "root"))
+ .setProcessorAttributes(Collections.emptyMap()))
+ .getCode());
- final OpcUaClient opcUaClient =
- getOpcUaClient(
- "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb",
SecurityPolicy.None, "root", "root");
- DataValue value =
- opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2,
"root/db/d1/s1")).get();
- Assert.assertEquals(new Variant(1.0), value.getValue());
- Assert.assertEquals(new DateTime(timestampToUtc(1)),
value.getSourceTime());
+ try {
+ opcUaClient =
+ getOpcUaClient(
+ "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb",
SecurityPolicy.None, "root", "root");
+ } catch (final PipeException e) {
+ if (e.getCause() instanceof ConnectException) {
+ continue;
+ } else {
+ throw e;
+ }
+ }
+ value =
+ opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2,
"root/db/d1/s1")).get();
+ Assert.assertEquals(new Variant(1.0), value.getValue());
+ Assert.assertEquals(new DateTime(timestampToUtc(1)),
value.getSourceTime());
+ opcUaClient.disconnect().get();
+ break;
+ }
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(),
- client
- .alterPipe(
- new TAlterPipeReq()
- .setPipeName("testPipe")
- .setIsReplaceAllConnectorAttributes(false)
-
.setConnectorAttributes(Collections.singletonMap("with-quality", "true"))
- .setProcessorAttributes(Collections.emptyMap())
- .setExtractorAttributes(Collections.emptyMap()))
- .getCode());
+ while (true) {
+ final int[] ports = EnvUtils.searchAvailablePorts();
+ tcpPort = ports[0];
+ final int httpsPort = ports[1];
+ sinkAttributes.put("tcp.port", Integer.toString(tcpPort));
+ sinkAttributes.put("https.port", Integer.toString(httpsPort));
+ sinkAttributes.put("with-quality", "true");
- TestUtils.executeNonQuery(
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ client
+ .alterPipe(
+ new TAlterPipeReq()
+ .setPipeName("testPipe")
+ .setIsReplaceAllConnectorAttributes(true)
+ .setConnectorAttributes(sinkAttributes)
+ .setProcessorAttributes(Collections.emptyMap())
+ .setExtractorAttributes(Collections.emptyMap()))
+ .getCode());
+ try {
+ opcUaClient =
+ getOpcUaClient(
+ "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb",
SecurityPolicy.None, "root", "root");
+ } catch (final PipeException e) {
+ if (e.getCause() instanceof ConnectException) {
+ continue;
+ } else {
+ throw e;
+ }
+ }
+ break;
+ }
+
+ // Create aligned timeSeries to avoid tsFile parsing
+ TestUtils.executeNonQueries(
env,
- "insert into root.db.opc(time, value, quality, other) values (1, 1,
false, 1)",
+ Arrays.asList(
+ "create aligned timeSeries root.db.opc(value double, quality
boolean, other int32)",
+ "insert into root.db.opc(time, value, quality, other) values (1,
1, false, 1)"),
null);
long startTime = System.currentTimeMillis();
@@ -168,11 +227,12 @@ public class IoTDBPipeOPCUAIT extends
AbstractPipeSingleIT {
.getCode());
// Banned none, only allows basic256sha256
+ final int finalTcpPort = tcpPort;
Assert.assertThrows(
PipeException.class,
() ->
getOpcUaClient(
- "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb",
+ "opc.tcp://127.0.0.1:" + finalTcpPort + "/iotdb",
SecurityPolicy.None,
"root",
"root"));
@@ -188,6 +248,13 @@ public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT
{
"org.apache.iotdb.jdbc.IoTDBSQLException: 1107: The existing
server with tcp port 12686 and https port 8443's password **** conflicts to the
new password ****, reject reusing.",
e.getMessage());
}
+ } finally {
+ if (tcpPort >= 0) {
+ final String lockPath = EnvUtils.getLockFilePath(tcpPort);
+ if (!new File(lockPath).delete()) {
+ System.out.printf("Delete lock file %s failed%n", lockPath);
+ }
+ }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
index 725ecbbae98..402091598f1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
@@ -102,11 +102,11 @@ public class ClientRunner {
configurableUaClient.run(client);
} catch (final Exception e) {
throw new PipeException(
- "Error running opc client: " + e.getClass().getSimpleName() + ": "
+ e.getMessage());
+ "Error running opc client: " + e.getClass().getSimpleName() + ": "
+ e.getMessage(), e);
}
} catch (final Exception e) {
throw new PipeException(
- "Error getting opc client: " + e.getClass().getSimpleName() + ": " +
e.getMessage());
+ "Error getting opc client: " + e.getClass().getSimpleName() + ": " +
e.getMessage(), e);
}
}
}