This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch 3362-saslssl-support-for-kafka-adapter
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 7bb56d583ac01d80d02fe3f0ec5583577d0be8b4
Author: Dominik Riemer <[email protected]>
AuthorDate: Sun Dec 1 21:58:58 2024 +0100

    feat(#3362): Add SSL/SASL support to Kafka adapter and sink
---
 pom.xml                                            |   2 +-
 .../apache/streampipes/commons/constants/Envs.java |  16 +-
 .../constants/GlobalStreamPipesConstants.java      |   1 +
 .../commons/environment/DefaultEnvironment.java    |  42 +++-
 .../commons/environment/Environment.java           |  39 +---
 .../connectors/kafka/adapter/KafkaProtocol.java    |  84 ++++----
 .../kafka/shared/kafka/KafkaAdapterConfig.java     |  13 +-
 .../{KafkaConfig.java => KafkaBaseConfig.java}     |  51 ++---
 .../kafka/shared/kafka/KafkaConfigExtractor.java   | 121 ++++++++++++
 .../kafka/shared/kafka/KafkaConfigProvider.java    | 153 +++++++++++++++
 .../kafka/shared/kafka/KafkaConnectUtils.java      | 211 ---------------------
 .../connectors/kafka/sink/KafkaParameters.java     |  90 ---------
 .../connectors/kafka/sink/KafkaPublishSink.java    |  49 ++---
 .../strings.en                                     |  19 +-
 .../strings.en                                     |   3 +
 .../opcua/config/security/KeyStoreLoader.java      |   2 +-
 .../integration/adapters/KafkaAdapterTester.java   |   8 +-
 .../kafka/config/KafkaConfigAppender.java          |   4 +-
 .../kafka/security/KafkaSecurityConfig.java        |  25 ---
 .../KafkaSecurityProtocolConfigAppender.java       |  70 +++++++
 ...g.java => KafkaSecuritySaslConfigAppender.java} |  40 +++-
 .../security/KafkaSecuritySaslPlainConfig.java     |  51 -----
 .../KafkaSecurityUnauthenticatedSSLConfig.java     |  32 ----
 .../impl/connect/RuntimeResolvableResource.java    |   2 +
 .../base-runtime-resolvable-input.ts               |   1 +
 ...tic-runtime-resolvable-oneof-input.component.ts |  37 +++-
 26 files changed, 575 insertions(+), 591 deletions(-)

diff --git a/pom.xml b/pom.xml
index 8f0d8ffded..455675231a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -89,7 +89,7 @@
         <jsrosbridge.version>0.2.0</jsrosbridge.version>
         <jjwt.version>0.11.2</jjwt.version>
         <jts-core.version>1.19.0</jts-core.version>
-        <kafka.version>3.4.0</kafka.version>
+        <kafka.version>3.7.1</kafka.version>
         <lightcouch.version>0.2.0</lightcouch.version>
         
<maven-plugin-annotations.version>3.13.0</maven-plugin-annotations.version>
         <mailapi.version>1.4.3</mailapi.version>
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
index 7a2d033e5a..2339475adb 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
@@ -119,7 +119,21 @@ public enum Envs {
   SP_OPCUA_APPLICATION_URI(
       "SP_OPCUA_APPLICATION_URI",
       "urn:org:apache:streampipes:opcua:client"
-  );
+  ),
+
+  // Default keystore and truststore
+  SP_SECURITY_KEYSTORE_FILENAME(
+      "SP_SECURITY_KEYSTORE_FILENAME",
+      "/streampipes-security/keystore.pfx"),
+  SP_SECURITY_KEYSTORE_PASSWORD("SP_SECURITY_KEYSTORE_PASSWORD", ""),
+  SP_SECURITY_KEYSTORE_TYPE("SP_SECURITY_KEYSTORE_TYPE", "PKCS12"),
+  SP_SECURITY_KEY_PASSWORD("SP_SECURITY_KEY_PASSWORD", null),
+  SP_SECURITY_TRUSTSTORE_FILENAME(
+      "SP_SECURITY_TRUSTSTORE_FILENAME",
+      "/streampipes-security/truststore.pfx"),
+  SP_SECURITY_TRUSTSTORE_PASSWORD("SP_SECURITY_TRUSTSTORE_PASSWORD", ""),
+  SP_SECURITY_TRUSTSTORE_TYPE("SP_SECURITY_TRUSTSTORE_TYPE", "PKCS12"),
+  SP_SECURITY_ALLOW_SELFSIGNED("SP_SECURITY_ALLOW_SELFSIGNED", "false");
 
   private final String envVariableName;
   private String defaultValue;
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GlobalStreamPipesConstants.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GlobalStreamPipesConstants.java
index b452d74c1a..409a9a9090 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GlobalStreamPipesConstants.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GlobalStreamPipesConstants.java
@@ -23,5 +23,6 @@ public class GlobalStreamPipesConstants {
   public static final String STD_DOCUMENTATION_NAME = "documentation.md";
 
   public static final String INTERNAL_TOPIC_PREFIX = 
"org-apache-streampipes-internal-";
+  public static final String CONNECT_TOPIC_PREFIX = 
"org.apache.streampipes.connect.";
 
 }
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
index bb5bbac193..3434924e3e 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
@@ -349,7 +349,7 @@ public class DefaultEnvironment implements Environment {
   }
 
   @Override
-  public StringEnvironmentVariable getOPcUaKeystoreType() {
+  public StringEnvironmentVariable getOpcUaKeystoreType() {
     return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_TYPE);
   }
 
@@ -357,4 +357,44 @@ public class DefaultEnvironment implements Environment {
   public StringEnvironmentVariable getOpcUaKeystoreAlias() {
     return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_ALIAS);
   }
