DRILL-5599: Notify StatusHandler that batch sending has failed even if channel is still open
close #857 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/7e6571aa Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/7e6571aa Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/7e6571aa Branch: refs/heads/master Commit: 7e6571aa5d4c58185dbfa131de99354ea7dc6b4e Parents: dd55b5c Author: Arina Ielchiieva <[email protected]> Authored: Tue Jun 20 12:18:27 2017 +0300 Committer: Aman Sinha <[email protected]> Committed: Sat Jun 24 09:42:34 2017 -0700 ---------------------------------------------------------------------- .../org/apache/drill/exec/rpc/RequestIdMap.java | 21 +++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/7e6571aa/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java ---------------------------------------------------------------------- diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java index a9c3012..804834c 100644 --- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -47,7 +47,7 @@ class RequestIdMap { private final IntObjectHashMap<RpcOutcome<?>> map; public RequestIdMap() { - map = new IntObjectHashMap<RpcOutcome<?>>(); + map = new IntObjectHashMap<>(); } void channelClosed(Throwable ex) { @@ -82,7 +82,7 @@ class RequestIdMap { public <V> ChannelListenerWithCoordinationId createNewRpcListener(RpcOutcomeListener<V> handler, Class<V> clazz, RemoteConnection connection) { final int i = lastCoordinationId.incrementAndGet(); - final RpcListener<V> future = new RpcListener<V>(handler, clazz, i, connection); + final RpcListener<V> future = new RpcListener<>(handler, clazz, i, connection); final Object old; synchronized (map) { Preconditions.checkArgument(isOpen.get(), @@ -111,13 +111,16 @@ class RequestIdMap { @Override public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - removeFromMap(coordinationId); - if (future.channel().isActive()) { - throw new RpcException("Future failed"); - } else { - setException(new ChannelClosedException()); + try { + removeFromMap(coordinationId); + } finally { + final Throwable cause = future.cause(); + if (future.channel().isActive()) { + setException(cause == null ? new RpcException("Unknown ChannelFuture operation failure") : cause); + } else { + setException(cause == null ? new ChannelClosedException() : new ChannelClosedException(cause)); + } } } }
