Repository: storm
Updated Branches:
  refs/heads/1.x-branch 620d2be86 -> 609fe10f5


STORM-2409: Storm-Kafka-Client KafkaSpout Support for Failed and Null Tuples
  - Created config property to make emitting null tuples configurable
  - Directly ack null tuples that are not emitted
  - Added emit field to KafkaSpoutMessageId


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8670f60d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8670f60d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8670f60d

Branch: refs/heads/1.x-branch
Commit: 8670f60d459f4c27ea040bd2f401ecd22c4bf497
Parents: 3ebf81b
Author: Hugo Louro <[email protected]>
Authored: Mon Mar 13 19:29:12 2017 -0700
Committer: Hugo Louro <[email protected]>
Committed: Wed Mar 15 17:43:36 2017 -0700

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    | 124 +++++++++++--------
 .../storm/kafka/spout/KafkaSpoutConfig.java     |  33 +++--
 .../storm/kafka/spout/KafkaSpoutMessageId.java  |  21 +++-
 .../storm/kafka/spout/RecordTranslator.java     |   9 +-
 .../storm/kafka/spout/KafkaSpoutConfigTest.java |  19 ++-
 5 files changed, 140 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8670f60d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index bbad9e8..207ba23 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -76,18 +76,18 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private transient boolean initialized;                              // 
Flag indicating that the spout is still undergoing initialization process.
     // Initialization is only complete after the first call to  
KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
 
-    private transient Map<TopicPartition, OffsetManager> acked;           // 
Tuples that were successfully acked. These tuples will be committed 
periodically when the commit timer expires, after consumer rebalance, or on 
close/deactivate
+    private transient Map<TopicPartition, OffsetManager> acked;         // 
Tuples that were successfully acked. These tuples will be committed 
periodically when the commit timer expires, or after a consumer rebalance, or 
during close/deactivate
     private transient Set<KafkaSpoutMessageId> emitted;                 // 
Tuples that have been emitted but that are "on the wire", i.e. pending being 
acked or failed. Not used if it's AutoCommitMode
-    private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;         // 
Records that have been polled and are queued to be emitted in the nextTuple() 
call. One record is emitted per nextTuple()
+    private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;     // 
Records that have been polled and are queued to be emitted in the nextTuple() 
call. One record is emitted per nextTuple()
     private transient long numUncommittedOffsets;                       // 
Number of offsets that have been polled and emitted but not yet been committed. 
Not used if it's AutoCommitMode
+    private transient Timer refreshSubscriptionTimer;                   // 
Triggers when a subscription should be refreshed
     private transient TopologyContext context;
-    private transient Timer refreshSubscriptionTimer;                   // 
Used to say when a subscription should be refreshed
 
 
     public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
         this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<K, V>());
     }
-    
+
     //This constructor is here for testing
     KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, 
KafkaConsumerFactory<K, V> kafkaConsumerFactory) {
         this.kafkaConsumerFactory = kafkaConsumerFactory;
@@ -106,7 +106,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         // Offset management
         firstPollOffsetStrategy = 
kafkaSpoutConfig.getFirstPollOffsetStrategy();
         // with AutoCommitMode, offset will be periodically committed in the 
background by Kafka consumer
-        consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode();  
+        consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode();
 
         // Retries management
         retryService = kafkaSpoutConfig.getRetryService();
@@ -150,9 +150,12 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             }
 
             retryService.retainAll(partitions);
