This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.2 by this push:
     new 5cfb17d75d4 MINOR: Move ShareAcquireMode to consumer.internal package 
(#20973) (#21015)
5cfb17d75d4 is described below

commit 5cfb17d75d4a10a0276381c38580df0a768a7600
Author: jimmy <[email protected]>
AuthorDate: Fri Nov 28 22:45:51 2025 +0800

    MINOR: Move ShareAcquireMode to consumer.internal package (#20973) (#21015)
    
    This PR moves `ShareAcquireMode` to the `consumer.internal` package and
    addresses comments from
    https://github.com/apache/kafka/pull/20246#pullrequestreview-3422168106
    
    Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield
    <[email protected]>, Chirag Wadhwa <[email protected]>,
    Chia-Ping Tsai <[email protected]>
---
 .../kafka/clients/consumer/ConsumerConfig.java     |   1 +
 .../consumer/{ => internals}/ShareAcquireMode.java |   9 +-
 .../internals/ShareConsumeRequestManager.java      |   1 -
 .../consumer/internals/ShareFetchConfig.java       |   1 -
 .../kafka/common/requests/ShareFetchRequest.java   |   2 +-
 .../consumer/internals/ShareAcquireModeTest.java   |  57 ++++++++
 .../internals/ShareConsumeRequestManagerTest.java  |   1 -
 .../internals/ShareSessionHandlerTest.java         |   1 -
 .../java/kafka/server/share/SharePartition.java    |   4 +-
 .../kafka/server/share/DelayedShareFetchTest.java  |   2 +-
 .../kafka/server/share/ShareFetchUtilsTest.java    |   2 +-
 .../server/share/SharePartitionManagerTest.java    |   2 +-
 .../kafka/server/share/SharePartitionTest.java     | 143 ++++++++++++++++++---
 .../kafka/api/AuthorizerIntegrationTest.scala      |   2 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     |   2 +-
 .../server/ShareFetchAcknowledgeRequestTest.scala  |   2 +-
 .../kafka/server/share/fetch/ShareFetch.java       |   2 +-
 .../kafka/server/share/fetch/ShareFetchTest.java   |   2 +-
 18 files changed, 200 insertions(+), 36 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 2f527bdf9a1..7ad52c21226 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.MetadataRecoveryStrategy;
 import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
 import org.apache.kafka.clients.consumer.internals.ShareAcknowledgementMode;
+import org.apache.kafka.clients.consumer.internals.ShareAcquireMode;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ShareAcquireMode.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareAcquireMode.java
similarity index 93%
rename from 
clients/src/main/java/org/apache/kafka/clients/consumer/ShareAcquireMode.java
rename to 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareAcquireMode.java
index ec19a43e730..ac8568d536f 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ShareAcquireMode.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareAcquireMode.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.clients.consumer;
+package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
@@ -29,7 +29,7 @@ public enum ShareAcquireMode {
 
     public final String name;
 
-    public final byte id;
+    final byte id;
 
     ShareAcquireMode(final String name, final byte id) {
         this.name = name;
@@ -40,6 +40,9 @@ public enum ShareAcquireMode {
      * Case-insensitive acquire mode lookup by string name.
      */
     public static ShareAcquireMode of(final String name) {
+        if (name == null) {
+            throw new IllegalArgumentException("ShareAcquireMode is null");
+        }
         try {
             return ShareAcquireMode.valueOf(name.toUpperCase(Locale.ROOT));
         } catch (IllegalArgumentException e) {
@@ -65,7 +68,7 @@ public enum ShareAcquireMode {
 
     @Override
     public String toString() {
-        return "ShareAcquireMode(" + name + " (" + id + "))";
+        return name;
     }
 
     public static class Validator implements ConfigDef.Validator {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index fa8f1fd3eed..183f15833aa 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -18,7 +18,6 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.Metadata;
-import org.apache.kafka.clients.consumer.ShareAcquireMode;
 import 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
 import 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementEvent;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchConfig.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchConfig.java
index 3c606e1f18d..5dd4c1db04c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchConfig.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ShareAcquireMode;
 import org.apache.kafka.common.IsolationLevel;
 
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredIsolationLevel;
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
index 53f9373d35f..de977fe31b4 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.clients.consumer.ShareAcquireMode;
+import org.apache.kafka.clients.consumer.internals.ShareAcquireMode;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareAcquireModeTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareAcquireModeTest.java
new file mode 100644
index 00000000000..ccfa866c90d
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareAcquireModeTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.config.ConfigException;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class ShareAcquireModeTest {
+
+    @Test
+    public void testFromString() {
+        assertEquals(ShareAcquireMode.BATCH_OPTIMIZED, 
ShareAcquireMode.of("batch_optimized"));
+        assertEquals(ShareAcquireMode.BATCH_OPTIMIZED, 
ShareAcquireMode.of("BATCH_OPTIMIZED"));
+        assertEquals(ShareAcquireMode.RECORD_LIMIT, 
ShareAcquireMode.of("record_limit"));
+        assertEquals(ShareAcquireMode.RECORD_LIMIT, 
ShareAcquireMode.of("RECORD_LIMIT"));
+        assertThrows(IllegalArgumentException.class, () -> 
ShareAcquireMode.of("invalid_mode"));
+        assertThrows(IllegalArgumentException.class, () -> 
ShareAcquireMode.of(""));
+        assertThrows(IllegalArgumentException.class, () -> 
ShareAcquireMode.of(null));
+    }
+
+    @Test
+    public void testValidator() {
+        ShareAcquireMode.Validator validator = new 
ShareAcquireMode.Validator();
+        assertDoesNotThrow(() -> validator.ensureValid("test", 
"batch_optimized"));
+        assertDoesNotThrow(() -> validator.ensureValid("test", 
"BATCH_OPTIMIZED"));
+        assertDoesNotThrow(() -> validator.ensureValid("test", 
"record_limit"));
+        assertDoesNotThrow(() -> validator.ensureValid("test", 
"RECORD_LIMIT"));
+        assertThrows(ConfigException.class, () -> 
validator.ensureValid("test", "invalid_mode"));
+        assertThrows(ConfigException.class, () -> 
validator.ensureValid("test", ""));
+        assertThrows(ConfigException.class, () -> 
validator.ensureValid("test", null));
+    }
+
+    @Test
+    public void testValidatorToString() {
+        ShareAcquireMode.Validator validator = new 
ShareAcquireMode.Validator();
+        assertEquals("[batch_optimized, record_limit]", validator.toString());
+    }
+}
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index e024818c427..b6040950ccd 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.AcknowledgeType;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ShareAcquireMode;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementEventHandler;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
index 870f4ac7b57..72d5d7c4e21 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.consumer.AcknowledgeType;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ShareAcquireMode;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index ba3a142cf0c..f5ad66d4761 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -20,7 +20,7 @@ import kafka.server.ReplicaManager;
 import kafka.server.share.SharePartitionManager.SharePartitionListener;
 
 import org.apache.kafka.clients.consumer.AcknowledgeType;
-import org.apache.kafka.clients.consumer.ShareAcquireMode;
+import org.apache.kafka.clients.consumer.internals.ShareAcquireMode;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.Uuid;
@@ -1836,13 +1836,13 @@ public class SharePartition {
                         sharePartitionMetrics);
                     int delayMs = 
recordLockDurationMsOrDefault(groupConfigManager, groupId, 
defaultRecordLockDurationMs);
                     long lastOffset = acquiredRecords.firstOffset() + 
maxFetchRecords - 1;
-                    acquiredRecords.setLastOffset(lastOffset);
                     inFlightBatch.maybeInitializeOffsetStateUpdate(lastOffset, 
delayMs);
                     updateFindNextFetchOffset(true);
 
                     cachedState.put(acquiredRecords.firstOffset(), 
inFlightBatch);
                     sharePartitionMetrics.recordInFlightBatchMessageCount(
                         acquiredRecords.lastOffset() - 
acquiredRecords.firstOffset() + 1);
+                    acquiredRecords.setLastOffset(lastOffset);
                     return List.of(acquiredRecords);
                 }
             }
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java 
b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index a7b138a3708..63869fdf17f 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -21,7 +21,7 @@ import kafka.server.QuotaFactory;
 import kafka.server.ReplicaManager;
 import kafka.server.ReplicaQuota;
 
-import org.apache.kafka.clients.consumer.ShareAcquireMode;
+import org.apache.kafka.clients.consumer.internals.ShareAcquireMode;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
diff --git a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java 
b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
index 37aa3c8c28b..a0257ac65dc 100644
--- a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
+++ b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
@@ -18,7 +18,7 @@ package kafka.server.share;
 
 import kafka.server.ReplicaManager;
 
-import org.apache.kafka.clients.consumer.ShareAcquireMode;
+import org.apache.kafka.clients.consumer.internals.ShareAcquireMode;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
diff --git 
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 5fdf84363a4..84e2c40a843 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -22,7 +22,7 @@ import kafka.server.ReplicaQuota;
 import kafka.server.share.SharePartitionManager.SharePartitionListener;
 
 import org.apache.kafka.clients.consumer.AcknowledgeType;
-import org.apache.kafka.clients.consumer.ShareAcquireMode;
+import org.apache.kafka.clients.consumer.internals.ShareAcquireMode;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 8e8240377e6..d757e1c2eb7 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -22,7 +22,7 @@ import kafka.server.share.SharePartition.SharePartitionState;
 import kafka.server.share.SharePartitionManager.SharePartitionListener;
 
 import org.apache.kafka.clients.consumer.AcknowledgeType;
-import org.apache.kafka.clients.consumer.ShareAcquireMode;
+import org.apache.kafka.clients.consumer.internals.ShareAcquireMode;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
@@ -10293,13 +10293,14 @@ public class SharePartitionTest {
     }
 
     @Test
-    public void testAcquireSingleBatchInRecordLimitMode() {
+    public void testAcquireSingleBatchInRecordLimitMode() throws 
InterruptedException {
         Persister persister = Mockito.mock(Persister.class);
-        SharePartition sharePartition = 
Mockito.spy(SharePartitionBuilder.builder()
+        SharePartition sharePartition = SharePartitionBuilder.builder()
             .withState(SharePartitionState.ACTIVE)
             .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
             .withPersister(persister)
-            .build());
+            .withSharePartitionMetrics(sharePartitionMetrics)
+            .build();
 
         // Member-1 attempts to acquire records in strict mode with a maximum 
fetch limit of 5 records.
         MemoryRecords records = memoryRecords(10);
@@ -10359,16 +10360,26 @@ public class SharePartitionTest {
         assertEquals("member-2", 
sharePartition.cachedState().get(0L).offsetState().get(5L).memberId());
         assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(0L).offsetState().get(9L).state());
         assertEquals("member-2", 
sharePartition.cachedState().get(0L).offsetState().get(5L).memberId());
+
+        TestUtils.waitForCondition(() -> 
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1,
+            "In-flight batch count should be 1.");
+        TestUtils.waitForCondition(() -> 
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 
10,
+            "In-flight message count should be 10.");
+        assertEquals(10, 
sharePartitionMetrics.inFlightBatchMessageCount().sum());
+        assertEquals(1, 
sharePartitionMetrics.inFlightBatchMessageCount().count());
+        assertEquals(10, 
sharePartitionMetrics.inFlightBatchMessageCount().min());
+        assertEquals(10, 
sharePartitionMetrics.inFlightBatchMessageCount().max());
     }
 
     @Test
     public void testAcquireMultipleBatchesInRecordLimitMode() throws 
InterruptedException {
         Persister persister = Mockito.mock(Persister.class);
-        SharePartition sharePartition = 
Mockito.spy(SharePartitionBuilder.builder()
+        SharePartition sharePartition = SharePartitionBuilder.builder()
             .withState(SharePartitionState.ACTIVE)
             .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
             .withPersister(persister)
-            .build());
+            .withSharePartitionMetrics(sharePartitionMetrics)
+            .build();
 
         // Create 3 batches of records.
         ByteBuffer buffer = ByteBuffer.allocate(4096);
@@ -10402,16 +10413,26 @@ public class SharePartitionTest {
         assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(10L).offsetState().get(19L).state());
         assertEquals(MEMBER_ID, 
sharePartition.cachedState().get(10L).offsetState().get(19L).memberId());
         assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(10L).offsetState().get(20L).state());
+
+        TestUtils.waitForCondition(() -> 
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1,
+            "In-flight batch count should be 1.");
+        TestUtils.waitForCondition(() -> 
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 
20,
+            "In-flight message count should be 20.");
+        assertEquals(20, 
sharePartitionMetrics.inFlightBatchMessageCount().sum());
+        assertEquals(1, 
sharePartitionMetrics.inFlightBatchMessageCount().count());
+        assertEquals(20, 
sharePartitionMetrics.inFlightBatchMessageCount().min());
+        assertEquals(20, 
sharePartitionMetrics.inFlightBatchMessageCount().max());
     }
 
     @Test
-    public void testAcquireWhenInsufficientRecordsInRecordLimitMode() {
+    public void testAcquireWhenInsufficientRecordsInRecordLimitMode() throws 
InterruptedException {
         Persister persister = Mockito.mock(Persister.class);
-        SharePartition sharePartition = 
Mockito.spy(SharePartitionBuilder.builder()
+        SharePartition sharePartition = SharePartitionBuilder.builder()
             .withState(SharePartitionState.ACTIVE)
             .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
             .withPersister(persister)
-            .build());
+            .withSharePartitionMetrics(sharePartitionMetrics)
+            .build();
 
         // Create 3 batches of records.
         ByteBuffer buffer = ByteBuffer.allocate(4096);
@@ -10442,12 +10463,22 @@ public class SharePartitionTest {
         assertNull(sharePartition.cachedState().get(10L).offsetState());
         assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(10L).batchState());
         assertEquals(1, sharePartition.timer().size());
+
+        TestUtils.waitForCondition(() -> 
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1,
+            "In-flight batch count should be 1.");
+        TestUtils.waitForCondition(() -> 
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 
15,
+            "In-flight message count should be 15.");
+        assertEquals(15, 
sharePartitionMetrics.inFlightBatchMessageCount().sum());
+        assertEquals(1, 
sharePartitionMetrics.inFlightBatchMessageCount().count());
+        assertEquals(15, 
sharePartitionMetrics.inFlightBatchMessageCount().min());
+        assertEquals(15, 
sharePartitionMetrics.inFlightBatchMessageCount().max());
     }
 
     @Test
-    public void 
testAcquireAndAcknowledgeMultipleSubsetRecordInRecordLimitMode() {
+    public void 
testAcquireAndAcknowledgeMultipleSubsetRecordInRecordLimitMode() throws 
InterruptedException {
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withState(SharePartitionState.ACTIVE)
+            .withSharePartitionMetrics(sharePartitionMetrics)
             .build();
 
         ByteBuffer buffer = ByteBuffer.allocate(4096);
@@ -10531,11 +10562,25 @@ public class SharePartitionTest {
         expectedOffsetStateMap.put(20L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, "member-2"));
 
         assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(5L).offsetState());
+
+        TestUtils.waitForCondition(() -> 
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1,
+            "In-flight batch count should be 1.");
+        // End offset(20) - Start offset(10) + 1 = 11
+        TestUtils.waitForCondition(() -> 
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 
11,
+            "In-flight message count should be 11.");
+        // 16 messages(5-20)
+        assertEquals(16, 
sharePartitionMetrics.inFlightBatchMessageCount().sum());
+        assertEquals(1, 
sharePartitionMetrics.inFlightBatchMessageCount().count());
+        assertEquals(16, 
sharePartitionMetrics.inFlightBatchMessageCount().min());
+        assertEquals(16, 
sharePartitionMetrics.inFlightBatchMessageCount().max());
     }
 
     @Test
-    public void 
testAcquireMultipleRecordsWithOverlapAndNewBatchInRecordLimitMode() {
-        SharePartition sharePartition = 
SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
+    public void 
testAcquireMultipleRecordsWithOverlapAndNewBatchInRecordLimitMode() throws 
InterruptedException {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withSharePartitionMetrics(sharePartitionMetrics)
+            .build();
         MemoryRecords records = memoryRecords(5);
 
         List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
@@ -10579,6 +10624,15 @@ public class SharePartitionTest {
         expectedOffsetStateMap.put(3L, new InFlightState(RecordState.ACQUIRED, 
(short) 1, MEMBER_ID));
         expectedOffsetStateMap.put(4L, new InFlightState(RecordState.ACQUIRED, 
(short) 1, MEMBER_ID));
         assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(0L).offsetState());
+
+        TestUtils.waitForCondition(() -> 
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 2,
+            "In-flight batch count should be 2.");
+        TestUtils.waitForCondition(() -> 
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 
10,
+            "In-flight message count should be 10.");
+        assertEquals(10, 
sharePartitionMetrics.inFlightBatchMessageCount().sum());
+        assertEquals(2, 
sharePartitionMetrics.inFlightBatchMessageCount().count());
+        assertEquals(5, 
sharePartitionMetrics.inFlightBatchMessageCount().min());
+        assertEquals(5, 
sharePartitionMetrics.inFlightBatchMessageCount().max());
     }
 
     @Test
@@ -10659,6 +10713,7 @@ public class SharePartitionTest {
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
             .withState(SharePartitionState.ACTIVE)
+            .withSharePartitionMetrics(sharePartitionMetrics)
             .build();
         fetchAcquiredRecords(sharePartition, memoryRecords(10, 5), 5);
 
@@ -10671,6 +10726,7 @@ public class SharePartitionTest {
             DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
             () -> assertionFailedMessage(sharePartition, Map.of(10L, 
List.of())));
 
+        assertEquals(5, 
sharePartitionMetrics.acquisitionLockTimeoutPerSec().count());
         List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
             MEMBER_ID,
             ShareAcquireMode.RECORD_LIMIT,
@@ -10694,16 +10750,26 @@ public class SharePartitionTest {
         expectedOffsetStateMap.put(13L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         expectedOffsetStateMap.put(14L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
         assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(10L).offsetState());
+
+        TestUtils.waitForCondition(() -> 
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1,
+            "In-flight batch count should be 1.");
+        TestUtils.waitForCondition(() -> 
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 
5,
+            "In-flight message count should be 5.");
+        assertEquals(5, 
sharePartitionMetrics.inFlightBatchMessageCount().sum());
+        assertEquals(1, 
sharePartitionMetrics.inFlightBatchMessageCount().count());
+        assertEquals(5, 
sharePartitionMetrics.inFlightBatchMessageCount().min());
+        assertEquals(5, 
sharePartitionMetrics.inFlightBatchMessageCount().max());
     }
 
     @Test
     public void 
testAcquisitionLockTimeoutMultipleRecordBatchInRecordLimitMode() throws 
InterruptedException {
         Persister persister = Mockito.mock(Persister.class);
-        SharePartition sharePartition = 
Mockito.spy(SharePartitionBuilder.builder()
+        SharePartition sharePartition = SharePartitionBuilder.builder()
             .withState(SharePartitionState.ACTIVE)
             .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
             .withPersister(persister)
-            .build());
+            .withSharePartitionMetrics(sharePartitionMetrics)
+            .build();
 
         // Create 3 batches of records.
         ByteBuffer buffer = ByteBuffer.allocate(4096);
@@ -10749,6 +10815,12 @@ public class SharePartitionTest {
                 sharePartition.timer().size() == 0,
             DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
             () -> assertionFailedMessage(sharePartition, Map.of(0L, 
List.of(0L, 1L))));
+
+        assertEquals(2, 
sharePartitionMetrics.acquisitionLockTimeoutPerSec().count());
+        assertEquals(5, 
sharePartitionMetrics.inFlightBatchMessageCount().sum());
+        assertEquals(1, 
sharePartitionMetrics.inFlightBatchMessageCount().count());
+        assertEquals(5, 
sharePartitionMetrics.inFlightBatchMessageCount().min());
+        assertEquals(5, 
sharePartitionMetrics.inFlightBatchMessageCount().max());
         // Acquisition lock timeout task has run already and next fetch offset 
is moved to 0.
         assertEquals(0, sharePartition.nextFetchOffset());
         assertEquals(1, 
sharePartition.cachedState().get(0L).offsetState().get(0L).deliveryCount());
@@ -10783,11 +10855,20 @@ public class SharePartitionTest {
             DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
             () -> assertionFailedMessage(sharePartition, Map.of(0L, 
List.of(0L, 1L))));
 
+        TestUtils.waitForCondition(() -> 
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 1,
+            "In-flight batch count should be 1.");
+        TestUtils.waitForCondition(() -> 
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 
5,
+            "In-flight message count should be 5.");
+        assertEquals(3, 
sharePartitionMetrics.acquisitionLockTimeoutPerSec().count());
+        assertEquals(5, 
sharePartitionMetrics.inFlightBatchMessageCount().sum());
+        assertEquals(1, 
sharePartitionMetrics.inFlightBatchMessageCount().count());
+        assertEquals(5, 
sharePartitionMetrics.inFlightBatchMessageCount().min());
+        assertEquals(5, 
sharePartitionMetrics.inFlightBatchMessageCount().max());
         assertEquals(0, sharePartition.nextFetchOffset());
     }
 
     @Test
-    public void 
testAcquireCachedStateInitialGapOverlapsWithActualPartitionGapInRecordLimitMode()
 {
+    public void 
testAcquireCachedStateInitialGapOverlapsWithActualPartitionGapInRecordLimitMode()
 throws InterruptedException {
         Persister persister = Mockito.mock(Persister.class);
         ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
         
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
@@ -10799,7 +10880,10 @@ public class SharePartitionTest {
                     ))))));
         
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
 
-        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withPersister(persister)
+            .withSharePartitionMetrics(sharePartitionMetrics)
+            .build();
         sharePartition.maybeInitialize();
 
         // Creating 2 batches starting from 16, such that there is a natural 
gap from 11 to 15
@@ -10838,10 +10922,19 @@ public class SharePartitionTest {
 
         GapWindow persisterReadResultGapWindow = 
sharePartition.persisterReadResultGapWindow();
         assertNull(persisterReadResultGapWindow);
+
+        TestUtils.waitForCondition(() -> 
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 5,
+            "In-flight batch count should be 5.");
+        TestUtils.waitForCondition(() -> 
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 
45,
+            "In-flight message count should be 45.");
+        assertEquals(45, 
sharePartitionMetrics.inFlightBatchMessageCount().sum());
+        assertEquals(5, 
sharePartitionMetrics.inFlightBatchMessageCount().count());
+        assertEquals(5, 
sharePartitionMetrics.inFlightBatchMessageCount().min());
+        assertEquals(10, 
sharePartitionMetrics.inFlightBatchMessageCount().max());
     }
 
     @Test
-    public void 
testAcquireCachedStateGapInBetweenOverlapsWithActualPartitionGapInRecordLimitMode()
 {
+    public void 
testAcquireCachedStateGapInBetweenOverlapsWithActualPartitionGapInRecordLimitMode()
 throws InterruptedException {
         Persister persister = Mockito.mock(Persister.class);
         ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
         
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
@@ -10853,7 +10946,10 @@ public class SharePartitionTest {
                     ))))));
         
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
 
-        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withPersister(persister)
+            .withSharePartitionMetrics(sharePartitionMetrics)
+            .build();
         sharePartition.maybeInitialize();
 
         // Creating 3 batches starting from 11, such that there is a natural 
gap from 36 to 40
@@ -10893,6 +10989,17 @@ public class SharePartitionTest {
         // Gap still exists from 36 to 40
         assertEquals(36L, persisterReadResultGapWindow.gapStartOffset());
         assertEquals(50L, persisterReadResultGapWindow.endOffset());
+
+        TestUtils.waitForCondition(() -> 
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_BATCH_COUNT).intValue() == 4,
+            "In-flight batch count should be 4.");
+        // End offset(50) - Start offset(11) + 1 = 40
+        TestUtils.waitForCondition(() -> 
yammerMetricValue(SharePartitionMetrics.IN_FLIGHT_MESSAGE_COUNT).longValue() == 
40,
+            "In-flight message count should be 40.");
+        // 35 messages: 10 (11-20) + 10 (21-30) + 5 (31-35) + 10 (41-50)
+        assertEquals(35, 
sharePartitionMetrics.inFlightBatchMessageCount().sum());
+        assertEquals(4, 
sharePartitionMetrics.inFlightBatchMessageCount().count());
+        assertEquals(5, 
sharePartitionMetrics.inFlightBatchMessageCount().min());
+        assertEquals(10, 
sharePartitionMetrics.inFlightBatchMessageCount().max());
     }
 
     @Test
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 2b96099afcb..347ba953ca2 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -22,7 +22,7 @@ import kafka.utils.{TestInfoUtils, TestUtils}
 import kafka.utils.TestUtils.waitUntilTrue
 import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, 
ListGroupsOptions, NewTopic}
 import org.apache.kafka.clients.consumer._
-import org.apache.kafka.clients.consumer.internals.{StreamsRebalanceData, 
StreamsRebalanceListener}
+import org.apache.kafka.clients.consumer.internals.{ShareAcquireMode, 
StreamsRebalanceData, StreamsRebalanceListener}
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.acl.AclOperation._
 import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index d37991a98c6..753b831b594 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -30,7 +30,7 @@ import kafka.server.share.{DelayedShareFetch, SharePartition}
 import kafka.utils.TestUtils.waitUntilTrue
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.FetchSessionHandler
-import org.apache.kafka.clients.consumer.ShareAcquireMode
+import org.apache.kafka.clients.consumer.internals.ShareAcquireMode
 import org.apache.kafka.common.{DirectoryId, IsolationLevel, Node, 
TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.config.TopicConfig
diff --git 
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
index a5520ab764c..597d1dc7036 100644
--- 
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
@@ -18,7 +18,7 @@ package kafka.server
 
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.DescribeShareGroupsOptions
-import org.apache.kafka.clients.consumer.ShareAcquireMode
+import org.apache.kafka.clients.consumer.internals.ShareAcquireMode
 import org.apache.kafka.common.test.api.{ClusterConfigProperty, 
ClusterFeature, ClusterTest, ClusterTestDefaults, ClusterTests, Type}
 import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords
 import org.apache.kafka.common.message.{FindCoordinatorRequestData, 
ShareAcknowledgeRequestData, ShareAcknowledgeResponseData, 
ShareFetchRequestData, ShareFetchResponseData, ShareGroupHeartbeatRequestData}
diff --git 
a/server/src/main/java/org/apache/kafka/server/share/fetch/ShareFetch.java 
b/server/src/main/java/org/apache/kafka/server/share/fetch/ShareFetch.java
index 9b16a94f395..518266d9c0d 100644
--- a/server/src/main/java/org/apache/kafka/server/share/fetch/ShareFetch.java
+++ b/server/src/main/java/org/apache/kafka/server/share/fetch/ShareFetch.java
@@ -17,7 +17,7 @@
 
 package org.apache.kafka.server.share.fetch;
 
-import org.apache.kafka.clients.consumer.ShareAcquireMode;
+import org.apache.kafka.clients.consumer.internals.ShareAcquireMode;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData;
 import org.apache.kafka.common.protocol.Errors;
diff --git 
a/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java 
b/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java
index 013ee0c2c27..053d2f5d953 100644
--- 
a/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java
+++ 
b/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.server.share.fetch;
 
-import org.apache.kafka.clients.consumer.ShareAcquireMode;
+import org.apache.kafka.clients.consumer.internals.ShareAcquireMode;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;


Reply via email to