[
https://issues.apache.org/jira/browse/STORM-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kush Khandelwal updated STORM-3046:
-----------------------------------
Description:
I am using storm-core and storm-kafka-client version 1.2.1 and kafka clients
version 1.1.0.
We have an external kafka from where we get the messages.
Whenever I try to run the topology, I get a NPE, which leads to the worker
getting died.
Can someone please help me fix the issue?
Thanks.
Error -
10665 [Thread-58-spout-handle-rule-local-kafka-spout-executor[26 26]] ERROR
o.a.s.util - Async loop died!
java.lang.RuntimeException: java.lang.NullPointerException
at
org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522)
~[storm-core-1.2.1.jar:1.2.1]
at
org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487)
~[storm-core-1.2.1.jar:1.2.1]
at
org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74)
~[storm-core-1.2.1.jar:1.2.1]
at
org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861)
~[storm-core-1.2.1.jar:1.2.1]
at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484)
[storm-core-1.2.1.jar:1.2.1]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
Caused by: java.lang.NullPointerException
at
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193)
~[storm-kafka-client-1.2.1.jar:1.2.1]
at
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127)
~[storm-kafka-client-1.2.1.jar:1.2.1]
at
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51)
~[storm-kafka-client-1.2.1.jar:1.2.1]
at
org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141)
~[storm-core-1.2.1.jar:1.2.1]
at
org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82)
~[storm-core-1.2.1.jar:1.2.1]
at
org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383)
~[storm-core-1.2.1.jar:1.2.1]
at
org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739)
~[storm-core-1.2.1.jar:1.2.1]
at
org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468)
~[storm-core-1.2.1.jar:1.2.1]
at
org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41)
~[storm-core-1.2.1.jar:1.2.1]
at
org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509)
~[storm-core-1.2.1.jar:1.2.1]
... 6 more
Topology class -
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.*;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.tuple.Fields;
import java.util.Properties;
public class TestTopology {
private static StormTopology buildTopology(Properties stormProperties)
{
Properties kafkaProperties = getProperties("/kafka.properties");
TridentTopology topology = new TridentTopology();
Fields stageArguments = new Fields("test", "issue");
KafkaSpoutConfig<String, String> kafkaSpoutConfig =
KafkaSpoutConfig.builder(kafkaProperties.getProperty("bootstrap.servers"),
"test")
.setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
.setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
.setRecordTranslator(new RecordTranslator(), stageArguments)
.build();
// If I set poll strategy to earliest and the topic already contains some
messages, it works fine.
// I have used a custom record translator which is working fine.
KafkaTridentSpoutOpaque kafkaTridentSpoutOpaque = new
KafkaTridentSpoutOpaque(kafkaSpoutConfig);
Grouping partitionGroup = getPartitionGroup("test");
log.info("Creating Opaque-Trident-Kafka-Spout");
final Stream kafkaSpout =
topology.newStream(stormProperties.getProperty("SPOUT_NAME"),
kafkaTridentSpoutOpaque).name("kafkaSpout").parallelismHint(1);
TridentState testUpdate =
kafkaSpout.partition(partitionGroup).name("testUpdate").partitionPersist(new
MainMemoryStateFactory(), stageArguments, new MainMemoryStateUpdater(),
stageArguments).parallelismHint(1);
Stream viewUpdate =
ruleUpdate.newValuesStream().name("viewUpdate").partition(partitionGroup).each(stageArguments,
new UpdateView(), new Fields()).parallelismHint(2); return topology.build(); }
public static void main(String[] args) {
Config conf = new Config();
log.info("Topology config: " + conf);
Properties properties = getProperties("/storm-cluster.properties");
conf.setMessageTimeoutSecs(600);
log.info("Building Topology");
StormTopology topology = buildTopology(properties);
log.info(topology.toString());
log.info("Submitting handle-rule Topology");
try
{ LocalCluster cluster = new LocalCluster();
cluster.submitTopology("handle-rule", conf, topology); }
catch (Exception e)
{ e.printStackTrace(); }
}
}
was:
I am using storm-core and storm-kafka-client version 1.2.1 and kafka clients
version 1.1.0.
We have an external kafka from where we get the messages.
Whenever I try to run the topology, I get a NPE, which leads to the worker
getting died.
Can someone please help me fix the issue?
Thanks.
Error -
10665 [Thread-58-spout-handle-rule-local-kafka-spout-executor[26 26]] ERROR
o.a.s.util - Async loop died!
java.lang.RuntimeException: java.lang.NullPointerException
at
org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522)
~[storm-core-1.2.1.jar:1.2.1]
at
org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487)
~[storm-core-1.2.1.jar:1.2.1]
at
org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74)
~[storm-core-1.2.1.jar:1.2.1]
at
org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861)
~[storm-core-1.2.1.jar:1.2.1]
at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484)
[storm-core-1.2.1.jar:1.2.1]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
Caused by: java.lang.NullPointerException
at
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193)
~[storm-kafka-client-1.2.1.jar:1.2.1]
at
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127)
~[storm-kafka-client-1.2.1.jar:1.2.1]
at
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51)
~[storm-kafka-client-1.2.1.jar:1.2.1]
at
org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141)
~[storm-core-1.2.1.jar:1.2.1]
at
org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82)
~[storm-core-1.2.1.jar:1.2.1]
at
org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383)
~[storm-core-1.2.1.jar:1.2.1]
at
org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739)
~[storm-core-1.2.1.jar:1.2.1]
at
org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468)
~[storm-core-1.2.1.jar:1.2.1]
at
org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41)
~[storm-core-1.2.1.jar:1.2.1]
at
org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509)
~[storm-core-1.2.1.jar:1.2.1]
... 6 more
Topology class -
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.*;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.tuple.Fields;
import java.util.Properties;
public class TestTopology {
private static StormTopology buildTopology(Properties stormProperties) {
Properties kafkaProperties = getProperties("/kafka.properties");
TridentTopology topology = new TridentTopology();
Fields stageArguments = new Fields("test", "issue");
KafkaSpoutConfig<String, String> kafkaSpoutConfig =
KafkaSpoutConfig.builder(kafkaProperties.getProperty("bootstrap.servers"),
"test")
.setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
.setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
.setRecordTranslator(new RecordTranslator(), stageArguments)
.build();
// If I set poll strategy to earliest and the topic already contains some
messages, it works fine.
// I have used a custom record translator which is working fine.
KafkaTridentSpoutOpaque kafkaTridentSpoutOpaque = new
KafkaTridentSpoutOpaque(kafkaSpoutConfig);
Grouping partitionGroup = getPartitionGroup("test");
log.info("Creating Opaque-Trident-Kafka-Spout");
final Stream kafkaSpout =
topology.newStream(stormProperties.getProperty("SPOUT_NAME"),
kafkaTridentSpoutOpaque).name("kafkaSpout").parallelismHint(1);
TridentState testUpdate =
kafkaSpout.partition(partitionGroup).name("testUpdate").partitionPersist(new
MainMemoryStateFactory(), stageArguments, new MainMemoryStateUpdater(),
stageArguments).parallelismHint(1);
Stream viewUpdate =
ruleUpdate.newValuesStream().name("viewUpdate").partition(partitionGroup).each(stageArguments,
new UpdateView(), new Fields()).parallelismHint(2);
return topology.build();
}
public static void main(String[] args) {
Config conf = new Config();
log.info("Topology config: " + conf);
Properties properties = getProperties("/storm-cluster.properties");
conf.setMessageTimeoutSecs(600);
log.info("Building Topology");
StormTopology topology = buildTopology(properties);
log.info(topology.toString());
log.info("Submitting handle-rule Topology");
try {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("handle-rule", conf, topology);
} catch (Exception e) {
e.printStackTrace();
}
}
}
> Getting a NPE leading worker to die when starting a topology.
> -------------------------------------------------------------
>
> Key: STORM-3046
> URL: https://issues.apache.org/jira/browse/STORM-3046
> Project: Apache Storm
> Issue Type: Bug
> Components: rm-kafka-client, trident
> Affects Versions: 1.2.1
> Reporter: Kush Khandelwal
> Priority: Blocker
> Labels: kafka, storm-kafka-client, trident
> Attachments: TestTopology.java
>
>
> I am using storm-core and storm-kafka-client version 1.2.1 and kafka clients
> version 1.1.0.
> We have an external kafka from where we get the messages.
> Whenever I try to run the topology, I get a NPE, which leads to the worker
> getting died.
> Can someone please help me fix the issue?
> Thanks.
>
> Error -
> 10665 [Thread-58-spout-handle-rule-local-kafka-spout-executor[26 26]] ERROR
> o.a.s.util - Async loop died!
> java.lang.RuntimeException: java.lang.NullPointerException
> at
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522)
> ~[storm-core-1.2.1.jar:1.2.1]
> at
> org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487)
> ~[storm-core-1.2.1.jar:1.2.1]
> at
> org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74)
> ~[storm-core-1.2.1.jar:1.2.1]
> at
> org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861)
> ~[storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484)
> [storm-core-1.2.1.jar:1.2.1]
> at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
> Caused by: java.lang.NullPointerException
> at
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193)
> ~[storm-kafka-client-1.2.1.jar:1.2.1]
> at
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127)
> ~[storm-kafka-client-1.2.1.jar:1.2.1]
> at
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51)
> ~[storm-kafka-client-1.2.1.jar:1.2.1]
> at
> org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141)
> ~[storm-core-1.2.1.jar:1.2.1]
> at
> org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82)
> ~[storm-core-1.2.1.jar:1.2.1]
> at
> org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383)
> ~[storm-core-1.2.1.jar:1.2.1]
> at
> org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739)
> ~[storm-core-1.2.1.jar:1.2.1]
> at
> org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468)
> ~[storm-core-1.2.1.jar:1.2.1]
> at
> org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41)
> ~[storm-core-1.2.1.jar:1.2.1]
> at
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509)
> ~[storm-core-1.2.1.jar:1.2.1]
> ... 6 more
>
>
> Topology class -
>
>
>
> import org.apache.storm.Config;
> import org.apache.storm.LocalCluster;
> import org.apache.storm.StormSubmitter;
> import org.apache.storm.generated.*;
> import org.apache.storm.kafka.spout.KafkaSpoutConfig;
> import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
> import org.apache.storm.trident.Stream;
> import org.apache.storm.trident.TridentState;
> import org.apache.storm.trident.TridentTopology;
> import org.apache.storm.tuple.Fields;
> import java.util.Properties;
>
> public class TestTopology {
>
> private static StormTopology buildTopology(Properties stormProperties)
> {
> Properties kafkaProperties = getProperties("/kafka.properties");
> TridentTopology topology = new TridentTopology();
> Fields stageArguments = new Fields("test", "issue");
> KafkaSpoutConfig<String, String> kafkaSpoutConfig =
> KafkaSpoutConfig.builder(kafkaProperties.getProperty("bootstrap.servers"),
> "test")
> .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
> .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
> .setRecordTranslator(new RecordTranslator(), stageArguments)
> .build();
> // If I set poll strategy to earliest and the topic already contains some
> messages, it works fine.
> // I have used a custom record translator which is working fine.
> KafkaTridentSpoutOpaque kafkaTridentSpoutOpaque = new
> KafkaTridentSpoutOpaque(kafkaSpoutConfig);
> Grouping partitionGroup = getPartitionGroup("test");
> log.info("Creating Opaque-Trident-Kafka-Spout");
> final Stream kafkaSpout =
> topology.newStream(stormProperties.getProperty("SPOUT_NAME"),
> kafkaTridentSpoutOpaque).name("kafkaSpout").parallelismHint(1);
> TridentState testUpdate =
> kafkaSpout.partition(partitionGroup).name("testUpdate").partitionPersist(new
> MainMemoryStateFactory(), stageArguments, new MainMemoryStateUpdater(),
> stageArguments).parallelismHint(1);
> Stream viewUpdate =
> ruleUpdate.newValuesStream().name("viewUpdate").partition(partitionGroup).each(stageArguments,
> new UpdateView(), new Fields()).parallelismHint(2); return topology.build();
> }
> public static void main(String[] args) {
> Config conf = new Config();
> log.info("Topology config: " + conf);
> Properties properties = getProperties("/storm-cluster.properties");
> conf.setMessageTimeoutSecs(600);
> log.info("Building Topology");
> StormTopology topology = buildTopology(properties);
> log.info(topology.toString());
> log.info("Submitting handle-rule Topology");
> try
> { LocalCluster cluster = new LocalCluster();
> cluster.submitTopology("handle-rule", conf, topology); }
> catch (Exception e)
> { e.printStackTrace(); }
> }
> }
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)