This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new bf7ddf73af1 MINOR: use addExact to avoid overflow and some cleanup
(#12660)
bf7ddf73af1 is described below
commit bf7ddf73af1ee416388c09a8ad42bfa6cf295641
Author: Luke Chen <[email protected]>
AuthorDate: Thu Sep 22 09:22:58 2022 +0800
MINOR: use addExact to avoid overflow and some cleanup (#12660)
What changes in this PR:
1. Use addExact to avoid overflow in BatchAccumulator#bytesNeeded. We did
use addExact in bytesNeededForRecords method, but forgot that when returning
the result.
2. javadoc improvement
Reviewers: Jason Gustafson <[email protected]>
---
.../java/org/apache/kafka/raft/internals/BatchAccumulator.java | 4 +++-
.../main/java/org/apache/kafka/raft/internals/BatchBuilder.java | 4 ++--
.../java/org/apache/kafka/raft/internals/BatchMemoryPool.java | 8 ++++----
3 files changed, 9 insertions(+), 7 deletions(-)
diff --git
a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
index 69732cd670b..b84a7d57b8a 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
@@ -107,6 +107,7 @@ public class BatchAccumulator<T> implements Closeable {
* @throws NotLeaderException if the epoch is less than the leader epoch
* @throws IllegalArgumentException if the epoch is invalid (greater than
the leader epoch)
* @throws BufferAllocationException if we failed to allocate memory for
the records
+ * @throws IllegalStateException if we tried to append new records after
the batch has been built
*/
public long append(int epoch, List<T> records) {
return append(epoch, records, false);
@@ -127,6 +128,7 @@ public class BatchAccumulator<T> implements Closeable {
* @throws NotLeaderException if the epoch is less than the leader epoch
* @throws IllegalArgumentException if the epoch is invalid (greater than
the leader epoch)
* @throws BufferAllocationException if we failed to allocate memory for
the records
+ * @throws IllegalStateException if we tried to append new records after
the batch has been built
*/
public long appendAtomic(int epoch, List<T> records) {
return append(epoch, records, true);
@@ -260,7 +262,7 @@ public class BatchAccumulator<T> implements Closeable {
/**
* Append a {@link LeaderChangeMessage} record to the batch
*
- * @param LeaderChangeMessage The message to append
+ * @param leaderChangeMessage The message to append
* @param currentTimestamp The current time in milliseconds
* @throws IllegalStateException on failure to allocate a buffer for the
record
*/
diff --git
a/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
b/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
index 982040b84ee..92b63ec5526 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
@@ -142,7 +142,7 @@ public class BatchBuilder<T> {
);
if (!isOpenForAppends) {
- return OptionalInt.of(batchHeaderSizeInBytes() + bytesNeeded);
+ return OptionalInt.of(Math.addExact(batchHeaderSizeInBytes(),
bytesNeeded));
}
int approxUnusedSizeInBytes = maxBytes - approximateSizeInBytes();
@@ -157,7 +157,7 @@ public class BatchBuilder<T> {
}
}
- return OptionalInt.of(batchHeaderSizeInBytes() + bytesNeeded);
+ return OptionalInt.of(Math.addExact(batchHeaderSizeInBytes(),
bytesNeeded));
}
private int flushedSizeInBytes() {
diff --git
a/raft/src/main/java/org/apache/kafka/raft/internals/BatchMemoryPool.java
b/raft/src/main/java/org/apache/kafka/raft/internals/BatchMemoryPool.java
index ae6cba81de6..5120d6928d3 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchMemoryPool.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchMemoryPool.java
@@ -54,12 +54,12 @@ public class BatchMemoryPool implements MemoryPool {
}
/**
- * Allocate a byte buffer in this pool.
+ * Allocate a byte buffer with {@code batchSize} in this pool.
*
* This method should always succeed and never return null. The sizeBytes
parameter must be less than
* the batchSize used in the constructor.
*
- * @param sizeBytes is not used to determine the size of the byte buffer
+ * @param sizeBytes is used to determine if the requested size is
exceeding the batchSize
* @throws IllegalArgumentException if sizeBytes is greater than batchSize
*/
@Override
@@ -96,9 +96,9 @@ public class BatchMemoryPool implements MemoryPool {
try {
previouslyAllocated.clear();
- if (previouslyAllocated.limit() != batchSize) {
+ if (previouslyAllocated.capacity() != batchSize) {
throw new IllegalArgumentException("Released buffer with
unexpected size "
- + previouslyAllocated.limit());
+ + previouslyAllocated.capacity());
}
// Free the buffer if the number of pooled buffers is already the
maximum number of batches.