This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 42189bde77 feat(#3355): Add new avro format parser (#3358)
42189bde77 is described below
commit 42189bde77041fe0b649b574d15cfc8ff5adff15
Author: Sven Oehler <[email protected]>
AuthorDate: Tue Dec 3 08:32:07 2024 +0100
feat(#3355): Add new avro format parser (#3358)
* Add avro parser
* Improve avro parse exception
* Update parser description rendering
* feat(#3357): Add consumer group to Kafka adapter
* Add parsing of avro field description
* Revert parsing of avro field description
* Fix tests
* feat(#3362): Add SSL/SASL support to Kafka adapter and sink (#3364)
* feat(#3362): Add SSL/SASL support to Kafka adapter and sink
* Remove logging
* Add Kafka migrations
* Fix Kafka test
---------
Co-authored-by: Dominik Riemer <[email protected]>
---
pom.xml | 2 +-
.../apache/streampipes/commons/constants/Envs.java | 16 +-
.../constants/GlobalStreamPipesConstants.java | 1 +
.../commons/environment/DefaultEnvironment.java | 42 ++++-
.../commons/environment/Environment.java | 39 +---
streampipes-extensions-management/pom.xml | 5 +
.../connect/adapter/BrokerEventProcessor.java | 5 +-
.../connect/adapter/parser/AvroParser.java | 209 +++++++++++++++++++++
.../management/connect/adapter/parser/Parsers.java | 3 +-
.../kafka/KafkaConnectorsModuleExport.java | 8 +-
.../connectors/kafka/adapter/KafkaProtocol.java | 87 +++++----
.../kafka/migration/KafkaAdapterMigrationV1.java | 97 ++++++++++
.../kafka/migration/KafkaSinkMigrationV1.java | 51 +++++
.../kafka/shared/kafka/KafkaAdapterConfig.java | 13 +-
.../{KafkaConfig.java => KafkaBaseConfig.java} | 40 ++--
.../kafka/shared/kafka/KafkaConfigExtractor.java | 121 ++++++++++++
.../kafka/shared/kafka/KafkaConfigProvider.java | 153 +++++++++++++++
.../kafka/shared/kafka/KafkaConnectUtils.java | 187 ------------------
.../connectors/kafka/sink/KafkaParameters.java | 90 ---------
.../connectors/kafka/sink/KafkaPublishSink.java | 58 +++---
.../strings.en | 31 ++-
.../strings.en | 9 +-
.../opcua/config/security/KeyStoreLoader.java | 2 +-
.../streampipes-extensions-all-jvm/pom.xml | 5 +
.../integration/adapters/KafkaAdapterTester.java | 24 ++-
.../kafka/config/KafkaConfigAppender.java | 4 +-
.../kafka/security/KafkaSecurityConfig.java | 25 ---
.../KafkaSecurityProtocolConfigAppender.java | 70 +++++++
...g.java => KafkaSecuritySaslConfigAppender.java} | 40 +++-
.../security/KafkaSecuritySaslPlainConfig.java | 51 -----
.../KafkaSecurityUnauthenticatedSSLConfig.java | 32 ----
.../impl/connect/RuntimeResolvableResource.java | 2 +
.../builder/adapter/ParserDescriptionBuilder.java | 1 +
ui/cypress/tests/thirdparty/Kafka.smoke.spec.ts | 4 +-
...tic-runtime-resolvable-oneof-input.component.ts | 37 +++-
35 files changed, 990 insertions(+), 574 deletions(-)
diff --git a/pom.xml b/pom.xml
index 8f0d8ffded..455675231a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -89,7 +89,7 @@
<jsrosbridge.version>0.2.0</jsrosbridge.version>
<jjwt.version>0.11.2</jjwt.version>
<jts-core.version>1.19.0</jts-core.version>
- <kafka.version>3.4.0</kafka.version>
+ <kafka.version>3.7.1</kafka.version>
<lightcouch.version>0.2.0</lightcouch.version>
<maven-plugin-annotations.version>3.13.0</maven-plugin-annotations.version>
<mailapi.version>1.4.3</mailapi.version>
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
index 7a2d033e5a..2339475adb 100644
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
@@ -119,7 +119,21 @@ public enum Envs {
SP_OPCUA_APPLICATION_URI(
"SP_OPCUA_APPLICATION_URI",
"urn:org:apache:streampipes:opcua:client"
- );
+ ),
+
+ // Default keystore and truststore
+ SP_SECURITY_KEYSTORE_FILENAME(
+ "SP_SECURITY_KEYSTORE_FILENAME",
+ "/streampipes-security/keystore.pfx"),
+ SP_SECURITY_KEYSTORE_PASSWORD("SP_SECURITY_KEYSTORE_PASSWORD", ""),
+ SP_SECURITY_KEYSTORE_TYPE("SP_SECURITY_KEYSTORE_TYPE", "PKCS12"),
+ SP_SECURITY_KEY_PASSWORD("SP_SECURITY_KEY_PASSWORD", null),
+ SP_SECURITY_TRUSTSTORE_FILENAME(
+ "SP_SECURITY_TRUSTSTORE_FILENAME",
+ "/streampipes-security/truststore.pfx"),
+ SP_SECURITY_TRUSTSTORE_PASSWORD("SP_SECURITY_TRUSTSTORE_PASSWORD", ""),
+ SP_SECURITY_TRUSTSTORE_TYPE("SP_SECURITY_TRUSTSTORE_TYPE", "PKCS12"),
+ SP_SECURITY_ALLOW_SELFSIGNED("SP_SECURITY_ALLOW_SELFSIGNED", "false");
private final String envVariableName;
private String defaultValue;
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GlobalStreamPipesConstants.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GlobalStreamPipesConstants.java
index b452d74c1a..409a9a9090 100644
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GlobalStreamPipesConstants.java
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GlobalStreamPipesConstants.java
@@ -23,5 +23,6 @@ public class GlobalStreamPipesConstants {
public static final String STD_DOCUMENTATION_NAME = "documentation.md";
public static final String INTERNAL_TOPIC_PREFIX =
"org-apache-streampipes-internal-";
+ public static final String CONNECT_TOPIC_PREFIX =
"org.apache.streampipes.connect.";
}
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
index bb5bbac193..3434924e3e 100644
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
@@ -349,7 +349,7 @@ public class DefaultEnvironment implements Environment {
}
@Override
- public StringEnvironmentVariable getOPcUaKeystoreType() {
+ public StringEnvironmentVariable getOpcUaKeystoreType() {
return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_TYPE);
}
@@ -357,4 +357,44 @@ public class DefaultEnvironment implements Environment {
public StringEnvironmentVariable getOpcUaKeystoreAlias() {
return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_ALIAS);
}
+
+ @Override
+ public StringEnvironmentVariable getKeystoreFilename() {
+ return new StringEnvironmentVariable(Envs.SP_SECURITY_KEYSTORE_FILENAME);
+ }
+
+ @Override
+ public StringEnvironmentVariable getKeystorePassword() {
+ return new StringEnvironmentVariable(Envs.SP_SECURITY_KEYSTORE_PASSWORD);
+ }
+
+ @Override
+ public StringEnvironmentVariable getKeystoreType() {
+ return new StringEnvironmentVariable(Envs.SP_SECURITY_KEYSTORE_TYPE);
+ }
+
+ @Override
+ public StringEnvironmentVariable getKeyPassword() {
+ return new StringEnvironmentVariable(Envs.SP_SECURITY_KEY_PASSWORD);
+ }
+
+ @Override
+ public StringEnvironmentVariable getTruststoreFilename() {
+ return new StringEnvironmentVariable(Envs.SP_SECURITY_TRUSTSTORE_FILENAME);
+ }
+
+ @Override
+ public StringEnvironmentVariable getTruststorePassword() {
+ return new StringEnvironmentVariable(Envs.SP_SECURITY_TRUSTSTORE_PASSWORD);
+ }
+
+ @Override
+ public StringEnvironmentVariable getTruststoreType() {
+ return new StringEnvironmentVariable(Envs.SP_SECURITY_TRUSTSTORE_TYPE);
+ }
+
+ @Override
+ public BooleanEnvironmentVariable getAllowSelfSignedCertificates() {
+ return new BooleanEnvironmentVariable(Envs.SP_SECURITY_ALLOW_SELFSIGNED);
+ }
}
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
index cb441f009b..d1c4adf6ae 100644
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
@@ -32,79 +32,56 @@ public interface Environment {
// Service base configuration
StringEnvironmentVariable getServiceHost();
-
IntEnvironmentVariable getServicePort();
StringEnvironmentVariable getSpCoreScheme();
StringEnvironmentVariable getSpCoreHost();
-
IntEnvironmentVariable getSpCorePort();
// Time series storage env variables
StringEnvironmentVariable getTsStorage();
-
StringEnvironmentVariable getTsStorageProtocol();
-
StringEnvironmentVariable getTsStorageHost();
-
IntEnvironmentVariable getTsStoragePort();
-
StringEnvironmentVariable getTsStorageToken();
-
StringEnvironmentVariable getTsStorageOrg();
-
StringEnvironmentVariable getTsStorageBucket();
IntEnvironmentVariable getIotDbSessionPoolSize();
-
BooleanEnvironmentVariable getIotDbSessionEnableCompression();
-
StringEnvironmentVariable getIotDbUser();
-
StringEnvironmentVariable getIotDbPassword();
// CouchDB env variables
StringEnvironmentVariable getCouchDbProtocol();
-
StringEnvironmentVariable getCouchDbHost();
-
IntEnvironmentVariable getCouchDbPort();
-
StringEnvironmentVariable getCouchDbUsername();
-
StringEnvironmentVariable getCouchDbPassword();
// JWT & Authentication
StringEnvironmentVariable getClientUser();
-
StringEnvironmentVariable getClientSecret();
StringEnvironmentVariable getJwtSecret();
-
StringEnvironmentVariable getJwtPublicKeyLoc();
-
StringEnvironmentVariable getJwtPrivateKeyLoc();
-
StringEnvironmentVariable getJwtSigningMode();
StringEnvironmentVariable getExtensionsAuthMode();
-
StringEnvironmentVariable getEncryptionPasscode();
BooleanEnvironmentVariable getOAuthEnabled();
-
StringEnvironmentVariable getOAuthRedirectUri();
-
List<OAuthConfiguration> getOAuthConfigurations();
// Messaging
StringEnvironmentVariable getKafkaRetentionTimeMs();
-
StringEnvironmentVariable getPrioritizedProtocol();
@@ -164,14 +141,18 @@ public interface Environment {
StringEnvironmentVariable getAllowedUploadFiletypes();
StringEnvironmentVariable getOpcUaSecurityDir();
-
StringEnvironmentVariable getOpcUaKeystoreFile();
-
StringEnvironmentVariable getOpcUaKeystorePassword();
-
StringEnvironmentVariable getOpcUaApplicationUri();
-
- StringEnvironmentVariable getOPcUaKeystoreType();
-
+ StringEnvironmentVariable getOpcUaKeystoreType();
StringEnvironmentVariable getOpcUaKeystoreAlias();
+
+ StringEnvironmentVariable getKeystoreFilename();
+ StringEnvironmentVariable getKeystorePassword();
+ StringEnvironmentVariable getKeystoreType();
+ StringEnvironmentVariable getKeyPassword();
+ StringEnvironmentVariable getTruststoreFilename();
+ StringEnvironmentVariable getTruststorePassword();
+ StringEnvironmentVariable getTruststoreType();
+ BooleanEnvironmentVariable getAllowSelfSignedCertificates();
}
diff --git a/streampipes-extensions-management/pom.xml
b/streampipes-extensions-management/pom.xml
index 3c0655c595..9c0427023b 100644
--- a/streampipes-extensions-management/pom.xml
+++ b/streampipes-extensions-management/pom.xml
@@ -94,6 +94,11 @@
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>1.11.4</version>
+ </dependency>
<!-- Test dependencies -->
<dependency>
diff --git
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/BrokerEventProcessor.java
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/BrokerEventProcessor.java
index 2ad113e8a7..b3ed8c6fd1 100644
---
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/BrokerEventProcessor.java
+++
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/BrokerEventProcessor.java
@@ -23,11 +23,10 @@ import
org.apache.streampipes.extensions.api.connect.IEventCollector;
import org.apache.streampipes.extensions.api.connect.IParser;
import org.apache.streampipes.messaging.InternalEventProcessor;
-import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.charset.StandardCharsets;
+import java.io.ByteArrayInputStream;
public record BrokerEventProcessor(
IParser parser,
@@ -39,7 +38,7 @@ public record BrokerEventProcessor(
@Override
public void onEvent(byte[] payload) {
try {
- parser.parse(IOUtils.toInputStream(new String(payload),
StandardCharsets.UTF_8), collector::collect);
+ parser.parse(new ByteArrayInputStream(payload), collector::collect);
} catch (ParseException e) {
LOG.error("Error while parsing: " + e.getMessage());
}
diff --git
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/AvroParser.java
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/AvroParser.java
new file mode 100644
index 0000000000..32bf26f489
--- /dev/null
+++
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/AvroParser.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.management.connect.adapter.parser;
+
+import org.apache.streampipes.commons.exceptions.connect.ParseException;
+import org.apache.streampipes.extensions.api.connect.IParser;
+import org.apache.streampipes.extensions.api.connect.IParserEventHandler;
+import org.apache.streampipes.model.connect.grounding.ParserDescription;
+import org.apache.streampipes.model.connect.guess.GuessSchema;
+import org.apache.streampipes.model.staticproperty.StaticProperty;
+import org.apache.streampipes.sdk.builder.adapter.ParserDescriptionBuilder;
+import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Options;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.util.Utf8;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AvroParser implements IParser {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AvroParser.class);
+
+ public static final String ID =
"org.apache.streampipes.extensions.management.connect.adapter.parser.avro";
+ public static final String LABEL = "Avro";
+ public static final String DESCRIPTION = "Can be used to read avro records";
+
+ public static final String SCHEMA = "schema";
+ public static final String SCHEMA_REGISTRY = "schemaRegistry";
+ public static final String FLATTEN_RECORDS = "flattenRecord";
+
+ private final ParserUtils parserUtils;
+ private DatumReader<GenericRecord> datumReader;
+ private boolean schemaRegistry;
+ private boolean flattenRecord;
+
+
+ public AvroParser() {
+ parserUtils = new ParserUtils();
+ }
+
+ public AvroParser(String schemaString, boolean schemaRegistry, boolean
flattenRecord) {
+ this();
+ Schema schema = new Schema.Parser().parse(schemaString);
+ this.datumReader = new GenericDatumReader<>(schema);
+ this.schemaRegistry = schemaRegistry;
+ this.flattenRecord = flattenRecord;
+ }
+
+ @Override
+ public ParserDescription declareDescription() {
+ return ParserDescriptionBuilder.create(ID, LABEL, DESCRIPTION)
+ .requiredSingleValueSelection(
+ Labels.from(SCHEMA_REGISTRY, "Schema Registry",
+ "Does the messages include the schema registry header?"),
+ Options.from("yes", "no")
+ )
+ .requiredSingleValueSelection(
+ Labels.from(FLATTEN_RECORDS, "Flatten Records",
+ "Should nested records be flattened?"),
+ Options.from("no", "yes")
+ )
+ .requiredCodeblock(Labels.from(SCHEMA, "Schema",
+ "The schema of the avro record"),
+ "{\n"
+ + " \"namespace\": \"example.avro\",\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"Test\",\n"
+ + " \"fields\": [\n"
+ + " {\"name\": \"id\", \"type\": \"string\"},\n"
+ + " {\"name\": \"value\", \"type\": \"double\"}\n"
+ + " ]\n"
+ + "}")
+ .build();
+ }
+
+
+ @Override
+ public GuessSchema getGuessSchema(InputStream inputStream) throws
ParseException {
+ GenericRecord avroRecord = getRecord(inputStream);
+ var event = toMap(avroRecord);
+ return parserUtils.getGuessSchema(event);
+ }
+
+ @Override
+ public void parse(InputStream inputStream, IParserEventHandler handler)
throws ParseException {
+ GenericRecord avroRecord = getRecord(inputStream);
+ var event = toMap(avroRecord);
+ handler.handle(event);
+ }
+
+ @Override
+ public IParser fromDescription(List<StaticProperty> configuration) {
+ var extractor = StaticPropertyExtractor.from(configuration);
+ String schema = extractor.codeblockValue(SCHEMA);
+ boolean schemaRegistry = extractor
+ .selectedSingleValue(SCHEMA_REGISTRY, String.class)
+ .equals("yes");
+ boolean flattenRecords = extractor
+ .selectedSingleValue(FLATTEN_RECORDS, String.class)
+ .equals("yes");
+
+ return new AvroParser(schema, schemaRegistry, flattenRecords);
+ }
+
+ private GenericRecord getRecord(InputStream inputStream) throws
ParseException {
+ try {
+ if (schemaRegistry) {
+ inputStream.skipNBytes(5);
+ }
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream,
null);
+ return datumReader.read(null, decoder);
+ } catch (IOException e) {
+ throw new ParseException(
+ "Error decoding the avro message. Please check the schema."
+ );
+ }
+ }
+
+
+ private Map<String, Object> toMap(GenericRecord avroRecord) {
+ Map<String, Object> resultMap = new LinkedHashMap<>();
+ avroRecord.getSchema().getFields().forEach(field -> {
+ String fieldName = field.name();
+ Object fieldValue = avroRecord.get(fieldName);
+ if (flattenRecord && fieldValue instanceof GenericRecord){
+ Map<String, Object> flatMap = unwrapNestedRecord((GenericRecord)
fieldValue, fieldName);
+ resultMap.putAll(flatMap);
+ } else {
+ resultMap.put(fieldName, toMapHelper(fieldValue));
+ }
+ });
+
+ return resultMap;
+ }
+
+ private Object toMapHelper(Object fieldValue) {
+ if (fieldValue instanceof GenericRecord){
+ return toMap((GenericRecord) fieldValue);
+ }
+ if (fieldValue instanceof GenericData.Array<?>){
+ List<Object> valueList = new ArrayList<>();
+ ((GenericData.Array) fieldValue).forEach(value ->
valueList.add(toMapHelper(value)));
+ return valueList;
+ }
+ if (fieldValue instanceof Map<?, ?>){
+ Map<Object, Object> valueMap = new LinkedHashMap<>();
+ ((Map<Object, Object>) fieldValue).forEach((key, value1) ->
valueMap.put(convertUTF8(key), toMapHelper(value1)));
+ return valueMap;
+ }
+ return convertUTF8(fieldValue);
+ }
+
+ private Map<String, Object> unwrapNestedRecord(GenericRecord nestedRecord,
String prefix) {
+ Map<String, Object> flatMap = new HashMap<>();
+
+ nestedRecord.getSchema().getFields().forEach(field -> {
+ String fieldName = field.name();
+ Object fieldValue = nestedRecord.get(fieldName);
+ String newKey = prefix.isEmpty() ? fieldName : prefix + "_" + fieldName;
+ if (fieldValue instanceof GenericRecord) {
+ flatMap.putAll(unwrapNestedRecord((GenericRecord) fieldValue, newKey));
+ } else {
+ flatMap.put(newKey, toMapHelper(fieldValue));
+ }
+ });
+
+ return flatMap;
+ }
+
+ private Object convertUTF8(Object fieldValue) {
+ if (fieldValue instanceof Utf8){
+ return fieldValue.toString();
+ }
+ return fieldValue;
+ }
+
+}
diff --git
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/Parsers.java
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/Parsers.java
index 3c8a0110da..1dc2937792 100644
---
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/Parsers.java
+++
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/Parsers.java
@@ -30,7 +30,8 @@ public class Parsers {
new JsonParsers(),
new CsvParser(),
new XmlParser(),
- new ImageParser()
+ new ImageParser(),
+ new AvroParser()
);
}
}
diff --git
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/KafkaConnectorsModuleExport.java
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/KafkaConnectorsModuleExport.java
index 351bef6996..c79c075576 100644
---
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/KafkaConnectorsModuleExport.java
+++
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/KafkaConnectorsModuleExport.java
@@ -23,9 +23,10 @@ import
org.apache.streampipes.extensions.api.declarer.IExtensionModuleExport;
import org.apache.streampipes.extensions.api.migration.IModelMigrator;
import org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement;
import
org.apache.streampipes.extensions.connectors.kafka.adapter.KafkaProtocol;
+import
org.apache.streampipes.extensions.connectors.kafka.migration.KafkaAdapterMigrationV1;
+import
org.apache.streampipes.extensions.connectors.kafka.migration.KafkaSinkMigrationV1;
import
org.apache.streampipes.extensions.connectors.kafka.sink.KafkaPublishSink;
-import java.util.Collections;
import java.util.List;
public class KafkaConnectorsModuleExport implements IExtensionModuleExport {
@@ -45,6 +46,9 @@ public class KafkaConnectorsModuleExport implements
IExtensionModuleExport {
@Override
public List<IModelMigrator<?, ?>> migrators() {
- return Collections.emptyList();
+ return List.of(
+ new KafkaAdapterMigrationV1(),
+ new KafkaSinkMigrationV1()
+ );
}
}
diff --git
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java
index f2e0a76918..490df5a089 100644
---
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java
+++
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java
@@ -31,12 +31,12 @@ import
org.apache.streampipes.extensions.api.connect.context.IAdapterRuntimeCont
import
org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor;
import
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
import org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig;
-import
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfig;
-import
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConnectUtils;
+import
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaAdapterConfig;
+import
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigExtractor;
+import
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider;
import
org.apache.streampipes.extensions.management.connect.adapter.BrokerEventProcessor;
import
org.apache.streampipes.extensions.management.connect.adapter.parser.Parsers;
import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
-import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
import org.apache.streampipes.model.AdapterType;
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
@@ -67,13 +67,12 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
-import java.util.Set;
import java.util.stream.Collectors;
public class KafkaProtocol implements StreamPipesAdapter,
SupportsRuntimeConfig {
private static final Logger logger =
LoggerFactory.getLogger(KafkaProtocol.class);
- KafkaConfig config;
+ KafkaAdapterConfig config;
public static final String ID =
"org.apache.streampipes.connect.iiot.protocol.stream.kafka";
@@ -84,20 +83,18 @@ public class KafkaProtocol implements StreamPipesAdapter,
SupportsRuntimeConfig
}
private void applyConfiguration(IStaticPropertyExtractor extractor) {
- this.config = KafkaConnectUtils.getConfig(extractor, true);
+ this.config = new KafkaConfigExtractor().extractAdapterConfig(extractor,
true);
}
- private Consumer<byte[], byte[]> createConsumer(KafkaConfig kafkaConfig)
throws KafkaException {
+ private Consumer<byte[], byte[]> createConsumer(KafkaAdapterConfig
kafkaConfig) throws KafkaException {
final Properties props = new Properties();
- kafkaConfig.getSecurityConfig().appendConfig(props);
- kafkaConfig.getAutoOffsetResetConfig().appendConfig(props);
+ kafkaConfig.getConfigAppenders().forEach(c -> c.appendConfig(props));
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaConfig.getKafkaHost() + ":" + kafkaConfig.getKafkaPort());
- props.put(ConsumerConfig.GROUP_ID_CONFIG,
- "KafkaExampleConsumer" + System.currentTimeMillis());
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getGroupId());
props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 6000);
@@ -114,61 +111,67 @@ public class KafkaProtocol implements StreamPipesAdapter,
SupportsRuntimeConfig
IStaticPropertyExtractor
extractor)
throws SpConfigurationException {
RuntimeResolvableOneOfStaticProperty config = extractor
- .getStaticPropertyByName(KafkaConnectUtils.TOPIC_KEY,
RuntimeResolvableOneOfStaticProperty.class);
- KafkaConfig kafkaConfig = KafkaConnectUtils.getConfig(extractor, false);
- boolean hideInternalTopics =
extractor.slideToggleValue(KafkaConnectUtils.getHideInternalTopicsKey());
+ .getStaticPropertyByName(KafkaConfigProvider.TOPIC_KEY,
RuntimeResolvableOneOfStaticProperty.class);
+ var kafkaConfig = new
KafkaConfigExtractor().extractAdapterConfig(extractor, false);
+ boolean hideInternalTopics =
extractor.slideToggleValue(KafkaConfigProvider.getHideInternalTopicsKey());
try {
- Consumer<byte[], byte[]> consumer = createConsumer(kafkaConfig);
- Set<String> topics = consumer.listTopics().keySet();
+ var consumer = createConsumer(kafkaConfig);
+ List<String> topics = new
ArrayList<>(consumer.listTopics().keySet()).stream().sorted().toList();
consumer.close();
if (hideInternalTopics) {
topics = topics
.stream()
- .filter(t ->
!t.startsWith(GlobalStreamPipesConstants.INTERNAL_TOPIC_PREFIX))
- .collect(Collectors.toSet());
+ .filter(t ->
(!t.startsWith(GlobalStreamPipesConstants.INTERNAL_TOPIC_PREFIX)
+ &&
!t.startsWith(GlobalStreamPipesConstants.CONNECT_TOPIC_PREFIX)))
+ .toList();
}
config.setOptions(topics.stream().map(Option::new).collect(Collectors.toList()));
return config;
} catch (KafkaException e) {
- throw new SpConfigurationException(e.getMessage(), e);
+ var message = e.getCause() != null ? e.getCause().getMessage() :
e.getMessage();
+ throw new SpConfigurationException(message, e);
}
}
@Override
public IAdapterConfiguration declareConfig() {
- StaticPropertyAlternative latestAlternative =
KafkaConnectUtils.getAlternativesLatest();
+ StaticPropertyAlternative latestAlternative =
KafkaConfigProvider.getAlternativesLatest();
latestAlternative.setSelected(true);
return AdapterConfigurationBuilder
- .create(ID, 0, KafkaProtocol::new)
+ .create(ID, 1, KafkaProtocol::new)
.withSupportedParsers(Parsers.defaultParsers())
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
.withLocales(Locales.EN)
.withCategory(AdapterType.Generic, AdapterType.Manufacturing)
- .requiredAlternatives(KafkaConnectUtils.getAccessModeLabel(),
- KafkaConnectUtils.getAlternativeUnauthenticatedPlain(),
- KafkaConnectUtils.getAlternativeUnauthenticatedSSL(),
- KafkaConnectUtils.getAlternativesSaslPlain(),
- KafkaConnectUtils.getAlternativesSaslSSL())
+ .requiredAlternatives(KafkaConfigProvider.getAccessModeLabel(),
+ KafkaConfigProvider.getAlternativeUnauthenticatedPlain(),
+ KafkaConfigProvider.getAlternativeUnauthenticatedSSL(),
+ KafkaConfigProvider.getAlternativesSaslPlain(),
+ KafkaConfigProvider.getAlternativesSaslSSL())
- .requiredTextParameter(KafkaConnectUtils.getHostLabel())
- .requiredIntegerParameter(KafkaConnectUtils.getPortLabel())
+ .requiredTextParameter(KafkaConfigProvider.getHostLabel())
+ .requiredIntegerParameter(KafkaConfigProvider.getPortLabel())
- .requiredSlideToggle(KafkaConnectUtils.getHideInternalTopicsLabel(),
true)
+ .requiredAlternatives(KafkaConfigProvider.getConsumerGroupLabel(),
+ KafkaConfigProvider.getAlternativesRandomGroupId(),
+ KafkaConfigProvider.getAlternativesGroupId())
-
.requiredSingleValueSelectionFromContainer(KafkaConnectUtils.getTopicLabel(),
Arrays.asList(
- KafkaConnectUtils.HOST_KEY,
- KafkaConnectUtils.PORT_KEY))
-
.requiredAlternatives(KafkaConnectUtils.getAutoOffsetResetConfigLabel(),
- KafkaConnectUtils.getAlternativesEarliest(),
- latestAlternative,
- KafkaConnectUtils.getAlternativesNone())
+ .requiredSlideToggle(KafkaConfigProvider.getHideInternalTopicsLabel(),
true)
+
+
.requiredSingleValueSelectionFromContainer(KafkaConfigProvider.getTopicLabel(),
Arrays.asList(
+ KafkaConfigProvider.HOST_KEY,
+ KafkaConfigProvider.PORT_KEY))
+
.requiredAlternatives(KafkaConfigProvider.getAutoOffsetResetConfigLabel(),
+ KafkaConfigProvider.getAlternativesEarliest(),
+ latestAlternative,
+ KafkaConfigProvider.getAlternativesNone())
.buildConfiguration();
}
@@ -182,17 +185,11 @@ public class KafkaProtocol implements StreamPipesAdapter,
SupportsRuntimeConfig
protocol.setBrokerHostname(config.getKafkaHost());
protocol.setTopicDefinition(new SimpleTopicDefinition(config.getTopic()));
- List<KafkaConfigAppender> kafkaConfigAppenderList = new ArrayList<>(2);
- kafkaConfigAppenderList.add(this.config.getSecurityConfig());
- kafkaConfigAppenderList.add(this.config.getAutoOffsetResetConfig());
-
this.kafkaConsumer = new SpKafkaConsumer(protocol,
config.getTopic(),
- new BrokerEventProcessor(extractor.selectedParser(), (event) -> {
- collector.collect(event);
- }),
- kafkaConfigAppenderList
- );
+ new BrokerEventProcessor(extractor.selectedParser(), collector),
+ config.getConfigAppenders()
+ );
thread = new Thread(this.kafkaConsumer);
thread.start();
diff --git
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/migration/KafkaAdapterMigrationV1.java
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/migration/KafkaAdapterMigrationV1.java
new file mode 100644
index 0000000000..59a860d050
--- /dev/null
+++
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/migration/KafkaAdapterMigrationV1.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.kafka.migration;
+
+import
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.extensions.api.migration.IAdapterMigrator;
+import
org.apache.streampipes.extensions.connectors.kafka.adapter.KafkaProtocol;
+import
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider;
+import
org.apache.streampipes.extensions.management.connect.adapter.parser.AvroParser;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
+import org.apache.streampipes.model.staticproperty.StaticPropertyGroup;
+import org.apache.streampipes.sdk.StaticProperties;
+import org.apache.streampipes.sdk.helpers.Labels;
+
+public class KafkaAdapterMigrationV1 implements IAdapterMigrator {
+ @Override
+ public ModelMigratorConfig config() {
+ return new ModelMigratorConfig(
+ KafkaProtocol.ID,
+ SpServiceTagPrefix.ADAPTER,
+ 0,
+ 1
+ );
+ }
+
+ @Override
+ public MigrationResult<AdapterDescription> migrate(AdapterDescription
element,
+ IStaticPropertyExtractor
extractor) throws RuntimeException {
+
+ migrateSecurity((StaticPropertyAlternatives) element.getConfig().get(0));
+ migrateAvro((StaticPropertyAlternatives) element.getConfig().get(6));
+ element.getConfig().add(3, makeConsumerGroup());
+ return MigrationResult.success(element);
+ }
+
+ public void migrateSecurity(StaticPropertyAlternatives securityAlternatives)
{
+ migrateGroup(securityAlternatives.getAlternatives().get(2));
+ migrateGroup(securityAlternatives.getAlternatives().get(3));
+ }
+
+ public void migrateAvro(StaticPropertyAlternatives formatAlternatives) {
+ var parser = new AvroParser();
+ var avroParserDescription = new StaticPropertyAlternative(
+ parser.declareDescription().getName(),
+ parser.declareDescription().getName(),
+ parser.declareDescription().getDescription());
+
+
avroParserDescription.setStaticProperty(parser.declareDescription().getConfig());
+ formatAlternatives.getAlternatives().add(
+ avroParserDescription
+ );
+ }
+
+ private StaticPropertyAlternatives makeConsumerGroup() {
+ var consumerGroupAlternatives = StaticProperties.alternatives(
+ KafkaConfigProvider.getConsumerGroupLabel(),
+ KafkaConfigProvider.getAlternativesRandomGroupId(),
+ KafkaConfigProvider.getAlternativesGroupId()
+ );
+ consumerGroupAlternatives.getAlternatives().get(0).setSelected(true);
+ return consumerGroupAlternatives;
+ }
+
+ private void migrateGroup(StaticPropertyAlternative alternative) {
+ boolean selected = alternative.getSelected();
+ var securityMechanism = StaticProperties.singleValueSelection(
+ Labels.withId(KafkaConfigProvider.SECURITY_MECHANISM),
+ KafkaConfigProvider.makeSecurityMechanism());
+ securityMechanism.getOptions().get(0).setSelected(selected);
+ ((StaticPropertyGroup)
alternative.getStaticProperty()).setHorizontalRendering(false);
+ ((StaticPropertyGroup)
alternative.getStaticProperty()).getStaticProperties().add(
+ 0,
+ securityMechanism
+ );
+ }
+}
diff --git
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/migration/KafkaSinkMigrationV1.java
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/migration/KafkaSinkMigrationV1.java
new file mode 100644
index 0000000000..d1e1d581a0
--- /dev/null
+++
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/migration/KafkaSinkMigrationV1.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.kafka.migration;
+
+import
org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor;
+import org.apache.streampipes.extensions.api.migration.IDataSinkMigrator;
+import
org.apache.streampipes.extensions.connectors.kafka.sink.KafkaPublishSink;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
+
+public class KafkaSinkMigrationV1 implements IDataSinkMigrator {
+
+ @Override
+ public ModelMigratorConfig config() {
+ return new ModelMigratorConfig(
+ KafkaPublishSink.ID,
+ SpServiceTagPrefix.DATA_SINK,
+ 0,
+ 1
+ );
+ }
+
+ @Override
+ public MigrationResult<DataSinkInvocation> migrate(DataSinkInvocation
element,
+
IDataSinkParameterExtractor extractor) throws RuntimeException {
+
+ new KafkaAdapterMigrationV1().migrateSecurity(
+ (StaticPropertyAlternatives) element.getStaticProperties().get(3));
+
+ return MigrationResult.success(element);
+ }
+}
diff --git
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedPlainConfig.java
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaAdapterConfig.java
similarity index 74%
rename from
streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedPlainConfig.java
rename to
streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaAdapterConfig.java
index 235a6eda39..e24dccf040 100644
---
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedPlainConfig.java
+++
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaAdapterConfig.java
@@ -16,14 +16,17 @@
*
*/
-package org.apache.streampipes.messaging.kafka.security;
+package org.apache.streampipes.extensions.connectors.kafka.shared.kafka;
-import java.util.Properties;
+public class KafkaAdapterConfig extends KafkaBaseConfig {
-public class KafkaSecurityUnauthenticatedPlainConfig extends
KafkaSecurityConfig {
+ private String groupId;
- @Override
- public void appendConfig(Properties props) {
+ public String getGroupId() {
+ return groupId;
+ }
+ public void setGroupId(String groupId) {
+ this.groupId = groupId;
}
}
diff --git
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfig.java
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaBaseConfig.java
similarity index 55%
rename from
streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfig.java
rename to
streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaBaseConfig.java
index 770cfc0561..dd43893e89 100644
---
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfig.java
+++
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaBaseConfig.java
@@ -18,28 +18,20 @@
package org.apache.streampipes.extensions.connectors.kafka.shared.kafka;
-import org.apache.streampipes.messaging.kafka.config.AutoOffsetResetConfig;
-import org.apache.streampipes.messaging.kafka.security.KafkaSecurityConfig;
+import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
-public class KafkaConfig {
+import java.util.ArrayList;
+import java.util.List;
+
+public class KafkaBaseConfig {
private String kafkaHost;
private Integer kafkaPort;
private String topic;
+ private List<KafkaConfigAppender> configAppenders;
- KafkaSecurityConfig securityConfig;
- AutoOffsetResetConfig autoOffsetResetConfig;
-
- public KafkaConfig(String kafkaHost,
- Integer kafkaPort,
- String topic,
- KafkaSecurityConfig securityConfig,
- AutoOffsetResetConfig autoOffsetResetConfig) {
- this.kafkaHost = kafkaHost;
- this.kafkaPort = kafkaPort;
- this.topic = topic;
- this.securityConfig = securityConfig;
- this.autoOffsetResetConfig = autoOffsetResetConfig;
+ public KafkaBaseConfig() {
+ this.configAppenders = new ArrayList<>();
}
public String getKafkaHost() {
@@ -66,19 +58,11 @@ public class KafkaConfig {
this.topic = topic;
}
- public KafkaSecurityConfig getSecurityConfig() {
- return securityConfig;
- }
-
- public void setSecurityConfig(KafkaSecurityConfig securityConfig) {
- this.securityConfig = securityConfig;
- }
-
- public AutoOffsetResetConfig getAutoOffsetResetConfig() {
- return autoOffsetResetConfig;
+ public List<KafkaConfigAppender> getConfigAppenders() {
+ return configAppenders;
}
- public void setAutoOffsetResetConfig(AutoOffsetResetConfig
autoOffsetResetConfig) {
- this.autoOffsetResetConfig = autoOffsetResetConfig;
+ public void setConfigAppenders(List<KafkaConfigAppender> configAppenders) {
+ this.configAppenders = configAppenders;
}
}
diff --git
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java
new file mode 100644
index 0000000000..d06e544e74
--- /dev/null
+++
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.kafka.shared.kafka;
+
+import org.apache.streampipes.commons.environment.Environments;
+import org.apache.streampipes.extensions.api.extractor.IParameterExtractor;
+import
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.messaging.kafka.config.AutoOffsetResetConfig;
+import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
+import
org.apache.streampipes.messaging.kafka.security.KafkaSecurityProtocolConfigAppender;
+import
org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslConfigAppender;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
+
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+
+import java.util.ArrayList;
+
+import static
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.ACCESS_MODE;
+import static
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.AUTO_OFFSET_RESET_CONFIG;
+import static
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.CONSUMER_GROUP;
+import static
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.GROUP_ID_INPUT;
+import static
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.HOST_KEY;
+import static
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.PASSWORD_KEY;
+import static
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.PORT_KEY;
+import static
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.RANDOM_GROUP_ID;
+import static
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.SECURITY_MECHANISM;
+import static
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.TOPIC_KEY;
+import static
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.USERNAME_KEY;
+
+public class KafkaConfigExtractor {
+
+ public KafkaAdapterConfig extractAdapterConfig(IStaticPropertyExtractor
extractor,
+ boolean containsTopic) {
+
+ var config = extractCommonConfigs(extractor, new KafkaAdapterConfig());
+
+ var topic = "";
+ if (containsTopic) {
+ topic = extractor.selectedSingleValue(TOPIC_KEY, String.class);
+ }
+ config.setTopic(topic);
+
+ if
(extractor.selectedAlternativeInternalId(CONSUMER_GROUP).equals(RANDOM_GROUP_ID))
{
+ config.setGroupId("StreamPipesKafkaConsumer" +
System.currentTimeMillis());
+ } else {
+ config.setGroupId(extractor.singleValueParameter(GROUP_ID_INPUT,
String.class));
+ }
+
+ StaticPropertyAlternatives alternatives =
extractor.getStaticPropertyByName(AUTO_OFFSET_RESET_CONFIG,
+ StaticPropertyAlternatives.class);
+
+ // Set default value if no value is provided.
+ if (alternatives == null) {
+ config.getConfigAppenders().add(new
AutoOffsetResetConfig(KafkaConfigProvider.LATEST));
+ } else {
+ String auto =
extractor.selectedAlternativeInternalId(AUTO_OFFSET_RESET_CONFIG);
+ config.getConfigAppenders().add(new AutoOffsetResetConfig(auto));
+ }
+ return config;
+ }
+
+ public KafkaBaseConfig extractSinkConfig(IParameterExtractor extractor) {
+ var config = extractCommonConfigs(extractor, new KafkaBaseConfig());
+ config.setTopic(extractor.singleValueParameter(TOPIC_KEY, String.class));
+
+ return config;
+ }
+
+ private <T extends KafkaBaseConfig> T
extractCommonConfigs(IParameterExtractor extractor,
+ T config) {
+ var configAppenders = new ArrayList<KafkaConfigAppender>();
+ var env = Environments.getEnvironment();
+ config.setKafkaHost(extractor.singleValueParameter(HOST_KEY,
String.class));
+ config.setKafkaPort(extractor.singleValueParameter(PORT_KEY,
Integer.class));
+
+ var authentication = extractor.selectedAlternativeInternalId(ACCESS_MODE);
+ var securityProtocol = getSecurityProtocol(authentication);
+ configAppenders.add(new
KafkaSecurityProtocolConfigAppender(securityProtocol, env));
+
+ // check if SASL authentication is defined
+ if (isSaslSecurityMechanism(securityProtocol)) {
+ String username = extractor.singleValueParameter(USERNAME_KEY,
String.class);
+ String password = extractor.secretValue(PASSWORD_KEY);
+ String mechanism = extractor.selectedSingleValue(SECURITY_MECHANISM,
String.class);
+
+ configAppenders.add(new KafkaSecuritySaslConfigAppender(mechanism,
username, password));
+ }
+ config.setConfigAppenders(configAppenders);
+
+ return config;
+ }
+
+ private boolean isSaslSecurityMechanism(SecurityProtocol securityProtocol) {
+ return SecurityProtocol.SASL_PLAINTEXT == securityProtocol ||
SecurityProtocol.SASL_SSL == securityProtocol;
+ }
+
+ public SecurityProtocol getSecurityProtocol(String
selectedSecurityConfiguration) {
+ return switch (selectedSecurityConfiguration) {
+ case "unauthenticated-ssl" -> SecurityProtocol.SSL;
+ case "sasl-plain" -> SecurityProtocol.SASL_PLAINTEXT;
+ case "sasl-ssl" -> SecurityProtocol.SASL_SSL;
+ default -> SecurityProtocol.PLAINTEXT;
+ };
+ }
+}
diff --git
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigProvider.java
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigProvider.java
new file mode 100644
index 0000000000..05f652f780
--- /dev/null
+++
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigProvider.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.kafka.shared.kafka;
+
+import org.apache.streampipes.model.staticproperty.Option;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
+import org.apache.streampipes.model.staticproperty.StaticPropertyGroup;
+import org.apache.streampipes.sdk.StaticProperties;
+import org.apache.streampipes.sdk.helpers.Alternatives;
+import org.apache.streampipes.sdk.helpers.Label;
+import org.apache.streampipes.sdk.helpers.Labels;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+
+import java.util.List;
+
+public class KafkaConfigProvider {
+
+ public static final String TOPIC_KEY = "topic";
+ public static final String HOST_KEY = "host";
+ public static final String PORT_KEY = "port";
+
+ public static final String ACCESS_MODE = "access-mode";
+ public static final String UNAUTHENTICATED_PLAIN = "unauthenticated-plain";
+ public static final String UNAUTHENTICATED_SSL = "unauthenticated-ssl";
+ public static final String SASL_PLAIN = "sasl-plain";
+ public static final String SASL_SSL = "sasl-ssl";
+
+ public static final String SECURITY_MECHANISM = "security-mechanism";
+ public static final String USERNAME_GROUP = "username-group";
+ public static final String USERNAME_KEY = "username";
+ public static final String PASSWORD_KEY = "password";
+
+ public static final String CONSUMER_GROUP = "consumer-group";
+ public static final String RANDOM_GROUP_ID = "random-group-id";
+ public static final String GROUP_ID = "group-id";
+ public static final String GROUP_ID_INPUT = "group-id-input";
+
+
+ private static final String HIDE_INTERNAL_TOPICS = "hide-internal-topics";
+
+ public static final String AUTO_OFFSET_RESET_CONFIG =
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+ public static final String EARLIEST = "earliest";
+ public static final String LATEST = "latest";
+ public static final String NONE = "none";
+
+ public static Label getTopicLabel() {
+ return Labels.withId(TOPIC_KEY);
+ }
+
+ public static Label getHideInternalTopicsLabel() {
+ return Labels.withId(HIDE_INTERNAL_TOPICS);
+ }
+
+ public static String getHideInternalTopicsKey() {
+ return HIDE_INTERNAL_TOPICS;
+ }
+
+ public static Label getHostLabel() {
+ return Labels.withId(HOST_KEY);
+ }
+
+ public static Label getPortLabel() {
+ return Labels.withId(PORT_KEY);
+ }
+
+ public static Label getAccessModeLabel() {
+ return Labels.withId(ACCESS_MODE);
+ }
+
+ public static Label getConsumerGroupLabel() {
+ return Labels.withId(CONSUMER_GROUP);
+ }
+
+ public static Label getAutoOffsetResetConfigLabel() {
+ return Labels.withId(AUTO_OFFSET_RESET_CONFIG);
+ }
+
+ public static StaticPropertyAlternative getAlternativeUnauthenticatedPlain()
{
+ return
Alternatives.from(Labels.withId(KafkaConfigProvider.UNAUTHENTICATED_PLAIN));
+ }
+
+ public static StaticPropertyAlternative getAlternativeUnauthenticatedSSL() {
+ return
Alternatives.from(Labels.withId(KafkaConfigProvider.UNAUTHENTICATED_SSL));
+ }
+
+ public static StaticPropertyAlternative getAlternativesSaslPlain() {
+ return Alternatives.from(Labels.withId(KafkaConfigProvider.SASL_PLAIN),
+ makeAuthenticationGroup()
+ );
+ }
+
+ public static StaticPropertyAlternative getAlternativesSaslSSL() {
+ return Alternatives.from(Labels.withId(KafkaConfigProvider.SASL_SSL),
+ makeAuthenticationGroup());
+ }
+
+ public static StaticPropertyAlternative getAlternativesRandomGroupId() {
+ return Alternatives.from(Labels.withId(RANDOM_GROUP_ID));
+ }
+
+ public static StaticPropertyAlternative getAlternativesGroupId() {
+ return Alternatives.from(Labels.withId(KafkaConfigProvider.GROUP_ID),
+
StaticProperties.stringFreeTextProperty(Labels.withId(KafkaConfigProvider.GROUP_ID_INPUT)));
+ }
+
+ public static StaticPropertyAlternative getAlternativesLatest() {
+ return Alternatives.from(Labels.withId(LATEST));
+ }
+
+ public static StaticPropertyAlternative getAlternativesEarliest() {
+ return Alternatives.from(Labels.withId(EARLIEST));
+ }
+
+ public static StaticPropertyAlternative getAlternativesNone() {
+ return Alternatives.from(Labels.withId(NONE));
+ }
+
+ private static StaticPropertyGroup makeAuthenticationGroup() {
+ var group =
StaticProperties.group(Labels.withId(KafkaConfigProvider.USERNAME_GROUP),
+ StaticProperties.singleValueSelection(
+ Labels.withId(KafkaConfigProvider.SECURITY_MECHANISM),
+ makeSecurityMechanism()),
+
StaticProperties.stringFreeTextProperty(Labels.withId(KafkaConfigProvider.USERNAME_KEY)),
+
StaticProperties.secretValue(Labels.withId(KafkaConfigProvider.PASSWORD_KEY)));
+ group.setHorizontalRendering(false);
+ return group;
+ }
+
+ public static List<Option> makeSecurityMechanism() {
+ return List.of(
+ new Option("PLAIN"),
+ new Option("SCRAM-SHA-256"),
+ new Option("SCRAM-SHA-512")
+ );
+ }
+}
diff --git
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConnectUtils.java
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConnectUtils.java
deleted file mode 100644
index f56a230cbb..0000000000
---
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConnectUtils.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.extensions.connectors.kafka.shared.kafka;
-
-import
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
-import org.apache.streampipes.messaging.kafka.config.AutoOffsetResetConfig;
-import org.apache.streampipes.messaging.kafka.security.KafkaSecurityConfig;
-import
org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslPlainConfig;
-import
org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslSSLConfig;
-import
org.apache.streampipes.messaging.kafka.security.KafkaSecurityUnauthenticatedPlainConfig;
-import
org.apache.streampipes.messaging.kafka.security.KafkaSecurityUnauthenticatedSSLConfig;
-import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
-import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
-import org.apache.streampipes.sdk.StaticProperties;
-import org.apache.streampipes.sdk.helpers.Alternatives;
-import org.apache.streampipes.sdk.helpers.Label;
-import org.apache.streampipes.sdk.helpers.Labels;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-
-public class KafkaConnectUtils {
-
- public static final String TOPIC_KEY = "topic";
- public static final String HOST_KEY = "host";
- public static final String PORT_KEY = "port";
-
- public static final String KEY_SERIALIZATION = "key-serialization";
- public static final String VALUE_SERIALIZATION = "value-serialization";
-
- public static final String KEY_DESERIALIZATION = "key-deserialization";
- public static final String VALUE_DESERIALIZATION = "value-deserialization";
-
- public static final String ACCESS_MODE = "access-mode";
- public static final String UNAUTHENTICATED_PLAIN = "unauthenticated-plain";
- public static final String UNAUTHENTICATED_SSL = "unauthenticated-ssl";
- public static final String SASL_PLAIN = "sasl-plain";
- public static final String SASL_SSL = "sasl-ssl";
-
- public static final String USERNAME_GROUP = "username-group";
- public static final String USERNAME_KEY = "username";
- public static final String PASSWORD_KEY = "password";
-
-
- private static final String HIDE_INTERNAL_TOPICS = "hide-internal-topics";
-
- public static final String AUTO_OFFSET_RESET_CONFIG =
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
- public static final String EARLIEST = "earliest";
- public static final String LATEST = "latest";
- public static final String NONE = "none";
-
- public static Label getTopicLabel() {
- return Labels.withId(TOPIC_KEY);
- }
-
- public static Label getHideInternalTopicsLabel() {
- return Labels.withId(HIDE_INTERNAL_TOPICS);
- }
-
- public static String getHideInternalTopicsKey() {
- return HIDE_INTERNAL_TOPICS;
- }
-
- public static Label getHostLabel() {
- return Labels.withId(HOST_KEY);
- }
-
- public static Label getPortLabel() {
- return Labels.withId(PORT_KEY);
- }
-
- public static Label getAccessModeLabel() {
- return Labels.withId(ACCESS_MODE);
- }
-
- public static Label getAutoOffsetResetConfigLabel() {
- return Labels.withId(AUTO_OFFSET_RESET_CONFIG);
- }
-
-
- public static KafkaConfig getConfig(IStaticPropertyExtractor extractor,
boolean containsTopic) {
- String brokerUrl = extractor.singleValueParameter(HOST_KEY, String.class);
- String topic = "";
- if (containsTopic) {
- topic = extractor.selectedSingleValue(TOPIC_KEY, String.class);
- }
-
- Integer port = extractor.singleValueParameter(PORT_KEY, Integer.class);
-
- String authentication =
extractor.selectedAlternativeInternalId(ACCESS_MODE);
- boolean isUseSSL = isUseSSL(authentication);
-
- KafkaSecurityConfig securityConfig;
-
- //KafkaSerializerConfig serializerConfig = new
KafkaSerializerByteArrayConfig()
-
- // check if a user for the authentication is defined
- if (authentication.equals(KafkaConnectUtils.SASL_SSL) ||
authentication.equals(KafkaConnectUtils.SASL_PLAIN)) {
- String username = extractor.singleValueParameter(USERNAME_KEY,
String.class);
- String password = extractor.secretValue(PASSWORD_KEY);
-
- securityConfig = isUseSSL
- ? new KafkaSecuritySaslSSLConfig(username, password) :
- new KafkaSecuritySaslPlainConfig(username, password);
- } else {
- // set security config for none authenticated access
- securityConfig = isUseSSL
- ? new KafkaSecurityUnauthenticatedSSLConfig() :
- new KafkaSecurityUnauthenticatedPlainConfig();
- }
-
- StaticPropertyAlternatives alternatives =
extractor.getStaticPropertyByName(AUTO_OFFSET_RESET_CONFIG,
- StaticPropertyAlternatives.class);
-
- // Set default value if no value is provided.
- if (alternatives == null) {
- AutoOffsetResetConfig autoOffsetResetConfig = new
AutoOffsetResetConfig(KafkaConnectUtils.LATEST);
-
- return new KafkaConfig(brokerUrl, port, topic, securityConfig,
autoOffsetResetConfig);
- } else {
- String auto =
extractor.selectedAlternativeInternalId(AUTO_OFFSET_RESET_CONFIG);
- AutoOffsetResetConfig autoOffsetResetConfig = new
AutoOffsetResetConfig(auto);
-
- return new KafkaConfig(brokerUrl, port, topic, securityConfig,
autoOffsetResetConfig);
- }
- }
-
- private static boolean isUseSSL(String authentication) {
- if (authentication.equals(KafkaConnectUtils.UNAUTHENTICATED_PLAIN)
- || authentication.equals(KafkaConnectUtils.SASL_PLAIN)) {
- return false;
- } else {
- return true;
- }
- }
-
-
- public static StaticPropertyAlternative getAlternativeUnauthenticatedPlain()
{
- return
Alternatives.from(Labels.withId(KafkaConnectUtils.UNAUTHENTICATED_PLAIN));
- }
-
- public static StaticPropertyAlternative getAlternativeUnauthenticatedSSL() {
- return
Alternatives.from(Labels.withId(KafkaConnectUtils.UNAUTHENTICATED_SSL));
- }
-
- public static StaticPropertyAlternative getAlternativesSaslPlain() {
- return Alternatives.from(Labels.withId(KafkaConnectUtils.SASL_PLAIN),
- StaticProperties.group(Labels.withId(KafkaConnectUtils.USERNAME_GROUP),
-
StaticProperties.stringFreeTextProperty(Labels.withId(KafkaConnectUtils.USERNAME_KEY)),
-
StaticProperties.secretValue(Labels.withId(KafkaConnectUtils.PASSWORD_KEY))));
- }
-
- public static StaticPropertyAlternative getAlternativesSaslSSL() {
- return Alternatives.from(Labels.withId(KafkaConnectUtils.SASL_SSL),
- StaticProperties.group(Labels.withId(KafkaConnectUtils.USERNAME_GROUP),
-
StaticProperties.stringFreeTextProperty(Labels.withId(KafkaConnectUtils.USERNAME_KEY)),
-
StaticProperties.secretValue(Labels.withId(KafkaConnectUtils.PASSWORD_KEY))));
- }
-
-
- public static StaticPropertyAlternative getAlternativesLatest() {
- return Alternatives.from(Labels.withId(LATEST));
- }
-
- public static StaticPropertyAlternative getAlternativesEarliest() {
- return Alternatives.from(Labels.withId(EARLIEST));
- }
-
- public static StaticPropertyAlternative getAlternativesNone() {
- return Alternatives.from(Labels.withId(NONE));
- }
-}
diff --git
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaParameters.java
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaParameters.java
deleted file mode 100644
index b1e439547f..0000000000
---
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaParameters.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.extensions.connectors.kafka.sink;
-
-import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
-import
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConnectUtils;
-
-public class KafkaParameters {
-
- private final String kafkaHost;
-
- private final Integer kafkaPort;
-
- private final String topic;
-
- private final String authentication;
-
- private String username;
-
- private String password;
-
- private final boolean useSSL;
-
- public KafkaParameters(IDataSinkParameters params) {
- var extractor = params.extractor();
- this.topic = extractor.singleValueParameter(KafkaConnectUtils.TOPIC_KEY,
String.class);
- this.kafkaHost =
extractor.singleValueParameter(KafkaConnectUtils.HOST_KEY, String.class);
- this.kafkaPort =
extractor.singleValueParameter(KafkaConnectUtils.PORT_KEY, Integer.class);
- this.authentication =
extractor.selectedAlternativeInternalId(KafkaConnectUtils.ACCESS_MODE);
-
- if (!useAuthentication()) {
- this.useSSL =
KafkaConnectUtils.UNAUTHENTICATED_SSL.equals(this.authentication);
- } else {
- String username =
extractor.singleValueParameter(KafkaConnectUtils.USERNAME_KEY, String.class);
- String password = extractor.secretValue(KafkaConnectUtils.PASSWORD_KEY);
- this.username = username;
- this.password = password;
- this.useSSL = KafkaConnectUtils.SASL_SSL.equals(this.authentication);
- }
- }
-
- public String getKafkaHost() {
- return kafkaHost;
- }
-
- public Integer getKafkaPort() {
- return kafkaPort;
- }
-
- public String getTopic() {
- return topic;
- }
-
- public String getUsername() {
- return username;
- }
-
- public String getPassword() {
- return password;
- }
-
- public String getAuthentication() {
- return authentication;
- }
-
- public boolean isUseSSL() {
- return useSSL;
- }
-
- public boolean useAuthentication() {
- return KafkaConnectUtils.SASL_PLAIN.equals(this.authentication)
- || KafkaConnectUtils.SASL_SSL.equals(this.authentication);
- }
-}
diff --git
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaPublishSink.java
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaPublishSink.java
index 238bbb8048..61460413b3 100644
---
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaPublishSink.java
+++
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaPublishSink.java
@@ -24,13 +24,9 @@ import
org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
-import
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConnectUtils;
+import
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigExtractor;
+import
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider;
import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
-import org.apache.streampipes.messaging.kafka.security.KafkaSecurityConfig;
-import
org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslPlainConfig;
-import
org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslSSLConfig;
-import
org.apache.streampipes.messaging.kafka.security.KafkaSecurityUnauthenticatedPlainConfig;
-import
org.apache.streampipes.messaging.kafka.security.KafkaSecurityUnauthenticatedSSLConfig;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
import org.apache.streampipes.model.runtime.Event;
@@ -41,17 +37,20 @@ import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
-import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.Map;
public class KafkaPublishSink implements IStreamPipesDataSink {
+ public static final String ID =
"org.apache.streampipes.sinks.brokers.jvm.kafka";
+ private static final Logger LOG =
LoggerFactory.getLogger(KafkaPublishSink.class);
+
private SpKafkaProducer producer;
private JsonDataFormatDefinition dataFormatDefinition;
- private KafkaParameters params;
-
public KafkaPublishSink() {
}
@@ -59,7 +58,7 @@ public class KafkaPublishSink implements IStreamPipesDataSink
{
public IDataSinkConfiguration declareConfig() {
return DataSinkConfiguration.create(
KafkaPublishSink::new,
-
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.kafka", 0)
+ DataSinkBuilder.create(ID, 1)
.category(DataSinkType.MESSAGING)
.withLocales(Locales.EN)
.withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
@@ -68,15 +67,15 @@ public class KafkaPublishSink implements
IStreamPipesDataSink {
.requiredProperty(EpRequirements.anyProperty())
.build())
- .requiredTextParameter(Labels.withId(KafkaConnectUtils.TOPIC_KEY),
false, false)
- .requiredTextParameter(Labels.withId(KafkaConnectUtils.HOST_KEY),
false, false)
-
.requiredIntegerParameter(Labels.withId(KafkaConnectUtils.PORT_KEY), 9092)
+
.requiredTextParameter(Labels.withId(KafkaConfigProvider.TOPIC_KEY), false,
false)
+
.requiredTextParameter(Labels.withId(KafkaConfigProvider.HOST_KEY), false,
false)
+
.requiredIntegerParameter(Labels.withId(KafkaConfigProvider.PORT_KEY), 9092)
- .requiredAlternatives(Labels.withId(KafkaConnectUtils.ACCESS_MODE),
- KafkaConnectUtils.getAlternativeUnauthenticatedPlain(),
- KafkaConnectUtils.getAlternativeUnauthenticatedSSL(),
- KafkaConnectUtils.getAlternativesSaslPlain(),
- KafkaConnectUtils.getAlternativesSaslSSL())
+
.requiredAlternatives(Labels.withId(KafkaConfigProvider.ACCESS_MODE),
+ KafkaConfigProvider.getAlternativeUnauthenticatedPlain(),
+ KafkaConfigProvider.getAlternativeUnauthenticatedSSL(),
+ KafkaConfigProvider.getAlternativesSaslPlain(),
+ KafkaConfigProvider.getAlternativesSaslSSL())
.build()
);
}
@@ -84,26 +83,13 @@ public class KafkaPublishSink implements
IStreamPipesDataSink {
@Override
public void onPipelineStarted(IDataSinkParameters parameters,
EventSinkRuntimeContext runtimeContext) {
- this.params = new KafkaParameters(parameters);
+ var kafkaConfig = new
KafkaConfigExtractor().extractSinkConfig(parameters.extractor());
this.dataFormatDefinition = new JsonDataFormatDefinition();
- KafkaSecurityConfig securityConfig;
- // check if a user for the authentication is defined
- if (params.useAuthentication()) {
- securityConfig = params.isUseSSL()
- ? new KafkaSecuritySaslSSLConfig(params.getUsername(),
params.getPassword()) :
- new KafkaSecuritySaslPlainConfig(params.getUsername(),
params.getPassword());
- } else {
- // set security config for none authenticated access
- securityConfig = params.isUseSSL()
- ? new KafkaSecurityUnauthenticatedSSLConfig() :
- new KafkaSecurityUnauthenticatedPlainConfig();
- }
-
this.producer = new SpKafkaProducer(
- params.getKafkaHost() + ":" + params.getKafkaPort(),
- params.getTopic(),
- List.of(securityConfig));
+ kafkaConfig.getKafkaHost() + ":" + kafkaConfig.getKafkaPort(),
+ kafkaConfig.getTopic(),
+ kafkaConfig.getConfigAppenders());
}
@Override
@@ -112,7 +98,7 @@ public class KafkaPublishSink implements
IStreamPipesDataSink {
Map<String, Object> rawEvent = event.getRaw();
this.producer.publish(dataFormatDefinition.fromMap(rawEvent));
} catch (SpRuntimeException e) {
- e.printStackTrace();
+ LOG.error(e.getMessage(), e);
}
}
diff --git
a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en
b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en
index fd8ef54bc4..4809237f5b 100644
---
a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en
+++
b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en
@@ -34,23 +34,38 @@ username.description=
password.title=Password
password.description=
-access-mode.title=Access Mode
-access-mode.description=Unauthenticated or SASL/PLAIN
+access-mode.title=Security protocol
+access-mode.description=Security protocol used to communicate with brokers.
Corresponds to Kafka Client security.protocol property
-unauthenticated-plain.title=Unauthenticated Plain
+unauthenticated-plain.title=PLAINTEXT
unauthenticated-plain.description=No authentication and plaintext
-unauthenticated-ssl.title=Unauthenticated SSL
+unauthenticated-ssl.title=SSL
unauthenticated-ssl.description=Using SSL with no authentication
-sasl-plain.title=SASL/PLAIN
-sasl-plain.description=Username and password, no encryption
+security-mechanism.title=Security Mechanism
+security-mechanism.description=SASL mechanism used for authentication.
Corresponds to Kafka Client sasl.mechanism property
+
+sasl-plain.title=SASL/PLAINTEXT
+sasl-plain.description=SASL authentication, no encryption
sasl-ssl.title=SASL/SSL
-sasl-ssl.description=Username and password, with ssl encryption
+sasl-ssl.description=SASL authentication, with ssl encryption
username-group.title=Username and password
+consumer-group.title=Consumer Group
+consumer-group.description=Use random group id or insert a specific one
+
+random-group-id.title=Random group id
+random-group-id.description=StreamPipes generates a random group id
+
+group-id.title=Insert group id
+group-id.description=Insert the group id
+
+group-id-input.title=Group id
+group-id-input.description=
+
key-deserialization.title=Key Deserializer
key-deserialization.description=
@@ -70,4 +85,4 @@ latest.title=Latest
latest.description=Offsets are initialized to the Latest
none.title=None
-none.description=Consumer throws exceptions
\ No newline at end of file
+none.description=Consumer throws exceptions
diff --git
a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en
b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en
index a2e8d1b5f1..c4cef7cf4e 100644
---
a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en
+++
b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en
@@ -39,16 +39,19 @@ password.description=The password to authenticate with the
broker
access-mode.title=Access Mode
access-mode.description=Unauthenticated or SASL/PLAIN
-unauthenticated-plain.title=Unauthenticated Plain
+unauthenticated-plain.title=PLAINTEXT
unauthenticated-plain.description=No authentication and plaintext
-unauthenticated-ssl.title=Unauthenticated SSL
+unauthenticated-ssl.title=SSL
unauthenticated-ssl.description=Using SSL with no authentication
-sasl-plain.title=SASL/PLAIN
+sasl-plain.title=SASL/PLAINTEXT
sasl-plain.description=Username and password, no encryption
sasl-ssl.title=SASL/SSL
sasl-ssl.description=Username and password, with ssl encryption
+security-mechanism.title=Security Mechanism
+security-mechanism.description=SASL mechanism used for authentication.
Corresponds to Kafka Client sasl.mechanism property
+
username-group.title=Username and password
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/KeyStoreLoader.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/KeyStoreLoader.java
index 4b132d19f8..72f01b0c28 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/KeyStoreLoader.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/KeyStoreLoader.java
@@ -44,7 +44,7 @@ public class KeyStoreLoader {
public KeyStoreLoader load(Environment env,
Path securityDir) throws Exception {
- var keystore =
KeyStore.getInstance(env.getOPcUaKeystoreType().getValueOrDefault());
+ var keystore =
KeyStore.getInstance(env.getOpcUaKeystoreType().getValueOrDefault());
var keystoreFile = env.getOpcUaKeystoreFile().getValueOrDefault();
var keystorePassword = env.getOpcUaKeystorePassword().getValueOrDefault();
var keystoreAlias = env.getOpcUaKeystoreAlias().getValueOrDefault();
diff --git a/streampipes-extensions/streampipes-extensions-all-jvm/pom.xml
b/streampipes-extensions/streampipes-extensions-all-jvm/pom.xml
index e487c1aaaa..1e92fd2d48 100644
--- a/streampipes-extensions/streampipes-extensions-all-jvm/pom.xml
+++ b/streampipes-extensions/streampipes-extensions-all-jvm/pom.xml
@@ -64,6 +64,11 @@
<artifactId>streampipes-connectors-kafka</artifactId>
<version>0.97.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>1.11.4</version>
+ </dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-connectors-mqtt</artifactId>
diff --git
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/KafkaAdapterTester.java
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/KafkaAdapterTester.java
index 75883a244a..db830f1b7a 100644
---
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/KafkaAdapterTester.java
+++
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/KafkaAdapterTester.java
@@ -22,7 +22,7 @@ import
org.apache.streampipes.commons.exceptions.connect.AdapterException;
import org.apache.streampipes.extensions.api.connect.IAdapterConfiguration;
import org.apache.streampipes.extensions.api.connect.StreamPipesAdapter;
import
org.apache.streampipes.extensions.connectors.kafka.adapter.KafkaProtocol;
-import
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConnectUtils;
+import
org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider;
import org.apache.streampipes.integration.containers.KafkaContainer;
import org.apache.streampipes.integration.containers.KafkaDevContainer;
import org.apache.streampipes.manager.template.AdapterTemplateHandler;
@@ -66,12 +66,12 @@ public class KafkaAdapterTester extends AdapterTesterBase {
list.add(new Option(TOPIC));
((RuntimeResolvableOneOfStaticProperty)
configuration.getAdapterDescription()
.getConfig()
- .get(4))
+ .get(5))
.setOptions(list);
List<Map<String, Object>> configs = new ArrayList<>();
- configs.add(Map.of(KafkaConnectUtils.HOST_KEY,
kafkaContainer.getBrokerHost()));
- configs.add(Map.of(KafkaConnectUtils.PORT_KEY,
kafkaContainer.getBrokerPort()));
- configs.add(Map.of(KafkaConnectUtils.TOPIC_KEY, TOPIC));
+ configs.add(Map.of(KafkaConfigProvider.HOST_KEY,
kafkaContainer.getBrokerHost()));
+ configs.add(Map.of(KafkaConfigProvider.PORT_KEY,
kafkaContainer.getBrokerPort()));
+ configs.add(Map.of(KafkaConfigProvider.TOPIC_KEY, TOPIC));
var template = new PipelineElementTemplate("name", "description", configs);
@@ -89,17 +89,25 @@ public class KafkaAdapterTester extends AdapterTesterBase {
.get(0)
.setSelected(true);
+ // Set consumer group to random group id
+ ((StaticPropertyAlternatives) (desc)
+ .getConfig()
+ .get(3))
+ .getAlternatives()
+ .get(0)
+ .setSelected(true);
+
// Set AUTO_OFFSET_RESET_CONFIG configuration to Earliest option
((StaticPropertyAlternatives) (desc)
.getConfig()
- .get(5))
+ .get(6))
.getAlternatives()
.get(0)
.setSelected(true);
((StaticPropertyAlternatives) (desc)
.getConfig()
- .get(5))
+ .get(6))
.getAlternatives()
.get(1)
.setSelected(false);
@@ -107,7 +115,7 @@ public class KafkaAdapterTester extends AdapterTesterBase {
// Set format to Json
((StaticPropertyAlternatives) (desc)
.getConfig()
- .get(6))
+ .get(7))
.getAlternatives()
.get(0)
.setSelected(true);
diff --git
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/KafkaConfigAppender.java
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/KafkaConfigAppender.java
index 1e53ebd225..77c5cb5afa 100644
---
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/KafkaConfigAppender.java
+++
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/KafkaConfigAppender.java
@@ -18,9 +18,11 @@
package org.apache.streampipes.messaging.kafka.config;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+
import java.util.Properties;
public interface KafkaConfigAppender {
- void appendConfig(Properties props);
+ void appendConfig(Properties props) throws SpRuntimeException;
}
diff --git
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityConfig.java
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityConfig.java
deleted file mode 100644
index 646033f437..0000000000
---
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityConfig.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.messaging.kafka.security;
-
-import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
-
-public abstract class KafkaSecurityConfig implements KafkaConfigAppender {
-
-}
diff --git
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityProtocolConfigAppender.java
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityProtocolConfigAppender.java
new file mode 100644
index 0000000000..1a1a9bd92a
--- /dev/null
+++
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityProtocolConfigAppender.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.messaging.kafka.security;
+
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+
+import java.util.Properties;
+
+public class KafkaSecurityProtocolConfigAppender implements
KafkaConfigAppender {
+
+ private final SecurityProtocol securityProtocol;
+ private final Environment env;
+
+ public KafkaSecurityProtocolConfigAppender(SecurityProtocol securityProtocol,
+ Environment env) {
+ this.securityProtocol = securityProtocol;
+ this.env = env;
+ }
+
+ @Override
+ public void appendConfig(Properties props) {
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
securityProtocol.toString());
+
+ if (isSslProtocol()) {
+ props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,
env.getKeystoreType().getValueOrDefault());
+ props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
env.getKeystoreFilename().getValueOrDefault());
+ props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
env.getKeystorePassword().getValueOrDefault());
+
+ if (env.getKeyPassword().exists()) {
+ props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG,
env.getKeyPassword().getValueOrDefault());
+ }
+
+ props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,
env.getTruststoreType().getValueOrDefault());
+ props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
env.getTruststoreFilename().getValueOrDefault());
+
+ if (env.getTruststorePassword().exists()) {
+ props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
env.getTruststorePassword().getValueOrDefault());
+ }
+
+ if (env.getAllowSelfSignedCertificates().getValueOrDefault()) {
+ props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+ }
+ }
+ }
+
+ private boolean isSslProtocol() {
+ return securityProtocol == SecurityProtocol.SSL || securityProtocol ==
SecurityProtocol.SASL_SSL;
+ }
+}
diff --git
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslSSLConfig.java
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslConfigAppender.java
similarity index 50%
rename from
streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslSSLConfig.java
rename to
streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslConfigAppender.java
index 2c5ef691e3..68698f1217 100644
---
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslSSLConfig.java
+++
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslConfigAppender.java
@@ -18,34 +18,56 @@
package org.apache.streampipes.messaging.kafka.security;
-import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
+
import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
import java.util.Properties;
-public class KafkaSecuritySaslSSLConfig extends KafkaSecurityConfig {
+public class KafkaSecuritySaslConfigAppender implements KafkaConfigAppender {
+ private final String securityMechanism;
private final String username;
private final String password;
- public KafkaSecuritySaslSSLConfig(String username, String password) {
+ public KafkaSecuritySaslConfigAppender(String securityMechanism,
+ String username,
+ String password) {
+ this.securityMechanism = securityMechanism;
this.username = username;
this.password = password;
}
@Override
- public void appendConfig(Properties props) {
+ public void appendConfig(Properties props) throws SpRuntimeException {
+ props.put(SaslConfigs.SASL_MECHANISM, securityMechanism);
+ String saslJaasConfig = makeJaasConfig();
+
+ props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig);
+ }
- props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
- props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
SecurityProtocol.SASL_SSL.toString());
+ private String makeJaasConfig() {
+ if (securityMechanism.equals("PLAIN")) {
+ return makeSaslPlainConfig();
+ } else {
+ return makeSaslScramConfig();
+ }
+ }
- String saslJaasConfig =
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
+ private String makeSaslPlainConfig() {
+ return "org.apache.kafka.common.security.plain.PlainLoginModule required
username=\""
+ username
+ "\" password=\""
+ password
+ "\";";
+ }
- props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig);
+ private String makeSaslScramConfig() {
+ return "org.apache.kafka.common.security.scram.ScramLoginModule required
username=\""
+ + username
+ + "\" password=\""
+ + password
+ + "\";";
}
}
diff --git
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslPlainConfig.java
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslPlainConfig.java
deleted file mode 100644
index a7e5fa94fc..0000000000
---
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslPlainConfig.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.messaging.kafka.security;
-
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
-
-import java.util.Properties;
-
-public class KafkaSecuritySaslPlainConfig extends KafkaSecurityConfig {
-
- private final String username;
- private final String password;
-
- public KafkaSecuritySaslPlainConfig(String username, String password) {
- this.username = username;
- this.password = password;
- }
-
- @Override
- public void appendConfig(Properties props) {
-
- props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
- props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
SecurityProtocol.SASL_PLAINTEXT.toString());
-
- String saslJaasConfig =
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
- + username
- + "\" password=\""
- + password
- + "\";";
-
- props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig);
- }
-}
diff --git
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedSSLConfig.java
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedSSLConfig.java
deleted file mode 100644
index 8fdd02fd12..0000000000
---
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedSSLConfig.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.messaging.kafka.security;
-
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
-
-import java.util.Properties;
-
-public class KafkaSecurityUnauthenticatedSSLConfig extends KafkaSecurityConfig
{
-
- @Override
- public void appendConfig(Properties props) {
- props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
SecurityProtocol.SSL.toString());
- }
-}
diff --git
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java
index 32cfb73044..ce66dbeed8 100644
---
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java
+++
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java
@@ -30,6 +30,7 @@ import org.apache.streampipes.model.monitoring.SpLogMessage;
import org.apache.streampipes.model.runtime.RuntimeOptionsRequest;
import org.apache.streampipes.model.runtime.RuntimeOptionsResponse;
import org.apache.streampipes.resource.management.SpResourceManager;
+import org.apache.streampipes.resource.management.secret.SecretProvider;
import org.apache.streampipes.storage.management.StorageDispatcher;
import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
@@ -75,6 +76,7 @@ public class RuntimeResolvableResource extends
AbstractAdapterResource<WorkerAdm
SpServiceUrlProvider.ADAPTER,
runtimeOptionsRequest.getDeploymentConfiguration().getDesiredServiceTags()
);
+
SecretProvider.getDecryptionService().applyConfig(runtimeOptionsRequest.getStaticProperties());
RuntimeOptionsResponse result =
WorkerRestClient.getConfiguration(baseUrl, appId, runtimeOptionsRequest);
return ok(result);
diff --git
a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/adapter/ParserDescriptionBuilder.java
b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/adapter/ParserDescriptionBuilder.java
index 8a3d351638..591f8b4b67 100644
---
a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/adapter/ParserDescriptionBuilder.java
+++
b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/adapter/ParserDescriptionBuilder.java
@@ -53,6 +53,7 @@ public class ParserDescriptionBuilder extends
elementDescription.getName(),
elementDescription.getDescription(),
getStaticProperties());
+ group.setHorizontalRendering(false);
this.elementDescription.setConfig(group);
}
}
diff --git a/ui/cypress/tests/thirdparty/Kafka.smoke.spec.ts
b/ui/cypress/tests/thirdparty/Kafka.smoke.spec.ts
index 7029ac309a..51d149369b 100644
--- a/ui/cypress/tests/thirdparty/Kafka.smoke.spec.ts
+++ b/ui/cypress/tests/thirdparty/Kafka.smoke.spec.ts
@@ -35,7 +35,7 @@ describe('Test Kafka Integration', () => {
const sink: PipelineElementInput = PipelineElementBuilder.create(
'kafka_publisher',
)
- .addInput('radio', 'access-mode-unauthenticated_plain', '')
+ .addInput('radio', 'access-mode-plaintext', '')
.addInput('input', 'host', host)
.addInput(
'input',
@@ -48,7 +48,7 @@ describe('Test Kafka Integration', () => {
const adapter = AdapterBuilder.create('Apache_Kafka')
.setName('Kafka4')
.setTimestampProperty('timestamp')
- .addProtocolInput('radio', 'access-mode-unauthenticated_plain', '')
+ .addProtocolInput('radio', 'access-mode-plaintext', '')
.addProtocolInput('input', 'host', host)
.addProtocolInput('input', 'port', port)
.addProtocolInput('click', 'sp-reload', '')
diff --git
a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts
b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts
index eb6ffcc10c..57588eb155 100644
---
a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts
+++
b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts
@@ -18,6 +18,8 @@
import { Component, OnChanges, OnInit } from '@angular/core';
import {
+ Option,
+ RuntimeResolvableAnyStaticProperty,
RuntimeResolvableOneOfStaticProperty,
StaticPropertyUnion,
} from '@streampipes/platform-services';
@@ -48,13 +50,39 @@ export class StaticRuntimeResolvableOneOfInputComponent
}
afterOptionsLoaded(staticProperty: RuntimeResolvableOneOfStaticProperty) {
- this.staticProperty.options = staticProperty.options;
if (
- this.staticProperty.options &&
- this.staticProperty.options.length > 0
+ this.staticProperty.options?.length > 0 &&
+ this.isOptionSelected()
) {
- this.staticProperty.options[0].selected = true;
+ const selectedOption = this.staticProperty.options.find(
+ o => o.selected,
+ );
+ this.addSelectedOption(staticProperty, selectedOption);
+ } else {
+ if (staticProperty.options?.length > 0) {
+ staticProperty.options[0].selected = true;
+ }
}
+ this.staticProperty.options = staticProperty.options;
+ }
+
+ isOptionSelected(): boolean {
+ return this.staticProperty.options.find(o => o.selected) !== undefined;
+ }
+
+ addSelectedOption(
+ staticProperty: RuntimeResolvableOneOfStaticProperty,
+ selectedOption: Option,
+ ): void {
+ staticProperty.options
+ .filter(o => {
+ return o.internalName !== null
+ ? o.internalName === selectedOption.internalName
+ : o.name === selectedOption.name;
+ })
+ .forEach(o => {
+ o.selected = true;
+ });
}
select(id) {
@@ -65,7 +93,6 @@ export class StaticRuntimeResolvableOneOfInputComponent
option => option.elementId === id,
).selected = true;
this.performValidation();
- this.applyCompletedConfiguration(true);
}
parse(