This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 42189bde77 feat(#3355): Add new avro format parser (#3358)
42189bde77 is described below

commit 42189bde77041fe0b649b574d15cfc8ff5adff15
Author: Sven Oehler <[email protected]>
AuthorDate: Tue Dec 3 08:32:07 2024 +0100

    feat(#3355): Add new avro format parser (#3358)
    
    * Add avro parser
    
    * Improve avro parse exception
    
    * Update parser description rendering
    
    * feat(#3357): Add consumer group to Kafka adapter
    
    * Add parsing of avro field description
    
    * Revert parsing of avro field description
    
    * Fix tests
    
    * feat(#3362): Add SSL/SASL support to Kafka adapter and sink (#3364)
    
    * feat(#3362): Add SSL/SASL support to Kafka adapter and sink
    
    * Remove logging
    
    * Add Kafka migrations
    
    * Fix Kafka test
    
    ---------
    
    Co-authored-by: Dominik Riemer <[email protected]>
---
 pom.xml                                            |   2 +-
 .../apache/streampipes/commons/constants/Envs.java |  16 +-
 .../constants/GlobalStreamPipesConstants.java      |   1 +
 .../commons/environment/DefaultEnvironment.java    |  42 ++++-
 .../commons/environment/Environment.java           |  39 +---
 streampipes-extensions-management/pom.xml          |   5 +
 .../connect/adapter/BrokerEventProcessor.java      |   5 +-
 .../connect/adapter/parser/AvroParser.java         | 209 +++++++++++++++++++++
 .../management/connect/adapter/parser/Parsers.java |   3 +-
 .../kafka/KafkaConnectorsModuleExport.java         |   8 +-
 .../connectors/kafka/adapter/KafkaProtocol.java    |  87 +++++----
 .../kafka/migration/KafkaAdapterMigrationV1.java   |  97 ++++++++++
 .../kafka/migration/KafkaSinkMigrationV1.java      |  51 +++++
 .../kafka/shared/kafka/KafkaAdapterConfig.java     |  13 +-
 .../{KafkaConfig.java => KafkaBaseConfig.java}     |  40 ++--
 .../kafka/shared/kafka/KafkaConfigExtractor.java   | 121 ++++++++++++
 .../kafka/shared/kafka/KafkaConfigProvider.java    | 153 +++++++++++++++
 .../kafka/shared/kafka/KafkaConnectUtils.java      | 187 ------------------
 .../connectors/kafka/sink/KafkaParameters.java     |  90 ---------
 .../connectors/kafka/sink/KafkaPublishSink.java    |  58 +++---
 .../strings.en                                     |  31 ++-
 .../strings.en                                     |   9 +-
 .../opcua/config/security/KeyStoreLoader.java      |   2 +-
 .../streampipes-extensions-all-jvm/pom.xml         |   5 +
 .../integration/adapters/KafkaAdapterTester.java   |  24 ++-
 .../kafka/config/KafkaConfigAppender.java          |   4 +-
 .../kafka/security/KafkaSecurityConfig.java        |  25 ---
 .../KafkaSecurityProtocolConfigAppender.java       |  70 +++++++
 ...g.java => KafkaSecuritySaslConfigAppender.java} |  40 +++-
 .../security/KafkaSecuritySaslPlainConfig.java     |  51 -----
 .../KafkaSecurityUnauthenticatedSSLConfig.java     |  32 ----
 .../impl/connect/RuntimeResolvableResource.java    |   2 +
 .../builder/adapter/ParserDescriptionBuilder.java  |   1 +
 ui/cypress/tests/thirdparty/Kafka.smoke.spec.ts    |   4 +-
 ...tic-runtime-resolvable-oneof-input.component.ts |  37 +++-
 35 files changed, 990 insertions(+), 574 deletions(-)

diff --git a/pom.xml b/pom.xml
index 8f0d8ffded..455675231a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -89,7 +89,7 @@
         <jsrosbridge.version>0.2.0</jsrosbridge.version>
         <jjwt.version>0.11.2</jjwt.version>
         <jts-core.version>1.19.0</jts-core.version>
-        <kafka.version>3.4.0</kafka.version>
+        <kafka.version>3.7.1</kafka.version>
         <lightcouch.version>0.2.0</lightcouch.version>
         
<maven-plugin-annotations.version>3.13.0</maven-plugin-annotations.version>
         <mailapi.version>1.4.3</mailapi.version>
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
index 7a2d033e5a..2339475adb 100644
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
+++ 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
@@ -119,7 +119,21 @@ public enum Envs {
   SP_OPCUA_APPLICATION_URI(
       "SP_OPCUA_APPLICATION_URI",
       "urn:org:apache:streampipes:opcua:client"
-  );
+  ),
+
+  // Default keystore and truststore
+  SP_SECURITY_KEYSTORE_FILENAME(
+      "SP_SECURITY_KEYSTORE_FILENAME",
+      "/streampipes-security/keystore.pfx"),
+  SP_SECURITY_KEYSTORE_PASSWORD("SP_SECURITY_KEYSTORE_PASSWORD", ""),
+  SP_SECURITY_KEYSTORE_TYPE("SP_SECURITY_KEYSTORE_TYPE", "PKCS12"),
+  SP_SECURITY_KEY_PASSWORD("SP_SECURITY_KEY_PASSWORD", null),
+  SP_SECURITY_TRUSTSTORE_FILENAME(
+      "SP_SECURITY_TRUSTSTORE_FILENAME",
+      "/streampipes-security/truststore.pfx"),
+  SP_SECURITY_TRUSTSTORE_PASSWORD("SP_SECURITY_TRUSTSTORE_PASSWORD", ""),
+  SP_SECURITY_TRUSTSTORE_TYPE("SP_SECURITY_TRUSTSTORE_TYPE", "PKCS12"),
+  SP_SECURITY_ALLOW_SELFSIGNED("SP_SECURITY_ALLOW_SELFSIGNED", "false");
 
   private final String envVariableName;
   private String defaultValue;
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GlobalStreamPipesConstants.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GlobalStreamPipesConstants.java
index b452d74c1a..409a9a9090 100644
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GlobalStreamPipesConstants.java
+++ 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GlobalStreamPipesConstants.java
@@ -23,5 +23,6 @@ public class GlobalStreamPipesConstants {
   public static final String STD_DOCUMENTATION_NAME = "documentation.md";
 
   public static final String INTERNAL_TOPIC_PREFIX = 
"org-apache-streampipes-internal-";
+  public static final String CONNECT_TOPIC_PREFIX = 
"org.apache.streampipes.connect.";
 
 }
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
index bb5bbac193..3434924e3e 100644
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
+++ 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
@@ -349,7 +349,7 @@ public class DefaultEnvironment implements Environment {
   }
 
   @Override
-  public StringEnvironmentVariable getOPcUaKeystoreType() {
+  public StringEnvironmentVariable getOpcUaKeystoreType() {
     return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_TYPE);
   }
 
@@ -357,4 +357,44 @@ public class DefaultEnvironment implements Environment {
   public StringEnvironmentVariable getOpcUaKeystoreAlias() {
     return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_ALIAS);
   }
+
+  @Override
+  public StringEnvironmentVariable getKeystoreFilename() {
+    return new StringEnvironmentVariable(Envs.SP_SECURITY_KEYSTORE_FILENAME);
+  }
+
+  @Override
+  public StringEnvironmentVariable getKeystorePassword() {
+    return new StringEnvironmentVariable(Envs.SP_SECURITY_KEYSTORE_PASSWORD);
+  }
+
+  @Override
+  public StringEnvironmentVariable getKeystoreType() {
+    return new StringEnvironmentVariable(Envs.SP_SECURITY_KEYSTORE_TYPE);
+  }
+
+  @Override
+  public StringEnvironmentVariable getKeyPassword() {
+    return new StringEnvironmentVariable(Envs.SP_SECURITY_KEY_PASSWORD);
+  }
+
+  @Override
+  public StringEnvironmentVariable getTruststoreFilename() {
+    return new StringEnvironmentVariable(Envs.SP_SECURITY_TRUSTSTORE_FILENAME);
+  }
+
+  @Override
+  public StringEnvironmentVariable getTruststorePassword() {
+    return new StringEnvironmentVariable(Envs.SP_SECURITY_TRUSTSTORE_PASSWORD);
+  }
+
+  @Override
+  public StringEnvironmentVariable getTruststoreType() {
+    return new StringEnvironmentVariable(Envs.SP_SECURITY_TRUSTSTORE_TYPE);
+  }
+
+  @Override
+  public BooleanEnvironmentVariable getAllowSelfSignedCertificates() {
+    return new BooleanEnvironmentVariable(Envs.SP_SECURITY_ALLOW_SELFSIGNED);
+  }
 }
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
index cb441f009b..d1c4adf6ae 100644
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
+++ 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
@@ -32,79 +32,56 @@ public interface Environment {
   // Service base configuration
 
   StringEnvironmentVariable getServiceHost();
-
   IntEnvironmentVariable getServicePort();
 
   StringEnvironmentVariable getSpCoreScheme();
 
   StringEnvironmentVariable getSpCoreHost();
-
   IntEnvironmentVariable getSpCorePort();
 
   // Time series storage env variables
 
   StringEnvironmentVariable getTsStorage();
-
   StringEnvironmentVariable getTsStorageProtocol();
-
   StringEnvironmentVariable getTsStorageHost();
-
   IntEnvironmentVariable getTsStoragePort();
-
   StringEnvironmentVariable getTsStorageToken();
-
   StringEnvironmentVariable getTsStorageOrg();
-
   StringEnvironmentVariable getTsStorageBucket();
 
   IntEnvironmentVariable getIotDbSessionPoolSize();
-
   BooleanEnvironmentVariable getIotDbSessionEnableCompression();
-
   StringEnvironmentVariable getIotDbUser();
-
   StringEnvironmentVariable getIotDbPassword();
 
   // CouchDB env variables
 
   StringEnvironmentVariable getCouchDbProtocol();
-
   StringEnvironmentVariable getCouchDbHost();
-
   IntEnvironmentVariable getCouchDbPort();
-
   StringEnvironmentVariable getCouchDbUsername();
-
   StringEnvironmentVariable getCouchDbPassword();
 
 
   // JWT & Authentication
 
   StringEnvironmentVariable getClientUser();
-
   StringEnvironmentVariable getClientSecret();
 
   StringEnvironmentVariable getJwtSecret();
-
   StringEnvironmentVariable getJwtPublicKeyLoc();
-
   StringEnvironmentVariable getJwtPrivateKeyLoc();
-
   StringEnvironmentVariable getJwtSigningMode();
 
   StringEnvironmentVariable getExtensionsAuthMode();
-
   StringEnvironmentVariable getEncryptionPasscode();
 
   BooleanEnvironmentVariable getOAuthEnabled();
-
   StringEnvironmentVariable getOAuthRedirectUri();
-
   List<OAuthConfiguration> getOAuthConfigurations();
 
   // Messaging
   StringEnvironmentVariable getKafkaRetentionTimeMs();
-
   StringEnvironmentVariable getPrioritizedProtocol();
 
 
@@ -164,14 +141,18 @@ public interface Environment {
   StringEnvironmentVariable getAllowedUploadFiletypes();
 
   StringEnvironmentVariable getOpcUaSecurityDir();
-
   StringEnvironmentVariable getOpcUaKeystoreFile();
-
   StringEnvironmentVariable getOpcUaKeystorePassword();
-
   StringEnvironmentVariable getOpcUaApplicationUri();
-
-  StringEnvironmentVariable getOPcUaKeystoreType();
-
+  StringEnvironmentVariable getOpcUaKeystoreType();
   StringEnvironmentVariable getOpcUaKeystoreAlias();
+
+  StringEnvironmentVariable getKeystoreFilename();
+  StringEnvironmentVariable getKeystorePassword();
+  StringEnvironmentVariable getKeystoreType();
+  StringEnvironmentVariable getKeyPassword();
+  StringEnvironmentVariable getTruststoreFilename();
+  StringEnvironmentVariable getTruststorePassword();
+  StringEnvironmentVariable getTruststoreType();
+  BooleanEnvironmentVariable getAllowSelfSignedCertificates();
 }
diff --git a/streampipes-extensions-management/pom.xml 
b/streampipes-extensions-management/pom.xml
index 3c0655c595..9c0427023b 100644
--- a/streampipes-extensions-management/pom.xml
+++ b/streampipes-extensions-management/pom.xml
@@ -94,6 +94,11 @@
             <groupId>com.opencsv</groupId>
             <artifactId>opencsv</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>1.11.4</version>
+        </dependency>
 
         <!-- Test dependencies -->
         <dependency>
diff --git 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/BrokerEventProcessor.java
 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/BrokerEventProcessor.java
index 2ad113e8a7..b3ed8c6fd1 100644
--- 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/BrokerEventProcessor.java
+++ 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/BrokerEventProcessor.java
@@ -23,11 +23,10 @@ import 
org.apache.streampipes.extensions.api.connect.IEventCollector;
 import org.apache.streampipes.extensions.api.connect.IParser;
 import org.apache.streampipes.messaging.InternalEventProcessor;
 
-import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.charset.StandardCharsets;
+import java.io.ByteArrayInputStream;
 
 public record BrokerEventProcessor(
     IParser parser,
@@ -39,7 +38,7 @@ public record BrokerEventProcessor(
   @Override
   public void onEvent(byte[] payload) {
     try {
-      parser.parse(IOUtils.toInputStream(new String(payload), 
StandardCharsets.UTF_8), collector::collect);
+      parser.parse(new ByteArrayInputStream(payload), collector::collect);
     } catch (ParseException e) {
       LOG.error("Error while parsing: " + e.getMessage());
     }
diff --git 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/AvroParser.java
 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/AvroParser.java
new file mode 100644
index 0000000000..32bf26f489
--- /dev/null
+++ 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/AvroParser.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.management.connect.adapter.parser;
+
+import org.apache.streampipes.commons.exceptions.connect.ParseException;
+import org.apache.streampipes.extensions.api.connect.IParser;
+import org.apache.streampipes.extensions.api.connect.IParserEventHandler;
+import org.apache.streampipes.model.connect.grounding.ParserDescription;
+import org.apache.streampipes.model.connect.guess.GuessSchema;
+import org.apache.streampipes.model.staticproperty.StaticProperty;
+import org.apache.streampipes.sdk.builder.adapter.ParserDescriptionBuilder;
+import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Options;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.util.Utf8;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AvroParser implements IParser {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AvroParser.class);
+
+  public static final String ID = 
"org.apache.streampipes.extensions.management.connect.adapter.parser.avro";
+  public static final String LABEL = "Avro";
+  public static final String DESCRIPTION = "Can be used to read avro records";
+
+  public static final String SCHEMA = "schema";
+  public static final String SCHEMA_REGISTRY = "schemaRegistry";
+  public static final String FLATTEN_RECORDS = "flattenRecord";
+
+  private final ParserUtils parserUtils;
+  private DatumReader<GenericRecord> datumReader;
+  private boolean schemaRegistry;
+  private boolean flattenRecord;
+
+
+  public AvroParser() {
+    parserUtils = new ParserUtils();
+  }
+
+  public AvroParser(String schemaString, boolean schemaRegistry, boolean 
flattenRecord) {
+    this();
+    Schema schema = new Schema.Parser().parse(schemaString);
+    this.datumReader = new GenericDatumReader<>(schema);
+    this.schemaRegistry = schemaRegistry;
+    this.flattenRecord = flattenRecord;
+  }
+
+  @Override
+  public ParserDescription declareDescription() {
+    return ParserDescriptionBuilder.create(ID, LABEL, DESCRIPTION)
+      .requiredSingleValueSelection(
+          Labels.from(SCHEMA_REGISTRY, "Schema Registry",
+                  "Does the messages include the schema registry header?"),
+          Options.from("yes", "no")
+      )
+      .requiredSingleValueSelection(
+          Labels.from(FLATTEN_RECORDS, "Flatten Records",
+                  "Should nested records be flattened?"),
+          Options.from("no", "yes")
+      )
+      .requiredCodeblock(Labels.from(SCHEMA, "Schema",
+  "The schema of the avro record"),
+  "{\n"
+          + " \"namespace\": \"example.avro\",\n"
+          + " \"type\": \"record\",\n"
+          + " \"name\": \"Test\",\n"
+          + " \"fields\": [\n"
+          + "     {\"name\": \"id\", \"type\": \"string\"},\n"
+          + "     {\"name\": \"value\", \"type\": \"double\"}\n"
+          + " ]\n"
+          + "}")
+      .build();
+  }
+
+
+  @Override
+  public GuessSchema getGuessSchema(InputStream inputStream) throws 
ParseException {
+    GenericRecord avroRecord = getRecord(inputStream);
+    var event = toMap(avroRecord);
+    return parserUtils.getGuessSchema(event);
+  }
+
+  @Override
+  public void parse(InputStream inputStream, IParserEventHandler handler) 
throws ParseException {
+    GenericRecord avroRecord = getRecord(inputStream);
+    var event = toMap(avroRecord);
+    handler.handle(event);
+  }
+
+  @Override
+  public IParser fromDescription(List<StaticProperty> configuration) {
+    var extractor = StaticPropertyExtractor.from(configuration);
+    String schema = extractor.codeblockValue(SCHEMA);
+    boolean schemaRegistry = extractor
+        .selectedSingleValue(SCHEMA_REGISTRY, String.class)
+        .equals("yes");
+    boolean flattenRecords = extractor
+        .selectedSingleValue(FLATTEN_RECORDS, String.class)
+        .equals("yes");
+
+    return new AvroParser(schema, schemaRegistry, flattenRecords);
+  }
+
+  private GenericRecord getRecord(InputStream inputStream) throws 
ParseException {
+    try {
+      if (schemaRegistry) {
+        inputStream.skipNBytes(5);
+      }
+      BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, 
null);
+      return datumReader.read(null, decoder);
+    } catch (IOException e) {
+      throw new ParseException(
+          "Error decoding the avro message. Please check the schema."
+      );
+    }
+  }
+
+
+  private Map<String, Object> toMap(GenericRecord avroRecord) {
+    Map<String, Object> resultMap = new LinkedHashMap<>();
+    avroRecord.getSchema().getFields().forEach(field -> {
+      String fieldName = field.name();
+      Object fieldValue = avroRecord.get(fieldName);
+      if (flattenRecord && fieldValue instanceof GenericRecord){
+        Map<String, Object> flatMap = unwrapNestedRecord((GenericRecord) 
fieldValue, fieldName);
+        resultMap.putAll(flatMap);
+      } else {
+        resultMap.put(fieldName, toMapHelper(fieldValue));
+      }
+    });
+
+    return resultMap;
+  }
+
+  private Object toMapHelper(Object fieldValue) {
+    if (fieldValue instanceof GenericRecord){
+      return toMap((GenericRecord) fieldValue);
+    }
+    if (fieldValue instanceof GenericData.Array<?>){
+      List<Object> valueList = new ArrayList<>();
+      ((GenericData.Array) fieldValue).forEach(value -> 
valueList.add(toMapHelper(value)));
+      return valueList;
+    }
+    if (fieldValue instanceof Map<?, ?>){
+      Map<Object, Object> valueMap = new LinkedHashMap<>();
+      ((Map<Object, Object>) fieldValue).forEach((key, value1) -> 
valueMap.put(convertUTF8(key), toMapHelper(value1)));
+      return valueMap;
+    }
+    return convertUTF8(fieldValue);
+  }
+
+  private Map<String, Object> unwrapNestedRecord(GenericRecord nestedRecord, 
String prefix) {
+    Map<String, Object> flatMap = new HashMap<>();
+
+    nestedRecord.getSchema().getFields().forEach(field -> {
+      String fieldName = field.name();
+      Object fieldValue = nestedRecord.get(fieldName);
+      String newKey = prefix.isEmpty() ? fieldName : prefix + "_" + fieldName;
+      if (fieldValue instanceof GenericRecord) {
+        flatMap.putAll(unwrapNestedRecord((GenericRecord) fieldValue, newKey));
+      } else {
+        flatMap.put(newKey, toMapHelper(fieldValue));
+      }
+    });
+
+    return flatMap;
+  }
+
+  private Object convertUTF8(Object fieldValue) {
+    if (fieldValue instanceof Utf8){
+      return fieldValue.toString();
+    }
+    return fieldValue;
+  }
+
+}
diff --git 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/Parsers.java
 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/Parsers.java
index 3c8a0110da..1dc2937792 100644
--- 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/Parsers.java
+++ 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/Parsers.java
@@ -30,7 +30,8 @@ public class Parsers {
         new JsonParsers(),
         new CsvParser(),
         new XmlParser(),
-        new ImageParser()
+        new ImageParser(),
+        new AvroParser()
     );
   }
 }
diff --git 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/KafkaConnectorsModuleExport.java
 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/KafkaConnectorsModuleExport.java
index 351bef6996..c79c075576 100644
--- 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/KafkaConnectorsModuleExport.java
+++ 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/KafkaConnectorsModuleExport.java
@@ -23,9 +23,10 @@ import 
org.apache.streampipes.extensions.api.declarer.IExtensionModuleExport;
 import org.apache.streampipes.extensions.api.migration.IModelMigrator;
 import org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement;
 import 
org.apache.streampipes.extensions.connectors.kafka.adapter.KafkaProtocol;
+import 
org.apache.streampipes.extensions.connectors.kafka.migration.KafkaAdapterMigrationV1;
+import 
org.apache.streampipes.extensions.connectors.kafka.migration.KafkaSinkMigrationV1;
 import 
org.apache.streampipes.extensions.connectors.kafka.sink.KafkaPublishSink;
 
-import java.util.Collections;
 import java.util.List;
 
 public class KafkaConnectorsModuleExport implements IExtensionModuleExport {
@@ -45,6 +46,9 @@ public class KafkaConnectorsModuleExport implements 
IExtensionModuleExport {
 
   @Override
   public List<IModelMigrator<?, ?>> migrators() {
-    return Collections.emptyList();
+    return List.of(
+        new KafkaAdapterMigrationV1(),
+        new KafkaSinkMigrationV1()
+    );
   }
 }
diff --git 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java
 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java
index f2e0a76918..490df5a089 100644
--- 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java
+++ 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java
@@ -31,12 +31,12 @@ import 
org.apache.streampipes.extensions.api.connect.context.IAdapterRuntimeCont
 import 
org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor;
 import 
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
 import org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig;
-import 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfig;
-import 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConnectUtils;
+import 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaAdapterConfig;
+import 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigExtractor;
+import 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider;
 import 
org.apache.streampipes.extensions.management.connect.adapter.BrokerEventProcessor;
 import 
org.apache.streampipes.extensions.management.connect.adapter.parser.Parsers;
 import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
-import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
 import org.apache.streampipes.model.AdapterType;
 import org.apache.streampipes.model.connect.guess.GuessSchema;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
@@ -67,13 +67,12 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 public class KafkaProtocol implements StreamPipesAdapter, 
SupportsRuntimeConfig {
 
   private static final Logger logger = 
LoggerFactory.getLogger(KafkaProtocol.class);
-  KafkaConfig config;
+  KafkaAdapterConfig config;
 
   public static final String ID = 
"org.apache.streampipes.connect.iiot.protocol.stream.kafka";
 
@@ -84,20 +83,18 @@ public class KafkaProtocol implements StreamPipesAdapter, 
SupportsRuntimeConfig
   }
 
   private void applyConfiguration(IStaticPropertyExtractor extractor) {
-    this.config = KafkaConnectUtils.getConfig(extractor, true);
+    this.config = new KafkaConfigExtractor().extractAdapterConfig(extractor, 
true);
   }
 
-  private Consumer<byte[], byte[]> createConsumer(KafkaConfig kafkaConfig) 
throws KafkaException {
+  private Consumer<byte[], byte[]> createConsumer(KafkaAdapterConfig 
kafkaConfig) throws KafkaException {
     final Properties props = new Properties();
 
-    kafkaConfig.getSecurityConfig().appendConfig(props);
-    kafkaConfig.getAutoOffsetResetConfig().appendConfig(props);
+    kafkaConfig.getConfigAppenders().forEach(c -> c.appendConfig(props));
 
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
         kafkaConfig.getKafkaHost() + ":" + kafkaConfig.getKafkaPort());
 
-    props.put(ConsumerConfig.GROUP_ID_CONFIG,
-        "KafkaExampleConsumer" + System.currentTimeMillis());
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getGroupId());
 
     props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 6000);
 
@@ -114,61 +111,67 @@ public class KafkaProtocol implements StreamPipesAdapter, 
SupportsRuntimeConfig
                                              IStaticPropertyExtractor 
extractor)
       throws SpConfigurationException {
     RuntimeResolvableOneOfStaticProperty config = extractor
-        .getStaticPropertyByName(KafkaConnectUtils.TOPIC_KEY, 
RuntimeResolvableOneOfStaticProperty.class);
-    KafkaConfig kafkaConfig = KafkaConnectUtils.getConfig(extractor, false);
-    boolean hideInternalTopics = 
extractor.slideToggleValue(KafkaConnectUtils.getHideInternalTopicsKey());
+        .getStaticPropertyByName(KafkaConfigProvider.TOPIC_KEY, 
RuntimeResolvableOneOfStaticProperty.class);
+    var kafkaConfig = new 
KafkaConfigExtractor().extractAdapterConfig(extractor, false);
+    boolean hideInternalTopics = 
extractor.slideToggleValue(KafkaConfigProvider.getHideInternalTopicsKey());
 
     try {
-      Consumer<byte[], byte[]> consumer = createConsumer(kafkaConfig);
-      Set<String> topics = consumer.listTopics().keySet();
+      var consumer = createConsumer(kafkaConfig);
+      List<String> topics = new 
ArrayList<>(consumer.listTopics().keySet()).stream().sorted().toList();
       consumer.close();
 
       if (hideInternalTopics) {
         topics = topics
             .stream()
-            .filter(t -> 
!t.startsWith(GlobalStreamPipesConstants.INTERNAL_TOPIC_PREFIX))
-            .collect(Collectors.toSet());
+            .filter(t -> 
(!t.startsWith(GlobalStreamPipesConstants.INTERNAL_TOPIC_PREFIX)
+                && 
!t.startsWith(GlobalStreamPipesConstants.CONNECT_TOPIC_PREFIX)))
+            .toList();
       }
 
       
config.setOptions(topics.stream().map(Option::new).collect(Collectors.toList()));
 
       return config;
     } catch (KafkaException e) {
-      throw new SpConfigurationException(e.getMessage(), e);
+      var message = e.getCause() != null ? e.getCause().getMessage() : 
e.getMessage();
+      throw new SpConfigurationException(message, e);
     }
   }
 
   @Override
   public IAdapterConfiguration declareConfig() {
 
-    StaticPropertyAlternative latestAlternative = 
KafkaConnectUtils.getAlternativesLatest();
+    StaticPropertyAlternative latestAlternative = 
KafkaConfigProvider.getAlternativesLatest();
     latestAlternative.setSelected(true);
 
     return AdapterConfigurationBuilder
-        .create(ID, 0, KafkaProtocol::new)
+        .create(ID, 1, KafkaProtocol::new)
         .withSupportedParsers(Parsers.defaultParsers())
         .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
         .withLocales(Locales.EN)
         .withCategory(AdapterType.Generic, AdapterType.Manufacturing)
 
-        .requiredAlternatives(KafkaConnectUtils.getAccessModeLabel(),
-            KafkaConnectUtils.getAlternativeUnauthenticatedPlain(),
-            KafkaConnectUtils.getAlternativeUnauthenticatedSSL(),
-            KafkaConnectUtils.getAlternativesSaslPlain(),
-            KafkaConnectUtils.getAlternativesSaslSSL())
+        .requiredAlternatives(KafkaConfigProvider.getAccessModeLabel(),
+            KafkaConfigProvider.getAlternativeUnauthenticatedPlain(),
+            KafkaConfigProvider.getAlternativeUnauthenticatedSSL(),
+            KafkaConfigProvider.getAlternativesSaslPlain(),
+            KafkaConfigProvider.getAlternativesSaslSSL())
 
-        .requiredTextParameter(KafkaConnectUtils.getHostLabel())
-        .requiredIntegerParameter(KafkaConnectUtils.getPortLabel())
+        .requiredTextParameter(KafkaConfigProvider.getHostLabel())
+        .requiredIntegerParameter(KafkaConfigProvider.getPortLabel())
 
-        .requiredSlideToggle(KafkaConnectUtils.getHideInternalTopicsLabel(), 
true)
+        .requiredAlternatives(KafkaConfigProvider.getConsumerGroupLabel(),
+            KafkaConfigProvider.getAlternativesRandomGroupId(),
+            KafkaConfigProvider.getAlternativesGroupId())
 
-        
.requiredSingleValueSelectionFromContainer(KafkaConnectUtils.getTopicLabel(), 
Arrays.asList(
-            KafkaConnectUtils.HOST_KEY,
-            KafkaConnectUtils.PORT_KEY))
-        
.requiredAlternatives(KafkaConnectUtils.getAutoOffsetResetConfigLabel(),
-                KafkaConnectUtils.getAlternativesEarliest(),
-                latestAlternative,
-                KafkaConnectUtils.getAlternativesNone())
+        .requiredSlideToggle(KafkaConfigProvider.getHideInternalTopicsLabel(), 
true)
+
+        
.requiredSingleValueSelectionFromContainer(KafkaConfigProvider.getTopicLabel(), 
Arrays.asList(
+            KafkaConfigProvider.HOST_KEY,
+            KafkaConfigProvider.PORT_KEY))
+        
.requiredAlternatives(KafkaConfigProvider.getAutoOffsetResetConfigLabel(),
+            KafkaConfigProvider.getAlternativesEarliest(),
+            latestAlternative,
+            KafkaConfigProvider.getAlternativesNone())
         .buildConfiguration();
   }
 
@@ -182,17 +185,11 @@ public class KafkaProtocol implements StreamPipesAdapter, 
SupportsRuntimeConfig
     protocol.setBrokerHostname(config.getKafkaHost());
     protocol.setTopicDefinition(new SimpleTopicDefinition(config.getTopic()));
 
-    List<KafkaConfigAppender> kafkaConfigAppenderList = new ArrayList<>(2);
-    kafkaConfigAppenderList.add(this.config.getSecurityConfig());
-    kafkaConfigAppenderList.add(this.config.getAutoOffsetResetConfig());
-
     this.kafkaConsumer = new SpKafkaConsumer(protocol,
         config.getTopic(),
-        new BrokerEventProcessor(extractor.selectedParser(), (event) -> {
-          collector.collect(event);
-        }),
-        kafkaConfigAppenderList
-        );
+        new BrokerEventProcessor(extractor.selectedParser(), collector),
+        config.getConfigAppenders()
+    );
 
     thread = new Thread(this.kafkaConsumer);
     thread.start();
diff --git 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/migration/KafkaAdapterMigrationV1.java
 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/migration/KafkaAdapterMigrationV1.java
new file mode 100644
index 0000000000..59a860d050
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/migration/KafkaAdapterMigrationV1.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.kafka.migration;
+
+import 
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.extensions.api.migration.IAdapterMigrator;
+import 
org.apache.streampipes.extensions.connectors.kafka.adapter.KafkaProtocol;
+import 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider;
+import 
org.apache.streampipes.extensions.management.connect.adapter.parser.AvroParser;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
+import org.apache.streampipes.model.staticproperty.StaticPropertyGroup;
+import org.apache.streampipes.sdk.StaticProperties;
+import org.apache.streampipes.sdk.helpers.Labels;
+
+public class KafkaAdapterMigrationV1 implements IAdapterMigrator {
+  @Override
+  public ModelMigratorConfig config() {
+    return new ModelMigratorConfig(
+        KafkaProtocol.ID,
+        SpServiceTagPrefix.ADAPTER,
+        0,
+        1
+    );
+  }
+
+  @Override
+  public MigrationResult<AdapterDescription> migrate(AdapterDescription 
element,
+                                                     IStaticPropertyExtractor 
extractor) throws RuntimeException {
+
+    migrateSecurity((StaticPropertyAlternatives) element.getConfig().get(0));
+    migrateAvro((StaticPropertyAlternatives) element.getConfig().get(6));
+    element.getConfig().add(3, makeConsumerGroup());
+    return MigrationResult.success(element);
+  }
+
+  public void migrateSecurity(StaticPropertyAlternatives securityAlternatives) 
{
+    migrateGroup(securityAlternatives.getAlternatives().get(2));
+    migrateGroup(securityAlternatives.getAlternatives().get(3));
+  }
+
+  public void migrateAvro(StaticPropertyAlternatives formatAlternatives) {
+    var parser = new AvroParser();
+    var avroParserDescription = new StaticPropertyAlternative(
+        parser.declareDescription().getName(),
+        parser.declareDescription().getName(),
+        parser.declareDescription().getDescription());
+
+    
avroParserDescription.setStaticProperty(parser.declareDescription().getConfig());
+    formatAlternatives.getAlternatives().add(
+        avroParserDescription
+    );
+  }
+
+  private StaticPropertyAlternatives makeConsumerGroup() {
+    var consumerGroupAlternatives = StaticProperties.alternatives(
+        KafkaConfigProvider.getConsumerGroupLabel(),
+        KafkaConfigProvider.getAlternativesRandomGroupId(),
+        KafkaConfigProvider.getAlternativesGroupId()
+    );
+    consumerGroupAlternatives.getAlternatives().get(0).setSelected(true);
+    return consumerGroupAlternatives;
+  }
+
+  private void migrateGroup(StaticPropertyAlternative alternative) {
+    boolean selected = alternative.getSelected();
+    var securityMechanism = StaticProperties.singleValueSelection(
+        Labels.withId(KafkaConfigProvider.SECURITY_MECHANISM),
+        KafkaConfigProvider.makeSecurityMechanism());
+    securityMechanism.getOptions().get(0).setSelected(selected);
+    ((StaticPropertyGroup) 
alternative.getStaticProperty()).setHorizontalRendering(false);
+    ((StaticPropertyGroup) 
alternative.getStaticProperty()).getStaticProperties().add(
+        0,
+        securityMechanism
+    );
+  }
+}
diff --git 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/migration/KafkaSinkMigrationV1.java
 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/migration/KafkaSinkMigrationV1.java
new file mode 100644
index 0000000000..d1e1d581a0
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/migration/KafkaSinkMigrationV1.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.kafka.migration;
+
+import 
org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor;
+import org.apache.streampipes.extensions.api.migration.IDataSinkMigrator;
+import 
org.apache.streampipes.extensions.connectors.kafka.sink.KafkaPublishSink;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
+
+public class KafkaSinkMigrationV1 implements IDataSinkMigrator {
+
+  @Override
+  public ModelMigratorConfig config() {
+    return new ModelMigratorConfig(
+        KafkaPublishSink.ID,
+        SpServiceTagPrefix.DATA_SINK,
+        0,
+        1
+    );
+  }
+
+  @Override
+  public MigrationResult<DataSinkInvocation> migrate(DataSinkInvocation 
element,
+                                                     
IDataSinkParameterExtractor extractor) throws RuntimeException {
+
+    new KafkaAdapterMigrationV1().migrateSecurity(
+        (StaticPropertyAlternatives) element.getStaticProperties().get(3));
+
+    return MigrationResult.success(element);
+  }
+}
diff --git 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedPlainConfig.java
 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaAdapterConfig.java
similarity index 74%
rename from 
streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedPlainConfig.java
rename to 
streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaAdapterConfig.java
index 235a6eda39..e24dccf040 100644
--- 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedPlainConfig.java
+++ 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaAdapterConfig.java
@@ -16,14 +16,17 @@
  *
  */
 
-package org.apache.streampipes.messaging.kafka.security;
+package org.apache.streampipes.extensions.connectors.kafka.shared.kafka;
 
-import java.util.Properties;
+public class KafkaAdapterConfig extends KafkaBaseConfig {
 
-public class KafkaSecurityUnauthenticatedPlainConfig extends 
KafkaSecurityConfig {
+  private String groupId;
 
-  @Override
-  public void appendConfig(Properties props) {
+  public String getGroupId() {
+    return groupId;
+  }
 
+  public void setGroupId(String groupId) {
+    this.groupId = groupId;
   }
 }
diff --git 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfig.java
 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaBaseConfig.java
similarity index 55%
rename from 
streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfig.java
rename to 
streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaBaseConfig.java
index 770cfc0561..dd43893e89 100644
--- 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfig.java
+++ 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaBaseConfig.java
@@ -18,28 +18,20 @@
 
 package org.apache.streampipes.extensions.connectors.kafka.shared.kafka;
 
-import org.apache.streampipes.messaging.kafka.config.AutoOffsetResetConfig;
-import org.apache.streampipes.messaging.kafka.security.KafkaSecurityConfig;
+import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
 
-public class KafkaConfig {
+import java.util.ArrayList;
+import java.util.List;
+
+public class KafkaBaseConfig {
 
   private String kafkaHost;
   private Integer kafkaPort;
   private String topic;
+  private List<KafkaConfigAppender> configAppenders;
 
-  KafkaSecurityConfig securityConfig;
-  AutoOffsetResetConfig autoOffsetResetConfig;
-
-  public KafkaConfig(String kafkaHost,
-                     Integer kafkaPort,
-                     String topic,
-                     KafkaSecurityConfig securityConfig,
-                     AutoOffsetResetConfig autoOffsetResetConfig) {
-    this.kafkaHost = kafkaHost;
-    this.kafkaPort = kafkaPort;
-    this.topic = topic;
-    this.securityConfig = securityConfig;
-    this.autoOffsetResetConfig = autoOffsetResetConfig;
+  public KafkaBaseConfig() {
+    this.configAppenders = new ArrayList<>();
   }
 
   public String getKafkaHost() {
@@ -66,19 +58,11 @@ public class KafkaConfig {
     this.topic = topic;
   }
 
-  public KafkaSecurityConfig getSecurityConfig() {
-    return securityConfig;
-  }
-
-  public void setSecurityConfig(KafkaSecurityConfig securityConfig) {
-    this.securityConfig = securityConfig;
-  }
-
-  public AutoOffsetResetConfig getAutoOffsetResetConfig() {
-    return autoOffsetResetConfig;
+  public List<KafkaConfigAppender> getConfigAppenders() {
+    return configAppenders;
   }
 
-  public void setAutoOffsetResetConfig(AutoOffsetResetConfig 
autoOffsetResetConfig) {
-    this.autoOffsetResetConfig = autoOffsetResetConfig;
+  public void setConfigAppenders(List<KafkaConfigAppender> configAppenders) {
+    this.configAppenders = configAppenders;
   }
 }
diff --git 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java
 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java
new file mode 100644
index 0000000000..d06e544e74
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.kafka.shared.kafka;
+
+import org.apache.streampipes.commons.environment.Environments;
+import org.apache.streampipes.extensions.api.extractor.IParameterExtractor;
+import 
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.messaging.kafka.config.AutoOffsetResetConfig;
+import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
+import 
org.apache.streampipes.messaging.kafka.security.KafkaSecurityProtocolConfigAppender;
+import 
org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslConfigAppender;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
+
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+
+import java.util.ArrayList;
+
+import static 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.ACCESS_MODE;
+import static 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.AUTO_OFFSET_RESET_CONFIG;
+import static 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.CONSUMER_GROUP;
+import static 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.GROUP_ID_INPUT;
+import static 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.HOST_KEY;
+import static 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.PASSWORD_KEY;
+import static 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.PORT_KEY;
+import static 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.RANDOM_GROUP_ID;
+import static 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.SECURITY_MECHANISM;
+import static 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.TOPIC_KEY;
+import static 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.USERNAME_KEY;
+
+public class KafkaConfigExtractor {
+
+  public KafkaAdapterConfig extractAdapterConfig(IStaticPropertyExtractor 
extractor,
+                                                 boolean containsTopic) {
+
+    var config = extractCommonConfigs(extractor, new KafkaAdapterConfig());
+
+    var topic = "";
+    if (containsTopic) {
+      topic = extractor.selectedSingleValue(TOPIC_KEY, String.class);
+    }
+    config.setTopic(topic);
+
+    if 
(extractor.selectedAlternativeInternalId(CONSUMER_GROUP).equals(RANDOM_GROUP_ID))
 {
+      config.setGroupId("StreamPipesKafkaConsumer" + 
System.currentTimeMillis());
+    } else {
+      config.setGroupId(extractor.singleValueParameter(GROUP_ID_INPUT, 
String.class));
+    }
+
+    StaticPropertyAlternatives alternatives = 
extractor.getStaticPropertyByName(AUTO_OFFSET_RESET_CONFIG,
+        StaticPropertyAlternatives.class);
+
+    // Set default value if no value is provided.
+    if (alternatives == null) {
+      config.getConfigAppenders().add(new 
AutoOffsetResetConfig(KafkaConfigProvider.LATEST));
+    } else {
+      String auto = 
extractor.selectedAlternativeInternalId(AUTO_OFFSET_RESET_CONFIG);
+      config.getConfigAppenders().add(new AutoOffsetResetConfig(auto));
+    }
+    return config;
+  }
+
+  public KafkaBaseConfig extractSinkConfig(IParameterExtractor extractor) {
+    var config = extractCommonConfigs(extractor, new KafkaBaseConfig());
+    config.setTopic(extractor.singleValueParameter(TOPIC_KEY, String.class));
+
+    return config;
+  }
+
+  private <T extends KafkaBaseConfig> T 
extractCommonConfigs(IParameterExtractor extractor,
+                                                             T config) {
+    var configAppenders = new ArrayList<KafkaConfigAppender>();
+    var env = Environments.getEnvironment();
+    config.setKafkaHost(extractor.singleValueParameter(HOST_KEY, 
String.class));
+    config.setKafkaPort(extractor.singleValueParameter(PORT_KEY, 
Integer.class));
+
+    var authentication = extractor.selectedAlternativeInternalId(ACCESS_MODE);
+    var securityProtocol = getSecurityProtocol(authentication);
+    configAppenders.add(new 
KafkaSecurityProtocolConfigAppender(securityProtocol, env));
+
+    // check if SASL authentication is defined
+    if (isSaslSecurityMechanism(securityProtocol)) {
+      String username = extractor.singleValueParameter(USERNAME_KEY, 
String.class);
+      String password = extractor.secretValue(PASSWORD_KEY);
+      String mechanism = extractor.selectedSingleValue(SECURITY_MECHANISM, 
String.class);
+
+      configAppenders.add(new KafkaSecuritySaslConfigAppender(mechanism, 
username, password));
+    }
+    config.setConfigAppenders(configAppenders);
+
+    return config;
+  }
+
+  private boolean isSaslSecurityMechanism(SecurityProtocol securityProtocol) {
+    return SecurityProtocol.SASL_PLAINTEXT == securityProtocol || 
SecurityProtocol.SASL_SSL == securityProtocol;
+  }
+
+  public SecurityProtocol getSecurityProtocol(String 
selectedSecurityConfiguration) {
+    return switch (selectedSecurityConfiguration) {
+      case "unauthenticated-ssl" -> SecurityProtocol.SSL;
+      case "sasl-plain" -> SecurityProtocol.SASL_PLAINTEXT;
+      case "sasl-ssl" -> SecurityProtocol.SASL_SSL;
+      default -> SecurityProtocol.PLAINTEXT;
+    };
+  }
+}
diff --git 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigProvider.java
 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigProvider.java
new file mode 100644
index 0000000000..05f652f780
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigProvider.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.kafka.shared.kafka;
+
+import org.apache.streampipes.model.staticproperty.Option;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
+import org.apache.streampipes.model.staticproperty.StaticPropertyGroup;
+import org.apache.streampipes.sdk.StaticProperties;
+import org.apache.streampipes.sdk.helpers.Alternatives;
+import org.apache.streampipes.sdk.helpers.Label;
+import org.apache.streampipes.sdk.helpers.Labels;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+
+import java.util.List;
+
+public class KafkaConfigProvider {
+
+  public static final String TOPIC_KEY = "topic";
+  public static final String HOST_KEY = "host";
+  public static final String PORT_KEY = "port";
+
+  public static final String ACCESS_MODE = "access-mode";
+  public static final String UNAUTHENTICATED_PLAIN = "unauthenticated-plain";
+  public static final String UNAUTHENTICATED_SSL = "unauthenticated-ssl";
+  public static final String SASL_PLAIN = "sasl-plain";
+  public static final String SASL_SSL = "sasl-ssl";
+
+  public static final String SECURITY_MECHANISM = "security-mechanism";
+  public static final String USERNAME_GROUP = "username-group";
+  public static final String USERNAME_KEY = "username";
+  public static final String PASSWORD_KEY = "password";
+
+  public static final String CONSUMER_GROUP = "consumer-group";
+  public static final String RANDOM_GROUP_ID = "random-group-id";
+  public static final String GROUP_ID = "group-id";
+  public static final String GROUP_ID_INPUT = "group-id-input";
+
+
+  private static final String HIDE_INTERNAL_TOPICS = "hide-internal-topics";
+
+  public static final String AUTO_OFFSET_RESET_CONFIG = 
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+  public static final String EARLIEST = "earliest";
+  public static final String LATEST = "latest";
+  public static final String NONE = "none";
+
+  public static Label getTopicLabel() {
+    return Labels.withId(TOPIC_KEY);
+  }
+
+  public static Label getHideInternalTopicsLabel() {
+    return Labels.withId(HIDE_INTERNAL_TOPICS);
+  }
+
+  public static String getHideInternalTopicsKey() {
+    return HIDE_INTERNAL_TOPICS;
+  }
+
+  public static Label getHostLabel() {
+    return Labels.withId(HOST_KEY);
+  }
+
+  public static Label getPortLabel() {
+    return Labels.withId(PORT_KEY);
+  }
+
+  public static Label getAccessModeLabel() {
+    return Labels.withId(ACCESS_MODE);
+  }
+
+  public static Label getConsumerGroupLabel() {
+    return Labels.withId(CONSUMER_GROUP);
+  }
+
+  public static Label getAutoOffsetResetConfigLabel() {
+    return Labels.withId(AUTO_OFFSET_RESET_CONFIG);
+  }
+
+  public static StaticPropertyAlternative getAlternativeUnauthenticatedPlain() 
{
+    return 
Alternatives.from(Labels.withId(KafkaConfigProvider.UNAUTHENTICATED_PLAIN));
+  }
+
+  public static StaticPropertyAlternative getAlternativeUnauthenticatedSSL() {
+    return 
Alternatives.from(Labels.withId(KafkaConfigProvider.UNAUTHENTICATED_SSL));
+  }
+
+  public static StaticPropertyAlternative getAlternativesSaslPlain() {
+    return Alternatives.from(Labels.withId(KafkaConfigProvider.SASL_PLAIN),
+        makeAuthenticationGroup()
+    );
+  }
+
+  public static StaticPropertyAlternative getAlternativesSaslSSL() {
+    return Alternatives.from(Labels.withId(KafkaConfigProvider.SASL_SSL),
+        makeAuthenticationGroup());
+  }
+
+  public static StaticPropertyAlternative getAlternativesRandomGroupId() {
+    return Alternatives.from(Labels.withId(RANDOM_GROUP_ID));
+  }
+
+  public static StaticPropertyAlternative getAlternativesGroupId() {
+    return Alternatives.from(Labels.withId(KafkaConfigProvider.GROUP_ID),
+        
StaticProperties.stringFreeTextProperty(Labels.withId(KafkaConfigProvider.GROUP_ID_INPUT)));
+  }
+
+  public static StaticPropertyAlternative getAlternativesLatest() {
+    return Alternatives.from(Labels.withId(LATEST));
+  }
+
+  public static StaticPropertyAlternative getAlternativesEarliest() {
+    return Alternatives.from(Labels.withId(EARLIEST));
+  }
+
+  public static StaticPropertyAlternative getAlternativesNone() {
+    return Alternatives.from(Labels.withId(NONE));
+  }
+
+  private static StaticPropertyGroup makeAuthenticationGroup() {
+    var group = 
StaticProperties.group(Labels.withId(KafkaConfigProvider.USERNAME_GROUP),
+        StaticProperties.singleValueSelection(
+            Labels.withId(KafkaConfigProvider.SECURITY_MECHANISM),
+            makeSecurityMechanism()),
+        
StaticProperties.stringFreeTextProperty(Labels.withId(KafkaConfigProvider.USERNAME_KEY)),
+        
StaticProperties.secretValue(Labels.withId(KafkaConfigProvider.PASSWORD_KEY)));
+    group.setHorizontalRendering(false);
+    return group;
+  }
+
+  public static List<Option> makeSecurityMechanism() {
+    return List.of(
+        new Option("PLAIN"),
+        new Option("SCRAM-SHA-256"),
+        new Option("SCRAM-SHA-512")
+    );
+  }
+}
diff --git 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConnectUtils.java
 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConnectUtils.java
deleted file mode 100644
index f56a230cbb..0000000000
--- 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConnectUtils.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.extensions.connectors.kafka.shared.kafka;
-
-import 
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
-import org.apache.streampipes.messaging.kafka.config.AutoOffsetResetConfig;
-import org.apache.streampipes.messaging.kafka.security.KafkaSecurityConfig;
-import 
org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslPlainConfig;
-import 
org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslSSLConfig;
-import 
org.apache.streampipes.messaging.kafka.security.KafkaSecurityUnauthenticatedPlainConfig;
-import 
org.apache.streampipes.messaging.kafka.security.KafkaSecurityUnauthenticatedSSLConfig;
-import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
-import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
-import org.apache.streampipes.sdk.StaticProperties;
-import org.apache.streampipes.sdk.helpers.Alternatives;
-import org.apache.streampipes.sdk.helpers.Label;
-import org.apache.streampipes.sdk.helpers.Labels;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-
-public class KafkaConnectUtils {
-
-  public static final String TOPIC_KEY = "topic";
-  public static final String HOST_KEY = "host";
-  public static final String PORT_KEY = "port";
-
-  public static final String KEY_SERIALIZATION = "key-serialization";
-  public static final String VALUE_SERIALIZATION = "value-serialization";
-
-  public static final String KEY_DESERIALIZATION = "key-deserialization";
-  public static final String VALUE_DESERIALIZATION = "value-deserialization";
-
-  public static final String ACCESS_MODE = "access-mode";
-  public static final String UNAUTHENTICATED_PLAIN = "unauthenticated-plain";
-  public static final String UNAUTHENTICATED_SSL = "unauthenticated-ssl";
-  public static final String SASL_PLAIN = "sasl-plain";
-  public static final String SASL_SSL = "sasl-ssl";
-
-  public static final String USERNAME_GROUP = "username-group";
-  public static final String USERNAME_KEY = "username";
-  public static final String PASSWORD_KEY = "password";
-
-
-  private static final String HIDE_INTERNAL_TOPICS = "hide-internal-topics";
-
-  public static final String AUTO_OFFSET_RESET_CONFIG = 
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
-  public static final String EARLIEST = "earliest";
-  public static final String LATEST = "latest";
-  public static final String NONE = "none";
-
-  public static Label getTopicLabel() {
-    return Labels.withId(TOPIC_KEY);
-  }
-
-  public static Label getHideInternalTopicsLabel() {
-    return Labels.withId(HIDE_INTERNAL_TOPICS);
-  }
-
-  public static String getHideInternalTopicsKey() {
-    return HIDE_INTERNAL_TOPICS;
-  }
-
-  public static Label getHostLabel() {
-    return Labels.withId(HOST_KEY);
-  }
-
-  public static Label getPortLabel() {
-    return Labels.withId(PORT_KEY);
-  }
-
-  public static Label getAccessModeLabel() {
-    return Labels.withId(ACCESS_MODE);
-  }
-
-  public static Label getAutoOffsetResetConfigLabel() {
-    return Labels.withId(AUTO_OFFSET_RESET_CONFIG);
-  }
-
-
-  public static KafkaConfig getConfig(IStaticPropertyExtractor extractor, 
boolean containsTopic) {
-    String brokerUrl = extractor.singleValueParameter(HOST_KEY, String.class);
-    String topic = "";
-    if (containsTopic) {
-      topic = extractor.selectedSingleValue(TOPIC_KEY, String.class);
-    }
-
-    Integer port = extractor.singleValueParameter(PORT_KEY, Integer.class);
-
-    String authentication = 
extractor.selectedAlternativeInternalId(ACCESS_MODE);
-    boolean isUseSSL = isUseSSL(authentication);
-
-    KafkaSecurityConfig securityConfig;
-
-    //KafkaSerializerConfig serializerConfig = new 
KafkaSerializerByteArrayConfig()
-
-    // check if a user for the authentication is defined
-    if (authentication.equals(KafkaConnectUtils.SASL_SSL) || 
authentication.equals(KafkaConnectUtils.SASL_PLAIN)) {
-      String username = extractor.singleValueParameter(USERNAME_KEY, 
String.class);
-      String password = extractor.secretValue(PASSWORD_KEY);
-
-      securityConfig = isUseSSL
-          ? new KafkaSecuritySaslSSLConfig(username, password) :
-          new KafkaSecuritySaslPlainConfig(username, password);
-    } else {
-      // set security config for none authenticated access
-      securityConfig = isUseSSL
-          ? new KafkaSecurityUnauthenticatedSSLConfig() :
-          new KafkaSecurityUnauthenticatedPlainConfig();
-    }
-
-    StaticPropertyAlternatives alternatives = 
extractor.getStaticPropertyByName(AUTO_OFFSET_RESET_CONFIG,
-            StaticPropertyAlternatives.class);
-
-    // Set default value if no value is provided.
-    if (alternatives == null) {
-      AutoOffsetResetConfig autoOffsetResetConfig = new 
AutoOffsetResetConfig(KafkaConnectUtils.LATEST);
-
-      return new KafkaConfig(brokerUrl, port, topic, securityConfig, 
autoOffsetResetConfig);
-    } else {
-      String auto = 
extractor.selectedAlternativeInternalId(AUTO_OFFSET_RESET_CONFIG);
-      AutoOffsetResetConfig autoOffsetResetConfig = new 
AutoOffsetResetConfig(auto);
-
-      return new KafkaConfig(brokerUrl, port, topic, securityConfig, 
autoOffsetResetConfig);
-    }
-  }
-
-  private static boolean isUseSSL(String authentication) {
-    if (authentication.equals(KafkaConnectUtils.UNAUTHENTICATED_PLAIN)
-        || authentication.equals(KafkaConnectUtils.SASL_PLAIN)) {
-      return false;
-    } else {
-      return true;
-    }
-  }
-
-
-  public static StaticPropertyAlternative getAlternativeUnauthenticatedPlain() 
{
-    return 
Alternatives.from(Labels.withId(KafkaConnectUtils.UNAUTHENTICATED_PLAIN));
-  }
-
-  public static StaticPropertyAlternative getAlternativeUnauthenticatedSSL() {
-    return 
Alternatives.from(Labels.withId(KafkaConnectUtils.UNAUTHENTICATED_SSL));
-  }
-
-  public static StaticPropertyAlternative getAlternativesSaslPlain() {
-    return Alternatives.from(Labels.withId(KafkaConnectUtils.SASL_PLAIN),
-        StaticProperties.group(Labels.withId(KafkaConnectUtils.USERNAME_GROUP),
-            
StaticProperties.stringFreeTextProperty(Labels.withId(KafkaConnectUtils.USERNAME_KEY)),
-            
StaticProperties.secretValue(Labels.withId(KafkaConnectUtils.PASSWORD_KEY))));
-  }
-
-  public static StaticPropertyAlternative getAlternativesSaslSSL() {
-    return Alternatives.from(Labels.withId(KafkaConnectUtils.SASL_SSL),
-        StaticProperties.group(Labels.withId(KafkaConnectUtils.USERNAME_GROUP),
-            
StaticProperties.stringFreeTextProperty(Labels.withId(KafkaConnectUtils.USERNAME_KEY)),
-            
StaticProperties.secretValue(Labels.withId(KafkaConnectUtils.PASSWORD_KEY))));
-  }
-
-
-  public static StaticPropertyAlternative getAlternativesLatest() {
-    return Alternatives.from(Labels.withId(LATEST));
-  }
-
-  public static StaticPropertyAlternative getAlternativesEarliest() {
-    return Alternatives.from(Labels.withId(EARLIEST));
-  }
-
-  public static StaticPropertyAlternative getAlternativesNone() {
-    return Alternatives.from(Labels.withId(NONE));
-  }
-}
diff --git 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaParameters.java
 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaParameters.java
deleted file mode 100644
index b1e439547f..0000000000
--- 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaParameters.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.extensions.connectors.kafka.sink;
-
-import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
-import 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConnectUtils;
-
-public class KafkaParameters {
-
-  private final String kafkaHost;
-
-  private final Integer kafkaPort;
-
-  private final String topic;
-
-  private final String authentication;
-
-  private String username;
-
-  private String password;
-
-  private final boolean useSSL;
-
-  public KafkaParameters(IDataSinkParameters params) {
-    var extractor = params.extractor();
-    this.topic = extractor.singleValueParameter(KafkaConnectUtils.TOPIC_KEY, 
String.class);
-    this.kafkaHost = 
extractor.singleValueParameter(KafkaConnectUtils.HOST_KEY, String.class);
-    this.kafkaPort = 
extractor.singleValueParameter(KafkaConnectUtils.PORT_KEY, Integer.class);
-    this.authentication = 
extractor.selectedAlternativeInternalId(KafkaConnectUtils.ACCESS_MODE);
-
-    if (!useAuthentication()) {
-      this.useSSL = 
KafkaConnectUtils.UNAUTHENTICATED_SSL.equals(this.authentication);
-    } else {
-      String username = 
extractor.singleValueParameter(KafkaConnectUtils.USERNAME_KEY, String.class);
-      String password = extractor.secretValue(KafkaConnectUtils.PASSWORD_KEY);
-      this.username = username;
-      this.password = password;
-      this.useSSL = KafkaConnectUtils.SASL_SSL.equals(this.authentication);
-    }
-  }
-
-  public String getKafkaHost() {
-    return kafkaHost;
-  }
-
-  public Integer getKafkaPort() {
-    return kafkaPort;
-  }
-
-  public String getTopic() {
-    return topic;
-  }
-
-  public String getUsername() {
-    return username;
-  }
-
-  public String getPassword() {
-    return password;
-  }
-
-  public String getAuthentication() {
-    return authentication;
-  }
-
-  public boolean isUseSSL() {
-    return useSSL;
-  }
-
-  public boolean useAuthentication() {
-    return KafkaConnectUtils.SASL_PLAIN.equals(this.authentication)
-        || KafkaConnectUtils.SASL_SSL.equals(this.authentication);
-  }
-}
diff --git 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaPublishSink.java
 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaPublishSink.java
index 238bbb8048..61460413b3 100644
--- 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaPublishSink.java
+++ 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaPublishSink.java
@@ -24,13 +24,9 @@ import 
org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
 import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
 import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
-import 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConnectUtils;
+import 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigExtractor;
+import 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider;
 import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
-import org.apache.streampipes.messaging.kafka.security.KafkaSecurityConfig;
-import 
org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslPlainConfig;
-import 
org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslSSLConfig;
-import 
org.apache.streampipes.messaging.kafka.security.KafkaSecurityUnauthenticatedPlainConfig;
-import 
org.apache.streampipes.messaging.kafka.security.KafkaSecurityUnauthenticatedSSLConfig;
 import org.apache.streampipes.model.DataSinkType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
 import org.apache.streampipes.model.runtime.Event;
@@ -41,17 +37,20 @@ import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 
-import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Map;
 
 public class KafkaPublishSink implements IStreamPipesDataSink {
 
+  public static final String ID = 
"org.apache.streampipes.sinks.brokers.jvm.kafka";
+  private static final Logger LOG = 
LoggerFactory.getLogger(KafkaPublishSink.class);
+
   private SpKafkaProducer producer;
 
   private JsonDataFormatDefinition dataFormatDefinition;
 
-  private KafkaParameters params;
-
   public KafkaPublishSink() {
   }
 
@@ -59,7 +58,7 @@ public class KafkaPublishSink implements IStreamPipesDataSink 
{
   public IDataSinkConfiguration declareConfig() {
     return DataSinkConfiguration.create(
         KafkaPublishSink::new,
-        
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.kafka", 0)
+        DataSinkBuilder.create(ID, 1)
             .category(DataSinkType.MESSAGING)
             .withLocales(Locales.EN)
             .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
@@ -68,15 +67,15 @@ public class KafkaPublishSink implements 
IStreamPipesDataSink {
                 .requiredProperty(EpRequirements.anyProperty())
                 .build())
 
-            .requiredTextParameter(Labels.withId(KafkaConnectUtils.TOPIC_KEY), 
false, false)
-            .requiredTextParameter(Labels.withId(KafkaConnectUtils.HOST_KEY), 
false, false)
-            
.requiredIntegerParameter(Labels.withId(KafkaConnectUtils.PORT_KEY), 9092)
+            
.requiredTextParameter(Labels.withId(KafkaConfigProvider.TOPIC_KEY), false, 
false)
+            
.requiredTextParameter(Labels.withId(KafkaConfigProvider.HOST_KEY), false, 
false)
+            
.requiredIntegerParameter(Labels.withId(KafkaConfigProvider.PORT_KEY), 9092)
 
-            .requiredAlternatives(Labels.withId(KafkaConnectUtils.ACCESS_MODE),
-                KafkaConnectUtils.getAlternativeUnauthenticatedPlain(),
-                KafkaConnectUtils.getAlternativeUnauthenticatedSSL(),
-                KafkaConnectUtils.getAlternativesSaslPlain(),
-                KafkaConnectUtils.getAlternativesSaslSSL())
+            
.requiredAlternatives(Labels.withId(KafkaConfigProvider.ACCESS_MODE),
+                KafkaConfigProvider.getAlternativeUnauthenticatedPlain(),
+                KafkaConfigProvider.getAlternativeUnauthenticatedSSL(),
+                KafkaConfigProvider.getAlternativesSaslPlain(),
+                KafkaConfigProvider.getAlternativesSaslSSL())
             .build()
     );
   }
@@ -84,26 +83,13 @@ public class KafkaPublishSink implements 
IStreamPipesDataSink {
   @Override
   public void onPipelineStarted(IDataSinkParameters parameters,
                                 EventSinkRuntimeContext runtimeContext) {
-    this.params = new KafkaParameters(parameters);
+    var kafkaConfig = new 
KafkaConfigExtractor().extractSinkConfig(parameters.extractor());
     this.dataFormatDefinition = new JsonDataFormatDefinition();
 
-    KafkaSecurityConfig securityConfig;
-    // check if a user for the authentication is defined
-    if (params.useAuthentication()) {
-      securityConfig = params.isUseSSL()
-          ? new KafkaSecuritySaslSSLConfig(params.getUsername(), 
params.getPassword()) :
-          new KafkaSecuritySaslPlainConfig(params.getUsername(), 
params.getPassword());
-    } else {
-      // set security config for none authenticated access
-      securityConfig = params.isUseSSL()
-          ? new KafkaSecurityUnauthenticatedSSLConfig() :
-          new KafkaSecurityUnauthenticatedPlainConfig();
-    }
-
     this.producer = new SpKafkaProducer(
-        params.getKafkaHost() + ":" + params.getKafkaPort(),
-        params.getTopic(),
-        List.of(securityConfig));
+        kafkaConfig.getKafkaHost() + ":" + kafkaConfig.getKafkaPort(),
+        kafkaConfig.getTopic(),
+        kafkaConfig.getConfigAppenders());
   }
 
   @Override
@@ -112,7 +98,7 @@ public class KafkaPublishSink implements 
IStreamPipesDataSink {
       Map<String, Object> rawEvent = event.getRaw();
       this.producer.publish(dataFormatDefinition.fromMap(rawEvent));
     } catch (SpRuntimeException e) {
-      e.printStackTrace();
+      LOG.error(e.getMessage(), e);
     }
   }
 
diff --git 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en
 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en
index fd8ef54bc4..4809237f5b 100644
--- 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en
+++ 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en
@@ -34,23 +34,38 @@ username.description=
 password.title=Password
 password.description=
 
-access-mode.title=Access Mode
-access-mode.description=Unauthenticated or SASL/PLAIN
+access-mode.title=Security protocol
+access-mode.description=Security protocol used to communicate with brokers. 
Corresponds to Kafka Client security.protocol property
 
-unauthenticated-plain.title=Unauthenticated Plain
+unauthenticated-plain.title=PLAINTEXT
 unauthenticated-plain.description=No authentication and plaintext
 
-unauthenticated-ssl.title=Unauthenticated SSL
+unauthenticated-ssl.title=SSL
 unauthenticated-ssl.description=Using SSL with no authentication
 
-sasl-plain.title=SASL/PLAIN
-sasl-plain.description=Username and password, no encryption
+security-mechanism.title=Security Mechanism
+security-mechanism.description=SASL mechanism used for authentication. 
Corresponds to Kafka Client sasl.mechanism property
+
+sasl-plain.title=SASL/PLAINTEXT
+sasl-plain.description=SASL authentication, no encryption
 
 sasl-ssl.title=SASL/SSL
-sasl-ssl.description=Username and password, with ssl encryption
+sasl-ssl.description=SASL authentication, with ssl encryption
 
 username-group.title=Username and password
 
+consumer-group.title=Consumer Group
+consumer-group.description=Use random group id or insert a specific one
+
+random-group-id.title=Random group id
+random-group-id.description=StreamPipes generates a random group id
+
+group-id.title=Insert group id
+group-id.description=Insert the group id
+
+group-id-input.title=Group id
+group-id-input.description=
+
 key-deserialization.title=Key Deserializer
 key-deserialization.description=
 
@@ -70,4 +85,4 @@ latest.title=Latest
 latest.description=Offsets are initialized to the Latest
 
 none.title=None
-none.description=Consumer throws exceptions
\ No newline at end of file
+none.description=Consumer throws exceptions
diff --git 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en
 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en
index a2e8d1b5f1..c4cef7cf4e 100644
--- 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en
+++ 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en
@@ -39,16 +39,19 @@ password.description=The password to authenticate with the 
broker
 access-mode.title=Access Mode
 access-mode.description=Unauthenticated or SASL/PLAIN
 
-unauthenticated-plain.title=Unauthenticated Plain
+unauthenticated-plain.title=PLAINTEXT
 unauthenticated-plain.description=No authentication and plaintext
 
-unauthenticated-ssl.title=Unauthenticated SSL
+unauthenticated-ssl.title=SSL
 unauthenticated-ssl.description=Using SSL with no authentication
 
-sasl-plain.title=SASL/PLAIN
+sasl-plain.title=SASL/PLAINTEXT
 sasl-plain.description=Username and password, no encryption
 
 sasl-ssl.title=SASL/SSL
 sasl-ssl.description=Username and password, with ssl encryption
 
+security-mechanism.title=Security Mechanism
+security-mechanism.description=SASL mechanism used for authentication. 
Corresponds to Kafka Client sasl.mechanism property
+
 username-group.title=Username and password
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/KeyStoreLoader.java
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/KeyStoreLoader.java
index 4b132d19f8..72f01b0c28 100644
--- 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/KeyStoreLoader.java
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/KeyStoreLoader.java
@@ -44,7 +44,7 @@ public class KeyStoreLoader {
 
   public KeyStoreLoader load(Environment env,
                              Path securityDir) throws Exception {
-    var keystore = 
KeyStore.getInstance(env.getOPcUaKeystoreType().getValueOrDefault());
+    var keystore = 
KeyStore.getInstance(env.getOpcUaKeystoreType().getValueOrDefault());
     var keystoreFile = env.getOpcUaKeystoreFile().getValueOrDefault();
     var keystorePassword = env.getOpcUaKeystorePassword().getValueOrDefault();
     var keystoreAlias = env.getOpcUaKeystoreAlias().getValueOrDefault();
diff --git a/streampipes-extensions/streampipes-extensions-all-jvm/pom.xml 
b/streampipes-extensions/streampipes-extensions-all-jvm/pom.xml
index e487c1aaaa..1e92fd2d48 100644
--- a/streampipes-extensions/streampipes-extensions-all-jvm/pom.xml
+++ b/streampipes-extensions/streampipes-extensions-all-jvm/pom.xml
@@ -64,6 +64,11 @@
             <artifactId>streampipes-connectors-kafka</artifactId>
             <version>0.97.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>1.11.4</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.streampipes</groupId>
             <artifactId>streampipes-connectors-mqtt</artifactId>
diff --git 
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/KafkaAdapterTester.java
 
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/KafkaAdapterTester.java
index 75883a244a..db830f1b7a 100644
--- 
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/KafkaAdapterTester.java
+++ 
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/KafkaAdapterTester.java
@@ -22,7 +22,7 @@ import 
org.apache.streampipes.commons.exceptions.connect.AdapterException;
 import org.apache.streampipes.extensions.api.connect.IAdapterConfiguration;
 import org.apache.streampipes.extensions.api.connect.StreamPipesAdapter;
 import 
org.apache.streampipes.extensions.connectors.kafka.adapter.KafkaProtocol;
-import 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConnectUtils;
+import 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider;
 import org.apache.streampipes.integration.containers.KafkaContainer;
 import org.apache.streampipes.integration.containers.KafkaDevContainer;
 import org.apache.streampipes.manager.template.AdapterTemplateHandler;
@@ -66,12 +66,12 @@ public class KafkaAdapterTester extends AdapterTesterBase {
     list.add(new Option(TOPIC));
     ((RuntimeResolvableOneOfStaticProperty) 
configuration.getAdapterDescription()
             .getConfig()
-            .get(4))
+            .get(5))
             .setOptions(list);
     List<Map<String, Object>> configs = new ArrayList<>();
-    configs.add(Map.of(KafkaConnectUtils.HOST_KEY, 
kafkaContainer.getBrokerHost()));
-    configs.add(Map.of(KafkaConnectUtils.PORT_KEY, 
kafkaContainer.getBrokerPort()));
-    configs.add(Map.of(KafkaConnectUtils.TOPIC_KEY, TOPIC));
+    configs.add(Map.of(KafkaConfigProvider.HOST_KEY, 
kafkaContainer.getBrokerHost()));
+    configs.add(Map.of(KafkaConfigProvider.PORT_KEY, 
kafkaContainer.getBrokerPort()));
+    configs.add(Map.of(KafkaConfigProvider.TOPIC_KEY, TOPIC));
     var template = new PipelineElementTemplate("name", "description", configs);
 
 
@@ -89,17 +89,25 @@ public class KafkaAdapterTester extends AdapterTesterBase {
         .get(0)
         .setSelected(true);
 
+    // Set consumer group to random group id
+    ((StaticPropertyAlternatives) (desc)
+        .getConfig()
+        .get(3))
+        .getAlternatives()
+        .get(0)
+        .setSelected(true);
+
     // Set AUTO_OFFSET_RESET_CONFIG configuration to Earliest option
     ((StaticPropertyAlternatives) (desc)
         .getConfig()
-        .get(5))
+        .get(6))
         .getAlternatives()
         .get(0)
         .setSelected(true);
 
     ((StaticPropertyAlternatives) (desc)
          .getConfig()
-         .get(5))
+         .get(6))
          .getAlternatives()
          .get(1)
          .setSelected(false);
@@ -107,7 +115,7 @@ public class KafkaAdapterTester extends AdapterTesterBase {
     // Set format to Json
     ((StaticPropertyAlternatives) (desc)
          .getConfig()
-         .get(6))
+         .get(7))
          .getAlternatives()
          .get(0)
          .setSelected(true);
diff --git 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/KafkaConfigAppender.java
 
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/KafkaConfigAppender.java
index 1e53ebd225..77c5cb5afa 100644
--- 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/KafkaConfigAppender.java
+++ 
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/KafkaConfigAppender.java
@@ -18,9 +18,11 @@
 
 package org.apache.streampipes.messaging.kafka.config;
 
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+
 import java.util.Properties;
 
 public interface KafkaConfigAppender {
 
-  void appendConfig(Properties props);
+  void appendConfig(Properties props) throws SpRuntimeException;
 }
diff --git 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityConfig.java
 
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityConfig.java
deleted file mode 100644
index 646033f437..0000000000
--- 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityConfig.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.messaging.kafka.security;
-
-import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
-
-public abstract class KafkaSecurityConfig implements KafkaConfigAppender {
-
-}
diff --git 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityProtocolConfigAppender.java
 
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityProtocolConfigAppender.java
new file mode 100644
index 0000000000..1a1a9bd92a
--- /dev/null
+++ 
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityProtocolConfigAppender.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.messaging.kafka.security;
+
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+
+import java.util.Properties;
+
+public class KafkaSecurityProtocolConfigAppender implements 
KafkaConfigAppender {
+
+  private final SecurityProtocol securityProtocol;
+  private final Environment env;
+
+  public KafkaSecurityProtocolConfigAppender(SecurityProtocol securityProtocol,
+                                             Environment env) {
+    this.securityProtocol = securityProtocol;
+    this.env = env;
+  }
+
+  @Override
+  public void appendConfig(Properties props) {
+    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
securityProtocol.toString());
+
+    if (isSslProtocol()) {
+      props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, 
env.getKeystoreType().getValueOrDefault());
+      props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, 
env.getKeystoreFilename().getValueOrDefault());
+      props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, 
env.getKeystorePassword().getValueOrDefault());
+
+      if (env.getKeyPassword().exists()) {
+        props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, 
env.getKeyPassword().getValueOrDefault());
+      }
+
+      props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, 
env.getTruststoreType().getValueOrDefault());
+      props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, 
env.getTruststoreFilename().getValueOrDefault());
+
+      if (env.getTruststorePassword().exists()) {
+        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, 
env.getTruststorePassword().getValueOrDefault());
+      }
+
+      if (env.getAllowSelfSignedCertificates().getValueOrDefault()) {
+        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+      }
+    }
+  }
+
+  private boolean isSslProtocol() {
+    return securityProtocol == SecurityProtocol.SSL || securityProtocol == 
SecurityProtocol.SASL_SSL;
+  }
+}
diff --git 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslSSLConfig.java
 
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslConfigAppender.java
similarity index 50%
rename from 
streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslSSLConfig.java
rename to 
streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslConfigAppender.java
index 2c5ef691e3..68698f1217 100644
--- 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslSSLConfig.java
+++ 
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslConfigAppender.java
@@ -18,34 +18,56 @@
 
 package org.apache.streampipes.messaging.kafka.security;
 
-import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
+
 import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
 
 import java.util.Properties;
 
-public class KafkaSecuritySaslSSLConfig extends KafkaSecurityConfig {
+public class KafkaSecuritySaslConfigAppender implements KafkaConfigAppender {
 
+  private final String securityMechanism;
   private final String username;
   private final String password;
 
-  public KafkaSecuritySaslSSLConfig(String username, String password) {
+  public KafkaSecuritySaslConfigAppender(String securityMechanism,
+                                         String username,
+                                         String password) {
+    this.securityMechanism = securityMechanism;
     this.username = username;
     this.password = password;
   }
 
   @Override
-  public void appendConfig(Properties props) {
+  public void appendConfig(Properties props) throws SpRuntimeException {
+    props.put(SaslConfigs.SASL_MECHANISM, securityMechanism);
+    String saslJaasConfig = makeJaasConfig();
+
+    props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig);
+  }
 
-    props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
-    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
SecurityProtocol.SASL_SSL.toString());
+  private String makeJaasConfig() {
+    if (securityMechanism.equals("PLAIN")) {
+      return makeSaslPlainConfig();
+    } else {
+      return makeSaslScramConfig();
+    }
+  }
 
-    String saslJaasConfig = 
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
+  private String makeSaslPlainConfig() {
+    return "org.apache.kafka.common.security.plain.PlainLoginModule required 
username=\""
         + username
         + "\" password=\""
         + password
         + "\";";
+  }
 
-    props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig);
+  private String makeSaslScramConfig() {
+    return "org.apache.kafka.common.security.scram.ScramLoginModule required 
username=\""
+        + username
+        + "\" password=\""
+        + password
+        + "\";";
   }
 }
diff --git 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslPlainConfig.java
 
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslPlainConfig.java
deleted file mode 100644
index a7e5fa94fc..0000000000
--- 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslPlainConfig.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.messaging.kafka.security;
-
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
-
-import java.util.Properties;
-
-public class KafkaSecuritySaslPlainConfig extends KafkaSecurityConfig {
-
-  private final String username;
-  private final String password;
-
-  public KafkaSecuritySaslPlainConfig(String username, String password) {
-    this.username = username;
-    this.password = password;
-  }
-
-  @Override
-  public void appendConfig(Properties props) {
-
-    props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
-    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
SecurityProtocol.SASL_PLAINTEXT.toString());
-
-    String saslJaasConfig = 
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
-        + username
-        + "\" password=\""
-        + password
-        + "\";";
-
-    props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig);
-  }
-}
diff --git 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedSSLConfig.java
 
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedSSLConfig.java
deleted file mode 100644
index 8fdd02fd12..0000000000
--- 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedSSLConfig.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.messaging.kafka.security;
-
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
-
-import java.util.Properties;
-
-public class KafkaSecurityUnauthenticatedSSLConfig extends KafkaSecurityConfig 
{
-
-  @Override
-  public void appendConfig(Properties props) {
-    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
SecurityProtocol.SSL.toString());
-  }
-}
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java
index 32cfb73044..ce66dbeed8 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java
@@ -30,6 +30,7 @@ import org.apache.streampipes.model.monitoring.SpLogMessage;
 import org.apache.streampipes.model.runtime.RuntimeOptionsRequest;
 import org.apache.streampipes.model.runtime.RuntimeOptionsResponse;
 import org.apache.streampipes.resource.management.SpResourceManager;
