This is an automated email from the ASF dual-hosted git repository. justinchen pushed a commit to branch cp-opc-client in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c89256db2378d90f2f742c26bcb5885789463883 Author: Caideyipi <[email protected]> AuthorDate: Thu Mar 26 16:58:23 2026 +0800 spt --- .../iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java | 336 ++++++++++----------- 1 file changed, 168 insertions(+), 168 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java index 640336d6f0d..a5443561ddc 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java @@ -63,193 +63,193 @@ import static org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace @RunWith(IoTDBTestRunner.class) @Category({MultiClusterIT1.class}) public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT { - @Test - public void testOPCUAServerSink() throws Exception { - int tcpPort = -1; - int httpsPort = -1; - try (final SyncConfigNodeIServiceClient client = - (SyncConfigNodeIServiceClient) env.getLeaderConfigNodeConnection()) { + @Test + public void testOPCUAServerSink() throws Exception { + int tcpPort = -1; + int httpsPort = -1; + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) env.getLeaderConfigNodeConnection()) { - TestUtils.executeNonQuery(env, "insert into root.db.d1(time, s1) values (1, 1)", null); + TestUtils.executeNonQuery(env, "insert into root.db.d1(time, s1) values (1, 1)", null); - final Map<String, String> sinkAttributes = new HashMap<>(); + final Map<String, String> sinkAttributes = new HashMap<>(); - sinkAttributes.put("sink", "opc-ua-sink"); - sinkAttributes.put("model", "client-server"); - sinkAttributes.put("opcua.security-policy", "None"); + sinkAttributes.put("sink", "opc-ua-sink"); + sinkAttributes.put("model", "client-server"); + sinkAttributes.put("opcua.security-policy", "None"); - OpcUaClient opcUaClient; - DataValue value; - while (true) { - final int[] ports = EnvUtils.searchAvailablePorts(); - tcpPort = ports[0]; - httpsPort = ports[1]; - sinkAttributes.put("opcua.tcp.port", Integer.toString(tcpPort)); - sinkAttributes.put("https.port", Integer.toString(httpsPort)); + OpcUaClient opcUaClient; + DataValue value; + while (true) { + final int[] ports = EnvUtils.searchAvailablePorts(); + tcpPort = ports[0]; + httpsPort = ports[1]; + sinkAttributes.put("opcua.tcp.port", Integer.toString(tcpPort)); + sinkAttributes.put("https.port", Integer.toString(httpsPort)); - Assert.assertEquals( - TSStatusCode.SUCCESS_STATUS.getStatusCode(), - client - .createPipe( - new TCreatePipeReq("testPipe", sinkAttributes) - .setExtractorAttributes(Collections.singletonMap("user", "root")) - .setProcessorAttributes(Collections.emptyMap())) - .getCode()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + client + .createPipe( + new TCreatePipeReq("testPipe", sinkAttributes) + .setExtractorAttributes(Collections.singletonMap("user", "root")) + .setProcessorAttributes(Collections.emptyMap())) + .getCode()); - try { - opcUaClient = - getOpcUaClient( - "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", SecurityPolicy.None, "root", "root"); - } catch (final PipeException e) { - if (e.getCause() instanceof ConnectException) { - continue; - } else { - throw e; - } - } - value = - opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/d1/s1")).get(); - Assert.assertEquals(new Variant(1.0), value.getValue()); - Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime()); - opcUaClient.disconnect().get(); - break; - } + try { + opcUaClient = + getOpcUaClient( + "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", SecurityPolicy.None, "root", "root"); + } catch (final PipeException e) { + if (e.getCause() instanceof ConnectException) { + continue; + } else { + throw e; + } + } + value = + opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/d1/s1")).get(); + Assert.assertEquals(new Variant(1.0), value.getValue()); + Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime()); + opcUaClient.disconnect().get(); + break; + } - // Create the region first to avoid tsFile parsing - TestUtils.executeNonQueries( - env, - Arrays.asList( - "create aligned timeSeries root.db.opc(value double, quality boolean, other int32)", - "insert into root.db.opc(time, value, quality, other) values (0, 0, true, 1)"), - null); + // Create the region first to avoid tsFile parsing + TestUtils.executeNonQueries( + env, + Arrays.asList( + "create aligned timeSeries root.db.opc(value double, quality boolean, other int32)", + "insert into root.db.opc(time, value, quality, other) values (0, 0, true, 1)"), + null); - while (true) { - final int[] ports = EnvUtils.searchAvailablePorts(); - tcpPort = ports[0]; - httpsPort = ports[1]; - sinkAttributes.put("opcua.tcp.port", Integer.toString(tcpPort)); - sinkAttributes.put("https.port", Integer.toString(httpsPort)); - sinkAttributes.put("with-quality", "true"); + while (true) { + final int[] ports = EnvUtils.searchAvailablePorts(); + tcpPort = ports[0]; + httpsPort = ports[1]; + sinkAttributes.put("opcua.tcp.port", Integer.toString(tcpPort)); + sinkAttributes.put("https.port", Integer.toString(httpsPort)); + sinkAttributes.put("with-quality", "true"); - Assert.assertEquals( - TSStatusCode.SUCCESS_STATUS.getStatusCode(), - client - .alterPipe( - new TAlterPipeReq() - .setPipeName("testPipe") - .setIsReplaceAllConnectorAttributes(true) - .setConnectorAttributes(sinkAttributes) - .setProcessorAttributes(Collections.emptyMap()) - .setExtractorAttributes(Collections.emptyMap())) - .getCode()); - try { - opcUaClient = - getOpcUaClient( - "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", SecurityPolicy.None, "root", "root"); - } catch (final PipeException e) { - if (e.getCause() instanceof ConnectException) { - continue; - } else { - throw e; - } - } - break; - } + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + client + .alterPipe( + new TAlterPipeReq() + .setPipeName("testPipe") + .setIsReplaceAllConnectorAttributes(true) + .setConnectorAttributes(sinkAttributes) + .setProcessorAttributes(Collections.emptyMap()) + .setExtractorAttributes(Collections.emptyMap())) + .getCode()); + try { + opcUaClient = + getOpcUaClient( + "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", SecurityPolicy.None, "root", "root"); + } catch (final PipeException e) { + if (e.getCause() instanceof ConnectException) { + continue; + } else { + throw e; + } + } + break; + } - TestUtils.executeNonQuery( - env, - "insert into root.db.opc(time, value, quality, other) values (1, 1, false, 1)", - null); + TestUtils.executeNonQuery( + env, + "insert into root.db.opc(time, value, quality, other) values (1, 1, false, 1)", + null); - long startTime = System.currentTimeMillis(); - while (true) { - try { - value = - opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc")).get(); - Assert.assertEquals(new Variant(1.0), value.getValue()); - Assert.assertEquals(StatusCode.BAD, value.getStatusCode()); - Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime()); - break; - } catch (final Throwable t) { - if (System.currentTimeMillis() - startTime > 10_000L) { - throw t; - } - } - } + long startTime = System.currentTimeMillis(); + while (true) { + try { + value = + opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc")).get(); + Assert.assertEquals(new Variant(1.0), value.getValue()); + Assert.assertEquals(StatusCode.BAD, value.getStatusCode()); + Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime()); + break; + } catch (final Throwable t) { + if (System.currentTimeMillis() - startTime > 10_000L) { + throw t; + } + } + } - TestUtils.executeNonQuery( - env, "insert into root.db.opc(time, quality) values (2, true)", null); - TestUtils.executeNonQuery(env, "insert into root.db.opc(time, value) values (2, 2)", null); + TestUtils.executeNonQuery( + env, "insert into root.db.opc(time, quality) values (2, true)", null); + TestUtils.executeNonQuery(env, "insert into root.db.opc(time, value) values (2, 2)", null); - startTime = System.currentTimeMillis(); - while (true) { - try { - value = - opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc")).get(); - Assert.assertEquals(new DateTime(timestampToUtc(2)), value.getSourceTime()); - Assert.assertEquals(new Variant(2.0), value.getValue()); - Assert.assertEquals(StatusCode.UNCERTAIN, value.getStatusCode()); - break; - } catch (final Throwable t) { - if (System.currentTimeMillis() - startTime > 10_000L) { - throw t; - } - } - } + startTime = System.currentTimeMillis(); + while (true) { + try { + value = + opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc")).get(); + Assert.assertEquals(new DateTime(timestampToUtc(2)), value.getSourceTime()); + Assert.assertEquals(new Variant(2.0), value.getValue()); + Assert.assertEquals(StatusCode.UNCERTAIN, value.getStatusCode()); + break; + } catch (final Throwable t) { + if (System.currentTimeMillis() - startTime > 10_000L) { + throw t; + } + } + } - opcUaClient.disconnect().get(); - Assert.assertEquals( - TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe("testPipe").getCode()); + opcUaClient.disconnect().get(); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe("testPipe").getCode()); - // Test reconstruction - sinkAttributes.put("password", "test"); - sinkAttributes.put("security-policy", "basic256sha256"); - Assert.assertEquals( - TSStatusCode.SUCCESS_STATUS.getStatusCode(), - client - .createPipe( - new TCreatePipeReq("testPipe", sinkAttributes) - .setExtractorAttributes(Collections.emptyMap()) - .setProcessorAttributes(Collections.emptyMap())) - .getCode()); + // Test reconstruction + sinkAttributes.put("password", "test"); + sinkAttributes.put("security-policy", "basic256sha256"); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + client + .createPipe( + new TCreatePipeReq("testPipe", sinkAttributes) + .setExtractorAttributes(Collections.emptyMap()) + .setProcessorAttributes(Collections.emptyMap())) + .getCode()); - // Banned none, only allows basic256sha256 - final int finalTcpPort = tcpPort; - Assert.assertThrows( - PipeException.class, - () -> - getOpcUaClient( - "opc.tcp://127.0.0.1:" + finalTcpPort + "/iotdb", - SecurityPolicy.None, - "root", - "root")); + // Banned none, only allows basic256sha256 + final int finalTcpPort = tcpPort; + Assert.assertThrows( + PipeException.class, + () -> + getOpcUaClient( + "opc.tcp://127.0.0.1:" + finalTcpPort + "/iotdb", + SecurityPolicy.None, + "root", + "root")); - // Test conflict - sinkAttributes.put("password", "conflict"); - try { - TestUtils.executeNonQuery( - env, - String.format( - "create pipe test1 ('sink'='opc-ua-sink', 'password'='conflict@pswd', 'tcp.port'='%s', 'https.port'='%s')", - tcpPort, httpsPort), - null); - Assert.fail(); - } catch (final Exception e) { - Assert.assertEquals( - String.format( - "org.apache.iotdb.jdbc.IoTDBSQLException: 1107: The existing server with tcp port %s and https port %s's password **** conflicts to the new password ****, reject reusing.", - tcpPort, httpsPort), - e.getMessage()); - } - } finally { - if (tcpPort >= 0) { - final String lockPath = EnvUtils.getLockFilePath(tcpPort); - if (!new File(lockPath).delete()) { - System.out.printf("Delete lock file %s failed%n", lockPath); - } - } + // Test conflict + sinkAttributes.put("password", "conflict"); + try { + TestUtils.executeNonQuery( + env, + String.format( + "create pipe test1 ('sink'='opc-ua-sink', 'password'='conflict@pswd', 'tcp.port'='%s', 'https.port'='%s')", + tcpPort, httpsPort), + null); + Assert.fail(); + } catch (final Exception e) { + Assert.assertEquals( + String.format( + "org.apache.iotdb.jdbc.IoTDBSQLException: 1107: The existing server with tcp port %s and https port %s's password **** conflicts to the new password ****, reject reusing.", + tcpPort, httpsPort), + e.getMessage()); + } + } finally { + if (tcpPort >= 0) { + final String lockPath = EnvUtils.getLockFilePath(tcpPort); + if (!new File(lockPath).delete()) { + System.out.printf("Delete lock file %s failed%n", lockPath); } + } } + } private static OpcUaClient getOpcUaClient( final String nodeUrl,
