Repository: kafka
Updated Branches:
  refs/heads/trunk 017c00caf -> d31a2c238


kafka-2232; make MockProducer generic; patched by Alexander Pakulov; reviewed 
by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d31a2c23
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d31a2c23
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d31a2c23

Branch: refs/heads/trunk
Commit: d31a2c2381bebc9c4b27e36fdf986183732e13eb
Parents: 017c00c
Author: Alexander Pakulov <a.paku...@gmail.com>
Authored: Fri Jun 12 14:16:03 2015 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Fri Jun 12 14:16:03 2015 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/MockProducer.java    | 53 ++++++++++++--------
 .../clients/producer/MockProducerTest.java      | 31 ++++++++++--
 .../org/apache/kafka/test/MockSerializer.java   |  1 -
 3 files changed, 58 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d31a2c23/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index e66491c..36e7ffa 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -30,6 +30,7 @@ import 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
 import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.clients.producer.internals.ProduceRequestResult;
 import org.apache.kafka.common.*;
+import org.apache.kafka.common.serialization.Serializer;
 
 
 /**
@@ -38,14 +39,16 @@ import org.apache.kafka.common.*;
  * By default this mock will synchronously complete each send call 
successfully. However it can be configured to allow
  * the user to control the completion of the call and supply an optional error 
for the producer to throw.
  */
