jsancio commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1273531461

##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -755,7 +755,9 @@ public Long apply(List<ApiMessageAndVersion> records) {
                 recordIndex++;
             }
             long nextEndOffset = prevEndOffset + recordIndex;
-            raftClient.scheduleAtomicAppend(controllerEpoch, OptionalLong.of(nextEndOffset), records);
+            raftClient.scheduleAtomicAppend(controllerEpoch,
+                    OptionalLong.of(prevEndOffset + 1),
+                    records);

Review Comment:
   This indentation doesn't look right. We indent 4 spaces in this case.


##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2394,6 +2394,11 @@ public Optional<OffsetAndEpoch> latestSnapshotId() {
         return log.latestSnapshotId();
     }

+    @Override
+    public long logEndOffset() {
+        return log.endOffset().offset;
+    }

Review Comment:
   This is not correct in all cases. The leader can have records in the batch accumulator that have not been sent to the log. I think you want something along these lines:
   ```java
   public long logEndOffset() {
       return quorum.maybeLeaderState()
           .map(leader -> leader.accumulator().nextOffset())
           .orElse(log.endOffset().offset);
   }
   ```

   Then we can add this method to BatchAccumulator:
   ```java
   public long nextOffset() {
       appendLock.lock();
       try {
           return nextOffset;
       } finally {
           appendLock.unlock();
       }
   }
   ```


##########
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java:
##########
@@ -147,6 +126,13 @@ private long append(int epoch, List<T> records, boolean isAtomic) {

         appendLock.lock();
         try {
+            long endOffset = nextOffset + records.size() - 1;

Review Comment:
   I think most readers will assume that this "end offset" is exclusive. I think this offset is inclusive. We normally use `lastOffset` for this kind of offset.
   ```java
   long lastOffset = nextOffset + records.size() - 1;
   ```


##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2394,6 +2394,11 @@ public Optional<OffsetAndEpoch> latestSnapshotId() {
         return log.latestSnapshotId();
     }

+    @Override
+    public long logEndOffset() {
+        return log.endOffset().offset;
+    }

Review Comment:
   Can we also add tests for this new functionality?


##########
raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java:
##########
@@ -232,7 +233,7 @@ public void testLingerBeginsOnFirstWrite() {
         );

         time.sleep(15);
-        assertEquals(baseOffset, acc.append(leaderEpoch, singletonList("a")));
+        assertEquals(baseOffset, acc.append(leaderEpoch, singletonList("a"), OptionalLong.empty(), false));

Review Comment:
   We need tests for the new functionality added to the BatchAccumulator.


##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -355,7 +355,8 @@ public void testAppendFailedWithRecordBatchTooLargeException() throws Exception
         for (int i = 0; i < size; i++)
             batchToLarge.add("a");

-        assertThrows(RecordBatchTooLargeException.class, () -> context.client.scheduleAtomicAppend(epoch, batchToLarge));
+        assertThrows(RecordBatchTooLargeException.class,
+                () -> context.client.scheduleAtomicAppend(epoch, OptionalLong.empty(), batchToLarge));

Review Comment:
   We need tests for the new functionality added to `KafkaRaftClient`. That is both the new method `logEndOffset` and the changes to `scheduleAtomicAppend`.
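
For illustration only, here is a minimal, self-contained sketch of the offset semantics discussed in the comments above: an inclusive `lastOffset` versus an exclusive next offset, and a `logEndOffset` that prefers the leader's accumulator over the persisted log. `LogEndOffsetSketch`, `ToyAccumulator`, and the `persistedEndOffset` parameter are hypothetical stand-ins, not Kafka classes or APIs, and the toy `append` returning the last offset is an assumption of this sketch rather than a statement about `BatchAccumulator`.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.locks.ReentrantLock;

public class LogEndOffsetSketch {

    /** Hypothetical stand-in for an accumulator; not the real BatchAccumulator. */
    static final class ToyAccumulator {
        private final ReentrantLock appendLock = new ReentrantLock();
        private long nextOffset; // exclusive: the offset the next record will receive

        ToyAccumulator(long baseOffset) {
            this.nextOffset = baseOffset;
        }

        /** Buffers the records and returns the inclusive offset of the last one (assumption). */
        long append(List<String> records) {
            appendLock.lock();
            try {
                long lastOffset = nextOffset + records.size() - 1; // inclusive
                nextOffset = lastOffset + 1;                       // exclusive
                return lastOffset;
            } finally {
                appendLock.unlock();
            }
        }

        /** Reads the next offset under the same lock that guards appends. */
        long nextOffset() {
            appendLock.lock();
            try {
                return nextOffset;
            } finally {
                appendLock.unlock();
            }
        }
    }

    /**
     * If this node is the leader (accumulator present), buffered records count
     * toward the end offset; otherwise fall back to the persisted log end offset.
     */
    static long logEndOffset(Optional<ToyAccumulator> leaderAccumulator, long persistedEndOffset) {
        return leaderAccumulator
            .map(ToyAccumulator::nextOffset)
            .orElse(persistedEndOffset);
    }

    public static void main(String[] args) {
        ToyAccumulator acc = new ToyAccumulator(10L);
        long lastOffset = acc.append(List.of("a", "b", "c"));
        System.out.println(lastOffset);                               // 12
        System.out.println(logEndOffset(Optional.of(acc), 10L));      // 13
        System.out.println(logEndOffset(Optional.empty(), 10L));      // 10
    }
}
```

In this toy model, three records appended at base offset 10 occupy offsets 10 through 12, so the leader's end offset is 13 even though the persisted log still ends at 10. That gap is the case the `log.endOffset().offset` version misses.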