This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch rc/1.2.2 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 72bf2a8f6c57e8b8ae6325a509d66207eed20721 Author: Steve Yurong Su <[email protected]> AuthorDate: Wed Sep 27 10:18:45 2023 +0800 Revert "[IOTDB-6142] Pipe: Implemented IoTDBOpcUaConnector to enable transfer using Opc Ua protocol (#11081)" This reverts commit 4c7095c2b35ec2ad191253cbcf80ed7e7872a4d4. --- iotdb-core/datanode/pom.xml | 5 - .../config/constant/PipeConnectorConstant.java | 6 - .../connector/protocol/opcua/OpcUaConnector.java | 246 ------------------ .../protocol/opcua/OpcUaKeyStoreLoader.java | 120 --------- .../protocol/opcua/OpcUaServerBuilder.java | 287 --------------------- .../websocket/WebSocketConnectorServer.java | 1 - ...ocketConnector.java => WebsocketConnector.java} | 5 +- .../connector/PipeConnectorSubtaskManager.java | 3 - .../plan/planner/plan/node/write/InsertNode.java | 1 - .../pipe/plugin/builtin/BuiltinPipePlugin.java | 2 - .../plugin/builtin/connector/OpcUaConnector.java | 28 -- pom.xml | 2 - 12 files changed, 2 insertions(+), 704 deletions(-) diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml index fcc6bce7e5b..ee237484028 100644 --- a/iotdb-core/datanode/pom.xml +++ b/iotdb-core/datanode/pom.xml @@ -113,11 +113,6 @@ <artifactId>openapi</artifactId> <version>${project.version}</version> </dependency> - <dependency> - <groupId>org.eclipse.milo</groupId> - <artifactId>sdk-server</artifactId> - <version>${milo.version}</version> - </dependency> <dependency> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java index 5f09012c00a..a55bbbe4716 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java @@ -54,12 +54,6 @@ public class PipeConnectorConstant { public static final String CONNECTOR_WEBSOCKET_PORT_KEY = "connector.websocket.port"; public static final int CONNECTOR_WEBSOCKET_PORT_DEFAULT_VALUE = 8080; - public static final String CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY = "connector.opcua.tcp.port"; - public static final int CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE = 12686; - - public static final String CONNECTOR_OPC_UA_HTTPS_BIND_PORT_KEY = "connector.opcua.https.port"; - public static final int CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE = 8443; - private PipeConnectorConstant() { throw new IllegalStateException("Utility class"); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java deleted file mode 100644 index 703e4652d99..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java +++ /dev/null @@ -1,246 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.pipe.connector.protocol.opcua; - -import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException; -import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; -import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; -import org.apache.iotdb.pipe.api.PipeConnector; -import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; -import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; -import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; -import org.apache.iotdb.pipe.api.event.Event; -import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; -import org.apache.iotdb.tsfile.common.constant.TsFileConstant; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.utils.Binary; -import org.apache.iotdb.tsfile.write.record.Tablet; - -import org.eclipse.milo.opcua.sdk.server.OpcUaServer; -import org.eclipse.milo.opcua.sdk.server.model.nodes.objects.BaseEventTypeNode; -import org.eclipse.milo.opcua.stack.core.Identifiers; -import org.eclipse.milo.opcua.stack.core.UaException; -import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime; -import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText; -import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.UUID; - -import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE; -import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_KEY; -import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE; -import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_KEY; -import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE; -import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_HTTPS_BIND_PORT_KEY; -import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE; -import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY; - -/** - * Send data in IoTDB based on Opc Ua protocol, using Eclipse Milo. All data are converted into - * tablets, then eventNodes to send to the subscriber clients. Notice that there is no namespace - * since the eventNodes do not need to be saved. - */ -public class OpcUaConnector implements PipeConnector { - - private static final Logger LOGGER = LoggerFactory.getLogger(OpcUaConnector.class); - - private OpcUaServer server; - - @Override - public void validate(PipeParameterValidator validator) throws Exception { - // All the parameters are optional - } - - @Override - public void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration) - throws Exception { - int tcpBindPort = - parameters.getIntOrDefault( - CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY, CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE); - int httpsBindPort = - parameters.getIntOrDefault( - CONNECTOR_OPC_UA_HTTPS_BIND_PORT_KEY, CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE); - - String user = - parameters.getStringOrDefault(CONNECTOR_IOTDB_USER_KEY, CONNECTOR_IOTDB_USER_DEFAULT_VALUE); - String password = - parameters.getStringOrDefault( - CONNECTOR_IOTDB_PASSWORD_KEY, CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE); - - server = - new OpcUaServerBuilder() - .setTcpBindPort(tcpBindPort) - .setHttpsBindPort(httpsBindPort) - .setUser(user) - .setPassword(password) - .build(); - server.startup(); - } - - @Override - public void handshake() throws Exception { - // Server side, do nothing - } - - @Override - public void heartbeat() throws Exception { - // Server side, do nothing - } - - @Override - public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception { - // PipeProcessor can change the type of TabletInsertionEvent - if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) - && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) { - LOGGER.warn( - "IoTDBThriftSyncConnector only support " - + "PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent. " - + "Ignore {}.", - tabletInsertionEvent); - return; - } - - if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) { - transferTablet( - server, ((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent).convertToTablet()); - } else { - transferTablet( - server, ((PipeRawTabletInsertionEvent) tabletInsertionEvent).convertToTablet()); - } - } - - /** - * Transfer tablet into eventNodes and post it on the eventBus, so that they will be heard at the - * subscribers. Notice that an eventNode is reused to reduce object creation costs. - * - * @param server OpcUaServer - * @param tablet the tablet to send - * @throws UaException if failed to create event - */ - private void transferTablet(OpcUaServer server, Tablet tablet) throws UaException { - // There is no nameSpace, so that nameSpaceIndex is always 0 - int pseudoNameSpaceIndex = 0; - BaseEventTypeNode eventNode = - server - .getEventFactory() - .createEvent( - new NodeId(pseudoNameSpaceIndex, UUID.randomUUID()), Identifiers.BaseEventType); - // Use eventNode here because other nodes doesn't support values and times simultaneously - for (int columnIndex = 0; columnIndex < tablet.getSchemas().size(); ++columnIndex) { - - TSDataType dataType = tablet.getSchemas().get(columnIndex).getType(); - - // Source name --> Sensor path, like root.test.d_0.s_0 - eventNode.setSourceName( - tablet.deviceId - + TsFileConstant.PATH_SEPARATOR - + tablet.getSchemas().get(columnIndex).getMeasurementId()); - - // Source node --> Sensor type, like double - eventNode.setSourceNode(convertToOpcDataType(dataType)); - - for (int rowIndex = 0; rowIndex < tablet.rowSize; ++rowIndex) { - // Filter null value - if (tablet.bitMaps[columnIndex].isMarked(rowIndex)) { - continue; - } - - // time --> timeStamp - eventNode.setTime(new DateTime(tablet.timestamps[rowIndex])); - - // Message --> Value - switch (dataType) { - case BOOLEAN: - eventNode.setMessage( - LocalizedText.english( - Boolean.toString(((boolean[]) tablet.values[columnIndex])[rowIndex]))); - break; - case INT32: - eventNode.setMessage( - LocalizedText.english( - Integer.toString(((int[]) tablet.values[columnIndex])[rowIndex]))); - break; - case INT64: - eventNode.setMessage( - LocalizedText.english( - Long.toString(((long[]) tablet.values[columnIndex])[rowIndex]))); - break; - case FLOAT: - eventNode.setMessage( - LocalizedText.english( - Float.toString(((float[]) tablet.values[columnIndex])[rowIndex]))); - break; - case DOUBLE: - eventNode.setMessage( - LocalizedText.english( - Double.toString(((double[]) tablet.values[columnIndex])[rowIndex]))); - break; - case TEXT: - eventNode.setMessage( - LocalizedText.english( - ((Binary[]) tablet.values[columnIndex])[rowIndex].toString())); - break; - case VECTOR: - case UNKNOWN: - default: - throw new PipeRuntimeNonCriticalException( - "Unsupported data type: " + tablet.getSchemas().get(columnIndex).getType()); - } - - // Send the event - server.getEventBus().post(eventNode); - } - } - eventNode.delete(); - } - - private NodeId convertToOpcDataType(TSDataType type) { - switch (type) { - case BOOLEAN: - return Identifiers.Boolean; - case INT32: - return Identifiers.Int32; - case INT64: - return Identifiers.Int64; - case FLOAT: - return Identifiers.Float; - case DOUBLE: - return Identifiers.Double; - case TEXT: - return Identifiers.String; - case VECTOR: - case UNKNOWN: - default: - throw new PipeRuntimeNonCriticalException("Unsupported data type: " + type); - } - } - - @Override - public void transfer(Event event) throws Exception { - // Do nothing when receive heartbeat or other events - } - - @Override - public void close() throws Exception { - server.shutdown(); - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaKeyStoreLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaKeyStoreLoader.java deleted file mode 100644 index 3f87e46acb8..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaKeyStoreLoader.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.pipe.connector.protocol.opcua; - -import com.google.common.collect.Sets; -import org.eclipse.milo.opcua.sdk.server.util.HostnameUtil; -import org.eclipse.milo.opcua.stack.core.util.SelfSignedCertificateBuilder; -import org.eclipse.milo.opcua.stack.core.util.SelfSignedCertificateGenerator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.nio.file.Path; -import java.security.Key; -import java.security.KeyPair; -import java.security.KeyStore; -import java.security.PrivateKey; -import java.security.PublicKey; -import java.security.cert.X509Certificate; -import java.util.Set; -import java.util.UUID; -import java.util.regex.Pattern; - -class OpcUaKeyStoreLoader { - - private static final Pattern IP_ADDR_PATTERN = - Pattern.compile("^(([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.){3}([01]?\\d\\d?|2[0-4]\\d|25[0-5])$"); - - private static final String SERVER_ALIAS = "server-ai"; - - private final Logger logger = LoggerFactory.getLogger(getClass()); - - private X509Certificate serverCertificate; - private KeyPair serverKeyPair; - - OpcUaKeyStoreLoader load(Path baseDir, char[] password) throws Exception { - KeyStore keyStore = KeyStore.getInstance("PKCS12"); - - File serverKeyStore = baseDir.resolve("iotdb-server.pfx").toFile(); - - logger.info("Loading KeyStore at {}", serverKeyStore); - - if (!serverKeyStore.exists()) { - keyStore.load(null, password); - - KeyPair keyPair = SelfSignedCertificateGenerator.generateRsaKeyPair(2048); - - String applicationUri = "urn:apache:iotdb:opc-ua-server:" + UUID.randomUUID(); - - SelfSignedCertificateBuilder builder = - new SelfSignedCertificateBuilder(keyPair) - .setCommonName("Apache IoTDB OPC UA server") - .setOrganization("Apache") - .setOrganizationalUnit("dev") - .setLocalityName("Beijing") - .setStateName("China") - .setCountryCode("CN") - .setApplicationUri(applicationUri); - - // Get as many hostnames and IP addresses as we can listed in the certificate. - Set<String> hostnames = - Sets.union( - Sets.newHashSet(HostnameUtil.getHostname()), - HostnameUtil.getHostnames("0.0.0.0", false)); - - for (String hostname : hostnames) { - if (IP_ADDR_PATTERN.matcher(hostname).matches()) { - builder.addIpAddress(hostname); - } else { - builder.addDnsName(hostname); - } - } - - X509Certificate certificate = builder.build(); - - keyStore.setKeyEntry( - SERVER_ALIAS, keyPair.getPrivate(), password, new X509Certificate[] {certificate}); - keyStore.store(new FileOutputStream(serverKeyStore), password); - } else { - keyStore.load(new FileInputStream(serverKeyStore), password); - } - - Key serverPrivateKey = keyStore.getKey(SERVER_ALIAS, password); - if (serverPrivateKey instanceof PrivateKey) { - serverCertificate = (X509Certificate) keyStore.getCertificate(SERVER_ALIAS); - - PublicKey serverPublicKey = serverCertificate.getPublicKey(); - serverKeyPair = new KeyPair(serverPublicKey, (PrivateKey) serverPrivateKey); - } - - return this; - } - - X509Certificate getServerCertificate() { - return serverCertificate; - } - - KeyPair getServerKeyPair() { - return serverKeyPair; - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerBuilder.java deleted file mode 100644 index 60e577dba51..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerBuilder.java +++ /dev/null @@ -1,287 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.pipe.connector.protocol.opcua; - -import org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant; -import org.apache.iotdb.pipe.api.exception.PipeException; - -import org.eclipse.milo.opcua.sdk.server.OpcUaServer; -import org.eclipse.milo.opcua.sdk.server.api.config.OpcUaServerConfig; -import org.eclipse.milo.opcua.sdk.server.identity.CompositeValidator; -import org.eclipse.milo.opcua.sdk.server.identity.UsernameIdentityValidator; -import org.eclipse.milo.opcua.sdk.server.identity.X509IdentityValidator; -import org.eclipse.milo.opcua.sdk.server.model.nodes.objects.ServerTypeNode; -import org.eclipse.milo.opcua.sdk.server.nodes.UaNode; -import org.eclipse.milo.opcua.sdk.server.util.HostnameUtil; -import org.eclipse.milo.opcua.stack.core.Identifiers; -import org.eclipse.milo.opcua.stack.core.StatusCodes; -import org.eclipse.milo.opcua.stack.core.UaRuntimeException; -import org.eclipse.milo.opcua.stack.core.security.DefaultCertificateManager; -import org.eclipse.milo.opcua.stack.core.security.DefaultTrustListManager; -import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy; -import org.eclipse.milo.opcua.stack.core.transport.TransportProfile; -import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime; -import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText; -import org.eclipse.milo.opcua.stack.core.types.enumerated.MessageSecurityMode; -import org.eclipse.milo.opcua.stack.core.types.structured.BuildInfo; -import org.eclipse.milo.opcua.stack.core.util.CertificateUtil; -import org.eclipse.milo.opcua.stack.core.util.SelfSignedCertificateGenerator; -import org.eclipse.milo.opcua.stack.core.util.SelfSignedHttpsCertificateBuilder; -import org.eclipse.milo.opcua.stack.server.EndpointConfiguration; -import org.eclipse.milo.opcua.stack.server.security.DefaultServerCertificateValidator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.security.KeyPair; -import java.security.cert.X509Certificate; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Set; - -import static com.google.common.collect.Lists.newArrayList; -import static org.eclipse.milo.opcua.sdk.server.api.config.OpcUaServerConfig.USER_TOKEN_POLICY_ANONYMOUS; -import static org.eclipse.milo.opcua.sdk.server.api.config.OpcUaServerConfig.USER_TOKEN_POLICY_USERNAME; -import static org.eclipse.milo.opcua.sdk.server.api.config.OpcUaServerConfig.USER_TOKEN_POLICY_X509; -import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.ubyte; - -/** - * OPC UA Server builder for IoTDB to send data. The coding style referenced ExampleServer.java in - * Eclipse Milo. - */ -public class OpcUaServerBuilder { - - private static final String WILD_CARD_ADDRESS = "0.0.0.0"; - private final Logger logger = LoggerFactory.getLogger(OpcUaServerBuilder.class); - - private int tcpBindPort; - private int httpsBindPort; - private String user; - private String password; - - public OpcUaServerBuilder() { - tcpBindPort = PipeConnectorConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE; - httpsBindPort = PipeConnectorConstant.CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE; - user = PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE; - password = PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE; - } - - public OpcUaServerBuilder setTcpBindPort(int tcpBindPort) { - this.tcpBindPort = tcpBindPort; - return this; - } - - public OpcUaServerBuilder setHttpsBindPort(int httpsBindPort) { - this.httpsBindPort = httpsBindPort; - return this; - } - - public OpcUaServerBuilder setUser(String user) { - this.user = user; - return this; - } - - public OpcUaServerBuilder setPassword(String password) { - this.password = password; - return this; - } - - public OpcUaServer build() throws Exception { - Path securityTempDir = Paths.get(System.getProperty("java.io.tmpdir"), "iotdb", "security"); - Files.createDirectories(securityTempDir); - if (!Files.exists(securityTempDir)) { - throw new PipeException("Unable to create security temp dir: " + securityTempDir); - } - - File pkiDir = securityTempDir.resolve("pki").toFile(); - - LoggerFactory.getLogger(OpcUaServerBuilder.class) - .info("Security dir: {}", securityTempDir.toAbsolutePath()); - LoggerFactory.getLogger(OpcUaServerBuilder.class) - .info("Security pki dir: {}", pkiDir.getAbsolutePath()); - - OpcUaKeyStoreLoader loader = - new OpcUaKeyStoreLoader().load(securityTempDir, password.toCharArray()); - - DefaultCertificateManager certificateManager = - new DefaultCertificateManager(loader.getServerKeyPair(), loader.getServerCertificate()); - - DefaultTrustListManager trustListManager = new DefaultTrustListManager(pkiDir); - logger.info( - "Certificate directory is: {}, Please move certificates from the reject dir to the trusted directory to allow encrypted access", - pkiDir.getAbsolutePath()); - - KeyPair httpsKeyPair = SelfSignedCertificateGenerator.generateRsaKeyPair(2048); - - SelfSignedHttpsCertificateBuilder httpsCertificateBuilder = - new SelfSignedHttpsCertificateBuilder(httpsKeyPair); - httpsCertificateBuilder.setCommonName(HostnameUtil.getHostname()); - HostnameUtil.getHostnames(WILD_CARD_ADDRESS).forEach(httpsCertificateBuilder::addDnsName); - X509Certificate httpsCertificate = httpsCertificateBuilder.build(); - - DefaultServerCertificateValidator certificateValidator = - new DefaultServerCertificateValidator(trustListManager); - - UsernameIdentityValidator identityValidator = - new UsernameIdentityValidator( - true, - authChallenge -> { - String inputUsername = authChallenge.getUsername(); - String inputPassword = authChallenge.getPassword(); - - return inputUsername.equals(user) && inputPassword.equals(password); - }); - - X509IdentityValidator x509IdentityValidator = new X509IdentityValidator(c -> true); - - X509Certificate certificate = - certificateManager.getCertificates().stream() - .findFirst() - .orElseThrow( - () -> - new UaRuntimeException( - StatusCodes.Bad_ConfigurationError, "No certificate found")); - - String applicationUri = - CertificateUtil.getSanUri(certificate) - .orElseThrow( - () -> - new UaRuntimeException( - StatusCodes.Bad_ConfigurationError, - "Certificate is missing the application URI")); - - Set<EndpointConfiguration> endpointConfigurations = - createEndpointConfigurations(certificate, tcpBindPort, httpsBindPort); - - OpcUaServerConfig serverConfig = - OpcUaServerConfig.builder() - .setApplicationUri(applicationUri) - .setApplicationName(LocalizedText.english("Apache IoTDB OPC UA server")) - .setEndpoints(endpointConfigurations) - .setBuildInfo( - new BuildInfo( - "urn:apache:iotdb:opc-ua-server", - "apache", - "Apache IoTDB OPC UA server", - OpcUaServer.SDK_VERSION, - "", - DateTime.now())) - .setCertificateManager(certificateManager) - .setTrustListManager(trustListManager) - .setCertificateValidator(certificateValidator) - .setHttpsKeyPair(httpsKeyPair) - .setHttpsCertificateChain(new X509Certificate[] {httpsCertificate}) - .setIdentityValidator(new CompositeValidator(identityValidator, x509IdentityValidator)) - .setProductUri("urn:apache:iotdb:opc-ua-server") - .build(); - - // Setup server to enable event posting - OpcUaServer server = new OpcUaServer(serverConfig); - UaNode serverNode = - server.getAddressSpaceManager().getManagedNode(Identifiers.Server).orElse(null); - if (serverNode instanceof ServerTypeNode) { - ((ServerTypeNode) serverNode).setEventNotifier(ubyte(1)); - } - return server; - } - - private Set<EndpointConfiguration> createEndpointConfigurations( - X509Certificate certificate, int tcpBindPort, int httpsBindPort) { - Set<EndpointConfiguration> endpointConfigurations = new LinkedHashSet<>(); - - List<String> bindAddresses = newArrayList(); - bindAddresses.add(WILD_CARD_ADDRESS); - - Set<String> hostnames = new LinkedHashSet<>(); - hostnames.add(HostnameUtil.getHostname()); - hostnames.addAll(HostnameUtil.getHostnames(WILD_CARD_ADDRESS)); - - for (String bindAddress : bindAddresses) { - for (String hostname : hostnames) { - EndpointConfiguration.Builder builder = - EndpointConfiguration.newBuilder() - .setBindAddress(bindAddress) - .setHostname(hostname) - .setPath("/iotdb") - .setCertificate(certificate) - .addTokenPolicies( - USER_TOKEN_POLICY_ANONYMOUS, - USER_TOKEN_POLICY_USERNAME, - USER_TOKEN_POLICY_X509); - - EndpointConfiguration.Builder noSecurityBuilder = - builder - .copy() - .setSecurityPolicy(SecurityPolicy.None) - .setSecurityMode(MessageSecurityMode.None); - - endpointConfigurations.add(buildTcpEndpoint(noSecurityBuilder, tcpBindPort)); - endpointConfigurations.add(buildHttpsEndpoint(noSecurityBuilder, httpsBindPort)); - - endpointConfigurations.add( - buildTcpEndpoint( - builder - .copy() - .setSecurityPolicy(SecurityPolicy.Basic256Sha256) - .setSecurityMode(MessageSecurityMode.SignAndEncrypt), - tcpBindPort)); - - endpointConfigurations.add( - buildHttpsEndpoint( - builder - .copy() - .setSecurityPolicy(SecurityPolicy.Basic256Sha256) - .setSecurityMode(MessageSecurityMode.Sign), - httpsBindPort)); - - EndpointConfiguration.Builder discoveryBuilder = - builder - .copy() - .setPath("/iotdb/discovery") - .setSecurityPolicy(SecurityPolicy.None) - .setSecurityMode(MessageSecurityMode.None); - - endpointConfigurations.add(buildTcpEndpoint(discoveryBuilder, tcpBindPort)); - endpointConfigurations.add(buildHttpsEndpoint(discoveryBuilder, httpsBindPort)); - } - } - - return endpointConfigurations; - } - - private EndpointConfiguration buildTcpEndpoint( - EndpointConfiguration.Builder base, int tcpBindPort) { - return base.copy() - .setTransportProfile(TransportProfile.TCP_UASC_UABINARY) - .setBindPort(tcpBindPort) - .build(); - } - - private EndpointConfiguration buildHttpsEndpoint( - EndpointConfiguration.Builder base, int httpsBindPort) { - return base.copy() - .setTransportProfile(TransportProfile.HTTPS_UABINARY) - .setBindPort(httpsBindPort) - .build(); - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java index cb4aaef0f97..3ccaadb7ac9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java @@ -16,7 +16,6 @@ * specific language governing permissions and limitations * under the License. */ - package org.apache.iotdb.db.pipe.connector.protocol.websocket; import org.apache.iotdb.db.pipe.event.EnrichedEvent; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebsocketConnector.java similarity index 97% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebsocketConnector.java index 6df1bee24e2..a0b23a98de5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebsocketConnector.java @@ -86,7 +86,7 @@ public class WebSocketConnector implements PipeConnector { if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) { LOGGER.warn( - "WebsocketConnector only support PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent. " + "WebSocketConnector only support PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent. " + "Current event: {}.", tabletInsertionEvent); return; @@ -101,7 +101,7 @@ public class WebSocketConnector implements PipeConnector { public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception { if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) { LOGGER.warn( - "WebsocketConnector only support PipeTsFileInsertionEvent. Current event: {}.", + "WebSocketConnector only support PipeTsFileInsertionEvent. Current event: {}.", tsFileInsertionEvent); return; } @@ -134,7 +134,6 @@ public class WebSocketConnector implements PipeConnector { event -> event.decreaseReferenceCount( WebSocketConnector.class.getName(), true)))); - while (!commitQueue.isEmpty()) { final Pair<Long, Runnable> committer = commitQueue.peek(); if (lastCommitId.get() + 1 != committer.left) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java index 9fde95da3fc..60b26fee571 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java @@ -25,7 +25,6 @@ import org.apache.iotdb.db.pipe.agent.PipeAgent; import org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant; import org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration; import org.apache.iotdb.db.pipe.connector.protocol.legacy.IoTDBLegacyPipeConnector; -import org.apache.iotdb.db.pipe.connector.protocol.opcua.OpcUaConnector; import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBThriftAsyncConnector; import org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnector; import org.apache.iotdb.db.pipe.connector.protocol.websocket.WebSocketConnector; @@ -76,8 +75,6 @@ public class PipeConnectorSubtaskManager { } else if (connectorKey.equals( BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName())) { pipeConnector = new IoTDBLegacyPipeConnector(); - } else if (connectorKey.equals(BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName())) { - pipeConnector = new OpcUaConnector(); } else if (connectorKey.equals(BuiltinPipePlugin.WEBSOCKET_CONNECTOR.getPipePluginName())) { pipeConnector = new WebSocketConnector(); } else { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java index 61edb88c0a5..abca512e1e8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java @@ -16,7 +16,6 @@ * specific language governing permissions and limitations * under the License. */ - package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java index 4c8a0c87342..f7e33633071 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java @@ -24,7 +24,6 @@ import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBLegacyPipeCon import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftAsyncConnector; import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnector; import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftSyncConnector; -import org.apache.iotdb.commons.pipe.plugin.builtin.connector.OpcUaConnector; import org.apache.iotdb.commons.pipe.plugin.builtin.connector.WebSocketConnector; import org.apache.iotdb.commons.pipe.plugin.builtin.extractor.IoTDBExtractor; import org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor; @@ -44,7 +43,6 @@ public enum BuiltinPipePlugin { IOTDB_THRIFT_ASYNC_CONNECTOR("iotdb-thrift-async-connector", IoTDBThriftAsyncConnector.class), IOTDB_LEGACY_PIPE_CONNECTOR("iotdb-legacy-pipe-connector", IoTDBLegacyPipeConnector.class), WEBSOCKET_CONNECTOR("websocket-connector", WebSocketConnector.class), - OPC_UA_CONNECTOR("opc-ua-connector", OpcUaConnector.class), ; private final String pipePluginName; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/OpcUaConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/OpcUaConnector.java deleted file mode 100644 index 287a2dcb116..00000000000 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/OpcUaConnector.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.commons.pipe.plugin.builtin.connector; - -/** - * This class is a placeholder and should not be initialized. It represents the OPC UA connector. - * There is a real implementation in the server module but cannot be imported here. The pipe agent - * in the server module will replace this class with the real implementation when initializing the - * OPC UA connector. - */ -public class OpcUaConnector extends PlaceholderConnector {} diff --git a/pom.xml b/pom.xml index ded8698fcf8..e8e3ef6501a 100644 --- a/pom.xml +++ b/pom.xml @@ -230,8 +230,6 @@ <codegen.phase>generate-sources</codegen.phase> <!-- WebSocket --> <websocket.version>1.5.3</websocket.version> - <!-- milo --> - <milo.version>0.6.10</milo.version> </properties> <!-- if we claim dependencies in dependencyManagement, then we do not claim
