This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch enhance-kafka-adapter
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit cfb5f0ed7cd800f229b39bd7fd3d99a8ef0eaa65
Author: Dominik Riemer <[email protected]>
AuthorDate: Tue Dec 17 20:43:45 2024 +0100

    feat: Add additional properties to Kafka connectors, improve logging
---
 .../kafka/KafkaConnectorsModuleExport.java         |  6 ++-
 .../connectors/kafka/adapter/KafkaProtocol.java    | 24 ++++++----
 .../kafka/migration/KafkaAdapterMigrationV2.java   | 55 +++++++++++++++++++++
 .../kafka/migration/KafkaSinkMigrationV2.java      | 56 ++++++++++++++++++++++
 .../kafka/shared/kafka/KafkaConfigExtractor.java   | 23 +++++++++
 .../kafka/shared/kafka/KafkaConfigProvider.java    |  1 +
 .../connectors/kafka/sink/KafkaPublishSink.java    |  8 +++-
 .../strings.en                                     |  3 ++
 .../documentation.md                               |  5 +-
 .../strings.en                                     |  3 ++
 .../kafka/config/SimpleConfigAppender.java         | 38 +++++++++++++++
 .../KafkaSecurityProtocolConfigAppender.java       | 30 ++++++------
 .../template/PipelineElementTemplateVisitor.java   |  1 +
 .../connect/RuntimeResolvableResource.java         |  6 +++
 14 files changed, 232 insertions(+), 27 deletions(-)

diff --git 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/KafkaConnectorsModuleExport.java
 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/KafkaConnectorsModuleExport.java
index c79c075576..d434a9ecea 100644
--- 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/KafkaConnectorsModuleExport.java
+++ 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/KafkaConnectorsModuleExport.java
@@ -24,7 +24,9 @@ import 
org.apache.streampipes.extensions.api.migration.IModelMigrator;
 import org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement;
 import 
org.apache.streampipes.extensions.connectors.kafka.adapter.KafkaProtocol;
 import 
org.apache.streampipes.extensions.connectors.kafka.migration.KafkaAdapterMigrationV1;
+import 
org.apache.streampipes.extensions.connectors.kafka.migration.KafkaAdapterMigrationV2;
 import 
org.apache.streampipes.extensions.connectors.kafka.migration.KafkaSinkMigrationV1;
+import 
org.apache.streampipes.extensions.connectors.kafka.migration.KafkaSinkMigrationV2;
 import 
org.apache.streampipes.extensions.connectors.kafka.sink.KafkaPublishSink;
 
 import java.util.List;
@@ -48,7 +50,9 @@ public class KafkaConnectorsModuleExport implements 
IExtensionModuleExport {
   public List<IModelMigrator<?, ?>> migrators() {
     return List.of(
         new KafkaAdapterMigrationV1(),
-        new KafkaSinkMigrationV1()
+        new KafkaSinkMigrationV1(),
+        new KafkaAdapterMigrationV2(),
+        new KafkaSinkMigrationV2()
     );
   }
 }
diff --git 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java
 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java
index 490df5a089..780d454e60 100644
--- 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java
+++ 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java
@@ -47,6 +47,7 @@ import 
org.apache.streampipes.model.staticproperty.RuntimeResolvableOneOfStaticP
 import org.apache.streampipes.model.staticproperty.StaticProperty;
 import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
 import org.apache.streampipes.sdk.builder.adapter.AdapterConfigurationBuilder;
+import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 
 import org.apache.kafka.clients.consumer.Consumer;
@@ -61,6 +62,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayInputStream;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -71,8 +73,8 @@ import java.util.stream.Collectors;
 
 public class KafkaProtocol implements StreamPipesAdapter, 
