Repository: kafka
Updated Branches:
  refs/heads/trunk 9c23d9355 -> 75e1cc8bc


kafka-2043; CompressionType is passed in each RecordAccumulator append; patched by Grant Henke; reviewed by Jun Rao
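
For readers skimming the patch: the net effect is that the compression codec is now supplied once, when the RecordAccumulator is constructed, instead of on every append() call. Below is a minimal sketch of the new call shape, modeled on the updated tests in this commit; the class name, topic name, sizing constants and the Metrics/Time/metric-tag setup are illustrative placeholders, not part of the patch.

    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.kafka.clients.producer.internals.RecordAccumulator;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.utils.SystemTime;
    import org.apache.kafka.common.utils.Time;

    // Hypothetical driver class, only to show the constructor/append signatures after this change.
    public class AccumulatorUsageSketch {
        public static void main(String[] args) throws InterruptedException {
            Time time = new SystemTime();
            Metrics metrics = new Metrics(time);
            Map<String, String> metricTags = new LinkedHashMap<String, String>();

            // Compression is now a constructor argument, fixed for the accumulator's lifetime:
            // (batchSize, totalSize, compression, lingerMs, retryBackoffMs,
            //  blockOnBufferFull, metrics, time, metricTags)
            RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024,
                    CompressionType.NONE, 10L, 100L, false, metrics, time, metricTags);

            // append() no longer takes a per-call CompressionType; a null callback is passed for brevity.
            accum.append(new TopicPartition("my-topic", 0), "key".getBytes(), "value".getBytes(), null);
        }
    }

Since the producer holds a single codec (this.compressionType), passing it on every append() was redundant; fixing it at construction time keeps the append() signature smaller.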


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/75e1cc8b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/75e1cc8b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/75e1cc8b

Branch: refs/heads/trunk
Commit: 75e1cc8bc497e6aaa0dd05454d6c817ed0fb5e23
Parents: 9c23d93
Author: Grant Henke <granthe...@gmail.com>
Authored: Mon Apr 6 13:34:31 2015 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Mon Apr 6 13:34:31 2015 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |  3 +-
 .../producer/internals/RecordAccumulator.java   |  7 ++--
 .../internals/RecordAccumulatorTest.java        | 34 ++++++++++----------
 .../clients/producer/internals/SenderTest.java  |  8 ++---
 4 files changed, 28 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/75e1cc8b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index ab26342..b91e2c5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -216,6 +216,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         metricTags.put("client-id", clientId);
         this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                                                  this.totalMemorySize,
+                                                 this.compressionType,
                                                  config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                                                  retryBackoffMs,
                                                  config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG),
@@ -376,7 +377,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             ensureValidRecordSize(serializedSize);
             TopicPartition tp = new TopicPartition(record.topic(), partition);
             log.trace("Sending record {} with callback {} to topic {} 
partition {}", record, callback, record.topic(), partition);
-            RecordAccumulator.RecordAppendResult result = 
accumulator.append(tp, serializedKey, serializedValue, compressionType, 
callback);
+            RecordAccumulator.RecordAppendResult result = 
accumulator.append(tp, serializedKey, serializedValue, callback);
             if (result.batchIsFull || result.newBatchCreated) {
                 log.trace("Waking up the sender since topic {} partition {} is 
either full or getting a new batch", record.topic(), partition);
                 this.sender.wakeup();

http://git-wip-us.apache.org/repos/asf/kafka/blob/75e1cc8b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 88b4e4f..0e7ab29 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -59,6 +59,7 @@ public final class RecordAccumulator {
     private volatile AtomicInteger flushesInProgress;
     private int drainIndex;
     private final int batchSize;
+    private final CompressionType compression;
     private final long lingerMs;
     private final long retryBackoffMs;
     private final BufferPool free;
@@ -71,6 +72,7 @@ public final class RecordAccumulator {
      * 
      * @param batchSize The size to use when allocating {@link org.apache.kafka.common.record.MemoryRecords} instances
      * @param totalSize The maximum memory the record accumulator can use.
+     * @param compression The compression codec for the records
      * @param lingerMs An artificial delay time to add before declaring a records instance that isn't full ready for
      *        sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some
      *        latency for potentially better throughput due to more batching (and hence fewer, larger requests).
@@ -84,6 +86,7 @@ public final class RecordAccumulator {
      */
     public RecordAccumulator(int batchSize,
                              long totalSize,
+                             CompressionType compression,
                              long lingerMs,
                              long retryBackoffMs,
                              boolean blockOnBufferFull,
@@ -94,6 +97,7 @@ public final class RecordAccumulator {
         this.closed = false;
         this.flushesInProgress = new AtomicInteger(0);
         this.batchSize = batchSize;
+        this.compression = compression;
         this.lingerMs = lingerMs;
         this.retryBackoffMs = retryBackoffMs;
         this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>();
@@ -139,10 +143,9 @@ public final class RecordAccumulator {
      * @param tp The topic/partition to which this record is being sent
      * @param key The key for the record
      * @param value The value for the record
-     * @param compression The compression codec for the record
      * @param callback The user-supplied callback to execute when the request is complete
      */
-    public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException {
+    public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback) throws InterruptedException {
         if (closed)
             throw new IllegalStateException("Cannot send after the producer is closed.");
         // check if we have an in-progress batch

http://git-wip-us.apache.org/repos/asf/kafka/blob/75e1cc8b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index e379ac8..05e2929 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -64,13 +64,13 @@ public class RecordAccumulatorTest {
     @Test
     public void testFull() throws Exception {
         long now = time.milliseconds();
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time,  metricTags);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, false, metrics, time,  metricTags);
         int appends = 1024 / msgSize;
         for (int i = 0; i < appends; i++) {
-            accum.append(tp1, key, value, CompressionType.NONE, null);
+            accum.append(tp1, key, value, null);
             assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
         }
-        accum.append(tp1, key, value, CompressionType.NONE, null);
+        accum.append(tp1, key, value, null);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
         List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
         assertEquals(1, batches.size());
@@ -88,16 +88,16 @@ public class RecordAccumulatorTest {
     @Test
     public void testAppendLarge() throws Exception {
         int batchSize = 512;
-        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time, metricTags);
-        accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null);
+        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, false, metrics, time, metricTags);
+        accum.append(tp1, key, new byte[2 * batchSize], null);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
     }
 
     @Test
     public void testLinger() throws Exception {
         long lingerMs = 10L;
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
-        accum.append(tp1, key, value, CompressionType.NONE, null);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags);
+        accum.append(tp1, key, value, null);
         assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
         time.sleep(10);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
@@ -114,12 +114,12 @@ public class RecordAccumulatorTest {
 
     @Test
     public void testPartialDrain() throws Exception {
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time, metricTags);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, false, metrics, time, metricTags);
         int appends = 1024 / msgSize + 1;
         List<TopicPartition> partitions = asList(tp1, tp2);
         for (TopicPartition tp : partitions) {
             for (int i = 0; i < appends; i++)
-                accum.append(tp, key, value, CompressionType.NONE, null);
+                accum.append(tp, key, value, null);
         }
         assertEquals("Partition's leader should be ready", 
Collections.singleton(node1), accum.ready(cluster, 
time.milliseconds()).readyNodes);
 
@@ -133,14 +133,14 @@ public class RecordAccumulatorTest {
         final int numThreads = 5;
         final int msgs = 10000;
         final int numParts = 2;
-        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 0L, 100L, true, metrics, time, metricTags);
+        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 0L, 100L, true, metrics, time, metricTags);
         List<Thread> threads = new ArrayList<Thread>();
         for (int i = 0; i < numThreads; i++) {
             threads.add(new Thread() {
                 public void run() {
                     for (int i = 0; i < msgs; i++) {
                         try {
-                            accum.append(new TopicPartition(topic, i % numParts), key, value, CompressionType.NONE, null);
+                            accum.append(new TopicPartition(topic, i % numParts), key, value, null);
                         } catch (Exception e) {
                             e.printStackTrace();
                         }
@@ -174,13 +174,13 @@ public class RecordAccumulatorTest {
     public void testNextReadyCheckDelay() throws Exception {
         // Next check time will use lingerMs since this test won't trigger any retries/backoff
         long lingerMs = 10L;
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024,  CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags);
         // Just short of going over the limit so we trigger linger time
         int appends = 1024 / msgSize;
 
         // Partition on node1 only
         for (int i = 0; i < appends; i++)
-            accum.append(tp1, key, value, CompressionType.NONE, null);
+            accum.append(tp1, key, value, null);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
         assertEquals("Next check time should be the linger time", lingerMs, result.nextReadyCheckDelayMs);
@@ -189,14 +189,14 @@ public class RecordAccumulatorTest {
 
         // Add partition on node2 only
         for (int i = 0; i < appends; i++)
-            accum.append(tp3, key, value, CompressionType.NONE, null);
+            accum.append(tp3, key, value, null);
         result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
         assertEquals("Next check time should be defined by node1, half 
remaining linger time", lingerMs / 2, result.nextReadyCheckDelayMs);
 
         // Add data for another partition on node1, enough to make data sendable immediately
         for (int i = 0; i < appends + 1; i++)
-            accum.append(tp2, key, value, CompressionType.NONE, null);
+            accum.append(tp2, key, value, null);
         result = accum.ready(cluster, time.milliseconds());
         assertEquals("Node1 should be ready", Collections.singleton(node1), 
result.readyNodes);
         // Note this can actually be < linger time because it may use delays from partitions that aren't sendable
@@ -207,9 +207,9 @@ public class RecordAccumulatorTest {
     @Test
     public void testFlush() throws Exception {
         long lingerMs = Long.MAX_VALUE;
-        final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
+        final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags);
         for (int i = 0; i < 100; i++)
-            accum.append(new TopicPartition(topic, i % 3), key, value, CompressionType.NONE, null);
+            accum.append(new TopicPartition(topic, i % 3), key, value, null);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
         

http://git-wip-us.apache.org/repos/asf/kafka/blob/75e1cc8b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 24274a6..8b1805d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -52,7 +52,7 @@ public class SenderTest {
     private Cluster cluster = TestUtils.singletonCluster("test", 1);
     private Metrics metrics = new Metrics(time);
     Map<String, String> metricTags = new LinkedHashMap<String, String>();
-    private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, 0L, false, metrics, time, metricTags);
+    private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, false, metrics, time, metricTags);
     private Sender sender = new Sender(client,
                                        metadata,
                                        this.accumulator,
@@ -72,7 +72,7 @@ public class SenderTest {
     @Test
     public void testSimple() throws Exception {
         long offset = 0;
-        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
+        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future;
         sender.run(time.milliseconds()); // connect
         sender.run(time.milliseconds()); // send produce request
         assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
@@ -99,7 +99,7 @@ public class SenderTest {
                                    time,
                                    "clientId");
         // do a successful retry
-        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
+        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future;
         sender.run(time.milliseconds()); // connect
         sender.run(time.milliseconds()); // send produce request
         assertEquals(1, client.inFlightRequestCount());
@@ -116,7 +116,7 @@ public class SenderTest {
         assertEquals(offset, future.get().offset());
 
         // do an unsuccessful retry
-        future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
+        future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future;
         sender.run(time.milliseconds()); // send produce request
         for (int i = 0; i < maxRetries + 1; i++) {
             client.disconnect(client.requests().peek().request().destination());
