Each worker listens for changes to the placement of downstream components.
When it sees one, it adjusts the routing and closes or opens connections as
needed. Is the remote worker actually up on a different node? Is there a
networking issue that makes the remote node unreachable? We have had cases
where iptables rules were not updated correctly and some nodes were not
reachable.
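
One quick way to check the reachability question is to open a plain TCP connection from the upstream host to the downstream worker's port. The following is only a minimal sketch: the class name `PortProbe` is made up for illustration, and the default host and port are simply the ones that appear in the quoted log, so substitute your own.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Storm: checks whether a TCP port accepts connections.
final class PortProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, unknown host, or no route to host.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are taken from the log in the quoted message (assumption: replace as needed).
        String host = args.length > 0 ? args[0] : "10.153.3.160";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 6711;
        System.out.println(host + ":" + port + " reachable: " + isReachable(host, port, 10_000));
    }
}
```

If this fails from the upstream node but the worker is running on the remote node, the problem is more likely in the network (for example firewall rules) than in Storm.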
- Bobby
On Sunday, August 9, 2015 6:31 PM, Daniel Li <[email protected]> wrote:
We recently upgraded from Storm 0.9.1 to 0.9.3. One observation is that
when a downstream bolt is reassigned to a different server, the upstream
spout bails out after trying to reconnect for about 50 minutes: (1) the
default max retries is 300; (2) every retry times out after 10 seconds. See
the code pointers and log below. We ended up setting the max retries to 6
(so the max time in retry is about 6 x 10 seconds = 1 minute).
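
The arithmetic above can be sketched as follows. This is a rough estimate only: the class `RetryBudget` is hypothetical, the 10-second figure is the per-attempt timeout observed in our log, and the sketch ignores the back-off sleep between attempts. Note that the quoted loop condition is `tried <= max_retries`, so it makes one more attempt than the configured value.

```java
// Hypothetical helper for estimating how long the reconnect loop can block.
final class RetryBudget {

    /** Number of attempts made by a loop of the form `while (tried <= maxRetries)`. */
    static int attempts(int maxRetries) {
        return maxRetries + 1;
    }

    /**
     * Worst-case seconds spent connecting if every attempt blocks for
     * connectTimeoutSeconds. Back-off sleep between attempts is not included.
     */
    static long worstCaseSeconds(int maxRetries, int connectTimeoutSeconds) {
        return (long) attempts(maxRetries) * connectTimeoutSeconds;
    }

    public static void main(String[] args) {
        int timeoutSeconds = 10; // observed per-attempt timeout (assumption from the log)
        for (int maxRetries : new int[] {300, 6}) {
            long seconds = worstCaseSeconds(maxRetries, timeoutSeconds);
            System.out.println("max_retries=" + maxRetries + " -> " + seconds
                    + " s (" + seconds / 60 + " min " + seconds % 60 + " s)");
        }
    }
}
```

With the default of 300 this gives 3010 seconds, a little over 50 minutes; with 6 it gives 70 seconds, which matches the "about 1 minute" figure.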
*One question we have concerns reconnecting to a downstream bolt.*
When a downstream bolt is reassigned to a different node, we expect the
upstream spout/bolt to reload the topology allocation from ZooKeeper after
the reconnect fails to reach the downstream bolt. However, the current
behavior is that the upstream spout/bolt just exits when the reconnect to
the downstream bolt fails; after it is restarted, it loads the correct
topology allocation.
Comments?
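
To make the expectation concrete, here is a minimal sketch of the behavior we had in mind. It is not Storm code and does not use any Storm or ZooKeeper API: `RefreshingConnector`, the `assignment` supplier (standing in for "read the current allocation from ZooKeeper"), and the `tryConnect` predicate (standing in for one connection attempt) are all hypothetical names.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical sketch: re-read the target address when a round of retries is exhausted,
// instead of throwing on the first exhausted round.
final class RefreshingConnector {

    /**
     * @param assignment   returns the current address of the downstream task
     *                     (stand-in for a lookup of the allocation)
     * @param tryConnect   performs one connection attempt, true on success
     * @param maxRetries   retries per address, using the same `<=` bound as the quoted loop
     * @param maxRefreshes how many times to re-read the address before giving up
     * @return the address that was connected to
     */
    static String connect(Supplier<String> assignment, Predicate<String> tryConnect,
                          int maxRetries, int maxRefreshes) {
        for (int refresh = 0; refresh <= maxRefreshes; refresh++) {
            String address = assignment.get(); // re-read on every round
            for (int tried = 0; tried <= maxRetries; tried++) {
                if (tryConnect.test(address)) {
                    return address;
                }
            }
        }
        throw new IllegalStateException("Remote address is not reachable after refreshing");
    }

    public static void main(String[] args) {
        int[] reads = {0};
        // First read returns the stale location, later reads return the new one.
        Supplier<String> assignment = () -> reads[0]++ == 0 ? "old-node:6711" : "new-node:6711";
        Predicate<String> tryConnect = address -> address.startsWith("new-node");
        System.out.println("connected to " + connect(assignment, tryConnect, 6, 1));
    }
}
```

In this sketch the process only gives up after the refreshed address has also failed, which is the difference from the exit-and-restart behavior we see today.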
Code pointers and logs -
https://github.com/apache/storm/blob/v0.9.3/conf/defaults.yaml
storm.messaging.netty.max_retries: 300
https://raw.githubusercontent.com/apache/storm/v0.9.3/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
    /**
     * We will retry connection with exponential back-off policy
     */
    private synchronized void connect() {
        try {
            Channel channel = channelRef.get();
            if (channel != null && channel.isConnected()) {
                return;
            }
            int tried = 0;
            //setting channel to null to make sure we throw an exception when reconnection fails
            channel = null;
            while (tried <= max_retries) {
                LOG.info("Reconnect started for {}... [{}]", name(), tried);
                LOG.debug("connection started...");
                ChannelFuture future = bootstrap.connect(remote_addr);
                future.awaitUninterruptibly();
                Channel current = future.getChannel();
                if (!future.isSuccess()) {
                    if (null != current) {
                        current.close();
                    }
                } else {
                    channel = current;
                    break;
                }
                Thread.sleep(retryPolicy.getSleepTimeMs(tried, 0));
                tried++;
            }
            if (null != channel) {
                LOG.info("connection established to a remote host " + name() + ", " + channel.toString());
                channelRef.set(channel);
            } else {
                close();
                throw new RuntimeException("Remote address is not reachable. We will close this client " + name());
            }
        } catch (InterruptedException e) {
            throw new RuntimeException("connection failed " + name(), e);
        }
    }
The log for reference -
2015-07-20 15:55:48.288 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-CH1SCH060020712/10.153.3.160:6711... [0]
2015-07-20 15:55:48.289 b.s.m.n.Client [DEBUG] connection started...
2015-07-20 15:55:48.288 b.s.m.n.Client [DEBUG] 1 request(s) sent
2015-07-20 15:55:48.289 b.s.m.n.Client [DEBUG] 1 request(s) sent
2015-07-20 15:55:58.394 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-CH1SCH060020712/10.153.3.160:6711... [1]
2015-07-20 15:55:58.394 b.s.m.n.Client [DEBUG] connection started...
2015-07-20 15:56:08.501 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-CH1SCH060020712/10.153.3.160:6711... [2]
2015-07-20 15:56:08.501 b.s.m.n.Client [DEBUG] connection started...
…
2015-07-20 16:46:04.652 b.s.d.worker [DEBUG] Doing heartbeat #backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 1437435964, :storm-id "BingLayercakeAllLayersCh1dTest2-48-1437429062", :executors #{[328 328] [-1 -1]}, :port 6706}
2015-07-20 16:46:04.951 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-CH1SCH060020712/10.153.3.160:6711... [284]
2015-07-20 16:46:04.951 b.s.m.n.Client [DEBUG] connection started...
...
2015-07-20 16:48:59.798 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-CH1SCH060020712/10.153.3.160:6711... [300]
2015-07-20 16:48:59.798 b.s.m.n.Client [DEBUG] connection started...
..
2015-07-20 16:49:10.743 b.s.m.n.Client [INFO] Closing Netty Client Netty-Client-CH1SCH060020712/10.153.3.160:6711
2015-07-20 16:49:10.743 b.s.m.n.Client [INFO] Waiting for pending batchs to be sent with Netty-Client-CH1SCH060020712/10.153.3.160:6711..., timeout: 600000ms, pendings: 0
2015-07-20 16:49:10.743 b.s.m.n.Client [DEBUG] channel CH1SCH060020712/10.153.3.160:6711 closed
2015-07-20 16:49:10.744 b.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: Remote address is not reachable. We will close this client Netty-Client-CH1SCH060020712/10.153.3.160:6711