SupportsRuntimeConfig {
 
-  private static final Logger logger = 
LoggerFactory.getLogger(KafkaProtocol.class);
-  KafkaAdapterConfig config;
+  private static final Logger LOG = 
LoggerFactory.getLogger(KafkaProtocol.class);
+  private KafkaAdapterConfig config;
 
   public static final String ID = 
"org.apache.streampipes.connect.iiot.protocol.stream.kafka";
 
@@ -131,9 +133,9 @@ public class KafkaProtocol implements StreamPipesAdapter, 
SupportsRuntimeConfig
       
config.setOptions(topics.stream().map(Option::new).collect(Collectors.toList()));
 
       return config;
-    } catch (KafkaException e) {
+    } catch (Exception e) {
       var message = e.getCause() != null ? e.getCause().getMessage() : 
e.getMessage();
-      throw new SpConfigurationException(message, e);
+      throw new SpConfigurationException(message, e.getCause());
     }
   }
 
@@ -144,7 +146,7 @@ public class KafkaProtocol implements StreamPipesAdapter, 
SupportsRuntimeConfig
     latestAlternative.setSelected(true);
 
     return AdapterConfigurationBuilder
-        .create(ID, 1, KafkaProtocol::new)
+        .create(ID, 2, KafkaProtocol::new)
         .withSupportedParsers(Parsers.defaultParsers())
         .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
         .withLocales(Locales.EN)
@@ -172,6 +174,10 @@ public class KafkaProtocol implements StreamPipesAdapter, 
SupportsRuntimeConfig
             KafkaConfigProvider.getAlternativesEarliest(),
             latestAlternative,
             KafkaConfigProvider.getAlternativesNone())
+        .requiredCodeblock(
+            Labels.withId(KafkaConfigProvider.ADDITIONAL_PROPERTIES),
+            "# key=value, comments are ignored"
+        )
         .buildConfiguration();
   }
 
@@ -201,16 +207,16 @@ public class KafkaProtocol implements StreamPipesAdapter, 
SupportsRuntimeConfig
     try {
       kafkaConsumer.disconnect();
     } catch (SpRuntimeException e) {
-      e.printStackTrace();
+      LOG.warn("Runtime exception when disconnecting from Kafka", e);
     }
 
     try {
       Thread.sleep(5000);
     } catch (InterruptedException e) {
-      e.printStackTrace();
+      LOG.warn("Interrupted exception when stopping thread", e);
     }
 
-    logger.info("Kafka Adapter was sucessfully stopped");
+    LOG.info("Kafka Adapter was sucessfully stopped");
     thread.interrupt();
   }
 
