This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6cf40815406 KAFKA-17985: Set share.auto.offset.reset to earliest in 
ShareRoundTripWorker (#17758)
6cf40815406 is described below

commit 6cf40815406b718c49c5ea3d55003c2f665f6f44
Author: ShivsundarR <[email protected]>
AuthorDate: Tue Nov 12 11:54:28 2024 -0500

    KAFKA-17985: Set share.auto.offset.reset to earliest in 
ShareRoundTripWorker (#17758)
    
    After the share.auto.offset.reset dynamic config was added for share groups 
in this commit - 9db5ed0, we needed to update this config value to "earliest" 
in ShareRoundTripWorker when it creates the consumer.
    
    Reviewers: Andrew Schofield <[email protected]>, Manikumar Reddy 
<[email protected]>
---
 build.gradle                                       |  5 +++
 checkstyle/import-control.xml                      |  1 +
 .../apache/kafka/trogdor/common/WorkerUtils.java   |  2 +-
 .../trogdor/workload/ShareRoundTripWorker.java     | 43 ++++++++++++++++++++--
 4 files changed, 47 insertions(+), 4 deletions(-)

diff --git a/build.gradle b/build.gradle
index 20eeb3eff47..97643dfc2e8 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2469,11 +2469,16 @@ project(':trogdor') {
     implementation libs.jettyServlet
     implementation libs.jettyServlets
 
+    implementation project(':group-coordinator')
+    implementation project(':group-coordinator:group-coordinator-api')
+
     testImplementation project(':clients')
     testImplementation libs.junitJupiter
     testImplementation project(':clients').sourceSets.test.output
     testImplementation libs.mockitoCore
 
+    testImplementation project(':group-coordinator')
+
     testRuntimeOnly runtimeTestLibs
   }
 
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 9465f016c8a..eb4131fbc47 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -368,6 +368,7 @@
     <allow pkg="org.apache.kafka.common" />
     <allow pkg="org.apache.kafka.test"/>
     <allow pkg="org.apache.kafka.trogdor" />
+    <allow pkg="org.apache.kafka.coordinator" />
     <allow pkg="org.eclipse.jetty" />
     <allow pkg="org.glassfish.jersey" />
   </subpackage>
diff --git 
a/trogdor/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java 
b/trogdor/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
index 693ae2e6d3b..2c8e0cd7c74 100644
--- a/trogdor/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
+++ b/trogdor/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
@@ -333,7 +333,7 @@ public final class WorkerUtils {
         return out;
     }
 
-    private static Admin createAdminClient(
+    public static Admin createAdminClient(
         String bootstrapServers,
         Map<String, String> commonClientConf, Map<String, String> 
adminClientConf) {
         Properties props = new Properties();
diff --git 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareRoundTripWorker.java
 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareRoundTripWorker.java
index 2ce028243a2..27df3147462 100644
--- 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareRoundTripWorker.java
+++ 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareRoundTripWorker.java
@@ -17,22 +17,35 @@
 
 package org.apache.kafka.trogdor.workload;
 
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.AlterConfigsOptions;
+import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaShareConsumer;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.GroupConfig;
 import org.apache.kafka.trogdor.common.WorkerUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.time.Duration;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
-
+import java.util.concurrent.TimeUnit;
 
 public class ShareRoundTripWorker extends RoundTripWorkerBase {
+    private static final Logger log = 
LoggerFactory.getLogger(ShareRoundTripWorker.class);
     KafkaShareConsumer<byte[], byte[]> consumer;
-
     ShareRoundTripWorker(String id, RoundTripWorkloadSpec spec) {
         this.id = id;
         this.spec = spec;
@@ -49,7 +62,16 @@ public class ShareRoundTripWorker extends 
RoundTripWorkerBase {
         // user may over-write the defaults with common client config and 
consumer config
         WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), 
spec.consumerConf());
 
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, "round-trip-share-group-" + 
id);
+        String groupId = "round-trip-share-group-" + id;
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+
+        try (Admin adminClient = 
WorkerUtils.createAdminClient(spec.bootstrapServers(), spec.commonClientConf(), 
spec.adminClientConf())) {
+            alterShareAutoOffsetReset(groupId, "earliest", adminClient);
+        } catch (Exception e) {
+            log.warn("Failed to set share.auto.offset.reset config to 
'earliest' mode", e);
+            throw e;
+        }
+
         consumer = new KafkaShareConsumer<>(props, new ByteArrayDeserializer(),
                 new ByteArrayDeserializer());
         consumer.subscribe(spec.activeTopics().materialize().keySet());
@@ -65,4 +87,19 @@ public class ShareRoundTripWorker extends 
RoundTripWorkerBase {
         Utils.closeQuietly(consumer, "consumer");
         consumer = null;
     }
+
+    private void alterShareAutoOffsetReset(String groupId, String newValue, 
Admin adminClient) {
+        ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.GROUP, groupId);
+        Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new 
HashMap<>();
+        alterEntries.put(configResource, List.of(new AlterConfigOp(new 
ConfigEntry(
+                GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, newValue), 
AlterConfigOp.OpType.SET)));
+        AlterConfigsOptions alterOptions = new AlterConfigsOptions();
+        try {
+            adminClient.incrementalAlterConfigs(alterEntries, alterOptions)
+                    .all()
+                    .get(60, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            throw new RuntimeException("Exception was thrown while attempting 
to set share.auto.offset.reset config: ", e);
+        }
+    }
 }

Reply via email to