http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java index 2b27de2..b0ce357 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java @@ -28,22 +28,22 @@ import org.apache.nifi.remote.protocol.ServerProtocol; public class RemoteResourceFactory extends RemoteResourceInitiator { - @SuppressWarnings("unchecked") + @SuppressWarnings("unchecked") public static <T extends FlowFileCodec> T receiveCodecNegotiation(final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException { final String codecName = dis.readUTF(); final int version = dis.readInt(); - + final T codec = (T) RemoteResourceManager.createCodec(codecName, version); final VersionNegotiator negotiator = codec.getVersionNegotiator(); - if ( negotiator.isVersionSupported(version) ) { + if (negotiator.isVersionSupported(version)) { dos.write(RESOURCE_OK); dos.flush(); - + negotiator.setVersion(version); return codec; } else { final Integer preferred = negotiator.getPreferredVersion(version); - if ( preferred == null ) { + if (preferred == null) { dos.write(ABORT); dos.flush(); throw new HandshakeException("Unable to negotiate an acceptable version of the FlowFileCodec " + codecName); @@ -51,36 +51,36 @@ public class RemoteResourceFactory extends RemoteResourceInitiator { dos.write(DIFFERENT_RESOURCE_VERSION); dos.writeInt(preferred); dos.flush(); - + return receiveCodecNegotiation(dis, dos); } - } - - public static void rejectCodecNegotiation(final DataInputStream dis, final DataOutputStream dos, final String explanation) throws IOException { - dis.readUTF(); // read codec name - dis.readInt(); // read codec version - - dos.write(ABORT); - dos.writeUTF(explanation); - dos.flush(); - } - - @SuppressWarnings("unchecked") + } + + public static void rejectCodecNegotiation(final DataInputStream dis, final DataOutputStream dos, final String explanation) throws IOException { + dis.readUTF(); // read codec name + dis.readInt(); // read codec version + + dos.write(ABORT); + dos.writeUTF(explanation); + dos.flush(); + } + + @SuppressWarnings("unchecked") public static <T extends ClientProtocol> T receiveClientProtocolNegotiation(final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException { final String protocolName = dis.readUTF(); final int version = dis.readInt(); - + final T protocol = (T) RemoteResourceManager.createClientProtocol(protocolName); final VersionNegotiator negotiator = protocol.getVersionNegotiator(); - if ( negotiator.isVersionSupported(version) ) { + if (negotiator.isVersionSupported(version)) { dos.write(RESOURCE_OK); dos.flush(); - + negotiator.setVersion(version); return protocol; } else { final Integer preferred = negotiator.getPreferredVersion(version); - if ( preferred == null ) { + if (preferred == null) { dos.write(ABORT); dos.flush(); throw new HandshakeException("Unable to negotiate an acceptable version of the ClientProtocol " + protocolName); @@ -88,28 +88,27 @@ public class RemoteResourceFactory extends RemoteResourceInitiator { dos.write(DIFFERENT_RESOURCE_VERSION); dos.writeInt(preferred); dos.flush(); - + return receiveClientProtocolNegotiation(dis, dos); } } - - - @SuppressWarnings("unchecked") + + @SuppressWarnings("unchecked") public static <T extends ServerProtocol> T receiveServerProtocolNegotiation(final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException { - final String protocolName = dis.readUTF(); + final String protocolName = dis.readUTF(); final int version = dis.readInt(); - + final T protocol = (T) RemoteResourceManager.createServerProtocol(protocolName); final VersionNegotiator negotiator = protocol.getVersionNegotiator(); - if ( negotiator.isVersionSupported(version) ) { + if (negotiator.isVersionSupported(version)) { dos.write(RESOURCE_OK); dos.flush(); - + negotiator.setVersion(version); return protocol; } else { final Integer preferred = negotiator.getPreferredVersion(version); - if ( preferred == null ) { + if (preferred == null) { dos.write(ABORT); dos.flush(); throw new HandshakeException("Unable to negotiate an acceptable version of the ServerProtocol " + protocolName); @@ -117,54 +116,53 @@ public class RemoteResourceFactory extends RemoteResourceInitiator { dos.write(DIFFERENT_RESOURCE_VERSION); dos.writeInt(preferred); dos.flush(); - + return receiveServerProtocolNegotiation(dis, dos); } } - - - - - public static <T extends VersionedRemoteResource> T receiveResourceNegotiation(final Class<T> cls, final DataInputStream dis, final DataOutputStream dos, final Class<?>[] constructorArgClasses, final Object[] constructorArgs) throws IOException, HandshakeException { - final String resourceClassName = dis.readUTF(); - final T resource; - try { + + public static <T extends VersionedRemoteResource> T + receiveResourceNegotiation(final Class<T> cls, final DataInputStream dis, final DataOutputStream dos, final Class<?>[] constructorArgClasses, final Object[] constructorArgs) + throws IOException, HandshakeException { + final String resourceClassName = dis.readUTF(); + final T resource; + try { @SuppressWarnings("unchecked") - final Class<T> resourceClass = (Class<T>) Class.forName(resourceClassName); - if ( !cls.isAssignableFrom(resourceClass) ) { - throw new HandshakeException("Expected to negotiate a Versioned Resource of type " + cls.getName() + " but received class name of " + resourceClassName); + final Class<T> resourceClass = (Class<T>) Class.forName(resourceClassName); + if (!cls.isAssignableFrom(resourceClass)) { + throw new HandshakeException("Expected to negotiate a Versioned Resource of type " + cls.getName() + " but received class name of " + resourceClassName); } - + final Constructor<T> ctr = resourceClass.getConstructor(constructorArgClasses); resource = ctr.newInstance(constructorArgs); } catch (final Throwable t) { - dos.write(ABORT); - final String errorMsg = "Unable to instantiate Versioned Resource of type " + resourceClassName; - dos.writeUTF(errorMsg); - dos.flush(); - throw new HandshakeException(errorMsg); + dos.write(ABORT); + final String errorMsg = "Unable to instantiate Versioned Resource of type " + resourceClassName; + dos.writeUTF(errorMsg); + dos.flush(); + throw new HandshakeException(errorMsg); } - + final int version = dis.readInt(); final VersionNegotiator negotiator = resource.getVersionNegotiator(); - if ( negotiator.isVersionSupported(version) ) { + if (negotiator.isVersionSupported(version)) { dos.write(RESOURCE_OK); dos.flush(); - + negotiator.setVersion(version); return resource; } else { final Integer preferred = negotiator.getPreferredVersion(version); - if ( preferred == null ) { - dos.write(ABORT); - dos.flush(); - throw new HandshakeException("Unable to negotiate an acceptable version of the resource " + resourceClassName); + if (preferred == null) { + dos.write(ABORT); + dos.flush(); + throw new HandshakeException("Unable to negotiate an acceptable version of the resource " + resourceClassName); } dos.write(DIFFERENT_RESOURCE_VERSION); dos.writeInt(preferred); dos.flush(); - + return receiveResourceNegotiation(cls, dis, dos, constructorArgClasses, constructorArgs); } - } + } }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceManager.java index f86f066..8bbe7aa 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceManager.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceManager.java @@ -34,20 +34,21 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class RemoteResourceManager { + private static final Map<String, Class<? extends FlowFileCodec>> codecClassMap; private static final Map<String, Class<? extends ServerProtocol>> desiredServerProtocolClassMap = new ConcurrentHashMap<>(); private static final Map<String, Class<? extends ClientProtocol>> desiredClientProtocolClassMap = new ConcurrentHashMap<>(); - + private static final Map<String, Set<Class<? extends ServerProtocol>>> serverProtocolClassMap; private static final Map<String, Set<Class<? extends ClientProtocol>>> clientProtocolClassMap; - + private static final Logger logger = LoggerFactory.getLogger(RemoteResourceManager.class); - + static { final Map<String, Class<? extends FlowFileCodec>> codecMap = new HashMap<>(); final Map<String, Set<Class<? extends ServerProtocol>>> serverProtocolMap = new HashMap<>(); final Map<String, Set<Class<? extends ClientProtocol>>> clientProtocolMap = new HashMap<>(); - + // load all of the FlowFileCodecs that we know final ClassLoader classLoader = RemoteResourceManager.class.getClassLoader(); final ServiceLoader<FlowFileCodec> flowFileCodecLoader = ServiceLoader.load(FlowFileCodec.class, classLoader); @@ -58,12 +59,12 @@ public class RemoteResourceManager { final String codecName = codec.getResourceName(); final Class<? extends FlowFileCodec> previousValue = codecMap.put(codecName, clazz); - if ( previousValue != null ) { - logger.warn("Multiple FlowFileCodec's found with name {}; choosing to use {} in place of {}", - new Object[] {codecName, clazz.getName(), previousValue.getName()}); + if (previousValue != null) { + logger.warn("Multiple FlowFileCodec's found with name {}; choosing to use {} in place of {}", + new Object[]{codecName, clazz.getName(), previousValue.getName()}); } } - + final ServiceLoader<ServerProtocol> serverProtocolLoader = ServiceLoader.load(ServerProtocol.class, classLoader); final Iterator<ServerProtocol> serverItr = serverProtocolLoader.iterator(); while (serverItr.hasNext()) { @@ -72,14 +73,14 @@ public class RemoteResourceManager { final String protocolName = protocol.getResourceName(); Set<Class<? extends ServerProtocol>> classSet = serverProtocolMap.get(protocolName); - if ( classSet == null ) { + if (classSet == null) { classSet = new HashSet<>(); serverProtocolMap.put(protocolName, classSet); } - + classSet.add(clazz); } - + final ServiceLoader<ClientProtocol> clientProtocolLoader = ServiceLoader.load(ClientProtocol.class, classLoader); final Iterator<ClientProtocol> clientItr = clientProtocolLoader.iterator(); while (clientItr.hasNext()) { @@ -88,133 +89,132 @@ public class RemoteResourceManager { final String protocolName = protocol.getResourceName(); Set<Class<? extends ClientProtocol>> classSet = clientProtocolMap.get(protocolName); - if ( classSet == null ) { + if (classSet == null) { classSet = new HashSet<>(); clientProtocolMap.put(protocolName, classSet); } - + classSet.add(clazz); } - + codecClassMap = Collections.unmodifiableMap(codecMap); clientProtocolClassMap = Collections.unmodifiableMap(clientProtocolMap); serverProtocolClassMap = Collections.unmodifiableMap(serverProtocolMap); } - public static boolean isCodecSupported(final String codecName) { return codecClassMap.containsKey(codecName); } - + public static boolean isCodecSupported(final String codecName, final int version) { - if ( !isCodecSupported(codecName) ) { + if (!isCodecSupported(codecName)) { return false; } - + final FlowFileCodec codec = createCodec(codecName); final VersionNegotiator negotiator = codec.getVersionNegotiator(); return (negotiator.isVersionSupported(version)); } - + public static FlowFileCodec createCodec(final String codecName, final int version) { final FlowFileCodec codec = createCodec(codecName); final VersionNegotiator negotiator = codec.getVersionNegotiator(); - if ( !negotiator.isVersionSupported(version) ) { + if (!negotiator.isVersionSupported(version)) { throw new IllegalArgumentException("FlowFile Codec " + codecName + " does not support version " + version); } - + negotiator.setVersion(version); return codec; } - + private static FlowFileCodec createCodec(final String codecName) { final Class<? extends FlowFileCodec> codecClass = codecClassMap.get(codecName); - if ( codecClass == null ) { + if (codecClass == null) { throw new IllegalArgumentException("Unknown Codec: " + codecName); } - + try { return codecClass.newInstance(); } catch (final Exception e) { throw new RuntimeException("Unable to instantiate class " + codecClass.getName(), e); } } - + public static Set<String> getSupportedCodecNames() { return codecClassMap.keySet(); } - + public static List<Integer> getSupportedVersions(final String codecName) { final FlowFileCodec codec = createCodec(codecName); return codec.getSupportedVersions(); } - + public static Set<Class<? extends ClientProtocol>> getClientProtocolClasses(final String protocolName) { final Set<Class<? extends ClientProtocol>> classes = clientProtocolClassMap.get(protocolName); - if ( classes == null ) { + if (classes == null) { return new HashSet<>(); } return new HashSet<>(classes); } - + public static Set<Class<? extends ServerProtocol>> getServerProtocolClasses(final String protocolName) { final Set<Class<? extends ServerProtocol>> classes = serverProtocolClassMap.get(protocolName); - if ( classes == null ) { + if (classes == null) { return new HashSet<>(); } return new HashSet<>(classes); } - + public static void setServerProtocolImplementation(final String protocolName, final Class<? extends ServerProtocol> clazz) { desiredServerProtocolClassMap.put(protocolName, clazz); } - + public static void setClientProtocolImplementation(final String protocolName, final Class<? extends ClientProtocol> clazz) { desiredClientProtocolClassMap.put(protocolName, clazz); } - + public static ServerProtocol createServerProtocol(final String protocolName) { final Set<Class<? extends ServerProtocol>> classSet = getServerProtocolClasses(protocolName); - if ( classSet.isEmpty() ) { + if (classSet.isEmpty()) { throw new IllegalArgumentException("Unknkown Server Protocol: " + protocolName); } Class<? extends ServerProtocol> desiredClass = desiredServerProtocolClassMap.get(protocolName); - if ( desiredClass == null && classSet.size() > 1 ) { + if (desiredClass == null && classSet.size() > 1) { throw new IllegalStateException("Multiple implementations of Server Protocol " + protocolName + " were found and no preferred implementation has been specified"); } - - if ( desiredClass != null && !classSet.contains(desiredClass) ) { + + if (desiredClass != null && !classSet.contains(desiredClass)) { throw new IllegalStateException("Desired implementation of Server Protocol " + protocolName + " is set to " + desiredClass + ", but that Protocol is not registered as a Server Protocol"); } - - if ( desiredClass == null ) { + + if (desiredClass == null) { desiredClass = classSet.iterator().next(); } - + try { return desiredClass.newInstance(); } catch (final Exception e) { throw new RuntimeException("Unable to instantiate class " + desiredClass.getName(), e); - } + } } - + public static ClientProtocol createClientProtocol(final String protocolName) { final Set<Class<? extends ClientProtocol>> classSet = getClientProtocolClasses(protocolName); - if ( classSet.isEmpty() ) { + if (classSet.isEmpty()) { throw new IllegalArgumentException("Unknkown Client Protocol: " + protocolName); } Class<? extends ClientProtocol> desiredClass = desiredClientProtocolClassMap.get(protocolName); - if ( desiredClass == null && classSet.size() > 1 ) { + if (desiredClass == null && classSet.size() > 1) { throw new IllegalStateException("Multiple implementations of Client Protocol " + protocolName + " were found and no preferred implementation has been specified"); } - - if ( desiredClass != null && !classSet.contains(desiredClass) ) { + + if (desiredClass != null && !classSet.contains(desiredClass)) { throw new IllegalStateException("Desired implementation of Client Protocol " + protocolName + " is set to " + desiredClass + ", but that Protocol is not registered as a Client Protocol"); } - - if ( desiredClass == null ) { + + if (desiredClass == null) { desiredClass = classSet.iterator().next(); } @@ -222,6 +222,6 @@ public class RemoteResourceManager { return desiredClass.newInstance(); } catch (final Exception e) { throw new RuntimeException("Unable to instantiate class " + desiredClass.getName(), e); - } + } } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteSiteListener.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteSiteListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteSiteListener.java index 59e4d0a..6f7b977 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteSiteListener.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteSiteListener.java @@ -30,4 +30,4 @@ public interface RemoteSiteListener { void stop(); -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java index 493d1fe..809147e 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java @@ -49,43 +49,42 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SocketRemoteSiteListener implements RemoteSiteListener { + public static final String DEFAULT_FLOWFILE_PATH = "./"; private final int socketPort; private final SSLContext sslContext; private final NodeInformant nodeInformant; private final AtomicReference<ProcessGroup> rootGroup = new AtomicReference<>(); - + private final AtomicBoolean stopped = new AtomicBoolean(false); - + private static final Logger LOG = LoggerFactory.getLogger(SocketRemoteSiteListener.class); public SocketRemoteSiteListener(final int socketPort, final SSLContext sslContext) { this(socketPort, sslContext, null); } - + public SocketRemoteSiteListener(final int socketPort, final SSLContext sslContext, final NodeInformant nodeInformant) { this.socketPort = socketPort; this.sslContext = sslContext; this.nodeInformant = nodeInformant; } - @Override public void setRootGroup(final ProcessGroup rootGroup) { this.rootGroup.set(rootGroup); } - @Override public void start() throws IOException { final boolean secure = (sslContext != null); - + final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(true); serverSocketChannel.bind(new InetSocketAddress(socketPort)); stopped.set(false); - + final Thread listenerThread = new Thread(new Runnable() { private int threadCount = 0; @@ -95,19 +94,21 @@ public class SocketRemoteSiteListener implements RemoteSiteListener { final ProcessGroup processGroup = rootGroup.get(); // If nodeInformant is not null, we are in clustered mode, which means that we don't care about // the processGroup. - if ( (nodeInformant == null) && (processGroup == null || (processGroup.getInputPorts().isEmpty() && processGroup.getOutputPorts().isEmpty())) ) { - try { Thread.sleep(2000L); } catch (final Exception e) {} + if ((nodeInformant == null) && (processGroup == null || (processGroup.getInputPorts().isEmpty() && processGroup.getOutputPorts().isEmpty()))) { + try { + Thread.sleep(2000L); + } catch (final Exception e) { + } continue; } - - + LOG.trace("Accepting Connection..."); Socket acceptedSocket = null; try { serverSocketChannel.configureBlocking(false); final ServerSocket serverSocket = serverSocketChannel.socket(); serverSocket.setSoTimeout(2000); - while ( !stopped.get() && acceptedSocket == null ) { + while (!stopped.get() && acceptedSocket == null) { try { acceptedSocket = serverSocket.accept(); } catch (final SocketTimeoutException ste) { @@ -116,14 +117,14 @@ public class SocketRemoteSiteListener implements RemoteSiteListener { } } catch (final IOException e) { LOG.error("RemoteSiteListener Unable to accept connection due to {}", e.toString()); - if ( LOG.isDebugEnabled() ) { + if (LOG.isDebugEnabled()) { LOG.error("", e); } continue; } LOG.trace("Got connection"); - - if ( stopped.get() ) { + + if (stopped.get()) { return; } final Socket socket = acceptedSocket; @@ -135,25 +136,25 @@ public class SocketRemoteSiteListener implements RemoteSiteListener { final InetAddress inetAddress = socket.getInetAddress(); String hostname = inetAddress.getHostName(); final int slashIndex = hostname.indexOf("/"); - if ( slashIndex == 0 ) { + if (slashIndex == 0) { hostname = hostname.substring(1); - } else if ( slashIndex > 0 ) { + } else if (slashIndex > 0) { hostname = hostname.substring(0, slashIndex); } final int port = socket.getPort(); final String peerUri = "nifi://" + hostname + ":" + port; LOG.debug("{} Connection URL is {}", this, peerUri); - + final CommunicationsSession commsSession; final String dn; try { - if ( secure ) { + if (secure) { final SSLSocketChannel sslSocketChannel = new SSLSocketChannel(sslContext, socketChannel, false); LOG.trace("Channel is secure; connecting..."); sslSocketChannel.connect(); LOG.trace("Channel connected"); - + commsSession = new SSLSocketChannelCommunicationsSession(sslSocketChannel, peerUri); dn = sslSocketChannel.getDn(); commsSession.setUserDn(dn); @@ -164,7 +165,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener { } } catch (final Exception e) { LOG.error("RemoteSiteListener Unable to accept connection from {} due to {}", socket, e.toString()); - if ( LOG.isDebugEnabled() ) { + if (LOG.isDebugEnabled()) { LOG.error("", e); } try { @@ -173,135 +174,136 @@ public class SocketRemoteSiteListener implements RemoteSiteListener { } return; } - + LOG.info("Received connection from {}, User DN: {}", socket.getInetAddress(), dn); - + final InputStream socketIn; final OutputStream socketOut; - + try { socketIn = commsSession.getInput().getInputStream(); socketOut = commsSession.getOutput().getOutputStream(); } catch (final IOException e) { - LOG.error("Connection dropped from {} before any data was transmitted", peerUri); - try { - commsSession.close(); - } catch (final IOException ioe) {} - - return; + LOG.error("Connection dropped from {} before any data was transmitted", peerUri); + try { + commsSession.close(); + } catch (final IOException ioe) { + } + + return; } - + final DataInputStream dis = new DataInputStream(socketIn); - final DataOutputStream dos = new DataOutputStream(socketOut); - - ServerProtocol protocol = null; - Peer peer = null; + final DataOutputStream dos = new DataOutputStream(socketOut); + + ServerProtocol protocol = null; + Peer peer = null; try { - // ensure that we are communicating with another NiFi + // ensure that we are communicating with another NiFi LOG.debug("Verifying magic bytes..."); - verifyMagicBytes(dis, peerUri); - - LOG.debug("Receiving Server Protocol Negotiation"); - protocol = RemoteResourceFactory.receiveServerProtocolNegotiation(dis, dos); - protocol.setRootProcessGroup(rootGroup.get()); - protocol.setNodeInformant(nodeInformant); - - final PeerDescription description = new PeerDescription("localhost", getPort(), sslContext != null); - peer = new Peer(description, commsSession, peerUri, "nifi://localhost:" + getPort()); - LOG.debug("Handshaking...."); - protocol.handshake(peer); - - if (!protocol.isHandshakeSuccessful()) { - LOG.error("Handshake failed with {}; closing connection", peer); - try { - peer.close(); - } catch (final IOException e) { - LOG.warn("Failed to close {} due to {}", peer, e); - } - - // no need to shutdown protocol because we failed to perform handshake - return; - } - - commsSession.setTimeout((int) protocol.getRequestExpiration()); - - LOG.info("Successfully negotiated ServerProtocol {} Version {} with {}", new Object[] { - protocol.getResourceName(), protocol.getVersionNegotiator().getVersion(), peer}); - - try { - while (!protocol.isShutdown()) { - LOG.trace("Getting Protocol Request Type..."); - + verifyMagicBytes(dis, peerUri); + + LOG.debug("Receiving Server Protocol Negotiation"); + protocol = RemoteResourceFactory.receiveServerProtocolNegotiation(dis, dos); + protocol.setRootProcessGroup(rootGroup.get()); + protocol.setNodeInformant(nodeInformant); + + final PeerDescription description = new PeerDescription("localhost", getPort(), sslContext != null); + peer = new Peer(description, commsSession, peerUri, "nifi://localhost:" + getPort()); + LOG.debug("Handshaking...."); + protocol.handshake(peer); + + if (!protocol.isHandshakeSuccessful()) { + LOG.error("Handshake failed with {}; closing connection", peer); + try { + peer.close(); + } catch (final IOException e) { + LOG.warn("Failed to close {} due to {}", peer, e); + } + + // no need to shutdown protocol because we failed to perform handshake + return; + } + + commsSession.setTimeout((int) protocol.getRequestExpiration()); + + LOG.info("Successfully negotiated ServerProtocol {} Version {} with {}", new Object[]{ + protocol.getResourceName(), protocol.getVersionNegotiator().getVersion(), peer}); + + try { + while (!protocol.isShutdown()) { + LOG.trace("Getting Protocol Request Type..."); + int timeoutCount = 0; RequestType requestType = null; - - while ( requestType == null ) { + + while (requestType == null) { try { requestType = protocol.getRequestType(peer); } catch (final SocketTimeoutException e) { // Give the timeout a bit longer (twice as long) to receive the Request Type, // in order to attempt to receive more data without shutting down the socket if we don't // have to. - LOG.debug("{} Timed out waiting to receive RequestType using {} with {}", new Object[] {this, protocol, peer}); + LOG.debug("{} Timed out waiting to receive RequestType using {} with {}", new Object[]{this, protocol, peer}); timeoutCount++; requestType = null; - - if ( timeoutCount >= 2 ) { + + if (timeoutCount >= 2) { throw e; } } } - + LOG.debug("Request type from {} is {}", protocol, requestType); - switch (requestType) { - case NEGOTIATE_FLOWFILE_CODEC: - protocol.negotiateCodec(peer); - break; - case RECEIVE_FLOWFILES: - // peer wants to receive FlowFiles, so we will transfer FlowFiles. - protocol.getPort().transferFlowFiles(peer, protocol, new HashMap<String, String>()); - break; - case SEND_FLOWFILES: - // Peer wants to send FlowFiles, so we will receive. + switch (requestType) { + case NEGOTIATE_FLOWFILE_CODEC: + protocol.negotiateCodec(peer); + break; + case RECEIVE_FLOWFILES: + // peer wants to receive FlowFiles, so we will transfer FlowFiles. + protocol.getPort().transferFlowFiles(peer, protocol, new HashMap<String, String>()); + break; + case SEND_FLOWFILES: + // Peer wants to send FlowFiles, so we will receive. protocol.getPort().receiveFlowFiles(peer, protocol, new HashMap<String, String>()); - break; - case REQUEST_PEER_LIST: - protocol.sendPeerList(peer); - break; - case SHUTDOWN: - protocol.shutdown(peer); - break; - } - } - LOG.debug("Finished communicating with {} ({})", peer, protocol); - } catch (final Exception e) { - LOG.error("Unable to communicate with remote instance {} ({}) due to {}; closing connection", peer, protocol, e.toString()); - if ( LOG.isDebugEnabled() ) { - LOG.error("", e); - } - } + break; + case REQUEST_PEER_LIST: + protocol.sendPeerList(peer); + break; + case SHUTDOWN: + protocol.shutdown(peer); + break; + } + } + LOG.debug("Finished communicating with {} ({})", peer, protocol); + } catch (final Exception e) { + LOG.error("Unable to communicate with remote instance {} ({}) due to {}; closing connection", peer, protocol, e.toString()); + if (LOG.isDebugEnabled()) { + LOG.error("", e); + } + } } catch (final IOException e) { LOG.error("Unable to communicate with remote instance {} due to {}; closing connection", peer, e.toString()); - if ( LOG.isDebugEnabled() ) { + if (LOG.isDebugEnabled()) { LOG.error("", e); } } catch (final Throwable t) { LOG.error("Handshake failed when communicating with {}; closing connection. Reason for failure: {}", peerUri, t.toString()); - if ( LOG.isDebugEnabled() ) { + if (LOG.isDebugEnabled()) { LOG.error("", t); } } finally { LOG.trace("Cleaning up"); try { - if ( protocol != null && peer != null ) { + if (protocol != null && peer != null) { protocol.shutdown(peer); } } catch (final Exception protocolException) { LOG.warn("Failed to shutdown protocol due to {}", protocolException.toString()); } - + try { - if ( peer != null ) { + if (peer != null) { peer.close(); } } catch (final Exception peerException) { @@ -320,30 +322,30 @@ public class SocketRemoteSiteListener implements RemoteSiteListener { listenerThread.setName("Site-to-Site Listener"); listenerThread.start(); } - + @Override public int getPort() { return socketPort; } - + @Override public void stop() { stopped.set(true); } - + private void verifyMagicBytes(final InputStream in, final String peerDescription) throws IOException, HandshakeException { final byte[] receivedMagicBytes = new byte[CommunicationsSession.MAGIC_BYTES.length]; // expect magic bytes try { - for (int i=0; i < receivedMagicBytes.length; i++) { + for (int i = 0; i < receivedMagicBytes.length; i++) { receivedMagicBytes[i] = (byte) in.read(); } } catch (final EOFException e) { throw new HandshakeException("Handshake failed (not enough bytes) when communicating with " + peerDescription); } - - if ( !Arrays.equals(CommunicationsSession.MAGIC_BYTES, receivedMagicBytes) ) { + + if (!Arrays.equals(CommunicationsSession.MAGIC_BYTES, receivedMagicBytes)) { throw new HandshakeException("Handshake with " + peerDescription + " failed because the Magic Header was not present"); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java index eec6ed5..982d9ff 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java @@ -56,14 +56,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class StandardRemoteGroupPort extends RemoteGroupPort { + private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds public static final String USER_AGENT = "NiFi-Site-to-Site"; public static final String CONTENT_TYPE = "application/octet-stream"; - + public static final int GZIP_COMPRESSION_LEVEL = 1; - + private static final String CATEGORY = "Site to Site"; - + private static final Logger logger = LoggerFactory.getLogger(StandardRemoteGroupPort.class); private final RemoteProcessGroup remoteGroup; private final AtomicBoolean useCompression = new AtomicBoolean(false); @@ -71,28 +72,27 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { private final AtomicBoolean targetRunning = new AtomicBoolean(true); private final SSLContext sslContext; private final TransferDirection transferDirection; - + private final AtomicReference<SiteToSiteClient> clientRef = new AtomicReference<>(); - - - public StandardRemoteGroupPort(final String id, final String name, final ProcessGroup processGroup, final RemoteProcessGroup remoteGroup, + + public StandardRemoteGroupPort(final String id, final String name, final ProcessGroup processGroup, final RemoteProcessGroup remoteGroup, final TransferDirection direction, final ConnectableType type, final SSLContext sslContext, final ProcessScheduler scheduler) { // remote group port id needs to be unique but cannot just be the id of the port // in the remote group instance. this supports referencing the same remote // instance more than once. super(id, name, processGroup, type, scheduler); - + this.remoteGroup = remoteGroup; this.transferDirection = direction; this.sslContext = sslContext; setScheduldingPeriod(MINIMUM_SCHEDULING_NANOS + " nanos"); } - + private static File getPeerPersistenceFile(final String portId) { final File stateDir = NiFiProperties.getInstance().getPersistentStateDirectory(); return new File(stateDir, portId + ".peers"); } - + @Override public boolean isTargetRunning() { return targetRunning.get(); @@ -101,18 +101,18 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { public void setTargetRunning(boolean targetRunning) { this.targetRunning.set(targetRunning); } - + @Override public boolean isTriggerWhenEmpty() { return getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT; } - + @Override public void shutdown() { - super.shutdown(); - + super.shutdown(); + final SiteToSiteClient client = clientRef.get(); - if ( client != null ) { + if (client != null) { try { client.close(); } catch (final IOException ioe) { @@ -120,58 +120,57 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { } } } - + @Override public void onSchedulingStart() { super.onSchedulingStart(); - + final SiteToSiteClient client = new SiteToSiteClient.Builder() - .url(remoteGroup.getTargetUri().toString()) - .portIdentifier(getIdentifier()) - .sslContext(sslContext) - .eventReporter(remoteGroup.getEventReporter()) - .peerPersistenceFile(getPeerPersistenceFile(getIdentifier())) - .build(); + .url(remoteGroup.getTargetUri().toString()) + .portIdentifier(getIdentifier()) + .sslContext(sslContext) + .eventReporter(remoteGroup.getEventReporter()) + .peerPersistenceFile(getPeerPersistenceFile(getIdentifier())) + .build(); clientRef.set(client); } - - + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) { - if ( !remoteGroup.isTransmitting() ) { + if (!remoteGroup.isTransmitting()) { logger.debug("{} {} is not transmitting; will not send/receive", this, remoteGroup); return; } - if ( getConnectableType() == ConnectableType.REMOTE_INPUT_PORT && session.getQueueSize().getObjectCount() == 0 ) { + if (getConnectableType() == ConnectableType.REMOTE_INPUT_PORT && session.getQueueSize().getObjectCount() == 0) { logger.debug("{} No data to send", this); return; } - + String url = getRemoteProcessGroup().getTargetUri().toString(); - + // If we are sending data, we need to ensure that we have at least 1 FlowFile to send. Otherwise, // we don't want to create a transaction at all. final FlowFile firstFlowFile; - if ( getConnectableType() == ConnectableType.REMOTE_INPUT_PORT ) { + if (getConnectableType() == ConnectableType.REMOTE_INPUT_PORT) { firstFlowFile = session.get(); - if ( firstFlowFile == null ) { + if (firstFlowFile == null) { return; } } else { firstFlowFile = null; } - + final SiteToSiteClient client = clientRef.get(); final Transaction transaction; try { - transaction = client.createTransaction(transferDirection); + transaction = client.createTransaction(transferDirection); } catch (final PortNotRunningException e) { context.yield(); this.targetRunning.set(false); final String message = String.format("%s failed to communicate with %s because the remote instance indicates that the port is not in a valid state", this, url); logger.error(message); - session.rollback(); + session.rollback(); remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message); return; } catch (final UnknownPortException e) { @@ -179,22 +178,22 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { this.targetExists.set(false); final String message = String.format("%s failed to communicate with %s because the remote instance indicates that the port no longer exists", this, url); logger.error(message); - session.rollback(); + session.rollback(); remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message); return; } catch (final IOException e) { - context.yield(); + context.yield(); final String message = String.format("%s failed to communicate with %s due to %s", this, url, e.toString()); logger.error(message); - if ( logger.isDebugEnabled() ) { + if (logger.isDebugEnabled()) { logger.error("", e); } - session.rollback(); + session.rollback(); remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message); return; } - - if ( transaction == null ) { + + if (transaction == null) { logger.debug("{} Unable to create transaction to communicate with; all peers must be penalized, so yielding context", this); session.rollback(); context.yield(); @@ -202,11 +201,11 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { } try { - if ( getConnectableType() == ConnectableType.REMOTE_INPUT_PORT ) { + if (getConnectableType() == ConnectableType.REMOTE_INPUT_PORT) { transferFlowFiles(transaction, context, session, firstFlowFile); } else { final int numReceived = receiveFlowFiles(transaction, context, session); - if ( numReceived == 0 ) { + if (numReceived == 0) { context.yield(); } } @@ -215,24 +214,22 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { } catch (final Throwable t) { final String message = String.format("%s failed to communicate with remote NiFi instance due to %s", this, t.toString()); logger.error("{} failed to communicate with remote NiFi instance due to {}", this, t.toString()); - if ( logger.isDebugEnabled() ) { + if (logger.isDebugEnabled()) { logger.error("", t); } - + remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message); transaction.error(); session.rollback(); } } - @Override public String getYieldPeriod() { // delegate yield duration to remote process group return remoteGroup.getYieldDuration(); } - - + private int transferFlowFiles(final Transaction transaction, final ProcessContext context, final ProcessSession session, FlowFile firstFlowFile) throws IOException, ProtocolException { FlowFile flowFile = firstFlowFile; @@ -241,7 +238,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { final long startSendingNanos = System.nanoTime(); final StopWatch stopWatch = new StopWatch(true); long bytesSent = 0L; - + final Set<FlowFile> flowFilesSent = new HashSet<>(); boolean continueTransaction = true; while (continueTransaction) { @@ -255,79 +252,78 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { transaction.send(dataPacket); } }); - + final long transferNanos = System.nanoTime() - startNanos; final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS); - + flowFilesSent.add(flowFile); bytesSent += flowFile.getSize(); logger.debug("{} Sent {} to {}", this, flowFile, transaction.getCommunicant().getUrl()); - + final String transitUri = transaction.getCommunicant().getUrl() + "/" + flowFile.getAttribute(CoreAttributes.UUID.key()); session.getProvenanceReporter().send(flowFile, transitUri, "Remote DN=" + userDn, transferMillis, false); session.remove(flowFile); - + final long sendingNanos = System.nanoTime() - startSendingNanos; - if ( sendingNanos < BATCH_SEND_NANOS ) { + if (sendingNanos < BATCH_SEND_NANOS) { flowFile = session.get(); } else { flowFile = null; } - + continueTransaction = (flowFile != null); } - + transaction.confirm(); - + // consume input stream entirely, ignoring its contents. If we // don't do this, the Connection will not be returned to the pool stopWatch.stop(); final String uploadDataRate = stopWatch.calculateDataRate(bytesSent); final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); final String dataSize = FormatUtils.formatDataSize(bytesSent); - + session.commit(); transaction.complete(); - + final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles"; - logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] { + logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[]{ this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate}); - + return flowFilesSent.size(); } catch (final Exception e) { session.rollback(); throw e; } - } - + private int receiveFlowFiles(final Transaction transaction, final ProcessContext context, final ProcessSession session) throws IOException, ProtocolException { final String userDn = transaction.getCommunicant().getDistinguishedName(); - + final StopWatch stopWatch = new StopWatch(true); final Set<FlowFile> flowFilesReceived = new HashSet<>(); long bytesReceived = 0L; - + while (true) { final long start = System.nanoTime(); final DataPacket dataPacket = transaction.receive(); - if ( dataPacket == null ) { + if (dataPacket == null) { break; } - + FlowFile flowFile = session.create(); flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes()); flowFile = session.importFrom(dataPacket.getData(), flowFile); final long receiveNanos = System.nanoTime() - start; - + String sourceFlowFileIdentifier = dataPacket.getAttributes().get(CoreAttributes.UUID.key()); - if ( sourceFlowFileIdentifier == null ) { + if (sourceFlowFileIdentifier == null) { sourceFlowFileIdentifier = "<Unknown Identifier>"; } - + final String transitUri = transaction.getCommunicant().getUrl() + sourceFlowFileIdentifier; - session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier, + session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier, "Remote DN=" + userDn, TimeUnit.NANOSECONDS.toMillis(receiveNanos)); session.transfer(flowFile, Relationship.ANONYMOUS); @@ -336,22 +332,22 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { // Confirm that what we received was the correct data. transaction.confirm(); - + // Commit the session so that we have persisted the data session.commit(); transaction.complete(); - if ( !flowFilesReceived.isEmpty() ) { + if (!flowFilesReceived.isEmpty()) { stopWatch.stop(); final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles"; final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived); final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); final String dataSize = FormatUtils.formatDataSize(bytesReceived); - logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] { - this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate }); + logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{ + this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate}); } - + return flowFilesReceived.size(); } @@ -371,44 +367,44 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { ValidationResult error = null; if (!targetExists.get()) { error = new ValidationResult.Builder() - .explanation(String.format("Remote instance indicates that port '%s' no longer exists.", getName())) - .subject(String.format("Remote port '%s'", getName())) - .valid(false) - .build(); - } else if ( getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && getConnections(Relationship.ANONYMOUS).isEmpty() ) { + .explanation(String.format("Remote instance indicates that port '%s' no longer exists.", getName())) + .subject(String.format("Remote port '%s'", getName())) + .valid(false) + .build(); + } else if (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && getConnections(Relationship.ANONYMOUS).isEmpty()) { error = new ValidationResult.Builder() - .explanation(String.format("Port '%s' has no outbound connections", getName())) - .subject(String.format("Remote port '%s'", getName())) - .valid(false) - .build(); + .explanation(String.format("Port '%s' has no outbound connections", getName())) + .subject(String.format("Remote port '%s'", getName())) + .valid(false) + .build(); } - - if ( error != null ) { + + if (error != null) { validationErrors.add(error); } - + return validationErrors; } - + @Override public void verifyCanStart() { super.verifyCanStart(); - - if ( getConnectableType() == ConnectableType.REMOTE_INPUT_PORT && getIncomingConnections().isEmpty() ) { + + if (getConnectableType() == ConnectableType.REMOTE_INPUT_PORT && getIncomingConnections().isEmpty()) { throw new IllegalStateException("Port " + getName() + " has no incoming connections"); } } - + @Override public void setUseCompression(final boolean useCompression) { this.useCompression.set(useCompression); } - + @Override public boolean isUseCompression() { return useCompression.get(); } - + @Override public String toString() { return "RemoteGroupPort[name=" + getName() + ",target=" + remoteGroup.getTargetUri().toString() + "]"; @@ -418,34 +414,32 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { public RemoteProcessGroup getRemoteProcessGroup() { return remoteGroup; } - + @Override public TransferDirection getTransferDirection() { return (getConnectableType() == ConnectableType.REMOTE_INPUT_PORT) ? TransferDirection.SEND : TransferDirection.RECEIVE; } - + public void setTargetExists(final boolean exists) { this.targetExists.set(exists); } - + @Override public void removeConnection(final Connection connection) throws IllegalArgumentException, IllegalStateException { super.removeConnection(connection); - - // If the Port no longer exists on the remote instance and this is the last Connection, tell + + // If the Port no longer exists on the remote instance and this is the last Connection, tell // RemoteProcessGroup to remove me - if ( !getTargetExists() && !hasIncomingConnection() && getConnections().isEmpty() ) { + if (!getTargetExists() && !hasIncomingConnection() && getConnections().isEmpty()) { remoteGroup.removeNonExistentPort(this); } } - - + @Override public SchedulingStrategy getSchedulingStrategy() { return SchedulingStrategy.TIMER_DRIVEN; } - - + @Override public boolean isSideEffectFree() { return false;