@@ -241,7 +247,7 @@ public class KafkaProtocol implements StreamPipesAdapter, 
SupportsRuntimeConfig
 
     while (true) {
       final ConsumerRecords<byte[], byte[]> consumerRecords =
-          consumer.poll(1000);
+          consumer.poll(Duration.ofMillis(1000));
 
       consumerRecords.forEach(record -> nEventsByte.add(record.value()));
 
diff --git 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/migration/KafkaAdapterMigrationV2.java
 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/migration/KafkaAdapterMigrationV2.java
new file mode 100644
index 0000000000..77ca6669c0
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/migration/KafkaAdapterMigrationV2.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.kafka.migration;
+
+import 
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.extensions.api.migration.IAdapterMigrator;
+import 
org.apache.streampipes.extensions.connectors.kafka.adapter.KafkaProtocol;
+import 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+import org.apache.streampipes.sdk.StaticProperties;
+import org.apache.streampipes.sdk.helpers.CodeLanguage;
+import org.apache.streampipes.sdk.helpers.Labels;
+
+public class KafkaAdapterMigrationV2 implements IAdapterMigrator {
+  @Override
+  public ModelMigratorConfig config() {
+    return new ModelMigratorConfig(
+        KafkaProtocol.ID,
+        SpServiceTagPrefix.ADAPTER,
+        1,
+        2
+    );
+  }
+
+  @Override
+  public MigrationResult<AdapterDescription> migrate(AdapterDescription 
element,
+                                                     IStaticPropertyExtractor 
extractor) throws RuntimeException {
+    element.getConfig().add(
+        StaticProperties
+            
.codeStaticProperty(Labels.withId(KafkaConfigProvider.ADDITIONAL_PROPERTIES),
+                CodeLanguage.None,
+                "# key=value, comments are ignored")
+    );
+    return MigrationResult.success(element);
+  }
+}
diff --git 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/migration/KafkaSinkMigrationV2.java
 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/migration/KafkaSinkMigrationV2.java
new file mode 100644
index 0000000000..1323f659f6
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/migration/KafkaSinkMigrationV2.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.kafka.migration;
+
+import 
org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor;
+import org.apache.streampipes.extensions.api.migration.IDataSinkMigrator;
+import 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider;
+import 
org.apache.streampipes.extensions.connectors.kafka.sink.KafkaPublishSink;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+import org.apache.streampipes.sdk.StaticProperties;
+import org.apache.streampipes.sdk.helpers.CodeLanguage;
+import org.apache.streampipes.sdk.helpers.Labels;
+
+public class KafkaSinkMigrationV2 implements IDataSinkMigrator {
+
+  @Override
+  public ModelMigratorConfig config() {
+    return new ModelMigratorConfig(
+        KafkaPublishSink.ID,
+        SpServiceTagPrefix.DATA_SINK,
+        1,
+        2
+    );
+  }
+
+  @Override
+  public MigrationResult<DataSinkInvocation> migrate(DataSinkInvocation 
element,
+                                                     
IDataSinkParameterExtractor extractor) throws RuntimeException {
+    element.getStaticProperties().add(
+        StaticProperties
+            
.codeStaticProperty(Labels.withId(KafkaConfigProvider.ADDITIONAL_PROPERTIES),
+                CodeLanguage.None,
+                "# key=value, comments are ignored")
+    );
+    return MigrationResult.success(element);
+  }
+}
diff --git 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java
 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java
index d06e544e74..399f42a8c9 100644
--- 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java
+++ 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java
@@ -23,6 +23,7 @@ import 
org.apache.streampipes.extensions.api.extractor.IParameterExtractor;
 import 
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
 import org.apache.streampipes.messaging.kafka.config.AutoOffsetResetConfig;
 import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
+import org.apache.streampipes.messaging.kafka.config.SimpleConfigAppender;
 import 
org.apache.streampipes.messaging.kafka.security.KafkaSecurityProtocolConfigAppender;
 import 
org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslConfigAppender;
 import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
@@ -30,8 +31,13 @@ import 
org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 import static 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.ACCESS_MODE;
+import static 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.ADDITIONAL_PROPERTIES;
 import static 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.AUTO_OFFSET_RESET_CONFIG;
 import static 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.CONSUMER_GROUP;
 import static 
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.GROUP_ID_INPUT;
@@ -101,6 +107,9 @@ public class KafkaConfigExtractor {
 
       configAppenders.add(new KafkaSecuritySaslConfigAppender(mechanism, 
username, password));
     }
+    configAppenders.add(new SimpleConfigAppender(
+        
parseAdditionalProperties(extractor.codeblockValue(ADDITIONAL_PROPERTIES)))
+    );
     config.setConfigAppenders(configAppenders);
 
     return config;
@@ -118,4 +127,18 @@ public class KafkaConfigExtractor {
       default -> SecurityProtocol.PLAINTEXT;
     };
   }
+
+  public static Map<String, String> parseAdditionalProperties(String text) {
+    return Arrays.stream(text.split("\\R"))
+        .map(String::trim)
+        .filter(line -> !line.isEmpty() && !line.startsWith("#"))
+        .filter(line -> line.contains("="))
+        .map(line -> line.split("=", 2))
+        .collect(Collectors.toMap(
+            parts -> parts[0].trim(),
+            parts -> parts[1].trim(),
+            (existing, replacement) -> replacement,
+            LinkedHashMap::new
+        ));
+  }
 }
diff --git 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigProvider.java
 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigProvider.java
