Repository: kafka Updated Branches: refs/heads/trunk 374983263 -> 5b22b53f0
MINOR: Fix potential integer overflow and String.format issue Author: Ismael Juma <[email protected]> Reviewers: Apurva Mehta <[email protected]>, Jason Gustafson <[email protected]> Closes #2585 from ijuma/overflow-and-format-fixes Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5b22b53f Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5b22b53f Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5b22b53f Branch: refs/heads/trunk Commit: 5b22b53f0e581e3239ca6bbeceec6c01ce0f8ce0 Parents: 3749832 Author: Ismael Juma <[email protected]> Authored: Fri Feb 24 16:41:51 2017 -0800 Committer: Jason Gustafson <[email protected]> Committed: Fri Feb 24 16:41:51 2017 -0800 ---------------------------------------------------------------------- .../clients/producer/internals/BufferPool.java | 24 +++++++++++----- .../authenticator/SaslClientAuthenticator.java | 2 +- .../producer/internals/BufferPoolTest.java | 30 ++++++++++++++++++++ 3 files changed, 48 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/5b22b53f/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java index 077215c..92d59d9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java @@ -41,7 +41,7 @@ import org.apache.kafka.common.utils.Time; * buffers are deallocated. * </ol> */ -public final class BufferPool { +public class BufferPool { private final long totalMemory; private final int poolableSize; @@ -65,8 +65,8 @@ public final class BufferPool { public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) { this.poolableSize = poolableSize; this.lock = new ReentrantLock(); - this.free = new ArrayDeque<ByteBuffer>(); - this.waiters = new ArrayDeque<Condition>(); + this.free = new ArrayDeque<>(); + this.waiters = new ArrayDeque<>(); this.totalMemory = memory; this.availableMemory = memory; this.metrics = metrics; @@ -104,14 +104,14 @@ public final class BufferPool { // now check if the request is immediately satisfiable with the // memory on hand or if we need to block - int freeListSize = this.free.size() * this.poolableSize; + int freeListSize = freeSize() * this.poolableSize; if (this.availableMemory + freeListSize >= size) { // we have enough unallocated or pooled memory to immediately // satisfy the request freeUp(size); this.availableMemory -= size; lock.unlock(); - return ByteBuffer.allocate(size); + return allocateByteBuffer(size); } else { // we are out of memory and will have to block int accumulated = 0; @@ -174,7 +174,7 @@ public final class BufferPool { // unlock and return the buffer lock.unlock(); if (buffer == null) - return ByteBuffer.allocate(size); + return allocateByteBuffer(size); else return buffer; } @@ -184,6 +184,11 @@ public final class BufferPool { } } + // Protected for testing. + protected ByteBuffer allocateByteBuffer(int size) { + return ByteBuffer.allocate(size); + } + /** * Attempt to ensure we have at least the requested number of bytes of memory for allocation by deallocating pooled * buffers (if needed) @@ -228,12 +233,17 @@ public final class BufferPool { public long availableMemory() { lock.lock(); try { - return this.availableMemory + this.free.size() * this.poolableSize; + return this.availableMemory + freeSize() * (long) this.poolableSize; } finally { lock.unlock(); } } + // Protected for testing. + protected int freeSize() { + return this.free.size(); + } + /** * Get the unallocated memory (not in the free list or in use) */ http://git-wip-us.apache.org/repos/asf/kafka/blob/5b22b53f/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java index 2b445e5..27a24e5 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java @@ -335,7 +335,7 @@ public class SaslClientAuthenticator implements Authenticator { throw new IllegalSaslStateException(String.format("Unexpected handshake request with client mechanism %s, enabled mechanisms are %s", mechanism, response.enabledMechanisms())); default: - throw new AuthenticationException(String.format("Unknown error code %d, client mechanism is %s, enabled mechanisms are %s", + throw new AuthenticationException(String.format("Unknown error code %s, client mechanism is %s, enabled mechanisms are %s", response.error(), mechanism, response.enabledMechanisms())); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/5b22b53f/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java index 41ac4f0..41143d8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import static org.junit.Assert.assertNotEquals; @@ -268,6 +269,35 @@ public class BufferPoolTest { assertEquals(totalMemory, pool.availableMemory()); } + @Test + public void testLargeAvailableMemory() throws Exception { + long memory = 20_000_000_000L; + int poolableSize = 2_000_000_000; + final AtomicInteger freeSize = new AtomicInteger(0); + BufferPool pool = new BufferPool(memory, poolableSize, metrics, time, metricGroup) { + @Override + protected ByteBuffer allocateByteBuffer(int size) { + // Ignore size to avoid OOM due to large buffers + return ByteBuffer.allocate(0); + } + + @Override + protected int freeSize() { + return freeSize.get(); + } + }; + pool.allocate(poolableSize, 0); + assertEquals(18_000_000_000L, pool.availableMemory()); + pool.allocate(poolableSize, 0); + assertEquals(16_000_000_000L, pool.availableMemory()); + + // Emulate `deallocate` by increasing `freeSize` + freeSize.incrementAndGet(); + assertEquals(18_000_000_000L, pool.availableMemory()); + freeSize.incrementAndGet(); + assertEquals(20_000_000_000L, pool.availableMemory()); + } + public static class StressTestThread extends Thread { private final int iterations; private final BufferPool pool;
