Hi,
I am a grad student, and I am working on Storm as part of my course project.
I am trying to optimize the acking.
I have added a new field of type long, currTupleCount, to TupleImpl.
I have also added two methods:
1. one for fetching the value of that field.
2. one for incrementing it.
    public long getCurrTupleCount()
    {
        return currTupleCount;
    }

    public void incrementCurrTupleCount()
    {
        currTupleCount = currTupleCount + 1;
    }
I am calling these methods from bolt-emit:
(if (= 1 (.get task_counter))
((.incrementCurrTupleCount a)
(log-message " VIBHA: Incremented CurrTupleCount:")))
(log-message "VIBHA: CurrTupleCount: " (.getCurrTupleCount a))
I am getting the following error at runtime (full stack trace below).
If I comment out the above code, the error does not occur.
Am I making some basic mistake?
I am new to Clojure. Could anyone please look into it?
Thanks a lot!
java.lang.RuntimeException: java.lang.NullPointerException
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:448) ~[storm-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:414) ~[storm-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) ~[storm-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
    at org.apache.storm.daemon.executor$fn__6297$fn__6310$fn__6389.invoke(executor.clj:913) ~[storm-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
    at org.apache.storm.util$async_loop$fn__560.invoke(util.clj:484) [storm-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.7.0_99]
Caused by: java.lang.NullPointerException
    at org.apache.storm.daemon.executor$fn__6297$fn__6310$bolt_emit__6316.invoke(executor.clj:793) ~[storm-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
    at org.apache.storm.daemon.executor$fn__6297$fn$reify__6331.emit_tokens(executor.clj:852) ~[storm-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
    at org.apache.storm.task.OutputCollector.emit_tokens(OutputCollector.java:208) ~[storm-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
    at org.apache.storm.starter.ExclamationTopology_3_out_task$ExclamationBolt.execute(ExclamationTopology_3_out_task.java:54) ~[stormjar.jar:1.0.0-SNAPSHOT]
    at org.apache.storm.daemon.executor$fn__6297$tuple_action_fn__6299.invoke(executor.clj:742) ~[storm-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__6212.invoke(executor.clj:463) ~[storm-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
    at org.apache.storm.disruptor$clojure_handler$reify__5728.onEvent(disruptor.clj:40) ~[storm-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:435) ~[storm-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]