This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch fix-kafka-group-id
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 0d7dd87bc95f8d287d5e2a5e04caab28bbe94682
Author: Dominik Riemer <[email protected]>
AuthorDate: Thu Jan 9 15:48:36 2025 +0100

    fix: Use correct user-defined groupId in Kafka adapter
---
 .../connectors/kafka/adapter/KafkaProtocol.java    |  8 ++----
 .../kafka/shared/kafka/KafkaAdapterConfig.java     | 32 ----------------------
 .../kafka/shared/kafka/KafkaConfigExtractor.java   | 11 +++++---
 .../messaging/kafka/SpKafkaConsumer.java           |  1 +
 .../kafka/config/AbstractConfigFactory.java        |  2 +-
 5 files changed, 12 insertions(+), 42 deletions(-)

diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java
index 780d454e60..0e00e94b01 100644
--- a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java
+++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java
@@ -31,7 +31,7 @@ import org.apache.streampipes.extensions.api.connect.context.IAdapterRuntimeCont
 import org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor;
 import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
 import org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig;
-import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaAdapterConfig;
+import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaBaseConfig;
 import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigExtractor;
 import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider;
 import org.apache.streampipes.extensions.management.connect.adapter.BrokerEventProcessor;
@@ -74,7 +74,7 @@ import java.util.stream.Collectors;
 public class KafkaProtocol implements StreamPipesAdapter, SupportsRuntimeConfig {
 
   private static final Logger LOG = LoggerFactory.getLogger(KafkaProtocol.class);
-  private KafkaAdapterConfig config;
+  private KafkaBaseConfig config;
 
   public static final String ID = "org.apache.streampipes.connect.iiot.protocol.stream.kafka";
 
@@ -88,7 +88,7 @@ public class KafkaProtocol implements StreamPipesAdapter, SupportsRuntimeConfig
     this.config = new KafkaConfigExtractor().extractAdapterConfig(extractor, true);
   }
 
-  private Consumer<byte[], byte[]> createConsumer(KafkaAdapterConfig kafkaConfig) throws KafkaException {
+  private Consumer<byte[], byte[]> createConsumer(KafkaBaseConfig kafkaConfig) throws KafkaException {
     final Properties props = new Properties();
 
     kafkaConfig.getConfigAppenders().forEach(c -> c.appendConfig(props));
@@ -96,8 +96,6 @@ public class KafkaProtocol implements StreamPipesAdapter, SupportsRuntimeConfig
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
         kafkaConfig.getKafkaHost() + ":" + kafkaConfig.getKafkaPort());
 
-    props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getGroupId());
-
     props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 6000);
 
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaAdapterConfig.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaAdapterConfig.java
deleted file mode 100644
index e24dccf040..0000000000
--- a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaAdapterConfig.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.extensions.connectors.kafka.shared.kafka;
-
-public class KafkaAdapterConfig extends KafkaBaseConfig {
-
-  private String groupId;
-
-  public String getGroupId() {
-    return groupId;
-  }
-
-  public void setGroupId(String groupId) {
-    this.groupId = groupId;
-  }
-}
diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java
index 0289fb56ca..98c964ef44 100644
--- a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java
+++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java
@@ -28,6 +28,7 @@ import org.apache.streampipes.messaging.kafka.security.KafkaSecurityProtocolConf
 import org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslConfigAppender;
 import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 
 import java.util.ArrayList;
@@ -51,10 +52,10 @@ import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.Ka
 
 public class KafkaConfigExtractor {
 
-  public KafkaAdapterConfig extractAdapterConfig(IStaticPropertyExtractor extractor,
+  public KafkaBaseConfig extractAdapterConfig(IStaticPropertyExtractor extractor,
                                                  boolean containsTopic) {
 
-    var config = extractCommonConfigs(extractor, new KafkaAdapterConfig());
+    var config = extractCommonConfigs(extractor, new KafkaBaseConfig());
 
     var topic = "";
     if (containsTopic) {
@@ -62,11 +63,13 @@ public class KafkaConfigExtractor {
     }
     config.setTopic(topic);
 
+    var groupId = "";
     if (extractor.selectedAlternativeInternalId(CONSUMER_GROUP).equals(RANDOM_GROUP_ID)) {
-      config.setGroupId("StreamPipesKafkaConsumer" + System.currentTimeMillis());
+      groupId = "StreamPipesKafkaConsumer" + System.currentTimeMillis();
     } else {
-      config.setGroupId(extractor.singleValueParameter(GROUP_ID_INPUT, String.class));
+      groupId = extractor.singleValueParameter(GROUP_ID_INPUT, String.class);
     }
+    config.getConfigAppenders().add(new SimpleConfigAppender(Map.of(ConsumerConfig.GROUP_ID_CONFIG, groupId)));
 
     StaticPropertyAlternatives alternatives = extractor.getStaticPropertyByName(AUTO_OFFSET_RESET_CONFIG,
         StaticPropertyAlternatives.class);
diff --git 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
 
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
index fcbd38224f..6b8f1bbc23 100644
--- 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
+++ 
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
@@ -82,6 +82,7 @@ public class SpKafkaConsumer implements EventConsumer, 
Runnable,
 
     Properties props = makeProperties(protocol, appenders);
 
+    LOG.info("Using kafka properties: {}", props.toString());
     KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
     if (!patternTopic) {
       consumer.subscribe(Collections.singletonList(topic));
diff --git 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/AbstractConfigFactory.java
 
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/AbstractConfigFactory.java
index 8225986c6d..4a12c6341c 100644
--- 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/AbstractConfigFactory.java
+++ 
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/AbstractConfigFactory.java
@@ -48,6 +48,6 @@ public abstract class AbstractConfigFactory {
     Properties props = makeDefaultProperties();
     appenders.forEach(appender -> appender.appendConfig(props));
 
-    return  props;
+    return props;
   }
 }

Reply via email to