So the option I was missing was topology.max.spout.pending. It was null, and so the kafka-spout really went crazy reading messages from Kafka. After setting the above option to a saner value of 1000, I was able to resolve the ClosedChannelException as well as the OutOfMemoryError.
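For anyone else hitting this: the cap can be set cluster-wide in storm.yaml, or per topology in code. A minimal sketch (1000 is just the value that worked for my tuple sizes, not a general recommendation; note the cap only takes effect when the spout emits tuples with message IDs, i.e. acking is enabled):

```yaml
# storm.yaml -- limits the number of un-acked tuples in flight per spout task
topology.max.spout.pending: 1000
```

Per topology, the equivalent is calling setMaxSpoutPending(1000) on the org.apache.storm.Config object passed in at submit time.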
Should we provide some default value for topology.max.spout.pending, like
10,000? After all, the spout should not produce more tuples while so many
are still pending. Also, there should be something in the Storm logs that
indicates to the user that the tuple production rate of a spout is too high
compared to the consumption rate of the downstream bolt. That message should
hint at changing topology.max.spout.pending too. These small changes would
improve usability a lot.

On Fri, Dec 16, 2016 at 6:41 AM, Bobby Evans <[email protected]> wrote:

> Are you using acking and/or do you have back-pressure enabled? Your worker
> crashed because it exceeded the GC overhead limit, which by default in
> Java means that you were spending more than 98% of the time doing GC and
> only 2% of the time doing real work. I am rather surprised that the
> supervisor didn't shoot your worker and relaunch it, because the worker
> will typically have issues heartbeating to the supervisor before it gets
> into this situation. Enabling acking with max spout pending set, or
> turning on backpressure, should make it so your topology is less likely
> to die. You may also need to tune how much memory you are giving your
> worker. Also, I would really like to know the stack trace on the
> exceptions you are seeing. They could be caused by high GC overhead, or
> they could indicate that some other system you are talking to, probably
> Solr in this case, is in trouble and is closing connections unexpectedly.
>
> - Bobby
>
> On Thursday, December 15, 2016 8:15 PM, S G <[email protected]>
> wrote:
>
> Hi,
>
> I am using Storm 1.0.2.
> My configuration is quite simple: `kafka-spout` feeding into `solr-bolt`
>
> topology.workers = 2
> spout.parallelism = 1
> bolt.parallelism = 1
>
> Our messages coming from Kafka are large: around 100 KB per message, up
> to a max of 500 KB per message.
>
> But I see lots of errors:
>
> Window   Emitted  Transferred  Complete latency (ms)  Acked   Failed
> 10m 0s   355,160  355,161      15,263                 29,040  340,823
>
> And after running for 30 minutes, the kafka-spout goes OutOfMemory:
>
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>   at org.apache.storm.kafka.PartitionManager.fail(PartitionManager.java:281)
>   at org.apache.storm.kafka.KafkaSpout.fail(KafkaSpout.java:173)
>   at org.apache.storm.daemon.executor$fail_spout_msg.invoke(executor.clj:439)
>   at org.apache.storm.daemon.executor$fn$reify__7993.expire(executor.clj:512)
>   at org.apache.storm.utils.RotatingMap.rotate(RotatingMap.java:77)
>   at org.apache.storm.daemon.executor$fn__7990$tuple_action_fn__7996.invoke(executor.clj:517)
>   at org.apache.storm.daemon.executor$mk_task_receiver$fn__7979.invoke(executor.clj:467)
>   at org.apache.storm.disruptor$clojure_handler$reify__7492.onEvent(disruptor.clj:40)
>   at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451)
>   at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430)
>   at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:420)
>   at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:69)
>   at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:628)
>   at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484)
>   at clojure.lang.AFn.run(AFn.java:22)
>   at java.lang.Thread.run(Thread.java:745)
>
> In the worker.log I see lots of ERRORs (within a span of just 30
> minutes):
>
> java.nio.channels.ClosedChannelException (24957 times)
> java.net.ConnectException (107 times)
> java.io.IOException (22 times)
>
> How do I debug this?
>
> Thanks
> SG
