NIFI-282: Updated documentation
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/1d4b1848 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/1d4b1848 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/1d4b1848 Branch: refs/heads/site-to-site-client Commit: 1d4b1848087049788232ef433fb5a505cf05bab3 Parents: 20557d3 Author: Mark Payne <[email protected]> Authored: Tue Feb 3 20:33:44 2015 -0500 Committer: Mark Payne <[email protected]> Committed: Tue Feb 3 20:33:44 2015 -0500 ---------------------------------------------------------------------- .../nifi/remote/client/SiteToSiteClient.java | 8 +++ .../nifi/remote/client/socket/SocketClient.java | 6 ++ .../nifi/remote/SocketRemoteSiteListener.java | 3 + .../nifi/remote/StandardRemoteGroupPort.java | 72 +++++++++++++++++++- .../apache/nifi/remote/RemoteDestination.java | 20 ++++++ 5 files changed, 108 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1d4b1848/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java index 0a05c58..fa94b81 100644 --- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java @@ -98,6 +98,14 @@ public interface SiteToSiteClient extends Closeable { /** * <p> + * Returns the configuration object that was built by the Builder + * </p> + * @return + */ + SiteToSiteClientConfig getConfig(); + + /** + * <p> * The Builder is the mechanism by which all configuration is passed to the SiteToSiteClient. * Once constructed, the SiteToSiteClient cannot be reconfigured (i.e., it is immutable). If * a change in configuration should be desired, the client should be {@link Closeable#close() closed} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1d4b1848/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java index c04a90b..0494d04 100644 --- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java @@ -32,6 +32,7 @@ import org.apache.nifi.remote.protocol.DataPacket; import org.apache.nifi.util.ObjectHolder; public class SocketClient implements SiteToSiteClient { + private final SiteToSiteClientConfig config; private final EndpointConnectionStatePool pool; private final boolean compress; private final String portName; @@ -42,12 +43,17 @@ public class SocketClient implements SiteToSiteClient { pool = new EndpointConnectionStatePool(config.getUrl(), (int) config.getTimeout(TimeUnit.MILLISECONDS), config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile()); + this.config = config; this.compress = config.isUseCompression(); this.portIdentifier = config.getPortIdentifier(); this.portName = config.getPortName(); this.penalizationNanos = config.getPenalizationPeriod(TimeUnit.NANOSECONDS); } + @Override + public SiteToSiteClientConfig getConfig() { + return config; + } @Override public boolean isSecure() throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1d4b1848/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java index f053e65..3295956 100644 --- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java +++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java @@ -122,6 +122,9 @@ public class SocketRemoteSiteListener implements RemoteSiteListener { } LOG.trace("Got connection"); + if ( stopped.get() ) { + return; + } final Socket socket = acceptedSocket; final SocketChannel socketChannel = socket.getChannel(); final Thread thread = new Thread(new Runnable() { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1d4b1848/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java index a51cdba..3fc2f5a 100644 --- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java +++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.remote; +import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -32,11 +33,13 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.ProcessScheduler; +import org.apache.nifi.events.EventReporter; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.remote.client.SiteToSiteClientConfig; import org.apache.nifi.remote.client.socket.EndpointConnectionState; import org.apache.nifi.remote.client.socket.EndpointConnectionStatePool; import org.apache.nifi.remote.codec.FlowFileCodec; @@ -142,7 +145,74 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { final EndpointConnectionState connectionState; try { - connectionState = connectionStatePool.getEndpointConnectionState(this, transferDirection); + // TODO: TESTING ONLY!! REMOVE!! + final SiteToSiteClientConfig config = new SiteToSiteClientConfig() { + @Override + public boolean isUseCompression() { + return false; + } + + @Override + public String getUrl() { + return null; + } + + @Override + public long getTimeout(TimeUnit timeUnit) { + return timeUnit.convert(1, TimeUnit.SECONDS); + } + + @Override + public SSLContext getSslContext() { + return null; + } + + @Override + public long getPreferredBatchSize() { + return 1024 * 1024L; + } + + @Override + public long getPreferredBatchDuration(TimeUnit timeUnit) { + return timeUnit.convert(1, TimeUnit.SECONDS); + } + + @Override + public int getPreferredBatchCount() { + return 1; + } + + @Override + public String getPortName() { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getPortIdentifier() { + // TODO Auto-generated method stub + return null; + } + + @Override + public long getPenalizationPeriod(TimeUnit timeUnit) { + // TODO Auto-generated method stub + return 0; + } + + @Override + public File getPeerPersistenceFile() { + // TODO Auto-generated method stub + return null; + } + + @Override + public EventReporter getEventReporter() { + // TODO Auto-generated method stub + return null; + } + }; + connectionState = connectionStatePool.getEndpointConnectionState(this, transferDirection, config); } catch (final PortNotRunningException e) { context.yield(); this.targetRunning.set(false); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1d4b1848/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java b/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java index 8c972f7..f718581 100644 --- a/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java @@ -18,10 +18,30 @@ package org.apache.nifi.remote; import java.util.concurrent.TimeUnit; + +/** + * A model object for referring to a remote destination (i.e., a Port) for site-to-site communications + */ public interface RemoteDestination { + /** + * Returns the identifier of the remote destination + * + * @return + */ String getIdentifier(); + /** + * Returns the amount of time that system should pause sending to a particular node if unable to + * send data to or receive data from this endpoint + * @param timeUnit + * @return + */ long getYieldPeriod(TimeUnit timeUnit); + /** + * Returns whether or not compression should be used when transferring data to or receiving + * data from the remote endpoint + * @return + */ boolean isUseCompression(); }
