This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 88396aa1cf feat: Add additional properties to Kafka connectors,
improve logging (#3378)
88396aa1cf is described below
commit 88396aa1cfc682eca529dfc951fbf922b00b0935
Author: Dominik Riemer <[email protected]>
AuthorDate: Wed Dec 18 08:48:06 2024 +0100
feat: Add additional properties to Kafka connectors, improve logging (#3378)
* feat: Add additional properties to Kafka connectors, improve logging
* Fix checkstyle
* Fix test
---
.../kafka/KafkaConnectorsModuleExport.java | 6 ++-
.../connectors/kafka/adapter/KafkaProtocol.java | 24 ++++++----
.../kafka/migration/KafkaAdapterMigrationV2.java | 55 +++++++++++++++++++++
.../kafka/migration/KafkaSinkMigrationV2.java | 56 ++++++++++++++++++++++
.../kafka/shared/kafka/KafkaConfigExtractor.java | 27 +++++++++++
.../kafka/shared/kafka/KafkaConfigProvider.java | 1 +
.../connectors/kafka/sink/KafkaPublishSink.java | 8 +++-
.../strings.en | 3 ++
.../documentation.md | 5 +-
.../strings.en | 3 ++
.../integration/adapters/KafkaAdapterTester.java | 2 +-
.../kafka/config/SimpleConfigAppender.java | 38 +++++++++++++++
.../KafkaSecurityProtocolConfigAppender.java | 30 ++++++------
.../template/PipelineElementTemplateVisitor.java | 1 +
.../connect/RuntimeResolvableResource.java | 5 ++
15 files changed, 236 insertions(+), 28 deletions(-)
diff --git
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/KafkaConnectorsModuleExport.java
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/KafkaConnectorsModuleExport.java
index c79c075576..d434a9ecea 100644
---
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/KafkaConnectorsModuleExport.java
+++
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/KafkaConnectorsModuleExport.java
@@ -24,7 +24,9 @@ import
org.apache.streampipes.extensions.api.migration.IModelMigrator;
import org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement;
import
org.apache.streampipes.extensions.connectors.kafka.adapter.KafkaProtocol;
import
org.apache.streampipes.extensions.connectors.kafka.migration.KafkaAdapterMigrationV1;
+import
org.apache.streampipes.extensions.connectors.kafka.migration.KafkaAdapterMigrationV2;
import
org.apache.streampipes.extensions.connectors.kafka.migration.KafkaSinkMigrationV1;
+import
org.apache.streampipes.extensions.connectors.kafka.migration.KafkaSinkMigrationV2;
import
org.apache.streampipes.extensions.connectors.kafka.sink.KafkaPublishSink;
import java.util.List;
@@ -48,7 +50,9 @@ public class KafkaConnectorsModuleExport implements
IExtensionModuleExport {
public List<IModelMigrator<?, ?>> migrators() {
return List.of(
new KafkaAdapterMigrationV1(),
- new KafkaSinkMigrationV1()
+ new KafkaSinkMigrationV1(),
+ new KafkaAdapterMigrationV2(),
+ new KafkaSinkMigrationV2()
);
}
}
diff --git
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java
index 490df5a089..780d454e60 100644
---
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java
+++
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java
@@ -47,6 +47,7 @@ import
org.apache.streampipes.model.staticproperty.RuntimeResolvableOneOfStaticP
import org.apache.streampipes.model.staticproperty.StaticProperty;
import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
import org.apache.streampipes.sdk.builder.adapter.AdapterConfigurationBuilder;
+import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.kafka.clients.consumer.Consumer;
@@ -61,6 +62,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -71,8 +73,8 @@ import java.util.stream.Collectors;
public class KafkaProtocol implements StreamPipesAdapter,
SupportsRuntimeConfig {
- private static final Logger logger =
LoggerFactory.getLogger(KafkaProtocol.class);
- KafkaAdapterConfig config;
+ private static final Logger LOG =
LoggerFactory.getLogger(KafkaProtocol.class);
+ private KafkaAdapterConfig config;
public static final String ID =
"org.apache.streampipes.connect.iiot.protocol.stream.kafka";
@@ -131,9 +133,9 @@ public class KafkaProtocol implements StreamPipesAdapter,
SupportsRuntimeConfig
config.setOptions(topics.stream().map(Option::new).collect(Collectors.toList()));
return config;
- } catch (KafkaException e) {
+ } catch (Exception e) {
var message = e.getCause() != null ? e.getCause().getMessage() :
e.getMessage();
- throw new SpConfigurationException(message, e);
+ throw new SpConfigurationException(message, e.getCause());
}
}
@@ -144,7 +146,7 @@ public class KafkaProtocol implements StreamPipesAdapter,
SupportsRuntimeConfig
latestAlternative.setSelected(true);
return AdapterConfigurationBuilder
- .create(ID, 1, KafkaProtocol::new)
+ .create(ID, 2, KafkaProtocol::new)
.withSupportedParsers(Parsers.defaultParsers())
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
.withLocales(Locales.EN)
@@ -172,6 +174,10 @@ public class KafkaProtocol implements StreamPipesAdapter,
SupportsRuntimeConfig
KafkaConfigProvider.getAlternativesEarliest(),
latestAlternative,
KafkaConfigProvider.getAlternativesNone())
+ .requiredCodeblock(
+ Labels.withId(KafkaConfigProvider.ADDITIONAL_PROPERTIES),
+ "# key=value, comments are ignored"
+ )
.buildConfiguration();
}
@@ -201,16 +207,16 @@ public class KafkaProtocol implements StreamPipesAdapter,
SupportsRuntimeConfig
try {
kafkaConsumer.disconnect();
} catch (SpRuntimeException e) {
- e.printStackTrace();
+ LOG.warn("Runtime exception when disconnecting from Kafka", e);
}
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
- e.printStackTrace();
+ LOG.warn("Interrupted exception when stopping thread", e);
}
- logger.info("Kafka Adapter was sucessfully stopped");
+ LOG.info("Kafka Adapter was sucessfully stopped");
thread.interrupt();
}
@@ -241,7 +247,7 @@ public class KafkaProtocol implements StreamPipesAdapter,
SupportsRuntimeConfig
while (true) {
final ConsumerRecords<byte[], byte[]> consumerRecords =
- consumer.poll(1000);
+ consumer.poll(Duration.ofMillis(1000));
consumerRecords.forEach(record -> nEventsByte.add(record.value()));
diff --git
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/migration/KafkaAdapterMigrationV2.java
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/migration/KafkaAdapterMigrationV2.java
new file mode 100644
index 0000000000..77ca6669c0
--- /dev/null
+++
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/migration/KafkaAdapterMigrationV2.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.kafka.migration;
+
+import
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.extensions.api.migration.IAdapterMigrator;
+import
org.apache.streampipes.extensions.connectors.kafka.adapter.KafkaProtocol;
+import
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+import org.apache.streampipes.sdk.StaticProperties;
+import org.apache.streampipes.sdk.helpers.CodeLanguage;
+import org.apache.streampipes.sdk.helpers.Labels;
+
+public class KafkaAdapterMigrationV2 implements IAdapterMigrator {
+ @Override
+ public ModelMigratorConfig config() {
+ return new ModelMigratorConfig(
+ KafkaProtocol.ID,
+ SpServiceTagPrefix.ADAPTER,
+ 1,
+ 2
+ );
+ }
+
+ @Override
+ public MigrationResult<AdapterDescription> migrate(AdapterDescription
element,
+ IStaticPropertyExtractor
extractor) throws RuntimeException {
+ element.getConfig().add(
+ StaticProperties
+
.codeStaticProperty(Labels.withId(KafkaConfigProvider.ADDITIONAL_PROPERTIES),
+ CodeLanguage.None,
+ "# key=value, comments are ignored")
+ );
+ return MigrationResult.success(element);
+ }
+}
diff --git
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/migration/KafkaSinkMigrationV2.java
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/migration/KafkaSinkMigrationV2.java
new file mode 100644
index 0000000000..1323f659f6
--- /dev/null
+++
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/migration/KafkaSinkMigrationV2.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.kafka.migration;
+
+import
org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor;
+import org.apache.streampipes.extensions.api.migration.IDataSinkMigrator;
+import
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider;
+import
org.apache.streampipes.extensions.connectors.kafka.sink.KafkaPublishSink;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+import org.apache.streampipes.sdk.StaticProperties;
+import org.apache.streampipes.sdk.helpers.CodeLanguage;
+import org.apache.streampipes.sdk.helpers.Labels;
+
+public class KafkaSinkMigrationV2 implements IDataSinkMigrator {
+
+ @Override
+ public ModelMigratorConfig config() {
+ return new ModelMigratorConfig(
+ KafkaPublishSink.ID,
+ SpServiceTagPrefix.DATA_SINK,
+ 1,
+ 2
+ );
+ }
+
+ @Override
+ public MigrationResult<DataSinkInvocation> migrate(DataSinkInvocation
element,
+
IDataSinkParameterExtractor extractor) throws RuntimeException {
+ element.getStaticProperties().add(
+ StaticProperties
+
.codeStaticProperty(Labels.withId(KafkaConfigProvider.ADDITIONAL_PROPERTIES),
+ CodeLanguage.None,
+ "# key=value, comments are ignored")
+ );
+ return MigrationResult.success(element);
+ }
+}
diff --git
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java
index d06e544e74..0289fb56ca 100644
---
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java
+++
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java
@@ -23,6 +23,7 @@ import
org.apache.streampipes.extensions.api.extractor.IParameterExtractor;
import
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
import org.apache.streampipes.messaging.kafka.config.AutoOffsetResetConfig;
import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
+import org.apache.streampipes.messaging.kafka.config.SimpleConfigAppender;
import
org.apache.streampipes.messaging.kafka.security.KafkaSecurityProtocolConfigAppender;
import
org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslConfigAppender;
import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
@@ -30,8 +31,13 @@ import
org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
import static
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.ACCESS_MODE;
+import static
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.ADDITIONAL_PROPERTIES;
import static
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.AUTO_OFFSET_RESET_CONFIG;
import static
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.CONSUMER_GROUP;
import static
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.GROUP_ID_INPUT;
@@ -101,6 +107,9 @@ public class KafkaConfigExtractor {
configAppenders.add(new KafkaSecuritySaslConfigAppender(mechanism,
username, password));
}
+ configAppenders.add(new SimpleConfigAppender(
+
parseAdditionalProperties(extractor.codeblockValue(ADDITIONAL_PROPERTIES)))
+ );
config.setConfigAppenders(configAppenders);
return config;
@@ -118,4 +127,22 @@ public class KafkaConfigExtractor {
default -> SecurityProtocol.PLAINTEXT;
};
}
+
+ public static Map<String, String> parseAdditionalProperties(String text) {
+ if (text == null || text.isEmpty()) {
+ return Map.of();
+ } else {
+ return Arrays.stream(text.split("\\R"))
+ .map(String::trim)
+ .filter(line -> !line.isEmpty() && !line.startsWith("#"))
+ .filter(line -> line.contains("="))
+ .map(line -> line.split("=", 2))
+ .collect(Collectors.toMap(
+ parts -> parts[0].trim(),
+ parts -> parts[1].trim(),
+ (existing, replacement) -> replacement,
+ LinkedHashMap::new
+ ));
+ }
+ }
}
diff --git
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigProvider.java
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigProvider.java
index 05f652f780..6981696b9a 100644
---
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigProvider.java
+++
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigProvider.java
@@ -51,6 +51,7 @@ public class KafkaConfigProvider {
public static final String RANDOM_GROUP_ID = "random-group-id";
public static final String GROUP_ID = "group-id";
public static final String GROUP_ID_INPUT = "group-id-input";
+ public static final String ADDITIONAL_PROPERTIES = "additional-properties";
private static final String HIDE_INTERNAL_TOPICS = "hide-internal-topics";
diff --git
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaPublishSink.java
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaPublishSink.java
index 61460413b3..9ac428a3bf 100644
---
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaPublishSink.java
+++
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaPublishSink.java
@@ -33,6 +33,7 @@ import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
+import org.apache.streampipes.sdk.helpers.CodeLanguage;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
@@ -58,7 +59,7 @@ public class KafkaPublishSink implements IStreamPipesDataSink
{
public IDataSinkConfiguration declareConfig() {
return DataSinkConfiguration.create(
KafkaPublishSink::new,
- DataSinkBuilder.create(ID, 1)
+ DataSinkBuilder.create(ID, 2)
.category(DataSinkType.MESSAGING)
.withLocales(Locales.EN)
.withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
@@ -76,6 +77,11 @@ public class KafkaPublishSink implements
IStreamPipesDataSink {
KafkaConfigProvider.getAlternativeUnauthenticatedSSL(),
KafkaConfigProvider.getAlternativesSaslPlain(),
KafkaConfigProvider.getAlternativesSaslSSL())
+ .requiredCodeblock(Labels.withId(
+ KafkaConfigProvider.ADDITIONAL_PROPERTIES),
+ CodeLanguage.None,
+ "# key=value, comments are ignored"
+ )
.build()
);
}
diff --git
a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en
b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en
index 4809237f5b..9e2d47b5be 100644
---
a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en
+++
b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en
@@ -86,3 +86,6 @@ latest.description=Offsets are initialized to the Latest
none.title=None
none.description=Consumer throws exceptions
+
+additional-properties.title=Additional configurations
+additional-properties.description=Additional Kafka consumer configurations in
the form key=value
diff --git
a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/documentation.md
b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/documentation.md
index aec97f992b..14b2b66e25 100644
---
a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/documentation.md
+++
b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/documentation.md
@@ -50,6 +50,7 @@ The Kafka broker URL indicates the URL of the broker (e.g.,
localhost), the port
The topic where events should be sent to.
-## Output
+### Additional configurations
-(not applicable for data sinks)
\ No newline at end of file
+Can be used to provide additional Kafka producer configurations. Input must be
in form of key-value pairs, e.g.
+buffer.memory=33554432
diff --git
a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en
b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en
index c4cef7cf4e..3d70886857 100644
---
a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en
+++
b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en
@@ -55,3 +55,6 @@ security-mechanism.title=Security Mechanism
security-mechanism.description=SASL mechanism used for authentication.
Corresponds to Kafka Client sasl.mechanism property
username-group.title=Username and password
+
+additional-properties.title=Additional configurations
+additional-properties.description=Additional Kafka producer configurations in
the form key=value
diff --git
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/KafkaAdapterTester.java
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/KafkaAdapterTester.java
index db830f1b7a..803b2b1693 100644
---
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/KafkaAdapterTester.java
+++
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/KafkaAdapterTester.java
@@ -115,7 +115,7 @@ public class KafkaAdapterTester extends AdapterTesterBase {
// Set format to Json
((StaticPropertyAlternatives) (desc)
.getConfig()
- .get(7))
+ .get(8))
.getAlternatives()
.get(0)
.setSelected(true);
diff --git
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/SimpleConfigAppender.java
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/SimpleConfigAppender.java
new file mode 100644
index 0000000000..3b4dadadac
--- /dev/null
+++
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/SimpleConfigAppender.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.messaging.kafka.config;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+
+import java.util.Map;
+import java.util.Properties;
+
+public class SimpleConfigAppender implements KafkaConfigAppender {
+
+ private final Map<String, String> configs;
+
+ public SimpleConfigAppender(Map<String, String> configs) {
+ this.configs = configs;
+ }
+
+ @Override
+ public void appendConfig(Properties props) throws SpRuntimeException {
+ configs.forEach(props::setProperty);
+ }
+}
diff --git
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityProtocolConfigAppender.java
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityProtocolConfigAppender.java
index 1a1a9bd92a..968b056b2f 100644
---
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityProtocolConfigAppender.java
+++
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityProtocolConfigAppender.java
@@ -19,6 +19,7 @@
package org.apache.streampipes.messaging.kafka.security;
import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.variable.EnvironmentVariable;
import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
import org.apache.kafka.clients.CommonClientConfigs;
@@ -43,20 +44,13 @@ public class KafkaSecurityProtocolConfigAppender implements
KafkaConfigAppender
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
securityProtocol.toString());
if (isSslProtocol()) {
- props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,
env.getKeystoreType().getValueOrDefault());
- props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
env.getKeystoreFilename().getValueOrDefault());
- props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
env.getKeystorePassword().getValueOrDefault());
-
- if (env.getKeyPassword().exists()) {
- props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG,
env.getKeyPassword().getValueOrDefault());
- }
-
- props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,
env.getTruststoreType().getValueOrDefault());
- props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
env.getTruststoreFilename().getValueOrDefault());
-
- if (env.getTruststorePassword().exists()) {
- props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
env.getTruststorePassword().getValueOrDefault());
- }
+ addConfigIfPresent(props, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,
env.getKeystoreType());
+ addConfigIfPresent(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
env.getKeystoreFilename());
+ addConfigIfPresent(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
env.getKeystorePassword());
+ addConfigIfPresent(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG,
env.getKeyPassword());
+ addConfigIfPresent(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,
env.getTruststoreType());
+ addConfigIfPresent(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
env.getTruststoreFilename());
+ addConfigIfPresent(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
env.getTruststorePassword());
if (env.getAllowSelfSignedCertificates().getValueOrDefault()) {
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
@@ -67,4 +61,12 @@ public class KafkaSecurityProtocolConfigAppender implements
KafkaConfigAppender
private boolean isSslProtocol() {
return securityProtocol == SecurityProtocol.SSL || securityProtocol ==
SecurityProtocol.SASL_SSL;
}
+
+ private void addConfigIfPresent(Properties props,
+ String configKey,
+ EnvironmentVariable<?> environmentVariable) {
+ if (environmentVariable.exists()) {
+ props.put(configKey, environmentVariable.getValueOrDefault());
+ }
+ }
}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java
index 7e91b9ce3e..f2121d267b 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java
@@ -172,6 +172,7 @@ public class PipelineElementTemplateVisitor implements
StaticPropertyVisitor {
if (hasConfig(staticPropertyAlternatives)) {
Map<String, Object> values = getConfig(staticPropertyAlternatives);
var selectedId = getConfigValueAsString(staticPropertyAlternatives);
+ staticPropertyAlternatives.getAlternatives().forEach(a ->
a.setSelected(false));
staticPropertyAlternatives
.getAlternatives()
.stream()
diff --git
a/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/connect/RuntimeResolvableResource.java
b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/connect/RuntimeResolvableResource.java
index 0412560c99..105596a74a 100644
---
a/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/connect/RuntimeResolvableResource.java
+++
b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/connect/RuntimeResolvableResource.java
@@ -28,6 +28,8 @@ import
org.apache.streampipes.model.runtime.RuntimeOptionsRequest;
import org.apache.streampipes.model.runtime.RuntimeOptionsResponse;
import org.apache.streampipes.rest.shared.impl.AbstractSharedRestInterface;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
@@ -41,6 +43,8 @@ import org.springframework.web.bind.annotation.RestController;
@RequestMapping("/api/v1/worker/resolvable")
public class RuntimeResolvableResource extends AbstractSharedRestInterface {
+ private static final Logger LOG =
LoggerFactory.getLogger(RuntimeResolvableResource.class);
+
@PostMapping(
path = "{id}/configurations",
consumes = MediaType.APPLICATION_JSON_VALUE,
@@ -64,6 +68,7 @@ public class RuntimeResolvableResource extends
AbstractSharedRestInterface {
"This element does not support dynamic options - is the pipeline
element description up to date?");
}
} catch (SpConfigurationException e) {
+ LOG.warn("Error when fetching runtime configurations: {}",
e.getMessage(), e);
return ResponseEntity
.status(HttpStatus.BAD_REQUEST)
.body(e);