Repository: nifi Updated Branches: refs/heads/master 608287f9f -> bef3fc8b4
NIFI-1301: Ensure that when creating site-to-site connection, if remote instance is applying backpressure that we do not block indefinitely waiting for the connection to be made Signed-off-by: joewitt <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/bef3fc8b Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/bef3fc8b Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/bef3fc8b Branch: refs/heads/master Commit: bef3fc8b40cec21fd7ce86fde3e812fd348f1341 Parents: 608287f Author: Mark Payne <[email protected]> Authored: Thu Dec 17 15:40:00 2015 -0500 Committer: joewitt <[email protected]> Committed: Fri Dec 18 16:08:41 2015 -0500 ---------------------------------------------------------------------- .../client/socket/EndpointConnectionPool.java | 62 ++++++++++---------- 1 file changed, 32 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/bef3fc8b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java index 88a34aa..18075db 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java @@ -250,26 +250,26 @@ public class EndpointConnectionPool { EndpointConnection connection; Peer peer = null; - logger.debug("{} getting next peer status", this); - final PeerStatus peerStatus = getNextPeerStatus(direction); - logger.debug("{} next peer status = {}", this, peerStatus); - if (peerStatus == null) { - return null; - } + do { + final List<EndpointConnection> addBack = new ArrayList<>(); + logger.debug("{} getting next peer status", this); + final PeerStatus peerStatus = getNextPeerStatus(direction); + logger.debug("{} next peer status = {}", this, peerStatus); + if (peerStatus == null) { + return null; + } - final PeerDescription peerDescription = peerStatus.getPeerDescription(); - BlockingQueue<EndpointConnection> connectionQueue = connectionQueueMap.get(peerDescription); - if (connectionQueue == null) { - connectionQueue = new LinkedBlockingQueue<>(); - BlockingQueue<EndpointConnection> existing = connectionQueueMap.putIfAbsent(peerDescription, connectionQueue); - if (existing != null) { - connectionQueue = existing; + final PeerDescription peerDescription = peerStatus.getPeerDescription(); + BlockingQueue<EndpointConnection> connectionQueue = connectionQueueMap.get(peerDescription); + if (connectionQueue == null) { + connectionQueue = new LinkedBlockingQueue<>(); + BlockingQueue<EndpointConnection> existing = connectionQueueMap.putIfAbsent(peerDescription, connectionQueue); + if (existing != null) { + connectionQueue = existing; + } } - } - final List<EndpointConnection> addBack = new ArrayList<>(); - try { - do { + try { connection = connectionQueue.poll(); logger.debug("{} Connection State for {} = {}", this, clusterUrl, connection); final String portId = getPortIdentifier(direction); @@ -387,21 +387,23 @@ public class EndpointConnectionPool { protocol = connection.getSocketClientProtocol(); } } - } while (connection == null || codec == null || commsSession == null || protocol == null); - } catch (final Throwable t) { - if (commsSession != null) { - try { - commsSession.close(); - } catch (final IOException ioe) { + } catch (final Throwable t) { + if (commsSession != null) { + try { + commsSession.close(); + } catch (final IOException ioe) { + } } - } - throw t; - } finally { - if (!addBack.isEmpty()) { - connectionQueue.addAll(addBack); + throw t; + } finally { + if (!addBack.isEmpty()) { + connectionQueue.addAll(addBack); + addBack.clear(); + } } - } + + } while (connection == null || codec == null || commsSession == null || protocol == null); activeConnections.add(connection); return connection; @@ -773,7 +775,7 @@ public class EndpointConnectionPool { final StringBuilder distributionDescription = new StringBuilder(); distributionDescription.append("New Weighted Distribution of Nodes:"); for (final Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet()) { - final double percentage = entry.getValue() * 100D / (double) destinations.size(); + final double percentage = entry.getValue() * 100D / destinations.size(); distributionDescription.append("\n").append(entry.getKey()).append(" will receive ").append(percentage).append("% of data"); } logger.info(distributionDescription.toString());
