Repository: kafka Updated Branches: refs/heads/trunk 017c00caf -> d31a2c238
kafka-2232; make MockProducer generic; patched by Alexander Pakulov; reviewed by Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d31a2c23 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d31a2c23 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d31a2c23 Branch: refs/heads/trunk Commit: d31a2c2381bebc9c4b27e36fdf986183732e13eb Parents: 017c00c Author: Alexander Pakulov <a.paku...@gmail.com> Authored: Fri Jun 12 14:16:03 2015 -0700 Committer: Jun Rao <jun...@gmail.com> Committed: Fri Jun 12 14:16:03 2015 -0700 ---------------------------------------------------------------------- .../kafka/clients/producer/MockProducer.java | 53 ++++++++++++-------- .../clients/producer/MockProducerTest.java | 31 ++++++++++-- .../org/apache/kafka/test/MockSerializer.java | 1 - 3 files changed, 58 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/d31a2c23/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index e66491c..36e7ffa 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -30,6 +30,7 @@ import org.apache.kafka.clients.producer.internals.FutureRecordMetadata; import org.apache.kafka.clients.producer.internals.DefaultPartitioner; import org.apache.kafka.clients.producer.internals.ProduceRequestResult; import org.apache.kafka.common.*; +import org.apache.kafka.common.serialization.Serializer; /** @@ -38,14 +39,16 @@ import org.apache.kafka.common.*; * By default this mock will synchronously complete each send call successfully. However it can be configured to allow * the user to control the completion of the call and supply an optional error for the producer to throw. */ -public class MockProducer implements Producer<byte[], byte[]> { +public class MockProducer<K, V> implements Producer<K, V> { private final Cluster cluster; - private final Partitioner partitioner = new DefaultPartitioner(); - private final List<ProducerRecord<byte[], byte[]>> sent; + private final Partitioner partitioner; + private final List<ProducerRecord<K, V>> sent; private final Deque<Completion> completions; private boolean autoComplete; private Map<TopicPartition, Long> offsets; + private final Serializer<K> keySerializer; + private final Serializer<V> valueSerializer; /** * Create a mock producer @@ -55,31 +58,37 @@ public class MockProducer implements Producer<byte[], byte[]> { * the user must call {@link #completeNext()} or {@link #errorNext(RuntimeException)} after * {@link #send(ProducerRecord) send()} to complete the call and unblock the @{link * java.util.concurrent.Future Future<RecordMetadata>} that is returned. + * @param partitioner The partition strategy + * @param keySerializer The serializer for key that implements {@link Serializer}. + * @param valueSerializer The serializer for value that implements {@link Serializer}. */ - public MockProducer(Cluster cluster, boolean autoComplete) { + public MockProducer(Cluster cluster, boolean autoComplete, Partitioner partitioner, Serializer<K> keySerializer, Serializer<V> valueSerializer) { this.cluster = cluster; this.autoComplete = autoComplete; + this.partitioner = partitioner; + this.keySerializer = keySerializer; + this.valueSerializer = valueSerializer; this.offsets = new HashMap<TopicPartition, Long>(); - this.sent = new ArrayList<ProducerRecord<byte[], byte[]>>(); + this.sent = new ArrayList<ProducerRecord<K, V>>(); this.completions = new ArrayDeque<Completion>(); } /** - * Create a new mock producer with invented metadata the given autoComplete setting. + * Create a new mock producer with invented metadata the given autoComplete setting and key\value serializers * - * Equivalent to {@link #MockProducer(Cluster, boolean) new MockProducer(null, autoComplete)} + * Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, Serializer, Serializer)} new MockProducer(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer)} */ - public MockProducer(boolean autoComplete) { - this(Cluster.empty(), autoComplete); + public MockProducer(boolean autoComplete, Serializer<K> keySerializer, Serializer<V> valueSerializer) { + this(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer); } /** - * Create a new auto completing mock producer + * Create a new mock producer with invented metadata the given autoComplete setting, partitioner and key\value serializers * - * Equivalent to {@link #MockProducer(boolean) new MockProducer(true)} + * Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, Serializer, Serializer)} new MockProducer(Cluster.empty(), autoComplete, partitioner, keySerializer, valueSerializer)} */ - public MockProducer() { - this(true); + public MockProducer(boolean autoComplete, Partitioner partitioner, Serializer<K> keySerializer, Serializer<V> valueSerializer) { + this(Cluster.empty(), autoComplete, partitioner, keySerializer, valueSerializer); } /** @@ -88,7 +97,7 @@ public class MockProducer implements Producer<byte[], byte[]> { * @see #history() */ @Override - public synchronized Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> record) { + public synchronized Future<RecordMetadata> send(ProducerRecord<K, V> record) { return send(record, null); } @@ -98,7 +107,7 @@ public class MockProducer implements Producer<byte[], byte[]> { * @see #history() */ @Override - public synchronized Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> record, Callback callback) { + public synchronized Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { int partition = 0; if (this.cluster.partitionsForTopic(record.topic()) != null) partition = partition(record, this.cluster); @@ -154,8 +163,8 @@ public class MockProducer implements Producer<byte[], byte[]> { /** * Get the list of sent records since the last call to {@link #clear()} */ - public synchronized List<ProducerRecord<byte[], byte[]>> history() { - return new ArrayList<ProducerRecord<byte[], byte[]>>(this.sent); + public synchronized List<ProducerRecord<K, V>> history() { + return new ArrayList<ProducerRecord<K, V>>(this.sent); } /** @@ -193,10 +202,11 @@ public class MockProducer implements Producer<byte[], byte[]> { /** * computes partition for given record. */ - private int partition(ProducerRecord<byte[], byte[]> record, Cluster cluster) { + private int partition(ProducerRecord<K, V> record, Cluster cluster) { Integer partition = record.partition(); + String topic = record.topic(); if (partition != null) { - List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic()); + List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); // they have given us a partition, use it if (partition < 0 || partition >= numPartitions) @@ -206,10 +216,11 @@ public class MockProducer implements Producer<byte[], byte[]> { + "]."); return partition; } - return this.partitioner.partition(record.topic(), null, record.key(), null, record.value(), cluster); + byte[] keyBytes = keySerializer.serialize(topic, record.key()); + byte[] valueBytes = valueSerializer.serialize(topic, record.value()); + return this.partitioner.partition(topic, record.key(), keyBytes, record.value(), valueBytes, cluster); } - private static class Completion { private final long offset; private final RecordMetadata metadata; http://git-wip-us.apache.org/repos/asf/kafka/blob/d31a2c23/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java index 6372f1a..7a46c56 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java @@ -17,14 +17,22 @@ package org.apache.kafka.clients.producer; import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.util.ArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import org.apache.kafka.clients.producer.internals.DefaultPartitioner; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.test.MockSerializer; import org.junit.Test; public class MockProducerTest { @@ -34,23 +42,36 @@ public class MockProducerTest { @Test @SuppressWarnings("unchecked") public void testAutoCompleteMock() throws Exception { - MockProducer producer = new MockProducer(true); + MockProducer<byte[], byte[]> producer = new MockProducer<byte[], byte[]>(true, new MockSerializer(), new MockSerializer()); ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topic, "key".getBytes(), "value".getBytes()); Future<RecordMetadata> metadata = producer.send(record); assertTrue("Send should be immediately complete", metadata.isDone()); assertFalse("Send should be successful", isError(metadata)); assertEquals("Offset should be 0", 0L, metadata.get().offset()); assertEquals(topic, metadata.get().topic()); - assertEquals("We should have the record in our history", asList(record), producer.history()); + assertEquals("We should have the record in our history", singletonList(record), producer.history()); + producer.clear(); + assertEquals("Clear should erase our history", 0, producer.history().size()); + } + + @Test + public void testPartitioner() throws Exception { + PartitionInfo partitionInfo0 = new PartitionInfo(topic, 0, null, null, null); + PartitionInfo partitionInfo1 = new PartitionInfo(topic, 1, null, null, null); + Cluster cluster = new Cluster(new ArrayList<Node>(0), asList(partitionInfo0, partitionInfo1)); + MockProducer<String, String> producer = new MockProducer<String, String>(cluster, true, new DefaultPartitioner(), new StringSerializer(), new StringSerializer()); + ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "key", "value"); + Future<RecordMetadata> metadata = producer.send(record); + assertEquals("Partition should be correct", 1, metadata.get().partition()); producer.clear(); assertEquals("Clear should erase our history", 0, producer.history().size()); } @Test public void testManualCompletion() throws Exception { - MockProducer producer = new MockProducer(false); - ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<byte[], byte[]>("topic", "key1".getBytes(), "value1".getBytes()); - ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<byte[], byte[]>("topic", "key2".getBytes(), "value2".getBytes()); + MockProducer<byte[], byte[]> producer = new MockProducer<byte[], byte[]>(false, new MockSerializer(), new MockSerializer()); + ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<byte[], byte[]>(topic, "key1".getBytes(), "value1".getBytes()); + ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<byte[], byte[]>(topic, "key2".getBytes(), "value2".getBytes()); Future<RecordMetadata> md1 = producer.send(record1); assertFalse("Send shouldn't have completed", md1.isDone()); Future<RecordMetadata> md2 = producer.send(record2); http://git-wip-us.apache.org/repos/asf/kafka/blob/d31a2c23/clients/src/test/java/org/apache/kafka/test/MockSerializer.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/test/MockSerializer.java b/clients/src/test/java/org/apache/kafka/test/MockSerializer.java index e75d2e4..0348258 100644 --- a/clients/src/test/java/org/apache/kafka/test/MockSerializer.java +++ b/clients/src/test/java/org/apache/kafka/test/MockSerializer.java @@ -31,7 +31,6 @@ public class MockSerializer implements Serializer<byte[]> { @Override public void configure(Map<String, ?> configs, boolean isKey) { - } @Override