http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/SslContextFactory.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/SslContextFactory.java b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/SslContextFactory.java new file mode 100644 index 0000000..656573d --- /dev/null +++ b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/SslContextFactory.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.security.util; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.security.KeyManagementException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.SecureRandom; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; + +import javax.net.ssl.KeyManager; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import javax.net.ssl.TrustManagerFactory; + +/** + * A factory for creating SSL contexts using the application's security + * properties. + * + */ +public final class SslContextFactory { + + public static enum ClientAuth { + + WANT, + REQUIRED, + NONE + } + + /** + * Creates a SSLContext instance using the given information. + * + * @param keystore the full path to the keystore + * @param keystorePasswd the keystore password + * @param keystoreType the type of keystore (e.g., PKCS12, JKS) + * @param truststore the full path to the truststore + * @param truststorePasswd the truststore password + * @param truststoreType the type of truststore (e.g., PKCS12, JKS) + * @param clientAuth the type of client authentication + * @param protocol the protocol to use for the SSL connection + * + * @return a SSLContext instance + * @throws java.security.KeyStoreException if any issues accessing the keystore + * @throws java.io.IOException for any problems loading the keystores + * @throws java.security.NoSuchAlgorithmException if an algorithm is found to be used but is unknown + * @throws java.security.cert.CertificateException if there is an issue with the certificate + * @throws java.security.UnrecoverableKeyException if the key is insufficient + * @throws java.security.KeyManagementException if unable to manage the key + */ + public static SSLContext createSslContext( + final String keystore, final char[] keystorePasswd, final String keystoreType, + final String truststore, final char[] truststorePasswd, final String truststoreType, + final ClientAuth clientAuth, final String protocol) + throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException, + UnrecoverableKeyException, KeyManagementException { + + // prepare the keystore + final KeyStore keyStore = KeyStore.getInstance(keystoreType); + try (final InputStream keyStoreStream = new FileInputStream(keystore)) { + keyStore.load(keyStoreStream, keystorePasswd); + } + final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + keyManagerFactory.init(keyStore, keystorePasswd); + + // prepare the truststore + final KeyStore trustStore = KeyStore.getInstance(truststoreType); + try (final InputStream trustStoreStream = new FileInputStream(truststore)) { + trustStore.load(trustStoreStream, truststorePasswd); + } + final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + trustManagerFactory.init(trustStore); + + // initialize the ssl context + final SSLContext sslContext = SSLContext.getInstance(protocol); + sslContext.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), new SecureRandom()); + if (ClientAuth.REQUIRED == clientAuth) { + sslContext.getDefaultSSLParameters().setNeedClientAuth(true); + } else if (ClientAuth.WANT == clientAuth) { + sslContext.getDefaultSSLParameters().setWantClientAuth(true); + } else { + sslContext.getDefaultSSLParameters().setWantClientAuth(false); + } + + return sslContext; + + } + + /** + * Creates a SSLContext instance using the given information. + * + * @param keystore the full path to the keystore + * @param keystorePasswd the keystore password + * @param keystoreType the type of keystore (e.g., PKCS12, JKS) + * @param protocol the protocol to use for the SSL connection + * + * @return a SSLContext instance + * @throws java.security.KeyStoreException if any issues accessing the keystore + * @throws java.io.IOException for any problems loading the keystores + * @throws java.security.NoSuchAlgorithmException if an algorithm is found to be used but is unknown + * @throws java.security.cert.CertificateException if there is an issue with the certificate + * @throws java.security.UnrecoverableKeyException if the key is insufficient + * @throws java.security.KeyManagementException if unable to manage the key + */ + public static SSLContext createSslContext( + final String keystore, final char[] keystorePasswd, final String keystoreType, final String protocol) + throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException, + UnrecoverableKeyException, KeyManagementException { + + // prepare the keystore + final KeyStore keyStore = KeyStore.getInstance(keystoreType); + try (final InputStream keyStoreStream = new FileInputStream(keystore)) { + keyStore.load(keyStoreStream, keystorePasswd); + } + final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + keyManagerFactory.init(keyStore, keystorePasswd); + + // initialize the ssl context + final SSLContext ctx = SSLContext.getInstance(protocol); + ctx.init(keyManagerFactory.getKeyManagers(), new TrustManager[0], new SecureRandom()); + + return ctx; + + } + + /** + * Creates a SSLContext instance using the given information. + * + * @param truststore the full path to the truststore + * @param truststorePasswd the truststore password + * @param truststoreType the type of truststore (e.g., PKCS12, JKS) + * @param protocol the protocol to use for the SSL connection + * + * @return a SSLContext instance + * @throws java.security.KeyStoreException if any issues accessing the keystore + * @throws java.io.IOException for any problems loading the keystores + * @throws java.security.NoSuchAlgorithmException if an algorithm is found to be used but is unknown + * @throws java.security.cert.CertificateException if there is an issue with the certificate + * @throws java.security.UnrecoverableKeyException if the key is insufficient + * @throws java.security.KeyManagementException if unable to manage the key + */ + public static SSLContext createTrustSslContext( + final String truststore, final char[] truststorePasswd, final String truststoreType, final String protocol) + throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException, + UnrecoverableKeyException, KeyManagementException { + + // prepare the truststore + final KeyStore trustStore = KeyStore.getInstance(truststoreType); + try (final InputStream trustStoreStream = new FileInputStream(truststore)) { + trustStore.load(trustStoreStream, truststorePasswd); + } + final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + trustManagerFactory.init(trustStore); + + // initialize the ssl context + final SSLContext ctx = SSLContext.getInstance(protocol); + ctx.init(new KeyManager[0], trustManagerFactory.getTrustManagers(), new SecureRandom()); + + return ctx; + + } + +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/pom.xml b/nifi-commons/nifi-site-to-site-client/pom.xml new file mode 100644 index 0000000..01b3aec --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/pom.xml @@ -0,0 +1,54 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-commons</artifactId> + <version>0.3.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-site-to-site-client</artifactId> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + <version>1.9.13</version> + </dependency> + + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-client-dto</artifactId> + <version>0.3.0-SNAPSHOT</version> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java new file mode 100644 index 0000000..dacfd64 --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote; + +import org.apache.nifi.remote.protocol.CommunicationsSession; + +public abstract class AbstractCommunicationsSession implements CommunicationsSession { + + private String userDn; + + private volatile String uri; + + public AbstractCommunicationsSession(final String uri) { + this.uri = uri; + } + + @Override + public String toString() { + return uri; + } + + @Override + public void setUri(final String uri) { + this.uri = uri; + } + + @Override + public String getUri() { + return uri; + } + + @Override + public String getUserDn() { + return userDn; + } + + @Override + public void setUserDn(final String dn) { + this.userDn = dn; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java new file mode 100644 index 0000000..17b990e --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote; + +/** + * Represents the remote entity that the client is communicating with + */ +public interface Communicant { + + /** + * @return the NiFi site-to-site URL for the remote NiFi instance + */ + String getUrl(); + + /** + * @return The Host of the remote NiFi instance + */ + String getHost(); + + /** + * @return The Port that the remote NiFi instance is listening on for + * site-to-site communications + */ + int getPort(); + + /** + * @return The distinguished name that the remote NiFi instance has provided + * in its certificate if using secure communications, or <code>null</code> + * if the Distinguished Name is unknown + */ + String getDistinguishedName(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java new file mode 100644 index 0000000..5cb37b0 --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote; + +import java.io.IOException; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.remote.protocol.CommunicationsSession; + +public class Peer implements Communicant { + + private final PeerDescription description; + private final CommunicationsSession commsSession; + private final String url; + private final String clusterUrl; + private final String host; + private final int port; + + private final Map<String, Long> penaltyExpirationMap = new HashMap<>(); + private boolean closed = false; + + public Peer(final PeerDescription description, final CommunicationsSession commsSession, final String peerUrl, final String clusterUrl) { + this.description = description; + this.commsSession = commsSession; + this.url = peerUrl; + this.clusterUrl = clusterUrl; + + try { + final URI uri = new URI(peerUrl); + this.port = uri.getPort(); + this.host = uri.getHost(); + } catch (final Exception e) { + throw new IllegalArgumentException("Invalid URL: " + peerUrl); + } + } + + public PeerDescription getDescription() { + return description; + } + + @Override + public String getUrl() { + return url; + } + + public String getClusterUrl() { + return clusterUrl; + } + + public CommunicationsSession getCommunicationsSession() { + return commsSession; + } + + public void close() throws IOException { + this.closed = true; + + // Consume the InputStream so that it doesn't linger on the Peer's outgoing socket buffer + try { + commsSession.getInput().consume(); + } finally { + commsSession.close(); + } + } + + /** + * Penalizes this peer for the given destination only for the provided + * number of milliseconds + * + * @param destinationId id of destination + * @param millis period of time to penalize peer + */ + public void penalize(final String destinationId, final long millis) { + final Long currentPenalty = penaltyExpirationMap.get(destinationId); + final long proposedPenalty = System.currentTimeMillis() + millis; + if (currentPenalty == null || proposedPenalty > currentPenalty) { + penaltyExpirationMap.put(destinationId, proposedPenalty); + } + } + + public boolean isPenalized(final String destinationId) { + final Long currentPenalty = penaltyExpirationMap.get(destinationId); + return (currentPenalty != null && currentPenalty > System.currentTimeMillis()); + } + + public boolean isClosed() { + return closed; + } + + @Override + public String getHost() { + return host; + } + + @Override + public int hashCode() { + return 8320 + url.hashCode(); + } + + @Override + public boolean equals(final Object obj) { + if (obj == null) { + return false; + } + if (obj == this) { + return true; + } + if (!(obj instanceof Peer)) { + return false; + } + + final Peer other = (Peer) obj; + return this.url.equals(other.url); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder(); + sb.append("Peer[url=").append(url); + if (closed) { + sb.append(",CLOSED"); + } + sb.append("]"); + return sb.toString(); + } + + @Override + public int getPort() { + return port; + } + + @Override + public String getDistinguishedName() { + return commsSession.getUserDn(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerDescription.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerDescription.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerDescription.java new file mode 100644 index 0000000..6fc90e4 --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerDescription.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote; + +public class PeerDescription { + + private final String hostname; + private final int port; + private final boolean secure; + + public PeerDescription(final String hostname, final int port, final boolean secure) { + this.hostname = hostname; + this.port = port; + this.secure = secure; + } + + public String getHostname() { + return hostname; + } + + public int getPort() { + return port; + } + + public boolean isSecure() { + return secure; + } + + @Override + public String toString() { + return "PeerDescription[hostname=" + hostname + ", port=" + port + ", secure=" + secure + "]"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((hostname == null) ? 0 : hostname.hashCode()); + result = prime * result + port; + return result; + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + + final PeerDescription other = (PeerDescription) obj; + if (hostname == null) { + if (other.hostname != null) { + return false; + } + } else if (!hostname.equals(other.hostname)) { + return false; + } + + return port == other.port; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java new file mode 100644 index 0000000..6c8a4ec --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote; + +public class PeerStatus { + + private final PeerDescription description; + private final int numFlowFiles; + + public PeerStatus(final PeerDescription description, final int numFlowFiles) { + this.description = description; + this.numFlowFiles = numFlowFiles; + } + + public PeerDescription getPeerDescription() { + return description; + } + + public int getFlowFileCount() { + return numFlowFiles; + } + + @Override + public String toString() { + return "PeerStatus[hostname=" + description.getHostname() + ",port=" + description.getPort() + + ",secure=" + description.isSecure() + ",flowFileCount=" + numFlowFiles + "]"; + } + + @Override + public int hashCode() { + return 9824372 + description.getHostname().hashCode() + description.getPort() * 41; + } + + @Override + public boolean equals(final Object obj) { + if (obj == null) { + return false; + } + + if (!(obj instanceof PeerStatus)) { + return false; + } + + final PeerStatus other = (PeerStatus) obj; + return description.equals(other.getPeerDescription()); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java new file mode 100644 index 0000000..582916e --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.nifi.remote.exception.HandshakeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RemoteResourceInitiator { + + public static final int RESOURCE_OK = 20; + public static final int DIFFERENT_RESOURCE_VERSION = 21; + public static final int ABORT = 255; + + private static final Logger logger = LoggerFactory.getLogger(RemoteResourceInitiator.class); + + public static VersionedRemoteResource initiateResourceNegotiation(final VersionedRemoteResource resource, final DataInputStream dis, final DataOutputStream dos) + throws IOException, HandshakeException { + // Write the classname of the RemoteStreamCodec, followed by its version + logger.debug("Negotiating resource; proposal is {}", resource); + dos.writeUTF(resource.getResourceName()); + final VersionNegotiator negotiator = resource.getVersionNegotiator(); + dos.writeInt(negotiator.getVersion()); + dos.flush(); + + // wait for response from server. + logger.debug("Receiving response from remote instance"); + final int statusCode = dis.read(); + switch (statusCode) { + case RESOURCE_OK: // server accepted our proposal of codec name/version + logger.debug("Response was RESOURCE_OK"); + return resource; + case DIFFERENT_RESOURCE_VERSION: // server accepted our proposal of codec name but not the version + logger.debug("Response was DIFFERENT_RESOURCE_VERSION"); + // Get server's preferred version + final int newVersion = dis.readInt(); + + // Determine our new preferred version that is no greater than the server's preferred version. + final Integer newPreference = negotiator.getPreferredVersion(newVersion); + // If we could not agree with server on a version, fail now. + if (newPreference == null) { + throw new HandshakeException("Could not agree on version for " + resource); + } + + negotiator.setVersion(newPreference); + + // Attempt negotiation of resource based on our new preferred version. + return initiateResourceNegotiation(resource, dis, dos); + case ABORT: + logger.debug("Response was ABORT"); + throw new HandshakeException("Remote destination aborted connection with message: " + dis.readUTF()); + default: + logger.debug("Response was {}; unable to negotiate codec", statusCode); + return null; // Unable to negotiate codec + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java new file mode 100644 index 0000000..bfa5c82 --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote; + +import java.io.IOException; +import java.util.Map; + +import org.apache.nifi.remote.protocol.DataPacket; + +/** + * <p> + * Provides a transaction for performing site-to-site data transfers. + * </p> + * + * <p> + * A Transaction is created by calling the + * {@link org.apache.nifi.remote.client.SiteToSiteClient#createTransaction(TransferDirection) createTransaction(TransferDirection)} + * method of a + * {@link org.apache.nifi.remote.client.SiteToSiteClient SiteToSiteClient}. The + * resulting Transaction can be used to either send or receive data but not + * both. A new Transaction must be created in order perform the other operation. + * </p> + * + * <p> + * The general flow of execute of a Transaction is as follows: + * <ol> + * <li>Create the transaction as described above.</li> + * <li>Send data via the {@link #send(DataPacket)} method or receive data via + * the {@link #receive()} method. This method will be called 1 or more times. In + * the case of receive, this method should be called until the method returns + * {@code null}, signifying that the remote instance is finished sending data. + * <b>Note:</b> <code>receive()</code> should not be called a second time + * without first fully consuming the stream from the previous Packet that was + * received.</li> + * <li>Confirm the transaction via the {@link #confirm()} method.</li> + * <li>Either complete the transaction via the {@link #complete(boolean)} method + * or cancel the transaction via the {@link #cancel()} method.</li> + * </ol> + * </p> + * + * <p> + * It is important that the Transaction be terminated in order to free the + * resources held by the Transaction. If a Transaction is not terminated, its + * resources will not be freed and if the Transaction holds connections from a + * connection pool, the connections in that pool will eventually become + * exhausted. A Transaction is terminated by calling one of the following + * methods: + * <ul> + * <li>{@link #complete(boolean)}</li> + * <li>{@link #cancel()}</li> + * <li>{@link #error()}</li> + * </ul> + * </p> + * + * <p> + * If at any point an IOException is thrown from one of the methods of the + * Transaction, that Transaction is automatically closed via a call to + * {@link #error()}. + * </p> + * + * <p> + * The Transaction class should not be assumed to be thread-safe. + * </p> + */ +public interface Transaction { + + /** + * Sends information to the remote NiFi instance. + * + * @param dataPacket the data packet to send + * @throws IOException if unable to send + */ + void send(DataPacket dataPacket) throws IOException; + + /** + * Sends the given byte array as the content of a {@link DataPacket} along + * with the provided attributes + * + * @param content to send + * @param attributes of the content + * @throws IOException if unable to send + */ + void send(byte[] content, Map<String, String> attributes) throws IOException; + + /** + * Retrieves information from the remote NiFi instance, if any is available. + * If no data is available, will return {@code null}. It is important to + * consume all data from the remote NiFi instance before attempting to call + * {@link #confirm()}. This is because the sender is always responsible for + * determining when the Transaction has finished. This is done in order to + * prevent the need for a round-trip network request to receive data for + * each data packet. + * + * @return the DataPacket received, or {@code null} if there is no more data + * to receive. + * @throws IOException if unable to receive + */ + DataPacket receive() throws IOException; + + /** + * <p> + * Confirms the data that was sent or received by comparing CRC32's of the + * data sent and the data received. + * </p> + * + * <p> + * Even if the protocol being used to send the data is reliable and + * guarantees ordering of packets (such as TCP), it is still required that + * we confirm the transaction before completing the transaction. This is + * done as "safety net" or a defensive programming technique. Mistakes + * happen, and this mechanism helps to ensure that if a bug exists somewhere + * along the line that we do not end up sending or receiving corrupt data. + * If the CRC32 of the sender and the CRC32 of the receiver do not match, an + * IOException will be thrown and both the sender and receiver will cancel + * the transaction automatically. + * </p> + * + * <p> + * If the {@link TransferDirection} of this Transaction is RECEIVE, this + * method will throw an Exception unless all data from the remote instance + * has been consumed (i.e., a call to {@link #receive()} returns + * {@code null}). + * </p> + * + * <p> + * If the {@link TransferDirection} of this Transaction is SEND, calling + * this method dictates that no more data will be sent in this transaction. + * I.e., there will be no more calls to {@link #send(DataPacket)}. + * </p> + * + * @throws IOException if unable to confirm transaction + */ + void confirm() throws IOException; + + /** + * <p> + * Completes the transaction and indicates to both the sender and receiver + * that the data transfer was successful. + * </p> + * + * @throws IOException if unable to complete + * + * @return a TransactionCompletion that contains details about the + * Transaction + */ + TransactionCompletion complete() throws IOException; + + /** + * <p> + * Cancels this transaction, indicating to the sender that the data has not + * been successfully received so that the sender can retry or handle however + * is appropriate. + * </p> + * + * @param explanation an explanation to tell the other party why the + * transaction was canceled. + * @throws IOException if unable to cancel + */ + void cancel(final String explanation) throws IOException; + + /** + * <p> + * Sets the TransactionState of the Transaction to + * {@link TransactionState#ERROR}, and closes the Transaction. The + * underlying connection should not be returned to a connection pool in this + * case. + * </p> + */ + void error(); + + /** + * @return the current state of the Transaction. + * @throws IOException ioe + */ + TransactionState getState() throws IOException; + + /** + * @return a Communicant that represents the other side of this Transaction + * (i.e., the remote NiFi instance) + */ + Communicant getCommunicant(); + + public enum TransactionState { + + /** + * Transaction has been started but no data has been sent or received. + */ + TRANSACTION_STARTED, + /** + * Transaction has been started and data has been sent or received. + */ + DATA_EXCHANGED, + /** + * Data that has been transferred has been confirmed via its CRC. + * Transaction is ready to be completed. + */ + TRANSACTION_CONFIRMED, + /** + * Transaction has been successfully completed. + */ + TRANSACTION_COMPLETED, + /** + * The Transaction has been canceled. + */ + TRANSACTION_CANCELED, + /** + * The Transaction ended in an error. + */ + ERROR; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransactionCompletion.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransactionCompletion.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransactionCompletion.java new file mode 100644 index 0000000..1587e87 --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransactionCompletion.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote; + +import java.io.InputStream; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.remote.protocol.DataPacket; + +/** + * A TransactionCompletion provides information about a {@link Transaction} that + * has completed successfully. + */ +public interface TransactionCompletion { + + /** + * When a sending to a NiFi instance, the server may accept the content sent + * to it but indicate that its queues are full and that the client should + * backoff sending data for a bit. + * + * @return <code>true</code> if the server did in fact request that, + * <code>false</code> otherwise + */ + boolean isBackoff(); + + /** + * @return the number of Data Packets that were sent to or received from the + * remote NiFi instance in the Transaction + */ + int getDataPacketsTransferred(); + + /** + * @return the number of bytes of DataPacket content that were sent to or + * received from the remote NiFI instance in the Transaction. Note that this + * is different than the number of bytes actually transferred between the + * client and server, as it does not take into account the attributes or + * protocol-specific information that is exchanged but rather takes into + * account only the data in the {@link InputStream} of the + * {@link DataPacket} + */ + long getBytesTransferred(); + + /** + * @param timeUnit unit of time for which to report the duration + * @return the amount of time that the Transaction took, from the time that + * the Transaction was created to the time that the Transaction was + * completed + */ + long getDuration(TimeUnit timeUnit); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java new file mode 100644 index 0000000..979ad9c --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote; + +/** + * An enumeration for specifying the direction in which data should be + * transferred between a client and a remote NiFi instance. + */ +public enum TransferDirection { + + /** + * The client is to send data to the remote instance. + */ + SEND, + /** + * The client is to receive data from the remote instance. + */ + RECEIVE; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java new file mode 100644 index 0000000..bfccd98 --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote; + +public interface VersionedRemoteResource { + + VersionNegotiator getVersionNegotiator(); + + String getResourceName(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/KeystoreType.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/KeystoreType.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/KeystoreType.java new file mode 100644 index 0000000..63c3d63 --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/KeystoreType.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.client; + +import java.io.Serializable; + +public enum KeystoreType implements Serializable { + PKCS12, + JKS; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java new file mode 100644 index 0000000..78237b9 --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java @@ -0,0 +1,771 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.client; + +import java.io.Closeable; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.Serializable; +import java.security.KeyStore; +import java.security.SecureRandom; +import java.util.concurrent.TimeUnit; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; + +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.socket.SocketClient; +import org.apache.nifi.remote.exception.HandshakeException; +import org.apache.nifi.remote.exception.PortNotRunningException; +import org.apache.nifi.remote.exception.ProtocolException; +import org.apache.nifi.remote.exception.UnknownPortException; +import org.apache.nifi.remote.protocol.DataPacket; + +/** + * <p> + * The SiteToSiteClient provides a mechanism for sending data to a remote + * instance of NiFi (or NiFi cluster) and retrieving data from a remote instance + * of NiFi (or NiFi cluster). + * </p> + * + * <p> + * When configuring the client via the {@link SiteToSiteClient.Builder}, the + * Builder must be provided the URL of the remote NiFi instance. If the URL + * points to a standalone instance of NiFi, all interaction will take place with + * that instance of NiFi. However, if the URL points to the NiFi Cluster Manager + * of a cluster, the client will automatically handle load balancing the + * transactions across the different nodes in the cluster. + * </p> + * + * <p> + * The SiteToSiteClient provides a {@link Transaction} through which all + * interaction with the remote instance takes place. After data has been + * exchanged or it is determined that no data is available, the Transaction can + * then be canceled (via the {@link Transaction#cancel(String)} method) or can + * be completed (via the {@link Transaction#complete(boolean)} method). + * </p> + * + * <p> + * An instance of SiteToSiteClient can be obtained by constructing a new + * instance of the {@link SiteToSiteClient.Builder} class, calling the + * appropriate methods to configured the client as desired, and then calling the + * {@link SiteToSiteClient.Builder#build() build()} method. + * </p> + * + * <p> + * The SiteToSiteClient itself is immutable once constructed and is thread-safe. + * Many threads can share access to the same client. However, the + * {@link Transaction} that is created by the client is not thread safe and + * should not be shared among threads. + * </p> + */ +public interface SiteToSiteClient extends Closeable { + + /** + * <p> + * Creates a new Transaction that can be used to either send data to a + * remote NiFi instance or receive data from a remote NiFi instance, + * depending on the value passed for the {@code direction} argument. + * </p> + * + * <p> + * <b>Note:</b> If all of the nodes are penalized (See + * {@link Builder#nodePenalizationPeriod(long, TimeUnit)}), then this method + * will return <code>null</code>. + * </p> + * + * @param direction specifies which direction the data should be + * transferred. A value of {@link TransferDirection#SEND} indicates that + * this Transaction will send data to the remote instance; a value of + * {@link TransferDirection#RECEIVE} indicates that this Transaction will be + * used to receive data from the remote instance. + * + * @return a Transaction to use for sending or receiving data, or + * <code>null</code> if all nodes are penalized. + * @throws org.apache.nifi.remote.exception.HandshakeException he + * @throws org.apache.nifi.remote.exception.PortNotRunningException pnre + * @throws IOException ioe + * @throws org.apache.nifi.remote.exception.UnknownPortException upe + */ + Transaction createTransaction(TransferDirection direction) throws HandshakeException, PortNotRunningException, ProtocolException, UnknownPortException, IOException; + + /** + * <p> + * In order to determine whether the server is configured for secure + * communications, the client may have to query the server's RESTful + * interface. Doing so could result in an IOException. + * </p> + * + * @return {@code true} if site-to-site communications with the remote + * instance are secure, {@code false} if site-to-site communications with + * the remote instance are not secure. Whether or not communications are + * secure depends on the server, not the client + * @throws IOException if unable to query the remote instance's RESTful + * interface or if the remote instance is not configured to allow + * site-to-site communications + */ + boolean isSecure() throws IOException; + + /** + * + * @return the configuration object that was built by the Builder + */ + SiteToSiteClientConfig getConfig(); + + /** + * <p> + * The Builder is the mechanism by which all configuration is passed to the + * SiteToSiteClient. Once constructed, the SiteToSiteClient cannot be + * reconfigured (i.e., it is immutable). If a change in configuration should + * be desired, the client should be {@link Closeable#close() closed} and a + * new client created. + * </p> + */ + public static class Builder implements Serializable { + + private static final long serialVersionUID = -4954962284343090219L; + + private String url; + private long timeoutNanos = TimeUnit.SECONDS.toNanos(30); + private long penalizationNanos = TimeUnit.SECONDS.toNanos(3); + private long idleExpirationNanos = TimeUnit.SECONDS.toNanos(30L); + private SSLContext sslContext; + private String keystoreFilename; + private String keystorePass; + private KeystoreType keystoreType; + private String truststoreFilename; + private String truststorePass; + private KeystoreType truststoreType; + private EventReporter eventReporter; + private File peerPersistenceFile; + private boolean useCompression; + private String portName; + private String portIdentifier; + private int batchCount; + private long batchSize; + private long batchNanos; + + /** + * Populates the builder with values from the provided config + * + * @param config to start with + * @return the builder + */ + public Builder fromConfig(final SiteToSiteClientConfig config) { + this.url = config.getUrl(); + this.timeoutNanos = config.getTimeout(TimeUnit.NANOSECONDS); + this.penalizationNanos = config.getPenalizationPeriod(TimeUnit.NANOSECONDS); + this.idleExpirationNanos = config.getIdleConnectionExpiration(TimeUnit.NANOSECONDS); + this.sslContext = config.getSslContext(); + this.keystoreFilename = config.getKeystoreFilename(); + this.keystorePass = config.getKeystorePassword(); + this.keystoreType = config.getKeystoreType(); + this.truststoreFilename = config.getTruststoreFilename(); + this.truststorePass = config.getTruststorePassword(); + this.truststoreType = config.getTruststoreType(); + this.eventReporter = config.getEventReporter(); + this.peerPersistenceFile = config.getPeerPersistenceFile(); + this.useCompression = config.isUseCompression(); + this.portName = config.getPortName(); + this.portIdentifier = config.getPortIdentifier(); + this.batchCount = config.getPreferredBatchCount(); + this.batchSize = config.getPreferredBatchSize(); + this.batchNanos = config.getPreferredBatchDuration(TimeUnit.NANOSECONDS); + + return this; + } + + /** + * Specifies the URL of the remote NiFi instance. If this URL points to + * the Cluster Manager of a NiFi cluster, data transfer to and from + * nodes will be automatically load balanced across the different nodes. + * + * @param url url of remote instance + * @return the builder + */ + public Builder url(final String url) { + this.url = url; + return this; + } + + /** + * Specifies the communications timeouts to use when interacting with + * the remote instances. The default value is 30 seconds. + * + * @param timeout to use when interacting with remote instances + * @param unit unit of time over which to interpret the given timeout + * @return the builder + */ + public Builder timeout(final long timeout, final TimeUnit unit) { + this.timeoutNanos = unit.toNanos(timeout); + return this; + } + + /** + * Specifies the amount of time that a connection can remain idle in the + * connection pool before it is "expired" and shutdown. The default + * value is 30 seconds. + * + * @param timeout to use when interacting with remote instances + * @param unit unit of time over which to interpret the given timeout + * @return the builder + */ + public Builder idleExpiration(final long timeout, final TimeUnit unit) { + this.idleExpirationNanos = unit.toNanos(timeout); + return this; + } + + /** + * If there is a problem communicating with a node (i.e., any node in + * the remote NiFi cluster or the remote instance of NiFi if it is + * standalone), specifies how long the client should wait before + * attempting to communicate with that node again. While a particular + * node is penalized, all other nodes in the remote cluster (if any) + * will still be available for communication. The default value is 3 + * seconds. + * + * @param period time to wait between communication attempts + * @param unit over which to evaluate the given period + * @return the builder + */ + public Builder nodePenalizationPeriod(final long period, final TimeUnit unit) { + this.penalizationNanos = unit.toNanos(period); + return this; + } + + /** + * Specifies the SSL Context to use when communicating with the remote + * NiFi instance(s). If not specified, communications will not be + * secure. The remote instance of NiFi always determines whether or not + * Site-to-Site communications are secure (i.e., the client will always + * use secure or non-secure communications, depending on what the server + * dictates). <b>Note:</b> The SSLContext provided by this method will be + * ignored if using a Serializable Configuration (see {@link #buildSerializableConfig()}). + * If a Serializable Configuration is required and communications are to be + * secure, the {@link #keystoreFilename(String)}, {@link #keystorePass(String)}, + * {@link #keystoreType}, {@link #truststoreFilename}, {@link #truststorePass(String)}, + * and {@link #truststoreType(KeystoreType)} methods must be used instead. + * + * @param sslContext the context + * @return the builder + */ + public Builder sslContext(final SSLContext sslContext) { + this.sslContext = sslContext; + return this; + } + + /** + * @return the filename to use for the Keystore in order to communicate securely + * with the remote instance of NiFi + */ + public String getKeystoreFilename() { + return keystoreFilename; + } + + /** + * Sets the filename to use for the Keystore in order to communicate securely + * with the remote instance of NiFi + * + * @param keystoreFilename the filename to use for the Keystore in order to communicate securely + * with the remote instance of NiFi + * @return the builder + */ + public Builder keystoreFilename(final String keystoreFilename) { + this.keystoreFilename = keystoreFilename; + return this; + } + + /** + * @return the password to use for the Keystore in order to communicate securely + * with the remote instance of NiFi + */ + public String getKeystorePass() { + return keystorePass; + } + + /** + * Sets the password to use for the Keystore in order to communicate securely + * with the remote instance of NiFi + * + * @param keystorePass the password to use for the Keystore in order to communicate securely + * with the remote instance of NiFi + * @return the builder + */ + public Builder keystorePass(final String keystorePass) { + this.keystorePass = keystorePass; + return this; + } + + /** + * @return the type of Keystore to use in order to communicate securely + * with the remote instance of NiFi + */ + public KeystoreType getKeystoreType() { + return keystoreType; + } + + /** + * Sets the type of the Keystore to use in order to communicate securely + * with the remote instance of NiFi + * + * @param keystoreType the type of the Keystore to use in order to communicate securely + * with the remote instance of NiFi + * @return the builder + */ + public Builder keystoreType(final KeystoreType keystoreType) { + this.keystoreType = keystoreType; + return this; + } + + /** + * @return the filename to use for the Truststore in order to communicate securely + * with the remote instance of NiFi + */ + public String getTruststoreFilename() { + return truststoreFilename; + } + + /** + * Sets the filename to use for the Truststore in order to communicate securely + * with the remote instance of NiFi + * + * @param truststoreFilename the filename to use for the Truststore in order to communicate securely + * with the remote instance of NiFi + * @return the builder + */ + public Builder truststoreFilename(final String truststoreFilename) { + this.truststoreFilename = truststoreFilename; + return this; + } + + /** + * @return the password to use for the Truststore in order to communicate securely + * with the remote instance of NiFi + */ + public String getTruststorePass() { + return truststorePass; + } + + /** + * Sets the password to use for the Truststore in order to communicate securely + * with the remote instance of NiFi + * + * @param truststorePass the filename to use for the Truststore in order to communicate securely + * with the remote instance of NiFi + */ + public Builder truststorePass(final String truststorePass) { + this.truststorePass = truststorePass; + return this; + } + + /** + * @return the type of the Truststore to use in order to communicate securely + * with the remote instance of NiFi + */ + public KeystoreType getTruststoreType() { + return truststoreType; + } + + /** + * Sets the password type of the Truststore to use in order to communicate securely + * with the remote instance of NiFi + * + * @param truststoreType the type of the Truststore to use in order to communicate securely + * with the remote instance of NiFi + * @return the builder + */ + public Builder truststoreType(final KeystoreType truststoreType) { + this.truststoreType = truststoreType; + return this; + } + + /** + * Provides an EventReporter that can be used by the client in order to + * report any events that could be of interest when communicating with + * the remote instance. The EventReporter provided must be threadsafe. + * + * @param eventReporter reporter + * @return the builder + */ + public Builder eventReporter(final EventReporter eventReporter) { + this.eventReporter = eventReporter; + return this; + } + + /** + * Specifies a file that the client can write to in order to persist the + * list of nodes in the remote cluster and recover the list of nodes + * upon restart. This allows the client to function if the remote + * Cluster Manager is unavailable, even after a restart of the client + * software. If not specified, the list of nodes will not be persisted + * and a failure of the Cluster Manager will result in not being able to + * communicate with the remote instance if a new client is created. + * + * @param peerPersistenceFile file + * @return the builder + */ + public Builder peerPersistenceFile(final File peerPersistenceFile) { + this.peerPersistenceFile = peerPersistenceFile; + return this; + } + + /** + * Specifies whether or not data should be compressed before being + * transferred to or from the remote instance. + * + * @param compress true if should compress + * @return the builder + */ + public Builder useCompression(final boolean compress) { + this.useCompression = compress; + return this; + } + + /** + * Specifies the name of the port to communicate with. Either the port + * name or the port identifier must be specified. + * + * @param portName name of port + * @return the builder + */ + public Builder portName(final String portName) { + this.portName = portName; + return this; + } + + /** + * Specifies the unique identifier of the port to communicate with. If + * it is known, this is preferred over providing the port name, as the + * port name may change. + * + * @param portIdentifier identifier of port + * @return the builder + */ + public Builder portIdentifier(final String portIdentifier) { + this.portIdentifier = portIdentifier; + return this; + } + + /** + * When pulling data from a NiFi instance, the sender chooses how large + * a Transaction is. However, the client has the ability to request a + * particular batch size/duration. This method specifies the preferred + * number of {@link DataPacket}s to include in a Transaction. + * + * @param count client preferred batch size + * @return the builder + */ + public Builder requestBatchCount(final int count) { + this.batchCount = count; + return this; + } + + /** + * When pulling data from a NiFi instance, the sender chooses how large + * a Transaction is. However, the client has the ability to request a + * particular batch size/duration. This method specifies the preferred + * number of bytes to include in a Transaction. + * + * @param bytes client preferred batch size + * @return the builder + */ + public Builder requestBatchSize(final long bytes) { + this.batchSize = bytes; + return this; + } + + /** + * When pulling data from a NiFi instance, the sender chooses how large + * a Transaction is. However, the client has the ability to request a + * particular batch size/duration. This method specifies the preferred + * amount of time that a Transaction should span. + * + * @param value client preferred batch duration + * @param unit client preferred batch duration unit + * @return the builder + */ + public Builder requestBatchDuration(final long value, final TimeUnit unit) { + this.batchNanos = unit.toNanos(value); + return this; + } + + /** + * @return a {@link SiteToSiteClientConfig} for the configured values + * but does not create a SiteToSiteClient + */ + public SiteToSiteClientConfig buildConfig() { + return new SiteToSiteClientConfig() { + private static final long serialVersionUID = 1L; + + @Override + public boolean isUseCompression() { + return Builder.this.isUseCompression(); + } + + @Override + public String getUrl() { + return Builder.this.getUrl(); + } + + @Override + public long getTimeout(final TimeUnit timeUnit) { + return Builder.this.getTimeout(timeUnit); + } + + @Override + public long getIdleConnectionExpiration(final TimeUnit timeUnit) { + return Builder.this.getIdleConnectionExpiration(timeUnit); + } + + @Override + public SSLContext getSslContext() { + return Builder.this.getSslContext(); + } + + @Override + public String getPortName() { + return Builder.this.getPortName(); + } + + @Override + public String getPortIdentifier() { + return Builder.this.getPortIdentifier(); + } + + @Override + public long getPenalizationPeriod(final TimeUnit timeUnit) { + return Builder.this.getPenalizationPeriod(timeUnit); + } + + @Override + public File getPeerPersistenceFile() { + return Builder.this.getPeerPersistenceFile(); + } + + @Override + public EventReporter getEventReporter() { + return Builder.this.getEventReporter(); + } + + @Override + public long getPreferredBatchDuration(final TimeUnit timeUnit) { + return timeUnit.convert(batchNanos, TimeUnit.NANOSECONDS); + } + + @Override + public long getPreferredBatchSize() { + return batchSize; + } + + @Override + public int getPreferredBatchCount() { + return batchCount; + } + + @Override + public String getKeystoreFilename() { + return keystoreFilename; + } + + @Override + public String getKeystorePassword() { + return keystorePass; + } + + @Override + public KeystoreType getKeystoreType() { + return keystoreType; + } + + @Override + public String getTruststoreFilename() { + return truststoreFilename; + } + + @Override + public String getTruststorePassword() { + return truststorePass; + } + + @Override + public KeystoreType getTruststoreType() { + return truststoreType; + } + }; + } + + /** + * @return a new SiteToSiteClient that can be used to send and receive + * data with remote instances of NiFi + * + * @throws IllegalStateException if either the url is not set or neither + * the port name nor port identifier is set. + */ + public SiteToSiteClient build() { + if (url == null) { + throw new IllegalStateException("Must specify URL to build Site-to-Site client"); + } + + if (portName == null && portIdentifier == null) { + throw new IllegalStateException("Must specify either Port Name or Port Identifier to build Site-to-Site client"); + } + + return new SocketClient(buildConfig()); + } + + /** + * @return the configured URL for the remote NiFi instance + */ + public String getUrl() { + return url; + } + + /** + * @param timeUnit unit over which to interpret the timeout + * @return the communications timeout + */ + public long getTimeout(final TimeUnit timeUnit) { + return timeUnit.convert(timeoutNanos, TimeUnit.NANOSECONDS); + } + + /** + * @param timeUnit unit over which to interpret the time + * @return the amount of of time that a connection can remain idle in + * the connection pool before being shutdown + */ + public long getIdleConnectionExpiration(final TimeUnit timeUnit) { + return timeUnit.convert(idleExpirationNanos, TimeUnit.NANOSECONDS); + } + + /** + * @param timeUnit unit of reported time + * @return the amount of time that a particular node will be ignored + * after a communications error with that node occurs + */ + public long getPenalizationPeriod(TimeUnit timeUnit) { + return timeUnit.convert(penalizationNanos, TimeUnit.NANOSECONDS); + } + + /** + * @return the SSL Context that is configured for this builder + */ + public SSLContext getSslContext() { + if (sslContext != null) { + return sslContext; + } + + final KeyManagerFactory keyManagerFactory; + if (keystoreFilename != null && keystorePass != null && keystoreType != null) { + try { + // prepare the keystore + final KeyStore keyStore = KeyStore.getInstance(getKeystoreType().name()); + try (final InputStream keyStoreStream = new FileInputStream(new File(getKeystoreFilename()))) { + keyStore.load(keyStoreStream, getKeystorePass().toCharArray()); + } + keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + keyManagerFactory.init(keyStore, getKeystorePass().toCharArray()); + } catch (final Exception e) { + throw new RuntimeException("Failed to load Keystore", e); + } + } else { + keyManagerFactory = null; + } + + final TrustManagerFactory trustManagerFactory; + if (truststoreFilename != null && truststorePass != null && truststoreType != null) { + try { + // prepare the truststore + final KeyStore trustStore = KeyStore.getInstance(getTruststoreType().name()); + try (final InputStream trustStoreStream = new FileInputStream(new File(getTruststoreFilename()))) { + trustStore.load(trustStoreStream, getTruststorePass().toCharArray()); + } + trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + trustManagerFactory.init(trustStore); + } catch (final Exception e) { + throw new RuntimeException("Failed to load Truststore", e); + } + } else { + trustManagerFactory = null; + } + + if (keyManagerFactory != null || trustManagerFactory != null) { + try { + // initialize the ssl context + final SSLContext sslContext = SSLContext.getInstance("TLS"); + sslContext.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), new SecureRandom()); + sslContext.getDefaultSSLParameters().setNeedClientAuth(true); + + return sslContext; + } catch (final Exception e) { + throw new RuntimeException("Created keystore and truststore but failed to initialize SSLContext"); + } + } else { + return null; + } + } + + /** + * @return the EventReporter that is to be used by clients to report + * events + */ + public EventReporter getEventReporter() { + return eventReporter; + } + + /** + * @return the file that is to be used for persisting the nodes of a + * remote cluster, if any + */ + public File getPeerPersistenceFile() { + return peerPersistenceFile; + } + + /** + * @return a boolean indicating whether or not compression will be used + * to transfer data to and from the remote instance + */ + public boolean isUseCompression() { + return useCompression; + } + + /** + * @return the name of the port that the client is to communicate with + */ + public String getPortName() { + return portName; + } + + /** + * @return the identifier of the port that the client is to communicate + * with + */ + public String getPortIdentifier() { + return portIdentifier; + } + } + + + public abstract class SerializableSiteToSiteClientConfig implements SiteToSiteClientConfig, Serializable { + private static final long serialVersionUID = 1L; + + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java new file mode 100644 index 0000000..50a0d3c --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.client; + +import java.io.File; +import java.io.Serializable; +import java.util.concurrent.TimeUnit; + +import javax.net.ssl.SSLContext; + +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.remote.protocol.DataPacket; + +public interface SiteToSiteClientConfig extends Serializable { + + /** + * @return the configured URL for the remote NiFi instance + */ + String getUrl(); + + /** + * @param timeUnit unit over which to report the timeout + * @return the communications timeout in given unit + */ + long getTimeout(final TimeUnit timeUnit); + + /** + * @param timeUnit the unit for which to report the time + * @return the amount of time that a connection can remain idle before it is + * "expired" and shut down + */ + long getIdleConnectionExpiration(TimeUnit timeUnit); + + /** + * @param timeUnit unit over which to report the time + * @return the amount of time that a particular node will be ignored after a + * communications error with that node occurs + */ + long getPenalizationPeriod(TimeUnit timeUnit); + + /** + * @return the SSL Context that is configured for this builder + */ + SSLContext getSslContext(); + + /** + * @return the filename to use for the keystore, or <code>null</code> if none is configured + */ + String getKeystoreFilename(); + + /** + * @return the password to use for the keystore, or <code>null</code> if none is configured + */ + String getKeystorePassword(); + + /** + * @return the Type of the keystore, or <code>null</code> if none is configured + */ + KeystoreType getKeystoreType(); + + /** + * @return the filename to use for the truststore, or <code>null</code> if none is configured + */ + String getTruststoreFilename(); + + /** + * @return the password to use for the truststore, or <code>null</code> if none is configured + */ + String getTruststorePassword(); + + /** + * @return the type of the truststore, or <code>null</code> if none is configured + */ + KeystoreType getTruststoreType(); + + /** + * @return the file that is to be used for persisting the nodes of a remote + * cluster, if any + */ + File getPeerPersistenceFile(); + + /** + * @return a boolean indicating whether or not compression will be used to + * transfer data to and from the remote instance + */ + boolean isUseCompression(); + + /** + * @return the name of the port that the client is to communicate with + */ + String getPortName(); + + /** + * @return the identifier of the port that the client is to communicate with + */ + String getPortIdentifier(); + + /** + * When pulling data from a NiFi instance, the sender chooses how large a + * Transaction is. However, the client has the ability to request a + * particular batch size/duration. + * + * @param timeUnit unit of time over which to report the duration + * @return the maximum amount of time that we will request a NiFi instance + * to send data to us in a Transaction + */ + long getPreferredBatchDuration(TimeUnit timeUnit); + + /** + * When pulling data from a NiFi instance, the sender chooses how large a + * Transaction is. However, the client has the ability to request a + * particular batch size/duration. + * + * @return returns the maximum number of bytes that we will request a NiFi + * instance to send data to us in a Transaction + */ + long getPreferredBatchSize(); + + /** + * When pulling data from a NiFi instance, the sender chooses how large a + * Transaction is. However, the client has the ability to request a + * particular batch size/duration. + * + * @return the maximum number of {@link DataPacket}s that we will request a + * NiFi instance to send data to us in a Transaction + */ + int getPreferredBatchCount(); + + /** + * @return the EventReporter that is to be used by clients to report events + */ + EventReporter getEventReporter(); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnection.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnection.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnection.java new file mode 100644 index 0000000..1a16b02 --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnection.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.client.socket; + +import org.apache.nifi.remote.Peer; +import org.apache.nifi.remote.codec.FlowFileCodec; +import org.apache.nifi.remote.protocol.socket.SocketClientProtocol; + +public class EndpointConnection { + + private final Peer peer; + private final SocketClientProtocol socketClientProtocol; + private final FlowFileCodec codec; + private volatile long lastUsed; + + public EndpointConnection(final Peer peer, final SocketClientProtocol socketClientProtocol, final FlowFileCodec codec) { + this.peer = peer; + this.socketClientProtocol = socketClientProtocol; + this.codec = codec; + } + + public FlowFileCodec getCodec() { + return codec; + } + + public SocketClientProtocol getSocketClientProtocol() { + return socketClientProtocol; + } + + public Peer getPeer() { + return peer; + } + + public void setLastTimeUsed() { + lastUsed = System.currentTimeMillis(); + } + + public long getLastTimeUsed() { + return lastUsed; + } +}