This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6cf40815406 KAFKA-17985: Set share.auto.offset.reset to earliest in
ShareRoundTripWorker (#17758)
6cf40815406 is described below
commit 6cf40815406b718c49c5ea3d55003c2f665f6f44
Author: ShivsundarR <[email protected]>
AuthorDate: Tue Nov 12 11:54:28 2024 -0500
KAFKA-17985: Set share.auto.offset.reset to earliest in
ShareRoundTripWorker (#17758)
After the share.auto.offset.reset dynamic config was added for share groups
in this commit - 9db5ed0, we needed to update this config value to "earliest"
in ShareRoundTripWorker when it creates the consumer.
Reviewers: Andrew Schofield <[email protected]>, Manikumar Reddy
<[email protected]>
---
build.gradle | 5 +++
checkstyle/import-control.xml | 1 +
.../apache/kafka/trogdor/common/WorkerUtils.java | 2 +-
.../trogdor/workload/ShareRoundTripWorker.java | 43 ++++++++++++++++++++--
4 files changed, 47 insertions(+), 4 deletions(-)
diff --git a/build.gradle b/build.gradle
index 20eeb3eff47..97643dfc2e8 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2469,11 +2469,16 @@ project(':trogdor') {
implementation libs.jettyServlet
implementation libs.jettyServlets
+ implementation project(':group-coordinator')
+ implementation project(':group-coordinator:group-coordinator-api')
+
testImplementation project(':clients')
testImplementation libs.junitJupiter
testImplementation project(':clients').sourceSets.test.output
testImplementation libs.mockitoCore
+ testImplementation project(':group-coordinator')
+
testRuntimeOnly runtimeTestLibs
}
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 9465f016c8a..eb4131fbc47 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -368,6 +368,7 @@
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.apache.kafka.test"/>
<allow pkg="org.apache.kafka.trogdor" />
+ <allow pkg="org.apache.kafka.coordinator" />
<allow pkg="org.eclipse.jetty" />
<allow pkg="org.glassfish.jersey" />
</subpackage>
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
index 693ae2e6d3b..2c8e0cd7c74 100644
--- a/trogdor/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
+++ b/trogdor/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
@@ -333,7 +333,7 @@ public final class WorkerUtils {
return out;
}
- private static Admin createAdminClient(
+ public static Admin createAdminClient(
String bootstrapServers,
Map<String, String> commonClientConf, Map<String, String>
adminClientConf) {
Properties props = new Properties();
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareRoundTripWorker.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareRoundTripWorker.java
index 2ce028243a2..27df3147462 100644
---
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareRoundTripWorker.java
+++
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareRoundTripWorker.java
@@ -17,22 +17,35 @@
package org.apache.kafka.trogdor.workload;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.AlterConfigsOptions;
+import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.trogdor.common.WorkerUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.time.Duration;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import java.util.Properties;
-
+import java.util.concurrent.TimeUnit;
public class ShareRoundTripWorker extends RoundTripWorkerBase {
+ private static final Logger log =
LoggerFactory.getLogger(ShareRoundTripWorker.class);
KafkaShareConsumer<byte[], byte[]> consumer;
-
ShareRoundTripWorker(String id, RoundTripWorkloadSpec spec) {
this.id = id;
this.spec = spec;
@@ -49,7 +62,16 @@ public class ShareRoundTripWorker extends
RoundTripWorkerBase {
// user may over-write the defaults with common client config and
consumer config
WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(),
spec.consumerConf());
- props.put(ConsumerConfig.GROUP_ID_CONFIG, "round-trip-share-group-" +
id);
+ String groupId = "round-trip-share-group-" + id;
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+
+ try (Admin adminClient =
WorkerUtils.createAdminClient(spec.bootstrapServers(), spec.commonClientConf(),
spec.adminClientConf())) {
+ alterShareAutoOffsetReset(groupId, "earliest", adminClient);
+ } catch (Exception e) {
+ log.warn("Failed to set share.auto.offset.reset config to
'earliest' mode", e);
+ throw e;
+ }
+
consumer = new KafkaShareConsumer<>(props, new ByteArrayDeserializer(),
new ByteArrayDeserializer());
consumer.subscribe(spec.activeTopics().materialize().keySet());
@@ -65,4 +87,19 @@ public class ShareRoundTripWorker extends
RoundTripWorkerBase {
Utils.closeQuietly(consumer, "consumer");
consumer = null;
}
+
+ private void alterShareAutoOffsetReset(String groupId, String newValue,
Admin adminClient) {
+ ConfigResource configResource = new
ConfigResource(ConfigResource.Type.GROUP, groupId);
+ Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new
HashMap<>();
+ alterEntries.put(configResource, List.of(new AlterConfigOp(new
ConfigEntry(
+ GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, newValue),
AlterConfigOp.OpType.SET)));
+ AlterConfigsOptions alterOptions = new AlterConfigsOptions();
+ try {
+ adminClient.incrementalAlterConfigs(alterEntries, alterOptions)
+ .all()
+ .get(60, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ throw new RuntimeException("Exception was thrown while attempting
to set share.auto.offset.reset config: ", e);
+ }
+ }
}