This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new 18d31d8008a Pipe: Added conflict detection of params for reusing of OPCUA server (#13458) (#13467)
18d31d8008a is described below
commit 18d31d8008a26f72e8e5d5a1c5a32590b982c918
Author: Caideyipi <[email protected]>
AuthorDate: Wed Sep 11 10:16:34 2024 +0800
Pipe: Added conflict detection of params for reusing of OPCUA server (#13458) (#13467)
---
.../iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java | 13 ++++-
.../connector/protocol/opcua/OpcUaConnector.java | 57 +++++++++++++---------
.../connector/protocol/opcua/OpcUaNameSpace.java | 18 ++++++-
.../protocol/opcua/OpcUaServerBuilder.java | 43 +++++++++++++---
4 files changed, 98 insertions(+), 33 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
index 932c8bd6c29..14567b01f11 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
@@ -63,7 +63,18 @@ public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client
.createPipe(
- new TCreatePipeReq("testPipe", Collections.singletonMap("sink", "opc-ua-sink"))
+ new TCreatePipeReq("testPipe", connectorAttributes)
+ .setExtractorAttributes(Collections.emptyMap())
+ .setProcessorAttributes(Collections.emptyMap()))
+ .getCode());
+
+ // Test conflict
+ connectorAttributes.put("password", "conflict");
+ Assert.assertEquals(
+ TSStatusCode.PIPE_ERROR.getStatusCode(),
+ client
+ .createPipe(
+ new TCreatePipeReq("testPipe", connectorAttributes)
.setExtractorAttributes(Collections.emptyMap())
.setProcessorAttributes(Collections.emptyMap()))
.getCode());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
index a406eb0d1f4..02d2017ba11 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.Arrays;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -137,31 +138,41 @@ public class OpcUaConnector implements PipeConnector {
nameSpace =
SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP
- .computeIfAbsent(
+ .compute(
serverKey,
- key -> {
+ (key, oldValue) -> {
try {
- final OpcUaServer newServer =
- new OpcUaServerBuilder()
- .setTcpBindPort(tcpBindPort)
- .setHttpsBindPort(httpsBindPort)
- .setUser(user)
- .setPassword(password)
- .setSecurityDir(securityDir)
- .setEnableAnonymousAccess(enableAnonymousAccess)
- .build();
- nameSpace =
- new OpcUaNameSpace(
- newServer,
- parameters
- .getStringOrDefault(
- Arrays.asList(
- CONNECTOR_OPC_UA_MODEL_KEY, SINK_OPC_UA_MODEL_KEY),
- CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE)
- .equals(CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE));
- nameSpace.startup();
- newServer.startup().get();
- return new Pair<>(new AtomicInteger(0), nameSpace);
+ if (Objects.isNull(oldValue)) {
+ final OpcUaServerBuilder builder =
+ new OpcUaServerBuilder()
+ .setTcpBindPort(tcpBindPort)
+ .setHttpsBindPort(httpsBindPort)
+ .setUser(user)
+ .setPassword(password)
+ .setSecurityDir(securityDir)
+ .setEnableAnonymousAccess(enableAnonymousAccess);
+ final OpcUaServer newServer = builder.build();
+ nameSpace =
+ new OpcUaNameSpace(
+ newServer,
+ parameters
+ .getStringOrDefault(
+ Arrays.asList(
+ CONNECTOR_OPC_UA_MODEL_KEY, SINK_OPC_UA_MODEL_KEY),
+ CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE)
+ .equals(CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE),
+ builder);
+ nameSpace.startup();
+ newServer.startup().get();
+ return new Pair<>(new AtomicInteger(0), nameSpace);
+ } else {
+ oldValue
+ .getRight()
+ .checkEquals(user, password, securityDir, enableAnonymousAccess);
+ return oldValue;
+ }
+ } catch (final PipeException e) {
+ throw e;
} catch (final Exception e) {
throw new PipeException("Failed to build and startup OpcUaServer", e);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java
index db7dcb1bfe3..38bb9fb1534 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java
@@ -52,6 +52,7 @@ import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
+import java.nio.file.Paths;
import java.sql.Date;
import java.time.LocalDate;
import java.time.ZoneId;
@@ -63,10 +64,15 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
public static final String NAMESPACE_URI = "urn:apache:iotdb:opc-server";
private final boolean isClientServerModel;
private final SubscriptionModel subscriptionModel;
+ private final OpcUaServerBuilder builder;
- OpcUaNameSpace(final OpcUaServer server, final boolean isClientServerModel) {
+ OpcUaNameSpace(
+ final OpcUaServer server,
+ final boolean isClientServerModel,
+ final OpcUaServerBuilder builder) {
super(server, NAMESPACE_URI);
this.isClientServerModel = isClientServerModel;
+ this.builder = builder;
subscriptionModel = new SubscriptionModel(server, this);
getLifecycleManager().addLifecycle(subscriptionModel);
@@ -370,4 +376,14 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
public void onMonitoringModeChanged(final List<MonitoredItem> monitoredItems) {
subscriptionModel.onMonitoringModeChanged(monitoredItems);
}
+
+ /////////////////////////////// Conflict detection ///////////////////////////////
+
+ void checkEquals(
+ final String user,
+ final String password,
+ final String securityDir,
+ final boolean enableAnonymousAccess) {
+ builder.checkEquals(user, password, Paths.get(securityDir), enableAnonymousAccess);
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerBuilder.java
index 144d7c9dc2a..316a4fb72b7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerBuilder.java
@@ -50,6 +50,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -57,6 +58,7 @@ import java.security.KeyPair;
import java.security.cert.X509Certificate;
import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Objects;
import java.util.Set;
import static com.google.common.collect.Lists.newArrayList;
@@ -81,7 +83,7 @@ public class OpcUaServerBuilder {
private Path securityDir;
private boolean enableAnonymousAccess;
- public OpcUaServerBuilder() {
+ OpcUaServerBuilder() {
tcpBindPort =
PipeConnectorConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE;
httpsBindPort =
PipeConnectorConstant.CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE;
user = PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
@@ -91,37 +93,37 @@ public class OpcUaServerBuilder {
PipeConnectorConstant.CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE;
}
- public OpcUaServerBuilder setTcpBindPort(final int tcpBindPort) {
+ OpcUaServerBuilder setTcpBindPort(final int tcpBindPort) {
this.tcpBindPort = tcpBindPort;
return this;
}
- public OpcUaServerBuilder setHttpsBindPort(final int httpsBindPort) {
+ OpcUaServerBuilder setHttpsBindPort(final int httpsBindPort) {
this.httpsBindPort = httpsBindPort;
return this;
}
- public OpcUaServerBuilder setUser(final String user) {
+ OpcUaServerBuilder setUser(final String user) {
this.user = user;
return this;
}
- public OpcUaServerBuilder setPassword(final String password) {
+ OpcUaServerBuilder setPassword(final String password) {
this.password = password;
return this;
}
- public OpcUaServerBuilder setSecurityDir(final String securityDir) {
+ OpcUaServerBuilder setSecurityDir(final String securityDir) {
this.securityDir = Paths.get(securityDir);
return this;
}
- public OpcUaServerBuilder setEnableAnonymousAccess(final boolean enableAnonymousAccess) {
+ OpcUaServerBuilder setEnableAnonymousAccess(final boolean enableAnonymousAccess) {
this.enableAnonymousAccess = enableAnonymousAccess;
return this;
}
- public OpcUaServer build() throws Exception {
+ OpcUaServer build() throws Exception {
Files.createDirectories(securityDir);
if (!Files.exists(securityDir)) {
throw new PipeException("Unable to create security dir: " + securityDir);
@@ -298,4 +300,29 @@ public class OpcUaServerBuilder {
.setBindPort(httpsBindPort)
.build();
}
+
+ /////////////////////////////// Conflict detection ///////////////////////////////
+
+ void checkEquals(
+ final String user,
+ final String password,
+ final Path securityDir,
+ final boolean enableAnonymousAccess) {
+ checkEquals("user", this.user, user);
+ checkEquals("password", this.password, password);
+ checkEquals(
+ "security dir",
+ FileSystems.getDefault().getPath(this.securityDir.toAbsolutePath().toString()),
+ FileSystems.getDefault().getPath(securityDir.toAbsolutePath().toString()));
+ checkEquals("enableAnonymousAccess option", this.enableAnonymousAccess, enableAnonymousAccess);
+ }
+
+ private void checkEquals(final String attrName, final Object thisAttr, final Object thatAttr) {
+ if (!Objects.equals(thisAttr, thatAttr)) {
+ throw new PipeException(
+ String.format(
+ "The existing server with tcp port %s and https port %s's %s %s conflicts to the new %s %s, reject reusing.",
+ tcpBindPort, httpsBindPort, attrName, thisAttr, attrName, thatAttr));
+ }
+ }
}