Repository: kafka
Updated Branches:
  refs/heads/trunk 374983263 -> 5b22b53f0


MINOR: Fix potential integer overflow and String.format issue

Author: Ismael Juma <[email protected]>

Reviewers: Apurva Mehta <[email protected]>, Jason Gustafson <[email protected]>

Closes #2585 from ijuma/overflow-and-format-fixes


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5b22b53f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5b22b53f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5b22b53f

Branch: refs/heads/trunk
Commit: 5b22b53f0e581e3239ca6bbeceec6c01ce0f8ce0
Parents: 3749832
Author: Ismael Juma <[email protected]>
Authored: Fri Feb 24 16:41:51 2017 -0800
Committer: Jason Gustafson <[email protected]>
Committed: Fri Feb 24 16:41:51 2017 -0800

----------------------------------------------------------------------
 .../clients/producer/internals/BufferPool.java  | 24 +++++++++++-----
 .../authenticator/SaslClientAuthenticator.java  |  2 +-
 .../producer/internals/BufferPoolTest.java      | 30 ++++++++++++++++++++
 3 files changed, 48 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5b22b53f/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
index 077215c..92d59d9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
@@ -41,7 +41,7 @@ import org.apache.kafka.common.utils.Time;
  * buffers are deallocated.
  * </ol>
  */
-public final class BufferPool {
+public class BufferPool {
 
     private final long totalMemory;
     private final int poolableSize;
@@ -65,8 +65,8 @@ public final class BufferPool {
     public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
         this.poolableSize = poolableSize;
         this.lock = new ReentrantLock();
-        this.free = new ArrayDeque<ByteBuffer>();
-        this.waiters = new ArrayDeque<Condition>();
+        this.free = new ArrayDeque<>();
+        this.waiters = new ArrayDeque<>();
         this.totalMemory = memory;
         this.availableMemory = memory;
         this.metrics = metrics;
@@ -104,14 +104,14 @@ public final class BufferPool {
 
             // now check if the request is immediately satisfiable with the
             // memory on hand or if we need to block
-            int freeListSize = this.free.size() * this.poolableSize;
+            int freeListSize = freeSize() * this.poolableSize;
             if (this.availableMemory + freeListSize >= size) {
                 // we have enough unallocated or pooled memory to immediately
                 // satisfy the request
                 freeUp(size);
                 this.availableMemory -= size;
                 lock.unlock();
-                return ByteBuffer.allocate(size);
+                return allocateByteBuffer(size);
             } else {
                 // we are out of memory and will have to block
                 int accumulated = 0;
@@ -174,7 +174,7 @@ public final class BufferPool {
                 // unlock and return the buffer
                 lock.unlock();
                 if (buffer == null)
-                    return ByteBuffer.allocate(size);
+                    return allocateByteBuffer(size);
                 else
                     return buffer;
             }
@@ -184,6 +184,11 @@ public final class BufferPool {
         }
     }
 
+    // Protected for testing.
+    protected ByteBuffer allocateByteBuffer(int size) {
+        return ByteBuffer.allocate(size);
+    }
+
     /**
      * Attempt to ensure we have at least the requested number of bytes of memory for allocation by deallocating pooled
      * buffers (if needed)
@@ -228,12 +233,17 @@ public final class BufferPool {
     public long availableMemory() {
         lock.lock();
         try {
-            return this.availableMemory + this.free.size() * this.poolableSize;
+            return this.availableMemory + freeSize() * (long) this.poolableSize;
         } finally {
             lock.unlock();
         }
     }
 
+    // Protected for testing.
+    protected int freeSize() {
+        return this.free.size();
+    }
+
     /**
      * Get the unallocated memory (not in the free list or in use)
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b22b53f/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index 2b445e5..27a24e5 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -335,7 +335,7 @@ public class SaslClientAuthenticator implements Authenticator {
                 throw new IllegalSaslStateException(String.format("Unexpected handshake request with client mechanism %s, enabled mechanisms are %s",
                     mechanism, response.enabledMechanisms()));
             default:
-                throw new AuthenticationException(String.format("Unknown error code %d, client mechanism is %s, enabled mechanisms are %s",
+                throw new AuthenticationException(String.format("Unknown error code %s, client mechanism is %s, enabled mechanisms are %s",
                     response.error(), mechanism, response.enabledMechanisms()));
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b22b53f/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
index 41ac4f0..41143d8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 
 import static org.junit.Assert.assertNotEquals;
@@ -268,6 +269,35 @@ public class BufferPoolTest {
         assertEquals(totalMemory, pool.availableMemory());
     }
 
+    @Test
+    public void testLargeAvailableMemory() throws Exception {
+        long memory = 20_000_000_000L;
+        int poolableSize = 2_000_000_000;
+        final AtomicInteger freeSize = new AtomicInteger(0);
+        BufferPool pool = new BufferPool(memory, poolableSize, metrics, time, metricGroup) {
+            @Override
+            protected ByteBuffer allocateByteBuffer(int size) {
+                // Ignore size to avoid OOM due to large buffers
+                return ByteBuffer.allocate(0);
+            }
+
+            @Override
+            protected int freeSize() {
+                return freeSize.get();
+            }
+        };
+        pool.allocate(poolableSize, 0);
+        assertEquals(18_000_000_000L, pool.availableMemory());
+        pool.allocate(poolableSize, 0);
+        assertEquals(16_000_000_000L, pool.availableMemory());
+
+        // Emulate `deallocate` by increasing `freeSize`
+        freeSize.incrementAndGet();
+        assertEquals(18_000_000_000L, pool.availableMemory());
+        freeSize.incrementAndGet();
+        assertEquals(20_000_000_000L, pool.availableMemory());
+    }
+
     public static class StressTestThread extends Thread {
         private final int iterations;
         private final BufferPool pool;

Reply via email to