Repository: kafka Updated Branches: refs/heads/trunk b3847f76b -> e7d04c251
KAFKA-3602; Rename RecordAccumulator dequeFor() and fix usage Author: Jason Gustafson <[email protected]> Reviewers: Grant Henke <[email protected]>, Ashish Singh <[email protected]>, Ismael Juma <[email protected]> Closes #1254 from hachikuji/KAFKA-3602 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e7d04c25 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e7d04c25 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e7d04c25 Branch: refs/heads/trunk Commit: e7d04c2515be158d9a7f0ff0d7571bd41287d91a Parents: b3847f7 Author: Jason Gustafson <[email protected]> Authored: Tue Apr 26 06:37:08 2016 -0700 Committer: Ismael Juma <[email protected]> Committed: Tue Apr 26 06:37:08 2016 -0700 ---------------------------------------------------------------------- .../producer/internals/RecordAccumulator.java | 16 ++++++++++------ .../kafka/clients/producer/internals/Sender.java | 2 +- .../producer/internals/RecordAccumulatorTest.java | 14 +++++++------- 3 files changed, 18 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/e7d04c25/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index d963981..1766609 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -167,7 +167,7 @@ public final class RecordAccumulator { appendsInProgress.incrementAndGet(); try { // check if we have an in-progress batch - Deque<RecordBatch> dq = dequeFor(tp); + Deque<RecordBatch> dq = getOrCreateDeque(tp); synchronized (dq) { if (closed) throw new IllegalStateException("Cannot send after the producer is closed."); @@ -213,7 +213,7 @@ public final class RecordAccumulator { * Abort the batches that have been sitting in RecordAccumulator for more than the configured requestTimeout * due to metadata being unavailable */ - public List<RecordBatch> abortExpiredBatches(int requestTimeout, Cluster cluster, long now) { + public List<RecordBatch> abortExpiredBatches(int requestTimeout, long now) { List<RecordBatch> expiredBatches = new ArrayList<RecordBatch>(); int count = 0; for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) { @@ -259,7 +259,7 @@ public final class RecordAccumulator { batch.lastAttemptMs = now; batch.lastAppendTime = now; batch.setRetry(); - Deque<RecordBatch> deque = dequeFor(batch.topicPartition); + Deque<RecordBatch> deque = getOrCreateDeque(batch.topicPartition); synchronized (deque) { deque.addFirst(batch); } @@ -369,7 +369,7 @@ public final class RecordAccumulator { TopicPartition tp = new TopicPartition(part.topic(), part.partition()); // Only proceed if the partition has no in-flight batches. if (!muted.contains(tp)) { - Deque<RecordBatch> deque = dequeFor(new TopicPartition(part.topic(), part.partition())); + Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition())); if (deque != null) { synchronized (deque) { RecordBatch first = deque.peekFirst(); @@ -401,10 +401,14 @@ public final class RecordAccumulator { return batches; } + private Deque<RecordBatch> getDeque(TopicPartition tp) { + return batches.get(tp); + } + /** * Get the deque for the given topic-partition, creating it if necessary. */ - private Deque<RecordBatch> dequeFor(TopicPartition tp) { + private Deque<RecordBatch> getOrCreateDeque(TopicPartition tp) { Deque<RecordBatch> d = this.batches.get(tp); if (d != null) return d; @@ -478,7 +482,7 @@ public final class RecordAccumulator { */ private void abortBatches() { for (RecordBatch batch : incomplete.all()) { - Deque<RecordBatch> dq = dequeFor(batch.topicPartition); + Deque<RecordBatch> dq = getDeque(batch.topicPartition); // Close the batch before aborting synchronized (dq) { batch.records.close(); http://git-wip-us.apache.org/repos/asf/kafka/blob/e7d04c25/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index db8918c..29077b6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -202,7 +202,7 @@ public class Sender implements Runnable { } } - List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, cluster, now); + List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now); // update sensors for (RecordBatch expiredBatch : expiredBatches) this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount); http://git-wip-us.apache.org/repos/asf/kafka/blob/e7d04c25/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 904aa73..a39d2e8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -316,11 +316,11 @@ public class RecordAccumulatorTest { // Advance the clock to expire the batch. time.sleep(requestTimeout + 1); accum.mutePartition(tp1); - List<RecordBatch> expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + List<RecordBatch> expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds()); assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size()); accum.unmutePartition(tp1); - expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds()); assertEquals("The batch should be expired", 1, expiredBatches.size()); assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); @@ -330,11 +330,11 @@ public class RecordAccumulatorTest { time.sleep(requestTimeout + 1); accum.mutePartition(tp1); - expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds()); assertEquals("The batch should not be expired when metadata is still available and partition is muted", 0, expiredBatches.size()); accum.unmutePartition(tp1); - expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds()); assertEquals("The batch should be expired when the partition is not muted", 1, expiredBatches.size()); assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); @@ -351,16 +351,16 @@ public class RecordAccumulatorTest { // test expiration. time.sleep(requestTimeout + retryBackoffMs); - expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds()); assertEquals("The batch should not be expired.", 0, expiredBatches.size()); time.sleep(1L); accum.mutePartition(tp1); - expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds()); assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size()); accum.unmutePartition(tp1); - expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds()); assertEquals("The batch should be expired when the partition is not muted.", 1, expiredBatches.size()); }
