This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 32c2383bfad KAFKA-19658 Tweak 
org.apache.kafka.clients.consumer.OffsetAndMetadata (#20451)
32c2383bfad is described below

commit 32c2383bfad0eddc7d10fd3927981673bbaa1b41
Author: Lan Ding <isdin...@163.com>
AuthorDate: Fri Sep 5 06:06:08 2025 +0800

    KAFKA-19658 Tweak org.apache.kafka.clients.consumer.OffsetAndMetadata 
(#20451)
    
    1. Optimize the `equals()`, `hashCode()`, and `toString()` methods in
    `OffsetAndMetadata`.
    2. Add UT and IT to these modifications.
    
    Reviewers: TengYao Chi <kiting...@gmail.com>, Sean Quah
     <sq...@confluent.io>, Chia-Ping Tsai <chia7...@gmail.com>
---
 .../kafka/clients/consumer/PlaintextConsumerTest.java   | 13 +++++++++----
 .../kafka/clients/consumer/OffsetAndMetadata.java       | 17 +++++++++--------
 .../kafka/clients/consumer/OffsetAndMetadataTest.java   | 15 +++++++++++++++
 .../consumer/internals/CommitRequestManagerTest.java    |  3 +++
 4 files changed, 36 insertions(+), 12 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
index 5fd2ad20089..c69c9c35fd4 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
@@ -553,14 +553,19 @@ public class PlaintextConsumerTest {
 
             // commit sync and verify onCommit is called
             var commitCountBefore = 
MockConsumerInterceptor.ON_COMMIT_COUNT.intValue();
-            consumer.commitSync(Map.of(TP, new OffsetAndMetadata(2L)));
-            assertEquals(2, consumer.committed(Set.of(TP)).get(TP).offset());
+            consumer.commitSync(Map.of(TP, new OffsetAndMetadata(2L, 
"metadata")));
+            OffsetAndMetadata metadata = 
consumer.committed(Set.of(TP)).get(TP);
+            assertEquals(2, metadata.offset());
+            assertEquals("metadata", metadata.metadata());
             assertEquals(commitCountBefore + 1, 
MockConsumerInterceptor.ON_COMMIT_COUNT.intValue());
 
             // commit async and verify onCommit is called
-            var offsetsToCommit = Map.of(TP, new OffsetAndMetadata(5L));
+            var offsetsToCommit = Map.of(TP, new OffsetAndMetadata(5L, null));
             sendAndAwaitAsyncCommit(consumer, Optional.of(offsetsToCommit));
-            assertEquals(5, consumer.committed(Set.of(TP)).get(TP).offset());
+            metadata = consumer.committed(Set.of(TP)).get(TP);
+            assertEquals(5, metadata.offset());
+            // null metadata will be converted to an empty string
+            assertEquals("", metadata.metadata());
             assertEquals(commitCountBefore + 2, 
MockConsumerInterceptor.ON_COMMIT_COUNT.intValue());
         }
         // cleanup
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
index d6b3b947c20..f459dd5ba55 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
@@ -54,10 +54,7 @@ public class OffsetAndMetadata implements Serializable {
 
         // The server converts null metadata to an empty string. So we store 
it as an empty string as well on the client
         // to be consistent.
-        if (metadata == null)
-            this.metadata = OffsetFetchResponse.NO_METADATA;
-        else
-            this.metadata = metadata;
+        this.metadata = Objects.requireNonNullElse(metadata, 
OffsetFetchResponse.NO_METADATA);
     }
 
     /**
@@ -82,6 +79,11 @@ public class OffsetAndMetadata implements Serializable {
         return offset;
     }
 
+    /**
+     * Get the metadata of the previously consumed record.
+     *
+     * @return the metadata or empty string if no metadata
+     */
     public String metadata() {
         return metadata;
     }
@@ -106,21 +108,20 @@ public class OffsetAndMetadata implements Serializable {
         OffsetAndMetadata that = (OffsetAndMetadata) o;
         return offset == that.offset &&
                 Objects.equals(metadata, that.metadata) &&
-                Objects.equals(leaderEpoch, that.leaderEpoch);
+                Objects.equals(leaderEpoch(), that.leaderEpoch());
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(offset, metadata, leaderEpoch);
+        return Objects.hash(offset, metadata, leaderEpoch());
     }
 
     @Override
     public String toString() {
         return "OffsetAndMetadata{" +
                 "offset=" + offset +
-                ", leaderEpoch=" + leaderEpoch +
+                ", leaderEpoch=" + leaderEpoch().orElse(null) +
                 ", metadata='" + metadata + '\'' +
                 '}';
     }
-
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/OffsetAndMetadataTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/OffsetAndMetadataTest.java
index 3035703ff37..c1a13c054ee 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/OffsetAndMetadataTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/OffsetAndMetadataTest.java
@@ -65,4 +65,19 @@ public class OffsetAndMetadataTest {
         assertEquals(new OffsetAndMetadata(10, Optional.of(235), "test commit 
metadata"), deserializedObject);
     }
 
+    @Test
+    public void testEqualsWithNullAndNegativeLeaderEpoch() {
+        OffsetAndMetadata metadataWithNullEpoch = new OffsetAndMetadata(100L, 
Optional.empty(), "metadata");
+        OffsetAndMetadata metadataWithNegativeEpoch = new 
OffsetAndMetadata(100L, Optional.of(-1), "metadata");
+        assertEquals(metadataWithNullEpoch, metadataWithNegativeEpoch);
+        assertEquals(metadataWithNullEpoch.hashCode(), 
metadataWithNegativeEpoch.hashCode());
+    }
+
+    @Test
+    public void testEqualsWithNullAndEmptyMetadata() {
+        OffsetAndMetadata metadataWithNullMetadata = new 
OffsetAndMetadata(100L, Optional.of(1), null);
+        OffsetAndMetadata metadataWithEmptyMetadata = new 
OffsetAndMetadata(100L, Optional.of(1), "");
+        assertEquals(metadataWithNullMetadata, metadataWithEmptyMetadata);
+        assertEquals(metadataWithNullMetadata.hashCode(), 
metadataWithEmptyMetadata.hashCode());
+    }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index d4ceeedde56..7032c13b285 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -766,6 +766,7 @@ public class CommitRequestManagerTest {
 
         // Complete request with a response
         long expectedOffset = 100;
+        String expectedMetadata = "metadata";
         NetworkClientDelegate.UnsentRequest req = result.unsentRequests.get(0);
         OffsetFetchResponseData.OffsetFetchResponseGroup groupResponse = new 
OffsetFetchResponseData.OffsetFetchResponseGroup()
             .setGroupId(DEFAULT_GROUP_ID)
@@ -777,6 +778,7 @@ public class CommitRequestManagerTest {
                             .setPartitionIndex(tp.partition())
                             .setCommittedOffset(expectedOffset)
                             .setCommittedLeaderEpoch(1)
+                            .setMetadata(expectedMetadata)
                     ))
             ));
         req.handler().onComplete(buildOffsetFetchClientResponse(req, 
groupResponse, false));
@@ -794,6 +796,7 @@ public class CommitRequestManagerTest {
         assertEquals(1, offsetsAndMetadata.size());
         assertTrue(offsetsAndMetadata.containsKey(tp));
         assertEquals(expectedOffset, offsetsAndMetadata.get(tp).offset());
+        assertEquals(expectedMetadata, offsetsAndMetadata.get(tp).metadata());
         assertEquals(0, 
commitManager.pendingRequests.inflightOffsetFetches.size(), "Inflight " +
             "request should be removed from the queue when a response is 
received.");
     }

Reply via email to