Stig Rohde Døssing created STORM-3082:
-----------------------------------------

             Summary: NamedTopicFilter can't handle topics that don't exist yet
                 Key: STORM-3082
                 URL: https://issues.apache.org/jira/browse/STORM-3082
             Project: Apache Storm
          Issue Type: Bug
          Components: storm-kafka-client
    Affects Versions: 1.2.1, 2.0.0
            Reporter: Stig Rohde Døssing


[~aniket.alhat] reported on the mailing list that he got an NPE when trying to 
start the Trident spout.

{code}
2018-05-22 06:23:02.318 o.a.s.util [ERROR] Async loop died!
java.lang.NullPointerException: null
        at org.apache.storm.kafka.spout.NamedTopicFilter.getFilteredTopicPartitions(NamedTopicFilter.java:57) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.ManualPartitionSubscription.refreshAssignment(ManualPartitionSubscription.java:54) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.ManualPartitionSubscription.subscribe(ManualPartitionSubscription.java:49) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutManager.createAndSubscribeKafkaConsumer(KafkaTridentSpoutManager.java:59) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.<init>(KafkaTridentSpoutEmitter.java:84) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.<init>(KafkaTridentSpoutEmitter.java:100) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque.getEmitter(KafkaTridentSpoutOpaque.java:50) ~[stormjar.jar:?]
        at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.<init>(OpaquePartitionedTridentSpoutExecutor.java:97) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:221) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:39) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.trident.spout.TridentSpoutExecutor.prepare(TridentSpoutExecutor.java:60) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.trident.topology.TridentBoltExecutor.prepare(TridentBoltExecutor.java:245) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.daemon.executor$fn__5043$fn__5056.invoke(executor.clj:803) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:482) [storm-core-1.2.1.jar:1.2.1]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
{code}

It looks to me like the consumer's partitionsFor method returns null if the 
specified topic doesn't exist. We didn't account for this in the filter, 
because the method's return type is a List and we assumed it would never be 
null.

I think it's reasonable that people should be able to subscribe to topics that 
don't exist yet, and the spout should pick up the new topics eventually.
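One way to get the "pick up new topics eventually" behaviour is to re-run the filter on every assignment refresh and treat any topic-partition not seen before as newly assigned. This is a minimal sketch under stated assumptions: `PeriodicAssignmentRefresher` is a hypothetical class fed by a supplier of the current `topic-partition` strings, and it does not reproduce how `ManualPartitionSubscription.refreshAssignment` actually works.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical sketch: recompute the assignment on each refresh and report
// partitions that were not assigned before, so a topic created after startup
// shows up on a later refresh.
final class PeriodicAssignmentRefresher {
    // Supplies the current topic-partitions, e.g. by calling a null-safe topic filter.
    private final Supplier<List<String>> currentPartitions;
    private Set<String> assigned = new HashSet<>();

    PeriodicAssignmentRefresher(Supplier<List<String>> currentPartitions) {
        this.currentPartitions = currentPartitions;
    }

    /** Returns the partitions that appeared since the previous refresh. */
    List<String> refresh() {
        Set<String> latest = new HashSet<>(currentPartitions.get());
        List<String> added = new ArrayList<>();
        for (String tp : latest) {
            if (!assigned.contains(tp)) {
                added.add(tp);
            }
        }
        // The latest lookup becomes the current assignment.
        assigned = latest;
        return added;
    }
}
```

If the supplier first returns only `existing-0`, the first refresh reports `[existing-0]`, the next reports nothing, and once a new topic's partition `new-topic-0` appears, the following refresh reports `[new-topic-0]`.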

We should check for null here https://github.com/apache/storm/blob/93ed601425a79759c0189a945c6b46266e5c9ced/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java#L55, and maybe log a warning if the returned value is null.
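A minimal sketch of that null check, assuming a simplified stand-in for the consumer. The hypothetical `partitionsFor` function below plays the role of `consumer.partitionsFor(topic)`, partitions are plain integers, and topic-partitions are rendered as `topic-partition` strings instead of Kafka's `TopicPartition`. Logging uses `java.util.logging` only to keep the example self-contained; this is not the actual Storm patch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.logging.Logger;

// Hypothetical, simplified stand-in for NamedTopicFilter: skips topics whose
// partition lookup returns null instead of throwing an NPE.
final class NullSafeNamedTopicFilter {
    private static final Logger LOG = Logger.getLogger(NullSafeNamedTopicFilter.class.getName());

    private final List<String> topics;

    NullSafeNamedTopicFilter(List<String> topics) {
        this.topics = new ArrayList<>(topics);
    }

    // partitionsFor plays the role of consumer.partitionsFor(topic) and may
    // return null for a topic that doesn't exist yet.
    List<String> getFilteredTopicPartitions(Function<String, List<Integer>> partitionsFor) {
        List<String> result = new ArrayList<>();
        for (String topic : topics) {
            List<Integer> partitions = partitionsFor.apply(topic);
            if (partitions == null) {
                // Topic not found: warn and skip it; a later call looks it up again.
                LOG.warning("Topic " + topic + " does not exist yet, skipping it for now");
                continue;
            }
            for (Integer partition : partitions) {
                result.add(topic + "-" + partition);
            }
        }
        return result;
    }
}
```

Calling it with metadata that only knows about `existing` (partitions 0 and 1) yields `[existing-0, existing-1]` plus a warning for `missing`; once `missing` has metadata, the next call also includes `missing-0`.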



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
