http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java index 1b5412c..36d8bac 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java @@ -91,21 +91,22 @@ import org.slf4j.LoggerFactory; import org.slf4j.helpers.MessageFormatter; public class EndpointConnectionPool { + public static final long PEER_REFRESH_PERIOD = 60000L; public static final String CATEGORY = "Site-to-Site"; public static final long REMOTE_REFRESH_MILLIS = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES); private static final long PEER_CACHE_MILLIS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES); - private static final Logger logger = LoggerFactory.getLogger(EndpointConnectionPool.class); - - private final ConcurrentMap<PeerDescription, BlockingQueue<EndpointConnection>> connectionQueueMap = new ConcurrentHashMap<>(); + private static final Logger logger = LoggerFactory.getLogger(EndpointConnectionPool.class); + + private final ConcurrentMap<PeerDescription, BlockingQueue<EndpointConnection>> connectionQueueMap = new ConcurrentHashMap<>(); private final ConcurrentMap<PeerDescription, Long> peerTimeoutExpirations = new ConcurrentHashMap<>(); private final URI clusterUrl; private final String apiUri; - + private final AtomicLong peerIndex = new AtomicLong(0L); - + private final ReentrantLock peerRefreshLock = new ReentrantLock(); private volatile List<PeerStatus> peerStatuses; private volatile long peerRefreshTime = 0L; @@ -118,132 +119,129 @@ public class EndpointConnectionPool { private final ScheduledExecutorService taskExecutor; private final int idleExpirationMillis; private final RemoteDestination remoteDestination; - + private final ReadWriteLock listeningPortRWLock = new ReentrantReadWriteLock(); private final Lock remoteInfoReadLock = listeningPortRWLock.readLock(); private final Lock remoteInfoWriteLock = listeningPortRWLock.writeLock(); private Integer siteToSitePort; private Boolean siteToSiteSecure; private long remoteRefreshTime; - private final Map<String, String> inputPortMap = new HashMap<>(); // map input port name to identifier - private final Map<String, String> outputPortMap = new HashMap<>(); // map output port name to identifier - + private final Map<String, String> inputPortMap = new HashMap<>(); // map input port name to identifier + private final Map<String, String> outputPortMap = new HashMap<>(); // map output port name to identifier + private volatile int commsTimeout; private volatile boolean shutdown = false; - - - public EndpointConnectionPool(final String clusterUrl, final RemoteDestination remoteDestination, final int commsTimeoutMillis, - final int idleExpirationMillis, final EventReporter eventReporter, final File persistenceFile) - { - this(clusterUrl, remoteDestination, commsTimeoutMillis, idleExpirationMillis, null, eventReporter, persistenceFile); - } - + + public EndpointConnectionPool(final String clusterUrl, final RemoteDestination remoteDestination, final int commsTimeoutMillis, + final int idleExpirationMillis, final EventReporter eventReporter, final File persistenceFile) { + this(clusterUrl, remoteDestination, commsTimeoutMillis, idleExpirationMillis, null, eventReporter, persistenceFile); + } + public EndpointConnectionPool(final String clusterUrl, final RemoteDestination remoteDestination, final int commsTimeoutMillis, final int idleExpirationMillis, - final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile) - { + final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile) { Objects.requireNonNull(clusterUrl, "URL cannot be null"); Objects.requireNonNull(remoteDestination, "Remote Destination/Port Identifier cannot be null"); - try { - this.clusterUrl = new URI(clusterUrl); - } catch (final URISyntaxException e) { - throw new IllegalArgumentException("Invalid Cluster URL: " + clusterUrl); - } - - // Trim the trailing / + try { + this.clusterUrl = new URI(clusterUrl); + } catch (final URISyntaxException e) { + throw new IllegalArgumentException("Invalid Cluster URL: " + clusterUrl); + } + + // Trim the trailing / String uriPath = this.clusterUrl.getPath(); if (uriPath.endsWith("/")) { uriPath = uriPath.substring(0, uriPath.length() - 1); } apiUri = this.clusterUrl.getScheme() + "://" + this.clusterUrl.getHost() + ":" + this.clusterUrl.getPort() + uriPath + "-api"; - + this.remoteDestination = remoteDestination; - this.sslContext = sslContext; - this.peersFile = persistenceFile; - this.eventReporter = eventReporter; - this.commsTimeout = commsTimeoutMillis; - this.idleExpirationMillis = idleExpirationMillis; - - Set<PeerStatus> recoveredStatuses; - if ( persistenceFile != null && persistenceFile.exists() ) { - try { - recoveredStatuses = recoverPersistedPeerStatuses(peersFile); - this.peerStatusCache = new PeerStatusCache(recoveredStatuses, peersFile.lastModified()); - } catch (final IOException ioe) { - logger.warn("Failed to recover peer statuses from {} due to {}; will continue without loading information from file", persistenceFile, ioe); - } - } else { - peerStatusCache = null; - } - - // Initialize a scheduled executor and run some maintenance tasks in the background to kill off old, unused - // connections and keep our list of peers up-to-date. - taskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() { - private final ThreadFactory defaultFactory = Executors.defaultThreadFactory(); - - @Override - public Thread newThread(final Runnable r) { - final Thread thread = defaultFactory.newThread(r); - thread.setName("NiFi Site-to-Site Connection Pool Maintenance"); - return thread; - } - }); - - taskExecutor.scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - refreshPeers(); - } - }, 0, 5, TimeUnit.SECONDS); - - taskExecutor.scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - cleanupExpiredSockets(); - } - }, 5, 5, TimeUnit.SECONDS); - } - + this.sslContext = sslContext; + this.peersFile = persistenceFile; + this.eventReporter = eventReporter; + this.commsTimeout = commsTimeoutMillis; + this.idleExpirationMillis = idleExpirationMillis; + + Set<PeerStatus> recoveredStatuses; + if (persistenceFile != null && persistenceFile.exists()) { + try { + recoveredStatuses = recoverPersistedPeerStatuses(peersFile); + this.peerStatusCache = new PeerStatusCache(recoveredStatuses, peersFile.lastModified()); + } catch (final IOException ioe) { + logger.warn("Failed to recover peer statuses from {} due to {}; will continue without loading information from file", persistenceFile, ioe); + } + } else { + peerStatusCache = null; + } + + // Initialize a scheduled executor and run some maintenance tasks in the background to kill off old, unused + // connections and keep our list of peers up-to-date. + taskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() { + private final ThreadFactory defaultFactory = Executors.defaultThreadFactory(); + + @Override + public Thread newThread(final Runnable r) { + final Thread thread = defaultFactory.newThread(r); + thread.setName("NiFi Site-to-Site Connection Pool Maintenance"); + return thread; + } + }); + + taskExecutor.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + refreshPeers(); + } + }, 0, 5, TimeUnit.SECONDS); + + taskExecutor.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + cleanupExpiredSockets(); + } + }, 5, 5, TimeUnit.SECONDS); + } + void warn(final String msg, final Object... args) { - logger.warn(msg, args); - if ( eventReporter != null ) { - eventReporter.reportEvent(Severity.WARNING, "Site-to-Site", MessageFormatter.arrayFormat(msg, args).getMessage()); - } + logger.warn(msg, args); + if (eventReporter != null) { + eventReporter.reportEvent(Severity.WARNING, "Site-to-Site", MessageFormatter.arrayFormat(msg, args).getMessage()); + } } - + void warn(final String msg, final Throwable t) { - logger.warn(msg, t); - - if ( eventReporter != null ) { - eventReporter.reportEvent(Severity.WARNING, "Site-to-Site", msg + ": " + t.toString()); - } + logger.warn(msg, t); + + if (eventReporter != null) { + eventReporter.reportEvent(Severity.WARNING, "Site-to-Site", msg + ": " + t.toString()); + } } - + void error(final String msg, final Object... args) { - logger.error(msg, args); - if ( eventReporter != null ) { - eventReporter.reportEvent(Severity.ERROR, "Site-to-Site", MessageFormatter.arrayFormat(msg, args).getMessage()); - } + logger.error(msg, args); + if (eventReporter != null) { + eventReporter.reportEvent(Severity.ERROR, "Site-to-Site", MessageFormatter.arrayFormat(msg, args).getMessage()); + } } - + private String getPortIdentifier(final TransferDirection transferDirection) throws IOException { - if ( remoteDestination.getIdentifier() != null ) { + if (remoteDestination.getIdentifier() != null) { return remoteDestination.getIdentifier(); } - - if ( transferDirection == TransferDirection.RECEIVE ) { + + if (transferDirection == TransferDirection.RECEIVE) { return getOutputPortIdentifier(remoteDestination.getName()); } else { return getInputPortIdentifier(remoteDestination.getName()); } } - + public EndpointConnection getEndpointConnection(final TransferDirection direction) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException { return getEndpointConnection(direction, null); } - - - public EndpointConnection getEndpointConnection(final TransferDirection direction, final SiteToSiteClientConfig config) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException { - // + + public EndpointConnection getEndpointConnection(final TransferDirection direction, final SiteToSiteClientConfig config) + throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException { + // // Attempt to get a connection state that already exists for this URL. // FlowFileCodec codec = null; @@ -255,42 +253,42 @@ public class EndpointConnectionPool { logger.debug("{} getting next peer status", this); final PeerStatus peerStatus = getNextPeerStatus(direction); logger.debug("{} next peer status = {}", this, peerStatus); - if ( peerStatus == null ) { + if (peerStatus == null) { return null; } final PeerDescription peerDescription = peerStatus.getPeerDescription(); BlockingQueue<EndpointConnection> connectionQueue = connectionQueueMap.get(peerStatus); - if ( connectionQueue == null ) { + if (connectionQueue == null) { connectionQueue = new LinkedBlockingQueue<>(); BlockingQueue<EndpointConnection> existing = connectionQueueMap.putIfAbsent(peerDescription, connectionQueue); - if ( existing != null ) { + if (existing != null) { connectionQueue = existing; } } - + final List<EndpointConnection> addBack = new ArrayList<>(); try { do { connection = connectionQueue.poll(); logger.debug("{} Connection State for {} = {}", this, clusterUrl, connection); final String portId = getPortIdentifier(direction); - - if ( connection == null && !addBack.isEmpty() ) { + + if (connection == null && !addBack.isEmpty()) { // all available connections have been penalized. logger.debug("{} all Connections for {} are penalized; returning no Connection", this, portId); return null; } - - if ( connection != null && connection.getPeer().isPenalized(portId) ) { + + if (connection != null && connection.getPeer().isPenalized(portId)) { // we have a connection, but it's penalized. We want to add it back to the queue // when we've found one to use. addBack.add(connection); continue; } - + // if we can't get an existing Connection, create one - if ( connection == null ) { + if (connection == null) { logger.debug("{} No Connection available for Port {}; creating new Connection", this, portId); protocol = new SocketClientProtocol(); protocol.setDestination(new IdEnrichedRemoteDestination(remoteDestination, portId)); @@ -304,7 +302,7 @@ public class EndpointConnectionPool { penalize(peerStatus.getPeerDescription(), penalizationMillis); throw ioe; } - + final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); try { @@ -314,72 +312,72 @@ public class EndpointConnectionPool { try { commsSession.close(); } catch (final IOException ioe) { - throw e; + throw e; } } - + final String peerUrl = "nifi://" + peerDescription.getHostname() + ":" + peerDescription.getPort(); peer = new Peer(peerDescription, commsSession, peerUrl, clusterUrl.toString()); - + // set properties based on config - if ( config != null ) { + if (config != null) { protocol.setTimeout((int) config.getTimeout(TimeUnit.MILLISECONDS)); protocol.setPreferredBatchCount(config.getPreferredBatchCount()); protocol.setPreferredBatchSize(config.getPreferredBatchSize()); protocol.setPreferredBatchDuration(config.getPreferredBatchDuration(TimeUnit.MILLISECONDS)); } - + // perform handshake try { logger.debug("{} performing handshake", this); protocol.handshake(peer); - + // handle error cases - if ( protocol.isDestinationFull() ) { - logger.warn("{} {} indicates that port {}'s destination is full; penalizing peer", - this, peer, config.getPortName() == null ? config.getPortIdentifier() : config.getPortName()); - + if (protocol.isDestinationFull()) { + logger.warn("{} {} indicates that port {}'s destination is full; penalizing peer", + this, peer, config.getPortName() == null ? config.getPortIdentifier() : config.getPortName()); + penalize(peer, penalizationMillis); try { - peer.close(); + peer.close(); } catch (final IOException ioe) { } - + continue; - } else if ( protocol.isPortInvalid() ) { - penalize(peer, penalizationMillis); - cleanup(protocol, peer); - throw new PortNotRunningException(peer.toString() + " indicates that port " + portId + " is not running"); - } else if ( protocol.isPortUnknown() ) { - penalize(peer, penalizationMillis); - cleanup(protocol, peer); - throw new UnknownPortException(peer.toString() + " indicates that port " + portId + " is not known"); + } else if (protocol.isPortInvalid()) { + penalize(peer, penalizationMillis); + cleanup(protocol, peer); + throw new PortNotRunningException(peer.toString() + " indicates that port " + portId + " is not running"); + } else if (protocol.isPortUnknown()) { + penalize(peer, penalizationMillis); + cleanup(protocol, peer); + throw new UnknownPortException(peer.toString() + " indicates that port " + portId + " is not known"); } - + // negotiate the FlowFileCodec to use logger.debug("{} negotiating codec", this); codec = protocol.negotiateCodec(peer); logger.debug("{} negotiated codec is {}", this, codec); } catch (final PortNotRunningException | UnknownPortException e) { - throw e; + throw e; } catch (final Exception e) { penalize(peer, penalizationMillis); cleanup(protocol, peer); - + final String message = String.format("%s failed to communicate with %s due to %s", this, peer == null ? clusterUrl : peer, e.toString()); error(message); - if ( logger.isDebugEnabled() ) { + if (logger.isDebugEnabled()) { logger.error("", e); } throw e; } - + connection = new EndpointConnection(peer, protocol, codec); } else { final long lastTimeUsed = connection.getLastTimeUsed(); final long millisSinceLastUse = System.currentTimeMillis() - lastTimeUsed; - - if ( commsTimeout > 0L && millisSinceLastUse >= commsTimeout ) { + + if (commsTimeout > 0L && millisSinceLastUse >= commsTimeout) { cleanup(connection.getSocketClientProtocol(), connection.getPeer()); connection = null; } else { @@ -389,68 +387,70 @@ public class EndpointConnectionPool { protocol = connection.getSocketClientProtocol(); } } - } while ( connection == null || codec == null || commsSession == null || protocol == null ); + } while (connection == null || codec == null || commsSession == null || protocol == null); } catch (final Throwable t) { - if ( commsSession != null ) { - try { - commsSession.close(); - } catch (final IOException ioe) { - } - } - - throw t; + if (commsSession != null) { + try { + commsSession.close(); + } catch (final IOException ioe) { + } + } + + throw t; } finally { - if ( !addBack.isEmpty() ) { + if (!addBack.isEmpty()) { connectionQueue.addAll(addBack); } } - + activeConnections.add(connection); return connection; } - - + public boolean offer(final EndpointConnection endpointConnection) { - final Peer peer = endpointConnection.getPeer(); - if ( peer == null ) { - return false; - } - - final BlockingQueue<EndpointConnection> connectionQueue = connectionQueueMap.get(peer.getDescription()); - if ( connectionQueue == null ) { - return false; - } - - activeConnections.remove(endpointConnection); - if ( shutdown ) { - terminate(endpointConnection); - return false; - } else { - endpointConnection.setLastTimeUsed(); - return connectionQueue.offer(endpointConnection); - } - } - + final Peer peer = endpointConnection.getPeer(); + if (peer == null) { + return false; + } + + final BlockingQueue<EndpointConnection> connectionQueue = connectionQueueMap.get(peer.getDescription()); + if (connectionQueue == null) { + return false; + } + + activeConnections.remove(endpointConnection); + if (shutdown) { + terminate(endpointConnection); + return false; + } else { + endpointConnection.setLastTimeUsed(); + return connectionQueue.offer(endpointConnection); + } + } + private void penalize(final PeerDescription peerDescription, final long penalizationMillis) { Long expiration = peerTimeoutExpirations.get(peerDescription); - if ( expiration == null ) { + if (expiration == null) { expiration = Long.valueOf(0L); } - + final long newExpiration = Math.max(expiration, System.currentTimeMillis() + penalizationMillis); peerTimeoutExpirations.put(peerDescription, Long.valueOf(newExpiration)); } - + /** - * Updates internal state map to penalize a PeerStatus that points to the specified peer - * @param peer + * Updates internal state map to penalize a PeerStatus that points to the + * specified peer + * + * @param peer the peer + * @param penalizationMillis period of time to penalize a given peer */ public void penalize(final Peer peer, final long penalizationMillis) { penalize(peer.getDescription(), penalizationMillis); } - + private void cleanup(final SocketClientProtocol protocol, final Peer peer) { - if ( protocol != null && peer != null ) { + if (protocol != null && peer != null) { try { protocol.shutdown(peer); } catch (final TransmissionDisabledException e) { @@ -459,8 +459,8 @@ public class EndpointConnectionPool { } catch (IOException e1) { } } - - if ( peer != null ) { + + if (peer != null) { try { peer.close(); } catch (final TransmissionDisabledException e) { @@ -470,15 +470,14 @@ public class EndpointConnectionPool { } } } - + private boolean isPeerRefreshNeeded(final List<PeerStatus> peerList) { return (peerList == null || peerList.isEmpty() || System.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD); } - - + private PeerStatus getNextPeerStatus(final TransferDirection direction) { List<PeerStatus> peerList = peerStatuses; - if ( isPeerRefreshNeeded(peerList) ) { + if (isPeerRefreshNeeded(peerList)) { peerRefreshLock.lock(); try { // now that we have the lock, check again that we need to refresh (because another thread @@ -490,15 +489,15 @@ public class EndpointConnectionPool { } catch (final Exception e) { final String message = String.format("%s Failed to update list of peers due to %s", this, e.toString()); warn(message); - if ( logger.isDebugEnabled() ) { + if (logger.isDebugEnabled()) { logger.warn("", e); } - - if ( eventReporter != null ) { - eventReporter.reportEvent(Severity.WARNING, CATEGORY, message); + + if (eventReporter != null) { + eventReporter.reportEvent(Severity.WARNING, CATEGORY, message); } } - + this.peerStatuses = peerList; peerRefreshTime = System.currentTimeMillis(); } @@ -507,46 +506,46 @@ public class EndpointConnectionPool { } } - if ( peerList == null || peerList.isEmpty() ) { + if (peerList == null || peerList.isEmpty()) { return null; } PeerStatus peerStatus; - for (int i=0; i < peerList.size(); i++) { + for (int i = 0; i < peerList.size(); i++) { final long idx = peerIndex.getAndIncrement(); final int listIndex = (int) (idx % peerList.size()); peerStatus = peerList.get(listIndex); - - if ( isPenalized(peerStatus) ) { + + if (isPenalized(peerStatus)) { logger.debug("{} {} is penalized; will not communicate with this peer", this, peerStatus); } else { return peerStatus; } } - + logger.debug("{} All peers appear to be penalized; returning null", this); return null; } - + private boolean isPenalized(final PeerStatus peerStatus) { final Long expirationEnd = peerTimeoutExpirations.get(peerStatus.getPeerDescription()); - return (expirationEnd == null ? false : expirationEnd > System.currentTimeMillis() ); + return (expirationEnd == null ? false : expirationEnd > System.currentTimeMillis()); } - + private List<PeerStatus> createPeerStatusList(final TransferDirection direction) throws IOException, HandshakeException, UnknownPortException, PortNotRunningException { Set<PeerStatus> statuses = getPeerStatuses(); - if ( statuses == null ) { + if (statuses == null) { refreshPeers(); statuses = getPeerStatuses(); - if ( statuses == null ) { + if (statuses == null) { logger.debug("{} found no peers to connect to", this); return Collections.emptyList(); } } - + final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation(); final List<NodeInformation> nodeInfos = new ArrayList<>(); - for ( final PeerStatus peerStatus : statuses ) { + for (final PeerStatus peerStatus : statuses) { final PeerDescription description = peerStatus.getPeerDescription(); final NodeInformation nodeInfo = new NodeInformation(description.getHostname(), description.getPort(), 0, description.isSecure(), peerStatus.getFlowFileCount()); nodeInfos.add(nodeInfo); @@ -554,8 +553,7 @@ public class EndpointConnectionPool { clusterNodeInfo.setNodeInformation(nodeInfos); return formulateDestinationList(clusterNodeInfo, direction); } - - + private Set<PeerStatus> getPeerStatuses() { final PeerStatusCache cache = this.peerStatusCache; if (cache == null || cache.getStatuses() == null || cache.getStatuses().isEmpty()) { @@ -576,14 +574,14 @@ public class EndpointConnectionPool { } private Set<PeerStatus> fetchRemotePeerStatuses() throws IOException, HandshakeException, UnknownPortException, PortNotRunningException { - final String hostname = clusterUrl.getHost(); + final String hostname = clusterUrl.getHost(); final Integer port = getSiteToSitePort(); - if ( port == null ) { + if (port == null) { throw new IOException("Remote instance of NiFi is not configured to allow site-to-site communications"); } - + final PeerDescription clusterPeerDescription = new PeerDescription(hostname, port, clusterUrl.toString().startsWith("https://")); - final CommunicationsSession commsSession = establishSiteToSiteConnection(hostname, port); + final CommunicationsSession commsSession = establishSiteToSiteConnection(hostname, port); final Peer peer = new Peer(clusterPeerDescription, commsSession, "nifi://" + hostname + ":" + port, clusterUrl.toString()); final SocketClientProtocol clientProtocol = new SocketClientProtocol(); final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); @@ -593,11 +591,11 @@ public class EndpointConnectionPool { clientProtocol.setTimeout(commsTimeout); if (clientProtocol.getVersionNegotiator().getVersion() < 5) { String portId = getPortIdentifier(TransferDirection.RECEIVE); - if ( portId == null ) { + if (portId == null) { portId = getPortIdentifier(TransferDirection.SEND); } - - if ( portId == null ) { + + if (portId == null) { peer.close(); throw new IOException("Failed to determine the identifier of port " + remoteDestination.getName()); } @@ -605,7 +603,7 @@ public class EndpointConnectionPool { } else { clientProtocol.handshake(peer, null); } - + final Set<PeerStatus> peerStatuses = clientProtocol.getPeerStatuses(peer); persistPeerStatuses(peerStatuses); @@ -632,14 +630,13 @@ public class EndpointConnectionPool { return peerStatuses; } - private void persistPeerStatuses(final Set<PeerStatus> statuses) { - if ( peersFile == null ) { - return; - } - + if (peersFile == null) { + return; + } + try (final OutputStream fos = new FileOutputStream(peersFile); - final OutputStream out = new BufferedOutputStream(fos)) { + final OutputStream out = new BufferedOutputStream(fos)) { for (final PeerStatus status : statuses) { final PeerDescription description = status.getPeerDescription(); @@ -679,53 +676,52 @@ public class EndpointConnectionPool { return statuses; } - - + private CommunicationsSession establishSiteToSiteConnection(final PeerStatus peerStatus) throws IOException { final PeerDescription description = peerStatus.getPeerDescription(); - return establishSiteToSiteConnection(description.getHostname(), description.getPort()); + return establishSiteToSiteConnection(description.getHostname(), description.getPort()); } - + private CommunicationsSession establishSiteToSiteConnection(final String hostname, final int port) throws IOException { - final boolean siteToSiteSecure = isSecure(); + final boolean siteToSiteSecure = isSecure(); final String destinationUri = "nifi://" + hostname + ":" + port; CommunicationsSession commsSession = null; try { - if ( siteToSiteSecure ) { - if ( sslContext == null ) { - throw new IOException("Unable to communicate with " + hostname + ":" + port + " because it requires Secure Site-to-Site communications, but this instance is not configured for secure communications"); - } - - final SSLSocketChannel socketChannel = new SSLSocketChannel(sslContext, hostname, port, true); - socketChannel.connect(); - - commsSession = new SSLSocketChannelCommunicationsSession(socketChannel, destinationUri); - - try { - commsSession.setUserDn(socketChannel.getDn()); - } catch (final CertificateNotYetValidException | CertificateExpiredException ex) { - throw new IOException(ex); - } - } else { - final SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port)); - commsSession = new SocketChannelCommunicationsSession(socketChannel, destinationUri); - } - - commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES); - commsSession.setUri(destinationUri); + if (siteToSiteSecure) { + if (sslContext == null) { + throw new IOException("Unable to communicate with " + hostname + ":" + port + + " because it requires Secure Site-to-Site communications, but this instance is not configured for secure communications"); + } + + final SSLSocketChannel socketChannel = new SSLSocketChannel(sslContext, hostname, port, true); + socketChannel.connect(); + + commsSession = new SSLSocketChannelCommunicationsSession(socketChannel, destinationUri); + + try { + commsSession.setUserDn(socketChannel.getDn()); + } catch (final CertificateNotYetValidException | CertificateExpiredException ex) { + throw new IOException(ex); + } + } else { + final SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port)); + commsSession = new SocketChannelCommunicationsSession(socketChannel, destinationUri); + } + + commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES); + commsSession.setUri(destinationUri); } catch (final IOException ioe) { - if ( commsSession != null ) { + if (commsSession != null) { commsSession.close(); } - + throw ioe; } - + return commsSession; } - - + static List<PeerStatus> formulateDestinationList(final ClusterNodeInformation clusterNodeInfo, final TransferDirection direction) { final Collection<NodeInformation> nodeInfoSet = clusterNodeInfo.getNodeInformation(); final int numDestinations = Math.max(128, nodeInfoSet.size()); @@ -743,26 +739,26 @@ public class EndpointConnectionPool { final double percentageOfFlowFiles = Math.min(0.8D, ((double) flowFileCount / (double) totalFlowFileCount)); final double relativeWeighting = (direction == TransferDirection.SEND) ? (1 - percentageOfFlowFiles) : percentageOfFlowFiles; final int entries = Math.max(1, (int) (numDestinations * relativeWeighting)); - + entryCountMap.put(nodeInfo, Math.max(1, entries)); totalEntries += entries; } - + final List<PeerStatus> destinations = new ArrayList<>(totalEntries); - for (int i=0; i < totalEntries; i++) { + for (int i = 0; i < totalEntries; i++) { destinations.add(null); } - for ( final Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet() ) { + for (final Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet()) { final NodeInformation nodeInfo = entry.getKey(); final int numEntries = entry.getValue(); - + int skipIndex = numEntries; - for (int i=0; i < numEntries; i++) { + for (int i = 0; i < numEntries; i++) { int n = (skipIndex * i); while (true) { final int index = n % destinations.size(); PeerStatus status = destinations.get(index); - if ( status == null ) { + if (status == null) { final PeerDescription description = new PeerDescription(nodeInfo.getHostname(), nodeInfo.getSiteToSitePort(), nodeInfo.isSiteToSiteSecure()); status = new PeerStatus(description, nodeInfo.getTotalFlowFiles()); destinations.set(index, status); @@ -776,7 +772,7 @@ public class EndpointConnectionPool { final StringBuilder distributionDescription = new StringBuilder(); distributionDescription.append("New Weighted Distribution of Nodes:"); - for ( final Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet() ) { + for (final Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet()) { final double percentage = entry.getValue() * 100D / (double) destinations.size(); distributionDescription.append("\n").append(entry.getKey()).append(" will receive ").append(percentage).append("% of data"); } @@ -785,55 +781,54 @@ public class EndpointConnectionPool { // Jumble the list of destinations. return destinations; } - - + private void cleanupExpiredSockets() { - for ( final BlockingQueue<EndpointConnection> connectionQueue : connectionQueueMap.values()) { + for (final BlockingQueue<EndpointConnection> connectionQueue : connectionQueueMap.values()) { final List<EndpointConnection> connections = new ArrayList<>(); - + EndpointConnection connection; while ((connection = connectionQueue.poll()) != null) { // If the socket has not been used in 10 seconds, shut it down. final long lastUsed = connection.getLastTimeUsed(); - if ( lastUsed < System.currentTimeMillis() - idleExpirationMillis ) { + if (lastUsed < System.currentTimeMillis() - idleExpirationMillis) { try { connection.getSocketClientProtocol().shutdown(connection.getPeer()); } catch (final Exception e) { - logger.debug("Failed to shut down {} using {} due to {}", - new Object[] {connection.getSocketClientProtocol(), connection.getPeer(), e} ); + logger.debug("Failed to shut down {} using {} due to {}", + new Object[]{connection.getSocketClientProtocol(), connection.getPeer(), e}); } - + terminate(connection); } else { connections.add(connection); } } - + connectionQueue.addAll(connections); } } - + public void shutdown() { shutdown = true; - taskExecutor.shutdown(); - peerTimeoutExpirations.clear(); - - for ( final EndpointConnection conn : activeConnections ) { - conn.getPeer().getCommunicationsSession().interrupt(); + taskExecutor.shutdown(); + peerTimeoutExpirations.clear(); + + for (final EndpointConnection conn : activeConnections) { + conn.getPeer().getCommunicationsSession().interrupt(); } - for ( final BlockingQueue<EndpointConnection> connectionQueue : connectionQueueMap.values() ) { + for (final BlockingQueue<EndpointConnection> connectionQueue : connectionQueueMap.values()) { EndpointConnection state; - while ( (state = connectionQueue.poll()) != null) { + while ((state = connectionQueue.poll()) != null) { cleanup(state.getSocketClientProtocol(), state.getPeer()); } } } - + public void terminate(final EndpointConnection connection) { cleanup(connection.getSocketClientProtocol(), connection.getPeer()); } - + private void refreshPeers() { final PeerStatusCache existingCache = peerStatusCache; if (existingCache != null && (existingCache.getTimestamp() + PEER_CACHE_MILLIS > System.currentTimeMillis())) { @@ -851,69 +846,66 @@ public class EndpointConnectionPool { } } } - - + public String getInputPortIdentifier(final String portName) throws IOException { return getPortIdentifier(portName, inputPortMap); } - + public String getOutputPortIdentifier(final String portName) throws IOException { - return getPortIdentifier(portName, outputPortMap); + return getPortIdentifier(portName, outputPortMap); } - - + private String getPortIdentifier(final String portName, final Map<String, String> portMap) throws IOException { - String identifier; - remoteInfoReadLock.lock(); + String identifier; + remoteInfoReadLock.lock(); try { - identifier = portMap.get(portName); + identifier = portMap.get(portName); } finally { - remoteInfoReadLock.unlock(); + remoteInfoReadLock.unlock(); } - - if ( identifier != null ) { - return identifier; + + if (identifier != null) { + return identifier; } - + refreshRemoteInfo(); - remoteInfoReadLock.lock(); + remoteInfoReadLock.lock(); try { - return portMap.get(portName); + return portMap.get(portName); } finally { - remoteInfoReadLock.unlock(); + remoteInfoReadLock.unlock(); } } - - + private ControllerDTO refreshRemoteInfo() throws IOException { - final boolean webInterfaceSecure = clusterUrl.toString().startsWith("https"); + final boolean webInterfaceSecure = clusterUrl.toString().startsWith("https"); final NiFiRestApiUtil utils = new NiFiRestApiUtil(webInterfaceSecure ? sslContext : null); - final ControllerDTO controller = utils.getController(apiUri + "/controller", commsTimeout); - + final ControllerDTO controller = utils.getController(apiUri + "/controller", commsTimeout); + remoteInfoWriteLock.lock(); try { this.siteToSitePort = controller.getRemoteSiteListeningPort(); this.siteToSiteSecure = controller.isSiteToSiteSecure(); - + inputPortMap.clear(); for (final PortDTO inputPort : controller.getInputPorts()) { - inputPortMap.put(inputPort.getName(), inputPort.getId()); + inputPortMap.put(inputPort.getName(), inputPort.getId()); } - + outputPortMap.clear(); - for ( final PortDTO outputPort : controller.getOutputPorts()) { - outputPortMap.put(outputPort.getName(), outputPort.getId()); + for (final PortDTO outputPort : controller.getOutputPorts()) { + outputPortMap.put(outputPort.getName(), outputPort.getId()); } - + this.remoteRefreshTime = System.currentTimeMillis(); } finally { - remoteInfoWriteLock.unlock(); + remoteInfoWriteLock.unlock(); } - + return controller; } - + /** * @return the port that the remote instance is listening on for * site-to-site communication, or <code>null</code> if the remote instance @@ -930,7 +922,7 @@ public class EndpointConnectionPool { return listeningPort; } } finally { - remoteInfoReadLock.unlock(); + remoteInfoReadLock.unlock(); } final ControllerDTO controller = refreshRemoteInfo(); @@ -938,19 +930,16 @@ public class EndpointConnectionPool { return listeningPort; } - + @Override public String toString() { return "EndpointConnectionPool[Cluster URL=" + clusterUrl + "]"; } - - + /** - * Returns {@code true} if the remote instance is configured for secure site-to-site communications, - * {@code false} otherwise. - * - * @return - * @throws IOException + * @return {@code true} if the remote instance is configured for secure + * site-to-site communications, {@code false} otherwise + * @throws IOException if unable to check if secure */ public boolean isSecure() throws IOException { remoteInfoReadLock.lock(); @@ -960,23 +949,23 @@ public class EndpointConnectionPool { return secure; } } finally { - remoteInfoReadLock.unlock(); + remoteInfoReadLock.unlock(); } final ControllerDTO controller = refreshRemoteInfo(); final Boolean isSecure = controller.isSiteToSiteSecure(); - if ( isSecure == null ) { + if (isSecure == null) { throw new IOException("Remote NiFi instance " + clusterUrl + " is not currently configured to accept site-to-site connections"); } - + return isSecure; } - - + private class IdEnrichedRemoteDestination implements RemoteDestination { + private final RemoteDestination original; private final String identifier; - + public IdEnrichedRemoteDestination(final RemoteDestination original, final String identifier) { this.original = original; this.identifier = identifier;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java index 4aab3f7..33e4a66 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java @@ -33,71 +33,71 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SocketClient implements SiteToSiteClient { + private static final Logger logger = LoggerFactory.getLogger(SocketClient.class); - + private final SiteToSiteClientConfig config; - private final EndpointConnectionPool pool; - private final boolean compress; - private final String portName; - private final long penalizationNanos; - private volatile String portIdentifier; - private volatile boolean closed = false; - - public SocketClient(final SiteToSiteClientConfig config) { - pool = new EndpointConnectionPool(config.getUrl(), - createRemoteDestination(config.getPortIdentifier(), config.getPortName()), - (int) config.getTimeout(TimeUnit.MILLISECONDS), - (int) config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS), - config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile()); - - this.config = config; - this.compress = config.isUseCompression(); - this.portIdentifier = config.getPortIdentifier(); - this.portName = config.getPortName(); - this.penalizationNanos = config.getPenalizationPeriod(TimeUnit.NANOSECONDS); - } - - @Override - public SiteToSiteClientConfig getConfig() { - return config; - } - - @Override - public boolean isSecure() throws IOException { - return pool.isSecure(); - } - - private String getPortIdentifier(final TransferDirection direction) throws IOException { - final String id = this.portIdentifier; - if ( id != null ) { - return id; - } - - final String portId; - if ( direction == TransferDirection.SEND ) { - portId = pool.getInputPortIdentifier(this.portName); - } else { - portId = pool.getOutputPortIdentifier(this.portName); - } - - if (portId == null) { - logger.debug("Unable to resolve port [{}] to an identifier", portName); - } else { - logger.debug("Resolved port [{}] to identifier [{}]", portName, portId); - this.portIdentifier = portId; - } - - return portId; - } - - - private RemoteDestination createRemoteDestination(final String portId, final String portName) { - return new RemoteDestination() { + private final EndpointConnectionPool pool; + private final boolean compress; + private final String portName; + private final long penalizationNanos; + private volatile String portIdentifier; + private volatile boolean closed = false; + + public SocketClient(final SiteToSiteClientConfig config) { + pool = new EndpointConnectionPool(config.getUrl(), + createRemoteDestination(config.getPortIdentifier(), config.getPortName()), + (int) config.getTimeout(TimeUnit.MILLISECONDS), + (int) config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS), + config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile()); + + this.config = config; + this.compress = config.isUseCompression(); + this.portIdentifier = config.getPortIdentifier(); + this.portName = config.getPortName(); + this.penalizationNanos = config.getPenalizationPeriod(TimeUnit.NANOSECONDS); + } + + @Override + public SiteToSiteClientConfig getConfig() { + return config; + } + + @Override + public boolean isSecure() throws IOException { + return pool.isSecure(); + } + + private String getPortIdentifier(final TransferDirection direction) throws IOException { + final String id = this.portIdentifier; + if (id != null) { + return id; + } + + final String portId; + if (direction == TransferDirection.SEND) { + portId = pool.getInputPortIdentifier(this.portName); + } else { + portId = pool.getOutputPortIdentifier(this.portName); + } + + if (portId == null) { + logger.debug("Unable to resolve port [{}] to an identifier", portName); + } else { + logger.debug("Resolved port [{}] to identifier [{}]", portName, portId); + this.portIdentifier = portId; + } + + return portId; + } + + private RemoteDestination createRemoteDestination(final String portId, final String portName) { + return new RemoteDestination() { @Override public String getIdentifier() { return portId; } - + @Override public String getName() { return portName; @@ -113,113 +113,112 @@ public class SocketClient implements SiteToSiteClient { return compress; } }; - } - - @Override - public Transaction createTransaction(final TransferDirection direction) throws IOException { - if ( closed ) { - throw new IllegalStateException("Client is closed"); - } - final String portId = getPortIdentifier(direction); - - if ( portId == null ) { - throw new IOException("Could not find Port with name '" + portName + "' for remote NiFi instance"); - } - - final EndpointConnection connectionState = pool.getEndpointConnection(direction, getConfig()); - if ( connectionState == null ) { - return null; - } - - final Transaction transaction; - try { - transaction = connectionState.getSocketClientProtocol().startTransaction( - connectionState.getPeer(), connectionState.getCodec(), direction); - } catch (final Throwable t) { - pool.terminate(connectionState); - throw new IOException("Unable to create Transaction to communicate with " + connectionState.getPeer(), t); - } - - // Wrap the transaction in a new one that will return the EndpointConnectionState back to the pool whenever - // the transaction is either completed or canceled. - final ObjectHolder<EndpointConnection> connectionStateRef = new ObjectHolder<>(connectionState); - return new Transaction() { - @Override - public void confirm() throws IOException { - transaction.confirm(); - } - - @Override - public TransactionCompletion complete() throws IOException { - try { - return transaction.complete(); - } finally { - final EndpointConnection state = connectionStateRef.get(); - if ( state != null ) { - pool.offer(connectionState); - connectionStateRef.set(null); - } - } - } - - @Override - public void cancel(final String explanation) throws IOException { - try { - transaction.cancel(explanation); - } finally { + } + + @Override + public Transaction createTransaction(final TransferDirection direction) throws IOException { + if (closed) { + throw new IllegalStateException("Client is closed"); + } + final String portId = getPortIdentifier(direction); + + if (portId == null) { + throw new IOException("Could not find Port with name '" + portName + "' for remote NiFi instance"); + } + + final EndpointConnection connectionState = pool.getEndpointConnection(direction, getConfig()); + if (connectionState == null) { + return null; + } + + final Transaction transaction; + try { + transaction = connectionState.getSocketClientProtocol().startTransaction( + connectionState.getPeer(), connectionState.getCodec(), direction); + } catch (final Throwable t) { + pool.terminate(connectionState); + throw new IOException("Unable to create Transaction to communicate with " + connectionState.getPeer(), t); + } + + // Wrap the transaction in a new one that will return the EndpointConnectionState back to the pool whenever + // the transaction is either completed or canceled. + final ObjectHolder<EndpointConnection> connectionStateRef = new ObjectHolder<>(connectionState); + return new Transaction() { + @Override + public void confirm() throws IOException { + transaction.confirm(); + } + + @Override + public TransactionCompletion complete() throws IOException { + try { + return transaction.complete(); + } finally { final EndpointConnection state = connectionStateRef.get(); - if ( state != null ) { + if (state != null) { + pool.offer(connectionState); + connectionStateRef.set(null); + } + } + } + + @Override + public void cancel(final String explanation) throws IOException { + try { + transaction.cancel(explanation); + } finally { + final EndpointConnection state = connectionStateRef.get(); + if (state != null) { pool.terminate(connectionState); connectionStateRef.set(null); } - } - } - - @Override - public void error() { - try { - transaction.error(); - } finally { + } + } + + @Override + public void error() { + try { + transaction.error(); + } finally { final EndpointConnection state = connectionStateRef.get(); - if ( state != null ) { + if (state != null) { pool.terminate(connectionState); connectionStateRef.set(null); } - } - } - - @Override - public void send(final DataPacket dataPacket) throws IOException { - transaction.send(dataPacket); - } - - @Override - public void send(final byte[] content, final Map<String, String> attributes) throws IOException { - transaction.send(content, attributes); - } - - @Override - public DataPacket receive() throws IOException { - return transaction.receive(); - } - - @Override - public TransactionState getState() throws IOException { - return transaction.getState(); - } - - @Override - public Communicant getCommunicant() { - return transaction.getCommunicant(); - } - }; - } - - - @Override - public void close() throws IOException { - closed = true; - pool.shutdown(); - } - + } + } + + @Override + public void send(final DataPacket dataPacket) throws IOException { + transaction.send(dataPacket); + } + + @Override + public void send(final byte[] content, final Map<String, String> attributes) throws IOException { + transaction.send(content, attributes); + } + + @Override + public DataPacket receive() throws IOException { + return transaction.receive(); + } + + @Override + public TransactionState getState() throws IOException { + return transaction.getState(); + } + + @Override + public Communicant getCommunicant() { + return transaction.getCommunicant(); + } + }; + } + + @Override + public void close() throws IOException { + closed = true; + pool.shutdown(); + } + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java index 1380e1b..e79fc47 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java @@ -38,13 +38,13 @@ public interface FlowFileCodec extends VersionedRemoteResource { * Returns a List of all versions that this codec is able to support, in the * order that they are preferred by the codec * - * @return + * @return all supported versions */ public List<Integer> getSupportedVersions(); /** - * Encodes a DataPacket and its content as a single stream of data and writes - * that stream to the output. + * Encodes a DataPacket and its content as a single stream of data and + * writes that stream to the output. * * @param dataPacket the data to serialize * @param outStream the stream to write the data to @@ -58,12 +58,13 @@ public interface FlowFileCodec extends VersionedRemoteResource { * Decodes the contents of the InputStream, interpreting the data to * determine the next DataPacket's attributes and content. * - * @param stream an InputStream containing DataPacket's content and attributes + * @param stream an InputStream containing DataPacket's content and + * attributes * - * @return the DataPacket that was created, or <code>null</code> if the stream - * was out of data + * @return the DataPacket that was created, or <code>null</code> if the + * stream was out of data * - * @throws IOException + * @throws IOException if unable to read stream * @throws ProtocolException if the input is malformed * @throws TransmissionDisabledException if a user terminates the connection */ http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java index 6fd92de..0bee537 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java @@ -34,7 +34,8 @@ import org.apache.nifi.remote.util.StandardDataPacket; import org.apache.nifi.stream.io.StreamUtils; public class StandardFlowFileCodec implements FlowFileCodec { - public static final int MAX_NUM_ATTRIBUTES = 25000; + + public static final int MAX_NUM_ATTRIBUTES = 25000; public static final String DEFAULT_FLOWFILE_PATH = "./"; @@ -43,30 +44,29 @@ public class StandardFlowFileCodec implements FlowFileCodec { public StandardFlowFileCodec() { versionNegotiator = new StandardVersionNegotiator(1); } - + @Override public void encode(final DataPacket dataPacket, final OutputStream encodedOut) throws IOException { final DataOutputStream out = new DataOutputStream(encodedOut); - + final Map<String, String> attributes = dataPacket.getAttributes(); out.writeInt(attributes.size()); - for ( final Map.Entry<String, String> entry : attributes.entrySet() ) { + for (final Map.Entry<String, String> entry : attributes.entrySet()) { writeString(entry.getKey(), out); writeString(entry.getValue(), out); } - + out.writeLong(dataPacket.getSize()); - + final InputStream in = dataPacket.getData(); StreamUtils.copy(in, encodedOut); encodedOut.flush(); } - @Override public DataPacket decode(final InputStream stream) throws IOException, ProtocolException { final DataInputStream in = new DataInputStream(stream); - + final int numAttributes; try { numAttributes = in.readInt(); @@ -74,22 +74,22 @@ public class StandardFlowFileCodec implements FlowFileCodec { // we're out of data. return null; } - + // This is here because if the stream is not properly formed, we could get up to Integer.MAX_VALUE attributes, which will // generally result in an OutOfMemoryError. - if ( numAttributes > MAX_NUM_ATTRIBUTES ) { - throw new ProtocolException("FlowFile exceeds maximum number of attributes with a total of " + numAttributes); + if (numAttributes > MAX_NUM_ATTRIBUTES) { + throw new ProtocolException("FlowFile exceeds maximum number of attributes with a total of " + numAttributes); } - + final Map<String, String> attributes = new HashMap<>(numAttributes); - for (int i=0; i < numAttributes; i++) { + for (int i = 0; i < numAttributes; i++) { final String attrName = readString(in); final String attrValue = readString(in); attributes.put(attrName, attrValue); } - + final long numBytes = in.readLong(); - + return new StandardDataPacket(attributes, stream, numBytes); } @@ -99,14 +99,13 @@ public class StandardFlowFileCodec implements FlowFileCodec { out.write(bytes); } - private String readString(final DataInputStream in) throws IOException { final int numBytes = in.readInt(); final byte[] bytes = new byte[numBytes]; StreamUtils.fillBuffer(in, bytes, true); return new String(bytes, "UTF-8"); } - + @Override public List<Integer> getSupportedVersions() { return versionNegotiator.getSupportedVersions(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java index d4d55e1..198aaef 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java @@ -18,13 +18,14 @@ package org.apache.nifi.remote.exception; import java.io.IOException; - /** - * A HandshakeException occurs when the client and the remote NiFi instance do not agree - * on some condition during the handshake. For example, if the NiFi instance does not recognize - * one of the parameters that the client passes during the Handshaking phase. + * A HandshakeException occurs when the client and the remote NiFi instance do + * not agree on some condition during the handshake. For example, if the NiFi + * instance does not recognize one of the parameters that the client passes + * during the Handshaking phase. */ public class HandshakeException extends IOException { + private static final long serialVersionUID = 178192341908726L; public HandshakeException(final String message) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java index 8b97832..09fc05c 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java @@ -17,11 +17,12 @@ package org.apache.nifi.remote.exception; /** - * PortNotRunningException occurs when the remote NiFi instance reports - * that the Port that the client is attempting to communicate with is not - * currently running and therefore communications with that Port are not allowed. + * PortNotRunningException occurs when the remote NiFi instance reports that the + * Port that the client is attempting to communicate with is not currently + * running and therefore communications with that Port are not allowed. */ public class PortNotRunningException extends ProtocolException { + private static final long serialVersionUID = -2790940982005516375L; public PortNotRunningException(final String message) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java index 45a4e15..cc6ae50 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java @@ -19,8 +19,8 @@ package org.apache.nifi.remote.exception; import java.io.IOException; /** - * A ProtocolException occurs when unexpected data is received, for example - * an invalid Response Code. + * A ProtocolException occurs when unexpected data is received, for example an + * invalid Response Code. */ public class ProtocolException extends IOException { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java index 592a1b3..4249075 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java @@ -17,10 +17,12 @@ package org.apache.nifi.remote.exception; /** - * An UnknownPortException indicates that the remote NiFi instance has reported that - * the endpoint that the client attempted to communicate with does not exist. + * An UnknownPortException indicates that the remote NiFi instance has reported + * that the endpoint that the client attempted to communicate with does not + * exist. */ public class UnknownPortException extends ProtocolException { + private static final long serialVersionUID = -2790940982005516375L; public UnknownPortException(final String message) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java index 8065f57..6180c3c 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java @@ -22,11 +22,12 @@ import java.nio.channels.SocketChannel; import org.apache.nifi.remote.AbstractCommunicationsSession; public class SocketChannelCommunicationsSession extends AbstractCommunicationsSession { + private final SocketChannel channel; private final SocketChannelInput request; private final SocketChannelOutput response; private int timeout = 30000; - + public SocketChannelCommunicationsSession(final SocketChannel socketChannel, final String uri) throws IOException { super(uri); request = new SocketChannelInput(socketChannel); @@ -34,12 +35,12 @@ public class SocketChannelCommunicationsSession extends AbstractCommunicationsSe channel = socketChannel; socketChannel.configureBlocking(false); } - + @Override public boolean isClosed() { return !channel.isConnected(); } - + @Override public SocketChannelInput getInput() { return request; @@ -65,28 +66,28 @@ public class SocketChannelCommunicationsSession extends AbstractCommunicationsSe @Override public void close() throws IOException { IOException suppressed = null; - + try { request.consume(); } catch (final IOException ioe) { suppressed = ioe; } - + try { channel.close(); } catch (final IOException ioe) { - if ( suppressed != null ) { + if (suppressed != null) { ioe.addSuppressed(suppressed); } - + throw ioe; } - - if ( suppressed != null ) { + + if (suppressed != null) { throw suppressed; } } - + @Override public boolean isDataAvailable() { return request.isDataAvailable(); @@ -101,7 +102,7 @@ public class SocketChannelCommunicationsSession extends AbstractCommunicationsSe public long getBytesRead() { return request.getBytesRead(); } - + @Override public void interrupt() { request.interrupt(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java index 7dffddd..68a8dc4 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java @@ -26,18 +26,19 @@ import org.apache.nifi.remote.io.InterruptableInputStream; import org.apache.nifi.remote.protocol.CommunicationsInput; public class SocketChannelInput implements CommunicationsInput { + private final SocketChannelInputStream socketIn; private final ByteCountingInputStream countingIn; private final InputStream bufferedIn; private final InterruptableInputStream interruptableIn; - + public SocketChannelInput(final SocketChannel socketChannel) throws IOException { this.socketIn = new SocketChannelInputStream(socketChannel); countingIn = new ByteCountingInputStream(socketIn); bufferedIn = new BufferedInputStream(countingIn); interruptableIn = new InterruptableInputStream(bufferedIn); } - + @Override public InputStream getInputStream() throws IOException { return interruptableIn; @@ -46,7 +47,7 @@ public class SocketChannelInput implements CommunicationsInput { public void setTimeout(final int millis) { socketIn.setTimeout(millis); } - + public boolean isDataAvailable() { try { return interruptableIn.available() > 0; @@ -54,12 +55,12 @@ public class SocketChannelInput implements CommunicationsInput { return false; } } - + @Override public long getBytesRead() { return countingIn.getBytesRead(); } - + public void interrupt() { interruptableIn.interrupt(); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java index 26c0164..13974a5 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java @@ -26,32 +26,33 @@ import org.apache.nifi.remote.io.InterruptableOutputStream; import org.apache.nifi.remote.protocol.CommunicationsOutput; public class SocketChannelOutput implements CommunicationsOutput { + private final SocketChannelOutputStream socketOutStream; private final ByteCountingOutputStream countingOut; private final OutputStream bufferedOut; private final InterruptableOutputStream interruptableOut; - + public SocketChannelOutput(final SocketChannel socketChannel) throws IOException { socketOutStream = new SocketChannelOutputStream(socketChannel); countingOut = new ByteCountingOutputStream(socketOutStream); bufferedOut = new BufferedOutputStream(countingOut); interruptableOut = new InterruptableOutputStream(bufferedOut); } - + @Override public OutputStream getOutputStream() throws IOException { return interruptableOut; } - + public void setTimeout(final int timeout) { socketOutStream.setTimeout(timeout); } - + @Override public long getBytesWritten() { return countingOut.getBytesWritten(); } - + public void interrupt() { interruptableOut.interrupt(); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java index 50e9162..5e5abc7 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java @@ -21,17 +21,18 @@ import java.io.IOException; import org.apache.nifi.remote.AbstractCommunicationsSession; public class SSLSocketChannelCommunicationsSession extends AbstractCommunicationsSession { + private final SSLSocketChannel channel; private final SSLSocketChannelInput request; private final SSLSocketChannelOutput response; - + public SSLSocketChannelCommunicationsSession(final SSLSocketChannel channel, final String uri) { super(uri); request = new SSLSocketChannelInput(channel); response = new SSLSocketChannelOutput(channel); this.channel = channel; } - + @Override public SSLSocketChannelInput getInput() { return request; @@ -55,33 +56,33 @@ public class SSLSocketChannelCommunicationsSession extends AbstractCommunication @Override public void close() throws IOException { IOException suppressed = null; - + try { request.consume(); } catch (final IOException ioe) { suppressed = ioe; } - + try { channel.close(); } catch (final IOException ioe) { - if ( suppressed != null ) { + if (suppressed != null) { ioe.addSuppressed(suppressed); } - + throw ioe; } - - if ( suppressed != null ) { + + if (suppressed != null) { throw suppressed; } } - + @Override public boolean isClosed() { return channel.isClosed(); } - + @Override public boolean isDataAvailable() { try { @@ -105,7 +106,7 @@ public class SSLSocketChannelCommunicationsSession extends AbstractCommunication public void interrupt() { channel.interrupt(); } - + @Override public String toString() { return super.toString() + "[SSLSocketChannel=" + channel + "]"; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java index 01fb9f2..6cd2344 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java @@ -24,25 +24,26 @@ import org.apache.nifi.stream.io.ByteCountingInputStream; import org.apache.nifi.remote.protocol.CommunicationsInput; public class SSLSocketChannelInput implements CommunicationsInput { + private final SSLSocketChannelInputStream in; private final ByteCountingInputStream countingIn; private final InputStream bufferedIn; - + public SSLSocketChannelInput(final SSLSocketChannel socketChannel) { in = new SSLSocketChannelInputStream(socketChannel); countingIn = new ByteCountingInputStream(in); this.bufferedIn = new BufferedInputStream(countingIn); } - + @Override public InputStream getInputStream() throws IOException { return bufferedIn; } - + public boolean isDataAvailable() throws IOException { return bufferedIn.available() > 0; } - + @Override public long getBytesRead() { return countingIn.getBytesRead(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java index dc3d68f..33d13cb 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java @@ -24,9 +24,10 @@ import org.apache.nifi.stream.io.ByteCountingOutputStream; import org.apache.nifi.remote.protocol.CommunicationsOutput; public class SSLSocketChannelOutput implements CommunicationsOutput { + private final OutputStream out; private final ByteCountingOutputStream countingOut; - + public SSLSocketChannelOutput(final SSLSocketChannel channel) { countingOut = new ByteCountingOutputStream(new SSLSocketChannelOutputStream(channel)); out = new BufferedOutputStream(countingOut); @@ -36,7 +37,7 @@ public class SSLSocketChannelOutput implements CommunicationsOutput { public OutputStream getOutputStream() throws IOException { return out; } - + @Override public long getBytesWritten() { return countingOut.getBytesWritten(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java index 36a0e8d..2efea11 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java @@ -48,37 +48,27 @@ public interface ClientProtocol extends VersionedRemoteResource { boolean isReadyForFileTransfer(); - - - Transaction startTransaction(Peer peer, FlowFileCodec codec, TransferDirection direction) throws IOException; - - + /** - * returns <code>true</code> if remote instance indicates that the port is + * @return <code>true</code> if remote instance indicates that the port is * invalid - * - * @return * @throws IllegalStateException if a handshake has not successfully * completed */ boolean isPortInvalid() throws IllegalStateException; /** - * returns <code>true</code> if remote instance indicates that the port is + * @return <code>true</code> if remote instance indicates that the port is * unknown - * - * @return * @throws IllegalStateException if a handshake has not successfully * completed */ boolean isPortUnknown(); /** - * returns <code>true</code> if remote instance indicates that the port's + * @return <code>true</code> if remote instance indicates that the port's * destination is full - * - * @return * @throws IllegalStateException if a handshake has not successfully * completed */ http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java index 5e56902..3fa3e96 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java @@ -23,10 +23,11 @@ public interface CommunicationsInput { /** * Reads all data currently on the socket and throws it away - * @throws IOException + * + * @throws IOException if unable to consume */ void consume() throws IOException; - + InputStream getInputStream() throws IOException; long getBytesRead(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java index d009cec..aff73ba 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java @@ -55,10 +55,8 @@ public interface CommunicationsSession extends Closeable { void interrupt(); /** - * Returns <code>true</code> if the connection is closed, <code>false</code> - * otherwise. - * - * @return + * @return <code>true</code> if the connection is closed, <code>false</code> + * otherwise */ boolean isClosed(); }
