This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8c77953d5fa MINOR: Revert "migrate LogFetchInfo, Assignment and RequestAndCompletionHandler to java record" (#19177)
8c77953d5fa is described below

commit 8c77953d5fa84ce1dfdf83f73560444a4acabc1f
Author: Ming-Yen Chung <[email protected]>
AuthorDate: Sat Apr 5 23:01:17 2025 +0800

    MINOR: Revert "migrate LogFetchInfo, Assignment and RequestAndCompletionHandler to java record" (#19177)
    
    Revert some of the Java record migrations from #19062 and #18783.
    
    We assume Java records are purely immutable data carriers.
    As discussed in
    https://github.com/apache/kafka/pull/19062#issuecomment-2709637352, if a
    class has fields that may be mutable, we shouldn't migrate it to a Java
    record, because the hashCode/equals behavior changes (a sketch of the
    pitfall follows the list below).
    
    * LogFetchInfo (Records)
    * Assignment (successCallback)
    * Remove the `equals` method from `Assignment`, since `Assignment` is
    not, and should not be, used as a Map/Set key.
    * RequestAndCompletionHandler (handler)
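    
    A minimal sketch of the pitfall (hypothetical MutableBox/Holder classes,
    not part of this patch): a record derives equals/hashCode from its
    components, so mutating a component after the record has been put into a
    HashSet silently breaks lookups.
    
        import java.util.HashSet;
        import java.util.Set;
    
        public class RecordEqualityDemo {
            // A mutable component, standing in for a field like Records or a Runnable.
            static final class MutableBox {
                int value;
                MutableBox(int value) { this.value = value; }
                @Override public boolean equals(Object o) {
                    return o instanceof MutableBox other && other.value == value;
                }
                @Override public int hashCode() { return value; }
            }
    
            // The generated equals/hashCode delegate to the mutable component.
            record Holder(MutableBox box) { }
    
            public static void main(String[] args) {
                MutableBox box = new MutableBox(1);
                Holder holder = new Holder(box);
    
                Set<Holder> set = new HashSet<>();
                set.add(holder);
    
                box.value = 2; // mutate the component after insertion
    
                // holder.hashCode() changed with the component, so the lookup fails
                System.out.println(set.contains(holder)); // prints false
            }
        }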
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../org/apache/kafka/raft/KafkaRaftClient.java     |  6 +-
 .../java/org/apache/kafka/raft/LogFetchInfo.java   | 13 ++++-
 .../internals/KRaftControlRecordStateMachine.java  |  2 +-
 .../kafka/raft/KafkaRaftClientReconfigTest.java    |  4 +-
 .../org/apache/kafka/raft/KafkaRaftClientTest.java |  4 +-
 .../test/java/org/apache/kafka/raft/MockLog.java   |  2 +-
 .../java/org/apache/kafka/raft/MockLogTest.java    | 28 ++++-----
 .../kafka/server/util/InterBrokerSendThread.java   | 10 ++--
 .../server/util/RequestAndCompletionHandler.java   | 35 +++++++++--
 .../server/util/InterBrokerSendThreadTest.java     | 34 +++++------
 .../java/org/apache/kafka/server/Assignment.java   | 68 +++++++++++++++++-----
 11 files changed, 139 insertions(+), 67 deletions(-)

diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 388ac04526a..e46a2a1ff83 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -429,7 +429,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
             listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset -> {
                 if (nextExpectedOffset < highWatermark) {
                     LogFetchInfo readInfo = log.read(nextExpectedOffset, Isolation.COMMITTED);
-                    listenerContext.fireHandleCommit(nextExpectedOffset, readInfo.records());
+                    listenerContext.fireHandleCommit(nextExpectedOffset, readInfo.records);
                 }
             });
         }
@@ -1622,11 +1622,11 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
             if (validOffsetAndEpoch.kind() == ValidOffsetAndEpoch.Kind.VALID) {
                 LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED);
 
-                if (state.updateReplicaState(replicaKey, currentTimeMs, info.startOffsetMetadata())) {
+                if (state.updateReplicaState(replicaKey, currentTimeMs, info.startOffsetMetadata)) {
                     onUpdateLeaderHighWatermark(state, currentTimeMs);
                 }
 
-                records = info.records();
+                records = info.records;
             } else {
                 records = MemoryRecords.EMPTY;
             }
diff --git a/raft/src/main/java/org/apache/kafka/raft/LogFetchInfo.java b/raft/src/main/java/org/apache/kafka/raft/LogFetchInfo.java
index 7862fc2ab29..99b69070378 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LogFetchInfo.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LogFetchInfo.java
@@ -19,6 +19,15 @@ package org.apache.kafka.raft;
 import org.apache.kafka.common.record.Records;
 
 /**
- * Metadata for the records fetched from log, including the records itself
+ * This class is not converted to a Java record since records are typically intended to be immutable, but this one contains a mutable field, records.
  */
-public record LogFetchInfo(Records records, LogOffsetMetadata startOffsetMetadata) { }
+public class LogFetchInfo {
+
+    public final Records records;
+    public final LogOffsetMetadata startOffsetMetadata;
+
+    public LogFetchInfo(Records records, LogOffsetMetadata startOffsetMetadata) {
+        this.records = records;
+        this.startOffsetMetadata = startOffsetMetadata;
+    }
+}
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
index 032f8230233..a8768f902a8 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
@@ -233,7 +233,7 @@ public final class KRaftControlRecordStateMachine {
         while (log.endOffset().offset() > nextOffset) {
             LogFetchInfo info = log.read(nextOffset, Isolation.UNCOMMITTED);
             try (RecordsIterator<?> iterator = new RecordsIterator<>(
-                    info.records(),
+                    info.records,
                     serde,
                     bufferSupplier,
                     maxBatchSizeBytes,
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
index 199398422fb..0c4c1780919 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
@@ -128,7 +128,7 @@ public class KafkaRaftClientReconfigTest {
         context.unattachedToLeader();
 
         // check if leader writes 3 bootstrap records to the log
-        Records records = context.log.read(0, Isolation.UNCOMMITTED).records();
+        Records records = context.log.read(0, Isolation.UNCOMMITTED).records;
         RecordBatch batch = records.batches().iterator().next();
         assertTrue(batch.isControlBatch());
         Iterator<Record> recordIterator = batch.iterator();
@@ -191,7 +191,7 @@ public class KafkaRaftClientReconfigTest {
         // check leader does not write bootstrap records to log
         context.unattachedToLeader();
 
-        Records records = context.log.read(0, Isolation.UNCOMMITTED).records();
+        Records records = context.log.read(0, Isolation.UNCOMMITTED).records;
         RecordBatch batch = records.batches().iterator().next();
         assertTrue(batch.isControlBatch());
         Iterator<Record> recordIterator = batch.iterator();
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 5032a5c95ac..afa5cce0521 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -913,7 +913,7 @@ public class KafkaRaftClientTest {
         context.client.poll();
         context.assertSentBeginQuorumEpochRequest(1, Set.of(otherNodeId));
 
-        Records records = context.log.read(0, Isolation.UNCOMMITTED).records();
+        Records records = context.log.read(0, Isolation.UNCOMMITTED).records;
         RecordBatch batch = records.batches().iterator().next();
         assertTrue(batch.isControlBatch());
 
@@ -962,7 +962,7 @@ public class KafkaRaftClientTest {
         context.client.poll();
         context.assertSentBeginQuorumEpochRequest(2, Set.of(firstNodeId, secondNodeId));
 
-        Records records = context.log.read(0, Isolation.UNCOMMITTED).records();
+        Records records = context.log.read(0, Isolation.UNCOMMITTED).records;
         RecordBatch batch = records.batches().iterator().next();
         assertTrue(batch.isControlBatch());
 
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLog.java b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
index e108664b502..209accdf7a2 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLog.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
@@ -513,7 +513,7 @@ public class MockLog implements ReplicatedLog {
             );
         }
 
-        long baseOffset = read(snapshotId.offset(), Isolation.COMMITTED).startOffsetMetadata().offset();
+        long baseOffset = read(snapshotId.offset(), Isolation.COMMITTED).startOffsetMetadata.offset();
         if (snapshotId.offset() != baseOffset) {
             throw new IllegalArgumentException(
                 String.format(
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
index e95387aca8c..3abfe5a1b12 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
@@ -214,7 +214,7 @@ public class MockLogTest {
         assertEquals(1, log.endOffset().offset());
         assertEquals(currentEpoch, log.lastFetchedEpoch());
 
-        Records records = log.read(0, Isolation.UNCOMMITTED).records();
+        Records records = log.read(0, Isolation.UNCOMMITTED).records;
         for (RecordBatch batch : records.batches()) {
             assertTrue(batch.isControlBatch());
         }
@@ -249,7 +249,7 @@ public class MockLogTest {
         assertEquals(initialOffset + 1, log.endOffset().offset());
         assertEquals(3, log.lastFetchedEpoch());
 
-        Records records = log.read(5L, Isolation.UNCOMMITTED).records();
+        Records records = log.read(5L, Isolation.UNCOMMITTED).records;
         List<ByteBuffer> extractRecords = new ArrayList<>();
         for (Record record : records.records()) {
             extractRecords.add(record.value());
@@ -275,7 +275,7 @@ public class MockLogTest {
 
         appendAsLeader(List.of(recordOne, recordTwo), epoch);
 
-        Records records = log.read(0, Isolation.UNCOMMITTED).records();
+        Records records = log.read(0, Isolation.UNCOMMITTED).records;
 
         List<ByteBuffer> extractRecords = new ArrayList<>();
         for (Record record : records.records()) {
@@ -346,12 +346,12 @@ public class MockLogTest {
         appendBatch(5, 1);
 
         LogFetchInfo readInfo = log.read(5, Isolation.UNCOMMITTED);
-        assertEquals(5L, readInfo.startOffsetMetadata().offset());
-        assertTrue(readInfo.startOffsetMetadata().metadata().isPresent());
+        assertEquals(5L, readInfo.startOffsetMetadata.offset());
+        assertTrue(readInfo.startOffsetMetadata.metadata().isPresent());
 
         // Update to a high watermark with valid offset metadata
-        log.updateHighWatermark(readInfo.startOffsetMetadata());
-        assertEquals(readInfo.startOffsetMetadata().offset(), log.highWatermark().offset());
+        log.updateHighWatermark(readInfo.startOffsetMetadata);
+        assertEquals(readInfo.startOffsetMetadata.offset(), log.highWatermark().offset());
 
         // Now update to a high watermark with invalid metadata
         assertThrows(IllegalArgumentException.class, () ->
@@ -360,17 +360,17 @@ public class MockLogTest {
 
         // Ensure we can update the high watermark to the end offset
         LogFetchInfo readFromEndInfo = log.read(15L, Isolation.UNCOMMITTED);
-        assertEquals(15, readFromEndInfo.startOffsetMetadata().offset());
-        assertTrue(readFromEndInfo.startOffsetMetadata().metadata().isPresent());
-        log.updateHighWatermark(readFromEndInfo.startOffsetMetadata());
+        assertEquals(15, readFromEndInfo.startOffsetMetadata.offset());
+        assertTrue(readFromEndInfo.startOffsetMetadata.metadata().isPresent());
+        log.updateHighWatermark(readFromEndInfo.startOffsetMetadata);
 
         // Ensure that the end offset metadata is valid after new entries are appended
         appendBatch(5, 1);
-        log.updateHighWatermark(readFromEndInfo.startOffsetMetadata());
+        log.updateHighWatermark(readFromEndInfo.startOffsetMetadata);
 
         // Check handling of a fetch from the middle of a batch
         LogFetchInfo readFromMiddleInfo = log.read(16L, Isolation.UNCOMMITTED);
-        assertEquals(readFromEndInfo.startOffsetMetadata(), readFromMiddleInfo.startOffsetMetadata());
+        assertEquals(readFromEndInfo.startOffsetMetadata, readFromMiddleInfo.startOffsetMetadata);
     }
 
     @Test
@@ -1002,7 +1002,7 @@ public class MockLogTest {
         while (foundRecord) {
             foundRecord = false;
 
-            Records records = log.read(currentStart, isolation).records();
+            Records records = log.read(currentStart, isolation).records;
             for (Record record : records.records()) {
                 foundRecord = true;
 
@@ -1081,7 +1081,7 @@ public class MockLogTest {
 
         int currentOffset = 0;
         while (currentOffset < log.endOffset().offset()) {
-            Records records = log.read(currentOffset, Isolation.UNCOMMITTED).records();
+            Records records = log.read(currentOffset, Isolation.UNCOMMITTED).records;
             List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
 
             assertFalse(batches.isEmpty());
diff --git a/server-common/src/main/java/org/apache/kafka/server/util/InterBrokerSendThread.java b/server-common/src/main/java/org/apache/kafka/server/util/InterBrokerSendThread.java
index cdd7ae3e98e..093946eb5f0 100644
--- a/server-common/src/main/java/org/apache/kafka/server/util/InterBrokerSendThread.java
+++ b/server-common/src/main/java/org/apache/kafka/server/util/InterBrokerSendThread.java
@@ -89,14 +89,14 @@ public abstract class InterBrokerSendThread extends ShutdownableThread {
     private void drainGeneratedRequests() {
         generateRequests().forEach(request ->
             unsentRequests.put(
-                request.destination(),
+                request.destination,
                 networkClient.newClientRequest(
-                    request.destination().idString(),
-                    request.request(),
-                    request.creationTimeMs(),
+                    request.destination.idString(),
+                    request.request,
+                    request.creationTimeMs,
                     true,
                     requestTimeoutMs,
-                    request.handler()
+                    request.handler
                 )
             )
         );
diff --git a/server-common/src/main/java/org/apache/kafka/server/util/RequestAndCompletionHandler.java b/server-common/src/main/java/org/apache/kafka/server/util/RequestAndCompletionHandler.java
index 7e2fea6af9b..da14fb5e4a4 100644
--- a/server-common/src/main/java/org/apache/kafka/server/util/RequestAndCompletionHandler.java
+++ b/server-common/src/main/java/org/apache/kafka/server/util/RequestAndCompletionHandler.java
@@ -20,9 +20,32 @@ import org.apache.kafka.clients.RequestCompletionHandler;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.requests.AbstractRequest;
 
-public record RequestAndCompletionHandler(
-    long creationTimeMs,
-    Node destination,
-    AbstractRequest.Builder<? extends AbstractRequest> request,
-    RequestCompletionHandler handler
-) { }
+public final class RequestAndCompletionHandler {
+
+    public final long creationTimeMs;
+    public final Node destination;
+    public final AbstractRequest.Builder<? extends AbstractRequest> request;
+    public final RequestCompletionHandler handler;
+
+    public RequestAndCompletionHandler(
+        long creationTimeMs,
+        Node destination,
+        AbstractRequest.Builder<? extends AbstractRequest> request,
+        RequestCompletionHandler handler
+    ) {
+        this.creationTimeMs = creationTimeMs;
+        this.destination = destination;
+        this.request = request;
+        this.handler = handler;
+    }
+
+    @Override
+    public String toString() {
+        return "RequestAndCompletionHandler(" +
+            "creationTimeMs=" + creationTimeMs +
+            ", destination=" + destination +
+            ", request=" + request +
+            ", handler=" + handler +
+            ')';
+    }
+}
diff --git a/server-common/src/test/java/org/apache/kafka/server/util/InterBrokerSendThreadTest.java b/server-common/src/test/java/org/apache/kafka/server/util/InterBrokerSendThreadTest.java
index 7aaa4c01d93..e2417e40cfe 100644
--- a/server-common/src/test/java/org/apache/kafka/server/util/InterBrokerSendThreadTest.java
+++ b/server-common/src/test/java/org/apache/kafka/server/util/InterBrokerSendThreadTest.java
@@ -166,15 +166,15 @@ public class InterBrokerSendThreadTest {
         final TestInterBrokerSendThread sendThread = new TestInterBrokerSendThread();
 
         final ClientRequest clientRequest =
-            new ClientRequest("dest", request, 0, "1", 0, true, 
requestTimeoutMs, handler.handler());
+            new ClientRequest("dest", request, 0, "1", 0, true, 
requestTimeoutMs, handler.handler);
 
         when(networkClient.newClientRequest(
             ArgumentMatchers.eq("1"),
-            same(handler.request()),
+            same(handler.request),
             anyLong(),
             ArgumentMatchers.eq(true),
             ArgumentMatchers.eq(requestTimeoutMs),
-            same(handler.handler())
+            same(handler.handler)
         )).thenReturn(clientRequest);
 
         when(networkClient.ready(node, time.milliseconds())).thenReturn(true);
@@ -187,11 +187,11 @@ public class InterBrokerSendThreadTest {
         verify(networkClient)
             .newClientRequest(
                 ArgumentMatchers.eq("1"),
-                same(handler.request()),
+                same(handler.request),
                 anyLong(),
                 ArgumentMatchers.eq(true),
                 ArgumentMatchers.eq(requestTimeoutMs),
-                same(handler.handler()));
+                same(handler.handler));
         verify(networkClient).ready(any(), anyLong());
         verify(networkClient).send(same(clientRequest), anyLong());
         verify(networkClient).poll(anyLong(), anyLong());
@@ -209,15 +209,15 @@ public class InterBrokerSendThreadTest {
         final TestInterBrokerSendThread sendThread = new TestInterBrokerSendThread();
 
         final ClientRequest clientRequest =
-            new ClientRequest("dest", request, 0, "1", 0, true, 
requestTimeoutMs, handler.handler());
+            new ClientRequest("dest", request, 0, "1", 0, true, 
requestTimeoutMs, handler.handler);
 
         when(networkClient.newClientRequest(
             ArgumentMatchers.eq("1"),
-            same(handler.request()),
+            same(handler.request),
             anyLong(),
             ArgumentMatchers.eq(true),
             ArgumentMatchers.eq(requestTimeoutMs),
-            same(handler.handler())
+            same(handler.handler)
         )).thenReturn(clientRequest);
 
         when(networkClient.ready(node, time.milliseconds())).thenReturn(false);
@@ -236,11 +236,11 @@ public class InterBrokerSendThreadTest {
         verify(networkClient)
             .newClientRequest(
                 ArgumentMatchers.eq("1"),
-                same(handler.request()),
+                same(handler.request),
                 anyLong(),
                 ArgumentMatchers.eq(true),
                 ArgumentMatchers.eq(requestTimeoutMs),
-                same(handler.handler()));
+                same(handler.handler));
         verify(networkClient).ready(any(), anyLong());
         verify(networkClient).connectionDelay(any(), anyLong());
         verify(networkClient).poll(anyLong(), anyLong());
@@ -261,16 +261,16 @@ public class InterBrokerSendThreadTest {
 
         final ClientRequest clientRequest =
             new ClientRequest(
-                "dest", request, 0, "1", time.milliseconds(), true, 
requestTimeoutMs, handler.handler());
+                "dest", request, 0, "1", time.milliseconds(), true, 
requestTimeoutMs, handler.handler);
         time.sleep(1500L);
 
         when(networkClient.newClientRequest(
             ArgumentMatchers.eq("1"),
-            same(handler.request()),
-            ArgumentMatchers.eq(handler.creationTimeMs()),
+            same(handler.request),
+            ArgumentMatchers.eq(handler.creationTimeMs),
             ArgumentMatchers.eq(true),
             ArgumentMatchers.eq(requestTimeoutMs),
-            same(handler.handler())
+            same(handler.handler)
         )).thenReturn(clientRequest);
 
         // make the node unready so the request is not cleared
@@ -289,11 +289,11 @@ public class InterBrokerSendThreadTest {
         verify(networkClient)
             .newClientRequest(
                 ArgumentMatchers.eq("1"),
-                same(handler.request()),
-                ArgumentMatchers.eq(handler.creationTimeMs()),
+                same(handler.request),
+                ArgumentMatchers.eq(handler.creationTimeMs),
                 ArgumentMatchers.eq(true),
                 ArgumentMatchers.eq(requestTimeoutMs),
-                same(handler.handler()));
+                same(handler.handler));
         verify(networkClient).ready(any(), anyLong());
         verify(networkClient).connectionDelay(any(), anyLong());
         verify(networkClient).poll(anyLong(), anyLong());
diff --git a/server/src/main/java/org/apache/kafka/server/Assignment.java b/server/src/main/java/org/apache/kafka/server/Assignment.java
index 680a62e08a7..df517c05568 100644
--- a/server/src/main/java/org/apache/kafka/server/Assignment.java
+++ b/server/src/main/java/org/apache/kafka/server/Assignment.java
@@ -25,24 +25,64 @@ import org.apache.kafka.metadata.Replicas;
 import org.apache.kafka.server.common.TopicIdPartition;
 
 /**
- * @param topicIdPartition The topic ID and partition index of the replica.
- * @param directoryId      The ID of the directory we are placing the replica into.
- * @param submissionTimeNs The time in monotonic nanosecond when this assignment was created.
- * @param successCallback  The callback to invoke on success.
- */
-record Assignment(
-    TopicIdPartition topicIdPartition,
-    Uuid directoryId,
-    long submissionTimeNs,
-    Runnable successCallback
-) {
+ * This class is not converted to a Java record since record classes are meant for pure data, but this one contains a Runnable.
+ **/
+final class Assignment {
+    /**
+     * The topic ID and partition index of the replica.
+     */
+    private final TopicIdPartition topicIdPartition;
+
+    /**
+     * The ID of the directory we are placing the replica into.
+     */
+    private final Uuid directoryId;
+
+    /**
+     * The time in monotonic nanoseconds when this assignment was created.
+     */
+    private final long submissionTimeNs;
+
+    /**
+     * The callback to invoke on success.
+     */
+    private final Runnable successCallback;
+
+    Assignment(
+        TopicIdPartition topicIdPartition,
+        Uuid directoryId,
+        long submissionTimeNs,
+        Runnable successCallback
+    ) {
+        this.topicIdPartition = topicIdPartition;
+        this.directoryId = directoryId;
+        this.submissionTimeNs = submissionTimeNs;
+        this.successCallback = successCallback;
+    }
+
+    TopicIdPartition topicIdPartition() {
+        return topicIdPartition;
+    }
+
+    Uuid directoryId() {
+        return directoryId;
+    }
+
+    long submissionTimeNs() {
+        return submissionTimeNs;
+    }
+
+    Runnable successCallback() {
+        return successCallback;
+    }
 
     /**
      * Check if this Assignment is still valid to be sent.
      *
-     * @param nodeId The broker ID.
-     * @param image  The metadata image.
-     * @return True only if the Assignment is still valid.
+     * @param nodeId    The broker ID.
+     * @param image     The metadata image.
+     *
+     * @return          True only if the Assignment is still valid.
      */
     boolean valid(int nodeId, MetadataImage image) {
         TopicImage topicImage = image.topics().getTopic(topicIdPartition.topicId());