+import org.apache.streampipes.resource.management.secret.SecretProvider;
 import org.apache.streampipes.storage.management.StorageDispatcher;
 import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
 
@@ -75,6 +76,7 @@ public class RuntimeResolvableResource extends 
AbstractAdapterResource<WorkerAdm
           SpServiceUrlProvider.ADAPTER,
           
runtimeOptionsRequest.getDeploymentConfiguration().getDesiredServiceTags()
       );
+      
SecretProvider.getDecryptionService().applyConfig(runtimeOptionsRequest.getStaticProperties());
       RuntimeOptionsResponse result = 
WorkerRestClient.getConfiguration(baseUrl, appId, runtimeOptionsRequest);
 
       return ok(result);
diff --git 
a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/adapter/ParserDescriptionBuilder.java
 
b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/adapter/ParserDescriptionBuilder.java
index 8a3d351638..591f8b4b67 100644
--- 
a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/adapter/ParserDescriptionBuilder.java
+++ 
b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/adapter/ParserDescriptionBuilder.java
@@ -53,6 +53,7 @@ public class ParserDescriptionBuilder extends
         elementDescription.getName(),
         elementDescription.getDescription(),
         getStaticProperties());
+    group.setHorizontalRendering(false);
     this.elementDescription.setConfig(group);
   }
 }
