jsancio commented on code in PR #13643: URL: https://github.com/apache/kafka/pull/13643#discussion_r1276417011
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2394,6 +2394,11 @@ public Optional<OffsetAndEpoch> latestSnapshotId() {
         return log.latestSnapshotId();
     }

+    @Override
+    public long logEndOffset() {
+        return log.endOffset().offset;
+    }

Review Comment:
   > Hmm... the method name is "logEndOffset." So it should just return the log end offset, right? Returning something else would be misleading.

   @cmccabe, I don't follow this comment. When the client calls `KafkaRaftClient::schedule{Atomic}Append`, the `KafkaRaftClient` compares the provided offset against the `nextOffset` stored in the `BatchAccumulator`. If we want this method to succeed in most cases, `KafkaRaftClient::logEndOffset` should return that offset, `BatchAccumulator::nextOffset`, and not the log end offset.

   Maybe `logEndOffset` is not a great name. I am okay with renaming this to `KafkaRaftClient::endOffset()`, but I am open to suggestions.

##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -355,7 +355,8 @@ public void testAppendFailedWithRecordBatchTooLargeException() throws Exception
         for (int i = 0; i < size; i++)
             batchToLarge.add("a");

-        assertThrows(RecordBatchTooLargeException.class, () -> context.client.scheduleAtomicAppend(epoch, batchToLarge));
+        assertThrows(RecordBatchTooLargeException.class,
+            () -> context.client.scheduleAtomicAppend(epoch, OptionalLong.empty(), batchToLarge));

Review Comment:
   I couldn't find a test that exercises the new `requiredBaseOffset` parameter of `KafkaRaftClient::scheduleAtomicAppend`.

##########
raft/src/main/java/org/apache/kafka/raft/RaftClient.java:
##########
@@ -171,16 +172,21 @@ default void beginShutdown() {}
      * to resign its leadership. The state machine is expected to discard all
      * uncommitted entries after observing an epoch change.
+     * If the current base offset does not match the supplied required base offset,
+     * then this method will throw {@link UnexpectedBaseOffsetException}.
+     *
      * @param epoch the current leader epoch
+     * @param requiredBaseOffset if this is set, it is the offset we must use as the base offset.
      * @param records the list of records to append
      * @return the expected offset of the last record if append succeed
      * @throws org.apache.kafka.common.errors.RecordBatchTooLargeException if the size of the records is greater than the maximum
      *         batch size; if this exception is throw none of the elements in records were
      *         committed
      * @throws NotLeaderException if we are not the current leader or the epoch doesn't match the leader epoch
      * @throws BufferAllocationException we failed to allocate memory for the records
+     * @throws UnexpectedBaseOffsetException the requested base offset could not be obtained.
      */
-    long scheduleAtomicAppend(int epoch, List<T> records);
+    long scheduleAtomicAppend(int epoch, OptionalLong requiredBaseOffset, List<T> records);

Review Comment:
   What's the argument/reason for adding this functionality to `scheduleAtomicAppend` and not `scheduleAppend`?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use
the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
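[Editor's note] The `requiredBaseOffset` semantics debated in this thread can be sketched with a small toy model in plain Java. This is not Kafka code and all names here are hypothetical: an append that supplies a required base offset succeeds only when it matches the accumulator's next offset, which is why the discussion argues `logEndOffset()` should expose `BatchAccumulator::nextOffset` rather than the replicated log's end offset.

```java
import java.util.List;
import java.util.OptionalLong;

// Toy model of the requiredBaseOffset check (hypothetical names, not Kafka code).
public class ToyAccumulator {
    private long nextOffset = 0;

    // Mirrors the semantics described in the RaftClient javadoc change: reject
    // the append when the caller's required base offset does not match nextOffset.
    long scheduleAtomicAppend(OptionalLong requiredBaseOffset, List<String> records) {
        if (requiredBaseOffset.isPresent() && requiredBaseOffset.getAsLong() != nextOffset) {
            // Kafka would throw UnexpectedBaseOffsetException here.
            throw new IllegalStateException("unexpected base offset");
        }
        nextOffset += records.size();
        return nextOffset - 1; // offset of the last appended record
    }

    public static void main(String[] args) {
        ToyAccumulator acc = new ToyAccumulator();
        System.out.println(acc.scheduleAtomicAppend(OptionalLong.of(0), List.of("a", "b"))); // 1
        System.out.println(acc.scheduleAtomicAppend(OptionalLong.empty(), List.of("c")));    // 2
        try {
            acc.scheduleAtomicAppend(OptionalLong.of(0), List.of("d")); // stale base offset
        } catch (IllegalStateException e) {
            System.out.println("rejected");
        }
    }
}
```

In this sketch, a caller that first reads the accumulator's next offset and passes it as the required base offset will succeed unless another append raced in between, which is the intended use of the new parameter.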