This is an automated email from the ASF dual-hosted git repository.
mittal pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new 5cfb17d75d4 MINOR: Move ShareAcquireMode to consumer.internal package
(#20973) (#21015)
5cfb17d75d4 is described below
commit 5cfb17d75d4a10a0276381c38580df0a768a7600
Author: jimmy <[email protected]>
AuthorDate: Fri Nov 28 22:45:51 2025 +0800
MINOR: Move ShareAcquireMode to consumer.internal package (#20973) (#21015)
This PR moves `ShareAcquireMode` to the `consumer.internals` package and
addresses comments from
https://github.com/apache/kafka/pull/20246#pullrequestreview-3422168106
Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield
<[email protected]>, Chirag Wadhwa <[email protected]>,
Chia-Ping Tsai <[email protected]>
---
.../kafka/clients/consumer/ConsumerConfig.java | 1 +
.../consumer/{ => internals}/ShareAcquireMode.java | 9 +-
.../internals/ShareConsumeRequestManager.java | 1 -
.../consumer/internals/ShareFetchConfig.java | 1 -
.../kafka/common/requests/ShareFetchRequest.java | 2 +-
.../consumer/internals/ShareAcquireModeTest.java | 57 ++++++++
.../internals/ShareConsumeRequestManagerTest.java | 1 -
.../internals/ShareSessionHandlerTest.java | 1 -
.../java/kafka/server/share/SharePartition.java | 4 +-
.../kafka/server/share/DelayedShareFetchTest.java | 2 +-
.../kafka/server/share/ShareFetchUtilsTest.java | 2 +-
.../server/share/SharePartitionManagerTest.java | 2 +-
.../kafka/server/share/SharePartitionTest.java | 143 ++++++++++++++++++---
.../kafka/api/AuthorizerIntegrationTest.scala | 2 +-
.../unit/kafka/server/ReplicaManagerTest.scala | 2 +-
.../server/ShareFetchAcknowledgeRequestTest.scala | 2 +-
.../kafka/server/share/fetch/ShareFetch.java | 2 +-
.../kafka/server/share/fetch/ShareFetchTest.java | 2 +-
18 files changed, 200 insertions(+), 36 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 2f527bdf9a1..7ad52c21226 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.ShareAcknowledgementMode;
+import org.apache.kafka.clients.consumer.internals.ShareAcquireMode;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/ShareAcquireMode.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareAcquireMode.java
similarity index 93%
rename from
clients/src/main/java/org/apache/kafka/clients/consumer/ShareAcquireMode.java
rename to
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareAcquireMode.java
index ec19a43e730..ac8568d536f 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/ShareAcquireMode.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareAcquireMode.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.clients.consumer;
+package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
@@ -29,7 +29,7 @@ public enum ShareAcquireMode {
public final String name;
- public final byte id;
+ final byte id;
ShareAcquireMode(final String name, final byte id) {
this.name = name;
@@ -40,6 +40,9 @@ public enum ShareAcquireMode {
* Case-insensitive acquire mode lookup by string name.
*/
public static ShareAcquireMode of(final String name) {
+ if (name == null) {
+ throw new IllegalArgumentException("ShareAcquireMode is null");
+ }
try {
return ShareAcquireMode.valueOf(name.toUpperCase(Locale.ROOT));
} catch (IllegalArgumentException e) {
@@ -65,7 +68,7 @@ public enum ShareAcquireMode {
@Override
public String toString() {
- return "ShareAcquireMode(" + name + " (" + id + "))";
+ return name;
}
public static class Validator implements ConfigDef.Validator {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index fa8f1fd3eed..183f15833aa 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -18,7 +18,6 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
-import org.apache.kafka.clients.consumer.ShareAcquireMode;
import
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
import
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest;
import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementEvent;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchConfig.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchConfig.java
index 3c606e1f18d..5dd4c1db04c 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchConfig.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchConfig.java
@@ -17,7 +17,6 @@
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ShareAcquireMode;
import org.apache.kafka.common.IsolationLevel;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredIsolationLevel;
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
index 53f9373d35f..de977fe31b4 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
@@ -16,7 +16,7 @@
*/
package org.apache.kafka.common.requests;
-import org.apache.kafka.clients.consumer.ShareAcquireMode;
+import org.apache.kafka.clients.consumer.internals.ShareAcquireMode;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareAcquireModeTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareAcquireModeTest.java
new file mode 100644
index 00000000000..ccfa866c90d
--- /dev/null
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareAcquireModeTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.config.ConfigException;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class ShareAcquireModeTest {
+
+ @Test
+ public void testFromString() {
+ assertEquals(ShareAcquireMode.BATCH_OPTIMIZED,
ShareAcquireMode.of("batch_optimized"));
+ assertEquals(ShareAcquireMode.BATCH_OPTIMIZED,
ShareAcquireMode.of("BATCH_OPTIMIZED"));
+ assertEquals(ShareAcquireMode.RECORD_LIMIT,
ShareAcquireMode.of("record_limit"));
+ assertEquals(ShareAcquireMode.RECORD_LIMIT,
ShareAcquireMode.of("RECORD_LIMIT"));
+ assertThrows(IllegalArgumentException.class, () ->
ShareAcquireMode.of("invalid_mode"));
+ assertThrows(IllegalArgumentException.class, () ->
ShareAcquireMode.of(""));
+ assertThrows(IllegalArgumentException.class, () ->
ShareAcquireMode.of(null));
+ }
+
+ @Test
+ public void testValidator() {
+ ShareAcquireMode.Validator validator = new
ShareAcquireMode.Validator();
+ assertDoesNotThrow(() -> validator.ensureValid("test",
"batch_optimized"));
+ assertDoesNotThrow(() -> validator.ensureValid("test",
"BATCH_OPTIMIZED"));
+ assertDoesNotThrow(() -> validator.ensureValid("test",
"record_limit"));
+ assertDoesNotThrow(() -> validator.ensureValid("test",
"RECORD_LIMIT"));
+ assertThrows(ConfigException.class, () ->
validator.ensureValid("test", "invalid_mode"));
+ assertThrows(ConfigException.class, () ->
validator.ensureValid("test", ""));
+ assertThrows(ConfigException.class, () ->
validator.ensureValid("test", null));
+ }
+
+ @Test
+ public void testValidatorToString() {
+ ShareAcquireMode.Validator validator = new
ShareAcquireMode.Validator();
+ assertEquals("[batch_optimized, record_limit]", validator.toString());
+ }
+}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index e024818c427..b6040950ccd 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ShareAcquireMode;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementEventHandler;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
index 870f4ac7b57..72d5d7c4e21 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ShareAcquireMode;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index ba3a142cf0c..f5ad66d4761 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -20,7 +20,7 @@ import kafka.server.ReplicaManager;
import kafka.server.share.SharePartitionManager.SharePartitionListener;
import org.apache.kafka.clients.consumer.AcknowledgeType;
-import org.apache.kafka.clients.consumer.ShareAcquireMode;
+import org.apache.kafka.clients.consumer.internals.ShareAcquireMode;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
@@ -1836,13 +1836,13 @@ public class SharePartition {
sharePartitionMetrics);
int delayMs =
recordLockDurationMsOrDefault(groupConfigManager, groupId,
defaultRecordLockDurationMs);
long lastOffset = acquiredRecords.firstOffset() +
maxFetchRecords - 1;
- acquiredRecords.setLastOffset(lastOffset);
inFlightBatch.maybeInitializeOffsetStateUpdate(lastOffset,
delayMs);
updateFindNextFetchOffset(true);
cachedState.put(acquiredRecords.firstOffset(),
inFlightBatch);
sharePartitionMetrics.recordInFlightBatchMessageCount(
acquiredRecords.lastOffset() -
acquiredRecords.firstOffset() + 1);
+ acquiredRecords.setLastOffset(lastOffset);
return List.of(acquiredRecords);
}
}
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index a7b138a3708..63869fdf17f 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -21,7 +21,7 @@ import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
-import org.apache.kafka.clients.consumer.ShareAcquireMode;
+import org.apache.kafka.clients.consumer.internals.ShareAcquireMode;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
diff --git a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
index 37aa3c8c28b..a0257ac65dc 100644
--- a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
+++ b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
@@ -18,7 +18,7 @@ package kafka.server.share;
import kafka.server.ReplicaManager;
-import org.apache.kafka.clients.consumer.ShareAcquireMode;
+import org.apache.kafka.clients.consumer.internals.ShareAcquireMode;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
diff --git
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 5fdf84363a4..84e2c40a843 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -22,7 +22,7 @@ import kafka.server.ReplicaQuota;
import kafka.server.share.SharePartitionManager.SharePartitionListener;
import org.apache.kafka.clients.consumer.AcknowledgeType;
-import org.apache.kafka.clients.consumer.ShareAcquireMode;
+import org.apache.kafka.clients.consumer.internals.ShareAcquireMode;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 8e8240377e6..d757e1c2eb7 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -22,7 +22,7 @@ import kafka.server.share.SharePartition.SharePartitionState;
import kafka.server.share.SharePartitionManager.SharePartitionListener;
import org.apache.kafka.clients.consumer.AcknowledgeType;
-import org.apache.kafka.clients.consumer.ShareAcquireMode;
+import org.apache.kafka.clients.consumer.internals.ShareAcquireMode;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
@@ -10293,13 +10293,14 @@ public class SharePartitionTest {
}
@Test
- public void testAcquireSingleBatchInRecordLimitMode() {
+ public void testAcquireSingleBatchInRecordLimitMode() throws
InterruptedException {
Persister persister = Mockito.mock(Persister.class);
- SharePartition sharePartition =
Mockito.spy(SharePartitionBuilder.builder()
+ SharePartition sharePartition = SharePartitionBuilder.builder()
.withState(SharePartitionState.ACTIVE)
.withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
.withPersister(persister)
- .build());
+ .withSharePartitionMetrics(sharePartitionMetrics)
+ .build();
// Member-1 attempts to acquire records in strict mode with a maximum
fetch limit of 5 records.
MemoryRecords records = memoryRecords(10);
@@ -10359,16 +10360,26 @@ public class SharePartitionTest {
assertEquals("member-2",
sharePartition.cachedState().get(0L).offsetState().get(5L).memberId());
assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(0L).offsetState().get(9L).state());
assertEquals("member-2",
sharePartition.cachedState().get(0L).offsetState().get(5L).memberId());
+
+ TestUtils.waitForCondition(() ->
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1,
+ "In-flight batch count should be 1.");
+ TestUtils.waitForCondition(() ->
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() ==
10,
+ "In-flight message count should be 10.");
+ assertEquals(10,
sharePartitionMetrics.inFlightBatchMessageCount().sum());
+ assertEquals(1,
sharePartitionMetrics.inFlightBatchMessageCount().count());
+ assertEquals(10,
sharePartitionMetrics.inFlightBatchMessageCount().min());
+ assertEquals(10,
sharePartitionMetrics.inFlightBatchMessageCount().max());
}
@Test
public void testAcquireMultipleBatchesInRecordLimitMode() throws
InterruptedException {
Persister persister = Mockito.mock(Persister.class);
- SharePartition sharePartition =
Mockito.spy(SharePartitionBuilder.builder()
+ SharePartition sharePartition = SharePartitionBuilder.builder()
.withState(SharePartitionState.ACTIVE)
.withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
.withPersister(persister)
- .build());
+ .withSharePartitionMetrics(sharePartitionMetrics)
+ .build();
// Create 3 batches of records.
ByteBuffer buffer = ByteBuffer.allocate(4096);
@@ -10402,16 +10413,26 @@ public class SharePartitionTest {
assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(10L).offsetState().get(19L).state());
assertEquals(MEMBER_ID,
sharePartition.cachedState().get(10L).offsetState().get(19L).memberId());
assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(10L).offsetState().get(20L).state());
+
+ TestUtils.waitForCondition(() ->
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1,
+ "In-flight batch count should be 1.");
+ TestUtils.waitForCondition(() ->
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() ==
20,
+ "In-flight message count should be 20.");
+ assertEquals(20,
sharePartitionMetrics.inFlightBatchMessageCount().sum());
+ assertEquals(1,
sharePartitionMetrics.inFlightBatchMessageCount().count());
+ assertEquals(20,
sharePartitionMetrics.inFlightBatchMessageCount().min());
+ assertEquals(20,
sharePartitionMetrics.inFlightBatchMessageCount().max());
}
@Test
- public void testAcquireWhenInsufficientRecordsInRecordLimitMode() {
+ public void testAcquireWhenInsufficientRecordsInRecordLimitMode() throws
InterruptedException {
Persister persister = Mockito.mock(Persister.class);
- SharePartition sharePartition =
Mockito.spy(SharePartitionBuilder.builder()
+ SharePartition sharePartition = SharePartitionBuilder.builder()
.withState(SharePartitionState.ACTIVE)
.withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
.withPersister(persister)
- .build());
+ .withSharePartitionMetrics(sharePartitionMetrics)
+ .build();
// Create 3 batches of records.
ByteBuffer buffer = ByteBuffer.allocate(4096);
@@ -10442,12 +10463,22 @@ public class SharePartitionTest {
assertNull(sharePartition.cachedState().get(10L).offsetState());
assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(10L).batchState());
assertEquals(1, sharePartition.timer().size());
+
+ TestUtils.waitForCondition(() ->
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1,
+ "In-flight batch count should be 1.");
+ TestUtils.waitForCondition(() ->
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() ==
15,
+ "In-flight message count should be 15.");
+ assertEquals(15,
sharePartitionMetrics.inFlightBatchMessageCount().sum());
+ assertEquals(1,
sharePartitionMetrics.inFlightBatchMessageCount().count());
+ assertEquals(15,
sharePartitionMetrics.inFlightBatchMessageCount().min());
+ assertEquals(15,
sharePartitionMetrics.inFlightBatchMessageCount().max());
}
@Test
- public void
testAcquireAndAcknowledgeMultipleSubsetRecordInRecordLimitMode() {
+ public void
testAcquireAndAcknowledgeMultipleSubsetRecordInRecordLimitMode() throws
InterruptedException {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withState(SharePartitionState.ACTIVE)
+ .withSharePartitionMetrics(sharePartitionMetrics)
.build();
ByteBuffer buffer = ByteBuffer.allocate(4096);
@@ -10531,11 +10562,25 @@ public class SharePartitionTest {
expectedOffsetStateMap.put(20L, new
InFlightState(RecordState.ACQUIRED, (short) 1, "member-2"));
assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(5L).offsetState());
+
+ TestUtils.waitForCondition(() ->
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1,
+ "In-flight batch count should be 1.");
+ // End offset(20) - Start offset(10) + 1 = 11
+ TestUtils.waitForCondition(() ->
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() ==
11,
+ "In-flight message count should be 11.");
+ // 16 messages(5-20)
+ assertEquals(16,
sharePartitionMetrics.inFlightBatchMessageCount().sum());
+ assertEquals(1,
sharePartitionMetrics.inFlightBatchMessageCount().count());
+ assertEquals(16,
sharePartitionMetrics.inFlightBatchMessageCount().min());
+ assertEquals(16,
sharePartitionMetrics.inFlightBatchMessageCount().max());
}
@Test
- public void
testAcquireMultipleRecordsWithOverlapAndNewBatchInRecordLimitMode() {
- SharePartition sharePartition =
SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
+ public void
testAcquireMultipleRecordsWithOverlapAndNewBatchInRecordLimitMode() throws
InterruptedException {
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .withSharePartitionMetrics(sharePartitionMetrics)
+ .build();
MemoryRecords records = memoryRecords(5);
List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
@@ -10579,6 +10624,15 @@ public class SharePartitionTest {
expectedOffsetStateMap.put(3L, new InFlightState(RecordState.ACQUIRED,
(short) 1, MEMBER_ID));
expectedOffsetStateMap.put(4L, new InFlightState(RecordState.ACQUIRED,
(short) 1, MEMBER_ID));
assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(0L).offsetState());
+
+ TestUtils.waitForCondition(() ->
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 2,
+ "In-flight batch count should be 2.");
+ TestUtils.waitForCondition(() ->
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() ==
10,
+ "In-flight message count should be 10.");
+ assertEquals(10,
sharePartitionMetrics.inFlightBatchMessageCount().sum());
+ assertEquals(2,
sharePartitionMetrics.inFlightBatchMessageCount().count());
+ assertEquals(5,
sharePartitionMetrics.inFlightBatchMessageCount().min());
+ assertEquals(5,
sharePartitionMetrics.inFlightBatchMessageCount().max());
}
@Test
@@ -10659,6 +10713,7 @@ public class SharePartitionTest {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
.withState(SharePartitionState.ACTIVE)
+ .withSharePartitionMetrics(sharePartitionMetrics)
.build();
fetchAcquiredRecords(sharePartition, memoryRecords(10, 5), 5);
@@ -10671,6 +10726,7 @@ public class SharePartitionTest {
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> assertionFailedMessage(sharePartition, Map.of(10L,
List.of())));
+ assertEquals(5,
sharePartitionMetrics.acquisitionLockTimeoutPerSec().count());
List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
MEMBER_ID,
ShareAcquireMode.RECORD_LIMIT,
@@ -10694,16 +10750,26 @@ public class SharePartitionTest {
expectedOffsetStateMap.put(13L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
expectedOffsetStateMap.put(14L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(10L).offsetState());
+
+ TestUtils.waitForCondition(() ->
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1,
+ "In-flight batch count should be 1.");
+ TestUtils.waitForCondition(() ->
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() ==
5,
+ "In-flight message count should be 5.");
+ assertEquals(5,
sharePartitionMetrics.inFlightBatchMessageCount().sum());
+ assertEquals(1,
sharePartitionMetrics.inFlightBatchMessageCount().count());
+ assertEquals(5,
sharePartitionMetrics.inFlightBatchMessageCount().min());
+ assertEquals(5,
sharePartitionMetrics.inFlightBatchMessageCount().max());
}
@Test
public void
testAcquisitionLockTimeoutMultipleRecordBatchInRecordLimitMode() throws
InterruptedException {
Persister persister = Mockito.mock(Persister.class);
- SharePartition sharePartition =
Mockito.spy(SharePartitionBuilder.builder()
+ SharePartition sharePartition = SharePartitionBuilder.builder()
.withState(SharePartitionState.ACTIVE)
.withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
.withPersister(persister)
- .build());
+ .withSharePartitionMetrics(sharePartitionMetrics)
+ .build();
// Create 3 batches of records.
ByteBuffer buffer = ByteBuffer.allocate(4096);
@@ -10749,6 +10815,12 @@ public class SharePartitionTest {
sharePartition.timer().size() == 0,
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> assertionFailedMessage(sharePartition, Map.of(0L,
List.of(0L, 1L))));
+
+ assertEquals(2,
sharePartitionMetrics.acquisitionLockTimeoutPerSec().count());
+ assertEquals(5,
sharePartitionMetrics.inFlightBatchMessageCount().sum());
+ assertEquals(1,
sharePartitionMetrics.inFlightBatchMessageCount().count());
+ assertEquals(5,
sharePartitionMetrics.inFlightBatchMessageCount().min());
+ assertEquals(5,
sharePartitionMetrics.inFlightBatchMessageCount().max());
// Acquisition lock timeout task has run already and next fetch offset
is moved to 0.
assertEquals(0, sharePartition.nextFetchOffset());
assertEquals(1,
sharePartition.cachedState().get(0L).offsetState().get(0L).deliveryCount());
@@ -10783,11 +10855,20 @@ public class SharePartitionTest {
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> assertionFailedMessage(sharePartition, Map.of(0L,
List.of(0L, 1L))));
+ TestUtils.waitForCondition(() ->
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1,
+ "In-flight batch count should be 1.");
+ TestUtils.waitForCondition(() ->
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() ==
5,
+ "In-flight message count should be 5.");
+ assertEquals(3,
sharePartitionMetrics.acquisitionLockTimeoutPerSec().count());
+ assertEquals(5,
sharePartitionMetrics.inFlightBatchMessageCount().sum());
+ assertEquals(1,
sharePartitionMetrics.inFlightBatchMessageCount().count());
+ assertEquals(5,
sharePartitionMetrics.inFlightBatchMessageCount().min());
+ assertEquals(5,
sharePartitionMetrics.inFlightBatchMessageCount().max());
assertEquals(0, sharePartition.nextFetchOffset());
}
@Test
- public void
testAcquireCachedStateInitialGapOverlapsWithActualPartitionGapInRecordLimitMode()
{
+ public void
testAcquireCachedStateInitialGapOverlapsWithActualPartitionGapInRecordLimitMode()
throws InterruptedException {
Persister persister = Mockito.mock(Persister.class);
ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
@@ -10799,7 +10880,10 @@ public class SharePartitionTest {
))))));
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
- SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withPersister(persister)
+ .withSharePartitionMetrics(sharePartitionMetrics)
+ .build();
sharePartition.maybeInitialize();
// Creating 2 batches starting from 16, such that there is a natural
gap from 11 to 15
@@ -10838,10 +10922,19 @@ public class SharePartitionTest {
GapWindow persisterReadResultGapWindow =
sharePartition.persisterReadResultGapWindow();
assertNull(persisterReadResultGapWindow);
+
+ TestUtils.waitForCondition(() ->
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 5,
+ "In-flight batch count should be 5.");
+ TestUtils.waitForCondition(() ->
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() ==
45,
+ "In-flight message count should be 45.");
+ assertEquals(45,
sharePartitionMetrics.inFlightBatchMessageCount().sum());
+ assertEquals(5,
sharePartitionMetrics.inFlightBatchMessageCount().count());
+ assertEquals(5,
sharePartitionMetrics.inFlightBatchMessageCount().min());
+ assertEquals(10,
sharePartitionMetrics.inFlightBatchMessageCount().max());
}
@Test
- public void
testAcquireCachedStateGapInBetweenOverlapsWithActualPartitionGapInRecordLimitMode()
{
+ public void
testAcquireCachedStateGapInBetweenOverlapsWithActualPartitionGapInRecordLimitMode()
throws InterruptedException {
Persister persister = Mockito.mock(Persister.class);
ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
@@ -10853,7 +10946,10 @@ public class SharePartitionTest {
))))));
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
- SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withPersister(persister)
+ .withSharePartitionMetrics(sharePartitionMetrics)
+ .build();
sharePartition.maybeInitialize();
// Creating 3 batches starting from 11, such that there is a natural
gap from 36 to 40
@@ -10893,6 +10989,17 @@ public class SharePartitionTest {
// Gap still exists from 36 to 40
assertEquals(36L, persisterReadResultGapWindow.gapStartOffset());
assertEquals(50L, persisterReadResultGapWindow.endOffset());
+
+ TestUtils.waitForCondition(() ->
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 4,
+ "In-flight batch count should be 4.");
+ // End offset(50) - Start offset(11) + 1 = 40
+ TestUtils.waitForCondition(() ->
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() ==
40,
+ "In-flight message count should be 40.");
+ // 35 messages: 10 (11-20) + 10 (21-30) + 5 (31-35) + 10 (41-50)
+ assertEquals(35,
sharePartitionMetrics.inFlightBatchMessageCount().sum());
+ assertEquals(4,
sharePartitionMetrics.inFlightBatchMessageCount().count());
+ assertEquals(5,
sharePartitionMetrics.inFlightBatchMessageCount().min());
+ assertEquals(10,
sharePartitionMetrics.inFlightBatchMessageCount().max());
}
@Test
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 2b96099afcb..347ba953ca2 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -22,7 +22,7 @@ import kafka.utils.{TestInfoUtils, TestUtils}
import kafka.utils.TestUtils.waitUntilTrue
import org.apache.kafka.clients.admin.{Admin, AlterConfigOp,
ListGroupsOptions, NewTopic}
import org.apache.kafka.clients.consumer._
-import org.apache.kafka.clients.consumer.internals.{StreamsRebalanceData, StreamsRebalanceListener}
+import org.apache.kafka.clients.consumer.internals.{ShareAcquireMode, StreamsRebalanceData, StreamsRebalanceListener}
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.acl.AclOperation._
import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index d37991a98c6..753b831b594 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -30,7 +30,7 @@ import kafka.server.share.{DelayedShareFetch, SharePartition}
import kafka.utils.TestUtils.waitUntilTrue
import kafka.utils.TestUtils
import org.apache.kafka.clients.FetchSessionHandler
-import org.apache.kafka.clients.consumer.ShareAcquireMode
+import org.apache.kafka.clients.consumer.internals.ShareAcquireMode
import org.apache.kafka.common.{DirectoryId, IsolationLevel, Node,
TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig
diff --git a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
index a5520ab764c..597d1dc7036 100644
--- a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
@@ -18,7 +18,7 @@ package kafka.server
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.DescribeShareGroupsOptions
-import org.apache.kafka.clients.consumer.ShareAcquireMode
+import org.apache.kafka.clients.consumer.internals.ShareAcquireMode
import org.apache.kafka.common.test.api.{ClusterConfigProperty,
ClusterFeature, ClusterTest, ClusterTestDefaults, ClusterTests, Type}
import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords
import org.apache.kafka.common.message.{FindCoordinatorRequestData,
ShareAcknowledgeRequestData, ShareAcknowledgeResponseData,
ShareFetchRequestData, ShareFetchResponseData, ShareGroupHeartbeatRequestData}
diff --git a/server/src/main/java/org/apache/kafka/server/share/fetch/ShareFetch.java b/server/src/main/java/org/apache/kafka/server/share/fetch/ShareFetch.java
index 9b16a94f395..518266d9c0d 100644
--- a/server/src/main/java/org/apache/kafka/server/share/fetch/ShareFetch.java
+++ b/server/src/main/java/org/apache/kafka/server/share/fetch/ShareFetch.java
@@ -17,7 +17,7 @@
package org.apache.kafka.server.share.fetch;
-import org.apache.kafka.clients.consumer.ShareAcquireMode;
+import org.apache.kafka.clients.consumer.internals.ShareAcquireMode;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData;
import org.apache.kafka.common.protocol.Errors;
diff --git a/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java b/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java
index 013ee0c2c27..053d2f5d953 100644
--- a/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java
+++ b/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java
@@ -16,7 +16,7 @@
*/
package org.apache.kafka.server.share.fetch;
-import org.apache.kafka.clients.consumer.ShareAcquireMode;
+import org.apache.kafka.clients.consumer.internals.ShareAcquireMode;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;