Repository: incubator-nifi Updated Branches: refs/heads/nifi-site-to-site-client 8f0402fbb -> e16fc7972
NIFI-282: Refactored to remove Jersey client from dependencies; made site-to-site config serializable; allowed SiteToSiteClient.Builder to build a SiteToSiteClientConfig without building the client itself. Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/e16fc797 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/e16fc797 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/e16fc797 Branch: refs/heads/nifi-site-to-site-client Commit: e16fc7972c24a04d8212e26f66fdcb6e940ffe86 Parents: 8f0402f Author: Mark Payne <[email protected]> Authored: Mon Feb 16 14:18:24 2015 -0500 Committer: Mark Payne <[email protected]> Committed: Mon Feb 16 14:18:24 2015 -0500 ---------------------------------------------------------------------- .../apache/nifi/remote/RemoteDestination.java | 8 +- .../nifi-site-to-site-client/pom.xml | 80 +++---- .../nifi/remote/client/SiteToSiteClient.java | 156 ++++++++------ .../remote/client/SiteToSiteClientConfig.java | 3 +- .../client/socket/EndpointConnectionPool.java | 83 +++++-- .../nifi/remote/client/socket/SocketClient.java | 10 +- .../nifi/remote/util/NiFiRestApiUtil.java | 98 +++++++++ .../nifi/remote/util/RemoteNiFiUtils.java | 216 ------------------- .../client/socket/TestSiteToSiteClient.java | 17 +- .../org/apache/nifi/remote/RemoteNiFiUtils.java | 216 +++++++++++++++++++ .../nifi/remote/StandardRemoteProcessGroup.java | 1 - 11 files changed, 533 insertions(+), 355 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e16fc797/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java b/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java index f718581..508ab37 100644 --- a/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java @@ -29,7 +29,13 @@ public interface RemoteDestination { * @return */ String getIdentifier(); - + + /** + * Returns the human-readable name of the remote destination + * @return + */ + String getName(); + /** * Returns the amount of time that system should pause sending to a particular node if unable to * send data to or receive data from this endpoint http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e16fc797/nifi/nifi-commons/nifi-site-to-site-client/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/pom.xml b/nifi/nifi-commons/nifi-site-to-site-client/pom.xml index 3fc00a2..0d21a3d 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/pom.xml +++ b/nifi/nifi-commons/nifi-site-to-site-client/pom.xml @@ -1,43 +1,45 @@ <?xml version="1.0"?> -<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-commons</artifactId> - <version>0.0.2-incubating-SNAPSHOT</version> - </parent> - - <artifactId>nifi-site-to-site-client</artifactId> - - <dependencies> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-utils</artifactId> - </dependency> - <dependency> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-client</artifactId> - </dependency> - <dependency> +<project + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" + xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + <modelVersion>4.0.0</modelVersion> + + <parent> <groupId>org.apache.nifi</groupId> - <artifactId>nifi-client-dto</artifactId> + <artifactId>nifi-commons</artifactId> <version>0.0.2-incubating-SNAPSHOT</version> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-web-utils</artifactId> - </dependency> - - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - </dependencies> + </parent> + + <artifactId>nifi-site-to-site-client</artifactId> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + <!-- <dependency> <groupId>com.sun.jersey</groupId> <artifactId>jersey-client</artifactId> + </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-web-utils</artifactId> + </dependency> --> + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + <version>1.9.13</version> + </dependency> + + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-client-dto</artifactId> + <version>0.0.2-incubating-SNAPSHOT</version> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> </project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e16fc797/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java index 0591b5a..5f84382 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java @@ -19,6 +19,7 @@ package org.apache.nifi.remote.client; import java.io.Closeable; import java.io.File; import java.io.IOException; +import java.io.Serializable; import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLContext; @@ -122,8 +123,10 @@ public interface SiteToSiteClient extends Closeable { * and a new client created. * </p> */ - public static class Builder { - private String url; + public static class Builder implements Serializable { + private static final long serialVersionUID = -4954962284343090219L; + + private String url; private long timeoutNanos = TimeUnit.SECONDS.toNanos(30); private long penalizationNanos = TimeUnit.SECONDS.toNanos(3); private long idleExpirationNanos = TimeUnit.SECONDS.toNanos(30L); @@ -309,10 +312,89 @@ public interface SiteToSiteClient extends Closeable { return this; } + /** + * Returns a {@link SiteToSiteClientConfig} for the configured values but does not create a SiteToSiteClient + * @return + */ + public SiteToSiteClientConfig buildConfig() { + final SiteToSiteClientConfig config = new SiteToSiteClientConfig() { + private static final long serialVersionUID = 1323119754841633818L; + + @Override + public boolean isUseCompression() { + return Builder.this.isUseCompression(); + } + + @Override + public String getUrl() { + return Builder.this.getUrl(); + } + + @Override + public long getTimeout(final TimeUnit timeUnit) { + return Builder.this.getTimeout(timeUnit); + } + + @Override + public long getIdleConnectionExpiration(final TimeUnit timeUnit) { + return Builder.this.getIdleConnectionExpiration(timeUnit); + } + + @Override + public SSLContext getSslContext() { + return Builder.this.getSslContext(); + } + + @Override + public String getPortName() { + return Builder.this.getPortName(); + } + + @Override + public String getPortIdentifier() { + return Builder.this.getPortIdentifier(); + } + + @Override + public long getPenalizationPeriod(final TimeUnit timeUnit) { + return Builder.this.getPenalizationPeriod(timeUnit); + } + + @Override + public File getPeerPersistenceFile() { + return Builder.this.getPeerPersistenceFile(); + } + + @Override + public EventReporter getEventReporter() { + return Builder.this.getEventReporter(); + } + + @Override + public long getPreferredBatchDuration(final TimeUnit timeUnit) { + return timeUnit.convert(Builder.this.batchNanos, TimeUnit.NANOSECONDS); + } + + @Override + public long getPreferredBatchSize() { + return Builder.this.batchSize; + } + + @Override + public int getPreferredBatchCount() { + return Builder.this.batchCount; + } + }; + + return config; + } /** * Builds a new SiteToSiteClient that can be used to send and receive data with remote instances of NiFi * @return + * + * @throws IllegalStateException if either the url is not set or neither the port name nor port identifier + * is set. */ public SiteToSiteClient build() { if ( url == null ) { @@ -323,75 +405,7 @@ public interface SiteToSiteClient extends Closeable { throw new IllegalStateException("Must specify either Port Name or Port Identifier to builder Site-to-Site client"); } - final SiteToSiteClientConfig config = new SiteToSiteClientConfig() { - - @Override - public boolean isUseCompression() { - return Builder.this.isUseCompression(); - } - - @Override - public String getUrl() { - return Builder.this.getUrl(); - } - - @Override - public long getTimeout(final TimeUnit timeUnit) { - return Builder.this.getTimeout(timeUnit); - } - - @Override - public long getIdleConnectionExpiration(final TimeUnit timeUnit) { - return Builder.this.getIdleConnectionExpiration(timeUnit); - } - - @Override - public SSLContext getSslContext() { - return Builder.this.getSslContext(); - } - - @Override - public String getPortName() { - return Builder.this.getPortName(); - } - - @Override - public String getPortIdentifier() { - return Builder.this.getPortIdentifier(); - } - - @Override - public long getPenalizationPeriod(final TimeUnit timeUnit) { - return Builder.this.getPenalizationPeriod(timeUnit); - } - - @Override - public File getPeerPersistenceFile() { - return Builder.this.getPeerPersistenceFile(); - } - - @Override - public EventReporter getEventReporter() { - return Builder.this.getEventReporter(); - } - - @Override - public long getPreferredBatchDuration(final TimeUnit timeUnit) { - return timeUnit.convert(Builder.this.batchNanos, TimeUnit.NANOSECONDS); - } - - @Override - public long getPreferredBatchSize() { - return Builder.this.batchSize; - } - - @Override - public int getPreferredBatchCount() { - return Builder.this.batchCount; - } - }; - - return new SocketClient(config); + return new SocketClient(buildConfig()); } /** http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e16fc797/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java index d03ab3c..5e7fbe8 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java @@ -17,6 +17,7 @@ package org.apache.nifi.remote.client; import java.io.File; +import java.io.Serializable; import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLContext; @@ -24,7 +25,7 @@ import javax.net.ssl.SSLContext; import org.apache.nifi.events.EventReporter; import org.apache.nifi.remote.protocol.DataPacket; -public interface SiteToSiteClientConfig { +public interface SiteToSiteClientConfig extends Serializable { /** * Returns the configured URL for the remote NiFi instance http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e16fc797/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java index c0e4761..f9a8a38 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java @@ -79,8 +79,8 @@ import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession; import org.apache.nifi.remote.protocol.CommunicationsSession; import org.apache.nifi.remote.protocol.socket.SocketClientProtocol; +import org.apache.nifi.remote.util.NiFiRestApiUtil; import org.apache.nifi.remote.util.PeerStatusCache; -import org.apache.nifi.remote.util.RemoteNiFiUtils; import org.apache.nifi.reporting.Severity; import org.apache.nifi.stream.io.BufferedOutputStream; import org.apache.nifi.web.api.dto.ControllerDTO; @@ -201,6 +201,17 @@ public class EndpointConnectionPool { }, 5, 5, TimeUnit.SECONDS); } + private String getPortIdentifier(final TransferDirection transferDirection) throws IOException { + if ( remoteDestination.getIdentifier() != null ) { + return remoteDestination.getIdentifier(); + } + + if ( transferDirection == TransferDirection.RECEIVE ) { + return getOutputPortIdentifier(remoteDestination.getName()); + } else { + return getInputPortIdentifier(remoteDestination.getName()); + } + } public EndpointConnection getEndpointConnection(final TransferDirection direction) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException { return getEndpointConnection(direction, null); @@ -222,14 +233,15 @@ public class EndpointConnectionPool { do { connection = connectionQueue.poll(); logger.debug("{} Connection State for {} = {}", this, clusterUrl, connection); + final String portId = getPortIdentifier(direction); if ( connection == null && !addBack.isEmpty() ) { // all available connections have been penalized. - logger.debug("{} all Connections for {} are penalized; returning no Connection", this, remoteDestination.getIdentifier()); + logger.debug("{} all Connections for {} are penalized; returning no Connection", this, portId); return null; } - if ( connection != null && connection.getPeer().isPenalized(remoteDestination.getIdentifier()) ) { + if ( connection != null && connection.getPeer().isPenalized(portId) ) { // we have a connection, but it's penalized. We want to add it back to the queue // when we've found one to use. addBack.add(connection); @@ -238,9 +250,9 @@ public class EndpointConnectionPool { // if we can't get an existing Connection, create one if ( connection == null ) { - logger.debug("{} No Connection available for Port {}; creating new Connection", this, remoteDestination.getIdentifier()); + logger.debug("{} No Connection available for Port {}; creating new Connection", this, portId); protocol = new SocketClientProtocol(); - protocol.setDestination(remoteDestination); + protocol.setDestination(new IdEnrichedRemoteDestination(remoteDestination, portId)); logger.debug("{} getting next peer status", this); final PeerStatus peerStatus = getNextPeerStatus(direction); @@ -249,11 +261,12 @@ public class EndpointConnectionPool { return null; } + final long penalizationMillis = remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS); try { logger.debug("{} Establishing site-to-site connection with {}", this, peerStatus); commsSession = establishSiteToSiteConnection(peerStatus); } catch (final IOException ioe) { - penalize(peerStatus, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS)); + penalize(peerStatus, penalizationMillis); throw ioe; } @@ -289,17 +302,17 @@ public class EndpointConnectionPool { // handle error cases if ( protocol.isDestinationFull() ) { logger.warn("{} {} indicates that port's destination is full; penalizing peer", this, peer); - penalize(peer, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS)); + penalize(peer, penalizationMillis); connectionQueue.offer(connection); continue; } else if ( protocol.isPortInvalid() ) { - penalize(peer, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS)); + penalize(peer, penalizationMillis); cleanup(protocol, peer); - throw new PortNotRunningException(peer.toString() + " indicates that port " + remoteDestination.getIdentifier() + " is not running"); + throw new PortNotRunningException(peer.toString() + " indicates that port " + portId + " is not running"); } else if ( protocol.isPortUnknown() ) { - penalize(peer, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS)); + penalize(peer, penalizationMillis); cleanup(protocol, peer); - throw new UnknownPortException(peer.toString() + " indicates that port " + remoteDestination.getIdentifier() + " is not known"); + throw new UnknownPortException(peer.toString() + " indicates that port " + portId + " is not known"); } // negotiate the FlowFileCodec to use @@ -309,7 +322,7 @@ public class EndpointConnectionPool { } catch (final PortNotRunningException | UnknownPortException e) { throw e; } catch (final Exception e) { - penalize(peer, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS)); + penalize(peer, penalizationMillis); cleanup(protocol, peer); final String message = String.format("%s failed to communicate with %s due to %s", this, peer == null ? clusterUrl : peer, e.toString()); @@ -539,7 +552,16 @@ public class EndpointConnectionPool { clientProtocol.setTimeout(commsTimeout); if (clientProtocol.getVersionNegotiator().getVersion() < 5) { - clientProtocol.handshake(peer, remoteDestination.getIdentifier()); + String portId = getPortIdentifier(TransferDirection.RECEIVE); + if ( portId == null ) { + portId = getPortIdentifier(TransferDirection.SEND); + } + + if ( portId == null ) { + peer.close(); + throw new IOException("Failed to determine the identifier of port " + remoteDestination.getName()); + } + clientProtocol.handshake(peer, portId); } else { clientProtocol.handshake(peer, null); } @@ -818,8 +840,8 @@ public class EndpointConnectionPool { private ControllerDTO refreshRemoteInfo() throws IOException { final boolean webInterfaceSecure = clusterUrl.toString().startsWith("https"); - final RemoteNiFiUtils utils = new RemoteNiFiUtils(webInterfaceSecure ? sslContext : null); - final ControllerDTO controller = utils.getController(URI.create(apiUri + "/controller"), commsTimeout); + final NiFiRestApiUtil utils = new NiFiRestApiUtil(webInterfaceSecure ? sslContext : null); + final ControllerDTO controller = utils.getController(apiUri + "/controller", commsTimeout); remoteInfoWriteLock.lock(); try { @@ -898,4 +920,35 @@ public class EndpointConnectionPool { return isSecure; } + + + private class IdEnrichedRemoteDestination implements RemoteDestination { + private final RemoteDestination original; + private final String identifier; + + public IdEnrichedRemoteDestination(final RemoteDestination original, final String identifier) { + this.original = original; + this.identifier = identifier; + } + + @Override + public String getIdentifier() { + return identifier; + } + + @Override + public String getName() { + return original.getName(); + } + + @Override + public long getYieldPeriod(final TimeUnit timeUnit) { + return original.getYieldPeriod(timeUnit); + } + + @Override + public boolean isUseCompression() { + return original.isUseCompression(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e16fc797/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java index 016e67f..c11c2ab 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java @@ -43,7 +43,8 @@ public class SocketClient implements SiteToSiteClient { private volatile boolean closed = false; public SocketClient(final SiteToSiteClientConfig config) { - pool = new EndpointConnectionPool(config.getUrl(), createRemoteDestination(config.getPortIdentifier()), + pool = new EndpointConnectionPool(config.getUrl(), + createRemoteDestination(config.getPortIdentifier(), config.getPortName()), (int) config.getTimeout(TimeUnit.MILLISECONDS), (int) config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS), config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile()); @@ -88,12 +89,17 @@ public class SocketClient implements SiteToSiteClient { } - private RemoteDestination createRemoteDestination(final String portId) { + private RemoteDestination createRemoteDestination(final String portId, final String portName) { return new RemoteDestination() { @Override public String getIdentifier() { return portId; } + + @Override + public String getName() { + return portName; + } @Override public long getYieldPeriod(final TimeUnit timeUnit) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e16fc797/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/NiFiRestApiUtil.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/NiFiRestApiUtil.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/NiFiRestApiUtil.java new file mode 100644 index 0000000..10352ec --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/NiFiRestApiUtil.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.util; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; + +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSession; + +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.web.api.dto.ControllerDTO; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.map.ObjectMapper; + +public class NiFiRestApiUtil { + public static final int RESPONSE_CODE_OK = 200; + + private final SSLContext sslContext; + + public NiFiRestApiUtil(final SSLContext sslContext) { + this.sslContext = sslContext; + } + + private HttpURLConnection getConnection(final String connUrl, final int timeoutMillis) throws IOException { + final URL url = new URL(connUrl); + final HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setConnectTimeout(timeoutMillis); + connection.setReadTimeout(timeoutMillis); + + // special handling for https + if (sslContext != null && connection instanceof HttpsURLConnection) { + HttpsURLConnection secureConnection = (HttpsURLConnection) connection; + secureConnection.setSSLSocketFactory(sslContext.getSocketFactory()); + + // check the trusted hostname property and override the HostnameVerifier + secureConnection.setHostnameVerifier(new OverrideHostnameVerifier(url.getHost(), + secureConnection.getHostnameVerifier())); + } + + return connection; + } + + public ControllerDTO getController(final String url, final int timeoutMillis) throws IOException { + final HttpURLConnection connection = getConnection(url, timeoutMillis); + connection.setRequestMethod("GET"); + final int responseCode = connection.getResponseCode(); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + StreamUtils.copy(connection.getInputStream(), baos); + final String responseMessage = baos.toString(); + + if ( responseCode == RESPONSE_CODE_OK ) { + final ObjectMapper mapper = new ObjectMapper(); + final JsonNode jsonNode = mapper.readTree(responseMessage); + final JsonNode controllerNode = jsonNode.get("controller"); + return mapper.readValue(controllerNode, ControllerDTO.class); + } else { + throw new IOException("Got HTTP response Code " + responseCode + ": " + connection.getResponseMessage() + " with explanation: " + responseMessage); + } + } + + private static class OverrideHostnameVerifier implements HostnameVerifier { + private final String trustedHostname; + private final HostnameVerifier delegate; + + private OverrideHostnameVerifier(String trustedHostname, HostnameVerifier delegate) { + this.trustedHostname = trustedHostname; + this.delegate = delegate; + } + + @Override + public boolean verify(String hostname, SSLSession session) { + if (trustedHostname.equalsIgnoreCase(hostname)) { + return true; + } + return delegate.verify(hostname, session); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e16fc797/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java deleted file mode 100644 index b2dbdcd..0000000 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java +++ /dev/null @@ -1,216 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.util; - -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Map; - -import javax.net.ssl.SSLContext; -import javax.ws.rs.core.MediaType; - -import org.apache.nifi.web.api.dto.ControllerDTO; -import org.apache.nifi.web.api.entity.ControllerEntity; -import org.apache.nifi.web.util.WebUtils; - -import com.sun.jersey.api.client.Client; -import com.sun.jersey.api.client.ClientHandlerException; -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.ClientResponse.Status; -import com.sun.jersey.api.client.config.ClientConfig; -import com.sun.jersey.api.client.UniformInterfaceException; -import com.sun.jersey.api.client.WebResource; -import com.sun.jersey.core.util.MultivaluedMapImpl; - -/** - * - */ -public class RemoteNiFiUtils { - - public static final String CONTROLLER_URI_PATH = "/controller"; - - private static final int CONNECT_TIMEOUT = 10000; - private static final int READ_TIMEOUT = 10000; - - private final Client client; - - public RemoteNiFiUtils(final SSLContext sslContext) { - this.client = getClient(sslContext); - } - - - /** - * Gets the content at the specified URI. - * - * @param uri - * @param timeoutMillis - * @return - * @throws ClientHandlerException - * @throws UniformInterfaceException - */ - public ClientResponse get(final URI uri, final int timeoutMillis) throws ClientHandlerException, UniformInterfaceException { - return get(uri, timeoutMillis, null); - } - - /** - * Gets the content at the specified URI using the given query parameters. - * - * @param uri - * @param timeoutMillis - * @param queryParams - * @return - * @throws ClientHandlerException - * @throws UniformInterfaceException - */ - public ClientResponse get(final URI uri, final int timeoutMillis, final Map<String, String> queryParams) throws ClientHandlerException, UniformInterfaceException { - // perform the request - WebResource webResource = client.resource(uri); - if ( queryParams != null ) { - for ( final Map.Entry<String, String> queryEntry : queryParams.entrySet() ) { - webResource = webResource.queryParam(queryEntry.getKey(), queryEntry.getValue()); - } - } - - webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis); - webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis); - - return webResource.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); - } - - /** - * Performs a HEAD request to the specified URI. - * - * @param uri - * @param timeoutMillis - * @return - * @throws ClientHandlerException - * @throws UniformInterfaceException - */ - public ClientResponse head(final URI uri, final int timeoutMillis) throws ClientHandlerException, UniformInterfaceException { - // perform the request - WebResource webResource = client.resource(uri); - webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis); - webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis); - return webResource.head(); - } - - /** - * Gets a client based on the specified URI. - * - * @param uri - * @return - */ - private Client getClient(final SSLContext sslContext) { - final Client client; - if (sslContext == null) { - client = WebUtils.createClient(null); - } else { - client = WebUtils.createClient(null, sslContext); - } - - client.setReadTimeout(READ_TIMEOUT); - client.setConnectTimeout(CONNECT_TIMEOUT); - - return client; - } - - - /** - * Returns the port on which the remote instance is listening for Flow File transfers, or <code>null</code> if the remote instance - * is not configured to use Site-to-Site transfers. - * - * @param uri the base URI of the remote instance. This should include the path only to the nifi-api level, as well as the protocol, host, and port. - * @param timeoutMillis - * @return - * @throws IOException - */ - public Integer getRemoteListeningPort(final String uri, final int timeoutMillis) throws IOException { - try { - final URI uriObject = new URI(uri + CONTROLLER_URI_PATH); - return getRemoteListeningPort(uriObject, timeoutMillis); - } catch (URISyntaxException e) { - throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri); - } - } - - public String getRemoteRootGroupId(final String uri, final int timeoutMillis) throws IOException { - try { - final URI uriObject = new URI(uri + CONTROLLER_URI_PATH); - return getRemoteRootGroupId(uriObject, timeoutMillis); - } catch (URISyntaxException e) { - throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri); - } - } - - public String getRemoteInstanceId(final String uri, final int timeoutMillis) throws IOException { - try { - final URI uriObject = new URI(uri + CONTROLLER_URI_PATH); - return getController(uriObject, timeoutMillis).getInstanceId(); - } catch (URISyntaxException e) { - throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri); - } - } - - /** - * Returns the port on which the remote instance is listening for Flow File transfers, or <code>null</code> if the remote instance - * is not configured to use Site-to-Site transfers. - * - * @param uri the full URI to fetch, including the path. - * @return - * @throws IOException - */ - private Integer getRemoteListeningPort(final URI uri, final int timeoutMillis) throws IOException { - return getController(uri, timeoutMillis).getRemoteSiteListeningPort(); - } - - private String getRemoteRootGroupId(final URI uri, final int timeoutMillis) throws IOException { - return getController(uri, timeoutMillis).getId(); - } - - public ControllerDTO getController(final URI uri, final int timeoutMillis) throws IOException { - final ClientResponse response = get(uri, timeoutMillis); - - if (Status.OK.getStatusCode() == response.getStatusInfo().getStatusCode()) { - final ControllerEntity entity = response.getEntity(ControllerEntity.class); - return entity.getController(); - } else { - final String responseMessage = response.getEntity(String.class); - throw new IOException("Got HTTP response Code " + response.getStatusInfo().getStatusCode() + ": " + response.getStatusInfo().getReasonPhrase() + " with explanation: " + responseMessage); - } - } - - /** - * Issues a registration request on behalf of the current user. - * - * @param baseApiUri - * @return - */ - public ClientResponse issueRegistrationRequest(String baseApiUri) { - final URI uri = URI.create(String.format("%s/%s", baseApiUri, "/controller/users")); - - // set up the query params - MultivaluedMapImpl entity = new MultivaluedMapImpl(); - entity.add("justification", "A Remote instance of NiFi has attempted to create a reference to this NiFi. This action must be approved first."); - - // create the web resource - WebResource webResource = client.resource(uri); - - // get the client utils and make the request - return webResource.accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_FORM_URLENCODED).entity(entity).post(ClientResponse.class); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e16fc797/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java index 8781421..bb16a34 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java @@ -36,7 +36,7 @@ import org.junit.Test; public class TestSiteToSiteClient { @Test - @Ignore("For local testing only; not really a unit test but a manual test") + //@Ignore("For local testing only; not really a unit test but a manual test") public void testReceive() throws IOException { System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG"); @@ -58,7 +58,6 @@ public class TestSiteToSiteClient { final byte[] buff = new byte[(int) size]; StreamUtils.fillBuffer(in, buff); - System.out.println(buff.length); Assert.assertNull(transaction.receive()); @@ -71,7 +70,7 @@ public class TestSiteToSiteClient { @Test - @Ignore("For local testing only; not really a unit test but a manual test") + //@Ignore("For local testing only; not really a unit test but a manual test") public void testSend() throws IOException { System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG"); @@ -84,12 +83,12 @@ public class TestSiteToSiteClient { final Transaction transaction = client.createTransaction(TransferDirection.SEND); Assert.assertNotNull(transaction); - final Map<String, String> attrs = new HashMap<>(); - attrs.put("site-to-site", "yes, please!"); - final byte[] bytes = "Hello".getBytes(); - final ByteArrayInputStream bais = new ByteArrayInputStream(bytes); - final DataPacket packet = new StandardDataPacket(attrs, bais, bytes.length); - transaction.send(packet); + final Map<String, String> attrs = new HashMap<>(); + attrs.put("site-to-site", "yes, please!"); + final byte[] bytes = "Hello".getBytes(); + final ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + final DataPacket packet = new StandardDataPacket(attrs, bais, bytes.length); + transaction.send(packet); transaction.confirm(); transaction.complete(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e16fc797/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/RemoteNiFiUtils.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/RemoteNiFiUtils.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/RemoteNiFiUtils.java new file mode 100644 index 0000000..23dfdda --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/RemoteNiFiUtils.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Map; + +import javax.net.ssl.SSLContext; +import javax.ws.rs.core.MediaType; + +import org.apache.nifi.web.api.dto.ControllerDTO; +import org.apache.nifi.web.api.entity.ControllerEntity; +import org.apache.nifi.web.util.WebUtils; + +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientHandlerException; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.ClientResponse.Status; +import com.sun.jersey.api.client.config.ClientConfig; +import com.sun.jersey.api.client.UniformInterfaceException; +import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.core.util.MultivaluedMapImpl; + +/** + * + */ +public class RemoteNiFiUtils { + + public static final String CONTROLLER_URI_PATH = "/controller"; + + private static final int CONNECT_TIMEOUT = 10000; + private static final int READ_TIMEOUT = 10000; + + private final Client client; + + public RemoteNiFiUtils(final SSLContext sslContext) { + this.client = getClient(sslContext); + } + + + /** + * Gets the content at the specified URI. + * + * @param uri + * @param timeoutMillis + * @return + * @throws ClientHandlerException + * @throws UniformInterfaceException + */ + public ClientResponse get(final URI uri, final int timeoutMillis) throws ClientHandlerException, UniformInterfaceException { + return get(uri, timeoutMillis, null); + } + + /** + * Gets the content at the specified URI using the given query parameters. + * + * @param uri + * @param timeoutMillis + * @param queryParams + * @return + * @throws ClientHandlerException + * @throws UniformInterfaceException + */ + public ClientResponse get(final URI uri, final int timeoutMillis, final Map<String, String> queryParams) throws ClientHandlerException, UniformInterfaceException { + // perform the request + WebResource webResource = client.resource(uri); + if ( queryParams != null ) { + for ( final Map.Entry<String, String> queryEntry : queryParams.entrySet() ) { + webResource = webResource.queryParam(queryEntry.getKey(), queryEntry.getValue()); + } + } + + webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis); + webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis); + + return webResource.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); + } + + /** + * Performs a HEAD request to the specified URI. + * + * @param uri + * @param timeoutMillis + * @return + * @throws ClientHandlerException + * @throws UniformInterfaceException + */ + public ClientResponse head(final URI uri, final int timeoutMillis) throws ClientHandlerException, UniformInterfaceException { + // perform the request + WebResource webResource = client.resource(uri); + webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis); + webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis); + return webResource.head(); + } + + /** + * Gets a client based on the specified URI. + * + * @param uri + * @return + */ + private Client getClient(final SSLContext sslContext) { + final Client client; + if (sslContext == null) { + client = WebUtils.createClient(null); + } else { + client = WebUtils.createClient(null, sslContext); + } + + client.setReadTimeout(READ_TIMEOUT); + client.setConnectTimeout(CONNECT_TIMEOUT); + + return client; + } + + + /** + * Returns the port on which the remote instance is listening for Flow File transfers, or <code>null</code> if the remote instance + * is not configured to use Site-to-Site transfers. + * + * @param uri the base URI of the remote instance. This should include the path only to the nifi-api level, as well as the protocol, host, and port. + * @param timeoutMillis + * @return + * @throws IOException + */ + public Integer getRemoteListeningPort(final String uri, final int timeoutMillis) throws IOException { + try { + final URI uriObject = new URI(uri + CONTROLLER_URI_PATH); + return getRemoteListeningPort(uriObject, timeoutMillis); + } catch (URISyntaxException e) { + throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri); + } + } + + public String getRemoteRootGroupId(final String uri, final int timeoutMillis) throws IOException { + try { + final URI uriObject = new URI(uri + CONTROLLER_URI_PATH); + return getRemoteRootGroupId(uriObject, timeoutMillis); + } catch (URISyntaxException e) { + throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri); + } + } + + public String getRemoteInstanceId(final String uri, final int timeoutMillis) throws IOException { + try { + final URI uriObject = new URI(uri + CONTROLLER_URI_PATH); + return getController(uriObject, timeoutMillis).getInstanceId(); + } catch (URISyntaxException e) { + throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri); + } + } + + /** + * Returns the port on which the remote instance is listening for Flow File transfers, or <code>null</code> if the remote instance + * is not configured to use Site-to-Site transfers. + * + * @param uri the full URI to fetch, including the path. + * @return + * @throws IOException + */ + private Integer getRemoteListeningPort(final URI uri, final int timeoutMillis) throws IOException { + return getController(uri, timeoutMillis).getRemoteSiteListeningPort(); + } + + private String getRemoteRootGroupId(final URI uri, final int timeoutMillis) throws IOException { + return getController(uri, timeoutMillis).getId(); + } + + public ControllerDTO getController(final URI uri, final int timeoutMillis) throws IOException { + final ClientResponse response = get(uri, timeoutMillis); + + if (Status.OK.getStatusCode() == response.getStatusInfo().getStatusCode()) { + final ControllerEntity entity = response.getEntity(ControllerEntity.class); + return entity.getController(); + } else { + final String responseMessage = response.getEntity(String.class); + throw new IOException("Got HTTP response Code " + response.getStatusInfo().getStatusCode() + ": " + response.getStatusInfo().getReasonPhrase() + " with explanation: " + responseMessage); + } + } + + /** + * Issues a registration request on behalf of the current user. + * + * @param baseApiUri + * @return + */ + public ClientResponse issueRegistrationRequest(String baseApiUri) { + final URI uri = URI.create(String.format("%s/%s", baseApiUri, "/controller/users")); + + // set up the query params + MultivaluedMapImpl entity = new MultivaluedMapImpl(); + entity.add("justification", "A Remote instance of NiFi has attempted to create a reference to this NiFi. This action must be approved first."); + + // create the web resource + WebResource webResource = client.resource(uri); + + // get the client utils and make the request + return webResource.accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_FORM_URLENCODED).entity(entity).post(ClientResponse.class); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e16fc797/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java index 79ef7a8..6b70fe6 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java @@ -56,7 +56,6 @@ import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroupCounts; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor; -import org.apache.nifi.remote.util.RemoteNiFiUtils; import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.reporting.Severity; import org.apache.nifi.util.FormatUtils;
