NIFI-492: When attempting to get a connection from the pool, if we create a new one and then encounter an error, we should ensure that we close the new connection
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/8d20b820
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/8d20b820
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/8d20b820

Branch: refs/heads/NIFI-271
Commit: 8d20b82095857e06ee58ae892039a5f10613b125
Parents: 39735c3
Author: Mark Payne <[email protected]>
Authored: Tue Apr 7 14:16:54 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Tue Apr 7 14:16:54 2015 -0400

----------------------------------------------------------------------
 .../remote/client/socket/EndpointConnectionPool.java | 14 ++++++++++++++
 1 file changed, 14 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8d20b820/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
index daf52b4..1a6dfd5 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
@@ -314,6 +314,11 @@ public class EndpointConnectionPool {
                         if ( protocol.isDestinationFull() ) {
                             logger.warn("{} {} indicates that port's destination is full; penalizing peer", this, peer);
                             penalize(peer, penalizationMillis);
+                            try {
+                                peer.close();
+                            } catch (final IOException ioe) {
+                            }
+
                             continue;
                         } else if ( protocol.isPortInvalid() ) {
                             penalize(peer, penalizationMillis);
@@ -359,6 +364,15 @@ public class EndpointConnectionPool {
                     }
                 }
             } while ( connection == null || codec == null || commsSession == null || protocol == null );
+        } catch (final Throwable t) {
+            if ( commsSession != null ) {
+                try {
+                    commsSession.close();
+                } catch (final IOException ioe) {
+                }
+            }
+
+            throw t;
         } finally {
             if ( !addBack.isEmpty() ) {
                 connectionQueue.addAll(addBack);