+
+  @Override
+  public StringEnvironmentVariable getKeystoreFilename() {
+    return new StringEnvironmentVariable(Envs.SP_SECURITY_KEYSTORE_FILENAME);
+  }
+
+  @Override
+  public StringEnvironmentVariable getKeystorePassword() {
+    return new StringEnvironmentVariable(Envs.SP_SECURITY_KEYSTORE_PASSWORD);
+  }
+
+  @Override
+  public StringEnvironmentVariable getKeystoreType() {
+    return new StringEnvironmentVariable(Envs.SP_SECURITY_KEYSTORE_TYPE);
+  }
+
+  @Override
+  public StringEnvironmentVariable getKeyPassword() {
+    return new StringEnvironmentVariable(Envs.SP_SECURITY_KEY_PASSWORD);
+  }
+
+  @Override
+  public StringEnvironmentVariable getTruststoreFilename() {
+    return new StringEnvironmentVariable(Envs.SP_SECURITY_TRUSTSTORE_FILENAME);
+  }
+
+  @Override
+  public StringEnvironmentVariable getTruststorePassword() {
+    return new StringEnvironmentVariable(Envs.SP_SECURITY_TRUSTSTORE_PASSWORD);
+  }
+
+  @Override
+  public StringEnvironmentVariable getTruststoreType() {
+    return new StringEnvironmentVariable(Envs.SP_SECURITY_TRUSTSTORE_TYPE);
+  }
+
+  @Override
+  public BooleanEnvironmentVariable getAllowSelfSignedCertificates() {
+    return new BooleanEnvironmentVariable(Envs.SP_SECURITY_ALLOW_SELFSIGNED);
+  }
 }
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
index cb441f009b..d1c4adf6ae 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
@@ -32,79 +32,56 @@ public interface Environment {
   // Service base configuration
 
   StringEnvironmentVariable getServiceHost();
-
   IntEnvironmentVariable getServicePort();
 
   StringEnvironmentVariable getSpCoreScheme();
 
   StringEnvironmentVariable getSpCoreHost();
-
   IntEnvironmentVariable getSpCorePort();
 
   // Time series storage env variables
 
   StringEnvironmentVariable getTsStorage();
-
   StringEnvironmentVariable getTsStorageProtocol();
-
   StringEnvironmentVariable getTsStorageHost();
-
   IntEnvironmentVariable getTsStoragePort();
-
   StringEnvironmentVariable getTsStorageToken();
-
   StringEnvironmentVariable getTsStorageOrg();
-
   StringEnvironmentVariable getTsStorageBucket();
 
   IntEnvironmentVariable getIotDbSessionPoolSize();
-
   BooleanEnvironmentVariable getIotDbSessionEnableCompression();
-
   StringEnvironmentVariable getIotDbUser();
-
   StringEnvironmentVariable getIotDbPassword();
 
   // CouchDB env variables
 
   StringEnvironmentVariable getCouchDbProtocol();
-
   StringEnvironmentVariable getCouchDbHost();
-
   IntEnvironmentVariable getCouchDbPort();
-
   StringEnvironmentVariable getCouchDbUsername();
-
   StringEnvironmentVariable getCouchDbPassword();
 
 
   // JWT & Authentication
 
   StringEnvironmentVariable getClientUser();
-
   StringEnvironmentVariable getClientSecret();
 
   StringEnvironmentVariable getJwtSecret();
-
   StringEnvironmentVariable getJwtPublicKeyLoc();
-
   StringEnvironmentVariable getJwtPrivateKeyLoc();
-
   StringEnvironmentVariable getJwtSigningMode();
 
   StringEnvironmentVariable getExtensionsAuthMode();
-
   StringEnvironmentVariable getEncryptionPasscode();
 
   BooleanEnvironmentVariable getOAuthEnabled();
-
   StringEnvironmentVariable getOAuthRedirectUri();
-
   List<OAuthConfiguration> getOAuthConfigurations();
 
   // Messaging
   StringEnvironmentVariable getKafkaRetentionTimeMs();
-
   StringEnvironmentVariable getPrioritizedProtocol();
 
 
@@ -164,14 +141,18 @@ public interface Environment {
   StringEnvironmentVariable getAllowedUploadFiletypes();
 
   StringEnvironmentVariable getOpcUaSecurityDir();
-
   StringEnvironmentVariable getOpcUaKeystoreFile();
-
   StringEnvironmentVariable getOpcUaKeystorePassword();
-
   StringEnvironmentVariable getOpcUaApplicationUri();
-
-  StringEnvironmentVariable getOPcUaKeystoreType();
-
+  StringEnvironmentVariable getOpcUaKeystoreType();
   StringEnvironmentVariable getOpcUaKeystoreAlias();
+
+  StringEnvironmentVariable getKeystoreFilename();
+  StringEnvironmentVariable getKeystorePassword();
+  StringEnvironmentVariable getKeystoreType();
+  StringEnvironmentVariable getKeyPassword();
+  StringEnvironmentVariable getTruststoreFilename();
+  StringEnvironmentVariable getTruststorePassword();
+  StringEnvironmentVariable getTruststoreType();
+  BooleanEnvironmentVariable getAllowSelfSignedCertificates();
 }
diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java
index fb387b59ee..90dd1ee2a1 100644
--- a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java
+++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java
@@ -31,12 +31,12 @@ import 
org.apache.streampipes.extensions.api.connect.context.IAdapterRuntimeCont
 import 
org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor;
 import 
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
 import org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig;
-import 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfig;
-import 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConnectUtils;
+import 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaAdapterConfig;
+import 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigExtractor;
+import 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider;
 import 
org.apache.streampipes.extensions.management.connect.adapter.BrokerEventProcessor;
 import 
org.apache.streampipes.extensions.management.connect.adapter.parser.Parsers;
 import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
-import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
 import org.apache.streampipes.model.AdapterType;
 import org.apache.streampipes.model.connect.guess.GuessSchema;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
@@ -67,13 +67,12 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 public class KafkaProtocol implements StreamPipesAdapter, 
SupportsRuntimeConfig {
 
   private static final Logger logger = 
LoggerFactory.getLogger(KafkaProtocol.class);
-  KafkaConfig config;
+  KafkaAdapterConfig config;
 
   public static final String ID = 
"org.apache.streampipes.connect.iiot.protocol.stream.kafka";
 
@@ -84,14 +83,13 @@ public class KafkaProtocol implements StreamPipesAdapter, 
SupportsRuntimeConfig
   }
 
   private void applyConfiguration(IStaticPropertyExtractor extractor) {
-    this.config = KafkaConnectUtils.getConfig(extractor, true);
+    this.config = new KafkaConfigExtractor().extractAdapterConfig(extractor, 
true);
   }
 
-  private Consumer<byte[], byte[]> createConsumer(KafkaConfig kafkaConfig) 
throws KafkaException {
+  private Consumer<byte[], byte[]> createConsumer(KafkaAdapterConfig 
kafkaConfig) throws KafkaException {
     final Properties props = new Properties();
 
-    kafkaConfig.getSecurityConfig().appendConfig(props);
-    kafkaConfig.getAutoOffsetResetConfig().appendConfig(props);
+    kafkaConfig.getConfigAppenders().forEach(c -> c.appendConfig(props));
 
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
         kafkaConfig.getKafkaHost() + ":" + kafkaConfig.getKafkaPort());
@@ -113,34 +111,36 @@ public class KafkaProtocol implements StreamPipesAdapter, 
SupportsRuntimeConfig
                                              IStaticPropertyExtractor 
extractor)
       throws SpConfigurationException {
     RuntimeResolvableOneOfStaticProperty config = extractor
-        .getStaticPropertyByName(KafkaConnectUtils.TOPIC_KEY, 
RuntimeResolvableOneOfStaticProperty.class);
-    KafkaConfig kafkaConfig = KafkaConnectUtils.getConfig(extractor, false);
-    boolean hideInternalTopics = 
extractor.slideToggleValue(KafkaConnectUtils.getHideInternalTopicsKey());
+        .getStaticPropertyByName(KafkaConfigProvider.TOPIC_KEY, 
RuntimeResolvableOneOfStaticProperty.class);
+    var kafkaConfig = new 
KafkaConfigExtractor().extractAdapterConfig(extractor, false);
+    boolean hideInternalTopics = 
extractor.slideToggleValue(KafkaConfigProvider.getHideInternalTopicsKey());
 
     try {
-      Consumer<byte[], byte[]> consumer = createConsumer(kafkaConfig);
-      Set<String> topics = consumer.listTopics().keySet();
+      var consumer = createConsumer(kafkaConfig);
+      List<String> topics = new 
ArrayList<>(consumer.listTopics().keySet()).stream().sorted().toList();
       consumer.close();
 
       if (hideInternalTopics) {
         topics = topics
             .stream()
-            .filter(t -> 
!t.startsWith(GlobalStreamPipesConstants.INTERNAL_TOPIC_PREFIX))
-            .collect(Collectors.toSet());
+            .filter(t -> 
(!t.startsWith(GlobalStreamPipesConstants.INTERNAL_TOPIC_PREFIX)
+                && 
!t.startsWith(GlobalStreamPipesConstants.CONNECT_TOPIC_PREFIX)))
+            .toList();
       }
 
       
config.setOptions(topics.stream().map(Option::new).collect(Collectors.toList()));
 
       return config;
     } catch (KafkaException e) {
-      throw new SpConfigurationException(e.getMessage(), e);
+      var message = e.getCause() != null ? e.getCause().getMessage() : 
e.getMessage();
+      throw new SpConfigurationException(message, e);
     }
   }
 
   @Override
   public IAdapterConfiguration declareConfig() {
 
-    StaticPropertyAlternative latestAlternative = 
KafkaConnectUtils.getAlternativesLatest();
+    StaticPropertyAlternative latestAlternative = 
KafkaConfigProvider.getAlternativesLatest();
     latestAlternative.setSelected(true);
 
     return AdapterConfigurationBuilder
@@ -150,28 +150,28 @@ public class KafkaProtocol implements StreamPipesAdapter, 
SupportsRuntimeConfig
         .withLocales(Locales.EN)
         .withCategory(AdapterType.Generic, AdapterType.Manufacturing)
 
-        .requiredAlternatives(KafkaConnectUtils.getAccessModeLabel(),
-            KafkaConnectUtils.getAlternativeUnauthenticatedPlain(),
-            KafkaConnectUtils.getAlternativeUnauthenticatedSSL(),
-            KafkaConnectUtils.getAlternativesSaslPlain(),
-            KafkaConnectUtils.getAlternativesSaslSSL())
+        .requiredAlternatives(KafkaConfigProvider.getAccessModeLabel(),
+            KafkaConfigProvider.getAlternativeUnauthenticatedPlain(),
+            KafkaConfigProvider.getAlternativeUnauthenticatedSSL(),
+            KafkaConfigProvider.getAlternativesSaslPlain(),
+            KafkaConfigProvider.getAlternativesSaslSSL())
 
-        .requiredTextParameter(KafkaConnectUtils.getHostLabel())
-        .requiredIntegerParameter(KafkaConnectUtils.getPortLabel())
+        .requiredTextParameter(KafkaConfigProvider.getHostLabel())
+        .requiredIntegerParameter(KafkaConfigProvider.getPortLabel())
 
-        .requiredAlternatives(KafkaConnectUtils.getConsumerGroupLabel(),
-            KafkaConnectUtils.getAlternativesRandomGroupId(),
-            KafkaConnectUtils.getAlternativesGroupId())
+        .requiredAlternatives(KafkaConfigProvider.getConsumerGroupLabel(),
+            KafkaConfigProvider.getAlternativesRandomGroupId(),
+            KafkaConfigProvider.getAlternativesGroupId())
 
-        .requiredSlideToggle(KafkaConnectUtils.getHideInternalTopicsLabel(), 
true)
+        .requiredSlideToggle(KafkaConfigProvider.getHideInternalTopicsLabel(), 
true)
 
-        
.requiredSingleValueSelectionFromContainer(KafkaConnectUtils.getTopicLabel(), 
Arrays.asList(
-            KafkaConnectUtils.HOST_KEY,
-            KafkaConnectUtils.PORT_KEY))
-        
.requiredAlternatives(KafkaConnectUtils.getAutoOffsetResetConfigLabel(),
-                KafkaConnectUtils.getAlternativesEarliest(),
-                latestAlternative,
-                KafkaConnectUtils.getAlternativesNone())
+        
.requiredSingleValueSelectionFromContainer(KafkaConfigProvider.getTopicLabel(), 
Arrays.asList(
+            KafkaConfigProvider.HOST_KEY,
+            KafkaConfigProvider.PORT_KEY))
+        
.requiredAlternatives(KafkaConfigProvider.getAutoOffsetResetConfigLabel(),
+            KafkaConfigProvider.getAlternativesEarliest(),
+            latestAlternative,
+            KafkaConfigProvider.getAlternativesNone())
         .buildConfiguration();
   }
 
@@ -185,17 +185,11 @@ public class KafkaProtocol implements StreamPipesAdapter, 
SupportsRuntimeConfig
     protocol.setBrokerHostname(config.getKafkaHost());
     protocol.setTopicDefinition(new SimpleTopicDefinition(config.getTopic()));
 
-    List<KafkaConfigAppender> kafkaConfigAppenderList = new ArrayList<>(2);
-    kafkaConfigAppenderList.add(this.config.getSecurityConfig());
-    kafkaConfigAppenderList.add(this.config.getAutoOffsetResetConfig());
-
     this.kafkaConsumer = new SpKafkaConsumer(protocol,
         config.getTopic(),
-        new BrokerEventProcessor(extractor.selectedParser(), (event) -> {
-          collector.collect(event);
-        }),
-        kafkaConfigAppenderList
-        );
+        new BrokerEventProcessor(extractor.selectedParser(), collector),
+        config.getConfigAppenders()
+    );
 
     thread = new Thread(this.kafkaConsumer);
     thread.start();
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedPlainConfig.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaAdapterConfig.java
similarity index 74%
rename from streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedPlainConfig.java
rename to streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaAdapterConfig.java
index 235a6eda39..e24dccf040 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedPlainConfig.java
+++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaAdapterConfig.java
@@ -16,14 +16,17 @@
  *
  */
 
