Github user dfdemar commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2790#discussion_r207761956
  
    --- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
 ---
    @@ -17,73 +17,140 @@
      */
     package org.apache.storm.kafka.bolt;
     
    +import static org.hamcrest.CoreMatchers.is;
    +import static org.hamcrest.MatcherAssert.assertThat;
     import static org.mockito.ArgumentMatchers.any;
    -import static org.mockito.ArgumentMatchers.argThat;
     import static org.mockito.Mockito.mock;
     import static org.mockito.Mockito.verify;
    -import static org.mockito.Mockito.when;
     
     import java.util.Arrays;
     import java.util.HashMap;
     import java.util.Map;
     import java.util.Properties;
    -import org.apache.kafka.clients.producer.Callback;
    -import org.apache.kafka.clients.producer.KafkaProducer;
    +
    +import org.apache.kafka.clients.producer.MockProducer;
    +import org.apache.kafka.clients.producer.Producer;
     import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.kafka.common.Cluster;
    +import org.apache.kafka.common.KafkaException;
     import org.apache.storm.Testing;
     import org.apache.storm.task.OutputCollector;
     import org.apache.storm.task.TopologyContext;
     import org.apache.storm.testing.MkTupleParam;
     import org.apache.storm.tuple.Tuple;
     import org.junit.Test;
    -import org.mockito.ArgumentMatcher;
    -import org.mockito.invocation.InvocationOnMock;
    -import org.mockito.stubbing.Answer;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
     public class KafkaBoltTest {
         private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBoltTest.class);
    -    
    -    @SuppressWarnings({ "unchecked", "serial" })
    -    @Test
    -    public void testSimple() {
    -        final KafkaProducer<String, String> producer = 
mock(KafkaProducer.class);
    -        when(producer.send(any(), any())).thenAnswer(new Answer<Object>() {
    -            @Override
    -            public Object answer(InvocationOnMock invocation) throws 
Throwable {
    -                Callback c = (Callback)invocation.getArguments()[1];
    -                c.onCompletion(null, null);
    -                return null;
    -            }
    -        });
    -        KafkaBolt<String, String> bolt = new KafkaBolt<String, String>() {
    +
    +    private <K, V> KafkaBolt<K, V> makeBolt(Producer<K, V> producer) {
    +        KafkaBolt<K, V> bolt = new KafkaBolt<K, V>() {
                 @Override
    -            protected KafkaProducer<String, String> mkProducer(Properties 
props) {
    +            protected Producer<K, V> mkProducer(Properties props) {
                     return producer;
                 }
             };
             bolt.withTopicSelector("MY_TOPIC");
    -        
    +
    +        return bolt;
    +    }
    +
    +    private Tuple createTestTuple(String... values) {
    +        MkTupleParam param = new MkTupleParam();
    +        param.setFields("key", "message");
    +        return Testing.testTuple(Arrays.asList(values), param);
    +    }
    +
    +    @Test
    +    public void testSimple() {
    +        MockProducer<String, String> producer = new 
MockProducer<>(Cluster.empty(), false, null, null, null);
    +        KafkaBolt<String, String> bolt = makeBolt(producer);
    +
             OutputCollector collector = mock(OutputCollector.class);
             TopologyContext context = mock(TopologyContext.class);
             Map<String, Object> conf = new HashMap<>();
             bolt.prepare(conf, context, collector);
    -        MkTupleParam param = new MkTupleParam();
    -        param.setFields("key", "message");
    -        Tuple testTuple = Testing.testTuple(Arrays.asList("KEY", "VALUE"), 
param);
    +
    +        String key = "KEY";
    +        String value = "VALUE";
    +        Tuple testTuple = createTestTuple(key, value);
             bolt.execute(testTuple);
    -        verify(producer).send(argThat(new 
ArgumentMatcher<ProducerRecord<String, String>>() {
    -            @Override
    -            public boolean matches(ProducerRecord<String, String> arg) {
    -                LOG.info("GOT {} ->", arg);
    -                LOG.info("  {} {} {}", arg.topic(), arg.key(), 
arg.value());
    -                return "MY_TOPIC".equals(arg.topic()) &&
    -                        "KEY".equals(arg.key()) &&
    -                        "VALUE".equals(arg.value());
    -            }
    -        }), any(Callback.class));
    +
    +        assertThat(producer.history().size(), is(1));
    +        ProducerRecord<String, String> arg = producer.history().get(0);
    +
    +        LOG.info("GOT {} ->", arg);
    +        LOG.info("{}, {}, {}", arg.topic(), arg.key(), arg.value());
    +        assertThat(arg.topic(), is("MY_TOPIC"));
    +        assertThat(arg.key(), is(key));
    +        assertThat(arg.value(), is(value));
    +
    +        // Complete the send
    +        producer.completeNext();
             verify(collector).ack(testTuple);
         }
     
    +    @Test
    +    public void testSimpleWithError() {
    +        MockProducer<String, String> producer = new 
MockProducer<>(Cluster.empty(), false, null, null, null);
    +        KafkaBolt<String, String> bolt = makeBolt(producer);
    +
    +        OutputCollector collector = mock(OutputCollector.class);
    +        TopologyContext context = mock(TopologyContext.class);
    +        Map<String, Object> conf = new HashMap<>();
    +        bolt.prepare(conf, context, collector);
    +
    +        String key = "KEY";
    +        String value = "VALUE";
    +        Tuple testTuple = createTestTuple(key, value);
    +        bolt.execute(testTuple);
    +
    +        assertThat(producer.history().size(), is(1));
    +        ProducerRecord<String, String> arg = producer.history().get(0);
    +
    +        LOG.info("GOT {} ->", arg);
    +        LOG.info("{}, {}, {}", arg.topic(), arg.key(), arg.value());
    +        assertThat(arg.topic(), is("MY_TOPIC"));
    +        assertThat(arg.key(), is(key));
    +        assertThat(arg.value(), is(value));
    +
    +        // Force a send error
    +        KafkaException ex = new KafkaException();
    +        producer.errorNext(ex);
    +        verify(collector).reportError(ex);
    +        verify(collector).fail(testTuple);
    +    }
    +
    +    @Test
    +    public void testCustomCallbackIsWrappedByDefaultCallbackBehavior() {
    +        MockProducer<String, String> producer = new 
MockProducer<>(Cluster.empty(), false, null, null, null);
    +        KafkaBolt<String, String> bolt = makeBolt(producer);
    +
    +        PreparableCallback customCallback = mock(PreparableCallback.class);
    +        bolt.withProducerCallback(customCallback);
    +
    +        OutputCollector collector = mock(OutputCollector.class);
    +        bolt.prepare(new HashMap<>(), mock(TopologyContext.class), 
collector);
    +
    +        String key = "KEY";
    +        String value = "VALUE";
    +        Tuple testTuple = createTestTuple(key, value);
    +        bolt.execute(testTuple);
    +
    +        assertThat(producer.history().size(), is(1));
    +        ProducerRecord<String, String> arg = producer.history().get(0);
    +
    +        LOG.info("GOT {} ->", arg);
    +        LOG.info("{}, {}, {}", arg.topic(), arg.key(), arg.value());
    +        assertThat(arg.topic(), is("MY_TOPIC"));
    +        assertThat(arg.key(), is(key));
    +        assertThat(arg.value(), is(value));
    +
    +        // Complete the send
    +        producer.completeNext();
    +        verify(collector).ack(testTuple);
    +        verify(customCallback).onCompletion(any(), any());
    --- End diff --
    
    I was able to make it a little better. I went ahead and squashed the 
commits and it should be okay to merge now.


---

Reply via email to