Repository: kafka
Updated Branches:
  refs/heads/trunk b3847f76b -> e7d04c251


KAFKA-3602; Rename RecordAccumulator dequeFor() and fix usage

Author: Jason Gustafson <[email protected]>

Reviewers: Grant Henke <[email protected]>, Ashish Singh 
<[email protected]>, Ismael Juma <[email protected]>

Closes #1254 from hachikuji/KAFKA-3602


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e7d04c25
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e7d04c25
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e7d04c25

Branch: refs/heads/trunk
Commit: e7d04c2515be158d9a7f0ff0d7571bd41287d91a
Parents: b3847f7
Author: Jason Gustafson <[email protected]>
Authored: Tue Apr 26 06:37:08 2016 -0700
Committer: Ismael Juma <[email protected]>
Committed: Tue Apr 26 06:37:08 2016 -0700

----------------------------------------------------------------------
 .../producer/internals/RecordAccumulator.java       | 16 ++++++++++------
 .../kafka/clients/producer/internals/Sender.java    |  2 +-
 .../producer/internals/RecordAccumulatorTest.java   | 14 +++++++-------
 3 files changed, 18 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e7d04c25/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index d963981..1766609 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -167,7 +167,7 @@ public final class RecordAccumulator {
         appendsInProgress.incrementAndGet();
         try {
             // check if we have an in-progress batch
-            Deque<RecordBatch> dq = dequeFor(tp);
+            Deque<RecordBatch> dq = getOrCreateDeque(tp);
             synchronized (dq) {
                 if (closed)
                     throw new IllegalStateException("Cannot send after the producer is closed.");
@@ -213,7 +213,7 @@ public final class RecordAccumulator {
      * Abort the batches that have been sitting in RecordAccumulator for more than the configured requestTimeout
      * due to metadata being unavailable
      */
-    public List<RecordBatch> abortExpiredBatches(int requestTimeout, Cluster cluster, long now) {
+    public List<RecordBatch> abortExpiredBatches(int requestTimeout, long now) {
         List<RecordBatch> expiredBatches = new ArrayList<RecordBatch>();
         int count = 0;
         for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
@@ -259,7 +259,7 @@ public final class RecordAccumulator {
         batch.lastAttemptMs = now;
         batch.lastAppendTime = now;
         batch.setRetry();
-        Deque<RecordBatch> deque = dequeFor(batch.topicPartition);
+        Deque<RecordBatch> deque = getOrCreateDeque(batch.topicPartition);
         synchronized (deque) {
             deque.addFirst(batch);
         }
@@ -369,7 +369,7 @@ public final class RecordAccumulator {
                 TopicPartition tp = new TopicPartition(part.topic(), part.partition());
                 // Only proceed if the partition has no in-flight batches.
                 if (!muted.contains(tp)) {
-                    Deque<RecordBatch> deque = dequeFor(new TopicPartition(part.topic(), part.partition()));
+                    Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));
                     if (deque != null) {
                         synchronized (deque) {
                             RecordBatch first = deque.peekFirst();
@@ -401,10 +401,14 @@ public final class RecordAccumulator {
         return batches;
     }
 
+    private Deque<RecordBatch> getDeque(TopicPartition tp) {
+        return batches.get(tp);
+    }
+
     /**
      * Get the deque for the given topic-partition, creating it if necessary.
      */
-    private Deque<RecordBatch> dequeFor(TopicPartition tp) {
+    private Deque<RecordBatch> getOrCreateDeque(TopicPartition tp) {
         Deque<RecordBatch> d = this.batches.get(tp);
         if (d != null)
             return d;
@@ -478,7 +482,7 @@ public final class RecordAccumulator {
      */
     private void abortBatches() {
         for (RecordBatch batch : incomplete.all()) {
-            Deque<RecordBatch> dq = dequeFor(batch.topicPartition);
+            Deque<RecordBatch> dq = getDeque(batch.topicPartition);
             // Close the batch before aborting
             synchronized (dq) {
                 batch.records.close();

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7d04c25/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index db8918c..29077b6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -202,7 +202,7 @@ public class Sender implements Runnable {
             }
         }
 
-        List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, cluster, now);
+        List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
         // update sensors
         for (RecordBatch expiredBatch : expiredBatches)
             this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7d04c25/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 904aa73..a39d2e8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -316,11 +316,11 @@ public class RecordAccumulatorTest {
         // Advance the clock to expire the batch.
         time.sleep(requestTimeout + 1);
         accum.mutePartition(tp1);
-        List<RecordBatch> expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds());
+        List<RecordBatch> expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds());
         assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size());
 
         accum.unmutePartition(tp1);
-        expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds());
+        expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds());
         assertEquals("The batch should be expired", 1, expiredBatches.size());
         assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
 
@@ -330,11 +330,11 @@ public class RecordAccumulatorTest {
         time.sleep(requestTimeout + 1);
 
         accum.mutePartition(tp1);
-        expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds());
+        expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds());
         assertEquals("The batch should not be expired when metadata is still available and partition is muted", 0, expiredBatches.size());
 
         accum.unmutePartition(tp1);
-        expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds());
+        expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds());
         assertEquals("The batch should be expired when the partition is not muted", 1, expiredBatches.size());
         assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
 
@@ -351,16 +351,16 @@ public class RecordAccumulatorTest {
 
         // test expiration.
         time.sleep(requestTimeout + retryBackoffMs);
-        expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds());
+        expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds());
         assertEquals("The batch should not be expired.", 0, expiredBatches.size());
         time.sleep(1L);
 
         accum.mutePartition(tp1);
-        expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds());
+        expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds());
         assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size());
 
         accum.unmutePartition(tp1);
-        expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds());
+        expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds());
         assertEquals("The batch should be expired when the partition is not muted.", 1, expiredBatches.size());
     }
 

Reply via email to