This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e59c887bfd5 KAFKA-16557 Implemented OffsetFetchRequestState
toStringBase and added a test for it (#16291)
e59c887bfd5 is described below
commit e59c887bfd5b7bdb9e1407d70e24d9706e930618
Author: brenden20 <[email protected]>
AuthorDate: Thu Jun 13 02:30:05 2024 -0500
KAFKA-16557 Implemented OffsetFetchRequestState toStringBase and added a
test for it (#16291)
Reviewers: Lianet Magrans <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../consumer/internals/CommitRequestManager.java | 25 ++++++++-----
.../internals/CommitRequestManagerTest.java | 43 ++++++++++++++++++++++
2 files changed, 59 insertions(+), 9 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index 000797dba09..ddd1a03ecd1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -490,7 +490,8 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
return result;
}
- private OffsetFetchRequestState createOffsetFetchRequest(final
Set<TopicPartition> partitions,
+ // Visible for testing
+ OffsetFetchRequestState createOffsetFetchRequest(final Set<TopicPartition>
partitions,
final long
deadlineMs) {
return jitter.isPresent() ?
new OffsetFetchRequestState(
@@ -865,6 +866,11 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
}
}
+ @Override
+ public String toStringBase() {
+ return super.toStringBase() + ", " + memberInfo;
+ }
+
abstract void onResponse(final ClientResponse response);
abstract void removeRequest();
@@ -1078,14 +1084,9 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
}
@Override
- public String toString() {
- return "OffsetFetchRequestState{" +
- "requestedPartitions=" + requestedPartitions +
- ", memberId=" + memberInfo.memberId.orElse("undefined") +
- ", memberEpoch=" + (memberInfo.memberEpoch.isPresent() ?
memberInfo.memberEpoch.get() : "undefined") +
- ", future=" + future +
- ", " + toStringBase() +
- '}';
+ public String toStringBase() {
+ return super.toStringBase() +
+ ", requestedPartitions=" + requestedPartitions;
}
}
@@ -1278,5 +1279,11 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
this.memberId = Optional.empty();
this.memberEpoch = Optional.empty();
}
+
+ @Override
+ public String toString() {
+ return "memberId=" + memberId.orElse("undefined") +
+ ", memberEpoch=" + (memberEpoch.isPresent() ?
memberEpoch.get() : "undefined");
+ }
}
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index 1027874906c..e86858c3427 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -75,6 +75,7 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZE
import static
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_GROUP_ID;
import static
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_GROUP_INSTANCE_ID;
import static org.apache.kafka.test.TestUtils.assertFutureThrows;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -122,6 +123,48 @@ public class CommitRequestManagerTest {
this.props.put(VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
}
+ @Test
+ public void testOffsetFetchRequestStateToStringBase() {
+ ConsumerConfig config = mock(ConsumerConfig.class);
+
+ CommitRequestManager commitRequestManager = new CommitRequestManager(
+ time,
+ logContext,
+ subscriptionState,
+ config,
+ coordinatorRequestManager,
+ offsetCommitCallbackInvoker,
+ DEFAULT_GROUP_ID,
+ Optional.of(DEFAULT_GROUP_INSTANCE_ID),
+ retryBackoffMs,
+ retryBackoffMaxMs,
+ OptionalDouble.of(0),
+ metrics);
+
+ commitRequestManager.onMemberEpochUpdated(Optional.of(1),
Optional.empty());
+ Set<TopicPartition> requestedPartitions = Collections.singleton(new
TopicPartition("topic-1", 1));
+
+ CommitRequestManager.OffsetFetchRequestState offsetFetchRequestState =
commitRequestManager.createOffsetFetchRequest(requestedPartitions, 0);
+
+ TimedRequestState timedRequestState = new TimedRequestState(
+ logContext,
+ CommitRequestManager.class.getSimpleName(),
+ retryBackoffMs,
+ 2,
+ retryBackoffMaxMs,
+ 0,
+ TimedRequestState.deadlineTimer(time, 0)
+ );
+
+ String target = timedRequestState.toStringBase() +
+ ", " + offsetFetchRequestState.memberInfo +
+ ", requestedPartitions=" +
offsetFetchRequestState.requestedPartitions;
+
+ assertDoesNotThrow(timedRequestState::toString);
+ assertFalse(target.contains("Optional"));
+ assertEquals(target, offsetFetchRequestState.toStringBase());
+ }
+
@Test
public void testPollSkipIfCoordinatorUnknown() {
CommitRequestManager commitRequestManager = create(false, 0);