Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2651#discussion_r189821240
--- Diff: storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java ---
@@ -103,21 +92,34 @@ public void execute(Tuple input) {
LOG.error("Failed to return results to DRPC
server", tex);
_collector.fail(input);
}
- reconnectClient((DRPCInvocationsClient) client);
+ client = getDRPCClient(host, port);
}
}
}
}
- private void reconnectClient(DRPCInvocationsClient client) {
- if (client instanceof DRPCInvocationsClient) {
- try {
- LOG.info("reconnecting... ");
- client.reconnectClient(); //Blocking call
- } catch (TException e2) {
- LOG.error("Failed to connect to DRPC server", e2);
+ private DistributedRPCInvocations.Iface getDRPCClient(String host, int
port) {
+ DistributedRPCInvocations.Iface client;
+ if (local) {
+ client = (DistributedRPCInvocations.Iface)
ServiceRegistry.getService(host);
+ } else {
+ List server = new ArrayList() {
+ {
+ add(host);
+ add(port);
+ }
+ };
+ if (!_clients.containsKey(server)) {
+ try {
+ DRPCInvocationsClient oldClient = _clients.put(server,
new DRPCInvocationsClient(_conf, host, port));
--- End diff --
With this change we lose the cache functionality. Rather than doing it this
way, can we invalidate the cache when we find that the client is broken?
---