This is an automated email from the ASF dual-hosted git repository. riemer pushed a commit to branch 3362-saslssl-support-for-kafka-adapter in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit 7bb56d583ac01d80d02fe3f0ec5583577d0be8b4 Author: Dominik Riemer <[email protected]> AuthorDate: Sun Dec 1 21:58:58 2024 +0100 feat(#3362): Add SSL/SASL support to Kafka adapter and sink --- pom.xml | 2 +- .../apache/streampipes/commons/constants/Envs.java | 16 +- .../constants/GlobalStreamPipesConstants.java | 1 + .../commons/environment/DefaultEnvironment.java | 42 +++- .../commons/environment/Environment.java | 39 +--- .../connectors/kafka/adapter/KafkaProtocol.java | 84 ++++---- .../kafka/shared/kafka/KafkaAdapterConfig.java | 13 +- .../{KafkaConfig.java => KafkaBaseConfig.java} | 51 ++--- .../kafka/shared/kafka/KafkaConfigExtractor.java | 121 ++++++++++++ .../kafka/shared/kafka/KafkaConfigProvider.java | 153 +++++++++++++++ .../kafka/shared/kafka/KafkaConnectUtils.java | 211 --------------------- .../connectors/kafka/sink/KafkaParameters.java | 90 --------- .../connectors/kafka/sink/KafkaPublishSink.java | 49 ++--- .../strings.en | 19 +- .../strings.en | 3 + .../opcua/config/security/KeyStoreLoader.java | 2 +- .../integration/adapters/KafkaAdapterTester.java | 8 +- .../kafka/config/KafkaConfigAppender.java | 4 +- .../kafka/security/KafkaSecurityConfig.java | 25 --- .../KafkaSecurityProtocolConfigAppender.java | 70 +++++++ ...g.java => KafkaSecuritySaslConfigAppender.java} | 40 +++- .../security/KafkaSecuritySaslPlainConfig.java | 51 ----- .../KafkaSecurityUnauthenticatedSSLConfig.java | 32 ---- .../impl/connect/RuntimeResolvableResource.java | 2 + .../base-runtime-resolvable-input.ts | 1 + ...tic-runtime-resolvable-oneof-input.component.ts | 37 +++- 26 files changed, 575 insertions(+), 591 deletions(-) diff --git a/pom.xml b/pom.xml index 8f0d8ffded..455675231a 100644 --- a/pom.xml +++ b/pom.xml @@ -89,7 +89,7 @@ <jsrosbridge.version>0.2.0</jsrosbridge.version> <jjwt.version>0.11.2</jjwt.version> <jts-core.version>1.19.0</jts-core.version> - <kafka.version>3.4.0</kafka.version> + <kafka.version>3.7.1</kafka.version> <lightcouch.version>0.2.0</lightcouch.version> <maven-plugin-annotations.version>3.13.0</maven-plugin-annotations.version> <mailapi.version>1.4.3</mailapi.version> diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java index 7a2d033e5a..2339475adb 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java @@ -119,7 +119,21 @@ public enum Envs { SP_OPCUA_APPLICATION_URI( "SP_OPCUA_APPLICATION_URI", "urn:org:apache:streampipes:opcua:client" - ); + ), + + // Default keystore and truststore + SP_SECURITY_KEYSTORE_FILENAME( + "SP_SECURITY_KEYSTORE_FILENAME", + "/streampipes-security/keystore.pfx"), + SP_SECURITY_KEYSTORE_PASSWORD("SP_SECURITY_KEYSTORE_PASSWORD", ""), + SP_SECURITY_KEYSTORE_TYPE("SP_SECURITY_KEYSTORE_TYPE", "PKCS12"), + SP_SECURITY_KEY_PASSWORD("SP_SECURITY_KEY_PASSWORD", null), + SP_SECURITY_TRUSTSTORE_FILENAME( + "SP_SECURITY_TRUSTSTORE_FILENAME", + "/streampipes-security/truststore.pfx"), + SP_SECURITY_TRUSTSTORE_PASSWORD("SP_SECURITY_TRUSTSTORE_PASSWORD", ""), + SP_SECURITY_TRUSTSTORE_TYPE("SP_SECURITY_TRUSTSTORE_TYPE", "PKCS12"), + SP_SECURITY_ALLOW_SELFSIGNED("SP_SECURITY_ALLOW_SELFSIGNED", "false"); private final String envVariableName; private String defaultValue; diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GlobalStreamPipesConstants.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GlobalStreamPipesConstants.java index b452d74c1a..409a9a9090 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GlobalStreamPipesConstants.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GlobalStreamPipesConstants.java @@ -23,5 +23,6 @@ public class GlobalStreamPipesConstants { public static final String STD_DOCUMENTATION_NAME = "documentation.md"; public static final String INTERNAL_TOPIC_PREFIX = "org-apache-streampipes-internal-"; + public static final String CONNECT_TOPIC_PREFIX = "org.apache.streampipes.connect."; } diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java index bb5bbac193..3434924e3e 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java @@ -349,7 +349,7 @@ public class DefaultEnvironment implements Environment { } @Override - public StringEnvironmentVariable getOPcUaKeystoreType() { + public StringEnvironmentVariable getOpcUaKeystoreType() { return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_TYPE); } @@ -357,4 +357,44 @@ public class DefaultEnvironment implements Environment { public StringEnvironmentVariable getOpcUaKeystoreAlias() { return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_ALIAS); } + + @Override + public StringEnvironmentVariable getKeystoreFilename() { + return new StringEnvironmentVariable(Envs.SP_SECURITY_KEYSTORE_FILENAME); + } + + @Override + public StringEnvironmentVariable getKeystorePassword() { + return new StringEnvironmentVariable(Envs.SP_SECURITY_KEYSTORE_PASSWORD); + } + + @Override + public StringEnvironmentVariable getKeystoreType() { + return new StringEnvironmentVariable(Envs.SP_SECURITY_KEYSTORE_TYPE); + } + + @Override + public StringEnvironmentVariable getKeyPassword() { + return new StringEnvironmentVariable(Envs.SP_SECURITY_KEY_PASSWORD); + } + + @Override + public StringEnvironmentVariable getTruststoreFilename() { + return new StringEnvironmentVariable(Envs.SP_SECURITY_TRUSTSTORE_FILENAME); + } + + @Override + public StringEnvironmentVariable getTruststorePassword() { + return new StringEnvironmentVariable(Envs.SP_SECURITY_TRUSTSTORE_PASSWORD); + } + + @Override + public StringEnvironmentVariable getTruststoreType() { + return new StringEnvironmentVariable(Envs.SP_SECURITY_TRUSTSTORE_TYPE); + } + + @Override + public BooleanEnvironmentVariable getAllowSelfSignedCertificates() { + return new BooleanEnvironmentVariable(Envs.SP_SECURITY_ALLOW_SELFSIGNED); + } } diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java index cb441f009b..d1c4adf6ae 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java @@ -32,79 +32,56 @@ public interface Environment { // Service base configuration StringEnvironmentVariable getServiceHost(); - IntEnvironmentVariable getServicePort(); StringEnvironmentVariable getSpCoreScheme(); StringEnvironmentVariable getSpCoreHost(); - IntEnvironmentVariable getSpCorePort(); // Time series storage env variables StringEnvironmentVariable getTsStorage(); - StringEnvironmentVariable getTsStorageProtocol(); - StringEnvironmentVariable getTsStorageHost(); - IntEnvironmentVariable getTsStoragePort(); - StringEnvironmentVariable getTsStorageToken(); - StringEnvironmentVariable getTsStorageOrg(); - StringEnvironmentVariable getTsStorageBucket(); IntEnvironmentVariable getIotDbSessionPoolSize(); - BooleanEnvironmentVariable getIotDbSessionEnableCompression(); - StringEnvironmentVariable getIotDbUser(); - StringEnvironmentVariable getIotDbPassword(); // CouchDB env variables StringEnvironmentVariable getCouchDbProtocol(); - StringEnvironmentVariable getCouchDbHost(); - IntEnvironmentVariable getCouchDbPort(); - StringEnvironmentVariable getCouchDbUsername(); - StringEnvironmentVariable getCouchDbPassword(); // JWT & Authentication StringEnvironmentVariable getClientUser(); - StringEnvironmentVariable getClientSecret(); StringEnvironmentVariable getJwtSecret(); - StringEnvironmentVariable getJwtPublicKeyLoc(); - StringEnvironmentVariable getJwtPrivateKeyLoc(); - StringEnvironmentVariable getJwtSigningMode(); StringEnvironmentVariable getExtensionsAuthMode(); - StringEnvironmentVariable getEncryptionPasscode(); BooleanEnvironmentVariable getOAuthEnabled(); - StringEnvironmentVariable getOAuthRedirectUri(); - List<OAuthConfiguration> getOAuthConfigurations(); // Messaging StringEnvironmentVariable getKafkaRetentionTimeMs(); - StringEnvironmentVariable getPrioritizedProtocol(); @@ -164,14 +141,18 @@ public interface Environment { StringEnvironmentVariable getAllowedUploadFiletypes(); StringEnvironmentVariable getOpcUaSecurityDir(); - StringEnvironmentVariable getOpcUaKeystoreFile(); - StringEnvironmentVariable getOpcUaKeystorePassword(); - StringEnvironmentVariable getOpcUaApplicationUri(); - - StringEnvironmentVariable getOPcUaKeystoreType(); - + StringEnvironmentVariable getOpcUaKeystoreType(); StringEnvironmentVariable getOpcUaKeystoreAlias(); + + StringEnvironmentVariable getKeystoreFilename(); + StringEnvironmentVariable getKeystorePassword(); + StringEnvironmentVariable getKeystoreType(); + StringEnvironmentVariable getKeyPassword(); + StringEnvironmentVariable getTruststoreFilename(); + StringEnvironmentVariable getTruststorePassword(); + StringEnvironmentVariable getTruststoreType(); + BooleanEnvironmentVariable getAllowSelfSignedCertificates(); } diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java index fb387b59ee..90dd1ee2a1 100644 --- a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java +++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java @@ -31,12 +31,12 @@ import org.apache.streampipes.extensions.api.connect.context.IAdapterRuntimeCont import org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor; import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor; import org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig; -import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfig; -import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConnectUtils; +import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaAdapterConfig; +import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigExtractor; +import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider; import org.apache.streampipes.extensions.management.connect.adapter.BrokerEventProcessor; import org.apache.streampipes.extensions.management.connect.adapter.parser.Parsers; import org.apache.streampipes.messaging.kafka.SpKafkaConsumer; -import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender; import org.apache.streampipes.model.AdapterType; import org.apache.streampipes.model.connect.guess.GuessSchema; import org.apache.streampipes.model.extensions.ExtensionAssetType; @@ -67,13 +67,12 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Properties; -import java.util.Set; import java.util.stream.Collectors; public class KafkaProtocol implements StreamPipesAdapter, SupportsRuntimeConfig { private static final Logger logger = LoggerFactory.getLogger(KafkaProtocol.class); - KafkaConfig config; + KafkaAdapterConfig config; public static final String ID = "org.apache.streampipes.connect.iiot.protocol.stream.kafka"; @@ -84,14 +83,13 @@ public class KafkaProtocol implements StreamPipesAdapter, SupportsRuntimeConfig } private void applyConfiguration(IStaticPropertyExtractor extractor) { - this.config = KafkaConnectUtils.getConfig(extractor, true); + this.config = new KafkaConfigExtractor().extractAdapterConfig(extractor, true); } - private Consumer<byte[], byte[]> createConsumer(KafkaConfig kafkaConfig) throws KafkaException { + private Consumer<byte[], byte[]> createConsumer(KafkaAdapterConfig kafkaConfig) throws KafkaException { final Properties props = new Properties(); - kafkaConfig.getSecurityConfig().appendConfig(props); - kafkaConfig.getAutoOffsetResetConfig().appendConfig(props); + kafkaConfig.getConfigAppenders().forEach(c -> c.appendConfig(props)); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getKafkaHost() + ":" + kafkaConfig.getKafkaPort()); @@ -113,34 +111,36 @@ public class KafkaProtocol implements StreamPipesAdapter, SupportsRuntimeConfig IStaticPropertyExtractor extractor) throws SpConfigurationException { RuntimeResolvableOneOfStaticProperty config = extractor - .getStaticPropertyByName(KafkaConnectUtils.TOPIC_KEY, RuntimeResolvableOneOfStaticProperty.class); - KafkaConfig kafkaConfig = KafkaConnectUtils.getConfig(extractor, false); - boolean hideInternalTopics = extractor.slideToggleValue(KafkaConnectUtils.getHideInternalTopicsKey()); + .getStaticPropertyByName(KafkaConfigProvider.TOPIC_KEY, RuntimeResolvableOneOfStaticProperty.class); + var kafkaConfig = new KafkaConfigExtractor().extractAdapterConfig(extractor, false); + boolean hideInternalTopics = extractor.slideToggleValue(KafkaConfigProvider.getHideInternalTopicsKey()); try { - Consumer<byte[], byte[]> consumer = createConsumer(kafkaConfig); - Set<String> topics = consumer.listTopics().keySet(); + var consumer = createConsumer(kafkaConfig); + List<String> topics = new ArrayList<>(consumer.listTopics().keySet()).stream().sorted().toList(); consumer.close(); if (hideInternalTopics) { topics = topics .stream() - .filter(t -> !t.startsWith(GlobalStreamPipesConstants.INTERNAL_TOPIC_PREFIX)) - .collect(Collectors.toSet()); + .filter(t -> (!t.startsWith(GlobalStreamPipesConstants.INTERNAL_TOPIC_PREFIX) + && !t.startsWith(GlobalStreamPipesConstants.CONNECT_TOPIC_PREFIX))) + .toList(); } config.setOptions(topics.stream().map(Option::new).collect(Collectors.toList())); return config; } catch (KafkaException e) { - throw new SpConfigurationException(e.getMessage(), e); + var message = e.getCause() != null ? e.getCause().getMessage() : e.getMessage(); + throw new SpConfigurationException(message, e); } } @Override public IAdapterConfiguration declareConfig() { - StaticPropertyAlternative latestAlternative = KafkaConnectUtils.getAlternativesLatest(); + StaticPropertyAlternative latestAlternative = KafkaConfigProvider.getAlternativesLatest(); latestAlternative.setSelected(true); return AdapterConfigurationBuilder @@ -150,28 +150,28 @@ public class KafkaProtocol implements StreamPipesAdapter, SupportsRuntimeConfig .withLocales(Locales.EN) .withCategory(AdapterType.Generic, AdapterType.Manufacturing) - .requiredAlternatives(KafkaConnectUtils.getAccessModeLabel(), - KafkaConnectUtils.getAlternativeUnauthenticatedPlain(), - KafkaConnectUtils.getAlternativeUnauthenticatedSSL(), - KafkaConnectUtils.getAlternativesSaslPlain(), - KafkaConnectUtils.getAlternativesSaslSSL()) + .requiredAlternatives(KafkaConfigProvider.getAccessModeLabel(), + KafkaConfigProvider.getAlternativeUnauthenticatedPlain(), + KafkaConfigProvider.getAlternativeUnauthenticatedSSL(), + KafkaConfigProvider.getAlternativesSaslPlain(), + KafkaConfigProvider.getAlternativesSaslSSL()) - .requiredTextParameter(KafkaConnectUtils.getHostLabel()) - .requiredIntegerParameter(KafkaConnectUtils.getPortLabel()) + .requiredTextParameter(KafkaConfigProvider.getHostLabel()) + .requiredIntegerParameter(KafkaConfigProvider.getPortLabel()) - .requiredAlternatives(KafkaConnectUtils.getConsumerGroupLabel(), - KafkaConnectUtils.getAlternativesRandomGroupId(), - KafkaConnectUtils.getAlternativesGroupId()) + .requiredAlternatives(KafkaConfigProvider.getConsumerGroupLabel(), + KafkaConfigProvider.getAlternativesRandomGroupId(), + KafkaConfigProvider.getAlternativesGroupId()) - .requiredSlideToggle(KafkaConnectUtils.getHideInternalTopicsLabel(), true) + .requiredSlideToggle(KafkaConfigProvider.getHideInternalTopicsLabel(), true) - .requiredSingleValueSelectionFromContainer(KafkaConnectUtils.getTopicLabel(), Arrays.asList( - KafkaConnectUtils.HOST_KEY, - KafkaConnectUtils.PORT_KEY)) - .requiredAlternatives(KafkaConnectUtils.getAutoOffsetResetConfigLabel(), - KafkaConnectUtils.getAlternativesEarliest(), - latestAlternative, - KafkaConnectUtils.getAlternativesNone()) + .requiredSingleValueSelectionFromContainer(KafkaConfigProvider.getTopicLabel(), Arrays.asList( + KafkaConfigProvider.HOST_KEY, + KafkaConfigProvider.PORT_KEY)) + .requiredAlternatives(KafkaConfigProvider.getAutoOffsetResetConfigLabel(), + KafkaConfigProvider.getAlternativesEarliest(), + latestAlternative, + KafkaConfigProvider.getAlternativesNone()) .buildConfiguration(); } @@ -185,17 +185,11 @@ public class KafkaProtocol implements StreamPipesAdapter, SupportsRuntimeConfig protocol.setBrokerHostname(config.getKafkaHost()); protocol.setTopicDefinition(new SimpleTopicDefinition(config.getTopic())); - List<KafkaConfigAppender> kafkaConfigAppenderList = new ArrayList<>(2); - kafkaConfigAppenderList.add(this.config.getSecurityConfig()); - kafkaConfigAppenderList.add(this.config.getAutoOffsetResetConfig()); - this.kafkaConsumer = new SpKafkaConsumer(protocol, config.getTopic(), - new BrokerEventProcessor(extractor.selectedParser(), (event) -> { - collector.collect(event); - }), - kafkaConfigAppenderList - ); + new BrokerEventProcessor(extractor.selectedParser(), collector), + config.getConfigAppenders() + ); thread = new Thread(this.kafkaConsumer); thread.start(); diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedPlainConfig.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaAdapterConfig.java similarity index 74% rename from streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedPlainConfig.java rename to streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaAdapterConfig.java index 235a6eda39..e24dccf040 100644 --- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedPlainConfig.java +++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaAdapterConfig.java @@ -16,14 +16,17 @@ * */ -package org.apache.streampipes.messaging.kafka.security; +package org.apache.streampipes.extensions.connectors.kafka.shared.kafka; -import java.util.Properties; +public class KafkaAdapterConfig extends KafkaBaseConfig { -public class KafkaSecurityUnauthenticatedPlainConfig extends KafkaSecurityConfig { + private String groupId; - @Override - public void appendConfig(Properties props) { + public String getGroupId() { + return groupId; + } + public void setGroupId(String groupId) { + this.groupId = groupId; } } diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfig.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaBaseConfig.java similarity index 50% rename from streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfig.java rename to streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaBaseConfig.java index 7f02e4c5b1..dd43893e89 100644 --- a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfig.java +++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaBaseConfig.java @@ -18,31 +18,20 @@ package org.apache.streampipes.extensions.connectors.kafka.shared.kafka; -import org.apache.streampipes.messaging.kafka.config.AutoOffsetResetConfig; -import org.apache.streampipes.messaging.kafka.security.KafkaSecurityConfig; +import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender; -public class KafkaConfig { +import java.util.ArrayList; +import java.util.List; + +public class KafkaBaseConfig { private String kafkaHost; private Integer kafkaPort; private String topic; - private String groupId; - - KafkaSecurityConfig securityConfig; - AutoOffsetResetConfig autoOffsetResetConfig; + private List<KafkaConfigAppender> configAppenders; - public KafkaConfig(String kafkaHost, - Integer kafkaPort, - String topic, - String groupId, - KafkaSecurityConfig securityConfig, - AutoOffsetResetConfig autoOffsetResetConfig) { - this.kafkaHost = kafkaHost; - this.kafkaPort = kafkaPort; - this.topic = topic; - this.groupId = groupId; - this.securityConfig = securityConfig; - this.autoOffsetResetConfig = autoOffsetResetConfig; + public KafkaBaseConfig() { + this.configAppenders = new ArrayList<>(); } public String getKafkaHost() { @@ -69,27 +58,11 @@ public class KafkaConfig { this.topic = topic; } - public String getGroupId() { - return groupId; - } - - public void setGroupId(String groupId) { - this.groupId = groupId; - } - - public KafkaSecurityConfig getSecurityConfig() { - return securityConfig; - } - - public void setSecurityConfig(KafkaSecurityConfig securityConfig) { - this.securityConfig = securityConfig; - } - - public AutoOffsetResetConfig getAutoOffsetResetConfig() { - return autoOffsetResetConfig; + public List<KafkaConfigAppender> getConfigAppenders() { + return configAppenders; } - public void setAutoOffsetResetConfig(AutoOffsetResetConfig autoOffsetResetConfig) { - this.autoOffsetResetConfig = autoOffsetResetConfig; + public void setConfigAppenders(List<KafkaConfigAppender> configAppenders) { + this.configAppenders = configAppenders; } } diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java new file mode 100644 index 0000000000..d06e544e74 --- /dev/null +++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.extensions.connectors.kafka.shared.kafka; + +import org.apache.streampipes.commons.environment.Environments; +import org.apache.streampipes.extensions.api.extractor.IParameterExtractor; +import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor; +import org.apache.streampipes.messaging.kafka.config.AutoOffsetResetConfig; +import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender; +import org.apache.streampipes.messaging.kafka.security.KafkaSecurityProtocolConfigAppender; +import org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslConfigAppender; +import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives; + +import org.apache.kafka.common.security.auth.SecurityProtocol; + +import java.util.ArrayList; + +import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.ACCESS_MODE; +import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.AUTO_OFFSET_RESET_CONFIG; +import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.CONSUMER_GROUP; +import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.GROUP_ID_INPUT; +import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.HOST_KEY; +import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.PASSWORD_KEY; +import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.PORT_KEY; +import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.RANDOM_GROUP_ID; +import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.SECURITY_MECHANISM; +import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.TOPIC_KEY; +import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.USERNAME_KEY; + +public class KafkaConfigExtractor { + + public KafkaAdapterConfig extractAdapterConfig(IStaticPropertyExtractor extractor, + boolean containsTopic) { + + var config = extractCommonConfigs(extractor, new KafkaAdapterConfig()); + + var topic = ""; + if (containsTopic) { + topic = extractor.selectedSingleValue(TOPIC_KEY, String.class); + } + config.setTopic(topic); + + if (extractor.selectedAlternativeInternalId(CONSUMER_GROUP).equals(RANDOM_GROUP_ID)) { + config.setGroupId("StreamPipesKafkaConsumer" + System.currentTimeMillis()); + } else { + config.setGroupId(extractor.singleValueParameter(GROUP_ID_INPUT, String.class)); + } + + StaticPropertyAlternatives alternatives = extractor.getStaticPropertyByName(AUTO_OFFSET_RESET_CONFIG, + StaticPropertyAlternatives.class); + + // Set default value if no value is provided. + if (alternatives == null) { + config.getConfigAppenders().add(new AutoOffsetResetConfig(KafkaConfigProvider.LATEST)); + } else { + String auto = extractor.selectedAlternativeInternalId(AUTO_OFFSET_RESET_CONFIG); + config.getConfigAppenders().add(new AutoOffsetResetConfig(auto)); + } + return config; + } + + public KafkaBaseConfig extractSinkConfig(IParameterExtractor extractor) { + var config = extractCommonConfigs(extractor, new KafkaBaseConfig()); + config.setTopic(extractor.singleValueParameter(TOPIC_KEY, String.class)); + + return config; + } + + private <T extends KafkaBaseConfig> T extractCommonConfigs(IParameterExtractor extractor, + T config) { + var configAppenders = new ArrayList<KafkaConfigAppender>(); + var env = Environments.getEnvironment(); + config.setKafkaHost(extractor.singleValueParameter(HOST_KEY, String.class)); + config.setKafkaPort(extractor.singleValueParameter(PORT_KEY, Integer.class)); + + var authentication = extractor.selectedAlternativeInternalId(ACCESS_MODE); + var securityProtocol = getSecurityProtocol(authentication); + configAppenders.add(new KafkaSecurityProtocolConfigAppender(securityProtocol, env)); + + // check if SASL authentication is defined + if (isSaslSecurityMechanism(securityProtocol)) { + String username = extractor.singleValueParameter(USERNAME_KEY, String.class); + String password = extractor.secretValue(PASSWORD_KEY); + String mechanism = extractor.selectedSingleValue(SECURITY_MECHANISM, String.class); + + configAppenders.add(new KafkaSecuritySaslConfigAppender(mechanism, username, password)); + } + config.setConfigAppenders(configAppenders); + + return config; + } + + private boolean isSaslSecurityMechanism(SecurityProtocol securityProtocol) { + return SecurityProtocol.SASL_PLAINTEXT == securityProtocol || SecurityProtocol.SASL_SSL == securityProtocol; + } + + public SecurityProtocol getSecurityProtocol(String selectedSecurityConfiguration) { + return switch (selectedSecurityConfiguration) { + case "unauthenticated-ssl" -> SecurityProtocol.SSL; + case "sasl-plain" -> SecurityProtocol.SASL_PLAINTEXT; + case "sasl-ssl" -> SecurityProtocol.SASL_SSL; + default -> SecurityProtocol.PLAINTEXT; + }; + } +} diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigProvider.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigProvider.java new file mode 100644 index 0000000000..05f652f780 --- /dev/null +++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigProvider.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.extensions.connectors.kafka.shared.kafka; + +import org.apache.streampipes.model.staticproperty.Option; +import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative; +import org.apache.streampipes.model.staticproperty.StaticPropertyGroup; +import org.apache.streampipes.sdk.StaticProperties; +import org.apache.streampipes.sdk.helpers.Alternatives; +import org.apache.streampipes.sdk.helpers.Label; +import org.apache.streampipes.sdk.helpers.Labels; + +import org.apache.kafka.clients.consumer.ConsumerConfig; + +import java.util.List; + +public class KafkaConfigProvider { + + public static final String TOPIC_KEY = "topic"; + public static final String HOST_KEY = "host"; + public static final String PORT_KEY = "port"; + + public static final String ACCESS_MODE = "access-mode"; + public static final String UNAUTHENTICATED_PLAIN = "unauthenticated-plain"; + public static final String UNAUTHENTICATED_SSL = "unauthenticated-ssl"; + public static final String SASL_PLAIN = "sasl-plain"; + public static final String SASL_SSL = "sasl-ssl"; + + public static final String SECURITY_MECHANISM = "security-mechanism"; + public static final String USERNAME_GROUP = "username-group"; + public static final String USERNAME_KEY = "username"; + public static final String PASSWORD_KEY = "password"; + + public static final String CONSUMER_GROUP = "consumer-group"; + public static final String RANDOM_GROUP_ID = "random-group-id"; + public static final String GROUP_ID = "group-id"; + public static final String GROUP_ID_INPUT = "group-id-input"; + + + private static final String HIDE_INTERNAL_TOPICS = "hide-internal-topics"; + + public static final String AUTO_OFFSET_RESET_CONFIG = ConsumerConfig.AUTO_OFFSET_RESET_CONFIG; + public static final String EARLIEST = "earliest"; + public static final String LATEST = "latest"; + public static final String NONE = "none"; + + public static Label getTopicLabel() { + return Labels.withId(TOPIC_KEY); + } + + public static Label getHideInternalTopicsLabel() { + return Labels.withId(HIDE_INTERNAL_TOPICS); + } + + public static String getHideInternalTopicsKey() { + return HIDE_INTERNAL_TOPICS; + } + + public static Label getHostLabel() { + return Labels.withId(HOST_KEY); + } + + public static Label getPortLabel() { + return Labels.withId(PORT_KEY); + } + + public static Label getAccessModeLabel() { + return Labels.withId(ACCESS_MODE); + } + + public static Label getConsumerGroupLabel() { + return Labels.withId(CONSUMER_GROUP); + } + + public static Label getAutoOffsetResetConfigLabel() { + return Labels.withId(AUTO_OFFSET_RESET_CONFIG); + } + + public static StaticPropertyAlternative getAlternativeUnauthenticatedPlain() { + return Alternatives.from(Labels.withId(KafkaConfigProvider.UNAUTHENTICATED_PLAIN)); + } + + public static StaticPropertyAlternative getAlternativeUnauthenticatedSSL() { + return Alternatives.from(Labels.withId(KafkaConfigProvider.UNAUTHENTICATED_SSL)); + } + + public static StaticPropertyAlternative getAlternativesSaslPlain() { + return Alternatives.from(Labels.withId(KafkaConfigProvider.SASL_PLAIN), + makeAuthenticationGroup() + ); + } + + public static StaticPropertyAlternative getAlternativesSaslSSL() { + return Alternatives.from(Labels.withId(KafkaConfigProvider.SASL_SSL), + makeAuthenticationGroup()); + } + + public static StaticPropertyAlternative getAlternativesRandomGroupId() { + return Alternatives.from(Labels.withId(RANDOM_GROUP_ID)); + } + + public static StaticPropertyAlternative getAlternativesGroupId() { + return Alternatives.from(Labels.withId(KafkaConfigProvider.GROUP_ID), + StaticProperties.stringFreeTextProperty(Labels.withId(KafkaConfigProvider.GROUP_ID_INPUT))); + } + + public static StaticPropertyAlternative getAlternativesLatest() { + return Alternatives.from(Labels.withId(LATEST)); + } + + public static StaticPropertyAlternative getAlternativesEarliest() { + return Alternatives.from(Labels.withId(EARLIEST)); + } + + public static StaticPropertyAlternative getAlternativesNone() { + return Alternatives.from(Labels.withId(NONE)); + } + + private static StaticPropertyGroup makeAuthenticationGroup() { + var group = StaticProperties.group(Labels.withId(KafkaConfigProvider.USERNAME_GROUP), + StaticProperties.singleValueSelection( + Labels.withId(KafkaConfigProvider.SECURITY_MECHANISM), + makeSecurityMechanism()), + StaticProperties.stringFreeTextProperty(Labels.withId(KafkaConfigProvider.USERNAME_KEY)), + StaticProperties.secretValue(Labels.withId(KafkaConfigProvider.PASSWORD_KEY))); + group.setHorizontalRendering(false); + return group; + } + + public static List<Option> makeSecurityMechanism() { + return List.of( + new Option("PLAIN"), + new Option("SCRAM-SHA-256"), + new Option("SCRAM-SHA-512") + ); + } +} diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConnectUtils.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConnectUtils.java deleted file mode 100644 index aa473dcbf7..0000000000 --- a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConnectUtils.java +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.extensions.connectors.kafka.shared.kafka; - -import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor; -import org.apache.streampipes.messaging.kafka.config.AutoOffsetResetConfig; -import org.apache.streampipes.messaging.kafka.security.KafkaSecurityConfig; -import org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslPlainConfig; -import org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslSSLConfig; -import org.apache.streampipes.messaging.kafka.security.KafkaSecurityUnauthenticatedPlainConfig; -import org.apache.streampipes.messaging.kafka.security.KafkaSecurityUnauthenticatedSSLConfig; -import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative; -import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives; -import org.apache.streampipes.sdk.StaticProperties; -import org.apache.streampipes.sdk.helpers.Alternatives; -import org.apache.streampipes.sdk.helpers.Label; -import org.apache.streampipes.sdk.helpers.Labels; - -import org.apache.kafka.clients.consumer.ConsumerConfig; - -public class KafkaConnectUtils { - - public static final String TOPIC_KEY = "topic"; - public static final String HOST_KEY = "host"; - public static final String PORT_KEY = "port"; - - public static final String KEY_SERIALIZATION = "key-serialization"; - public static final String VALUE_SERIALIZATION = "value-serialization"; - - public static final String KEY_DESERIALIZATION = "key-deserialization"; - public static final String VALUE_DESERIALIZATION = "value-deserialization"; - - public static final String ACCESS_MODE = "access-mode"; - public static final String UNAUTHENTICATED_PLAIN = "unauthenticated-plain"; - public static final String UNAUTHENTICATED_SSL = "unauthenticated-ssl"; - public static final String SASL_PLAIN = "sasl-plain"; - public static final String SASL_SSL = "sasl-ssl"; - - public static final String USERNAME_GROUP = "username-group"; - public static final String USERNAME_KEY = "username"; - public static final String PASSWORD_KEY = "password"; - - public static final String CONSUMER_GROUP = "consumer-group"; - public static final String RANDOM_GROUP_ID = "random-group-id"; - public static final String GROUP_ID = "group-id"; - public static final String GROUP_ID_INPUT = "group-id-input"; - - - private static final String HIDE_INTERNAL_TOPICS = "hide-internal-topics"; - - public static final String AUTO_OFFSET_RESET_CONFIG = ConsumerConfig.AUTO_OFFSET_RESET_CONFIG; - public static final String EARLIEST = "earliest"; - public static final String LATEST = "latest"; - public static final String NONE = "none"; - - public static Label getTopicLabel() { - return Labels.withId(TOPIC_KEY); - } - - public static Label getHideInternalTopicsLabel() { - return Labels.withId(HIDE_INTERNAL_TOPICS); - } - - public static String getHideInternalTopicsKey() { - return HIDE_INTERNAL_TOPICS; - } - - public static Label getHostLabel() { - return Labels.withId(HOST_KEY); - } - - public static Label getPortLabel() { - return Labels.withId(PORT_KEY); - } - - public static Label getAccessModeLabel() { - return Labels.withId(ACCESS_MODE); - } - - public static Label getConsumerGroupLabel() { - return Labels.withId(CONSUMER_GROUP); - } - - public static Label getAutoOffsetResetConfigLabel() { - return Labels.withId(AUTO_OFFSET_RESET_CONFIG); - } - - - public static KafkaConfig getConfig(IStaticPropertyExtractor extractor, boolean containsTopic) { - String brokerUrl = extractor.singleValueParameter(HOST_KEY, String.class); - String topic = ""; - if (containsTopic) { - topic = extractor.selectedSingleValue(TOPIC_KEY, String.class); - } - - Integer port = extractor.singleValueParameter(PORT_KEY, Integer.class); - - String authentication = extractor.selectedAlternativeInternalId(ACCESS_MODE); - boolean isUseSSL = isUseSSL(authentication); - - KafkaSecurityConfig securityConfig; - - //KafkaSerializerConfig serializerConfig = new KafkaSerializerByteArrayConfig() - - // check if a user for the authentication is defined - if (authentication.equals(KafkaConnectUtils.SASL_SSL) || authentication.equals(KafkaConnectUtils.SASL_PLAIN)) { - String username = extractor.singleValueParameter(USERNAME_KEY, String.class); - String password = extractor.secretValue(PASSWORD_KEY); - - securityConfig = isUseSSL - ? new KafkaSecuritySaslSSLConfig(username, password) : - new KafkaSecuritySaslPlainConfig(username, password); - } else { - // set security config for none authenticated access - securityConfig = isUseSSL - ? new KafkaSecurityUnauthenticatedSSLConfig() : - new KafkaSecurityUnauthenticatedPlainConfig(); - } - - String groupId; - if (extractor.selectedAlternativeInternalId(CONSUMER_GROUP).equals(RANDOM_GROUP_ID)){ - groupId = "KafkaExampleConsumer" + System.currentTimeMillis(); - } else { - groupId = extractor.singleValueParameter(GROUP_ID_INPUT, String.class); - } - - StaticPropertyAlternatives alternatives = extractor.getStaticPropertyByName(AUTO_OFFSET_RESET_CONFIG, - StaticPropertyAlternatives.class); - - // Set default value if no value is provided. - if (alternatives == null) { - AutoOffsetResetConfig autoOffsetResetConfig = new AutoOffsetResetConfig(KafkaConnectUtils.LATEST); - - return new KafkaConfig(brokerUrl, port, topic, groupId, securityConfig, autoOffsetResetConfig); - } else { - String auto = extractor.selectedAlternativeInternalId(AUTO_OFFSET_RESET_CONFIG); - AutoOffsetResetConfig autoOffsetResetConfig = new AutoOffsetResetConfig(auto); - - return new KafkaConfig(brokerUrl, port, topic, groupId, securityConfig, autoOffsetResetConfig); - } - } - - private static boolean isUseSSL(String authentication) { - if (authentication.equals(KafkaConnectUtils.UNAUTHENTICATED_PLAIN) - || authentication.equals(KafkaConnectUtils.SASL_PLAIN)) { - return false; - } else { - return true; - } - } - - - public static StaticPropertyAlternative getAlternativeUnauthenticatedPlain() { - return Alternatives.from(Labels.withId(KafkaConnectUtils.UNAUTHENTICATED_PLAIN)); - } - - public static StaticPropertyAlternative getAlternativeUnauthenticatedSSL() { - return Alternatives.from(Labels.withId(KafkaConnectUtils.UNAUTHENTICATED_SSL)); - } - - public static StaticPropertyAlternative getAlternativesSaslPlain() { - return Alternatives.from(Labels.withId(KafkaConnectUtils.SASL_PLAIN), - StaticProperties.group(Labels.withId(KafkaConnectUtils.USERNAME_GROUP), - StaticProperties.stringFreeTextProperty(Labels.withId(KafkaConnectUtils.USERNAME_KEY)), - StaticProperties.secretValue(Labels.withId(KafkaConnectUtils.PASSWORD_KEY)))); - } - - public static StaticPropertyAlternative getAlternativesSaslSSL() { - return Alternatives.from(Labels.withId(KafkaConnectUtils.SASL_SSL), - StaticProperties.group(Labels.withId(KafkaConnectUtils.USERNAME_GROUP), - StaticProperties.stringFreeTextProperty(Labels.withId(KafkaConnectUtils.USERNAME_KEY)), - StaticProperties.secretValue(Labels.withId(KafkaConnectUtils.PASSWORD_KEY)))); - } - - public static StaticPropertyAlternative getAlternativesRandomGroupId(){ - return Alternatives.from(Labels.withId(RANDOM_GROUP_ID)); - } - - public static StaticPropertyAlternative getAlternativesGroupId(){ - return Alternatives.from(Labels.withId(KafkaConnectUtils.GROUP_ID), - StaticProperties.stringFreeTextProperty(Labels.withId(KafkaConnectUtils.GROUP_ID_INPUT))); - } - - public static StaticPropertyAlternative getAlternativesLatest() { - return Alternatives.from(Labels.withId(LATEST)); - } - - public static StaticPropertyAlternative getAlternativesEarliest() { - return Alternatives.from(Labels.withId(EARLIEST)); - } - - public static StaticPropertyAlternative getAlternativesNone() { - return Alternatives.from(Labels.withId(NONE)); - } -} diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaParameters.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaParameters.java deleted file mode 100644 index b1e439547f..0000000000 --- a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaParameters.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.extensions.connectors.kafka.sink; - -import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters; -import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConnectUtils; - -public class KafkaParameters { - - private final String kafkaHost; - - private final Integer kafkaPort; - - private final String topic; - - private final String authentication; - - private String username; - - private String password; - - private final boolean useSSL; - - public KafkaParameters(IDataSinkParameters params) { - var extractor = params.extractor(); - this.topic = extractor.singleValueParameter(KafkaConnectUtils.TOPIC_KEY, String.class); - this.kafkaHost = extractor.singleValueParameter(KafkaConnectUtils.HOST_KEY, String.class); - this.kafkaPort = extractor.singleValueParameter(KafkaConnectUtils.PORT_KEY, Integer.class); - this.authentication = extractor.selectedAlternativeInternalId(KafkaConnectUtils.ACCESS_MODE); - - if (!useAuthentication()) { - this.useSSL = KafkaConnectUtils.UNAUTHENTICATED_SSL.equals(this.authentication); - } else { - String username = extractor.singleValueParameter(KafkaConnectUtils.USERNAME_KEY, String.class); - String password = extractor.secretValue(KafkaConnectUtils.PASSWORD_KEY); - this.username = username; - this.password = password; - this.useSSL = KafkaConnectUtils.SASL_SSL.equals(this.authentication); - } - } - - public String getKafkaHost() { - return kafkaHost; - } - - public Integer getKafkaPort() { - return kafkaPort; - } - - public String getTopic() { - return topic; - } - - public String getUsername() { - return username; - } - - public String getPassword() { - return password; - } - - public String getAuthentication() { - return authentication; - } - - public boolean isUseSSL() { - return useSSL; - } - - public boolean useAuthentication() { - return KafkaConnectUtils.SASL_PLAIN.equals(this.authentication) - || KafkaConnectUtils.SASL_SSL.equals(this.authentication); - } -} diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaPublishSink.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaPublishSink.java index 238bbb8048..2a8db93fdc 100644 --- a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaPublishSink.java +++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaPublishSink.java @@ -24,13 +24,10 @@ import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink; import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration; import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext; import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters; -import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConnectUtils; +import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaBaseConfig; +import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigExtractor; +import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider; import org.apache.streampipes.messaging.kafka.SpKafkaProducer; -import org.apache.streampipes.messaging.kafka.security.KafkaSecurityConfig; -import org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslPlainConfig; -import org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslSSLConfig; -import org.apache.streampipes.messaging.kafka.security.KafkaSecurityUnauthenticatedPlainConfig; -import org.apache.streampipes.messaging.kafka.security.KafkaSecurityUnauthenticatedSSLConfig; import org.apache.streampipes.model.DataSinkType; import org.apache.streampipes.model.extensions.ExtensionAssetType; import org.apache.streampipes.model.runtime.Event; @@ -41,7 +38,6 @@ import org.apache.streampipes.sdk.helpers.EpRequirements; import org.apache.streampipes.sdk.helpers.Labels; import org.apache.streampipes.sdk.helpers.Locales; -import java.util.List; import java.util.Map; public class KafkaPublishSink implements IStreamPipesDataSink { @@ -50,7 +46,7 @@ public class KafkaPublishSink implements IStreamPipesDataSink { private JsonDataFormatDefinition dataFormatDefinition; - private KafkaParameters params; + private KafkaBaseConfig kafkaConfig; public KafkaPublishSink() { } @@ -68,15 +64,15 @@ public class KafkaPublishSink implements IStreamPipesDataSink { .requiredProperty(EpRequirements.anyProperty()) .build()) - .requiredTextParameter(Labels.withId(KafkaConnectUtils.TOPIC_KEY), false, false) - .requiredTextParameter(Labels.withId(KafkaConnectUtils.HOST_KEY), false, false) - .requiredIntegerParameter(Labels.withId(KafkaConnectUtils.PORT_KEY), 9092) + .requiredTextParameter(Labels.withId(KafkaConfigProvider.TOPIC_KEY), false, false) + .requiredTextParameter(Labels.withId(KafkaConfigProvider.HOST_KEY), false, false) + .requiredIntegerParameter(Labels.withId(KafkaConfigProvider.PORT_KEY), 9092) - .requiredAlternatives(Labels.withId(KafkaConnectUtils.ACCESS_MODE), - KafkaConnectUtils.getAlternativeUnauthenticatedPlain(), - KafkaConnectUtils.getAlternativeUnauthenticatedSSL(), - KafkaConnectUtils.getAlternativesSaslPlain(), - KafkaConnectUtils.getAlternativesSaslSSL()) + .requiredAlternatives(Labels.withId(KafkaConfigProvider.ACCESS_MODE), + KafkaConfigProvider.getAlternativeUnauthenticatedPlain(), + KafkaConfigProvider.getAlternativeUnauthenticatedSSL(), + KafkaConfigProvider.getAlternativesSaslPlain(), + KafkaConfigProvider.getAlternativesSaslSSL()) .build() ); } @@ -84,26 +80,13 @@ public class KafkaPublishSink implements IStreamPipesDataSink { @Override public void onPipelineStarted(IDataSinkParameters parameters, EventSinkRuntimeContext runtimeContext) { - this.params = new KafkaParameters(parameters); + this.kafkaConfig = new KafkaConfigExtractor().extractSinkConfig(parameters.extractor()); this.dataFormatDefinition = new JsonDataFormatDefinition(); - KafkaSecurityConfig securityConfig; - // check if a user for the authentication is defined - if (params.useAuthentication()) { - securityConfig = params.isUseSSL() - ? new KafkaSecuritySaslSSLConfig(params.getUsername(), params.getPassword()) : - new KafkaSecuritySaslPlainConfig(params.getUsername(), params.getPassword()); - } else { - // set security config for none authenticated access - securityConfig = params.isUseSSL() - ? new KafkaSecurityUnauthenticatedSSLConfig() : - new KafkaSecurityUnauthenticatedPlainConfig(); - } - this.producer = new SpKafkaProducer( - params.getKafkaHost() + ":" + params.getKafkaPort(), - params.getTopic(), - List.of(securityConfig)); + kafkaConfig.getKafkaHost() + ":" + kafkaConfig.getKafkaPort(), + kafkaConfig.getTopic(), + kafkaConfig.getConfigAppenders()); } @Override diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en index 81b8b78859..4809237f5b 100644 --- a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en +++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en @@ -34,20 +34,23 @@ username.description= password.title=Password password.description= -access-mode.title=Access Mode -access-mode.description=Unauthenticated or SASL/PLAIN +access-mode.title=Security protocol +access-mode.description=Security protocol used to communicate with brokers. Corresponds to Kafka Client security.protocol property -unauthenticated-plain.title=Unauthenticated Plain +unauthenticated-plain.title=PLAINTEXT unauthenticated-plain.description=No authentication and plaintext -unauthenticated-ssl.title=Unauthenticated SSL +unauthenticated-ssl.title=SSL unauthenticated-ssl.description=Using SSL with no authentication -sasl-plain.title=SASL/PLAIN -sasl-plain.description=Username and password, no encryption +security-mechanism.title=Security Mechanism +security-mechanism.description=SASL mechanism used for authentication. Corresponds to Kafka Client sasl.mechanism property + +sasl-plain.title=SASL/PLAINTEXT +sasl-plain.description=SASL authentication, no encryption sasl-ssl.title=SASL/SSL -sasl-ssl.description=Username and password, with ssl encryption +sasl-ssl.description=SASL authentication, with ssl encryption username-group.title=Username and password @@ -82,4 +85,4 @@ latest.title=Latest latest.description=Offsets are initialized to the Latest none.title=None -none.description=Consumer throws exceptions \ No newline at end of file +none.description=Consumer throws exceptions diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en index a2e8d1b5f1..0d73d6d2f4 100644 --- a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en +++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en @@ -51,4 +51,7 @@ sasl-plain.description=Username and password, no encryption sasl-ssl.title=SASL/SSL sasl-ssl.description=Username and password, with ssl encryption +security-mechanism.title=Security Mechanism +security-mechanism.description=SASL mechanism used for authentication. Corresponds to Kafka Client sasl.mechanism property + username-group.title=Username and password diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/KeyStoreLoader.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/KeyStoreLoader.java index 4b132d19f8..72f01b0c28 100644 --- a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/KeyStoreLoader.java +++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/KeyStoreLoader.java @@ -44,7 +44,7 @@ public class KeyStoreLoader { public KeyStoreLoader load(Environment env, Path securityDir) throws Exception { - var keystore = KeyStore.getInstance(env.getOPcUaKeystoreType().getValueOrDefault()); + var keystore = KeyStore.getInstance(env.getOpcUaKeystoreType().getValueOrDefault()); var keystoreFile = env.getOpcUaKeystoreFile().getValueOrDefault(); var keystorePassword = env.getOpcUaKeystorePassword().getValueOrDefault(); var keystoreAlias = env.getOpcUaKeystoreAlias().getValueOrDefault(); diff --git a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/KafkaAdapterTester.java b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/KafkaAdapterTester.java index b5e242eb50..db830f1b7a 100644 --- a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/KafkaAdapterTester.java +++ b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/KafkaAdapterTester.java @@ -22,7 +22,7 @@ import org.apache.streampipes.commons.exceptions.connect.AdapterException; import org.apache.streampipes.extensions.api.connect.IAdapterConfiguration; import org.apache.streampipes.extensions.api.connect.StreamPipesAdapter; import org.apache.streampipes.extensions.connectors.kafka.adapter.KafkaProtocol; -import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConnectUtils; +import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider; import org.apache.streampipes.integration.containers.KafkaContainer; import org.apache.streampipes.integration.containers.KafkaDevContainer; import org.apache.streampipes.manager.template.AdapterTemplateHandler; @@ -69,9 +69,9 @@ public class KafkaAdapterTester extends AdapterTesterBase { .get(5)) .setOptions(list); List<Map<String, Object>> configs = new ArrayList<>(); - configs.add(Map.of(KafkaConnectUtils.HOST_KEY, kafkaContainer.getBrokerHost())); - configs.add(Map.of(KafkaConnectUtils.PORT_KEY, kafkaContainer.getBrokerPort())); - configs.add(Map.of(KafkaConnectUtils.TOPIC_KEY, TOPIC)); + configs.add(Map.of(KafkaConfigProvider.HOST_KEY, kafkaContainer.getBrokerHost())); + configs.add(Map.of(KafkaConfigProvider.PORT_KEY, kafkaContainer.getBrokerPort())); + configs.add(Map.of(KafkaConfigProvider.TOPIC_KEY, TOPIC)); var template = new PipelineElementTemplate("name", "description", configs); diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/KafkaConfigAppender.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/KafkaConfigAppender.java index 1e53ebd225..77c5cb5afa 100644 --- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/KafkaConfigAppender.java +++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/KafkaConfigAppender.java @@ -18,9 +18,11 @@ package org.apache.streampipes.messaging.kafka.config; +import org.apache.streampipes.commons.exceptions.SpRuntimeException; + import java.util.Properties; public interface KafkaConfigAppender { - void appendConfig(Properties props); + void appendConfig(Properties props) throws SpRuntimeException; } diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityConfig.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityConfig.java deleted file mode 100644 index 646033f437..0000000000 --- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityConfig.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.messaging.kafka.security; - -import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender; - -public abstract class KafkaSecurityConfig implements KafkaConfigAppender { - -} diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityProtocolConfigAppender.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityProtocolConfigAppender.java new file mode 100644 index 0000000000..1a1a9bd92a --- /dev/null +++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityProtocolConfigAppender.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.messaging.kafka.security; + +import org.apache.streampipes.commons.environment.Environment; +import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.security.auth.SecurityProtocol; + +import java.util.Properties; + +public class KafkaSecurityProtocolConfigAppender implements KafkaConfigAppender { + + private final SecurityProtocol securityProtocol; + private final Environment env; + + public KafkaSecurityProtocolConfigAppender(SecurityProtocol securityProtocol, + Environment env) { + this.securityProtocol = securityProtocol; + this.env = env; + } + + @Override + public void appendConfig(Properties props) { + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.toString()); + + if (isSslProtocol()) { + props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, env.getKeystoreType().getValueOrDefault()); + props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, env.getKeystoreFilename().getValueOrDefault()); + props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, env.getKeystorePassword().getValueOrDefault()); + + if (env.getKeyPassword().exists()) { + props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, env.getKeyPassword().getValueOrDefault()); + } + + props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, env.getTruststoreType().getValueOrDefault()); + props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, env.getTruststoreFilename().getValueOrDefault()); + + if (env.getTruststorePassword().exists()) { + props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, env.getTruststorePassword().getValueOrDefault()); + } + + if (env.getAllowSelfSignedCertificates().getValueOrDefault()) { + props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); + } + } + } + + private boolean isSslProtocol() { + return securityProtocol == SecurityProtocol.SSL || securityProtocol == SecurityProtocol.SASL_SSL; + } +} diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslSSLConfig.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslConfigAppender.java similarity index 50% rename from streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslSSLConfig.java rename to streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslConfigAppender.java index 2c5ef691e3..68698f1217 100644 --- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslSSLConfig.java +++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslConfigAppender.java @@ -18,34 +18,56 @@ package org.apache.streampipes.messaging.kafka.security; -import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.streampipes.commons.exceptions.SpRuntimeException; +import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender; + import org.apache.kafka.common.config.SaslConfigs; -import org.apache.kafka.common.security.auth.SecurityProtocol; import java.util.Properties; -public class KafkaSecuritySaslSSLConfig extends KafkaSecurityConfig { +public class KafkaSecuritySaslConfigAppender implements KafkaConfigAppender { + private final String securityMechanism; private final String username; private final String password; - public KafkaSecuritySaslSSLConfig(String username, String password) { + public KafkaSecuritySaslConfigAppender(String securityMechanism, + String username, + String password) { + this.securityMechanism = securityMechanism; this.username = username; this.password = password; } @Override - public void appendConfig(Properties props) { + public void appendConfig(Properties props) throws SpRuntimeException { + props.put(SaslConfigs.SASL_MECHANISM, securityMechanism); + String saslJaasConfig = makeJaasConfig(); + + props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig); + } - props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); - props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.toString()); + private String makeJaasConfig() { + if (securityMechanism.equals("PLAIN")) { + return makeSaslPlainConfig(); + } else { + return makeSaslScramConfig(); + } + } - String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + private String makeSaslPlainConfig() { + return "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";"; + } - props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig); + private String makeSaslScramConfig() { + return "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" + + username + + "\" password=\"" + + password + + "\";"; } } diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslPlainConfig.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslPlainConfig.java deleted file mode 100644 index a7e5fa94fc..0000000000 --- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslPlainConfig.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.messaging.kafka.security; - -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.common.config.SaslConfigs; -import org.apache.kafka.common.security.auth.SecurityProtocol; - -import java.util.Properties; - -public class KafkaSecuritySaslPlainConfig extends KafkaSecurityConfig { - - private final String username; - private final String password; - - public KafkaSecuritySaslPlainConfig(String username, String password) { - this.username = username; - this.password = password; - } - - @Override - public void appendConfig(Properties props) { - - props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); - props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.toString()); - - String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" - + username - + "\" password=\"" - + password - + "\";"; - - props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig); - } -} diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedSSLConfig.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedSSLConfig.java deleted file mode 100644 index 8fdd02fd12..0000000000 --- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedSSLConfig.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.messaging.kafka.security; - -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.common.security.auth.SecurityProtocol; - -import java.util.Properties; - -public class KafkaSecurityUnauthenticatedSSLConfig extends KafkaSecurityConfig { - - @Override - public void appendConfig(Properties props) { - props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.toString()); - } -} diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java index 32cfb73044..ce66dbeed8 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java @@ -30,6 +30,7 @@ import org.apache.streampipes.model.monitoring.SpLogMessage; import org.apache.streampipes.model.runtime.RuntimeOptionsRequest; import org.apache.streampipes.model.runtime.RuntimeOptionsResponse; import org.apache.streampipes.resource.management.SpResourceManager; +import org.apache.streampipes.resource.management.secret.SecretProvider; import org.apache.streampipes.storage.management.StorageDispatcher; import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; @@ -75,6 +76,7 @@ public class RuntimeResolvableResource extends AbstractAdapterResource<WorkerAdm SpServiceUrlProvider.ADAPTER, runtimeOptionsRequest.getDeploymentConfiguration().getDesiredServiceTags() ); + SecretProvider.getDecryptionService().applyConfig(runtimeOptionsRequest.getStaticProperties()); RuntimeOptionsResponse result = WorkerRestClient.getConfiguration(baseUrl, appId, runtimeOptionsRequest); return ok(result); diff --git a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-input/base-runtime-resolvable-input.ts b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-input/base-runtime-resolvable-input.ts index 564f2c2d06..5c3c14a0c9 100644 --- a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-input/base-runtime-resolvable-input.ts +++ b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-input/base-runtime-resolvable-input.ts @@ -122,6 +122,7 @@ export abstract class BaseRuntimeResolvableInput< ngOnChanges(changes: SimpleChanges): void { if (changes['completedConfigurations']) { + console.log(changes['completedConfigurations']); if ( this.staticPropertyUtils.allDependenciesSatisfied( this.staticProperty.dependsOn, diff --git a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts index eb6ffcc10c..57588eb155 100644 --- a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts +++ b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts @@ -18,6 +18,8 @@ import { Component, OnChanges, OnInit } from '@angular/core'; import { + Option, + RuntimeResolvableAnyStaticProperty, RuntimeResolvableOneOfStaticProperty, StaticPropertyUnion, } from '@streampipes/platform-services'; @@ -48,13 +50,39 @@ export class StaticRuntimeResolvableOneOfInputComponent } afterOptionsLoaded(staticProperty: RuntimeResolvableOneOfStaticProperty) { - this.staticProperty.options = staticProperty.options; if ( - this.staticProperty.options && - this.staticProperty.options.length > 0 + this.staticProperty.options?.length > 0 && + this.isOptionSelected() ) { - this.staticProperty.options[0].selected = true; + const selectedOption = this.staticProperty.options.find( + o => o.selected, + ); + this.addSelectedOption(staticProperty, selectedOption); + } else { + if (staticProperty.options?.length > 0) { + staticProperty.options[0].selected = true; + } } + this.staticProperty.options = staticProperty.options; + } + + isOptionSelected(): boolean { + return this.staticProperty.options.find(o => o.selected) !== undefined; + } + + addSelectedOption( + staticProperty: RuntimeResolvableOneOfStaticProperty, + selectedOption: Option, + ): void { + staticProperty.options + .filter(o => { + return o.internalName !== null + ? o.internalName === selectedOption.internalName + : o.name === selectedOption.name; + }) + .forEach(o => { + o.selected = true; + }); } select(id) { @@ -65,7 +93,6 @@ export class StaticRuntimeResolvableOneOfInputComponent option => option.elementId === id, ).selected = true; this.performValidation(); - this.applyCompletedConfiguration(true); } parse(
