Repository: kafka
Updated Branches:
  refs/heads/trunk 9c23d9355 -> 75e1cc8bc
kafka-2043; CompressionType is passed in each RecordAccumulator append; patched by Grant Henke; reviewed by Jun Rao

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/75e1cc8b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/75e1cc8b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/75e1cc8b

Branch: refs/heads/trunk
Commit: 75e1cc8bc497e6aaa0dd05454d6c817ed0fb5e23
Parents: 9c23d93
Author: Grant Henke <granthe...@gmail.com>
Authored: Mon Apr 6 13:34:31 2015 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Mon Apr 6 13:34:31 2015 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |  3 +-
 .../producer/internals/RecordAccumulator.java   |  7 ++--
 .../internals/RecordAccumulatorTest.java        | 34 ++++++++++----------
 .../clients/producer/internals/SenderTest.java  |  8 ++---
 4 files changed, 28 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
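In short: the compression codec moves out of every append() call and into the RecordAccumulator constructor, since a producer instance only ever uses one codec. A minimal sketch of the resulting call pattern (argument values mirror the tests below; the metrics/time/metricTags setup is illustrative, with classes from org.apache.kafka.common.*):

    Map<String, String> metricTags = new LinkedHashMap<String, String>();
    Metrics metrics = new Metrics();
    Time time = new SystemTime();
    RecordAccumulator accum = new RecordAccumulator(1024,                 // batchSize
                                                    10 * 1024,            // totalSize
                                                    CompressionType.NONE, // codec, fixed once here
                                                    10L,                  // lingerMs
                                                    100L,                 // retryBackoffMs
                                                    false,                // blockOnBufferFull
                                                    metrics, time, metricTags);
    // append() no longer takes a CompressionType; it uses the one fixed above.
    accum.append(new TopicPartition("test", 0), "key".getBytes(), "value".getBytes(), null);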
http://git-wip-us.apache.org/repos/asf/kafka/blob/75e1cc8b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index ab26342..b91e2c5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -216,6 +216,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         metricTags.put("client-id", clientId);
         this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                 this.totalMemorySize,
+                this.compressionType,
                 config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                 retryBackoffMs,
                 config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG),
@@ -376,7 +377,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             ensureValidRecordSize(serializedSize);
             TopicPartition tp = new TopicPartition(record.topic(), partition);
             log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
-            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, compressionType, callback);
+            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback);
             if (result.batchIsFull || result.newBatchCreated) {
                 log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                 this.sender.wakeup();

http://git-wip-us.apache.org/repos/asf/kafka/blob/75e1cc8b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 88b4e4f..0e7ab29 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -59,6 +59,7 @@ public final class RecordAccumulator {
     private volatile AtomicInteger flushesInProgress;
     private int drainIndex;
     private final int batchSize;
+    private final CompressionType compression;
     private final long lingerMs;
     private final long retryBackoffMs;
     private final BufferPool free;
@@ -71,6 +72,7 @@ public final class RecordAccumulator {
      *
      * @param batchSize The size to use when allocating {@link org.apache.kafka.common.record.MemoryRecords} instances
      * @param totalSize The maximum memory the record accumulator can use.
+     * @param compression The compression codec for the records
      * @param lingerMs An artificial delay time to add before declaring a records instance that isn't full ready for
      *        sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some
      *        latency for potentially better throughput due to more batching (and hence fewer, larger requests).
@@ -84,6 +86,7 @@ public final class RecordAccumulator {
      */
    public RecordAccumulator(int batchSize,
                             long totalSize,
+                            CompressionType compression,
                             long lingerMs,
                             long retryBackoffMs,
                             boolean blockOnBufferFull,
@@ -94,6 +97,7 @@ public final class RecordAccumulator {
        this.closed = false;
        this.flushesInProgress = new AtomicInteger(0);
        this.batchSize = batchSize;
+        this.compression = compression;
        this.lingerMs = lingerMs;
        this.retryBackoffMs = retryBackoffMs;
        this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>();
@@ -139,10 +143,9 @@ public final class RecordAccumulator {
      * @param tp The topic/partition to which this record is being sent
      * @param key The key for the record
      * @param value The value for the record
-     * @param compression The compression codec for the record
      * @param callback The user-supplied callback to execute when the request is complete
      */
-    public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException {
+    public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback) throws InterruptedException {
        if (closed)
            throw new IllegalStateException("Cannot send after the producer is closed.");
        // check if we have an in-progress batch
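For context on the new field: append() consults it when a new batch has to be allocated; the unchanged body of append() (not shown in this diff) does roughly the following, assuming the MemoryRecords.emptyRecords(ByteBuffer, CompressionType, int) factory of this era:

    // Paraphrase of the batch-allocation path in append(); not part of this diff.
    int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
    ByteBuffer buffer = free.allocate(size);
    // The stored codec, rather than a per-call argument, now shapes every new batch:
    MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
    RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
    FutureRecordMetadata future = batch.tryAppend(key, value, callback);
    dq.addLast(batch);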
http://git-wip-us.apache.org/repos/asf/kafka/blob/75e1cc8b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index e379ac8..05e2929 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -64,13 +64,13 @@ public class RecordAccumulatorTest {
     @Test
     public void testFull() throws Exception {
         long now = time.milliseconds();
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time, metricTags);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, false, metrics, time, metricTags);
         int appends = 1024 / msgSize;
         for (int i = 0; i < appends; i++) {
-            accum.append(tp1, key, value, CompressionType.NONE, null);
+            accum.append(tp1, key, value, null);
             assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
         }
-        accum.append(tp1, key, value, CompressionType.NONE, null);
+        accum.append(tp1, key, value, null);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
         List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
         assertEquals(1, batches.size());
@@ -88,16 +88,16 @@
     @Test
     public void testAppendLarge() throws Exception {
         int batchSize = 512;
-        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time, metricTags);
-        accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null);
+        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, false, metrics, time, metricTags);
+        accum.append(tp1, key, new byte[2 * batchSize], null);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
     }

     @Test
     public void testLinger() throws Exception {
         long lingerMs = 10L;
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
-        accum.append(tp1, key, value, CompressionType.NONE, null);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags);
+        accum.append(tp1, key, value, null);
         assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
         time.sleep(10);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
@@ -114,12 +114,12 @@

     @Test
     public void testPartialDrain() throws Exception {
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time, metricTags);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, false, metrics, time, metricTags);
         int appends = 1024 / msgSize + 1;
         List<TopicPartition> partitions = asList(tp1, tp2);
         for (TopicPartition tp : partitions) {
             for (int i = 0; i < appends; i++)
-                accum.append(tp, key, value, CompressionType.NONE, null);
+                accum.append(tp, key, value, null);
         }
         assertEquals("Partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
@@ -133,14 +133,14 @@
         final int numThreads = 5;
         final int msgs = 10000;
         final int numParts = 2;
-        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 0L, 100L, true, metrics, time, metricTags);
+        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 0L, 100L, true, metrics, time, metricTags);
         List<Thread> threads = new ArrayList<Thread>();
         for (int i = 0; i < numThreads; i++) {
             threads.add(new Thread() {
                 public void run() {
                     for (int i = 0; i < msgs; i++) {
                         try {
-                            accum.append(new TopicPartition(topic, i % numParts), key, value, CompressionType.NONE, null);
+                            accum.append(new TopicPartition(topic, i % numParts), key, value, null);
                         } catch (Exception e) {
                             e.printStackTrace();
                         }
@@ -174,13 +174,13 @@
     public void testNextReadyCheckDelay() throws Exception {
         // Next check time will use lingerMs since this test won't trigger any retries/backoff
         long lingerMs = 10L;
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags);
         // Just short of going over the limit so we trigger linger time
         int appends = 1024 / msgSize;

         // Partition on node1 only
         for (int i = 0; i < appends; i++)
-            accum.append(tp1, key, value, CompressionType.NONE, null);
+            accum.append(tp1, key, value, null);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
         assertEquals("Next check time should be the linger time", lingerMs, result.nextReadyCheckDelayMs);
@@ -189,14 +189,14 @@

         // Add partition on node2 only
         for (int i = 0; i < appends; i++)
-            accum.append(tp3, key, value, CompressionType.NONE, null);
+            accum.append(tp3, key, value, null);
         result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
         assertEquals("Next check time should be defined by node1, half remaining linger time", lingerMs / 2, result.nextReadyCheckDelayMs);

         // Add data for another partition on node1, enough to make data sendable immediately
         for (int i = 0; i < appends + 1; i++)
-            accum.append(tp2, key, value, CompressionType.NONE, null);
+            accum.append(tp2, key, value, null);
         result = accum.ready(cluster, time.milliseconds());
         assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
         // Note this can actually be < linger time because it may use delays from partitions that aren't sendable
@@ -207,9 +207,9 @@
     @Test
     public void testFlush() throws Exception {
         long lingerMs = Long.MAX_VALUE;
-        final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
+        final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags);
         for (int i = 0; i < 100; i++)
-            accum.append(new TopicPartition(topic, i % 3), key, value, CompressionType.NONE, null);
+            accum.append(new TopicPartition(topic, i % 3), key, value, null);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());

http://git-wip-us.apache.org/repos/asf/kafka/blob/75e1cc8b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 24274a6..8b1805d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -52,7 +52,7 @@ public class SenderTest {
     private Cluster cluster = TestUtils.singletonCluster("test", 1);
     private Metrics metrics = new Metrics(time);
     Map<String, String> metricTags = new LinkedHashMap<String, String>();
-    private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, 0L, false, metrics, time, metricTags);
+    private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, false, metrics, time, metricTags);
     private Sender sender = new Sender(client,
                                        metadata,
                                        this.accumulator,
@@ -72,7 +72,7 @@
     @Test
     public void testSimple() throws Exception {
         long offset = 0;
-        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
+        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future;
         sender.run(time.milliseconds()); // connect
         sender.run(time.milliseconds()); // send produce request
         assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
@@ -99,7 +99,7 @@
                                    time,
                                    "clientId");
         // do a successful retry
-        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
+        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future;
         sender.run(time.milliseconds()); // connect
         sender.run(time.milliseconds()); // send produce request
         assertEquals(1, client.inFlightRequestCount());
@@ -116,7 +116,7 @@
         assertEquals(offset, future.get().offset());

         // do an unsuccessful retry
-        future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
+        future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future;
         sender.run(time.milliseconds()); // send produce request
         for (int i = 0; i < maxRetries + 1; i++) {
             client.disconnect(client.requests().peek().request().destination());
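One codec per accumulator also matches the user-facing configuration, where compression is already a per-producer setting; inside KafkaProducer it becomes the compressionType handed to the accumulator in the first hunk (roughly CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG))). For example (broker address is a placeholder):

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("compression.type", "gzip"); // one codec for everything this producer sends
    props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    Producer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);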