Github user dfdemar commented on a diff in the pull request:
https://github.com/apache/storm/pull/2790#discussion_r207708527
--- Diff:
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
---
@@ -17,73 +17,140 @@
*/
package org.apache.storm.kafka.bolt;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
+
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
import org.apache.storm.Testing;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.testing.MkTupleParam;
import org.apache.storm.tuple.Tuple;
import org.junit.Test;
-import org.mockito.ArgumentMatcher;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KafkaBoltTest {
private static final Logger LOG =
LoggerFactory.getLogger(KafkaBoltTest.class);
-
- @SuppressWarnings({ "unchecked", "serial" })
- @Test
- public void testSimple() {
- final KafkaProducer<String, String> producer =
mock(KafkaProducer.class);
- when(producer.send(any(), any())).thenAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocation) throws
Throwable {
- Callback c = (Callback)invocation.getArguments()[1];
- c.onCompletion(null, null);
- return null;
- }
- });
- KafkaBolt<String, String> bolt = new KafkaBolt<String, String>() {
+
+ private <K, V> KafkaBolt<K, V> makeBolt(Producer<K, V> producer) {
+ KafkaBolt<K, V> bolt = new KafkaBolt<K, V>() {
@Override
- protected KafkaProducer<String, String> mkProducer(Properties
props) {
+ protected Producer<K, V> mkProducer(Properties props) {
return producer;
}
};
bolt.withTopicSelector("MY_TOPIC");
-
+
+ return bolt;
+ }
+
+ private Tuple createTestTuple(String... values) {
+ MkTupleParam param = new MkTupleParam();
+ param.setFields("key", "message");
+ return Testing.testTuple(Arrays.asList(values), param);
+ }
+
+ @Test
+ public void testSimple() {
+ MockProducer<String, String> producer = new
MockProducer<>(Cluster.empty(), false, null, null, null);
+ KafkaBolt<String, String> bolt = makeBolt(producer);
+
OutputCollector collector = mock(OutputCollector.class);
TopologyContext context = mock(TopologyContext.class);
Map<String, Object> conf = new HashMap<>();
bolt.prepare(conf, context, collector);
- MkTupleParam param = new MkTupleParam();
- param.setFields("key", "message");
- Tuple testTuple = Testing.testTuple(Arrays.asList("KEY", "VALUE"),
param);
+
+ String key = "KEY";
+ String value = "VALUE";
+ Tuple testTuple = createTestTuple(key, value);
bolt.execute(testTuple);
- verify(producer).send(argThat(new
ArgumentMatcher<ProducerRecord<String, String>>() {
- @Override
- public boolean matches(ProducerRecord<String, String> arg) {
- LOG.info("GOT {} ->", arg);
- LOG.info(" {} {} {}", arg.topic(), arg.key(),
arg.value());
- return "MY_TOPIC".equals(arg.topic()) &&
- "KEY".equals(arg.key()) &&
- "VALUE".equals(arg.value());
- }
- }), any(Callback.class));
+
+ assertThat(producer.history().size(), is(1));
+ ProducerRecord<String, String> arg = producer.history().get(0);
+
+ LOG.info("GOT {} ->", arg);
+ LOG.info("{}, {}, {}", arg.topic(), arg.key(), arg.value());
+ assertThat(arg.topic(), is("MY_TOPIC"));
+ assertThat(arg.key(), is(key));
+ assertThat(arg.value(), is(value));
+
+ // Complete the send
+ producer.completeNext();
verify(collector).ack(testTuple);
}
+ @Test
+ public void testSimpleWithError() {
+ MockProducer<String, String> producer = new
MockProducer<>(Cluster.empty(), false, null, null, null);
+ KafkaBolt<String, String> bolt = makeBolt(producer);
+
+ OutputCollector collector = mock(OutputCollector.class);
+ TopologyContext context = mock(TopologyContext.class);
+ Map<String, Object> conf = new HashMap<>();
+ bolt.prepare(conf, context, collector);
+
+ String key = "KEY";
+ String value = "VALUE";
+ Tuple testTuple = createTestTuple(key, value);
+ bolt.execute(testTuple);
+
+ assertThat(producer.history().size(), is(1));
+ ProducerRecord<String, String> arg = producer.history().get(0);
+
+ LOG.info("GOT {} ->", arg);
+ LOG.info("{}, {}, {}", arg.topic(), arg.key(), arg.value());
+ assertThat(arg.topic(), is("MY_TOPIC"));
+ assertThat(arg.key(), is(key));
+ assertThat(arg.value(), is(value));
+
+ // Force a send error
+ KafkaException ex = new KafkaException();
+ producer.errorNext(ex);
+ verify(collector).reportError(ex);
+ verify(collector).fail(testTuple);
+ }
+
+ @Test
+ public void testCustomCallbackIsWrappedByDefaultCallbackBehavior() {
+ MockProducer<String, String> producer = new
MockProducer<>(Cluster.empty(), false, null, null, null);
+ KafkaBolt<String, String> bolt = makeBolt(producer);
+
+ PreparableCallback customCallback = mock(PreparableCallback.class);
+ bolt.withProducerCallback(customCallback);
+
+ OutputCollector collector = mock(OutputCollector.class);
+ bolt.prepare(new HashMap<>(), mock(TopologyContext.class),
collector);
+
+ String key = "KEY";
+ String value = "VALUE";
+ Tuple testTuple = createTestTuple(key, value);
+ bolt.execute(testTuple);
+
+ assertThat(producer.history().size(), is(1));
+ ProducerRecord<String, String> arg = producer.history().get(0);
+
+ LOG.info("GOT {} ->", arg);
+ LOG.info("{}, {}, {}", arg.topic(), arg.key(), arg.value());
+ assertThat(arg.topic(), is("MY_TOPIC"));
+ assertThat(arg.key(), is(key));
+ assertThat(arg.value(), is(value));
+
+ // Complete the send
+ producer.completeNext();
+ verify(collector).ack(testTuple);
+ verify(customCallback).onCompletion(any(), any());
--- End diff --
Do you mean with regard to the `any()` matchers in the final `verify` call?
I'll see if I can make it a little better.
---