jsancio commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1273531461


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -755,7 +755,9 @@ public Long apply(List<ApiMessageAndVersion> records) {
                                 recordIndex++;
                             }
                             long nextEndOffset = prevEndOffset + recordIndex;
-                            raftClient.scheduleAtomicAppend(controllerEpoch, OptionalLong.of(nextEndOffset), records);
+                            raftClient.scheduleAtomicAppend(controllerEpoch,
+                                    OptionalLong.of(prevEndOffset + 1),
+                                    records);

Review Comment:
   This indentation doesn't look right. We indent 4 spaces in this case.



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2394,6 +2394,11 @@ public Optional<OffsetAndEpoch> latestSnapshotId() {
         return log.latestSnapshotId();
     }
 
+    @Override
+    public long logEndOffset() {
+        return log.endOffset().offset;
+    }

Review Comment:
   This is not correct in all cases. The leader can have records in the batch
   accumulator that have not yet been written to the log. I think you want
   something along these lines:
   ```java
       public long logEndOffset() {
           return quorum.maybeLeaderState()
               .map(leader -> leader.accumulator().nextOffset())
               .orElse(log.endOffset().offset);
       }
   ```
   
   Then we can add this method to BatchAccumulator:
   ```java
       public long nextOffset() {
           appendLock.lock();
           try {
               return nextOffset;
           } finally {
               appendLock.unlock();
           }
       }
   ```
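
To make the difference concrete, here is a minimal, self-contained sketch. `SketchAccumulator` and `SketchClient` are hypothetical stand-ins for illustration only, not the real Kafka classes. The only behaviour they model is that a leader buffers records in the accumulator before they reach the log.

```java
import java.util.Optional;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the accumulator; not the real BatchAccumulator.
class SketchAccumulator {
    private final ReentrantLock appendLock = new ReentrantLock();
    private long nextOffset;

    SketchAccumulator(long baseOffset) {
        this.nextOffset = baseOffset;
    }

    // Buffers recordCount records without writing them to the log.
    // Returns the last offset assigned to the buffered records.
    long append(int recordCount) {
        appendLock.lock();
        try {
            nextOffset += recordCount;
            return nextOffset - 1;
        } finally {
            appendLock.unlock();
        }
    }

    long nextOffset() {
        appendLock.lock();
        try {
            return nextOffset;
        } finally {
            appendLock.unlock();
        }
    }
}

// Hypothetical stand-in for the client; the accumulator is present only on a leader.
class SketchClient {
    private final long flushedLogEndOffset;
    private final Optional<SketchAccumulator> leaderAccumulator;

    SketchClient(long flushedLogEndOffset, Optional<SketchAccumulator> leaderAccumulator) {
        this.flushedLogEndOffset = flushedLogEndOffset;
        this.leaderAccumulator = leaderAccumulator;
    }

    // On a leader, include records that are buffered but not yet in the log.
    long logEndOffset() {
        return leaderAccumulator
            .map(SketchAccumulator::nextOffset)
            .orElse(flushedLogEndOffset);
    }
}
```

With a flushed log end offset of 10 and three buffered records, the leader-side value in this sketch is 13, while a non-leader still reports 10.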



##########
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java:
##########
@@ -147,6 +126,13 @@ private long append(int epoch, List<T> records, boolean isAtomic) {
 
         appendLock.lock();
         try {
+            long endOffset = nextOffset + records.size() - 1;

Review Comment:
   I think most readers will assume that this "end offset" is exclusive, but
   this offset is inclusive. We normally use `lastOffset` for this kind of
   offset.
   ```java
               long lastOffset = nextOffset + records.size() - 1;
   ```
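
A small sketch of the two conventions, in case it helps. `OffsetNaming` and its method names are made up for this illustration and are not part of the PR.

```java
import java.util.List;

// Hypothetical helper that only illustrates the naming convention.
class OffsetNaming {
    // Inclusive: the offset of the final record in the batch.
    static long lastOffset(long nextOffset, List<?> records) {
        return nextOffset + records.size() - 1;
    }

    // Exclusive: the offset the next appended record would receive.
    static long endOffset(long nextOffset, List<?> records) {
        return nextOffset + records.size();
    }
}
```

For three records starting at offset 5, `lastOffset` is 7 and `endOffset` is 8.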



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2394,6 +2394,11 @@ public Optional<OffsetAndEpoch> latestSnapshotId() {
         return log.latestSnapshotId();
     }
 
+    @Override
+    public long logEndOffset() {
+        return log.endOffset().offset;
+    }

Review Comment:
   Can we also add tests for this new functionality?



##########
raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java:
##########
@@ -232,7 +233,7 @@ public void testLingerBeginsOnFirstWrite() {
         );
 
         time.sleep(15);
-        assertEquals(baseOffset, acc.append(leaderEpoch, singletonList("a")));
+        assertEquals(baseOffset, acc.append(leaderEpoch, singletonList("a"), OptionalLong.empty(), false));

Review Comment:
   We need tests for the new functionality added to the BatchAccumulator.
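
As a rough idea of the cases worth covering, here is a sketch against a hypothetical stand-in. It assumes the new `OptionalLong` argument is a required base offset that must equal the accumulator's next offset. That reading, the class `RequiredOffsetSketch`, and the use of `IllegalStateException` are all assumptions for illustration, so the real tests should assert whatever the PR actually implements.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical stand-in; the offset check is an assumed reading of the new argument.
class RequiredOffsetSketch {
    private long nextOffset;

    RequiredOffsetSketch(long baseOffset) {
        this.nextOffset = baseOffset;
    }

    // Returns the last offset assigned to the appended records.
    long append(List<String> records, OptionalLong requiredBaseOffset) {
        if (requiredBaseOffset.isPresent() && requiredBaseOffset.getAsLong() != nextOffset) {
            // Placeholder exception type; the PR may use a different one.
            throw new IllegalStateException(
                "Required base offset " + requiredBaseOffset.getAsLong()
                    + " does not match next offset " + nextOffset);
        }
        nextOffset += records.size();
        return nextOffset - 1;
    }
}
```

Under those assumptions, the tests would cover at least three cases: an empty `OptionalLong` always appends, a matching offset appends, and a mismatched offset is rejected without advancing the next offset.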



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -355,7 +355,8 @@ public void testAppendFailedWithRecordBatchTooLargeException() throws Exception
         for (int i = 0; i < size; i++)
             batchToLarge.add("a");
 
-        assertThrows(RecordBatchTooLargeException.class, () -> context.client.scheduleAtomicAppend(epoch, batchToLarge));
+        assertThrows(RecordBatchTooLargeException.class,
+                () -> context.client.scheduleAtomicAppend(epoch, OptionalLong.empty(), batchToLarge));

Review Comment:
   We need tests for the new functionality added to `KafkaRaftClient`. That is 
both the new method `logEndOffset` and the changes to `scheduleAtomicAppend`.