diff --git a/ui/cypress/tests/thirdparty/Kafka.smoke.spec.ts 
b/ui/cypress/tests/thirdparty/Kafka.smoke.spec.ts
index 7029ac309a..51d149369b 100644
--- a/ui/cypress/tests/thirdparty/Kafka.smoke.spec.ts
+++ b/ui/cypress/tests/thirdparty/Kafka.smoke.spec.ts
@@ -35,7 +35,7 @@ describe('Test Kafka Integration', () => {
         const sink: PipelineElementInput = PipelineElementBuilder.create(
             'kafka_publisher',
         )
-            .addInput('radio', 'access-mode-unauthenticated_plain', '')
+            .addInput('radio', 'access-mode-plaintext', '')
             .addInput('input', 'host', host)
             .addInput(
                 'input',
@@ -48,7 +48,7 @@ describe('Test Kafka Integration', () => {
         const adapter = AdapterBuilder.create('Apache_Kafka')
             .setName('Kafka4')
             .setTimestampProperty('timestamp')
-            .addProtocolInput('radio', 'access-mode-unauthenticated_plain', '')
+            .addProtocolInput('radio', 'access-mode-plaintext', '')
             .addProtocolInput('input', 'host', host)
             .addProtocolInput('input', 'port', port)
             .addProtocolInput('click', 'sp-reload', '')
diff --git 
a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts
 
b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts
index eb6ffcc10c..57588eb155 100644
--- 
a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts
+++ 
b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts
@@ -18,6 +18,8 @@
 
 import { Component, OnChanges, OnInit } from '@angular/core';
 import {
+    Option,
+    RuntimeResolvableAnyStaticProperty,
     RuntimeResolvableOneOfStaticProperty,
     StaticPropertyUnion,
 } from '@streampipes/platform-services';
@@ -48,13 +50,39 @@ export class StaticRuntimeResolvableOneOfInputComponent
     }
 
     afterOptionsLoaded(staticProperty: RuntimeResolvableOneOfStaticProperty) {
-        this.staticProperty.options = staticProperty.options;
         if (
-            this.staticProperty.options &&
-            this.staticProperty.options.length > 0
+            this.staticProperty.options?.length > 0 &&
+            this.isOptionSelected()
         ) {
-            this.staticProperty.options[0].selected = true;
+            const selectedOption = this.staticProperty.options.find(
+                o => o.selected,
+            );
+            this.addSelectedOption(staticProperty, selectedOption);
+        } else {
+            if (staticProperty.options?.length > 0) {
+                staticProperty.options[0].selected = true;
+            }
         }
+        this.staticProperty.options = staticProperty.options;
+    }
+
+    isOptionSelected(): boolean {
+        return this.staticProperty.options.find(o => o.selected) !== undefined;
+    }
+
+    addSelectedOption(
+        staticProperty: RuntimeResolvableOneOfStaticProperty,
+        selectedOption: Option,
+    ): void {
+        staticProperty.options
+            .filter(o => {
+                return o.internalName !== null
+                    ? o.internalName === selectedOption.internalName
+                    : o.name === selectedOption.name;
+            })
+            .forEach(o => {
+                o.selected = true;
+            });
     }
 
     select(id) {
@@ -65,7 +93,6 @@ export class StaticRuntimeResolvableOneOfInputComponent
             option => option.elementId === id,
         ).selected = true;
         this.performValidation();
-        this.applyCompletedConfiguration(true);
     }
 
     parse(

Reply via email to