[ https://issues.apache.org/jira/browse/STORM-963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14658343#comment-14658343 ]

Jungtaek Lim commented on STORM-963:
------------------------------------

It seems you're not handling the heartbeat tuple from the multilang bolt 
subprocess.
Please refer to http://storm.apache.org/documentation/Multilang-protocol.html for 
how to handle heartbeats.
storm-multilang supports it, and AFAIK some 3rd-party libraries support it as well.
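
For reference, here's a minimal Python sketch of the heartbeat exchange at the raw 
protocol level. The message framing and the __heartbeat stream / task id -1 
convention are from the Multilang protocol page above; the initial handshake is 
omitted and everything else is illustrative:
{code}
import json
import sys

def read_message():
    # Multilang messages are JSON terminated by a line containing only "end".
    lines = []
    while True:
        line = sys.stdin.readline()
        if not line:
            sys.exit(0)  # parent closed the pipe
        line = line.rstrip("\n")
        if line == "end":
            break
        lines.append(line)
    return json.loads("\n".join(lines))

def send_message(msg):
    sys.stdout.write(json.dumps(msg) + "\nend\n")
    sys.stdout.flush()

# (handshake / pid exchange omitted for brevity)
while True:
    tup = read_message()
    # Heartbeat tuples arrive on the __heartbeat stream with task id -1.
    # Reply with a sync command, or ShellBolt assumes the subprocess is
    # hung and calls die() when its heartbeat timer fires.
    if tup.get("task") == -1 and tup.get("stream") == "__heartbeat":
        send_message({"command": "sync"})
        continue
    # ... normal tuple processing goes here ...
{code}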

Here's the related stack trace from jstack. The worker calls ShellBolt.die() 
because of a heartbeat timeout.
{code}
Thread 3735: (state = IN_NATIVE)
 - java.io.FileInputStream.readBytes(byte[], int, int) @bci=0 (Compiled frame; information may be imprecise)
 - java.io.FileInputStream.read(byte[], int, int) @bci=4, line=255 (Compiled frame)
 - java.io.BufferedInputStream.read1(byte[], int, int) @bci=39, line=284 (Compiled frame)
 - java.io.BufferedInputStream.read(byte[], int, int) @bci=49, line=345 (Compiled frame)
 - sun.nio.cs.StreamDecoder.readBytes() @bci=135, line=284 (Compiled frame)
 - sun.nio.cs.StreamDecoder.implRead(char[], int, int) @bci=112, line=326 (Compiled frame)
 - sun.nio.cs.StreamDecoder.read(char[], int, int) @bci=180, line=178 (Compiled frame)
 - java.io.InputStreamReader.read(char[], int, int) @bci=7, line=184 (Compiled frame)
 - java.io.Reader.read(char[]) @bci=5, line=140 (Interpreted frame)
 - org.apache.commons.io.IOUtils.copyLarge(java.io.Reader, java.io.Writer, char[]) @bci=8, line=2001 (Interpreted frame)
 - org.apache.commons.io.IOUtils.copyLarge(java.io.Reader, java.io.Writer) @bci=7, line=1980 (Interpreted frame)
 - org.apache.commons.io.IOUtils.copy(java.io.Reader, java.io.Writer) @bci=2, line=1957 (Interpreted frame)
 - org.apache.commons.io.IOUtils.copy(java.io.InputStream, java.io.Writer, java.nio.charset.Charset) @bci=15, line=1907 (Interpreted frame)
 - org.apache.commons.io.IOUtils.toString(java.io.InputStream, java.nio.charset.Charset) @bci=11, line=778 (Interpreted frame)
 - org.apache.commons.io.IOUtils.toString(java.io.InputStream) @bci=4, line=759 (Interpreted frame)
 - backtype.storm.utils.ShellProcess.getErrorsString() @bci=11, line=138 (Interpreted frame)
 - backtype.storm.utils.ShellProcess.getProcessTerminationInfoString() @bci=19, line=180 (Interpreted frame)
 - backtype.storm.task.ShellBolt.die(java.lang.Throwable) @bci=21, line=280 (Interpreted frame)
 - backtype.storm.task.ShellBolt.access$400(backtype.storm.task.ShellBolt, java.lang.Throwable) @bci=2, line=69 (Compiled frame)
 - backtype.storm.task.ShellBolt$BoltHeartbeatTimerTask.run() @bci=81, line=305 (Compiled frame)
 - java.util.concurrent.Executors$RunnableAdapter.call() @bci=4, line=511 (Compiled frame)
 - java.util.concurrent.FutureTask.runAndReset() @bci=47, line=308 (Compiled frame)
 - java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask) @bci=1, line=180 (Compiled frame)
 - java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run() @bci=37, line=294 (Compiled frame)
 - java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker) @bci=95, line=1142 (Interpreted frame)
 - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=617 (Interpreted frame)
 - java.lang.Thread.run() @bci=11, line=745 (Interpreted frame)
{code}

And here's the related entry from the worker log.
{code}
2015-08-03T16:01:25.762+0200 b.s.t.ShellBolt [ERROR] Halting process: ShellBolt died.
java.lang.RuntimeException: subprocess heartbeat timeout
        at backtype.storm.task.ShellBolt$BoltHeartbeatTimerTask.run(ShellBolt.java:305) [storm-core-0.9.5.jar:0.9.5]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
{code}

In other words, the topology is not actually frozen; the workers killed themselves 
because of the heartbeat timeout.
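
If the subprocess is built on the storm.py helper that ships with storm-multilang 
(0.9.3 and later), the heartbeat reply is handled inside the helper's read loop, so 
subclassing it is normally enough. A rough sketch of what a bolt like the reporter's 
processBolt could look like on top of it (the MongoDB write is left as a 
placeholder):
{code}
import storm

class ProcessBolt(storm.BasicBolt):
    # storm.py's run loop answers __heartbeat tuples with a sync command
    # on its own, before handing normal tuples to process().
    def process(self, tup):
        message = tup.values[0]           # JSON payload emitted by the KafkaSpout
        storm.log("I'm doing something")  # the debug line seen in the worker log
        # ... parse the message and save the results to MongoDB ...

if __name__ == '__main__':
    ProcessBolt().run()
{code}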

> Frozen topology (KafkaSpout + Multilang bolt)
> ---------------------------------------------
>
>                 Key: STORM-963
>                 URL: https://issues.apache.org/jira/browse/STORM-963
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka
>    Affects Versions: 0.9.4, 0.9.5, 0.9.6
>         Environment: - VMware ESX 5.5 
> - Ubuntu Server 14.04 LTS (kernel 3.16.0-41-generic)
> - Java (TM) SE Runtime Environment (build 1.8.0_45-b14)
> - Python 2.7.6 (default, Jun 22 2015, 17:58:13)
> - Zookeeper 3.4.6
>            Reporter: Alex Sobrino
>              Labels: multilang
>
> Hi,
> We've got a pretty simple topology running with Storm 0.9.5 (also tried with 
> 0.9.4 and 0.9.6-INCUBATING) in a 3-machine cluster:
> {code}kafkaSpout (3) -----> processBolt (12){code}
> Some info:
> - kafkaSpout reads from a topic with 3 partitions and 2 replications
> - processBolt iterates through the message and saves the results in MongoDB
> - processBolt is implemented in Python and has a storm.log("I'm doing 
> something") just to add a simple debug message in the logs
> - The messages can be quite big (~25-40 MB) and are in JSON format
> - The kafka topic has a retention of 2 hours
> - We use the same ZooKeeper cluster for both Kafka and Storm
> The topology gets frozen after several hours (not days) of running. We don't see 
> any message in the logs... In fact, the periodic messages from s.k.KafkaUtils 
> and s.k.ZkCoordinator disappear. As you can imagine, the message from the Bolt 
> also disappears. Logs are copy/pasted further on. If we redeploy the topology, 
> everything starts to work again until it becomes frozen again.
> Our kafkaSpout config is:
> {code}
> ZkHosts zkHosts = new ZkHosts("zkhost01:2181,zkhost02:2181,zkhost03:2181");
> SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "topic", 
> "/topic/ourclientid", "ourclientid");
> kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
> kafkaConfig.fetchSizeBytes = 50*1024*1024;
> kafkaConfig.bufferSizeBytes = 50*1024*1024;
> {code}
> We've also tried setting the following options
> {code}
> kafkaConfig.forceFromStart = true;
> kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); // Also 
> with kafka.api.OffsetRequest.LatestTime();
> kafkaConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
> {code}
> Right now the topology is running without acking the messages since there's a 
> bug in kafkaSpout with failed messages and deleted offsets in Kafka.
> This is what can be seen in the logs in one of the workers:
> {code}
> 2015-07-23T12:37:38.008+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, 
> name:processBolt I'm doing something
> 2015-07-23T12:37:39.079+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, 
> name:processBolt I'm doing something
> 2015-07-23T12:37:51.013+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, 
> name:processBolt I'm doing something
> 2015-07-23T12:37:51.091+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, 
> name:processBolt I'm doing something
> 2015-07-23T12:38:02.684+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing 
> partition manager connections
> 2015-07-23T12:38:02.687+0200 s.k.DynamicBrokersReader [INFO] Read partition 
> info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 
> 1=kafka2:9092, 2=kafka3:9092}}
> 2015-07-23T12:38:02.687+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned 
> [Partition{host=kafka2, partition=1}]
> 2015-07-23T12:38:02.687+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted 
> partition managers: []
> 2015-07-23T12:38:02.687+0200 s.k.ZkCoordinator [INFO] Task [2/3] New 
> partition managers: []
> 2015-07-23T12:38:02.687+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished 
> refreshing
> 2015-07-23T12:38:09.012+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, 
> name:processBolt I'm doing something
> 2015-07-23T12:38:41.878+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, 
> name:processBolt I'm doing something
> 2015-07-23T12:39:02.688+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing 
> partition manager connections
> 2015-07-23T12:39:02.691+0200 s.k.DynamicBrokersReader [INFO] Read partition 
> info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 
> 1=kafka2:9092, 2=kafka3:9092}}
> 2015-07-23T12:39:02.691+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned 
> [Partition{host=kafka2:9092, partition=1}]
> 2015-07-23T12:39:02.691+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted 
> partition managers: []
> 2015-07-23T12:39:02.691+0200 s.k.ZkCoordinator [INFO] Task [2/3] New 
> partition managers: []
> 2015-07-23T12:39:02.691+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished 
> refreshing
> 2015-07-23T12:40:02.692+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing 
> partition manager connections
> 2015-07-23T12:40:02.695+0200 s.k.DynamicBrokersReader [INFO] Read partition 
> info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 
> 1=kafka2:9092, 2=kafka3:9092}}
> 2015-07-23T12:40:02.695+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned 
> [Partition{host=kafka2:9092, partition=1}]
> 2015-07-23T12:40:02.695+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted 
> partition managers: []
> 2015-07-23T12:40:02.695+0200 s.k.ZkCoordinator [INFO] Task [2/3] New 
> partition managers: []
> 2015-07-23T12:40:02.695+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished 
> refreshing
> 2015-07-23T12:41:02.696+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing 
> partition manager connections
> 2015-07-23T12:41:02.699+0200 s.k.DynamicBrokersReader [INFO] Read partition 
> info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 
> 1=kafka2:9092, 2=kafka3:9092}}
> 2015-07-23T12:41:02.699+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned 
> [Partition{host=kafka2:9092, partition=1}]
> 2015-07-23T12:41:02.699+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted 
> partition managers: []
> 2015-07-23T12:41:02.699+0200 s.k.ZkCoordinator [INFO] Task [2/3] New 
> partition managers: []
> 2015-07-23T12:41:02.699+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished 
> refreshing
> 2015-07-23T12:42:02.735+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing 
> partition manager connections
> 2015-07-23T12:42:02.737+0200 s.k.DynamicBrokersReader [INFO] Read partition 
> info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 
> 1=kafka2:9092, 2=kafka3:9092}}
> 2015-07-23T12:42:02.737+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned 
> [Partition{host=kafka2:9092, partition=1}]
> 2015-07-23T12:42:02.737+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted 
> partition managers: []
> 2015-07-23T12:42:02.737+0200 s.k.ZkCoordinator [INFO] Task [2/3] New 
> partition managers: []
> 2015-07-23T12:42:02.737+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished 
> refreshing
> {code}
> and then it becomes frozen. Nothing is written into the nimbus log. We've 
> checked the offsets in ZooKeeper and they're not updated:
> {code}
> {"topology":{"id":"218e58a5-6bfb-4b32-ae89-f3afa19306e1","name":"our-topology"},"offset":12047144,"partition":1,"broker":{"host":"kafka2","port":9092},"topic":"topic"}
> cZxid = 0x100028958
> ctime = Wed Jul 01 12:22:36 CEST 2015
> mZxid = 0x100518527
> mtime = Thu Jul 23 12:42:41 CEST 2015
> pZxid = 0x100028958
> cversion = 0
> dataVersion = 446913
> aclVersion = 0
> ephemeralOwner = 0x0
> dataLength = 183
> numChildren = 0
> {code}
> Any ideas of what we could be missing?
> PS: This was sent to the Storm user's mailing list and got 0 replies :\



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)