This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e59c887bfd5 KAFKA-16557 Implemented OffsetFetchRequestState toStringBase and added a test for it (#16291)
e59c887bfd5 is described below

commit e59c887bfd5b7bdb9e1407d70e24d9706e930618
Author: brenden20 <[email protected]>
AuthorDate: Thu Jun 13 02:30:05 2024 -0500

    KAFKA-16557 Implemented OffsetFetchRequestState toStringBase and added a test for it (#16291)
    
    Reviewers: Lianet Magrans <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../consumer/internals/CommitRequestManager.java   | 25 ++++++++-----
 .../internals/CommitRequestManagerTest.java        | 43 ++++++++++++++++++++++
 2 files changed, 59 insertions(+), 9 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index 000797dba09..ddd1a03ecd1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -490,7 +490,8 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
         return result;
     }
 
-    private OffsetFetchRequestState createOffsetFetchRequest(final 
Set<TopicPartition> partitions,
+    // Visible for testing
+    OffsetFetchRequestState createOffsetFetchRequest(final Set<TopicPartition> 
partitions,
                                                              final long 
deadlineMs) {
         return jitter.isPresent() ?
             new OffsetFetchRequestState(
@@ -865,6 +866,11 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
             }
         }
 
+        @Override
+        public String toStringBase() {
+            return super.toStringBase() + ", " + memberInfo;
+        }
+
         abstract void onResponse(final ClientResponse response);
 
         abstract void removeRequest();
@@ -1078,14 +1084,9 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
         }
 
         @Override
-        public String toString() {
-            return "OffsetFetchRequestState{" +
-                    "requestedPartitions=" + requestedPartitions +
-                    ", memberId=" + memberInfo.memberId.orElse("undefined") +
-                    ", memberEpoch=" + (memberInfo.memberEpoch.isPresent() ? 
memberInfo.memberEpoch.get() : "undefined") +
-                    ", future=" + future +
-                    ", " + toStringBase() +
-                    '}';
+        public String toStringBase() {
+            return super.toStringBase() +
+                    ", requestedPartitions=" + requestedPartitions;
         }
     }
 
@@ -1278,5 +1279,11 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
             this.memberId = Optional.empty();
             this.memberEpoch = Optional.empty();
         }
+
+        @Override
+        public String toString() {
+            return "memberId=" + memberId.orElse("undefined") +
+                    ", memberEpoch=" + (memberEpoch.isPresent() ? 
memberEpoch.get() : "undefined");
+        }
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index 1027874906c..e86858c3427 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -75,6 +75,7 @@ import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZE
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_GROUP_ID;
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_GROUP_INSTANCE_ID;
 import static org.apache.kafka.test.TestUtils.assertFutureThrows;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -122,6 +123,48 @@ public class CommitRequestManagerTest {
         this.props.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
     }
 
+    @Test
+    public void testOffsetFetchRequestStateToStringBase() {
+        ConsumerConfig config = mock(ConsumerConfig.class);
+
+        CommitRequestManager commitRequestManager = new CommitRequestManager(
+                time,
+                logContext,
+                subscriptionState,
+                config,
+                coordinatorRequestManager,
+                offsetCommitCallbackInvoker,
+                DEFAULT_GROUP_ID,
+                Optional.of(DEFAULT_GROUP_INSTANCE_ID),
+                retryBackoffMs,
+                retryBackoffMaxMs,
+                OptionalDouble.of(0),
+                metrics);
+
+        commitRequestManager.onMemberEpochUpdated(Optional.of(1), 
Optional.empty());
+        Set<TopicPartition> requestedPartitions = Collections.singleton(new 
TopicPartition("topic-1", 1));
+
+        CommitRequestManager.OffsetFetchRequestState offsetFetchRequestState = 
commitRequestManager.createOffsetFetchRequest(requestedPartitions, 0);
+
+        TimedRequestState timedRequestState = new TimedRequestState(
+                logContext,
+                CommitRequestManager.class.getSimpleName(),
+                retryBackoffMs,
+                2,
+                retryBackoffMaxMs,
+                0,
+                TimedRequestState.deadlineTimer(time, 0)
+        );
+
+        String target = timedRequestState.toStringBase() +
+                ", " + offsetFetchRequestState.memberInfo +
+                ", requestedPartitions=" + 
offsetFetchRequestState.requestedPartitions;
+
+        assertDoesNotThrow(timedRequestState::toString);
+        assertFalse(target.contains("Optional"));
+        assertEquals(target, offsetFetchRequestState.toStringBase());
+    }
+
     @Test
     public void testPollSkipIfCoordinatorUnknown() {
         CommitRequestManager commitRequestManager = create(false, 0);

Reply via email to