This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ee2204f2130 KAFKA-20161 Group configuration for
share.renew.acknowledge.enable (#21467)
ee2204f2130 is described below
commit ee2204f2130d6f7c5710f57c90c85ae835be14d6
Author: Lan Ding <[email protected]>
AuthorDate: Mon Mar 9 05:57:30 2026 +0800
KAFKA-20161 Group configuration for share.renew.acknowledge.enable (#21467)
Introduce the `share.renew.acknowledge.enable` group configuration
(KIP-1222) to control whether renew acknowledgements are permitted for a
share group.
- Add new boolean dynamic group config
`share.renew.acknowledge.enable` (default false) in `GroupConfig`
- When disabled, RENEW acknowledgements are rejected with
`INVALID_RECORD_STATE` error code
- Update existing RENEW tests to explicitly enable the config
- Add unit tests and integration test for the disabled-by-default
behavior
Reviewers: Andrew Schofield <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../kafka/clients/consumer/ShareConsumerTest.java | 42 ++++++++++
.../java/kafka/server/share/ShareFetchUtils.java | 33 --------
.../java/kafka/server/share/SharePartition.java | 25 ++++--
core/src/main/scala/kafka/server/KafkaApis.scala | 8 +-
.../kafka/server/share/ShareFetchUtilsTest.java | 20 -----
.../kafka/server/share/SharePartitionTest.java | 88 +++++++++++++++++++-
.../scala/unit/kafka/server/KafkaApisTest.scala | 3 +-
.../kafka/coordinator/group/GroupConfig.java | 20 +++++
.../modern/share/ShareGroupConfigProvider.java | 73 +++++++++++++++++
.../kafka/coordinator/group/GroupConfigTest.java | 10 +++
.../modern/share/ShareGroupConfigProviderTest.java | 94 ++++++++++++++++++++++
11 files changed, 351 insertions(+), 65 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index e3de0ea3442..8a08a046587 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -3082,6 +3082,35 @@ public class ShareConsumerTest {
verifyYammerMetricCount("ackType=Renew", 0);
}
+ @ClusterTest
+ public void testRenewAcknowledgementDisabled() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ alterShareRenewAcknowledgeEnable("group1", false);
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+ "group1",
+ Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
EXPLICIT))
+ ) {
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"Message".getBytes());
+ producer.send(record);
+ producer.flush();
+
+ shareConsumer.subscribe(List.of(tp.topic()));
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 1);
+ assertEquals(1, records.count());
+
+ for (ConsumerRecord<byte[], byte[]> rec : records) {
+ shareConsumer.acknowledge(rec, AcknowledgeType.RENEW);
+ }
+
+ Map<TopicIdPartition, Optional<KafkaException>> result =
shareConsumer.commitSync();
+ assertEquals(1, result.size());
+ Optional<KafkaException> error = result.get(new
TopicIdPartition(tpId, tp.partition(), tp.topic()));
+ assertTrue(error.isPresent());
+ assertInstanceOf(InvalidRecordStateException.class, error.get());
+ }
+ }
+
@ClusterTest(
brokers = 1,
serverProperties = {
@@ -4325,6 +4354,19 @@ public class ShareConsumerTest {
}
}
+ private void alterShareRenewAcknowledgeEnable(String groupId, boolean
newValue) {
+ ConfigResource configResource = new
ConfigResource(ConfigResource.Type.GROUP, groupId);
+ Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new
HashMap<>();
+ alterEntries.put(configResource, List.of(new AlterConfigOp(new
ConfigEntry(
+ GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG,
Boolean.toString(newValue)), AlterConfigOp.OpType.SET)));
+ AlterConfigsOptions alterOptions = new AlterConfigsOptions();
+ try (Admin adminClient = createAdminClient()) {
+ assertDoesNotThrow(() ->
adminClient.incrementalAlterConfigs(alterEntries, alterOptions)
+ .all()
+ .get(60, TimeUnit.SECONDS), "Failed to alter configs");
+ }
+ }
+
private List<Integer> topicPartitionLeader(Admin adminClient, String
topicName, int partition) throws InterruptedException, ExecutionException {
return
adminClient.describeTopics(List.of(topicName)).allTopicNames().get().get(topicName)
.partitions().stream()
diff --git a/core/src/main/java/kafka/server/share/ShareFetchUtils.java
b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
index e16a2a43a63..ba3f03a4cca 100644
--- a/core/src/main/java/kafka/server/share/ShareFetchUtils.java
+++ b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
@@ -32,7 +32,6 @@ import org.apache.kafka.common.record.internal.MemoryRecords;
import org.apache.kafka.common.record.internal.RecordBatch;
import org.apache.kafka.common.record.internal.Records;
import org.apache.kafka.common.requests.ListOffsetsRequest;
-import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
import org.apache.kafka.server.share.fetch.ShareFetch;
@@ -259,38 +258,6 @@ public class ShareFetchUtils {
}
}
- /**
- * The method is used to get the record lock duration for the group. If
the group config is present,
- * then the record lock duration is returned. Otherwise, the default value
is returned.
- *
- * @param groupConfigManager The group config manager.
- * @param groupId The group id for which the record lock duration is to be
fetched.
- * @param defaultValue The default value to be returned if the group
config is not present.
- * @return The record lock duration for the group.
- */
- public static int recordLockDurationMsOrDefault(GroupConfigManager
groupConfigManager, String groupId, int defaultValue) {
- if (groupConfigManager.groupConfig(groupId).isPresent()) {
- return
groupConfigManager.groupConfig(groupId).get().shareRecordLockDurationMs();
- }
- return defaultValue;
- }
-
- /**
- * The method is used to get the delivery count limit for the group. If
the group config is present,
- * then the delivery count limit is returned. Otherwise, the default value
is returned.
- *
- * @param groupConfigManager The group config manager.
- * @param groupId The group id for which the delivery count limit is to be
fetched.
- * @param defaultValue The default value to be returned if the group
config is not present.
- * @return The delivery count limit for the group.
- */
- public static int deliveryCountLimitOrDefault(GroupConfigManager
groupConfigManager, String groupId, int defaultValue) {
- if (groupConfigManager.groupConfig(groupId).isPresent()) {
- return
groupConfigManager.groupConfig(groupId).get().shareDeliveryCountLimit();
- }
- return defaultValue;
- }
-
/**
* Merges contiguous AcquiredRecords with the same delivery count into
single records.
* <p>
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index fad0618460b..4c8fc717f5d 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -42,6 +42,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
+import
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.fetch.AcquisitionLockTimeoutHandler;
import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
@@ -91,11 +92,9 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import static kafka.server.share.ShareFetchUtils.deliveryCountLimitOrDefault;
import static kafka.server.share.ShareFetchUtils.offsetForEarliestTimestamp;
import static kafka.server.share.ShareFetchUtils.offsetForLatestTimestamp;
import static kafka.server.share.ShareFetchUtils.offsetForTimestamp;
-import static kafka.server.share.ShareFetchUtils.recordLockDurationMsOrDefault;
/**
* The SharePartition is used to track the state of a partition that is shared
between multiple
@@ -213,6 +212,11 @@ public class SharePartition {
*/
private final GroupConfigManager groupConfigManager;
+ /**
+ * The provider used to retrieve share group dynamic configuration values.
+ */
+ private final ShareGroupConfigProvider configProvider;
+
/**
* This is the default value which is used unless the group has a
configuration which overrides it.
* The record lock duration is used to limit the duration for which a
consumer can acquire a record.
@@ -381,6 +385,7 @@ public class SharePartition {
this.partitionState = sharePartitionState;
this.replicaManager = replicaManager;
this.groupConfigManager = groupConfigManager;
+ this.configProvider = new ShareGroupConfigProvider(groupConfigManager);
this.fetchOffsetMetadata = new OffsetMetadata();
this.delayedShareFetchKey = new DelayedShareFetchGroupKey(groupId,
topicIdPartition);
this.listener = listener;
@@ -1851,7 +1856,7 @@ public class SharePartition {
null,
timeoutHandler,
sharePartitionMetrics);
- int delayMs =
recordLockDurationMsOrDefault(groupConfigManager, groupId,
defaultRecordLockDurationMs);
+ int delayMs =
configProvider.recordLockDurationMsOrDefault(groupId,
defaultRecordLockDurationMs);
long lastOffset = acquiredRecords.firstOffset() +
maxFetchRecords - 1;
inFlightBatch.maybeInitializeOffsetStateUpdate(lastOffset,
delayMs);
updateFindNextFetchOffset(true);
@@ -2298,6 +2303,11 @@ public class SharePartition {
byte ackType = ackTypeMap.size() > 1 ?
ackTypeMap.get(offsetState.getKey()) : batch.acknowledgeTypes().get(0);
if (ackType == AcknowledgeType.RENEW.id) {
+ if (!configProvider.isRenewAcknowledgeEnabled(groupId)) {
+ log.debug("Renew acknowledge is not enabled for the
group: {}", groupId);
+ return Optional.of(new InvalidRecordStateException(
+ "Renewing acquisition locks is not enabled for the
group."));
+ }
// If RENEW, renew the acquisition lock timer for this
offset and continue without changing state.
// We do not care about recordState map here.
// Only valid for ACQUIRED offsets; the check above
ensures this.
@@ -2371,6 +2381,11 @@ public class SharePartition {
// Before reaching this point, it should be verified that it is
full batch ack and
// not per offset ack as well as startOffset not moved.
if (ackType == AcknowledgeType.RENEW.id) {
+ if (!configProvider.isRenewAcknowledgeEnabled(groupId)) {
+ log.debug("Renew acknowledge is not enabled for the group:
{}", groupId);
+ return Optional.of(new InvalidRecordStateException(
+ "Renewing acquisition locks is not enabled for the
group."));
+ }
// Renew the acquisition lock timer for the complete batch. We
have already
// checked that the batchState is ACQUIRED above.
log.debug("Renewing acquisition lock for {}-{} with batch
{}-{} for member {}.",
@@ -2831,7 +2846,7 @@ public class SharePartition {
// The recordLockDuration value would depend on whether the dynamic
config SHARE_RECORD_LOCK_DURATION_MS in
// GroupConfig.java is set or not. If dynamic config is set, then that
is used, otherwise the value of
// SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG defined in
ShareGroupConfig is used
- int recordLockDurationMs =
recordLockDurationMsOrDefault(groupConfigManager, groupId,
defaultRecordLockDurationMs);
+ int recordLockDurationMs =
configProvider.recordLockDurationMsOrDefault(groupId,
defaultRecordLockDurationMs);
return scheduleAcquisitionLockTimeout(memberId, firstOffset,
lastOffset, recordLockDurationMs);
}
@@ -3313,7 +3328,7 @@ public class SharePartition {
* config if available, otherwise the broker default.
*/
int maxDeliveryCount() {
- return deliveryCountLimitOrDefault(groupConfigManager, groupId,
defaultMaxDeliveryCount);
+ return configProvider.deliveryCountLimitOrDefault(groupId,
defaultMaxDeliveryCount);
}
/**
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 00659864384..00b3b11495a 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -21,7 +21,7 @@ import kafka.coordinator.transaction.{InitProducerIdResult,
TransactionCoordinat
import kafka.network.RequestChannel
import kafka.server.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA}
import kafka.server.handlers.DescribeTopicPartitionsRequestHandler
-import kafka.server.share.{ShareFetchUtils, SharePartitionManager}
+import kafka.server.share.SharePartitionManager
import kafka.utils.Logging
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.EndpointType
@@ -57,6 +57,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal,
SecurityProtocol}
import org.apache.kafka.common.security.token.delegation.{DelegationToken,
TokenInformation}
import org.apache.kafka.common.utils.{ProducerIdAndEpoch, Time}
import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
+import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider
import org.apache.kafka.coordinator.group.{Group, GroupConfig,
GroupConfigManager, GroupCoordinator}
import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.metadata.{ConfigRepository, MetadataCache}
@@ -120,6 +121,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val configManager = new ConfigAdminManager(brokerId, config,
configRepository)
val describeTopicPartitionsRequestHandler = new
DescribeTopicPartitionsRequestHandler(
metadataCache, authHelper, config)
+ val shareGroupConfigProvider = new
ShareGroupConfigProvider(groupConfigManager)
def close(): Unit = {
aclApis.close()
@@ -4061,7 +4063,7 @@ class KafkaApis(val requestChannel: RequestChannel,
0,
partitions,
nodeEndpoints.values.toList.asJava,
- ShareFetchUtils.recordLockDurationMsOrDefault(groupConfigManager,
groupId, config.shareGroupConfig.shareGroupRecordLockDurationMs)
+ shareGroupConfigProvider.recordLockDurationMsOrDefault(groupId,
config.shareGroupConfig.shareGroupRecordLockDurationMs)
)
}
@@ -4159,7 +4161,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// Prepare share fetch response
val response =
ShareFetchResponse.of(shareFetchResponse.error, throttleTimeMs,
responseData, nodeEndpoints.values.toList.asJava,
- ShareFetchUtils.recordLockDurationMsOrDefault(groupConfigManager,
groupId, config.shareGroupConfig.shareGroupRecordLockDurationMs))
+ shareGroupConfigProvider.recordLockDurationMsOrDefault(groupId,
config.shareGroupConfig.shareGroupRecordLockDurationMs))
// record the bytes out metrics only when the response is being sent.
response.data.responses.forEach { topicResponse =>
topicResponse.partitions.forEach { data =>
diff --git a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
index 0b6239ec68b..08ce763b8ad 100644
--- a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
+++ b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
@@ -34,8 +34,6 @@ import org.apache.kafka.common.record.internal.RecordBatch;
import org.apache.kafka.common.record.internal.Records;
import org.apache.kafka.common.record.internal.SimpleRecord;
import org.apache.kafka.common.requests.FetchRequest;
-import org.apache.kafka.coordinator.group.GroupConfig;
-import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
import org.apache.kafka.server.share.fetch.ShareFetch;
@@ -729,24 +727,6 @@ public class ShareFetchUtilsTest {
assertArrayEquals(input.toArray(), result.toArray());
}
- @Test
- void testDeliveryCountLimitOrDefaultWithGroupConfig() {
- GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
- GroupConfig groupConfig = mock(GroupConfig.class);
- when(groupConfig.shareDeliveryCountLimit()).thenReturn(8);
-
when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.of(groupConfig));
-
- assertEquals(8,
ShareFetchUtils.deliveryCountLimitOrDefault(groupConfigManager, "test-group",
5));
- }
-
- @Test
- void testDeliveryCountLimitOrDefaultWithoutGroupConfig() {
- GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
-
when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.empty());
-
- assertEquals(5,
ShareFetchUtils.deliveryCountLimitOrDefault(groupConfigManager, "test-group",
5));
- }
-
private static class RecordsArgumentsProvider implements ArgumentsProvider
{
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext
context) throws Exception {
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 88b173c5239..af998054020 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -7157,7 +7157,7 @@ public class SharePartitionTest {
AcquisitionLockTimerTask timerTask =
sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L);
- Mockito.verify(groupConfigManager,
Mockito.times(2)).groupConfig(GROUP_ID);
+ Mockito.verify(groupConfigManager,
Mockito.times(1)).groupConfig(GROUP_ID);
Mockito.verify(groupConfig).shareRecordLockDurationMs();
assertEquals(expectedDurationMs, timerTask.delayMs);
}
@@ -7179,13 +7179,13 @@ public class SharePartitionTest {
AcquisitionLockTimerTask timerTask1 =
sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L);
- Mockito.verify(groupConfigManager,
Mockito.times(2)).groupConfig(GROUP_ID);
+ Mockito.verify(groupConfigManager,
Mockito.times(1)).groupConfig(GROUP_ID);
Mockito.verify(groupConfig).shareRecordLockDurationMs();
assertEquals(expectedDurationMs1, timerTask1.delayMs);
AcquisitionLockTimerTask timerTask2 =
sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L);
- Mockito.verify(groupConfigManager,
Mockito.times(4)).groupConfig(GROUP_ID);
+ Mockito.verify(groupConfigManager,
Mockito.times(2)).groupConfig(GROUP_ID);
Mockito.verify(groupConfig,
Mockito.times(2)).shareRecordLockDurationMs();
assertEquals(expectedDurationMs2, timerTask2.delayMs);
}
@@ -10381,6 +10381,79 @@ public class SharePartitionTest {
Mockito.verify(persister, Mockito.times(1)).writeState(Mockito.any());
}
+ @Test
+ public void testRenewAcknowledgeDisabledWithCompleteBatchAck() {
+ Persister persister = Mockito.mock(Persister.class);
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+ .withMaxDeliveryCount(2)
+ .withPersister(persister)
+ .withGroupConfigManager(groupConfigManagerWithRenewDisabled())
+ .build();
+
+ List<AcquiredRecords> records = fetchAcquiredRecords(sharePartition,
memoryRecords(0, 1), 1);
+ assertEquals(1, records.size());
+ assertEquals(1, sharePartition.cachedState().size());
+
+ CompletableFuture<Void> future = sharePartition.acknowledge(MEMBER_ID,
+ List.of(new ShareAcknowledgementBatch(0, 0,
List.of(AcknowledgeType.RENEW.id))));
+
+ assertTrue(future.isCompletedExceptionally());
+ try {
+ future.get();
+ fail("No exception thrown");
+ } catch (Exception e) {
+ assertNotNull(e);
+ assertInstanceOf(InvalidRecordStateException.class, e.getCause());
+ assertTrue(e.getCause().getMessage().contains("Renewing
acquisition locks is not enabled for the group."));
+ }
+
+ // The batch should still be in ACQUIRED state since the renew was
rejected.
+ InFlightBatch batch = sharePartition.cachedState().get(0L);
+ assertEquals(RecordState.ACQUIRED, batch.batchState());
+ Mockito.verify(persister, Mockito.times(0)).writeState(Mockito.any());
+ }
+
+ @Test
+ public void testRenewAcknowledgeDisabledWithPerOffsetAck() {
+ Persister persister = Mockito.mock(Persister.class);
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+ .withMaxDeliveryCount(2)
+ .withPersister(persister)
+ .withGroupConfigManager(groupConfigManagerWithRenewDisabled())
+ .build();
+
+ List<AcquiredRecords> records = fetchAcquiredRecords(sharePartition,
memoryRecords(0, 2), 2);
+ assertEquals(1, records.size());
+ assertEquals(1, sharePartition.cachedState().size());
+
+ CompletableFuture<Void> future = sharePartition.acknowledge(MEMBER_ID,
+ List.of(new ShareAcknowledgementBatch(0, 1,
+ List.of(AcknowledgeType.RENEW.id,
AcknowledgeType.ACCEPT.id))));
+
+ assertTrue(future.isCompletedExceptionally());
+ try {
+ future.get();
+ fail("No exception thrown");
+ } catch (Exception e) {
+ assertNotNull(e);
+ assertInstanceOf(InvalidRecordStateException.class, e.getCause());
+ assertTrue(e.getCause().getMessage().contains("Renewing
acquisition locks is not enabled for the group."));
+ }
+
+ // The offsets should still be in ACQUIRED state since the renew was
rejected
+ // at the first offset. The batch has per-offset state at this point
because the
+ // ack had per-offset ack types.
+ InFlightBatch batch = sharePartition.cachedState().get(0L);
+ assertNotNull(batch.offsetState());
+ assertEquals(RecordState.ACQUIRED,
batch.offsetState().get(0L).state());
+ assertEquals(RecordState.ACQUIRED,
batch.offsetState().get(1L).state());
+ Mockito.verify(persister, Mockito.times(0)).writeState(Mockito.any());
+ }
+
@Test
public void testAcquireSingleBatchInRecordLimitMode() throws
InterruptedException {
Persister persister = Mockito.mock(Persister.class);
@@ -12354,6 +12427,15 @@ public class SharePartitionTest {
assertFalse(sharePartition.cachedState().isEmpty());
}
+ private static GroupConfigManager groupConfigManagerWithRenewDisabled() {
+ GroupConfigManager groupConfigManager =
Mockito.mock(GroupConfigManager.class);
+ GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
+
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
+
Mockito.when(groupConfig.shareRenewAcknowledgeEnable()).thenReturn(false);
+
Mockito.when(groupConfig.shareRecordLockDurationMs()).thenReturn(ACQUISITION_LOCK_TIMEOUT_MS);
+ return groupConfigManager;
+ }
+
private static class SharePartitionBuilder {
private int defaultAcquisitionLockTimeoutMs = 30000;
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 7ac893f378d..48268685da2 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -76,7 +76,7 @@ import org.apache.kafka.common.resource.{PatternType,
Resource, ResourcePattern,
import org.apache.kafka.common.security.auth.{KafkaPrincipal,
KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection,
ProducerIdAndEpoch, SecurityUtils, Utils}
-import
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG,
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG,
SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
SHARE_SESSION_TIMEOUT_MS_CONFIG, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
STREAMS_SESSION_TIMEOUT_MS_CONFIG}
+import
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG,
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG,
SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG,
STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
S [...]
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.group.{GroupConfig, GroupConfigManager,
GroupCoordinator, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
@@ -362,6 +362,7 @@ class KafkaApisTest extends Logging {
cgConfigs.put(SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT.toString)
cgConfigs.put(SHARE_AUTO_OFFSET_RESET_CONFIG,
GroupConfig.SHARE_AUTO_OFFSET_RESET_DEFAULT)
cgConfigs.put(SHARE_ISOLATION_LEVEL_CONFIG,
GroupConfig.SHARE_ISOLATION_LEVEL_DEFAULT)
+ cgConfigs.put(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG,
GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_DEFAULT)
cgConfigs.put(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
cgConfigs.put(STREAMS_SESSION_TIMEOUT_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
cgConfigs.put(STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT.toString)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index bff700cd73d..d7ef5c0a407 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -32,6 +32,7 @@ import java.util.Set;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
@@ -71,6 +72,10 @@ public final class GroupConfig extends AbstractConfig {
"If set to \"read_uncommitted\", the share group will return all
messages, even transactional messages which have been aborted. " +
"Non-transactional records will be returned unconditionally in either
mode.";
+ public static final String SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG =
"share.renew.acknowledge.enable";
+ public static final boolean SHARE_RENEW_ACKNOWLEDGE_ENABLE_DEFAULT = true;
+ public static final String SHARE_RENEW_ACKNOWLEDGE_ENABLE_DOC = "Whether
the renew acknowledge type is enabled for the share group.";
+
public static final String STREAMS_SESSION_TIMEOUT_MS_CONFIG =
"streams.session.timeout.ms";
public static final String STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG =
"streams.heartbeat.interval.ms";
@@ -103,6 +108,8 @@ public final class GroupConfig extends AbstractConfig {
public final String shareIsolationLevel;
+ public final boolean shareRenewAcknowledgeEnable;
+
private static final ConfigDef CONFIG = new ConfigDef()
.define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
INT,
@@ -152,6 +159,11 @@ public final class GroupConfig extends AbstractConfig {
in(IsolationLevel.READ_COMMITTED.toString(),
IsolationLevel.READ_UNCOMMITTED.toString()),
MEDIUM,
SHARE_ISOLATION_LEVEL_DOC)
+ .define(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG,
+ BOOLEAN,
+ SHARE_RENEW_ACKNOWLEDGE_ENABLE_DEFAULT,
+ MEDIUM,
+ SHARE_RENEW_ACKNOWLEDGE_ENABLE_DOC)
.define(STREAMS_SESSION_TIMEOUT_MS_CONFIG,
INT,
GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT,
@@ -191,6 +203,7 @@ public final class GroupConfig extends AbstractConfig {
this.streamsNumStandbyReplicas =
getInt(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
this.streamsInitialRebalanceDelayMs =
getInt(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG);
this.shareIsolationLevel = getString(SHARE_ISOLATION_LEVEL_CONFIG);
+ this.shareRenewAcknowledgeEnable =
getBoolean(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG);
}
public static ConfigDef configDef() {
@@ -434,6 +447,13 @@ public final class GroupConfig extends AbstractConfig {
throw new IllegalArgumentException("Unknown Share isolation level:
" + shareIsolationLevel);
}
}
+
+ /**
+ * The share group renew acknowledge enable.
+ */
+ public boolean shareRenewAcknowledgeEnable() {
+ return shareRenewAcknowledgeEnable;
+ }
public static void main(String[] args) {
System.out.println(CONFIG.toHtml(4, config -> "groupconfigs_" +
config));
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java
new file mode 100644
index 00000000000..7e4ec55e5ff
--- /dev/null
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.modern.share;
+
+import org.apache.kafka.coordinator.group.GroupConfig;
+import org.apache.kafka.coordinator.group.GroupConfigManager;
+
+/**
+ * A provider that retrieves share group dynamic configuration values,
+ * falling back to default values when group-specific configurations are not
present.
+ */
+public class ShareGroupConfigProvider {
+ private final GroupConfigManager manager;
+
+ public ShareGroupConfigProvider(GroupConfigManager manager) {
+ this.manager = manager;
+ }
+
+ /**
+ * The method is used to get the record lock duration for the group. If
the group config is present,
+ * then the record lock duration is returned. Otherwise, the default value
is returned.
+ *
+ * @param groupId The group id for which the record lock duration is to be
fetched.
+ * @param defaultValue The default value to be returned if the group
config is not present.
+ * @return The record lock duration for the group.
+ */
+ public int recordLockDurationMsOrDefault(String groupId, int defaultValue)
{
+ return manager.groupConfig(groupId).
+ map(GroupConfig::shareRecordLockDurationMs).
+ orElse(defaultValue);
+ }
+
+ /**
+ * The method is used to get the delivery count limit for the group. If
the group config is present,
+ * then the delivery count limit is returned. Otherwise, the default value
is returned.
+ *
+ * @param groupId The group id for which the delivery count limit is to be
fetched.
+ * @param defaultValue The default value to be returned if the group
config is not present.
+ * @return The delivery count limit for the group.
+ */
+ public int deliveryCountLimitOrDefault(String groupId, int defaultValue) {
+ return manager.groupConfig(groupId)
+ .map(GroupConfig::shareDeliveryCountLimit)
+ .orElse(defaultValue);
+ }
+
+ /**
+ * The method is used to check if renew acknowledge is enabled for the
group. If the group config
+ * is present, then the value from the group config is used. Otherwise,
the default value is used.
+ *
+ * @param groupId The group id for which the renew acknowledge enable is
to be checked.
+ * @return true if renew acknowledge is enabled for the group, false
otherwise.
+ */
+ public boolean isRenewAcknowledgeEnabled(String groupId) {
+ return manager.groupConfig(groupId)
+ .map(GroupConfig::shareRenewAcknowledgeEnable)
+ .orElse(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_DEFAULT);
+ }
+}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index 6f5e1cf1f13..cc46f04a9b3 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -64,6 +64,8 @@ public class GroupConfigTest {
assertPropertyInvalid(name, "hello", "1.0");
} else if (GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG.equals(name)) {
assertPropertyInvalid(name, "hello", "1.0");
+ } else if
(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG.equals(name)) {
+ assertPropertyInvalid(name, "not_a_boolean", "1");
} else if
(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG.equals(name)) {
assertPropertyInvalid(name, "not_a_number", "1.0");
} else if
(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG.equals(name)) {
@@ -228,6 +230,11 @@ public class GroupConfigTest {
// Check for invalid shareIsolationLevel.
props.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, "read_uncommit");
doTestInvalidProps(props, ConfigException.class);
+ props = createValidGroupConfig();
+
+ // Check for invalid shareRenewAcknowledgeEnable.
+ props.put(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, "1");
+ doTestInvalidProps(props, ConfigException.class);
}
private void doTestInvalidProps(Properties props, Class<? extends
Exception> exceptionClassName) {
@@ -253,6 +260,7 @@ public class GroupConfigTest {
defaultValue.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG,
"2000");
defaultValue.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");
defaultValue.put(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, "3000");
+ defaultValue.put(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG,
"true");
Properties props = new Properties();
props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "20");
@@ -270,6 +278,7 @@ public class GroupConfigTest {
assertEquals(2000,
config.getInt(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG));
assertEquals(1,
config.getInt(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG));
assertEquals(3000,
config.getInt(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG));
+ assertEquals(true,
config.getBoolean(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG));
}
@Test
@@ -294,6 +303,7 @@ public class GroupConfigTest {
props.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "6000");
props.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");
props.put(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG,
"3000");
+ props.put(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, "true");
return props;
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java
new file mode 100644
index 00000000000..d8cd49bae79
--- /dev/null
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.modern.share;
+
+import org.apache.kafka.coordinator.group.GroupConfig;
+import org.apache.kafka.coordinator.group.GroupConfigManager;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ShareGroupConfigProviderTest {
+ private ShareGroupConfigProvider provider;
+
+ @Test
+ void testRecordLockDurationMsOrDefaultWithGroupConfig() {
+ GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
+ GroupConfig groupConfig = mock(GroupConfig.class);
+ when(groupConfig.shareRecordLockDurationMs()).thenReturn(1000);
+
when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.of(groupConfig));
+ provider = new ShareGroupConfigProvider(groupConfigManager);
+
+ assertEquals(1000,
provider.recordLockDurationMsOrDefault("test-group", 100));
+ }
+
+ @Test
+ void testRecordLockDurationMsOrDefaultWithoutGroupConfig() {
+ GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
+
when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.empty());
+ provider = new ShareGroupConfigProvider(groupConfigManager);
+
+ assertEquals(100, provider.recordLockDurationMsOrDefault("test-group",
100));
+ }
+
+ @Test
+ void testDeliveryCountLimitOrDefaultWithGroupConfig() {
+ GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
+ GroupConfig groupConfig = mock(GroupConfig.class);
+ when(groupConfig.shareDeliveryCountLimit()).thenReturn(8);
+
when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.of(groupConfig));
+ provider = new ShareGroupConfigProvider(groupConfigManager);
+
+ assertEquals(8, provider.deliveryCountLimitOrDefault("test-group", 5));
+ }
+
+ @Test
+ void testDeliveryCountLimitOrDefaultWithoutGroupConfig() {
+ GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
+
when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.empty());
+ provider = new ShareGroupConfigProvider(groupConfigManager);
+
+ assertEquals(5, provider.deliveryCountLimitOrDefault("test-group", 5));
+ }
+
+ @Test
+ void testIsRenewAcknowledgeDisabledWithGroupConfig() {
+ GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
+ GroupConfig groupConfig = mock(GroupConfig.class);
+ when(groupConfig.shareRenewAcknowledgeEnable()).thenReturn(false);
+
when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.of(groupConfig));
+ provider = new ShareGroupConfigProvider(groupConfigManager);
+
+ assertFalse(provider.isRenewAcknowledgeEnabled("test-group"));
+ }
+
+ @Test
+ void testIsRenewAcknowledgeEnabledWithoutGroupConfig() {
+ GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
+
when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.empty());
+ provider = new ShareGroupConfigProvider(groupConfigManager);
+
+ assertTrue(provider.isRenewAcknowledgeEnabled("test-group"));
+ }
+}