This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
     new 18d31d8008a Pipe: Added conflict detection of params for reusing of 
OPCUA server (#13458) (#13467)
18d31d8008a is described below

commit 18d31d8008a26f72e8e5d5a1c5a32590b982c918
Author: Caideyipi <[email protected]>
AuthorDate: Wed Sep 11 10:16:34 2024 +0800

    Pipe: Added conflict detection of params for reusing of OPCUA server 
(#13458) (#13467)
---
 .../iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java     | 13 ++++-
 .../connector/protocol/opcua/OpcUaConnector.java   | 57 +++++++++++++---------
 .../connector/protocol/opcua/OpcUaNameSpace.java   | 18 ++++++-
 .../protocol/opcua/OpcUaServerBuilder.java         | 43 +++++++++++++---
 4 files changed, 98 insertions(+), 33 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
index 932c8bd6c29..14567b01f11 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
@@ -63,7 +63,18 @@ public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
           TSStatusCode.SUCCESS_STATUS.getStatusCode(),
           client
               .createPipe(
-                  new TCreatePipeReq("testPipe", 
Collections.singletonMap("sink", "opc-ua-sink"))
+                  new TCreatePipeReq("testPipe", connectorAttributes)
+                      .setExtractorAttributes(Collections.emptyMap())
+                      .setProcessorAttributes(Collections.emptyMap()))
+              .getCode());
+
+      // Test conflict
+      connectorAttributes.put("password", "conflict");
+      Assert.assertEquals(
+          TSStatusCode.PIPE_ERROR.getStatusCode(),
+          client
+              .createPipe(
+                  new TCreatePipeReq("testPipe", connectorAttributes)
                       .setExtractorAttributes(Collections.emptyMap())
                       .setProcessorAttributes(Collections.emptyMap()))
               .getCode());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
index a406eb0d1f4..02d2017ba11 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.util.Arrays;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -137,31 +138,41 @@ public class OpcUaConnector implements PipeConnector {
 
       nameSpace =
           SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP
-              .computeIfAbsent(
+              .compute(
                   serverKey,
-                  key -> {
+                  (key, oldValue) -> {
                     try {
-                      final OpcUaServer newServer =
-                          new OpcUaServerBuilder()
-                              .setTcpBindPort(tcpBindPort)
-                              .setHttpsBindPort(httpsBindPort)
-                              .setUser(user)
-                              .setPassword(password)
-                              .setSecurityDir(securityDir)
-                              .setEnableAnonymousAccess(enableAnonymousAccess)
-                              .build();
-                      nameSpace =
-                          new OpcUaNameSpace(
-                              newServer,
-                              parameters
-                                  .getStringOrDefault(
-                                      Arrays.asList(
-                                          CONNECTOR_OPC_UA_MODEL_KEY, 
SINK_OPC_UA_MODEL_KEY),
-                                      CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE)
-                                  
.equals(CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE));
-                      nameSpace.startup();
-                      newServer.startup().get();
-                      return new Pair<>(new AtomicInteger(0), nameSpace);
+                      if (Objects.isNull(oldValue)) {
+                        final OpcUaServerBuilder builder =
+                            new OpcUaServerBuilder()
+                                .setTcpBindPort(tcpBindPort)
+                                .setHttpsBindPort(httpsBindPort)
+                                .setUser(user)
+                                .setPassword(password)
+                                .setSecurityDir(securityDir)
+                                
.setEnableAnonymousAccess(enableAnonymousAccess);
+                        final OpcUaServer newServer = builder.build();
+                        nameSpace =
+                            new OpcUaNameSpace(
+                                newServer,
+                                parameters
+                                    .getStringOrDefault(
+                                        Arrays.asList(
+                                            CONNECTOR_OPC_UA_MODEL_KEY, 
SINK_OPC_UA_MODEL_KEY),
+                                        CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE)
+                                    
.equals(CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE),
+                                builder);
+                        nameSpace.startup();
+                        newServer.startup().get();
+                        return new Pair<>(new AtomicInteger(0), nameSpace);
+                      } else {
+                        oldValue
+                            .getRight()
+                            .checkEquals(user, password, securityDir, 
enableAnonymousAccess);
+                        return oldValue;
+                      }
+                    } catch (final PipeException e) {
+                      throw e;
                     } catch (final Exception e) {
                       throw new PipeException("Failed to build and startup 
OpcUaServer", e);
                     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java
index db7dcb1bfe3..38bb9fb1534 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java
@@ -52,6 +52,7 @@ import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
 import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
 import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
 
+import java.nio.file.Paths;
 import java.sql.Date;
 import java.time.LocalDate;
 import java.time.ZoneId;
@@ -63,10 +64,15 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
   public static final String NAMESPACE_URI = "urn:apache:iotdb:opc-server";
   private final boolean isClientServerModel;
   private final SubscriptionModel subscriptionModel;
+  private final OpcUaServerBuilder builder;
 
-  OpcUaNameSpace(final OpcUaServer server, final boolean isClientServerModel) {
+  OpcUaNameSpace(
+      final OpcUaServer server,
+      final boolean isClientServerModel,
+      final OpcUaServerBuilder builder) {
     super(server, NAMESPACE_URI);
     this.isClientServerModel = isClientServerModel;
+    this.builder = builder;
 
     subscriptionModel = new SubscriptionModel(server, this);
     getLifecycleManager().addLifecycle(subscriptionModel);
@@ -370,4 +376,14 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
   public void onMonitoringModeChanged(final List<MonitoredItem> 
monitoredItems) {
     subscriptionModel.onMonitoringModeChanged(monitoredItems);
   }
+
+  /////////////////////////////// Conflict detection 
///////////////////////////////
+
+  void checkEquals(
+      final String user,
+      final String password,
+      final String securityDir,
+      final boolean enableAnonymousAccess) {
+    builder.checkEquals(user, password, Paths.get(securityDir), 
enableAnonymousAccess);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerBuilder.java
index 144d7c9dc2a..316a4fb72b7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerBuilder.java
@@ -50,6 +50,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.nio.file.FileSystems;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -57,6 +58,7 @@ import java.security.KeyPair;
 import java.security.cert.X509Certificate;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 
 import static com.google.common.collect.Lists.newArrayList;
@@ -81,7 +83,7 @@ public class OpcUaServerBuilder {
   private Path securityDir;
   private boolean enableAnonymousAccess;
 
-  public OpcUaServerBuilder() {
+  OpcUaServerBuilder() {
     tcpBindPort = 
PipeConnectorConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE;
     httpsBindPort = 
PipeConnectorConstant.CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE;
     user = PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
@@ -91,37 +93,37 @@ public class OpcUaServerBuilder {
         
PipeConnectorConstant.CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE;
   }
 
-  public OpcUaServerBuilder setTcpBindPort(final int tcpBindPort) {
+  OpcUaServerBuilder setTcpBindPort(final int tcpBindPort) {
     this.tcpBindPort = tcpBindPort;
     return this;
   }
 
-  public OpcUaServerBuilder setHttpsBindPort(final int httpsBindPort) {
+  OpcUaServerBuilder setHttpsBindPort(final int httpsBindPort) {
     this.httpsBindPort = httpsBindPort;
     return this;
   }
 
-  public OpcUaServerBuilder setUser(final String user) {
+  OpcUaServerBuilder setUser(final String user) {
     this.user = user;
     return this;
   }
 
-  public OpcUaServerBuilder setPassword(final String password) {
+  OpcUaServerBuilder setPassword(final String password) {
     this.password = password;
     return this;
   }
 
-  public OpcUaServerBuilder setSecurityDir(final String securityDir) {
+  OpcUaServerBuilder setSecurityDir(final String securityDir) {
     this.securityDir = Paths.get(securityDir);
     return this;
   }
 
-  public OpcUaServerBuilder setEnableAnonymousAccess(final boolean 
enableAnonymousAccess) {
+  OpcUaServerBuilder setEnableAnonymousAccess(final boolean 
enableAnonymousAccess) {
     this.enableAnonymousAccess = enableAnonymousAccess;
     return this;
   }
 
-  public OpcUaServer build() throws Exception {
+  OpcUaServer build() throws Exception {
     Files.createDirectories(securityDir);
     if (!Files.exists(securityDir)) {
       throw new PipeException("Unable to create security dir: " + securityDir);
@@ -298,4 +300,29 @@ public class OpcUaServerBuilder {
         .setBindPort(httpsBindPort)
         .build();
   }
+
+  /////////////////////////////// Conflict detection 
///////////////////////////////
+
+  void checkEquals(
+      final String user,
+      final String password,
+      final Path securityDir,
+      final boolean enableAnonymousAccess) {
+    checkEquals("user", this.user, user);
+    checkEquals("password", this.password, password);
+    checkEquals(
+        "security dir",
+        
FileSystems.getDefault().getPath(this.securityDir.toAbsolutePath().toString()),
+        
FileSystems.getDefault().getPath(securityDir.toAbsolutePath().toString()));
+    checkEquals("enableAnonymousAccess option", this.enableAnonymousAccess, 
enableAnonymousAccess);
+  }
+
+  private void checkEquals(final String attrName, final Object thisAttr, final 
Object thatAttr) {
+    if (!Objects.equals(thisAttr, thatAttr)) {
+      throw new PipeException(
+          String.format(
+              "The existing server with tcp port %s and https port %s's %s %s 
conflicts to the new %s %s, reject reusing.",
+              tcpBindPort, httpsBindPort, attrName, thisAttr, attrName, 
thatAttr));
+    }
+  }
 }

Reply via email to