Repository: incubator-nifi Updated Branches: refs/heads/site-to-site-client a6293e340 -> c174d3a60
NIFI-282: Refactoring to make client from site-to-site components Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/c174d3a6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/c174d3a6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/c174d3a6 Branch: refs/heads/site-to-site-client Commit: c174d3a600358ebed8b8064247785606af6c6134 Parents: a6293e3 Author: Mark Payne <[email protected]> Authored: Tue Jan 20 19:07:18 2015 -0500 Committer: Mark Payne <[email protected]> Committed: Tue Jan 20 19:07:18 2015 -0500 ---------------------------------------------------------------------- nifi/commons/site-to-site-client/pom.xml | 13 + .../apache/nifi/remote/client/DataPacket.java | 28 -- .../nifi/remote/client/SiteToSiteClient.java | 5 +- .../apache/nifi/remote/client/Transaction.java | 21 ++ .../socket/EndpointConnectionStatePool.java | 309 +++++++++++++------ .../nifi/remote/client/socket/SocketClient.java | 151 ++++++++- .../nifi/remote/protocol/ClientProtocol.java | 15 + .../apache/nifi/remote/protocol/DataPacket.java | 29 ++ .../protocol/socket/SocketClientProtocol.java | 73 ++++- .../socket/SocketClientTransaction.java | 66 ++++ .../nifi/remote/util/RemoteNiFiUtils.java | 216 +++++++++++++ .../apache/nifi/groups/RemoteProcessGroup.java | 30 -- .../nifi/remote/StandardRemoteProcessGroup.java | 89 +----- .../util/RemoteProcessGroupUtils.java | 216 ------------- .../nifi/remote/RemoteResourceFactory.java | 8 + .../nifi/remote/StandardRemoteGroupPort.java | 28 +- .../socket/SocketFlowFileServerProtocol.java | 9 +- .../apache/nifi/remote/RemoteDestination.java | 10 - 18 files changed, 822 insertions(+), 494 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/pom.xml b/nifi/commons/site-to-site-client/pom.xml index 7719d55..d65f440 100644 --- a/nifi/commons/site-to-site-client/pom.xml +++ b/nifi/commons/site-to-site-client/pom.xml @@ -21,6 +21,19 @@ <groupId>org.apache.nifi</groupId> <artifactId>nifi-utils</artifactId> </dependency> + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-client</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>client-dto</artifactId> + <version>0.0.1-incubating-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-web-utils</artifactId> + </dependency> <dependency> <groupId>junit</groupId> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/DataPacket.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/DataPacket.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/DataPacket.java deleted file mode 100644 index ec77f2c..0000000 --- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/DataPacket.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.client; - -import java.io.InputStream; -import java.util.Map; - -public interface DataPacket { - - Map<String, String> getAttributes(); - - InputStream getData(); - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java index 47a09be..34cb56a 100644 --- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java @@ -16,9 +16,12 @@ */ package org.apache.nifi.remote.client; +import java.io.Closeable; import java.io.IOException; -public interface SiteToSiteClient { +import org.apache.nifi.remote.protocol.DataPacket; + +public interface SiteToSiteClient extends Closeable { void send(DataPacket dataPacket) throws IOException; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/Transaction.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/Transaction.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/Transaction.java new file mode 100644 index 0000000..bae6e51 --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/Transaction.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.client; + +public interface Transaction { + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java index d20fb58..0718bb1 100644 --- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java @@ -41,10 +41,16 @@ import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Pattern; import javax.net.ssl.SSLContext; @@ -72,21 +78,28 @@ import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSessio import org.apache.nifi.remote.protocol.CommunicationsSession; import org.apache.nifi.remote.protocol.socket.SocketClientProtocol; import org.apache.nifi.remote.util.PeerStatusCache; +import org.apache.nifi.remote.util.RemoteNiFiUtils; import org.apache.nifi.reporting.Severity; import org.apache.nifi.stream.io.BufferedOutputStream; +import org.apache.nifi.web.api.dto.ControllerDTO; +import org.apache.nifi.web.api.dto.PortDTO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class EndpointConnectionStatePool { public static final long PEER_REFRESH_PERIOD = 60000L; public static final String CATEGORY = "Site-to-Site"; + public static final long REMOTE_REFRESH_MILLIS = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES); + private static final long PEER_CACHE_MILLIS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES); private static final Logger logger = LoggerFactory.getLogger(EndpointConnectionStatePool.class); - private final ConcurrentMap<String, BlockingQueue<EndpointConnectionState>> endpointConnectionMap = new ConcurrentHashMap<>(); + private final BlockingQueue<EndpointConnectionState> connectionStateQueue = new LinkedBlockingQueue<>(); private final ConcurrentMap<PeerStatus, Long> peerTimeoutExpirations = new ConcurrentHashMap<>(); - + private final URI clusterUrl; + private final String apiUri; + private final AtomicLong peerIndex = new AtomicLong(0L); private final ReentrantLock peerRefreshLock = new ReentrantLock(); @@ -98,15 +111,41 @@ public class EndpointConnectionStatePool { private final File peersFile; private final EventReporter eventReporter; private final SSLContext sslContext; + private final ScheduledExecutorService taskExecutor; + + private final ReadWriteLock listeningPortRWLock = new ReentrantReadWriteLock(); + private final Lock remoteInfoReadLock = listeningPortRWLock.readLock(); + private final Lock remoteInfoWriteLock = listeningPortRWLock.writeLock(); + private Integer siteToSitePort; + private Boolean siteToSiteSecure; + private long remoteRefreshTime; + private final Map<String, String> inputPortMap = new HashMap<>(); // map input port name to identifier + private final Map<String, String> outputPortMap = new HashMap<>(); // map output port name to identifier + + private volatile int commsTimeout; - public EndpointConnectionStatePool(final EventReporter eventReporter, final File persistenceFile) { - this(null, eventReporter, persistenceFile); + public EndpointConnectionStatePool(final String clusterUrl, final int commsTimeoutMillis, final EventReporter eventReporter, final File persistenceFile) { + this(clusterUrl, commsTimeoutMillis, null, eventReporter, persistenceFile); } - public EndpointConnectionStatePool(final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile) { + public EndpointConnectionStatePool(final String clusterUrl, final int commsTimeoutMillis, final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile) { + try { + this.clusterUrl = new URI(clusterUrl); + } catch (final URISyntaxException e) { + throw new IllegalArgumentException("Invalid Cluster URL: " + clusterUrl); + } + + // Trim the trailing / + String uriPath = this.clusterUrl.getPath(); + if (uriPath.endsWith("/")) { + uriPath = uriPath.substring(0, uriPath.length() - 1); + } + apiUri = this.clusterUrl.getScheme() + "://" + this.clusterUrl.getHost() + ":" + this.clusterUrl.getPort() + uriPath + "-api"; + this.sslContext = sslContext; this.peersFile = persistenceFile; this.eventReporter = eventReporter; + this.commsTimeout = commsTimeoutMillis; Set<PeerStatus> recoveredStatuses; if ( persistenceFile != null && persistenceFile.exists() ) { @@ -119,21 +158,39 @@ public class EndpointConnectionStatePool { } else { peerStatusCache = null; } + + // Initialize a scheduled executor and run some maintenance tasks in the background to kill off old, unused + // connections and keep our list of peers up-to-date. + taskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() { + private final ThreadFactory defaultFactory = Executors.defaultThreadFactory(); + + @Override + public Thread newThread(final Runnable r) { + final Thread thread = defaultFactory.newThread(r); + thread.setName("NiFi Site-to-Site Connection Pool Maintenance"); + return thread; + } + }); + + taskExecutor.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + refreshPeers(); + } + }, 0, 5, TimeUnit.SECONDS); + + taskExecutor.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + cleanupExpiredSockets(); + } + }, 5, 5, TimeUnit.SECONDS); } - public EndpointConnectionState getEndpointConnectionState(final String clusterUrl, final RemoteDestination remoteDestination, final TransferDirection direction) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException { + public EndpointConnectionState getEndpointConnectionState(final RemoteDestination remoteDestination, final TransferDirection direction) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException { // // Attempt to get a connection state that already exists for this URL. // - BlockingQueue<EndpointConnectionState> connectionStateQueue = endpointConnectionMap.get(clusterUrl); - if ( connectionStateQueue == null ) { - connectionStateQueue = new LinkedBlockingQueue<>(); - BlockingQueue<EndpointConnectionState> existingQueue = endpointConnectionMap.putIfAbsent(clusterUrl, connectionStateQueue); - if ( existingQueue != null ) { - connectionStateQueue = existingQueue; - } - } - FlowFileCodec codec = null; CommunicationsSession commsSession = null; SocketClientProtocol protocol = null; @@ -172,7 +229,7 @@ public class EndpointConnectionStatePool { final String peerUrl = "nifi://" + peerStatus.getHostname() + ":" + peerStatus.getPort(); - peer = new Peer(commsSession, peerUrl, clusterUrl); + peer = new Peer(commsSession, peerUrl, clusterUrl.toString()); // perform handshake try { @@ -214,9 +271,8 @@ public class EndpointConnectionStatePool { } else { final long lastTimeUsed = connectionState.getLastTimeUsed(); final long millisSinceLastUse = System.currentTimeMillis() - lastTimeUsed; - final long timeoutMillis = remoteDestination.getCommunicationsTimeout(TimeUnit.MILLISECONDS); - if ( timeoutMillis > 0L && millisSinceLastUse >= timeoutMillis ) { + if ( commsTimeout > 0L && millisSinceLastUse >= commsTimeout ) { cleanup(connectionState.getSocketClientProtocol(), connectionState.getPeer()); connectionState = null; } else { @@ -243,12 +299,7 @@ public class EndpointConnectionStatePool { return false; } - final BlockingQueue<EndpointConnectionState> queue = endpointConnectionMap.get(url); - if ( queue == null ) { - return false; - } - - return queue.offer(endpointConnectionState); + return connectionStateQueue.offer(endpointConnectionState); } /** @@ -365,7 +416,7 @@ public class EndpointConnectionStatePool { } - public Set<PeerStatus> getPeerStatuses() { + private Set<PeerStatus> getPeerStatuses() { final PeerStatusCache cache = this.peerStatusCache; if (cache == null || cache.getStatuses() == null || cache.getStatuses().isEmpty()) { return null; @@ -384,12 +435,12 @@ public class EndpointConnectionStatePool { return cache.getStatuses(); } - private Set<PeerStatus> fetchRemotePeerStatuses(final URI destinationUri, final boolean secure) throws IOException, HandshakeException, UnknownPortException, PortNotRunningException, BadRequestException { - final String hostname = destinationUri.getHost(); - final int port = destinationUri.getPort(); + private Set<PeerStatus> fetchRemotePeerStatuses() throws IOException, HandshakeException, UnknownPortException, PortNotRunningException, BadRequestException { + final String hostname = clusterUrl.getHost(); + final int port = getSiteToSitePort(); - final CommunicationsSession commsSession = establishSiteToSiteConnection(hostname, port, secure); - final Peer peer = new Peer(commsSession, "nifi://" + hostname + ":" + port, destinationUri.toString()); + final CommunicationsSession commsSession = establishSiteToSiteConnection(hostname, port); + final Peer peer = new Peer(commsSession, "nifi://" + hostname + ":" + port, clusterUrl.toString()); final SocketClientProtocol clientProtocol = new SocketClientProtocol(); final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); @@ -399,8 +450,8 @@ public class EndpointConnectionStatePool { throw new BadRequestException(e.toString()); } - // TODO: Make the 30000 millis configurable - clientProtocol.handshake(peer, null, 30000); + clientProtocol.setTimeout(commsTimeout); + clientProtocol.handshake(peer, null); final Set<PeerStatus> peerStatuses = clientProtocol.getPeerStatuses(peer); persistPeerStatuses(peerStatuses); @@ -474,38 +525,41 @@ public class EndpointConnectionStatePool { } - public CommunicationsSession establishSiteToSiteConnection(final PeerStatus peerStatus) throws IOException { - return establishSiteToSiteConnection(peerStatus.getHostname(), peerStatus.getPort(), peerStatus.isSecure()); + private CommunicationsSession establishSiteToSiteConnection(final PeerStatus peerStatus) throws IOException { + return establishSiteToSiteConnection(peerStatus.getHostname(), peerStatus.getPort()); } - public CommunicationsSession establishSiteToSiteConnection(final String hostname, final int port, final boolean secure) throws IOException { + private CommunicationsSession establishSiteToSiteConnection(final String hostname, final int port) throws IOException { + if ( siteToSiteSecure == null ) { + throw new IOException("Remote NiFi instance " + clusterUrl + " is not currently configured to accept site-to-site connections"); + } + final String destinationUri = "nifi://" + hostname + ":" + port; CommunicationsSession commsSession = null; try { - if ( secure ) { - if ( sslContext == null ) { - throw new IOException("Unable to communicate with " + hostname + ":" + port + " because it requires Secure Site-to-Site communications, but this instance is not configured for secure communications"); - } - - final SSLSocketChannel socketChannel = new SSLSocketChannel(sslContext, hostname, port, true); - socketChannel.connect(); - - commsSession = new SSLSocketChannelCommunicationsSession(socketChannel, destinationUri); - - try { - commsSession.setUserDn(socketChannel.getDn()); - } catch (final CertificateNotYetValidException | CertificateExpiredException ex) { - throw new IOException(ex); - } - } else { - final SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port)); - commsSession = new SocketChannelCommunicationsSession(socketChannel, destinationUri); - } - - commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES); - - commsSession.setUri(destinationUri); + if ( siteToSiteSecure ) { + if ( sslContext == null ) { + throw new IOException("Unable to communicate with " + hostname + ":" + port + " because it requires Secure Site-to-Site communications, but this instance is not configured for secure communications"); + } + + final SSLSocketChannel socketChannel = new SSLSocketChannel(sslContext, hostname, port, true); + socketChannel.connect(); + + commsSession = new SSLSocketChannelCommunicationsSession(socketChannel, destinationUri); + + try { + commsSession.setUserDn(socketChannel.getDn()); + } catch (final CertificateNotYetValidException | CertificateExpiredException ex) { + throw new IOException(ex); + } + } else { + final SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port)); + commsSession = new SocketChannelCommunicationsSession(socketChannel, destinationUri); + } + + commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES); + commsSession.setUri(destinationUri); } catch (final IOException ioe) { if ( commsSession != null ) { commsSession.close(); @@ -578,59 +632,52 @@ public class EndpointConnectionStatePool { } - public void cleanupExpiredSockets() { + private void cleanupExpiredSockets() { final List<EndpointConnectionState> states = new ArrayList<>(); - for ( final BlockingQueue<EndpointConnectionState> queue : endpointConnectionMap.values() ) { - states.clear(); - - EndpointConnectionState state; - while ((state = queue.poll()) != null) { - // If the socket has not been used in 10 seconds, shut it down. - final long lastUsed = state.getLastTimeUsed(); - if ( lastUsed < System.currentTimeMillis() - 10000L ) { - try { - state.getSocketClientProtocol().shutdown(state.getPeer()); - } catch (final Exception e) { - logger.debug("Failed to shut down {} using {} due to {}", - new Object[] {state.getSocketClientProtocol(), state.getPeer(), e} ); - } - - cleanup(state.getSocketClientProtocol(), state.getPeer()); - } else { - states.add(state); + EndpointConnectionState state; + while ((state = connectionStateQueue.poll()) != null) { + // If the socket has not been used in 10 seconds, shut it down. + final long lastUsed = state.getLastTimeUsed(); + if ( lastUsed < System.currentTimeMillis() - 10000L ) { + try { + state.getSocketClientProtocol().shutdown(state.getPeer()); + } catch (final Exception e) { + logger.debug("Failed to shut down {} using {} due to {}", + new Object[] {state.getSocketClientProtocol(), state.getPeer(), e} ); } + + cleanup(state.getSocketClientProtocol(), state.getPeer()); + } else { + states.add(state); } - - queue.addAll(states); } + + connectionStateQueue.addAll(states); } public void shutdown() { + taskExecutor.shutdown(); peerTimeoutExpirations.clear(); for ( final CommunicationsSession commsSession : activeCommsChannels ) { commsSession.interrupt(); } - for ( final BlockingQueue<EndpointConnectionState> queue : endpointConnectionMap.values() ) { - EndpointConnectionState state; - while ( (state = queue.poll()) != null) { - cleanup(state.getSocketClientProtocol(), state.getPeer()); - } + EndpointConnectionState state; + while ( (state = connectionStateQueue.poll()) != null) { + cleanup(state.getSocketClientProtocol(), state.getPeer()); } - - endpointConnectionMap.clear(); } - public void refreshPeers(final URI targetUri, final boolean secure) { + private void refreshPeers() { final PeerStatusCache existingCache = peerStatusCache; if (existingCache != null && (existingCache.getTimestamp() + PEER_CACHE_MILLIS > System.currentTimeMillis())) { return; } try { - final Set<PeerStatus> statuses = fetchRemotePeerStatuses(targetUri, secure); + final Set<PeerStatus> statuses = fetchRemotePeerStatuses(); peerStatusCache = new PeerStatusCache(statuses); logger.info("{} Successfully refreshed Peer Status; remote instance consists of {} peers", this, statuses.size()); } catch (Exception e) { @@ -639,6 +686,92 @@ public class EndpointConnectionStatePool { logger.warn("", e); } } + } + + + public String getInputPortIdentifier(final String portName) throws IOException { + return getPortIdentifier(portName, inputPortMap); + } + + public String getOutputPortIdentifier(final String portName) throws IOException { + return getPortIdentifier(portName, outputPortMap); + } + + + private String getPortIdentifier(final String portName, final Map<String, String> portMap) throws IOException { + String identifier; + remoteInfoReadLock.lock(); + try { + identifier = portMap.get(portName); + } finally { + remoteInfoReadLock.unlock(); + } + + if ( identifier != null ) { + return identifier; + } + + refreshRemoteInfo(); + + remoteInfoReadLock.lock(); + try { + return portMap.get(portName); + } finally { + remoteInfoReadLock.unlock(); + } + } + + + private ControllerDTO refreshRemoteInfo() throws IOException { + final boolean webInterfaceSecure = clusterUrl.toString().startsWith("https"); + final RemoteNiFiUtils utils = new RemoteNiFiUtils(webInterfaceSecure ? sslContext : null); + final ControllerDTO controller = utils.getController(URI.create(apiUri + "/controller"), commsTimeout); + + remoteInfoWriteLock.lock(); + try { + this.siteToSitePort = controller.getRemoteSiteListeningPort(); + this.siteToSiteSecure = controller.isSiteToSiteSecure(); + + inputPortMap.clear(); + for (final PortDTO inputPort : controller.getInputPorts()) { + inputPortMap.put(inputPort.getName(), inputPort.getId()); + } + + outputPortMap.clear(); + for ( final PortDTO outputPort : controller.getOutputPorts()) { + outputPortMap.put(outputPort.getName(), outputPort.getId()); + } + + this.remoteRefreshTime = System.currentTimeMillis(); + } finally { + remoteInfoWriteLock.unlock(); + } + + return controller; + } + + /** + * @return the port that the remote instance is listening on for + * site-to-site communication, or <code>null</code> if the remote instance + * is not configured to allow site-to-site communications. + * + * @throws IOException if unable to communicate with the remote instance + */ + private Integer getSiteToSitePort() throws IOException { + Integer listeningPort; + remoteInfoReadLock.lock(); + try { + listeningPort = this.siteToSitePort; + if (listeningPort != null && this.remoteRefreshTime > System.currentTimeMillis() - REMOTE_REFRESH_MILLIS) { + return listeningPort; + } + } finally { + remoteInfoReadLock.unlock(); + } + + final ControllerDTO controller = refreshRemoteInfo(); + listeningPort = controller.getRemoteSiteListeningPort(); + return listeningPort; } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java index 48e9cc5..b81b425 100644 --- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java @@ -16,16 +16,87 @@ */ package org.apache.nifi.remote.client.socket; +import java.io.File; import java.io.IOException; +import java.util.concurrent.TimeUnit; -import org.apache.nifi.remote.client.DataPacket; +import javax.net.ssl.SSLContext; + +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.remote.RemoteDestination; +import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.exception.HandshakeException; +import org.apache.nifi.remote.exception.PortNotRunningException; +import org.apache.nifi.remote.exception.ProtocolException; +import org.apache.nifi.remote.exception.UnknownPortException; +import org.apache.nifi.remote.protocol.DataPacket; public class SocketClient implements SiteToSiteClient { - + private final EndpointConnectionStatePool pool; + private final boolean compress; + private final String portName; + private final long penalizationNanos; + private volatile String portIdentifier; + + private SocketClient(final Builder builder) { + pool = new EndpointConnectionStatePool(builder.url, (int) TimeUnit.NANOSECONDS.toMillis(builder.timeoutNanos), + builder.sslContext, builder.eventReporter, builder.peerPersistenceFile); + + this.compress = builder.useCompression; + this.portIdentifier = builder.portIdentifier; + this.portName = builder.portName; + this.penalizationNanos = builder.penalizationNanos; + } + + + private String getPortIdentifier(final TransferDirection direction) throws IOException { + final String id = this.portIdentifier; + if ( id != null ) { + return id; + } + + if ( direction == TransferDirection.SEND ) { + return pool.getInputPortIdentifier(this.portName); + } else { + return pool.getOutputPortIdentifier(this.portName); + } + } + + @Override public void send(final DataPacket dataPacket) throws IOException { - // TODO Auto-generated method stub + final String portId = getPortIdentifier(TransferDirection.SEND); + + if ( portId == null ) { + throw new IOException("Could not find Port with name " + portName + " for remote NiFi instance"); + } + + final RemoteDestination remoteDestination = new RemoteDestination() { + @Override + public String getIdentifier() { + return portId; + } + + @Override + public long getYieldPeriod(final TimeUnit timeUnit) { + return timeUnit.convert(penalizationNanos, TimeUnit.NANOSECONDS); + } + + @Override + public boolean isUseCompression() { + return compress; + } + }; + + final EndpointConnectionState connectionState; + try { + connectionState = pool.getEndpointConnectionState(remoteDestination, TransferDirection.SEND); + } catch (final ProtocolException | HandshakeException | PortNotRunningException | UnknownPortException e) { + throw new IOException(e); + } + + } @Override @@ -33,5 +104,79 @@ public class SocketClient implements SiteToSiteClient { // TODO Auto-generated method stub return null; } + + @Override + public void close() throws IOException { + pool.shutdown(); + } + + public static class Builder { + private String url; + private long timeoutNanos = TimeUnit.SECONDS.toNanos(30); + private long penalizationNanos = TimeUnit.SECONDS.toNanos(3); + private SSLContext sslContext; + private EventReporter eventReporter; + private File peerPersistenceFile; + private boolean useCompression; + private String portName; + private String portIdentifier; + + public Builder url(final String url) { + this.url = url; + return this; + } + + public Builder timeout(final long timeout, final TimeUnit unit) { + this.timeoutNanos = unit.toNanos(timeout); + return this; + } + + public Builder nodePenalizationPeriod(final long period, final TimeUnit unit) { + this.penalizationNanos = unit.toNanos(period); + return this; + } + + public Builder sslContext(final SSLContext sslContext) { + this.sslContext = sslContext; + return this; + } + + public Builder eventReporter(final EventReporter eventReporter) { + this.eventReporter = eventReporter; + return this; + } + + public Builder peerPersistenceFile(final File peerPersistenceFile) { + this.peerPersistenceFile = peerPersistenceFile; + return this; + } + + public Builder useCompression(final boolean compress) { + this.useCompression = compress; + return this; + } + + public Builder portName(final String portName) { + this.portName = portName; + return this; + } + + public Builder portIdentifier(final String portIdentifier) { + this.portIdentifier = portIdentifier; + return this; + } + + public SocketClient build() { + if ( url == null ) { + throw new IllegalStateException("Must specify URL to build Site-to-Site client"); + } + + if ( portName == null && portIdentifier == null ) { + throw new IllegalStateException("Must specify either Port Name or Port Identifier to builder Site-to-Site client"); + } + + return new SocketClient(this); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java index 32274eb..d817425 100644 --- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java @@ -23,6 +23,7 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.PeerStatus; +import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.VersionedRemoteResource; import org.apache.nifi.remote.codec.FlowFileCodec; import org.apache.nifi.remote.exception.HandshakeException; @@ -46,6 +47,20 @@ public interface ClientProtocol extends VersionedRemoteResource { boolean isReadyForFileTransfer(); + + + + void startTransaction(Peer peer, TransferDirection direction) throws IOException; + + void completeTransaction(); + + void rollbackTransaction(); + + void transferData(Peer peer, DataPacket dataPacket, FlowFileCodec codec) throws IOException, ProtocolException; + + DataPacket receiveData(Peer peer, FlowFileCodec codec) throws IOException, ProtocolException; + + /** * returns <code>true</code> if remote instance indicates that the port is * invalid http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java new file mode 100644 index 0000000..f4fa4d0 --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol; + +import java.io.InputStream; +import java.util.Map; + +public interface DataPacket { + + Map<String, String> getAttributes(); + + InputStream getData(); + + long getSize(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java index 560385c..6b0c94b 100644 --- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java @@ -41,7 +41,9 @@ import org.apache.nifi.remote.PeerStatus; import org.apache.nifi.remote.RemoteDestination; import org.apache.nifi.remote.RemoteResourceInitiator; import org.apache.nifi.remote.StandardVersionNegotiator; +import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.VersionNegotiator; +import org.apache.nifi.remote.client.Transaction; import org.apache.nifi.remote.codec.FlowFileCodec; import org.apache.nifi.remote.codec.StandardFlowFileCodec; import org.apache.nifi.remote.exception.HandshakeException; @@ -50,6 +52,7 @@ import org.apache.nifi.remote.io.CompressionInputStream; import org.apache.nifi.remote.io.CompressionOutputStream; import org.apache.nifi.remote.protocol.ClientProtocol; import org.apache.nifi.remote.protocol.CommunicationsSession; +import org.apache.nifi.remote.protocol.DataPacket; import org.apache.nifi.remote.protocol.RequestType; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.StopWatch; @@ -60,7 +63,7 @@ public class SocketClientProtocol implements ClientProtocol { private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(4, 3, 2, 1); private RemoteDestination destination; - private boolean useCompression; + private boolean useCompression = false; private String commsIdentifier; private boolean handshakeComplete = false; @@ -70,6 +73,7 @@ public class SocketClientProtocol implements ClientProtocol { private Response handshakeResponse = null; private boolean readyForFileTransfer = false; private String transitUriPrefix = null; + private int timeoutMillis = 30000; private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds @@ -81,13 +85,16 @@ public class SocketClientProtocol implements ClientProtocol { this.useCompression = destination.isUseCompression(); } + public void setTimeout(final int timeoutMillis) { + this.timeoutMillis = timeoutMillis; + } @Override public void handshake(final Peer peer) throws IOException, HandshakeException { - handshake(peer, destination.getIdentifier(), (int) destination.getCommunicationsTimeout(TimeUnit.MILLISECONDS)); + handshake(peer, destination.getIdentifier()); } - public void handshake(final Peer peer, final String destinationId, final int timeoutMillis) throws IOException, HandshakeException { + public void handshake(final Peer peer, final String destinationId) throws IOException, HandshakeException { if ( handshakeComplete ) { throw new IllegalStateException("Handshake has already been completed"); } @@ -228,6 +235,65 @@ public class SocketClientProtocol implements ClientProtocol { return codec; } + + // TODO: move up to top with member variables + private SocketClientTransaction transaction; + + @Override + public void startTransaction(final Peer peer, final TransferDirection direction) throws IOException { + if ( !handshakeComplete ) { + throw new IllegalStateException("Handshake has not been performed"); + } + if ( !readyForFileTransfer ) { + throw new IllegalStateException("Cannot start transaction; handshake resolution was " + handshakeResponse); + } + + transaction = new SocketClientTransaction(peer, direction, useCompression); + + final DataOutputStream dos = transaction.getDataOutputStream(); + if ( direction == TransferDirection.RECEIVE ) { + // Indicate that we would like to have some data + RequestType.RECEIVE_FLOWFILES.writeRequestType(dos); + dos.flush(); + } else { + // Indicate that we would like to have some data + RequestType.SEND_FLOWFILES.writeRequestType(dos); + dos.flush(); + } + } + + @Override + public DataPacket receiveData(final FlowFileCodec codec) throws IOException, ProtocolException { + if ( transaction == null ) { + throw new IllegalStateException("Cannot receive data because no transaction has been started"); + } + + final Peer peer = transaction.getPeer(); + logger.debug("{} Receiving FlowFiles from {}", this, peer); + final CommunicationsSession commsSession = peer.getCommunicationsSession(); + final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); + final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); + String userDn = commsSession.getUserDn(); + if ( userDn == null ) { + userDn = "none"; + } + + // Determine if Peer will send us data or has no data to send us + final Response dataAvailableCode = Response.read(dis); + switch (dataAvailableCode.getCode()) { + case MORE_DATA: + logger.debug("{} {} Indicates that data is available", this, peer); + break; + case NO_MORE_DATA: + logger.debug("{} No data available from {}", peer); + return null; + default: + throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode); + } + + + } + @Override public void receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { @@ -258,6 +324,7 @@ public class SocketClientProtocol implements ClientProtocol { logger.debug("{} {} Indicates that data is available", this, peer); break; case NO_MORE_DATA: + context.yield(); logger.debug("{} No data available from {}", peer); return; default: http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java new file mode 100644 index 0000000..0c4ce05 --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol.socket; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.zip.CRC32; +import java.util.zip.CheckedInputStream; + +import org.apache.nifi.remote.Peer; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.Transaction; +import org.apache.nifi.remote.io.CompressionInputStream; + +public class SocketClientTransaction implements Transaction { + private final long startTime = System.nanoTime(); + private long bytesReceived = 0L; + private CRC32 crc = new CRC32(); + + private final Peer peer; + private final TransferDirection direction; + + private final DataInputStream dis; + private final DataOutputStream dos; + private final CheckedInputStream checkedInputStream; + + SocketClientTransaction(final Peer peer, final TransferDirection direction, final boolean useCompression) throws IOException { + this.peer = peer; + this.direction = direction; + + this.dis = new DataInputStream(peer.getCommunicationsSession().getInput().getInputStream()); + this.dos = new DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream()); + + final InputStream dataInputStream = useCompression ? new CompressionInputStream(dis) : dis; + checkedInputStream = new CheckedInputStream(dataInputStream, crc); + } + + CheckedInputStream getCheckedInputStream() { + return checkedInputStream; + } + + DataOutputStream getDataOutputStream() { + return dos; + } + + Peer getPeer() { + return peer; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java new file mode 100644 index 0000000..b2dbdcd --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.util; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Map; + +import javax.net.ssl.SSLContext; +import javax.ws.rs.core.MediaType; + +import org.apache.nifi.web.api.dto.ControllerDTO; +import org.apache.nifi.web.api.entity.ControllerEntity; +import org.apache.nifi.web.util.WebUtils; + +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientHandlerException; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.ClientResponse.Status; +import com.sun.jersey.api.client.config.ClientConfig; +import com.sun.jersey.api.client.UniformInterfaceException; +import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.core.util.MultivaluedMapImpl; + +/** + * + */ +public class RemoteNiFiUtils { + + public static final String CONTROLLER_URI_PATH = "/controller"; + + private static final int CONNECT_TIMEOUT = 10000; + private static final int READ_TIMEOUT = 10000; + + private final Client client; + + public RemoteNiFiUtils(final SSLContext sslContext) { + this.client = getClient(sslContext); + } + + + /** + * Gets the content at the specified URI. + * + * @param uri + * @param timeoutMillis + * @return + * @throws ClientHandlerException + * @throws UniformInterfaceException + */ + public ClientResponse get(final URI uri, final int timeoutMillis) throws ClientHandlerException, UniformInterfaceException { + return get(uri, timeoutMillis, null); + } + + /** + * Gets the content at the specified URI using the given query parameters. + * + * @param uri + * @param timeoutMillis + * @param queryParams + * @return + * @throws ClientHandlerException + * @throws UniformInterfaceException + */ + public ClientResponse get(final URI uri, final int timeoutMillis, final Map<String, String> queryParams) throws ClientHandlerException, UniformInterfaceException { + // perform the request + WebResource webResource = client.resource(uri); + if ( queryParams != null ) { + for ( final Map.Entry<String, String> queryEntry : queryParams.entrySet() ) { + webResource = webResource.queryParam(queryEntry.getKey(), queryEntry.getValue()); + } + } + + webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis); + webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis); + + return webResource.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); + } + + /** + * Performs a HEAD request to the specified URI. + * + * @param uri + * @param timeoutMillis + * @return + * @throws ClientHandlerException + * @throws UniformInterfaceException + */ + public ClientResponse head(final URI uri, final int timeoutMillis) throws ClientHandlerException, UniformInterfaceException { + // perform the request + WebResource webResource = client.resource(uri); + webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis); + webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis); + return webResource.head(); + } + + /** + * Gets a client based on the specified URI. + * + * @param uri + * @return + */ + private Client getClient(final SSLContext sslContext) { + final Client client; + if (sslContext == null) { + client = WebUtils.createClient(null); + } else { + client = WebUtils.createClient(null, sslContext); + } + + client.setReadTimeout(READ_TIMEOUT); + client.setConnectTimeout(CONNECT_TIMEOUT); + + return client; + } + + + /** + * Returns the port on which the remote instance is listening for Flow File transfers, or <code>null</code> if the remote instance + * is not configured to use Site-to-Site transfers. + * + * @param uri the base URI of the remote instance. This should include the path only to the nifi-api level, as well as the protocol, host, and port. + * @param timeoutMillis + * @return + * @throws IOException + */ + public Integer getRemoteListeningPort(final String uri, final int timeoutMillis) throws IOException { + try { + final URI uriObject = new URI(uri + CONTROLLER_URI_PATH); + return getRemoteListeningPort(uriObject, timeoutMillis); + } catch (URISyntaxException e) { + throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri); + } + } + + public String getRemoteRootGroupId(final String uri, final int timeoutMillis) throws IOException { + try { + final URI uriObject = new URI(uri + CONTROLLER_URI_PATH); + return getRemoteRootGroupId(uriObject, timeoutMillis); + } catch (URISyntaxException e) { + throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri); + } + } + + public String getRemoteInstanceId(final String uri, final int timeoutMillis) throws IOException { + try { + final URI uriObject = new URI(uri + CONTROLLER_URI_PATH); + return getController(uriObject, timeoutMillis).getInstanceId(); + } catch (URISyntaxException e) { + throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri); + } + } + + /** + * Returns the port on which the remote instance is listening for Flow File transfers, or <code>null</code> if the remote instance + * is not configured to use Site-to-Site transfers. + * + * @param uri the full URI to fetch, including the path. + * @return + * @throws IOException + */ + private Integer getRemoteListeningPort(final URI uri, final int timeoutMillis) throws IOException { + return getController(uri, timeoutMillis).getRemoteSiteListeningPort(); + } + + private String getRemoteRootGroupId(final URI uri, final int timeoutMillis) throws IOException { + return getController(uri, timeoutMillis).getId(); + } + + public ControllerDTO getController(final URI uri, final int timeoutMillis) throws IOException { + final ClientResponse response = get(uri, timeoutMillis); + + if (Status.OK.getStatusCode() == response.getStatusInfo().getStatusCode()) { + final ControllerEntity entity = response.getEntity(ControllerEntity.class); + return entity.getController(); + } else { + final String responseMessage = response.getEntity(String.class); + throw new IOException("Got HTTP response Code " + response.getStatusInfo().getStatusCode() + ": " + response.getStatusInfo().getReasonPhrase() + " with explanation: " + responseMessage); + } + } + + /** + * Issues a registration request on behalf of the current user. + * + * @param baseApiUri + * @return + */ + public ClientResponse issueRegistrationRequest(String baseApiUri) { + final URI uri = URI.create(String.format("%s/%s", baseApiUri, "/controller/users")); + + // set up the query params + MultivaluedMapImpl entity = new MultivaluedMapImpl(); + entity.add("justification", "A Remote instance of NiFi has attempted to create a reference to this NiFi. This action must be approved first."); + + // create the web resource + WebResource webResource = client.resource(uri); + + // get the client utils and make the request + return webResource.accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_FORM_URLENCODED).entity(entity).post(ClientResponse.class); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java index 9f2dac8..ac41cba 100644 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java +++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.groups; -import java.io.IOException; import java.net.URI; import java.util.Date; import java.util.Set; @@ -109,15 +108,6 @@ public interface RemoteProcessGroup { String getCommunicationsTimeout(); /** - * @return the port that the remote instance is listening on for - * site-to-site communication, or <code>null</code> if the remote instance - * is not configured to allow site-to-site communications. - * - * @throws IOException if unable to communicate with the remote instance - */ - Integer getListeningPort() throws IOException; - - /** * Indicates whether or not the RemoteProcessGroup is currently scheduled to * transmit data * @@ -229,24 +219,4 @@ public interface RemoteProcessGroup { void verifyCanStopTransmitting(); void verifyCanUpdate(); - - /** - * Returns a set of PeerStatus objects that describe the different peers - * that we can communicate with for this RemoteProcessGroup. - * - * If the destination is a cluster, this set will contain PeerStatuses for - * each of the nodes in the cluster. - * - * If the destination is a standalone instance, this set will contain just a - * PeerStatus for the destination. - * - * Once the PeerStatuses have been obtained, they may be cached by this - * RemoteProcessGroup for some amount of time. - * - * If unable to obtain the PeerStatuses or no peer status has yet been - * obtained, will return null. - * - * @return - */ - Set<PeerStatus> getPeerStatuses(); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java index 857add9..db0aeb7 100644 --- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java +++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java @@ -19,7 +19,6 @@ package org.apache.nifi.remote; import static java.util.Objects.requireNonNull; import java.io.File; -import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; @@ -50,7 +49,6 @@ import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.exception.CommunicationsException; -import org.apache.nifi.controller.util.RemoteProcessGroupUtils; import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.events.EventReporter; @@ -59,6 +57,7 @@ import org.apache.nifi.groups.ProcessGroupCounts; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor; import org.apache.nifi.remote.client.socket.EndpointConnectionStatePool; +import org.apache.nifi.remote.util.RemoteNiFiUtils; import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.reporting.Severity; import org.apache.nifi.util.FormatUtils; @@ -85,7 +84,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { public static final String CONTROLLER_URI_PATH = "/controller"; public static final String ROOT_GROUP_STATUS_URI_PATH = "/controller/process-groups/root/status"; - public static final long LISTENING_PORT_REFRESH_MILLIS = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES); // status codes public static final int OK_STATUS_CODE = Status.OK.getStatusCode(); @@ -127,9 +125,8 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { private ProcessGroupCounts counts = new ProcessGroupCounts(0, 0, 0, 0, 0, 0, 0, 0); private Long refreshContentsTimestamp = null; - private Integer listeningPort; - private long listeningPortRetrievalTime = 0L; private Boolean destinationSecure; + private Integer listeningPort; private volatile String authorizationIssue; @@ -175,48 +172,13 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { } }; - endpointConnectionPool = new EndpointConnectionStatePool(sslContext, eventReporter, getPeerPersistenceFile()); + endpointConnectionPool = new EndpointConnectionStatePool(getTargetUri().toString(), getCommunicationsTimeout(TimeUnit.MILLISECONDS), + sslContext, eventReporter, getPeerPersistenceFile()); - final Runnable socketCleanup = new Runnable() { - @Override - public void run() { - final Set<StandardRemoteGroupPort> ports = new HashSet<>(); - readLock.lock(); - try { - ports.addAll(inputPorts.values()); - ports.addAll(outputPorts.values()); - } finally { - readLock.unlock(); - } - - endpointConnectionPool.cleanupExpiredSockets(); - } - }; - - final Runnable refreshPeers = new Runnable() { - @Override - public void run() { - final boolean secure; - try { - secure = isSecure(); - } catch (CommunicationsException e) { - logger.warn("{} Unable to determine if remote instance {} is configured for secure site-to-site due to {}; will not refresh list of peers", new Object[] {this, getTargetUri(), e.toString()}); - if ( logger.isDebugEnabled() ) { - logger.warn("", e); - } - return; - } - - endpointConnectionPool.refreshPeers(getTargetUri(), secure); - } - }; - final Runnable checkAuthorizations = new InitializationTask(); backgroundThreadExecutor = new FlowEngine(1, "Remote Process Group " + id + ": " + targetUri); backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 0L, 30L, TimeUnit.SECONDS); - backgroundThreadExecutor.scheduleWithFixedDelay(refreshPeers, 0, 5, TimeUnit.SECONDS); - backgroundThreadExecutor.scheduleWithFixedDelay(socketCleanup, 10L, 10L, TimeUnit.SECONDS); } @Override @@ -810,7 +772,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { return; } - final RemoteProcessGroupUtils utils = new RemoteProcessGroupUtils(isWebApiSecure() ? sslContext : null); + final RemoteNiFiUtils utils = new RemoteNiFiUtils(isWebApiSecure() ? sslContext : null); final String uriVal = apiUri.toString() + CONTROLLER_URI_PATH; URI uri; try { @@ -950,39 +912,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { return descriptor; } - /** - * @return the port that the remote instance is listening on for - * site-to-site communication, or <code>null</code> if the remote instance - * is not configured to allow site-to-site communications. - * - * @throws IOException if unable to communicate with the remote instance - */ - @Override - public Integer getListeningPort() throws IOException { - Integer listeningPort; - readLock.lock(); - try { - listeningPort = this.listeningPort; - if (listeningPort != null && this.listeningPortRetrievalTime > System.currentTimeMillis() - LISTENING_PORT_REFRESH_MILLIS) { - return listeningPort; - } - } finally { - readLock.unlock(); - } - - final RemoteProcessGroupUtils utils = new RemoteProcessGroupUtils(isWebApiSecure() ? sslContext : null); - listeningPort = utils.getRemoteListeningPort(apiUri.toString(), getCommunicationsTimeout(TimeUnit.MILLISECONDS)); - writeLock.lock(); - try { - this.listeningPort = listeningPort; - this.listeningPortRetrievalTime = System.currentTimeMillis(); - } finally { - writeLock.unlock(); - } - - return listeningPort; - } - @Override public boolean isTransmitting() { return transmitting.get(); @@ -1218,7 +1147,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { @Override public void run() { try { - final RemoteProcessGroupUtils utils = new RemoteProcessGroupUtils(isWebApiSecure() ? sslContext : null); + final RemoteNiFiUtils utils = new RemoteNiFiUtils(isWebApiSecure() ? sslContext : null); final ClientResponse response = utils.get(new URI(apiUri + CONTROLLER_URI_PATH), getCommunicationsTimeout(TimeUnit.MILLISECONDS)); final int statusCode = response.getStatus(); @@ -1398,12 +1327,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { } } - @Override - public Set<PeerStatus> getPeerStatuses() { - return endpointConnectionPool.getPeerStatuses(); - } - - private File getPeerPersistenceFile() { final File stateDir = NiFiProperties.getInstance().getPersistentStateDirectory(); return new File(stateDir, getIdentifier() + ".peers"); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/controller/util/RemoteProcessGroupUtils.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/controller/util/RemoteProcessGroupUtils.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/controller/util/RemoteProcessGroupUtils.java deleted file mode 100644 index 10208f8..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/controller/util/RemoteProcessGroupUtils.java +++ /dev/null @@ -1,216 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller.util; - -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Map; - -import javax.net.ssl.SSLContext; -import javax.ws.rs.core.MediaType; - -import org.apache.nifi.web.api.dto.ControllerDTO; -import org.apache.nifi.web.api.entity.ControllerEntity; -import org.apache.nifi.web.util.WebUtils; - -import com.sun.jersey.api.client.Client; -import com.sun.jersey.api.client.ClientHandlerException; -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.ClientResponse.Status; -import com.sun.jersey.api.client.config.ClientConfig; -import com.sun.jersey.api.client.UniformInterfaceException; -import com.sun.jersey.api.client.WebResource; -import com.sun.jersey.core.util.MultivaluedMapImpl; - -/** - * - */ -public class RemoteProcessGroupUtils { - - public static final String CONTROLLER_URI_PATH = "/controller"; - - private static final int CONNECT_TIMEOUT = 10000; - private static final int READ_TIMEOUT = 10000; - - private final Client client; - - public RemoteProcessGroupUtils(final SSLContext sslContext) { - this.client = getClient(sslContext); - } - - - /** - * Gets the content at the specified URI. - * - * @param uri - * @param timeoutMillis - * @return - * @throws ClientHandlerException - * @throws UniformInterfaceException - */ - public ClientResponse get(final URI uri, final int timeoutMillis) throws ClientHandlerException, UniformInterfaceException { - return get(uri, timeoutMillis, null); - } - - /** - * Gets the content at the specified URI using the given query parameters. - * - * @param uri - * @param timeoutMillis - * @param queryParams - * @return - * @throws ClientHandlerException - * @throws UniformInterfaceException - */ - public ClientResponse get(final URI uri, final int timeoutMillis, final Map<String, String> queryParams) throws ClientHandlerException, UniformInterfaceException { - // perform the request - WebResource webResource = client.resource(uri); - if ( queryParams != null ) { - for ( final Map.Entry<String, String> queryEntry : queryParams.entrySet() ) { - webResource = webResource.queryParam(queryEntry.getKey(), queryEntry.getValue()); - } - } - - webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis); - webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis); - - return webResource.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); - } - - /** - * Performs a HEAD request to the specified URI. - * - * @param uri - * @param timeoutMillis - * @return - * @throws ClientHandlerException - * @throws UniformInterfaceException - */ - public ClientResponse head(final URI uri, final int timeoutMillis) throws ClientHandlerException, UniformInterfaceException { - // perform the request - WebResource webResource = client.resource(uri); - webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis); - webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis); - return webResource.head(); - } - - /** - * Gets a client based on the specified URI. - * - * @param uri - * @return - */ - private Client getClient(final SSLContext sslContext) { - final Client client; - if (sslContext == null) { - client = WebUtils.createClient(null); - } else { - client = WebUtils.createClient(null, sslContext); - } - - client.setReadTimeout(READ_TIMEOUT); - client.setConnectTimeout(CONNECT_TIMEOUT); - - return client; - } - - - /** - * Returns the port on which the remote instance is listening for Flow File transfers, or <code>null</code> if the remote instance - * is not configured to use Site-to-Site transfers. - * - * @param uri the base URI of the remote instance. This should include the path only to the nifi-api level, as well as the protocol, host, and port. - * @param timeoutMillis - * @return - * @throws IOException - */ - public Integer getRemoteListeningPort(final String uri, final int timeoutMillis) throws IOException { - try { - final URI uriObject = new URI(uri + CONTROLLER_URI_PATH); - return getRemoteListeningPort(uriObject, timeoutMillis); - } catch (URISyntaxException e) { - throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri); - } - } - - public String getRemoteRootGroupId(final String uri, final int timeoutMillis) throws IOException { - try { - final URI uriObject = new URI(uri + CONTROLLER_URI_PATH); - return getRemoteRootGroupId(uriObject, timeoutMillis); - } catch (URISyntaxException e) { - throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri); - } - } - - public String getRemoteInstanceId(final String uri, final int timeoutMillis) throws IOException { - try { - final URI uriObject = new URI(uri + CONTROLLER_URI_PATH); - return getController(uriObject, timeoutMillis).getInstanceId(); - } catch (URISyntaxException e) { - throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri); - } - } - - /** - * Returns the port on which the remote instance is listening for Flow File transfers, or <code>null</code> if the remote instance - * is not configured to use Site-to-Site transfers. - * - * @param uri the full URI to fetch, including the path. - * @return - * @throws IOException - */ - private Integer getRemoteListeningPort(final URI uri, final int timeoutMillis) throws IOException { - return getController(uri, timeoutMillis).getRemoteSiteListeningPort(); - } - - private String getRemoteRootGroupId(final URI uri, final int timeoutMillis) throws IOException { - return getController(uri, timeoutMillis).getId(); - } - - private ControllerDTO getController(final URI uri, final int timeoutMillis) throws IOException { - final ClientResponse response = get(uri, timeoutMillis); - - if (Status.OK.getStatusCode() == response.getStatusInfo().getStatusCode()) { - final ControllerEntity entity = response.getEntity(ControllerEntity.class); - return entity.getController(); - } else { - final String responseMessage = response.getEntity(String.class); - throw new IOException("Got HTTP response Code " + response.getStatusInfo().getStatusCode() + ": " + response.getStatusInfo().getReasonPhrase() + " with explanation: " + responseMessage); - } - } - - /** - * Issues a registration request on behalf of the current user. - * - * @param baseApiUri - * @return - */ - public ClientResponse issueRegistrationRequest(String baseApiUri) { - final URI uri = URI.create(String.format("%s/%s", baseApiUri, "/controller/users")); - - // set up the query params - MultivaluedMapImpl entity = new MultivaluedMapImpl(); - entity.add("justification", "A Remote instance of NiFi has attempted to create a reference to this NiFi. This action must be approved first."); - - // create the web resource - WebResource webResource = client.resource(uri); - - // get the client utils and make the request - return webResource.accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_FORM_URLENCODED).entity(entity).post(ClientResponse.class); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java index 922d4e7..2b27de2 100644 --- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java +++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java @@ -56,6 +56,14 @@ public class RemoteResourceFactory extends RemoteResourceInitiator { } } + public static void rejectCodecNegotiation(final DataInputStream dis, final DataOutputStream dos, final String explanation) throws IOException { + dis.readUTF(); // read codec name + dis.readInt(); // read codec version + + dos.write(ABORT); + dos.writeUTF(explanation); + dos.flush(); + } @SuppressWarnings("unchecked") public static <T extends ClientProtocol> T receiveClientProtocolNegotiation(final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java index 77ac1a9..82d8206 100644 --- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java +++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java @@ -17,7 +17,6 @@ package org.apache.nifi.remote; import java.io.IOException; -import java.net.URI; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; @@ -33,7 +32,6 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.ProcessScheduler; -import org.apache.nifi.controller.exception.CommunicationsException; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.processor.ProcessContext; @@ -144,7 +142,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { final EndpointConnectionState connectionState; try { - connectionState = connectionStatePool.getEndpointConnectionState(url, this, transferDirection); + connectionState = connectionStatePool.getEndpointConnectionState(this, transferDirection); } catch (final PortNotRunningException e) { context.yield(); this.targetRunning.set(false); @@ -366,28 +364,4 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { public boolean isSideEffectFree() { return false; } - - @Override - public String getDescription() { - return toString(); - } - - @Override - public long getCommunicationsTimeout(final TimeUnit timeUnit) { - return getRemoteProcessGroup().getCommunicationsTimeout(timeUnit); - } - - @Override - public URI getTargetUri() { - return remoteGroup.getTargetUri(); - } - - @Override - public boolean isSecure() { - try { - return remoteGroup.isSecure(); - } catch (final CommunicationsException ce) { - return false; - } - } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java index 647b45c..887429c 100644 --- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java +++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java @@ -204,11 +204,6 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { ResponseCode.MISSING_PROPERTY.writeResponse(dos, HandshakeProperty.GZIP.name()); throw new HandshakeException("Missing Property " + HandshakeProperty.GZIP.name()); } - if ( port == null ) { - logger.debug("Responding with ResponseCode MISSING_PROPERTY because Port Identifier property is missing"); - ResponseCode.MISSING_PROPERTY.writeResponse(dos, HandshakeProperty.PORT_IDENTIFIER.name()); - throw new HandshakeException("Missing Property " + HandshakeProperty.PORT_IDENTIFIER.name()); - } // send "OK" response if ( !responseWritten ) { @@ -243,6 +238,10 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); + if ( port == null ) { + RemoteResourceFactory.rejectCodecNegotiation(dis, dos, "Cannot transfer FlowFiles because no port was specified"); + } + // Negotiate the FlowFileCodec to use. try { negotiatedFlowFileCodec = RemoteResourceFactory.receiveCodecNegotiation(dis, dos); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java b/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java index 94de86b..8c972f7 100644 --- a/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java @@ -16,21 +16,11 @@ */ package org.apache.nifi.remote; -import java.net.URI; import java.util.concurrent.TimeUnit; public interface RemoteDestination { - - String getDescription(); - String getIdentifier(); - URI getTargetUri(); - - boolean isSecure(); - - long getCommunicationsTimeout(TimeUnit timeUnit); - long getYieldPeriod(TimeUnit timeUnit); boolean isUseCompression();
