Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2651#discussion_r186308658
--- Diff: storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java ---
@@ -103,21 +85,32 @@ public void execute(Tuple input) {
LOG.error("Failed to return results to DRPC
server", tex);
_collector.fail(input);
}
- reconnectClient((DRPCInvocationsClient) client);
+ client = getDRPCClient(host, port);
}
}
}
}
- private void reconnectClient(DRPCInvocationsClient client) {
- if (client instanceof DRPCInvocationsClient) {
- try {
- LOG.info("reconnecting... ");
- client.reconnectClient(); //Blocking call
- } catch (TException e2) {
- LOG.error("Failed to connect to DRPC server", e2);
+ private DistributedRPCInvocations.Iface getDRPCClient(String host, int
port) {
+ DistributedRPCInvocations.Iface client;
+ if (local) {
+ client = (DistributedRPCInvocations.Iface)
ServiceRegistry.getService(host);
+ } else {
+ String server = getServer(host, port);
+ if (!_clients.containsKey(server)) {
+ try {
+ _clients.put(server, new DRPCInvocationsClient(_conf,
host, port));
+ } catch (org.apache.thrift.transport.TTransportException
ex) {
+ throw new RuntimeException(ex);
+ }
}
+ client = _clients.get(server);
}
+ return client;
+ }
+
+ private String getServer(String host, int port) {
+ return host + port;
--- End diff --
Better to keep it as List since bad case could be happen (when host ends
with number).
---