index 05f652f780..6981696b9a 100644
--- 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigProvider.java
+++ 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigProvider.java
@@ -51,6 +51,7 @@ public class KafkaConfigProvider {
   public static final String RANDOM_GROUP_ID = "random-group-id";
   public static final String GROUP_ID = "group-id";
   public static final String GROUP_ID_INPUT = "group-id-input";
+  public static final String ADDITIONAL_PROPERTIES = "additional-properties";
 
 
   private static final String HIDE_INTERNAL_TOPICS = "hide-internal-topics";
diff --git 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaPublishSink.java
 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaPublishSink.java
index 61460413b3..9ac428a3bf 100644
--- 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaPublishSink.java
+++ 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaPublishSink.java
@@ -33,6 +33,7 @@ import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.sdk.builder.DataSinkBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
 import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
+import org.apache.streampipes.sdk.helpers.CodeLanguage;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
@@ -58,7 +59,7 @@ public class KafkaPublishSink implements IStreamPipesDataSink 
{
   public IDataSinkConfiguration declareConfig() {
     return DataSinkConfiguration.create(
         KafkaPublishSink::new,
-        DataSinkBuilder.create(ID, 1)
+        DataSinkBuilder.create(ID, 2)
             .category(DataSinkType.MESSAGING)
             .withLocales(Locales.EN)
             .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
@@ -76,6 +77,11 @@ public class KafkaPublishSink implements 
IStreamPipesDataSink {
                 KafkaConfigProvider.getAlternativeUnauthenticatedSSL(),
                 KafkaConfigProvider.getAlternativesSaslPlain(),
                 KafkaConfigProvider.getAlternativesSaslSSL())
+            .requiredCodeblock(Labels.withId(
+                    KafkaConfigProvider.ADDITIONAL_PROPERTIES),
+                CodeLanguage.None,
+                "# key=value, comments are ignored"
+            )
             .build()
     );
   }
diff --git 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en
 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en
index 4809237f5b..9e2d47b5be 100644
--- 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en
+++ 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en
@@ -86,3 +86,6 @@ latest.description=Offsets are initialized to the Latest
 
 none.title=None
 none.description=Consumer throws exceptions
+
+additional-properties.title=Additional configurations
+additional-properties.description=Additional Kafka consumer configurations in 
the form key=value
diff --git 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/documentation.md
 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/documentation.md
index aec97f992b..14b2b66e25 100644
--- 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/documentation.md
+++ 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/documentation.md
@@ -50,6 +50,7 @@ The Kafka broker URL indicates the URL of the broker (e.g., 
localhost), the port
 The topic where events should be sent to.
 
 
-## Output
+### Additional configurations
 
-(not applicable for data sinks)
\ No newline at end of file
+Can be used to provide additional Kafka producer configurations. Input must be 
in form of key-value pairs, e.g.
+buffer.memory=33554432
diff --git 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en
 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en
index c4cef7cf4e..3d70886857 100644
--- 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en
+++ 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en
@@ -55,3 +55,6 @@ security-mechanism.title=Security Mechanism
 security-mechanism.description=SASL mechanism used for authentication. 
Corresponds to Kafka Client sasl.mechanism property
 
 username-group.title=Username and password
+
+additional-properties.title=Additional configurations
+additional-properties.description=Additional Kafka producer configurations in 
the form key=value
diff --git 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/SimpleConfigAppender.java
 
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/SimpleConfigAppender.java
new file mode 100644
index 0000000000..3b4dadadac
--- /dev/null
+++ 
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/SimpleConfigAppender.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.messaging.kafka.config;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+
+import java.util.Map;
+import java.util.Properties;
+
+public class SimpleConfigAppender implements KafkaConfigAppender {
+
+  private final Map<String, String> configs;
+
+  public SimpleConfigAppender(Map<String, String> configs) {
+    this.configs = configs;
+  }
+
+  @Override
+  public void appendConfig(Properties props) throws SpRuntimeException {
+    configs.forEach(props::setProperty);
+  }
+}
diff --git 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityProtocolConfigAppender.java
 
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityProtocolConfigAppender.java
index 1a1a9bd92a..968b056b2f 100644
--- 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityProtocolConfigAppender.java
+++ 
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityProtocolConfigAppender.java
@@ -19,6 +19,7 @@
 package org.apache.streampipes.messaging.kafka.security;
 
 import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.variable.EnvironmentVariable;
 import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
 
 import org.apache.kafka.clients.CommonClientConfigs;
@@ -43,20 +44,13 @@ public class KafkaSecurityProtocolConfigAppender implements 
KafkaConfigAppender
     props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
securityProtocol.toString());
 
     if (isSslProtocol()) {
-      props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, 
env.getKeystoreType().getValueOrDefault());
-      props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, 
env.getKeystoreFilename().getValueOrDefault());
-      props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, 
env.getKeystorePassword().getValueOrDefault());
-
-      if (env.getKeyPassword().exists()) {
-        props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, 
env.getKeyPassword().getValueOrDefault());
-      }
-
-      props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, 
env.getTruststoreType().getValueOrDefault());
-      props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, 
env.getTruststoreFilename().getValueOrDefault());
-
-      if (env.getTruststorePassword().exists()) {
-        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, 
env.getTruststorePassword().getValueOrDefault());
-      }
+      addConfigIfPresent(props, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, 
env.getKeystoreType());
+      addConfigIfPresent(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, 
env.getKeystoreFilename());
+      addConfigIfPresent(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, 
env.getKeystorePassword());
+      addConfigIfPresent(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, 
env.getKeyPassword());
+      addConfigIfPresent(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, 
env.getTruststoreType());
+      addConfigIfPresent(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, 
env.getTruststoreFilename());
+      addConfigIfPresent(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, 
env.getTruststorePassword());
 
       if (env.getAllowSelfSignedCertificates().getValueOrDefault()) {
         props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
@@ -67,4 +61,12 @@ public class KafkaSecurityProtocolConfigAppender implements 
KafkaConfigAppender
   private boolean isSslProtocol() {
     return securityProtocol == SecurityProtocol.SSL || securityProtocol == 
SecurityProtocol.SASL_SSL;
   }
+
+  private void addConfigIfPresent(Properties props,
+                                  String configKey,
+                                  EnvironmentVariable<?> environmentVariable) {
+    if (environmentVariable.exists()) {
+      props.put(configKey, environmentVariable.getValueOrDefault());
+    }
+  }
 }
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java
index 7e91b9ce3e..f2121d267b 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java
@@ -172,6 +172,7 @@ public class PipelineElementTemplateVisitor implements 
StaticPropertyVisitor {
     if (hasConfig(staticPropertyAlternatives)) {
       Map<String, Object> values = getConfig(staticPropertyAlternatives);
       var selectedId = getConfigValueAsString(staticPropertyAlternatives);
+      staticPropertyAlternatives.getAlternatives().forEach(a -> 
a.setSelected(false));
       staticPropertyAlternatives
           .getAlternatives()
           .stream()
diff --git 
a/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/connect/RuntimeResolvableResource.java
 
b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/connect/RuntimeResolvableResource.java
index 0412560c99..057e36dffd 100644
--- 
a/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/connect/RuntimeResolvableResource.java
+++ 
b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/connect/RuntimeResolvableResource.java
@@ -28,6 +28,9 @@ import 
org.apache.streampipes.model.runtime.RuntimeOptionsRequest;
 import org.apache.streampipes.model.runtime.RuntimeOptionsResponse;
 import org.apache.streampipes.rest.shared.impl.AbstractSharedRestInterface;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
 import org.springframework.http.ResponseEntity;
@@ -41,6 +44,8 @@ import org.springframework.web.bind.annotation.RestController;
 @RequestMapping("/api/v1/worker/resolvable")
 public class RuntimeResolvableResource extends AbstractSharedRestInterface {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(RuntimeResolvableResource.class);
+
   @PostMapping(
       path = "{id}/configurations",
       consumes = MediaType.APPLICATION_JSON_VALUE,
@@ -64,6 +69,7 @@ public class RuntimeResolvableResource extends 
AbstractSharedRestInterface {
             "This element does not support dynamic options - is the pipeline 
element description up to date?");
       }
     } catch (SpConfigurationException e) {
+      LOG.warn("Error when fetching runtime configurations: {}", 
e.getMessage(), e);
       return ResponseEntity
           .status(HttpStatus.BAD_REQUEST)
           .body(e);

Reply via email to