-public class MockProducer implements Producer<byte[], byte[]> {
+public class MockProducer<K, V> implements Producer<K, V> {
 
     private final Cluster cluster;
-    private final Partitioner partitioner = new DefaultPartitioner();
-    private final List<ProducerRecord<byte[], byte[]>> sent;
+    private final Partitioner partitioner;
+    private final List<ProducerRecord<K, V>> sent;
     private final Deque<Completion> completions;
     private boolean autoComplete;
     private Map<TopicPartition, Long> offsets;
+    private final Serializer<K> keySerializer;
+    private final Serializer<V> valueSerializer;
 
     /**
      * Create a mock producer
@@ -55,31 +58,37 @@ public class MockProducer implements Producer<byte[], 
byte[]> {
      *        the user must call {@link #completeNext()} or {@link 
#errorNext(RuntimeException)} after
      *        {@link #send(ProducerRecord) send()} to complete the call and 
unblock the @{link
      *        java.util.concurrent.Future Future&lt;RecordMetadata&gt;} that 
is returned.
+     * @param partitioner The partition strategy
+     * @param keySerializer The serializer for key that implements {@link 
Serializer}.
+     * @param valueSerializer The serializer for value that implements {@link 
Serializer}.
      */
-    public MockProducer(Cluster cluster, boolean autoComplete) {
+    public MockProducer(Cluster cluster, boolean autoComplete, Partitioner 
partitioner, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
         this.cluster = cluster;
         this.autoComplete = autoComplete;
+        this.partitioner = partitioner;
+        this.keySerializer = keySerializer;
+        this.valueSerializer = valueSerializer;
         this.offsets = new HashMap<TopicPartition, Long>();
-        this.sent = new ArrayList<ProducerRecord<byte[], byte[]>>();
+        this.sent = new ArrayList<ProducerRecord<K, V>>();
         this.completions = new ArrayDeque<Completion>();
     }
 
     /**
-     * Create a new mock producer with invented metadata the given 
autoComplete setting.
+     * Create a new mock producer with invented metadata the given 
autoComplete setting and key\value serializers
      *
-     * Equivalent to {@link #MockProducer(Cluster, boolean) new 
MockProducer(null, autoComplete)}
+     * Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, 
Serializer, Serializer)} new MockProducer(Cluster.empty(), autoComplete, new 
DefaultPartitioner(), keySerializer, valueSerializer)}
      */
-    public MockProducer(boolean autoComplete) {
-        this(Cluster.empty(), autoComplete);
+    public MockProducer(boolean autoComplete, Serializer<K> keySerializer, 
Serializer<V> valueSerializer) {
+        this(Cluster.empty(), autoComplete, new DefaultPartitioner(), 
keySerializer, valueSerializer);
     }
 
     /**
-     * Create a new auto completing mock producer
+     * Create a new mock producer with invented metadata the given 
autoComplete setting, partitioner and key\value serializers
      *
-     * Equivalent to {@link #MockProducer(boolean) new MockProducer(true)}
+     * Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, 
Serializer, Serializer)} new MockProducer(Cluster.empty(), autoComplete, 
partitioner, keySerializer, valueSerializer)}
      */
-    public MockProducer() {
-        this(true);
+    public MockProducer(boolean autoComplete, Partitioner partitioner, 
Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+        this(Cluster.empty(), autoComplete, partitioner, keySerializer, 
valueSerializer);
     }
 
     /**
@@ -88,7 +97,7 @@ public class MockProducer implements Producer<byte[], byte[]> 
{
      * @see #history()
      */
     @Override
-    public synchronized Future<RecordMetadata> send(ProducerRecord<byte[], 
byte[]> record) {
+    public synchronized Future<RecordMetadata> send(ProducerRecord<K, V> 
record) {
         return send(record, null);
     }
 
@@ -98,7 +107,7 @@ public class MockProducer implements Producer<byte[], 
byte[]> {
      * @see #history()
      */
     @Override
-    public synchronized Future<RecordMetadata> send(ProducerRecord<byte[], 
byte[]> record, Callback callback) {
+    public synchronized Future<RecordMetadata> send(ProducerRecord<K, V> 
record, Callback callback) {
         int partition = 0;
         if (this.cluster.partitionsForTopic(record.topic()) != null)
             partition = partition(record, this.cluster);
@@ -154,8 +163,8 @@ public class MockProducer implements Producer<byte[], 
byte[]> {
     /**
      * Get the list of sent records since the last call to {@link #clear()}
      */
-    public synchronized List<ProducerRecord<byte[], byte[]>> history() {
-        return new ArrayList<ProducerRecord<byte[], byte[]>>(this.sent);
+    public synchronized List<ProducerRecord<K, V>> history() {
+        return new ArrayList<ProducerRecord<K, V>>(this.sent);
     }
 
     /**
@@ -193,10 +202,11 @@ public class MockProducer implements Producer<byte[], 
byte[]> {
     /**
      * computes partition for given record.
      */
-    private int partition(ProducerRecord<byte[], byte[]> record, Cluster 
cluster) {
+    private int partition(ProducerRecord<K, V> record, Cluster cluster) {
         Integer partition = record.partition();
+        String topic = record.topic();
         if (partition != null) {
-            List<PartitionInfo> partitions = 
cluster.partitionsForTopic(record.topic());
+            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
             int numPartitions = partitions.size();
             // they have given us a partition, use it
             if (partition < 0 || partition >= numPartitions)
@@ -206,10 +216,11 @@ public class MockProducer implements Producer<byte[], 
byte[]> {
                                                    + "].");
             return partition;
         }
-        return this.partitioner.partition(record.topic(), null, record.key(), 
null, record.value(), cluster);
+        byte[] keyBytes = keySerializer.serialize(topic, record.key());
+        byte[] valueBytes = valueSerializer.serialize(topic, record.value());
+        return this.partitioner.partition(topic, record.key(), keyBytes, 
record.value(), valueBytes, cluster);
     }
 
-
     private static class Completion {
         private final long offset;
         private final RecordMetadata metadata;

http://git-wip-us.apache.org/repos/asf/kafka/blob/d31a2c23/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 
b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index 6372f1a..7a46c56 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -17,14 +17,22 @@
 package org.apache.kafka.clients.producer;
 
 import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.util.ArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
+import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.test.MockSerializer;
 import org.junit.Test;
 
 public class MockProducerTest {
@@ -34,23 +42,36 @@ public class MockProducerTest {
     @Test
     @SuppressWarnings("unchecked")
     public void testAutoCompleteMock() throws Exception {
-        MockProducer producer = new MockProducer(true);
+        MockProducer<byte[], byte[]> producer = new MockProducer<byte[], 
byte[]>(true, new MockSerializer(), new MockSerializer());
         ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], 
byte[]>(topic, "key".getBytes(), "value".getBytes());
         Future<RecordMetadata> metadata = producer.send(record);
         assertTrue("Send should be immediately complete", metadata.isDone());
         assertFalse("Send should be successful", isError(metadata));
         assertEquals("Offset should be 0", 0L, metadata.get().offset());
         assertEquals(topic, metadata.get().topic());
-        assertEquals("We should have the record in our history", 
asList(record), producer.history());
+        assertEquals("We should have the record in our history", 
singletonList(record), producer.history());
+        producer.clear();
+        assertEquals("Clear should erase our history", 0, 
producer.history().size());
+    }
+
+    @Test
+    public void testPartitioner() throws Exception {
+        PartitionInfo partitionInfo0 = new PartitionInfo(topic, 0, null, null, 
null);
+        PartitionInfo partitionInfo1 = new PartitionInfo(topic, 1, null, null, 
null);
+        Cluster cluster = new Cluster(new ArrayList<Node>(0), 
asList(partitionInfo0, partitionInfo1));
+        MockProducer<String, String> producer = new MockProducer<String, 
String>(cluster, true, new DefaultPartitioner(), new StringSerializer(), new 
StringSerializer());
+        ProducerRecord<String, String> record = new ProducerRecord<String, 
String>(topic, "key", "value");
+        Future<RecordMetadata> metadata = producer.send(record);
+        assertEquals("Partition should be correct", 1, 
metadata.get().partition());
         producer.clear();
         assertEquals("Clear should erase our history", 0, 
producer.history().size());
     }
 
     @Test
     public void testManualCompletion() throws Exception {
-        MockProducer producer = new MockProducer(false);
-        ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<byte[], 
byte[]>("topic", "key1".getBytes(), "value1".getBytes());
-        ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<byte[], 
byte[]>("topic", "key2".getBytes(), "value2".getBytes());
+        MockProducer<byte[], byte[]> producer = new MockProducer<byte[], 
byte[]>(false, new MockSerializer(), new MockSerializer());
+        ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<byte[], 
byte[]>(topic, "key1".getBytes(), "value1".getBytes());
+        ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<byte[], 
byte[]>(topic, "key2".getBytes(), "value2".getBytes());
         Future<RecordMetadata> md1 = producer.send(record1);
         assertFalse("Send shouldn't have completed", md1.isDone());
         Future<RecordMetadata> md2 = producer.send(record2);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d31a2c23/clients/src/test/java/org/apache/kafka/test/MockSerializer.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockSerializer.java 
b/clients/src/test/java/org/apache/kafka/test/MockSerializer.java
index e75d2e4..0348258 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockSerializer.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockSerializer.java
@@ -31,7 +31,6 @@ public class MockSerializer implements Serializer<byte[]> {
 
     @Override
     public void configure(Map<String, ?> configs, boolean isKey) {
-
     }
 
     @Override

Reply via email to