-package org.apache.streampipes.messaging.kafka.security;
+package org.apache.streampipes.extensions.connectors.kafka.shared.kafka;
 
-import java.util.Properties;
+public class KafkaAdapterConfig extends KafkaBaseConfig {
 
-public class KafkaSecurityUnauthenticatedPlainConfig extends 
KafkaSecurityConfig {
+  private String groupId;
 
-  @Override
-  public void appendConfig(Properties props) {
+  public String getGroupId() {
+    return groupId;
+  }
 
+  public void setGroupId(String groupId) {
+    this.groupId = groupId;
   }
 }
diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfig.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaBaseConfig.java
similarity index 50%
rename from streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfig.java
rename to streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaBaseConfig.java
index 7f02e4c5b1..dd43893e89 100644
--- a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfig.java
+++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaBaseConfig.java
@@ -18,31 +18,20 @@
 
 package org.apache.streampipes.extensions.connectors.kafka.shared.kafka;
 
-import org.apache.streampipes.messaging.kafka.config.AutoOffsetResetConfig;
-import org.apache.streampipes.messaging.kafka.security.KafkaSecurityConfig;
+import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
 
-public class KafkaConfig {
+import java.util.ArrayList;
+import java.util.List;
+
+public class KafkaBaseConfig {
 
   private String kafkaHost;
   private Integer kafkaPort;
   private String topic;
-  private String groupId;
-
-  KafkaSecurityConfig securityConfig;
-  AutoOffsetResetConfig autoOffsetResetConfig;
+  private List<KafkaConfigAppender> configAppenders;
 
-  public KafkaConfig(String kafkaHost,
-                     Integer kafkaPort,
-                     String topic,
-                     String groupId,
-                     KafkaSecurityConfig securityConfig,
-                     AutoOffsetResetConfig autoOffsetResetConfig) {
-    this.kafkaHost = kafkaHost;
-    this.kafkaPort = kafkaPort;
-    this.topic = topic;
-    this.groupId = groupId;
-    this.securityConfig = securityConfig;
-    this.autoOffsetResetConfig = autoOffsetResetConfig;
+  public KafkaBaseConfig() {
+    this.configAppenders = new ArrayList<>();
   }
 
   public String getKafkaHost() {
@@ -69,27 +58,11 @@ public class KafkaConfig {
     this.topic = topic;
   }
 
-  public String getGroupId() {
-    return groupId;
-  }
-
-  public void setGroupId(String groupId) {
-    this.groupId = groupId;
-  }
-
-  public KafkaSecurityConfig getSecurityConfig() {
-    return securityConfig;
-  }
-
-  public void setSecurityConfig(KafkaSecurityConfig securityConfig) {
-    this.securityConfig = securityConfig;
-  }
-
-  public AutoOffsetResetConfig getAutoOffsetResetConfig() {
-    return autoOffsetResetConfig;
+  public List<KafkaConfigAppender> getConfigAppenders() {
+    return configAppenders;
   }
 
-  public void setAutoOffsetResetConfig(AutoOffsetResetConfig 
autoOffsetResetConfig) {
-    this.autoOffsetResetConfig = autoOffsetResetConfig;
+  public void setConfigAppenders(List<KafkaConfigAppender> configAppenders) {
+    this.configAppenders = configAppenders;
   }
 }
diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java
new file mode 100644
index 0000000000..d06e544e74
--- /dev/null
+++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.kafka.shared.kafka;
+
+import org.apache.streampipes.commons.environment.Environments;
+import org.apache.streampipes.extensions.api.extractor.IParameterExtractor;
+import 
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.messaging.kafka.config.AutoOffsetResetConfig;
+import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
+import 
org.apache.streampipes.messaging.kafka.security.KafkaSecurityProtocolConfigAppender;
+import 
org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslConfigAppender;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
+
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+
+import java.util.ArrayList;
+
+import static 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.ACCESS_MODE;
+import static 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.AUTO_OFFSET_RESET_CONFIG;
+import static 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.CONSUMER_GROUP;
+import static 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.GROUP_ID_INPUT;
+import static 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.HOST_KEY;
+import static 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.PASSWORD_KEY;
+import static 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.PORT_KEY;
+import static 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.RANDOM_GROUP_ID;
+import static 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.SECURITY_MECHANISM;
+import static 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.TOPIC_KEY;
+import static 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.USERNAME_KEY;
+
+public class KafkaConfigExtractor {
+
+  public KafkaAdapterConfig extractAdapterConfig(IStaticPropertyExtractor 
extractor,
+                                                 boolean containsTopic) {
+
+    var config = extractCommonConfigs(extractor, new KafkaAdapterConfig());
+
+    var topic = "";
+    if (containsTopic) {
+      topic = extractor.selectedSingleValue(TOPIC_KEY, String.class);
+    }
+    config.setTopic(topic);
+
+    if 
(extractor.selectedAlternativeInternalId(CONSUMER_GROUP).equals(RANDOM_GROUP_ID))
 {
+      config.setGroupId("StreamPipesKafkaConsumer" + 
System.currentTimeMillis());
+    } else {
+      config.setGroupId(extractor.singleValueParameter(GROUP_ID_INPUT, 
String.class));
+    }
+
+    StaticPropertyAlternatives alternatives = 
extractor.getStaticPropertyByName(AUTO_OFFSET_RESET_CONFIG,
+        StaticPropertyAlternatives.class);
+
+    // Set default value if no value is provided.
+    if (alternatives == null) {
+      config.getConfigAppenders().add(new 
AutoOffsetResetConfig(KafkaConfigProvider.LATEST));
+    } else {
+      String auto = 
extractor.selectedAlternativeInternalId(AUTO_OFFSET_RESET_CONFIG);
+      config.getConfigAppenders().add(new AutoOffsetResetConfig(auto));
+    }
+    return config;
+  }
+
+  public KafkaBaseConfig extractSinkConfig(IParameterExtractor extractor) {
+    var config = extractCommonConfigs(extractor, new KafkaBaseConfig());
+    config.setTopic(extractor.singleValueParameter(TOPIC_KEY, String.class));
+
+    return config;
+  }
+
+  private <T extends KafkaBaseConfig> T 
extractCommonConfigs(IParameterExtractor extractor,
+                                                             T config) {
+    var configAppenders = new ArrayList<KafkaConfigAppender>();
+    var env = Environments.getEnvironment();
+    config.setKafkaHost(extractor.singleValueParameter(HOST_KEY, 
String.class));
+    config.setKafkaPort(extractor.singleValueParameter(PORT_KEY, 
Integer.class));
+
+    var authentication = extractor.selectedAlternativeInternalId(ACCESS_MODE);
+    var securityProtocol = getSecurityProtocol(authentication);
+    configAppenders.add(new 
KafkaSecurityProtocolConfigAppender(securityProtocol, env));
+
+    // check if SASL authentication is defined
+    if (isSaslSecurityMechanism(securityProtocol)) {
+      String username = extractor.singleValueParameter(USERNAME_KEY, 
String.class);
+      String password = extractor.secretValue(PASSWORD_KEY);
+      String mechanism = extractor.selectedSingleValue(SECURITY_MECHANISM, 
String.class);
+
+      configAppenders.add(new KafkaSecuritySaslConfigAppender(mechanism, 
username, password));
+    }
+    config.setConfigAppenders(configAppenders);
+
+    return config;
+  }
+
+  private boolean isSaslSecurityMechanism(SecurityProtocol securityProtocol) {
+    return SecurityProtocol.SASL_PLAINTEXT == securityProtocol || 
SecurityProtocol.SASL_SSL == securityProtocol;
+  }
+
+  public SecurityProtocol getSecurityProtocol(String 
selectedSecurityConfiguration) {
+    return switch (selectedSecurityConfiguration) {
+      case "unauthenticated-ssl" -> SecurityProtocol.SSL;
+      case "sasl-plain" -> SecurityProtocol.SASL_PLAINTEXT;
+      case "sasl-ssl" -> SecurityProtocol.SASL_SSL;
+      default -> SecurityProtocol.PLAINTEXT;
+    };
+  }
+}
diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigProvider.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigProvider.java
new file mode 100644
index 0000000000..05f652f780
--- /dev/null
+++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigProvider.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.kafka.shared.kafka;
+
+import org.apache.streampipes.model.staticproperty.Option;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
+import org.apache.streampipes.model.staticproperty.StaticPropertyGroup;
+import org.apache.streampipes.sdk.StaticProperties;
+import org.apache.streampipes.sdk.helpers.Alternatives;
+import org.apache.streampipes.sdk.helpers.Label;
+import org.apache.streampipes.sdk.helpers.Labels;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+
+import java.util.List;
+
+public class KafkaConfigProvider {
+
+  public static final String TOPIC_KEY = "topic";
+  public static final String HOST_KEY = "host";
+  public static final String PORT_KEY = "port";
+
+  public static final String ACCESS_MODE = "access-mode";
+  public static final String UNAUTHENTICATED_PLAIN = "unauthenticated-plain";
+  public static final String UNAUTHENTICATED_SSL = "unauthenticated-ssl";
+  public static final String SASL_PLAIN = "sasl-plain";
+  public static final String SASL_SSL = "sasl-ssl";
+
+  public static final String SECURITY_MECHANISM = "security-mechanism";
+  public static final String USERNAME_GROUP = "username-group";
+  public static final String USERNAME_KEY = "username";
+  public static final String PASSWORD_KEY = "password";
+
+  public static final String CONSUMER_GROUP = "consumer-group";
+  public static final String RANDOM_GROUP_ID = "random-group-id";
+  public static final String GROUP_ID = "group-id";
+  public static final String GROUP_ID_INPUT = "group-id-input";
+
+
+  private static final String HIDE_INTERNAL_TOPICS = "hide-internal-topics";
+
+  public static final String AUTO_OFFSET_RESET_CONFIG = ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+  public static final String EARLIEST = "earliest";
+  public static final String LATEST = "latest";
+  public static final String NONE = "none";
+
+  public static Label getTopicLabel() {
+    return Labels.withId(TOPIC_KEY);
+  }
+
+  public static Label getHideInternalTopicsLabel() {
+    return Labels.withId(HIDE_INTERNAL_TOPICS);
+  }
+
+  public static String getHideInternalTopicsKey() {
+    return HIDE_INTERNAL_TOPICS;
+  }
+
+  public static Label getHostLabel() {
+    return Labels.withId(HOST_KEY);
+  }
+
+  public static Label getPortLabel() {
+    return Labels.withId(PORT_KEY);
+  }
+
+  public static Label getAccessModeLabel() {
+    return Labels.withId(ACCESS_MODE);
+  }
+
+  public static Label getConsumerGroupLabel() {
+    return Labels.withId(CONSUMER_GROUP);
+  }
+
+  public static Label getAutoOffsetResetConfigLabel() {
+    return Labels.withId(AUTO_OFFSET_RESET_CONFIG);
+  }
+
+  public static StaticPropertyAlternative getAlternativeUnauthenticatedPlain() {
+    return Alternatives.from(Labels.withId(KafkaConfigProvider.UNAUTHENTICATED_PLAIN));
+  }
+
+  public static StaticPropertyAlternative getAlternativeUnauthenticatedSSL() {
+    return Alternatives.from(Labels.withId(KafkaConfigProvider.UNAUTHENTICATED_SSL));
+  }
+
+  public static StaticPropertyAlternative getAlternativesSaslPlain() {
+    return Alternatives.from(Labels.withId(KafkaConfigProvider.SASL_PLAIN),
+        makeAuthenticationGroup()
+    );
+  }
+
+  public static StaticPropertyAlternative getAlternativesSaslSSL() {
+    return Alternatives.from(Labels.withId(KafkaConfigProvider.SASL_SSL),
+        makeAuthenticationGroup());
+  }
+
+  public static StaticPropertyAlternative getAlternativesRandomGroupId() {
+    return Alternatives.from(Labels.withId(RANDOM_GROUP_ID));
+  }
+
+  public static StaticPropertyAlternative getAlternativesGroupId() {
+    return Alternatives.from(Labels.withId(KafkaConfigProvider.GROUP_ID),
+        StaticProperties.stringFreeTextProperty(Labels.withId(KafkaConfigProvider.GROUP_ID_INPUT)));
+  }
+
+  public static StaticPropertyAlternative getAlternativesLatest() {
+    return Alternatives.from(Labels.withId(LATEST));
+  }
+
+  public static StaticPropertyAlternative getAlternativesEarliest() {
+    return Alternatives.from(Labels.withId(EARLIEST));
+  }
+
+  public static StaticPropertyAlternative getAlternativesNone() {
+    return Alternatives.from(Labels.withId(NONE));
+  }
+
+  private static StaticPropertyGroup makeAuthenticationGroup() {
+    var group = StaticProperties.group(Labels.withId(KafkaConfigProvider.USERNAME_GROUP),
+        StaticProperties.singleValueSelection(
+            Labels.withId(KafkaConfigProvider.SECURITY_MECHANISM),
+            makeSecurityMechanism()),
+        StaticProperties.stringFreeTextProperty(Labels.withId(KafkaConfigProvider.USERNAME_KEY)),
+        StaticProperties.secretValue(Labels.withId(KafkaConfigProvider.PASSWORD_KEY)));
+    group.setHorizontalRendering(false);
+    return group;
+  }
+
+  public static List<Option> makeSecurityMechanism() {
+    return List.of(
+        new Option("PLAIN"),
+        new Option("SCRAM-SHA-256"),
+        new Option("SCRAM-SHA-512")
+    );
+  }
+}
diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConnectUtils.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConnectUtils.java
deleted file mode 100644
index aa473dcbf7..0000000000
--- a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConnectUtils.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.extensions.connectors.kafka.shared.kafka;
-
-import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
-import org.apache.streampipes.messaging.kafka.config.AutoOffsetResetConfig;
-import org.apache.streampipes.messaging.kafka.security.KafkaSecurityConfig;
-import org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslPlainConfig;
-import org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslSSLConfig;
-import org.apache.streampipes.messaging.kafka.security.KafkaSecurityUnauthenticatedPlainConfig;
-import org.apache.streampipes.messaging.kafka.security.KafkaSecurityUnauthenticatedSSLConfig;
-import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
-import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
-import org.apache.streampipes.sdk.StaticProperties;
-import org.apache.streampipes.sdk.helpers.Alternatives;
-import org.apache.streampipes.sdk.helpers.Label;
-import org.apache.streampipes.sdk.helpers.Labels;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-
-public class KafkaConnectUtils {
-
-  public static final String TOPIC_KEY = "topic";
-  public static final String HOST_KEY = "host";
-  public static final String PORT_KEY = "port";
-
-  public static final String KEY_SERIALIZATION = "key-serialization";
-  public static final String VALUE_SERIALIZATION = "value-serialization";
-
-  public static final String KEY_DESERIALIZATION = "key-deserialization";
-  public static final String VALUE_DESERIALIZATION = "value-deserialization";
-
-  public static final String ACCESS_MODE = "access-mode";
-  public static final String UNAUTHENTICATED_PLAIN = "unauthenticated-plain";
-  public static final String UNAUTHENTICATED_SSL = "unauthenticated-ssl";
-  public static final String SASL_PLAIN = "sasl-plain";
-  public static final String SASL_SSL = "sasl-ssl";
-
-  public static final String USERNAME_GROUP = "username-group";
-  public static final String USERNAME_KEY = "username";
-  public static final String PASSWORD_KEY = "password";
-
-  public static final String CONSUMER_GROUP = "consumer-group";
-  public static final String RANDOM_GROUP_ID = "random-group-id";
-  public static final String GROUP_ID = "group-id";
-  public static final String GROUP_ID_INPUT = "group-id-input";
-
-
-  private static final String HIDE_INTERNAL_TOPICS = "hide-internal-topics";
-
-  public static final String AUTO_OFFSET_RESET_CONFIG = ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
-  public static final String EARLIEST = "earliest";
-  public static final String LATEST = "latest";
-  public static final String NONE = "none";
-
-  public static Label getTopicLabel() {
-    return Labels.withId(TOPIC_KEY);
-  }
-
-  public static Label getHideInternalTopicsLabel() {
-    return Labels.withId(HIDE_INTERNAL_TOPICS);
-  }
-
-  public static String getHideInternalTopicsKey() {
-    return HIDE_INTERNAL_TOPICS;
-  }
-
-  public static Label getHostLabel() {
-    return Labels.withId(HOST_KEY);
-  }
-
-  public static Label getPortLabel() {
-    return Labels.withId(PORT_KEY);
-  }
-
-  public static Label getAccessModeLabel() {
-    return Labels.withId(ACCESS_MODE);
-  }
-
-  public static Label getConsumerGroupLabel() {
-    return Labels.withId(CONSUMER_GROUP);
-  }
-
-  public static Label getAutoOffsetResetConfigLabel() {
-    return Labels.withId(AUTO_OFFSET_RESET_CONFIG);
-  }
-
-
-  public static KafkaConfig getConfig(IStaticPropertyExtractor extractor, boolean containsTopic) {
-    String brokerUrl = extractor.singleValueParameter(HOST_KEY, String.class);
-    String topic = "";
-    if (containsTopic) {
-      topic = extractor.selectedSingleValue(TOPIC_KEY, String.class);
-    }
-
-    Integer port = extractor.singleValueParameter(PORT_KEY, Integer.class);
-
-    String authentication = extractor.selectedAlternativeInternalId(ACCESS_MODE);
-    boolean isUseSSL = isUseSSL(authentication);
-
-    KafkaSecurityConfig securityConfig;
-
-    //KafkaSerializerConfig serializerConfig = new KafkaSerializerByteArrayConfig()
-
-    // check if a user for the authentication is defined
-    if (authentication.equals(KafkaConnectUtils.SASL_SSL) || authentication.equals(KafkaConnectUtils.SASL_PLAIN)) {
-      String username = extractor.singleValueParameter(USERNAME_KEY, String.class);
-      String password = extractor.secretValue(PASSWORD_KEY);
-
-      securityConfig = isUseSSL
-          ? new KafkaSecuritySaslSSLConfig(username, password) :
-          new KafkaSecuritySaslPlainConfig(username, password);
-    } else {
-      // set security config for none authenticated access
-      securityConfig = isUseSSL
-          ? new KafkaSecurityUnauthenticatedSSLConfig() :
-          new KafkaSecurityUnauthenticatedPlainConfig();
-    }
-
-    String groupId;
-    if (extractor.selectedAlternativeInternalId(CONSUMER_GROUP).equals(RANDOM_GROUP_ID)){
-      groupId = "KafkaExampleConsumer" + System.currentTimeMillis();
-    } else {
-      groupId = extractor.singleValueParameter(GROUP_ID_INPUT, String.class);
-    }
-
-    StaticPropertyAlternatives alternatives = extractor.getStaticPropertyByName(AUTO_OFFSET_RESET_CONFIG,
-            StaticPropertyAlternatives.class);
-
-    // Set default value if no value is provided.
-    if (alternatives == null) {
-      AutoOffsetResetConfig autoOffsetResetConfig = new AutoOffsetResetConfig(KafkaConnectUtils.LATEST);
-
-      return new KafkaConfig(brokerUrl, port, topic, groupId, securityConfig, autoOffsetResetConfig);
-    } else {
-      String auto = extractor.selectedAlternativeInternalId(AUTO_OFFSET_RESET_CONFIG);
-      AutoOffsetResetConfig autoOffsetResetConfig = new AutoOffsetResetConfig(auto);
-
-      return new KafkaConfig(brokerUrl, port, topic, groupId, securityConfig, autoOffsetResetConfig);
-    }
-  }
-
-  private static boolean isUseSSL(String authentication) {
-    if (authentication.equals(KafkaConnectUtils.UNAUTHENTICATED_PLAIN)
-        || authentication.equals(KafkaConnectUtils.SASL_PLAIN)) {
-      return false;
-    } else {
-      return true;
-    }
-  }
-
-
-  public static StaticPropertyAlternative getAlternativeUnauthenticatedPlain() {
-    return Alternatives.from(Labels.withId(KafkaConnectUtils.UNAUTHENTICATED_PLAIN));
-  }
-
-  public static StaticPropertyAlternative getAlternativeUnauthenticatedSSL() {
-    return Alternatives.from(Labels.withId(KafkaConnectUtils.UNAUTHENTICATED_SSL));
-  }
-
-  public static StaticPropertyAlternative getAlternativesSaslPlain() {
-    return Alternatives.from(Labels.withId(KafkaConnectUtils.SASL_PLAIN),
-        StaticProperties.group(Labels.withId(KafkaConnectUtils.USERNAME_GROUP),
-            StaticProperties.stringFreeTextProperty(Labels.withId(KafkaConnectUtils.USERNAME_KEY)),
-            StaticProperties.secretValue(Labels.withId(KafkaConnectUtils.PASSWORD_KEY))));
-  }
-
-  public static StaticPropertyAlternative getAlternativesSaslSSL() {
-    return Alternatives.from(Labels.withId(KafkaConnectUtils.SASL_SSL),
-        StaticProperties.group(Labels.withId(KafkaConnectUtils.USERNAME_GROUP),
-            StaticProperties.stringFreeTextProperty(Labels.withId(KafkaConnectUtils.USERNAME_KEY)),
-            StaticProperties.secretValue(Labels.withId(KafkaConnectUtils.PASSWORD_KEY))));
-  }
-
-  public static StaticPropertyAlternative getAlternativesRandomGroupId(){
-    return Alternatives.from(Labels.withId(RANDOM_GROUP_ID));
-  }
-
-  public static StaticPropertyAlternative getAlternativesGroupId(){
-    return Alternatives.from(Labels.withId(KafkaConnectUtils.GROUP_ID),
-        StaticProperties.stringFreeTextProperty(Labels.withId(KafkaConnectUtils.GROUP_ID_INPUT)));
-  }
-
-  public static StaticPropertyAlternative getAlternativesLatest() {
-    return Alternatives.from(Labels.withId(LATEST));
-  }
-
-  public static StaticPropertyAlternative getAlternativesEarliest() {
-    return Alternatives.from(Labels.withId(EARLIEST));
-  }
-
-  public static StaticPropertyAlternative getAlternativesNone() {
-    return Alternatives.from(Labels.withId(NONE));
-  }
-}
diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaParameters.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaParameters.java
deleted file mode 100644
index b1e439547f..0000000000
--- a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaParameters.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.extensions.connectors.kafka.sink;
-
-import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
-import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConnectUtils;
-
-public class KafkaParameters {
-
-  private final String kafkaHost;
-
-  private final Integer kafkaPort;
-
-  private final String topic;
-
-  private final String authentication;
-
-  private String username;
-
-  private String password;
-
-  private final boolean useSSL;
-
-  public KafkaParameters(IDataSinkParameters params) {
-    var extractor = params.extractor();
-    this.topic = extractor.singleValueParameter(KafkaConnectUtils.TOPIC_KEY, String.class);
-    this.kafkaHost = extractor.singleValueParameter(KafkaConnectUtils.HOST_KEY, String.class);
-    this.kafkaPort = extractor.singleValueParameter(KafkaConnectUtils.PORT_KEY, Integer.class);
-    this.authentication = extractor.selectedAlternativeInternalId(KafkaConnectUtils.ACCESS_MODE);
-
-    if (!useAuthentication()) {
-      this.useSSL = KafkaConnectUtils.UNAUTHENTICATED_SSL.equals(this.authentication);
-    } else {
-      String username = extractor.singleValueParameter(KafkaConnectUtils.USERNAME_KEY, String.class);
-      String password = extractor.secretValue(KafkaConnectUtils.PASSWORD_KEY);
-      this.username = username;
-      this.password = password;
-      this.useSSL = KafkaConnectUtils.SASL_SSL.equals(this.authentication);
-    }
-  }
-
-  public String getKafkaHost() {
-    return kafkaHost;
-  }
-
-  public Integer getKafkaPort() {
-    return kafkaPort;
-  }
-
-  public String getTopic() {
-    return topic;
-  }
-
-  public String getUsername() {
-    return username;
-  }
-
-  public String getPassword() {
-    return password;
-  }
-
-  public String getAuthentication() {
-    return authentication;
-  }
-
-  public boolean isUseSSL() {
-    return useSSL;
-  }
-
-  public boolean useAuthentication() {
-    return KafkaConnectUtils.SASL_PLAIN.equals(this.authentication)
-        || KafkaConnectUtils.SASL_SSL.equals(this.authentication);
-  }
-}
diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaPublishSink.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaPublishSink.java
index 238bbb8048..2a8db93fdc 100644
--- a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaPublishSink.java
+++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaPublishSink.java
@@ -24,13 +24,10 @@ import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
 import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
 import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
 import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
-import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConnectUtils;
+import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaBaseConfig;
+import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigExtractor;
+import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider;
 import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
-import org.apache.streampipes.messaging.kafka.security.KafkaSecurityConfig;
-import org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslPlainConfig;
-import org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslSSLConfig;
-import org.apache.streampipes.messaging.kafka.security.KafkaSecurityUnauthenticatedPlainConfig;
-import org.apache.streampipes.messaging.kafka.security.KafkaSecurityUnauthenticatedSSLConfig;
 import org.apache.streampipes.model.DataSinkType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
 import org.apache.streampipes.model.runtime.Event;
@@ -41,7 +38,6 @@ import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 
-import java.util.List;
 import java.util.Map;
 
 public class KafkaPublishSink implements IStreamPipesDataSink {
@@ -50,7 +46,7 @@ public class KafkaPublishSink implements IStreamPipesDataSink {
 
   private JsonDataFormatDefinition dataFormatDefinition;
 
-  private KafkaParameters params;
+  private KafkaBaseConfig kafkaConfig;
 
   public KafkaPublishSink() {
   }
@@ -68,15 +64,15 @@ public class KafkaPublishSink implements IStreamPipesDataSink {
                 .requiredProperty(EpRequirements.anyProperty())
                 .build())
 
-            .requiredTextParameter(Labels.withId(KafkaConnectUtils.TOPIC_KEY), false, false)
-            .requiredTextParameter(Labels.withId(KafkaConnectUtils.HOST_KEY), false, false)
-            .requiredIntegerParameter(Labels.withId(KafkaConnectUtils.PORT_KEY), 9092)
+            .requiredTextParameter(Labels.withId(KafkaConfigProvider.TOPIC_KEY), false, false)
+            .requiredTextParameter(Labels.withId(KafkaConfigProvider.HOST_KEY), false, false)
+            .requiredIntegerParameter(Labels.withId(KafkaConfigProvider.PORT_KEY), 9092)
 
-            .requiredAlternatives(Labels.withId(KafkaConnectUtils.ACCESS_MODE),
-                KafkaConnectUtils.getAlternativeUnauthenticatedPlain(),
-                KafkaConnectUtils.getAlternativeUnauthenticatedSSL(),
-                KafkaConnectUtils.getAlternativesSaslPlain(),
-                KafkaConnectUtils.getAlternativesSaslSSL())
+            .requiredAlternatives(Labels.withId(KafkaConfigProvider.ACCESS_MODE),
+                KafkaConfigProvider.getAlternativeUnauthenticatedPlain(),
+                KafkaConfigProvider.getAlternativeUnauthenticatedSSL(),
+                KafkaConfigProvider.getAlternativesSaslPlain(),
+                KafkaConfigProvider.getAlternativesSaslSSL())
             .build()
     );
   }
@@ -84,26 +80,13 @@ public class KafkaPublishSink implements IStreamPipesDataSink {
   @Override
   public void onPipelineStarted(IDataSinkParameters parameters,
                                 EventSinkRuntimeContext runtimeContext) {
-    this.params = new KafkaParameters(parameters);
+    this.kafkaConfig = new KafkaConfigExtractor().extractSinkConfig(parameters.extractor());
     this.dataFormatDefinition = new JsonDataFormatDefinition();
 
-    KafkaSecurityConfig securityConfig;
-    // check if a user for the authentication is defined
-    if (params.useAuthentication()) {
-      securityConfig = params.isUseSSL()
-          ? new KafkaSecuritySaslSSLConfig(params.getUsername(), params.getPassword()) :
-          new KafkaSecuritySaslPlainConfig(params.getUsername(), params.getPassword());
-    } else {
-      // set security config for none authenticated access
-      securityConfig = params.isUseSSL()
-          ? new KafkaSecurityUnauthenticatedSSLConfig() :
-          new KafkaSecurityUnauthenticatedPlainConfig();
-    }
-
     this.producer = new SpKafkaProducer(
-        params.getKafkaHost() + ":" + params.getKafkaPort(),
-        params.getTopic(),
-        List.of(securityConfig));
+        kafkaConfig.getKafkaHost() + ":" + kafkaConfig.getKafkaPort(),
+        kafkaConfig.getTopic(),
+        kafkaConfig.getConfigAppenders());
   }
 
   @Override
diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en
index 81b8b78859..4809237f5b 100644
--- a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en
+++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en
@@ -34,20 +34,23 @@ username.description=
 password.title=Password
 password.description=
 
-access-mode.title=Access Mode
-access-mode.description=Unauthenticated or SASL/PLAIN
+access-mode.title=Security protocol
+access-mode.description=Security protocol used to communicate with brokers. Corresponds to Kafka Client security.protocol property
 
-unauthenticated-plain.title=Unauthenticated Plain
+unauthenticated-plain.title=PLAINTEXT
 unauthenticated-plain.description=No authentication and plaintext
 
-unauthenticated-ssl.title=Unauthenticated SSL
+unauthenticated-ssl.title=SSL
 unauthenticated-ssl.description=Using SSL with no authentication
 
-sasl-plain.title=SASL/PLAIN
-sasl-plain.description=Username and password, no encryption
+security-mechanism.title=Security Mechanism
+security-mechanism.description=SASL mechanism used for authentication. Corresponds to Kafka Client sasl.mechanism property
+
+sasl-plain.title=SASL/PLAINTEXT
+sasl-plain.description=SASL authentication, no encryption
 
 sasl-ssl.title=SASL/SSL
-sasl-ssl.description=Username and password, with ssl encryption
+sasl-ssl.description=SASL authentication, with ssl encryption
 
 username-group.title=Username and password
 
@@ -82,4 +85,4 @@ latest.title=Latest
 latest.description=Offsets are initialized to the Latest
 
 none.title=None
-none.description=Consumer throws exceptions
\ No newline at end of file
+none.description=Consumer throws exceptions
diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en
index a2e8d1b5f1..0d73d6d2f4 100644
--- a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en
+++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en
@@ -51,4 +51,7 @@ sasl-plain.description=Username and password, no encryption
 sasl-ssl.title=SASL/SSL
 sasl-ssl.description=Username and password, with ssl encryption
 
+security-mechanism.title=Security Mechanism
+security-mechanism.description=SASL mechanism used for authentication. Corresponds to Kafka Client sasl.mechanism property
+
 username-group.title=Username and password
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/KeyStoreLoader.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/KeyStoreLoader.java
index 4b132d19f8..72f01b0c28 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/KeyStoreLoader.java
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/KeyStoreLoader.java
@@ -44,7 +44,7 @@ public class KeyStoreLoader {
 
   public KeyStoreLoader load(Environment env,
                              Path securityDir) throws Exception {
-    var keystore = KeyStore.getInstance(env.getOPcUaKeystoreType().getValueOrDefault());
+    var keystore = KeyStore.getInstance(env.getOpcUaKeystoreType().getValueOrDefault());
     var keystoreFile = env.getOpcUaKeystoreFile().getValueOrDefault();
     var keystorePassword = env.getOpcUaKeystorePassword().getValueOrDefault();
     var keystoreAlias = env.getOpcUaKeystoreAlias().getValueOrDefault();
diff --git a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/KafkaAdapterTester.java b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/KafkaAdapterTester.java
index b5e242eb50..db830f1b7a 100644
--- a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/KafkaAdapterTester.java
+++ b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/KafkaAdapterTester.java
@@ -22,7 +22,7 @@ import org.apache.streampipes.commons.exceptions.connect.AdapterException;
 import org.apache.streampipes.extensions.api.connect.IAdapterConfiguration;
 import org.apache.streampipes.extensions.api.connect.StreamPipesAdapter;
 import org.apache.streampipes.extensions.connectors.kafka.adapter.KafkaProtocol;
-import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConnectUtils;
+import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider;
 import org.apache.streampipes.integration.containers.KafkaContainer;
 import org.apache.streampipes.integration.containers.KafkaDevContainer;
 import org.apache.streampipes.manager.template.AdapterTemplateHandler;
@@ -69,9 +69,9 @@ public class KafkaAdapterTester extends AdapterTesterBase {
             .get(5))
             .setOptions(list);
     List<Map<String, Object>> configs = new ArrayList<>();
-    configs.add(Map.of(KafkaConnectUtils.HOST_KEY, kafkaContainer.getBrokerHost()));
-    configs.add(Map.of(KafkaConnectUtils.PORT_KEY, kafkaContainer.getBrokerPort()));
-    configs.add(Map.of(KafkaConnectUtils.TOPIC_KEY, TOPIC));
+    configs.add(Map.of(KafkaConfigProvider.HOST_KEY, kafkaContainer.getBrokerHost()));
+    configs.add(Map.of(KafkaConfigProvider.PORT_KEY, kafkaContainer.getBrokerPort()));
+    configs.add(Map.of(KafkaConfigProvider.TOPIC_KEY, TOPIC));
+    configs.add(Map.of(KafkaConfigProvider.TOPIC_KEY, TOPIC));
     var template = new PipelineElementTemplate("name", "description", configs);
 
 
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/KafkaConfigAppender.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/KafkaConfigAppender.java
index 1e53ebd225..77c5cb5afa 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/KafkaConfigAppender.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/KafkaConfigAppender.java
@@ -18,9 +18,11 @@
 
 package org.apache.streampipes.messaging.kafka.config;
 
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+
 import java.util.Properties;
 
 public interface KafkaConfigAppender {
 
-  void appendConfig(Properties props);
+  void appendConfig(Properties props) throws SpRuntimeException;
 }
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityConfig.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityConfig.java
deleted file mode 100644
index 646033f437..0000000000
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityConfig.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.messaging.kafka.security;
-
-import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
-
-public abstract class KafkaSecurityConfig implements KafkaConfigAppender {
-
-}
diff --git 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityProtocolConfigAppender.java
 
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityProtocolConfigAppender.java
new file mode 100644
index 0000000000..1a1a9bd92a
--- /dev/null
+++ 
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityProtocolConfigAppender.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.messaging.kafka.security;
+
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+
+import java.util.Properties;
+
+public class KafkaSecurityProtocolConfigAppender implements 
KafkaConfigAppender {
+
+  private final SecurityProtocol securityProtocol;
+  private final Environment env;
+
+  public KafkaSecurityProtocolConfigAppender(SecurityProtocol securityProtocol,
+                                             Environment env) {
+    this.securityProtocol = securityProtocol;
+    this.env = env;
+  }
+
+  @Override
+  public void appendConfig(Properties props) {
+    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
securityProtocol.toString());
+
+    if (isSslProtocol()) {
+      props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, 
env.getKeystoreType().getValueOrDefault());
+      props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, 
env.getKeystoreFilename().getValueOrDefault());
+      props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, 
env.getKeystorePassword().getValueOrDefault());
+
+      if (env.getKeyPassword().exists()) {
+        props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, 
env.getKeyPassword().getValueOrDefault());
+      }
+
+      props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, 
env.getTruststoreType().getValueOrDefault());
+      props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, 
env.getTruststoreFilename().getValueOrDefault());
+
+      if (env.getTruststorePassword().exists()) {
+        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, 
env.getTruststorePassword().getValueOrDefault());
+      }
+
+      if (env.getAllowSelfSignedCertificates().getValueOrDefault()) {
+        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+      }
+    }
+  }
+
+  private boolean isSslProtocol() {
+    return securityProtocol == SecurityProtocol.SSL || securityProtocol == 
SecurityProtocol.SASL_SSL;
+  }
+}
diff --git 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslSSLConfig.java
 
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslConfigAppender.java
similarity index 50%
rename from 
streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslSSLConfig.java
rename to 
streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslConfigAppender.java
index 2c5ef691e3..68698f1217 100644
--- 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslSSLConfig.java
+++ 
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslConfigAppender.java
@@ -18,34 +18,56 @@
 
 package org.apache.streampipes.messaging.kafka.security;
 
-import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
+
 import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
 
 import java.util.Properties;
 
-public class KafkaSecuritySaslSSLConfig extends KafkaSecurityConfig {
+public class KafkaSecuritySaslConfigAppender implements KafkaConfigAppender {
 
+  private final String securityMechanism;
   private final String username;
   private final String password;
 
-  public KafkaSecuritySaslSSLConfig(String username, String password) {
+  public KafkaSecuritySaslConfigAppender(String securityMechanism,
+                                         String username,
+                                         String password) {
+    this.securityMechanism = securityMechanism;
     this.username = username;
     this.password = password;
   }
 
   @Override
-  public void appendConfig(Properties props) {
+  public void appendConfig(Properties props) throws SpRuntimeException {
+    props.put(SaslConfigs.SASL_MECHANISM, securityMechanism);
+    String saslJaasConfig = makeJaasConfig();
+
+    props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig);
+  }
 
-    props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
-    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
SecurityProtocol.SASL_SSL.toString());
+  private String makeJaasConfig() {
+    if (securityMechanism.equals("PLAIN")) {
+      return makeSaslPlainConfig();
+    } else {
+      return makeSaslScramConfig();
+    }
+  }
 
-    String saslJaasConfig = 
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
+  private String makeSaslPlainConfig() {
+    return "org.apache.kafka.common.security.plain.PlainLoginModule required 
username=\""
         + username
         + "\" password=\""
         + password
         + "\";";
+  }
 
-    props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig);
+  private String makeSaslScramConfig() {
+    return "org.apache.kafka.common.security.scram.ScramLoginModule required 
username=\""
+        + username
+        + "\" password=\""
+        + password
+        + "\";";
   }
 }
diff --git 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslPlainConfig.java
 
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslPlainConfig.java
deleted file mode 100644
index a7e5fa94fc..0000000000
--- 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslPlainConfig.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.messaging.kafka.security;
-
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
-
-import java.util.Properties;
-
-public class KafkaSecuritySaslPlainConfig extends KafkaSecurityConfig {
-
-  private final String username;
-  private final String password;
-
-  public KafkaSecuritySaslPlainConfig(String username, String password) {
-    this.username = username;
-    this.password = password;
-  }
-
-  @Override
-  public void appendConfig(Properties props) {
-
-    props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
-    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
SecurityProtocol.SASL_PLAINTEXT.toString());
-
-    String saslJaasConfig = 
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
-        + username
-        + "\" password=\""
-        + password
-        + "\";";
-
-    props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig);
-  }
-}
diff --git 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedSSLConfig.java
 
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedSSLConfig.java
deleted file mode 100644
index 8fdd02fd12..0000000000
--- 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedSSLConfig.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.messaging.kafka.security;
-
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
-
-import java.util.Properties;
-
-public class KafkaSecurityUnauthenticatedSSLConfig extends KafkaSecurityConfig 
{
-
-  @Override
-  public void appendConfig(Properties props) {
-    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
SecurityProtocol.SSL.toString());
-  }
-}
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java
index 32cfb73044..ce66dbeed8 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java
@@ -30,6 +30,7 @@ import org.apache.streampipes.model.monitoring.SpLogMessage;
 import org.apache.streampipes.model.runtime.RuntimeOptionsRequest;
 import org.apache.streampipes.model.runtime.RuntimeOptionsResponse;
 import org.apache.streampipes.resource.management.SpResourceManager;