-            
-            //Emitted messages for partitions that are no longer assigned to 
this spout can't be acked, and they shouldn't be retried. Remove them from 
emitted.
-            Set<TopicPartition> partitionsSet = new HashSet(partitions);
+
+            /*
+             * Emitted messages for partitions that are no longer assigned to 
this spout can't
+             * be acked and should not be retried, hence remove them from 
emitted collection.
+            */
+            Set<TopicPartition> partitionsSet = new HashSet<>(partitions);
             Iterator<KafkaSpoutMessageId> msgIdIterator = emitted.iterator();
             while (msgIdIterator.hasNext()) {
                 KafkaSpoutMessageId msgId = msgIdIterator.next();
@@ -210,7 +213,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     @Override
     public void nextTuple() {
-        try{
+        try {
             if (initialized) {
                 if (commit()) {
                     commitOffsetsForAckedTuples();
@@ -234,7 +237,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             throwKafkaConsumerInterruptedException();
         }
     }
-    
+
     private void throwKafkaConsumerInterruptedException() {
         //Kafka throws their own type of exception when interrupted.
         //Throw a new Java InterruptedException to ensure Storm can recognize 
the exception as a reaction to an interrupt.
@@ -247,8 +250,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     private boolean poll() {
         final int maxUncommittedOffsets = 
kafkaSpoutConfig.getMaxUncommittedOffsets();
-        final boolean poll = !waitingToEmit() 
-                       && ( numUncommittedOffsets < maxUncommittedOffsets || 
consumerAutoCommitMode);
+        final boolean poll = !waitingToEmit()
+                && (numUncommittedOffsets < maxUncommittedOffsets || 
consumerAutoCommitMode);
 
         if (!poll) {
             if (waitingToEmit()) {
@@ -301,13 +304,17 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     // ======== emit  =========
     private void emit() {
-        while(!emitTupleIfNotEmitted(waitingToEmit.next()) && 
waitingToEmit.hasNext()) {
+        while (!emitTupleIfNotEmitted(waitingToEmit.next()) && 
waitingToEmit.hasNext()) {
             waitingToEmit.remove();
         }
     }
 
-    //Emits one tuple per record
-    //@return true if tuple was emitted
+    /**
+     * Creates a tuple from the kafka record and emits it if it was not yet 
emitted
+     *
+     * @param record to be emitted
+     * @return true if tuple was emitted. False if tuple has been acked or has 
been emitted and is pending ack or fail
+     */
     private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
         final TopicPartition tp = new TopicPartition(record.topic(), 
record.partition());
         final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
@@ -317,37 +324,51 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         } else if (emitted.contains(msgId)) {   // has been emitted and it's 
pending ack or fail
             LOG.trace("Tuple for record [{}] has already been emitted. 
Skipping", record);
         } else {
-            boolean isScheduled = retryService.isScheduled(msgId);
-            if (!isScheduled || retryService.isReady(msgId)) {   // not 
scheduled <=> never failed (i.e. never emitted) or ready to be retried
-                final List<Object> tuple = 
kafkaSpoutConfig.getTranslator().apply(record);
-                
-                if(consumerAutoCommitMode){
-                    if (tuple instanceof KafkaTuple) {
-                        collector.emit(((KafkaTuple)tuple).getStream(), tuple);
+            final List<Object> tuple = 
kafkaSpoutConfig.getTranslator().apply(record);
+            if (isEmitTuple(tuple)) {
+                final boolean isScheduled = retryService.isScheduled(msgId);
+                // not scheduled <=> never failed (i.e. never emitted), or 
scheduled and ready to be retried
+                if (!isScheduled || retryService.isReady(msgId)) {
+                    if (consumerAutoCommitMode) {
+                        if (tuple instanceof KafkaTuple) {
+                            collector.emit(((KafkaTuple) tuple).getStream(), 
tuple);
+                        } else {
+                            collector.emit(tuple);
+                        }
                     } else {
-                        collector.emit(tuple);
-                    }
-                }else{                
-                    if (tuple instanceof KafkaTuple) {
-                        collector.emit(((KafkaTuple)tuple).getStream(), tuple, 
msgId);
-                    } else {
-                        collector.emit(tuple, msgId);
-                    }
-                
-                    emitted.add(msgId);
-                    numUncommittedOffsets++;
-                
-                    if (isScheduled) { // Was scheduled for retry, now being 
re-emitted. Remove from schedule.
-                        retryService.remove(msgId);
+                        if (tuple instanceof KafkaTuple) {
+                            collector.emit(((KafkaTuple) tuple).getStream(), 
tuple, msgId);
+                        } else {
+                            collector.emit(tuple, msgId);
+                        }
+
+                        emitted.add(msgId);
+
+                        if (isScheduled) {  // Was scheduled for retry and 
re-emitted, so remove from schedule.
+                            retryService.remove(msgId);
+                        } else {            //New tuple, hence increment the 
uncommitted offset counter
+                            numUncommittedOffsets++;
+                        }
                     }
+                    LOG.trace("Emitted tuple [{}] for record [{}] with msgId 
[{}]", tuple, record, msgId);
+                    return true;
                 }
-                LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
-                return true;
+            } else {
+                LOG.debug("Not emitting null tuple for record [{}] as defined 
in configuration.", record);
+                msgId.setEmitted(false);
+                ack(msgId);
             }
         }
         return false;
     }
 
+    /**
+     * Emits a tuple if it is not a null tuple, or if the spout is configured 
to emit null tuples
+     */
+    private boolean isEmitTuple(List<Object> tuple) {
+        return tuple != null || kafkaSpoutConfig.isEmitNullTuples();
+    }
+
     private void commitOffsetsForAckedTuples() {
         // Find offsets that are ready to be committed for every topic 
partition
         final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new 
HashMap<>();
@@ -383,15 +404,20 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     @Override
     public void ack(Object messageId) {
         final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
-        if(!emitted.contains(msgId)) {
-            LOG.debug("Received ack for tuple this spout is no longer 
tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId);
-            return;
-        }
-        
-        if (!consumerAutoCommitMode) {  // Only need to keep track of acked 
tuples if commits are not done automatically
-            acked.get(msgId.getTopicPartition()).add(msgId);
+        if (!emitted.contains(msgId)) {
+            if (msgId.isEmitted()) {
+                LOG.debug("Received ack for message [{}], associated with 
tuple emitted for a ConsumerRecord that " +
+                        "came from a topic-partition that this consumer group 
instance is no longer tracking " +
+                        "due to rebalance/partition reassignment. No action 
taken.", msgId);
+            } else {
+                LOG.debug("Received direct ack for message [{}], associated 
with null tuple", msgId);
+            }
+        } else {
+            if (!consumerAutoCommitMode) {  // Only need to keep track of 
acked tuples if commits are not done automatically
+                acked.get(msgId.getTopicPartition()).add(msgId);
+            }
+            emitted.remove(msgId);
         }
-        emitted.remove(msgId);
     }
 
     // ======== Fail =======
@@ -399,7 +425,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     @Override
     public void fail(Object messageId) {
         final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
-        if(!emitted.contains(msgId)) {
+        if (!emitted.contains(msgId)) {
             LOG.debug("Received fail for tuple this spout is no longer 
tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId);
             return;
         }
@@ -460,7 +486,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         RecordTranslator<K, V> translator = kafkaSpoutConfig.getTranslator();
-        for (String stream: translator.streams()) {
+        for (String stream : translator.streams()) {
             declarer.declareStream(stream, translator.getFieldsFor(stream));
         }
     }
@@ -474,7 +500,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     }
 
     @Override
-    public Map<String, Object> getComponentConfiguration () {
+    public Map<String, Object> getComponentConfiguration() {
         Map<String, Object> configuration = super.getComponentConfiguration();
         if (configuration == null) {
             configuration = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/storm/blob/8670f60d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 2b81dea..920dca9 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -18,6 +18,13 @@
 
 package org.apache.storm.kafka.spout;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import 
org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
+import org.apache.storm.tuple.Fields;
+
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.HashMap;
@@ -26,13 +33,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.regex.Pattern;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import 
org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
-import org.apache.storm.tuple.Fields;
-
 /**
  * KafkaSpoutConfig defines the required configuration to connect a consumer 
to a consumer group, as well as the subscribing topics
  */
@@ -106,6 +106,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable 
{
         private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS;
         private KafkaSpoutRetryService retryService = DEFAULT_RETRY_SERVICE;
         private long partitionRefreshPeriodMs = 
DEFAULT_PARTITION_REFRESH_PERIOD_MS;
+        private boolean emitNullTuples = false;
         
         public Builder(String bootstrapServers, SerializableDeserializer<K> 
keyDes, SerializableDeserializer<V> valDes, String ... topics) {
             this(bootstrapServers, keyDes, valDes, new 
NamedSubscription(topics));
@@ -402,7 +403,17 @@ public class KafkaSpoutConfig<K, V> implements 
Serializable {
             this.partitionRefreshPeriodMs = partitionRefreshPeriodMs;
             return this;
         }
-        
+
+        /**
+         * Specifies if the spout should emit null tuples to the component 
downstream, or rather not emit and directly
+         * ack them. By default this parameter is set to false, which means 
that null tuples are not emitted.
+         * @param emitNullTuples sets if null tuples should or not be emitted 
downstream
+         */
+        public Builder<K, V> setEmitNullTuples(boolean emitNullTuples) {
+            this.emitNullTuples = emitNullTuples;
+            return this;
+        }
+
         public KafkaSpoutConfig<K,V> build() {
             return new KafkaSpoutConfig<>(this);
         }
@@ -424,6 +435,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable 
{
     private final FirstPollOffsetStrategy firstPollOffsetStrategy;
     private final KafkaSpoutRetryService retryService;
     private final long partitionRefreshPeriodMs;
+    private final boolean emitNullTuples;
 
     private KafkaSpoutConfig(Builder<K,V> builder) {
         this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
@@ -439,6 +451,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable 
{
         this.valueDes = builder.valueDes;
         this.valueDesClazz = builder.valueDesClazz;
         this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs;
+        this.emitNullTuples = builder.emitNullTuples;
     }
 
     public Map<String, Object> getKafkaProps() {
@@ -508,6 +521,10 @@ public class KafkaSpoutConfig<K, V> implements 
Serializable {
         return partitionRefreshPeriodMs;
     }
 
+    public boolean isEmitNullTuples() {
+        return emitNullTuples;
+    }
+
     @Override
     public String toString() {
         return "KafkaSpoutConfig{" +

http://git-wip-us.apache.org/repos/asf/storm/blob/8670f60d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
index 3cfad9d..dea57c4 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
@@ -25,14 +25,25 @@ public class KafkaSpoutMessageId {
     private transient TopicPartition topicPart;
     private transient long offset;
     private transient int numFails = 0;
+    private boolean emitted;   // true if the record was emitted using a form 
of collector.emit(...).
+                               // false when skipping null tuples as 
configured by the user in KafkaSpoutConfig
 
     public KafkaSpoutMessageId(ConsumerRecord<?, ?> consumerRecord) {
-        this(new TopicPartition(consumerRecord.topic(), 
consumerRecord.partition()), consumerRecord.offset());
+        this(consumerRecord, true);
+    }
+
+    public KafkaSpoutMessageId(ConsumerRecord<?, ?> consumerRecord, boolean 
emitted) {
+        this(new TopicPartition(consumerRecord.topic(), 
consumerRecord.partition()), consumerRecord.offset(), emitted);
     }
 
     public KafkaSpoutMessageId(TopicPartition topicPart, long offset) {
+        this(topicPart, offset, true);
+    }
+
+    public KafkaSpoutMessageId(TopicPartition topicPart, long offset, boolean 
emitted) {
         this.topicPart = topicPart;
         this.offset = offset;
+        this.emitted = emitted;
     }
 
     public int partition() {
@@ -59,6 +70,14 @@ public class KafkaSpoutMessageId {
         return topicPart;
     }
 
+    public boolean isEmitted() {
+        return emitted;
+    }
+
+    public void setEmitted(boolean emitted) {
+        this.emitted = emitted;
+    }
+
     public String getMetadata(Thread currThread) {
         return "{" +
                 "topic-partition=" + topicPart +

http://git-wip-us.apache.org/repos/asf/storm/blob/8670f60d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java
index 2e72c99..71af4d0 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java
@@ -17,12 +17,14 @@
  */
 package org.apache.storm.kafka.spout;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.storm.tuple.Fields;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.Builder;
 
 /**
  * Translate a {@link org.apache.kafka.clients.consumer.ConsumerRecord} to a 
tuple.
@@ -34,7 +36,8 @@ public interface RecordTranslator<K, V> extends Serializable, 
Func<ConsumerRecor
      * Translate the ConsumerRecord into a list of objects that can be emitted
      * @param record the record to translate
      * @return the objects in the tuple.  Return a {@link KafkaTuple}
-     * if you want to route the tuple to a non-default stream
+     * if you want to route the tuple to a non-default stream.
+     * Return null to discard an invalid {@link ConsumerRecord} if {@link 
Builder#setEmitNullTuples(boolean)} is set to true
      */
     List<Object> apply(ConsumerRecord<K,V> record);
     

http://git-wip-us.apache.org/repos/asf/storm/blob/8670f60d/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
index 57e0120..9f62b90 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
@@ -17,14 +17,14 @@
  */
 package org.apache.storm.kafka.spout;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
+import org.junit.Test;
 
 import java.util.HashMap;
 
-import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class KafkaSpoutConfigTest {
 
@@ -39,4 +39,13 @@ public class KafkaSpoutConfigTest {
         expected.put("enable.auto.commit", "false");
         assertEquals(expected, conf.getKafkaProps());
     }
+
+    @Test
+    public void test_setEmitNullTuples_true_true() {
+        final KafkaSpoutConfig<String, String> conf = 
KafkaSpoutConfig.builder("localhost:1234", "topic")
+                .setEmitNullTuples(true)
+                .build();
+
+        assertTrue("Failed to set emit null tuples to true", 
conf.isEmitNullTuples());
+    }
 }

Reply via email to