This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8c77953d5fa MINOR: Revert "migrate LogFetchInfo, Assignment and
RequestAndCompletionHandler to java record" (#19177)
8c77953d5fa is described below
commit 8c77953d5fa84ce1dfdf83f73560444a4acabc1f
Author: Ming-Yen Chung <[email protected]>
AuthorDate: Sat Apr 5 23:01:17 2025 +0800
MINOR: Revert "migrate LogFetchInfo, Assignment and
RequestAndCompletionHandler to java record" (#19177)
Revert some java record migration in #19062 #18783
We assume java record is purely immutable data carriers.
As discussed in
https://github.com/apache/kafka/pull/19062#issuecomment-2709637352, if a
class has fields that may be mutable, we shouldn't migrate it to Java
record because the hashcode/equals behavior are changed.
* LogFetchInfo (Records)
* Assignment (successCallback)
* Remove `equals` method from Assignment since `Assignment` is not and
shouldn't be used in Map/Set key.
* RequestAndCompletionHandler (handler)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../org/apache/kafka/raft/KafkaRaftClient.java | 6 +-
.../java/org/apache/kafka/raft/LogFetchInfo.java | 13 ++++-
.../internals/KRaftControlRecordStateMachine.java | 2 +-
.../kafka/raft/KafkaRaftClientReconfigTest.java | 4 +-
.../org/apache/kafka/raft/KafkaRaftClientTest.java | 4 +-
.../test/java/org/apache/kafka/raft/MockLog.java | 2 +-
.../java/org/apache/kafka/raft/MockLogTest.java | 28 ++++-----
.../kafka/server/util/InterBrokerSendThread.java | 10 ++--
.../server/util/RequestAndCompletionHandler.java | 35 +++++++++--
.../server/util/InterBrokerSendThreadTest.java | 34 +++++------
.../java/org/apache/kafka/server/Assignment.java | 68 +++++++++++++++++-----
11 files changed, 139 insertions(+), 67 deletions(-)
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 388ac04526a..e46a2a1ff83 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -429,7 +429,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset
-> {
if (nextExpectedOffset < highWatermark) {
LogFetchInfo readInfo = log.read(nextExpectedOffset,
Isolation.COMMITTED);
- listenerContext.fireHandleCommit(nextExpectedOffset,
readInfo.records());
+ listenerContext.fireHandleCommit(nextExpectedOffset,
readInfo.records);
}
});
}
@@ -1622,11 +1622,11 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
if (validOffsetAndEpoch.kind() == ValidOffsetAndEpoch.Kind.VALID) {
LogFetchInfo info = log.read(fetchOffset,
Isolation.UNCOMMITTED);
- if (state.updateReplicaState(replicaKey, currentTimeMs,
info.startOffsetMetadata())) {
+ if (state.updateReplicaState(replicaKey, currentTimeMs,
info.startOffsetMetadata)) {
onUpdateLeaderHighWatermark(state, currentTimeMs);
}
- records = info.records();
+ records = info.records;
} else {
records = MemoryRecords.EMPTY;
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/LogFetchInfo.java
b/raft/src/main/java/org/apache/kafka/raft/LogFetchInfo.java
index 7862fc2ab29..99b69070378 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LogFetchInfo.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LogFetchInfo.java
@@ -19,6 +19,15 @@ package org.apache.kafka.raft;
import org.apache.kafka.common.record.Records;
/**
- * Metadata for the records fetched from log, including the records itself
+ * The class is not converted to a Java record since records are typically
intended to be immutable, but this one contains a mutable field records
*/
-public record LogFetchInfo(Records records, LogOffsetMetadata
startOffsetMetadata) { }
+public class LogFetchInfo {
+
+ public final Records records;
+ public final LogOffsetMetadata startOffsetMetadata;
+
+ public LogFetchInfo(Records records, LogOffsetMetadata
startOffsetMetadata) {
+ this.records = records;
+ this.startOffsetMetadata = startOffsetMetadata;
+ }
+}
diff --git
a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
index 032f8230233..a8768f902a8 100644
---
a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
+++
b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
@@ -233,7 +233,7 @@ public final class KRaftControlRecordStateMachine {
while (log.endOffset().offset() > nextOffset) {
LogFetchInfo info = log.read(nextOffset, Isolation.UNCOMMITTED);
try (RecordsIterator<?> iterator = new RecordsIterator<>(
- info.records(),
+ info.records,
serde,
bufferSupplier,
maxBatchSizeBytes,
diff --git
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
index 199398422fb..0c4c1780919 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
@@ -128,7 +128,7 @@ public class KafkaRaftClientReconfigTest {
context.unattachedToLeader();
// check if leader writes 3 bootstrap records to the log
- Records records = context.log.read(0, Isolation.UNCOMMITTED).records();
+ Records records = context.log.read(0, Isolation.UNCOMMITTED).records;
RecordBatch batch = records.batches().iterator().next();
assertTrue(batch.isControlBatch());
Iterator<Record> recordIterator = batch.iterator();
@@ -191,7 +191,7 @@ public class KafkaRaftClientReconfigTest {
// check leader does not write bootstrap records to log
context.unattachedToLeader();
- Records records = context.log.read(0, Isolation.UNCOMMITTED).records();
+ Records records = context.log.read(0, Isolation.UNCOMMITTED).records;
RecordBatch batch = records.batches().iterator().next();
assertTrue(batch.isControlBatch());
Iterator<Record> recordIterator = batch.iterator();
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 5032a5c95ac..afa5cce0521 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -913,7 +913,7 @@ public class KafkaRaftClientTest {
context.client.poll();
context.assertSentBeginQuorumEpochRequest(1, Set.of(otherNodeId));
- Records records = context.log.read(0, Isolation.UNCOMMITTED).records();
+ Records records = context.log.read(0, Isolation.UNCOMMITTED).records;
RecordBatch batch = records.batches().iterator().next();
assertTrue(batch.isControlBatch());
@@ -962,7 +962,7 @@ public class KafkaRaftClientTest {
context.client.poll();
context.assertSentBeginQuorumEpochRequest(2, Set.of(firstNodeId,
secondNodeId));
- Records records = context.log.read(0, Isolation.UNCOMMITTED).records();
+ Records records = context.log.read(0, Isolation.UNCOMMITTED).records;
RecordBatch batch = records.batches().iterator().next();
assertTrue(batch.isControlBatch());
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLog.java
b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
index e108664b502..209accdf7a2 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLog.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
@@ -513,7 +513,7 @@ public class MockLog implements ReplicatedLog {
);
}
- long baseOffset = read(snapshotId.offset(),
Isolation.COMMITTED).startOffsetMetadata().offset();
+ long baseOffset = read(snapshotId.offset(),
Isolation.COMMITTED).startOffsetMetadata.offset();
if (snapshotId.offset() != baseOffset) {
throw new IllegalArgumentException(
String.format(
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
index e95387aca8c..3abfe5a1b12 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
@@ -214,7 +214,7 @@ public class MockLogTest {
assertEquals(1, log.endOffset().offset());
assertEquals(currentEpoch, log.lastFetchedEpoch());
- Records records = log.read(0, Isolation.UNCOMMITTED).records();
+ Records records = log.read(0, Isolation.UNCOMMITTED).records;
for (RecordBatch batch : records.batches()) {
assertTrue(batch.isControlBatch());
}
@@ -249,7 +249,7 @@ public class MockLogTest {
assertEquals(initialOffset + 1, log.endOffset().offset());
assertEquals(3, log.lastFetchedEpoch());
- Records records = log.read(5L, Isolation.UNCOMMITTED).records();
+ Records records = log.read(5L, Isolation.UNCOMMITTED).records;
List<ByteBuffer> extractRecords = new ArrayList<>();
for (Record record : records.records()) {
extractRecords.add(record.value());
@@ -275,7 +275,7 @@ public class MockLogTest {
appendAsLeader(List.of(recordOne, recordTwo), epoch);
- Records records = log.read(0, Isolation.UNCOMMITTED).records();
+ Records records = log.read(0, Isolation.UNCOMMITTED).records;
List<ByteBuffer> extractRecords = new ArrayList<>();
for (Record record : records.records()) {
@@ -346,12 +346,12 @@ public class MockLogTest {
appendBatch(5, 1);
LogFetchInfo readInfo = log.read(5, Isolation.UNCOMMITTED);
- assertEquals(5L, readInfo.startOffsetMetadata().offset());
- assertTrue(readInfo.startOffsetMetadata().metadata().isPresent());
+ assertEquals(5L, readInfo.startOffsetMetadata.offset());
+ assertTrue(readInfo.startOffsetMetadata.metadata().isPresent());
// Update to a high watermark with valid offset metadata
- log.updateHighWatermark(readInfo.startOffsetMetadata());
- assertEquals(readInfo.startOffsetMetadata().offset(),
log.highWatermark().offset());
+ log.updateHighWatermark(readInfo.startOffsetMetadata);
+ assertEquals(readInfo.startOffsetMetadata.offset(),
log.highWatermark().offset());
// Now update to a high watermark with invalid metadata
assertThrows(IllegalArgumentException.class, () ->
@@ -360,17 +360,17 @@ public class MockLogTest {
// Ensure we can update the high watermark to the end offset
LogFetchInfo readFromEndInfo = log.read(15L, Isolation.UNCOMMITTED);
- assertEquals(15, readFromEndInfo.startOffsetMetadata().offset());
-
assertTrue(readFromEndInfo.startOffsetMetadata().metadata().isPresent());
- log.updateHighWatermark(readFromEndInfo.startOffsetMetadata());
+ assertEquals(15, readFromEndInfo.startOffsetMetadata.offset());
+ assertTrue(readFromEndInfo.startOffsetMetadata.metadata().isPresent());
+ log.updateHighWatermark(readFromEndInfo.startOffsetMetadata);
// Ensure that the end offset metadata is valid after new entries are
appended
appendBatch(5, 1);
- log.updateHighWatermark(readFromEndInfo.startOffsetMetadata());
+ log.updateHighWatermark(readFromEndInfo.startOffsetMetadata);
// Check handling of a fetch from the middle of a batch
LogFetchInfo readFromMiddleInfo = log.read(16L, Isolation.UNCOMMITTED);
- assertEquals(readFromEndInfo.startOffsetMetadata(),
readFromMiddleInfo.startOffsetMetadata());
+ assertEquals(readFromEndInfo.startOffsetMetadata,
readFromMiddleInfo.startOffsetMetadata);
}
@Test
@@ -1002,7 +1002,7 @@ public class MockLogTest {
while (foundRecord) {
foundRecord = false;
- Records records = log.read(currentStart, isolation).records();
+ Records records = log.read(currentStart, isolation).records;
for (Record record : records.records()) {
foundRecord = true;
@@ -1081,7 +1081,7 @@ public class MockLogTest {
int currentOffset = 0;
while (currentOffset < log.endOffset().offset()) {
- Records records = log.read(currentOffset,
Isolation.UNCOMMITTED).records();
+ Records records = log.read(currentOffset,
Isolation.UNCOMMITTED).records;
List<? extends RecordBatch> batches =
Utils.toList(records.batches().iterator());
assertFalse(batches.isEmpty());
diff --git
a/server-common/src/main/java/org/apache/kafka/server/util/InterBrokerSendThread.java
b/server-common/src/main/java/org/apache/kafka/server/util/InterBrokerSendThread.java
index cdd7ae3e98e..093946eb5f0 100644
---
a/server-common/src/main/java/org/apache/kafka/server/util/InterBrokerSendThread.java
+++
b/server-common/src/main/java/org/apache/kafka/server/util/InterBrokerSendThread.java
@@ -89,14 +89,14 @@ public abstract class InterBrokerSendThread extends
ShutdownableThread {
private void drainGeneratedRequests() {
generateRequests().forEach(request ->
unsentRequests.put(
- request.destination(),
+ request.destination,
networkClient.newClientRequest(
- request.destination().idString(),
- request.request(),
- request.creationTimeMs(),
+ request.destination.idString(),
+ request.request,
+ request.creationTimeMs,
true,
requestTimeoutMs,
- request.handler()
+ request.handler
)
)
);
diff --git
a/server-common/src/main/java/org/apache/kafka/server/util/RequestAndCompletionHandler.java
b/server-common/src/main/java/org/apache/kafka/server/util/RequestAndCompletionHandler.java
index 7e2fea6af9b..da14fb5e4a4 100644
---
a/server-common/src/main/java/org/apache/kafka/server/util/RequestAndCompletionHandler.java
+++
b/server-common/src/main/java/org/apache/kafka/server/util/RequestAndCompletionHandler.java
@@ -20,9 +20,32 @@ import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.requests.AbstractRequest;
-public record RequestAndCompletionHandler(
- long creationTimeMs,
- Node destination,
- AbstractRequest.Builder<? extends AbstractRequest> request,
- RequestCompletionHandler handler
-) { }
+public final class RequestAndCompletionHandler {
+
+ public final long creationTimeMs;
+ public final Node destination;
+ public final AbstractRequest.Builder<? extends AbstractRequest> request;
+ public final RequestCompletionHandler handler;
+
+ public RequestAndCompletionHandler(
+ long creationTimeMs,
+ Node destination,
+ AbstractRequest.Builder<? extends AbstractRequest> request,
+ RequestCompletionHandler handler
+ ) {
+ this.creationTimeMs = creationTimeMs;
+ this.destination = destination;
+ this.request = request;
+ this.handler = handler;
+ }
+
+ @Override
+ public String toString() {
+ return "RequestAndCompletionHandler(" +
+ "creationTimeMs=" + creationTimeMs +
+ ", destination=" + destination +
+ ", request=" + request +
+ ", handler=" + handler +
+ ')';
+ }
+}
diff --git
a/server-common/src/test/java/org/apache/kafka/server/util/InterBrokerSendThreadTest.java
b/server-common/src/test/java/org/apache/kafka/server/util/InterBrokerSendThreadTest.java
index 7aaa4c01d93..e2417e40cfe 100644
---
a/server-common/src/test/java/org/apache/kafka/server/util/InterBrokerSendThreadTest.java
+++
b/server-common/src/test/java/org/apache/kafka/server/util/InterBrokerSendThreadTest.java
@@ -166,15 +166,15 @@ public class InterBrokerSendThreadTest {
final TestInterBrokerSendThread sendThread = new
TestInterBrokerSendThread();
final ClientRequest clientRequest =
- new ClientRequest("dest", request, 0, "1", 0, true,
requestTimeoutMs, handler.handler());
+ new ClientRequest("dest", request, 0, "1", 0, true,
requestTimeoutMs, handler.handler);
when(networkClient.newClientRequest(
ArgumentMatchers.eq("1"),
- same(handler.request()),
+ same(handler.request),
anyLong(),
ArgumentMatchers.eq(true),
ArgumentMatchers.eq(requestTimeoutMs),
- same(handler.handler())
+ same(handler.handler)
)).thenReturn(clientRequest);
when(networkClient.ready(node, time.milliseconds())).thenReturn(true);
@@ -187,11 +187,11 @@ public class InterBrokerSendThreadTest {
verify(networkClient)
.newClientRequest(
ArgumentMatchers.eq("1"),
- same(handler.request()),
+ same(handler.request),
anyLong(),
ArgumentMatchers.eq(true),
ArgumentMatchers.eq(requestTimeoutMs),
- same(handler.handler()));
+ same(handler.handler));
verify(networkClient).ready(any(), anyLong());
verify(networkClient).send(same(clientRequest), anyLong());
verify(networkClient).poll(anyLong(), anyLong());
@@ -209,15 +209,15 @@ public class InterBrokerSendThreadTest {
final TestInterBrokerSendThread sendThread = new
TestInterBrokerSendThread();
final ClientRequest clientRequest =
- new ClientRequest("dest", request, 0, "1", 0, true,
requestTimeoutMs, handler.handler());
+ new ClientRequest("dest", request, 0, "1", 0, true,
requestTimeoutMs, handler.handler);
when(networkClient.newClientRequest(
ArgumentMatchers.eq("1"),
- same(handler.request()),
+ same(handler.request),
anyLong(),
ArgumentMatchers.eq(true),
ArgumentMatchers.eq(requestTimeoutMs),
- same(handler.handler())
+ same(handler.handler)
)).thenReturn(clientRequest);
when(networkClient.ready(node, time.milliseconds())).thenReturn(false);
@@ -236,11 +236,11 @@ public class InterBrokerSendThreadTest {
verify(networkClient)
.newClientRequest(
ArgumentMatchers.eq("1"),
- same(handler.request()),
+ same(handler.request),
anyLong(),
ArgumentMatchers.eq(true),
ArgumentMatchers.eq(requestTimeoutMs),
- same(handler.handler()));
+ same(handler.handler));
verify(networkClient).ready(any(), anyLong());
verify(networkClient).connectionDelay(any(), anyLong());
verify(networkClient).poll(anyLong(), anyLong());
@@ -261,16 +261,16 @@ public class InterBrokerSendThreadTest {
final ClientRequest clientRequest =
new ClientRequest(
- "dest", request, 0, "1", time.milliseconds(), true,
requestTimeoutMs, handler.handler());
+ "dest", request, 0, "1", time.milliseconds(), true,
requestTimeoutMs, handler.handler);
time.sleep(1500L);
when(networkClient.newClientRequest(
ArgumentMatchers.eq("1"),
- same(handler.request()),
- ArgumentMatchers.eq(handler.creationTimeMs()),
+ same(handler.request),
+ ArgumentMatchers.eq(handler.creationTimeMs),
ArgumentMatchers.eq(true),
ArgumentMatchers.eq(requestTimeoutMs),
- same(handler.handler())
+ same(handler.handler)
)).thenReturn(clientRequest);
// make the node unready so the request is not cleared
@@ -289,11 +289,11 @@ public class InterBrokerSendThreadTest {
verify(networkClient)
.newClientRequest(
ArgumentMatchers.eq("1"),
- same(handler.request()),
- ArgumentMatchers.eq(handler.creationTimeMs()),
+ same(handler.request),
+ ArgumentMatchers.eq(handler.creationTimeMs),
ArgumentMatchers.eq(true),
ArgumentMatchers.eq(requestTimeoutMs),
- same(handler.handler()));
+ same(handler.handler));
verify(networkClient).ready(any(), anyLong());
verify(networkClient).connectionDelay(any(), anyLong());
verify(networkClient).poll(anyLong(), anyLong());
diff --git a/server/src/main/java/org/apache/kafka/server/Assignment.java
b/server/src/main/java/org/apache/kafka/server/Assignment.java
index 680a62e08a7..df517c05568 100644
--- a/server/src/main/java/org/apache/kafka/server/Assignment.java
+++ b/server/src/main/java/org/apache/kafka/server/Assignment.java
@@ -25,24 +25,64 @@ import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.server.common.TopicIdPartition;
/**
- * @param topicIdPartition The topic ID and partition index of the replica.
- * @param directoryId The ID of the directory we are placing the replica
into.
- * @param submissionTimeNs The time in monotonic nanosecond when this
assignment was created.
- * @param successCallback The callback to invoke on success.
- */
-record Assignment(
- TopicIdPartition topicIdPartition,
- Uuid directoryId,
- long submissionTimeNs,
- Runnable successCallback
-) {
+ * The class is not converted to a Java record since record classes are meant
for pure data, but this one contains a Runnable
+ **/
+final class Assignment {
+ /**
+ * The topic ID and partition index of the replica.
+ */
+ private final TopicIdPartition topicIdPartition;
+
+ /**
+ * The ID of the directory we are placing the replica into.
+ */
+ private final Uuid directoryId;
+
+ /**
+ * The time in monotonic nanosecond when this assignment was created.
+ */
+ private final long submissionTimeNs;
+
+ /**
+ * The callback to invoke on success.
+ */
+ private final Runnable successCallback;
+
+ Assignment(
+ TopicIdPartition topicIdPartition,
+ Uuid directoryId,
+ long submissionTimeNs,
+ Runnable successCallback
+ ) {
+ this.topicIdPartition = topicIdPartition;
+ this.directoryId = directoryId;
+ this.submissionTimeNs = submissionTimeNs;
+ this.successCallback = successCallback;
+ }
+
+ TopicIdPartition topicIdPartition() {
+ return topicIdPartition;
+ }
+
+ Uuid directoryId() {
+ return directoryId;
+ }
+
+ long submissionTimeNs() {
+ return submissionTimeNs;
+ }
+
+ Runnable successCallback() {
+ return successCallback;
+ }
/**
* Check if this Assignment is still valid to be sent.
*
- * @param nodeId The broker ID.
- * @param image The metadata image.
- * @return True only if the Assignment is still valid.
+ * @param nodeId The broker ID.
+ * @param image The metadata image.
+ *
+ * @return True only if the Assignment is still valid.
*/
boolean valid(int nodeId, MetadataImage image) {
TopicImage topicImage =
image.topics().getTopic(topicIdPartition.topicId());