+import org.apache.streampipes.resource.management.secret.SecretProvider;
 import org.apache.streampipes.storage.management.StorageDispatcher;
 import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
 
@@ -75,6 +76,7 @@ public class RuntimeResolvableResource extends 
AbstractAdapterResource<WorkerAdm
           SpServiceUrlProvider.ADAPTER,
           
runtimeOptionsRequest.getDeploymentConfiguration().getDesiredServiceTags()
       );
+      
SecretProvider.getDecryptionService().applyConfig(runtimeOptionsRequest.getStaticProperties());
       RuntimeOptionsResponse result = 
WorkerRestClient.getConfiguration(baseUrl, appId, runtimeOptionsRequest);
 
       return ok(result);
diff --git 
a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-input/base-runtime-resolvable-input.ts
 
b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-input/base-runtime-resolvable-input.ts
index 564f2c2d06..5c3c14a0c9 100644
--- 
a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-input/base-runtime-resolvable-input.ts
+++ 
b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-input/base-runtime-resolvable-input.ts
@@ -122,6 +122,7 @@ export abstract class BaseRuntimeResolvableInput<
 
     ngOnChanges(changes: SimpleChanges): void {
         if (changes['completedConfigurations']) {
+            console.log(changes['completedConfigurations']);
             if (
                 this.staticPropertyUtils.allDependenciesSatisfied(
                     this.staticProperty.dependsOn,
diff --git 
a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts
 
b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts
index eb6ffcc10c..57588eb155 100644
--- 
a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts
+++ 
b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts
@@ -18,6 +18,8 @@
 
 import { Component, OnChanges, OnInit } from '@angular/core';
 import {
+    Option,
+    RuntimeResolvableAnyStaticProperty,
     RuntimeResolvableOneOfStaticProperty,
     StaticPropertyUnion,
 } from '@streampipes/platform-services';
@@ -48,13 +50,39 @@ export class StaticRuntimeResolvableOneOfInputComponent
     }
 
     afterOptionsLoaded(staticProperty: RuntimeResolvableOneOfStaticProperty) {
-        this.staticProperty.options = staticProperty.options;
         if (
-            this.staticProperty.options &&
-            this.staticProperty.options.length > 0
+            this.staticProperty.options?.length > 0 &&
+            this.isOptionSelected()
         ) {
-            this.staticProperty.options[0].selected = true;
+            const selectedOption = this.staticProperty.options.find(
+                o => o.selected,
+            );
+            this.addSelectedOption(staticProperty, selectedOption);
+        } else {
+            if (staticProperty.options?.length > 0) {
+                staticProperty.options[0].selected = true;
+            }
         }
+        this.staticProperty.options = staticProperty.options;
+    }
+
+    isOptionSelected(): boolean {
+        return this.staticProperty.options.find(o => o.selected) !== undefined;
+    }
+
+    addSelectedOption(
+        staticProperty: RuntimeResolvableOneOfStaticProperty,
+        selectedOption: Option,
+    ): void {
+        staticProperty.options
+            .filter(o => {
+                return o.internalName !== null
+                    ? o.internalName === selectedOption.internalName
+                    : o.name === selectedOption.name;
+            })
+            .forEach(o => {
+                o.selected = true;
+            });
     }
 
     select(id) {
@@ -65,7 +93,6 @@ export class StaticRuntimeResolvableOneOfInputComponent
             option => option.elementId === id,
         ).selected = true;
         this.performValidation();
-        this.applyCompletedConfiguration(true);
     }
 
     parse(

Reply via email to