This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch TableModelV1 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d0067ec94f3e75c6194fa5a64330a94b8aa5b8c9 Author: Caideyipi <[email protected]> AuthorDate: Fri Aug 9 11:07:38 2024 +0800 Pipe: Implemented Client-Server model in OPC UA Sink & Enable anonymous access settings & Corrected date time format (#13100) (cherry picked from commit 78439da4127eb42bd398dd0aa87f30f629cd501a) --- iotdb-core/datanode/pom.xml | 4 + .../connector/protocol/opcua/OpcUaConnector.java | 207 +++--------- .../connector/protocol/opcua/OpcUaNameSpace.java | 374 +++++++++++++++++++++ .../protocol/opcua/OpcUaServerBuilder.java | 32 +- .../config/constant/PipeConnectorConstant.java | 13 + pom.xml | 11 + 6 files changed, 477 insertions(+), 164 deletions(-) diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml index 7ddf561b3fd..cc6aa284a6f 100644 --- a/iotdb-core/datanode/pom.xml +++ b/iotdb-core/datanode/pom.xml @@ -178,6 +178,10 @@ <groupId>org.eclipse.milo</groupId> <artifactId>stack-core</artifactId> </dependency> + <dependency> + <groupId>org.eclipse.milo</groupId> + <artifactId>sdk-core</artifactId> + </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java index ba134b5dcc9..390b2fe73bb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.pipe.connector.protocol.opcua; -import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.pipe.api.PipeConnector; @@ -30,25 +29,15 @@ import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.exception.PipeException; -import org.apache.tsfile.common.constant.TsFileConstant; -import org.apache.tsfile.enums.TSDataType; -import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.write.record.Tablet; import org.eclipse.milo.opcua.sdk.server.OpcUaServer; -import org.eclipse.milo.opcua.sdk.server.model.nodes.objects.BaseEventTypeNode; -import org.eclipse.milo.opcua.stack.core.Identifiers; import org.eclipse.milo.opcua.stack.core.UaException; -import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime; -import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText; -import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.time.LocalDate; import java.util.Arrays; import java.util.Map; -import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -56,15 +45,23 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_HTTPS_BIND_PORT_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_MODEL_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_MODEL_PUB_SUB_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_SECURITY_DIR_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PASSWORD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_USER_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_OPC_UA_HTTPS_BIND_PORT_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_OPC_UA_MODEL_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_OPC_UA_SECURITY_DIR_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_OPC_UA_TCP_BIND_PORT_KEY; @@ -77,15 +74,25 @@ public class OpcUaConnector implements PipeConnector { private static final Logger LOGGER = LoggerFactory.getLogger(OpcUaConnector.class); - private static final Map<String, Pair<AtomicInteger, OpcUaServer>> - SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP = new ConcurrentHashMap<>(); + private static final Map<String, Pair<AtomicInteger, OpcUaNameSpace>> + SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP = new ConcurrentHashMap<>(); private String serverKey; - private OpcUaServer server; + private OpcUaNameSpace nameSpace; @Override public void validate(final PipeParameterValidator validator) throws Exception { - // All the parameters are optional + validator + .validateAttributeValueRange( + CONNECTOR_OPC_UA_MODEL_KEY, + true, + CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE, + CONNECTOR_OPC_UA_MODEL_PUB_SUB_VALUE) + .validateAttributeValueRange( + SINK_OPC_UA_MODEL_KEY, + true, + CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE, + CONNECTOR_OPC_UA_MODEL_PUB_SUB_VALUE); } @Override @@ -113,12 +120,18 @@ public class OpcUaConnector implements PipeConnector { parameters.getStringOrDefault( Arrays.asList(CONNECTOR_OPC_UA_SECURITY_DIR_KEY, SINK_OPC_UA_SECURITY_DIR_KEY), CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE); - - synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP) { + final boolean enableAnonymousAccess = + parameters.getBooleanOrDefault( + Arrays.asList( + CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY, + SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY), + CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE); + + synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP) { serverKey = httpsBindPort + ":" + tcpBindPort; - server = - SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP + nameSpace = + SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP .computeIfAbsent( serverKey, key -> { @@ -130,15 +143,26 @@ public class OpcUaConnector implements PipeConnector { .setUser(user) .setPassword(password) .setSecurityDir(securityDir) + .setEnableAnonymousAccess(enableAnonymousAccess) .build(); - newServer.startup(); - return new Pair<>(new AtomicInteger(0), newServer); + nameSpace = + new OpcUaNameSpace( + newServer, + parameters + .getStringOrDefault( + Arrays.asList( + CONNECTOR_OPC_UA_MODEL_KEY, SINK_OPC_UA_MODEL_KEY), + CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE) + .equals(CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE)); + nameSpace.startup(); + newServer.startup().get(); + return new Pair<>(new AtomicInteger(0), nameSpace); } catch (final Exception e) { throw new PipeException("Failed to build and startup OpcUaServer", e); } }) .getRight(); - SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP.get(serverKey).getLeft().incrementAndGet(); + SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP.get(serverKey).getLeft().incrementAndGet(); } } @@ -171,14 +195,13 @@ public class OpcUaConnector implements PipeConnector { } if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) { - transferTabletWrapper(server, (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent); + transferTabletWrapper((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent); } else { - transferTabletWrapper(server, (PipeRawTabletInsertionEvent) tabletInsertionEvent); + transferTabletWrapper((PipeRawTabletInsertionEvent) tabletInsertionEvent); } } private void transferTabletWrapper( - final OpcUaServer server, final PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent) throws UaException { try { @@ -188,7 +211,7 @@ public class OpcUaConnector implements PipeConnector { return; } for (final Tablet tablet : pipeInsertNodeTabletInsertionEvent.convertToTablets()) { - transferTablet(server, tablet); + nameSpace.transfer(tablet); } } finally { pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount( @@ -196,148 +219,28 @@ public class OpcUaConnector implements PipeConnector { } } - private void transferTabletWrapper( - final OpcUaServer server, final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent) + private void transferTabletWrapper(final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent) throws UaException { try { // We increase the reference count for this event to determine if the event may be released. if (!pipeRawTabletInsertionEvent.increaseReferenceCount(OpcUaConnector.class.getName())) { return; } - transferTablet(server, pipeRawTabletInsertionEvent.convertToTablet()); + nameSpace.transfer(pipeRawTabletInsertionEvent.convertToTablet()); } finally { pipeRawTabletInsertionEvent.decreaseReferenceCount(OpcUaConnector.class.getName(), false); } } - /** - * Transfer {@link Tablet} into eventNodes and post it on the eventBus, so that they will be heard - * at the subscribers. Notice that an eventNode is reused to reduce object creation costs. - * - * @param server OpcUaServer - * @param tablet the tablet to send - * @throws UaException if failed to create {@link Event} - */ - private void transferTablet(final OpcUaServer server, final Tablet tablet) throws UaException { - // There is no nameSpace, so that nameSpaceIndex is always 0 - final int pseudoNameSpaceIndex = 0; - final BaseEventTypeNode eventNode = - server - .getEventFactory() - .createEvent( - new NodeId(pseudoNameSpaceIndex, UUID.randomUUID()), Identifiers.BaseEventType); - // Use eventNode here because other nodes doesn't support values and times simultaneously - for (int columnIndex = 0; columnIndex < tablet.getSchemas().size(); ++columnIndex) { - - final TSDataType dataType = tablet.getSchemas().get(columnIndex).getType(); - - // Source name --> Sensor path, like root.test.d_0.s_0 - eventNode.setSourceName( - tablet.getDeviceId() - + TsFileConstant.PATH_SEPARATOR - + tablet.getSchemas().get(columnIndex).getMeasurementId()); - - // Source node --> Sensor type, like double - eventNode.setSourceNode(convertToOpcDataType(dataType)); - - for (int rowIndex = 0; rowIndex < tablet.rowSize; ++rowIndex) { - // Filter null value - if (tablet.bitMaps[columnIndex].isMarked(rowIndex)) { - continue; - } - - // Time --> TimeStamp - eventNode.setTime(new DateTime(tablet.timestamps[rowIndex])); - - // Message --> Value - switch (dataType) { - case BOOLEAN: - eventNode.setMessage( - LocalizedText.english( - Boolean.toString(((boolean[]) tablet.values[columnIndex])[rowIndex]))); - break; - case INT32: - eventNode.setMessage( - LocalizedText.english( - Integer.toString(((int[]) tablet.values[columnIndex])[rowIndex]))); - break; - case DATE: - eventNode.setMessage( - LocalizedText.english( - (((LocalDate[]) tablet.values[columnIndex])[rowIndex]).toString())); - break; - case INT64: - case TIMESTAMP: - eventNode.setMessage( - LocalizedText.english( - Long.toString(((long[]) tablet.values[columnIndex])[rowIndex]))); - break; - case FLOAT: - eventNode.setMessage( - LocalizedText.english( - Float.toString(((float[]) tablet.values[columnIndex])[rowIndex]))); - break; - case DOUBLE: - eventNode.setMessage( - LocalizedText.english( - Double.toString(((double[]) tablet.values[columnIndex])[rowIndex]))); - break; - case TEXT: - case BLOB: - case STRING: - eventNode.setMessage( - LocalizedText.english( - ((Binary[]) tablet.values[columnIndex])[rowIndex].toString())); - break; - case VECTOR: - case UNKNOWN: - default: - throw new PipeRuntimeNonCriticalException( - "Unsupported data type: " + tablet.getSchemas().get(columnIndex).getType()); - } - - // Send the event - server.getEventBus().post(eventNode); - } - } - eventNode.delete(); - } - - private NodeId convertToOpcDataType(final TSDataType type) { - switch (type) { - case BOOLEAN: - return Identifiers.Boolean; - case INT32: - return Identifiers.Int32; - case DATE: - return Identifiers.DateTime; - case INT64: - case TIMESTAMP: - return Identifiers.Int64; - case FLOAT: - return Identifiers.Float; - case DOUBLE: - return Identifiers.Double; - case TEXT: - case BLOB: - case STRING: - return Identifiers.String; - case VECTOR: - case UNKNOWN: - default: - throw new PipeRuntimeNonCriticalException("Unsupported data type: " + type); - } - } - @Override public void close() throws Exception { if (serverKey == null) { return; } - synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP) { - final Pair<AtomicInteger, OpcUaServer> pair = - SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP.get(serverKey); + synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP) { + final Pair<AtomicInteger, OpcUaNameSpace> pair = + SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP.get(serverKey); if (pair == null) { return; } @@ -346,7 +249,7 @@ public class OpcUaConnector implements PipeConnector { try { pair.getRight().shutdown(); } finally { - SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP.remove(serverKey); + SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP.remove(serverKey); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java new file mode 100644 index 00000000000..52c07d49c50 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.connector.protocol.opcua; + +import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException; +import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException; +import org.apache.iotdb.db.pipe.connector.util.PipeTabletEventSorter; +import org.apache.iotdb.db.utils.DateTimeUtils; +import org.apache.iotdb.db.utils.TimestampPrecisionUtils; +import org.apache.iotdb.pipe.api.event.Event; + +import org.apache.tsfile.common.constant.TsFileConstant; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.write.UnSupportedDataTypeException; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.eclipse.milo.opcua.sdk.core.AccessLevel; +import org.eclipse.milo.opcua.sdk.core.Reference; +import org.eclipse.milo.opcua.sdk.server.Lifecycle; +import org.eclipse.milo.opcua.sdk.server.OpcUaServer; +import org.eclipse.milo.opcua.sdk.server.api.DataItem; +import org.eclipse.milo.opcua.sdk.server.api.ManagedNamespaceWithLifecycle; +import org.eclipse.milo.opcua.sdk.server.api.MonitoredItem; +import org.eclipse.milo.opcua.sdk.server.model.nodes.objects.BaseEventTypeNode; +import org.eclipse.milo.opcua.sdk.server.nodes.UaFolderNode; +import org.eclipse.milo.opcua.sdk.server.nodes.UaVariableNode; +import org.eclipse.milo.opcua.sdk.server.util.SubscriptionModel; +import org.eclipse.milo.opcua.stack.core.Identifiers; +import org.eclipse.milo.opcua.stack.core.UaException; +import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue; +import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime; +import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText; +import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId; +import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode; +import org.eclipse.milo.opcua.stack.core.types.builtin.Variant; + +import java.sql.Date; +import java.time.LocalDate; +import java.time.ZoneId; +import java.util.List; +import java.util.Objects; +import java.util.UUID; + +public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle { + public static final String NAMESPACE_URI = "urn:apache:iotdb:opc-server"; + private final boolean isClientServerModel; + private final SubscriptionModel subscriptionModel; + + OpcUaNameSpace(final OpcUaServer server, final boolean isClientServerModel) { + super(server, NAMESPACE_URI); + this.isClientServerModel = isClientServerModel; + + subscriptionModel = new SubscriptionModel(server, this); + getLifecycleManager().addLifecycle(subscriptionModel); + getLifecycleManager() + .addLifecycle( + new Lifecycle() { + @Override + public void startup() { + // Do nothing + } + + @Override + public void shutdown() { + getServer().shutdown(); + } + }); + } + + void transfer(final Tablet tablet) throws UaException { + if (isClientServerModel) { + transferTabletForClientServerModel(tablet); + } else { + transferTabletForPubSubModel(tablet); + } + } + + private void transferTabletForClientServerModel(final Tablet tablet) { + new PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary(); + + final String[] segments = tablet.getDeviceId().split("\\."); + if (segments.length == 0) { + throw new PipeRuntimeCriticalException("The segments of tablets must exist"); + } + final StringBuilder currentStr = new StringBuilder(); + UaFolderNode folderNode = null; + NodeId folderNodeId; + for (final String segment : segments) { + final UaFolderNode nextFolderNode; + + currentStr.append(segment); + folderNodeId = newNodeId(currentStr.toString()); + currentStr.append("/"); + + if (!getNodeManager().containsNode(folderNodeId)) { + nextFolderNode = + new UaFolderNode( + getNodeContext(), + folderNodeId, + newQualifiedName(segment), + LocalizedText.english(segment)); + getNodeManager().addNode(nextFolderNode); + if (Objects.nonNull(folderNode)) { + folderNode.addOrganizes(nextFolderNode); + } else { + nextFolderNode.addReference( + new Reference( + folderNodeId, + Identifiers.Organizes, + Identifiers.ObjectsFolder.expanded(), + false)); + } + folderNode = nextFolderNode; + } else { + folderNode = + (UaFolderNode) + getNodeManager() + .getNode(folderNodeId) + .orElseThrow( + () -> + new PipeRuntimeCriticalException( + String.format( + "The folder node for %s does not exist.", + tablet.getDeviceId()))); + } + } + + final String currentFolder = currentStr.toString(); + for (int i = 0; i < tablet.getSchemas().size(); ++i) { + final IMeasurementSchema measurementSchema = tablet.getSchemas().get(i); + final String name = measurementSchema.getMeasurementId(); + final TSDataType type = measurementSchema.getType(); + final NodeId nodeId = newNodeId(currentFolder + name); + final UaVariableNode measurementNode; + if (!getNodeManager().containsNode(nodeId)) { + measurementNode = + new UaVariableNode.UaVariableNodeBuilder(getNodeContext()) + .setNodeId(newNodeId(currentFolder + name)) + .setAccessLevel(AccessLevel.READ_WRITE) + .setUserAccessLevel(AccessLevel.READ_ONLY) + .setBrowseName(newQualifiedName(name)) + .setDisplayName(LocalizedText.english(name)) + .setDataType(convertToOpcDataType(type)) + .setTypeDefinition(Identifiers.BaseDataVariableType) + .build(); + getNodeManager().addNode(measurementNode); + folderNode.addOrganizes(measurementNode); + } else { + // This must exist + measurementNode = + (UaVariableNode) + getNodeManager() + .getNode(nodeId) + .orElseThrow( + () -> + new PipeRuntimeCriticalException( + String.format("The Node %s does not exist.", nodeId))); + } + + int lastNonnullIndex = -1; + for (int j = 0; j < tablet.rowSize; ++j) { + if (!tablet.bitMaps[i].isMarked(j)) { + lastNonnullIndex = j; + break; + } + } + + if (lastNonnullIndex != -1) { + final long utcTimestamp = timestampToUtc(tablet.timestamps[lastNonnullIndex]); + if (Objects.isNull(measurementNode.getValue()) + || Objects.requireNonNull(measurementNode.getValue().getSourceTime()).getUtcTime() + < utcTimestamp) { + measurementNode.setValue( + new DataValue( + new Variant(getTabletObjectValue4Opc(tablet.values[i], lastNonnullIndex, type)), + StatusCode.GOOD, + new DateTime(utcTimestamp), + new DateTime())); + } + } + } + } + + private static Object getTabletObjectValue4Opc( + final Object column, final int rowIndex, final TSDataType type) { + switch (type) { + case BOOLEAN: + return ((boolean[]) column)[rowIndex]; + case INT32: + return ((int[]) column)[rowIndex]; + case DATE: + return new DateTime(Date.valueOf(((LocalDate[]) column)[rowIndex])); + case INT64: + return ((long[]) column)[rowIndex]; + case TIMESTAMP: + return new DateTime(timestampToUtc(((long[]) column)[rowIndex])); + case FLOAT: + return ((float[]) column)[rowIndex]; + case DOUBLE: + return ((double[]) column)[rowIndex]; + case TEXT: + case BLOB: + case STRING: + return ((Binary[]) column)[rowIndex].toString(); + default: + throw new UnSupportedDataTypeException("UnSupported dataType " + type); + } + } + + private static long timestampToUtc(final long timeStamp) { + return TimestampPrecisionUtils.currPrecision.toNanos(timeStamp) / 100L + 116444736000000000L; + } + + /** + * Transfer {@link Tablet} into eventNodes and post it on the eventBus, so that they will be heard + * at the subscribers. Notice that an eventNode is reused to reduce object creation costs. + * + * @param tablet the tablet to send + * @throws UaException if failed to create {@link Event} + */ + private void transferTabletForPubSubModel(final Tablet tablet) throws UaException { + final BaseEventTypeNode eventNode = + getServer() + .getEventFactory() + .createEvent( + new NodeId(getNamespaceIndex(), UUID.randomUUID()), Identifiers.BaseEventType); + // Use eventNode here because other nodes doesn't support values and times simultaneously + for (int columnIndex = 0; columnIndex < tablet.getSchemas().size(); ++columnIndex) { + + final TSDataType dataType = tablet.getSchemas().get(columnIndex).getType(); + + // Source name --> Sensor path, like root.test.d_0.s_0 + eventNode.setSourceName( + tablet.getDeviceId() + + TsFileConstant.PATH_SEPARATOR + + tablet.getSchemas().get(columnIndex).getMeasurementId()); + + // Source node --> Sensor type, like double + eventNode.setSourceNode(convertToOpcDataType(dataType)); + + for (int rowIndex = 0; rowIndex < tablet.rowSize; ++rowIndex) { + // Filter null value + if (tablet.bitMaps[columnIndex].isMarked(rowIndex)) { + continue; + } + + // Time --> TimeStamp + eventNode.setTime(new DateTime(timestampToUtc(tablet.timestamps[rowIndex]))); + + // Message --> Value + switch (dataType) { + case BOOLEAN: + eventNode.setMessage( + LocalizedText.english( + Boolean.toString(((boolean[]) tablet.values[columnIndex])[rowIndex]))); + break; + case INT32: + eventNode.setMessage( + LocalizedText.english( + Integer.toString(((int[]) tablet.values[columnIndex])[rowIndex]))); + break; + case DATE: + eventNode.setMessage( + LocalizedText.english( + (((LocalDate[]) tablet.values[columnIndex])[rowIndex]) + .atStartOfDay(ZoneId.systemDefault()) + .toString())); + break; + case INT64: + eventNode.setMessage( + LocalizedText.english( + Long.toString(((long[]) tablet.values[columnIndex])[rowIndex]))); + break; + case TIMESTAMP: + eventNode.setMessage( + LocalizedText.english( + DateTimeUtils.convertLongToDate( + ((long[]) tablet.values[columnIndex])[rowIndex]))); + break; + case FLOAT: + eventNode.setMessage( + LocalizedText.english( + Float.toString(((float[]) tablet.values[columnIndex])[rowIndex]))); + break; + case DOUBLE: + eventNode.setMessage( + LocalizedText.english( + Double.toString(((double[]) tablet.values[columnIndex])[rowIndex]))); + break; + case TEXT: + case BLOB: + case STRING: + eventNode.setMessage( + LocalizedText.english( + ((Binary[]) tablet.values[columnIndex])[rowIndex].toString())); + break; + case VECTOR: + case UNKNOWN: + default: + throw new PipeRuntimeNonCriticalException( + "Unsupported data type: " + tablet.getSchemas().get(columnIndex).getType()); + } + + // Send the event + getServer().getEventBus().post(eventNode); + } + } + eventNode.delete(); + } + + private NodeId convertToOpcDataType(final TSDataType type) { + switch (type) { + case BOOLEAN: + return Identifiers.Boolean; + case INT32: + return Identifiers.Int32; + case DATE: + case TIMESTAMP: + return Identifiers.DateTime; + case INT64: + return Identifiers.Int64; + case FLOAT: + return Identifiers.Float; + case DOUBLE: + return Identifiers.Double; + case TEXT: + case BLOB: + case STRING: + return Identifiers.String; + case VECTOR: + case UNKNOWN: + default: + throw new PipeRuntimeNonCriticalException("Unsupported data type: " + type); + } + } + + @Override + public void onDataItemsCreated(final List<DataItem> dataItems) { + subscriptionModel.onDataItemsCreated(dataItems); + } + + @Override + public void onDataItemsModified(final List<DataItem> dataItems) { + subscriptionModel.onDataItemsModified(dataItems); + } + + @Override + public void onDataItemsDeleted(final List<DataItem> dataItems) { + subscriptionModel.onDataItemsDeleted(dataItems); + } + + @Override + public void onMonitoringModeChanged(final List<MonitoredItem> monitoredItems) { + subscriptionModel.onMonitoringModeChanged(monitoredItems); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerBuilder.java index 85982318404..ee7d0481ed8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerBuilder.java @@ -79,6 +79,7 @@ public class OpcUaServerBuilder { private String user; private String password; private Path securityDir; + private boolean enableAnonymousAccess; public OpcUaServerBuilder() { tcpBindPort = PipeConnectorConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE; @@ -86,33 +87,40 @@ public class OpcUaServerBuilder { user = PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE; password = PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE; securityDir = Paths.get(PipeConnectorConstant.CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE); + enableAnonymousAccess = + PipeConnectorConstant.CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE; } - public OpcUaServerBuilder setTcpBindPort(int tcpBindPort) { + public OpcUaServerBuilder setTcpBindPort(final int tcpBindPort) { this.tcpBindPort = tcpBindPort; return this; } - public OpcUaServerBuilder setHttpsBindPort(int httpsBindPort) { + public OpcUaServerBuilder setHttpsBindPort(final int httpsBindPort) { this.httpsBindPort = httpsBindPort; return this; } - public OpcUaServerBuilder setUser(String user) { + public OpcUaServerBuilder setUser(final String user) { this.user = user; return this; } - public OpcUaServerBuilder setPassword(String password) { + public OpcUaServerBuilder setPassword(final String password) { this.password = password; return this; } - public OpcUaServerBuilder setSecurityDir(String securityDir) { + public OpcUaServerBuilder setSecurityDir(final String securityDir) { this.securityDir = Paths.get(securityDir); return this; } + public OpcUaServerBuilder setEnableAnonymousAccess(final boolean enableAnonymousAccess) { + this.enableAnonymousAccess = enableAnonymousAccess; + return this; + } + public OpcUaServer build() throws Exception { Files.createDirectories(securityDir); if (!Files.exists(securityDir)) { @@ -133,7 +141,7 @@ public class OpcUaServerBuilder { new DefaultCertificateManager(loader.getServerKeyPair(), loader.getServerCertificate()); final OpcUaServerConfig serverConfig; - try (DefaultTrustListManager trustListManager = new DefaultTrustListManager(pkiDir)) { + try (final DefaultTrustListManager trustListManager = new DefaultTrustListManager(pkiDir)) { LOGGER.info( "Certificate directory is: {}, Please move certificates from the reject dir to the trusted directory to allow encrypted access", pkiDir.getAbsolutePath()); @@ -151,7 +159,7 @@ public class OpcUaServerBuilder { final UsernameIdentityValidator identityValidator = new UsernameIdentityValidator( - true, + enableAnonymousAccess, authChallenge -> { String inputUsername = authChallenge.getUsername(); String inputPassword = authChallenge.getPassword(); @@ -215,7 +223,7 @@ public class OpcUaServerBuilder { } private Set<EndpointConfiguration> createEndpointConfigurations( - X509Certificate certificate, int tcpBindPort, int httpsBindPort) { + final X509Certificate certificate, final int tcpBindPort, final int httpsBindPort) { final Set<EndpointConfiguration> endpointConfigurations = new LinkedHashSet<>(); final List<String> bindAddresses = newArrayList(); @@ -225,8 +233,8 @@ public class OpcUaServerBuilder { hostnames.add(HostnameUtil.getHostname()); hostnames.addAll(HostnameUtil.getHostnames(WILD_CARD_ADDRESS)); - for (String bindAddress : bindAddresses) { - for (String hostname : hostnames) { + for (final String bindAddress : bindAddresses) { + for (final String hostname : hostnames) { final EndpointConfiguration.Builder builder = EndpointConfiguration.newBuilder() .setBindAddress(bindAddress) @@ -279,7 +287,7 @@ public class OpcUaServerBuilder { } private EndpointConfiguration buildTcpEndpoint( - EndpointConfiguration.Builder base, int tcpBindPort) { + final EndpointConfiguration.Builder base, final int tcpBindPort) { return base.copy() .setTransportProfile(TransportProfile.TCP_UASC_UABINARY) .setBindPort(tcpBindPort) @@ -287,7 +295,7 @@ public class OpcUaServerBuilder { } private EndpointConfiguration buildHttpsEndpoint( - EndpointConfiguration.Builder base, int httpsBindPort) { + final EndpointConfiguration.Builder base, final int httpsBindPort) { return base.copy() .setTransportProfile(TransportProfile.HTTPS_UABINARY) .setBindPort(httpsBindPort) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java index 61e773da3e4..62af58eece4 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java @@ -130,6 +130,13 @@ public class PipeConnectorConstant { public static final String SINK_WEBSOCKET_PORT_KEY = "sink.websocket.port"; public static final int CONNECTOR_WEBSOCKET_PORT_DEFAULT_VALUE = 8080; + public static final String CONNECTOR_OPC_UA_MODEL_KEY = "connector.opcua.model"; + public static final String SINK_OPC_UA_MODEL_KEY = "sink.opcua.model"; + public static final String CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE = "client-server"; + public static final String CONNECTOR_OPC_UA_MODEL_PUB_SUB_VALUE = "pub-sub"; + public static final String CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE = + CONNECTOR_OPC_UA_MODEL_PUB_SUB_VALUE; + public static final String CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY = "connector.opcua.tcp.port"; public static final String SINK_OPC_UA_TCP_BIND_PORT_KEY = "sink.opcua.tcp.port"; public static final int CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE = 12686; @@ -145,6 +152,12 @@ public class PipeConnectorConstant { ? CommonDescriptor.getInstance().getConfDir() + File.separatorChar + "opc_security" : System.getProperty("user.home") + File.separatorChar + "iotdb_opc_security"; + public static final String CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY = + "connector.opcua.enable-anonymous-access"; + public static final String SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY = + "sink.opcua.enable-anonymous-access"; + public static final boolean CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE = true; + public static final String CONNECTOR_LEADER_CACHE_ENABLE_KEY = "connector.leader-cache.enable"; public static final String SINK_LEADER_CACHE_ENABLE_KEY = "sink.leader-cache.enable"; public static final boolean CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE = true; diff --git a/pom.xml b/pom.xml index aee4489a0f6..4e2194eaf3f 100644 --- a/pom.xml +++ b/pom.xml @@ -370,6 +370,17 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.eclipse.milo</groupId> + <artifactId>sdk-core</artifactId> + <version>${milo.version}</version> + <exclusions> + <exclusion> + <groupId>com.sun.activation</groupId> + <artifactId>jakarta.activation</artifactId> + </exclusion> + </exclusions> + </dependency> <dependency> <groupId>org.eclipse.milo</groupId> <artifactId>stack-server</artifactId>
