NIFI-282: Updated documentation

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/1d4b1848
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/1d4b1848
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/1d4b1848

Branch: refs/heads/site-to-site-client
Commit: 1d4b1848087049788232ef433fb5a505cf05bab3
Parents: 20557d3
Author: Mark Payne <[email protected]>
Authored: Tue Feb 3 20:33:44 2015 -0500
Committer: Mark Payne <[email protected]>
Committed: Tue Feb 3 20:33:44 2015 -0500

----------------------------------------------------------------------
 .../nifi/remote/client/SiteToSiteClient.java    |  8 +++
 .../nifi/remote/client/socket/SocketClient.java |  6 ++
 .../nifi/remote/SocketRemoteSiteListener.java   |  3 +
 .../nifi/remote/StandardRemoteGroupPort.java    | 72 +++++++++++++++++++-
 .../apache/nifi/remote/RemoteDestination.java   | 20 ++++++
 5 files changed, 108 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1d4b1848/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
index 0a05c58..fa94b81 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
@@ -98,6 +98,14 @@ public interface SiteToSiteClient extends Closeable {
        
        /**
         * <p>
+        * Returns the configuration object that was built by the Builder
+        * </p>
+        * @return
+        */
+       SiteToSiteClientConfig getConfig();
+       
+       /**
+        * <p>
         * The Builder is the mechanism by which all configuration is passed to the SiteToSiteClient.
         * Once constructed, the SiteToSiteClient cannot be reconfigured (i.e., it is immutable). If
         * a change in configuration should be desired, the client should be {@link Closeable#close() closed}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1d4b1848/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
index c04a90b..0494d04 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -32,6 +32,7 @@ import org.apache.nifi.remote.protocol.DataPacket;
 import org.apache.nifi.util.ObjectHolder;
 
 public class SocketClient implements SiteToSiteClient {
+    private final SiteToSiteClientConfig config;
        private final EndpointConnectionStatePool pool;
        private final boolean compress;
        private final String portName;
@@ -42,12 +43,17 @@ public class SocketClient implements SiteToSiteClient {
                pool = new EndpointConnectionStatePool(config.getUrl(), (int) config.getTimeout(TimeUnit.MILLISECONDS), 
                                config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile());
                
+               this.config = config;
                this.compress = config.isUseCompression();
                this.portIdentifier = config.getPortIdentifier();
                this.portName = config.getPortName();
                this.penalizationNanos = config.getPenalizationPeriod(TimeUnit.NANOSECONDS);
        }
        
+       @Override
+       public SiteToSiteClientConfig getConfig() {
+           return config;
+       }
        
        @Override
        public boolean isSecure() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1d4b1848/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
index f053e65..3295956 100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
@@ -122,6 +122,9 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
                     }
                     LOG.trace("Got connection");
                     
+                    if ( stopped.get() ) {
+                        return;
+                    }
                     final Socket socket = acceptedSocket;
                     final SocketChannel socketChannel = socket.getChannel();
                     final Thread thread = new Thread(new Runnable() {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1d4b1848/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index a51cdba..3fc2f5a 100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.remote;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -32,11 +33,13 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
 import org.apache.nifi.remote.client.socket.EndpointConnectionState;
 import org.apache.nifi.remote.client.socket.EndpointConnectionStatePool;
 import org.apache.nifi.remote.codec.FlowFileCodec;
@@ -142,7 +145,74 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
         
         final EndpointConnectionState connectionState;
         try {
-               connectionState = connectionStatePool.getEndpointConnectionState(this, transferDirection);
+            // TODO: TESTING ONLY!! REMOVE!!
+            final SiteToSiteClientConfig config = new SiteToSiteClientConfig() {
+                @Override
+                public boolean isUseCompression() {
+                    return false;
+                }
+                
+                @Override
+                public String getUrl() {
+                    return null;
+                }
+                
+                @Override
+                public long getTimeout(TimeUnit timeUnit) {
+                    return timeUnit.convert(1, TimeUnit.SECONDS);
+                }
+                
+                @Override
+                public SSLContext getSslContext() {
+                    return null;
+                }
+                
+                @Override
+                public long getPreferredBatchSize() {
+                    return 1024 * 1024L;
+                }
+                
+                @Override
+                public long getPreferredBatchDuration(TimeUnit timeUnit) {
+                    return timeUnit.convert(1, TimeUnit.SECONDS);
+                }
+                
+                @Override
+                public int getPreferredBatchCount() {
+                    return 1;
+                }
+                
+                @Override
+                public String getPortName() {
+                    // TODO Auto-generated method stub
+                    return null;
+                }
+                
+                @Override
+                public String getPortIdentifier() {
+                    // TODO Auto-generated method stub
+                    return null;
+                }
+                
+                @Override
+                public long getPenalizationPeriod(TimeUnit timeUnit) {
+                    // TODO Auto-generated method stub
+                    return 0;
+                }
+                
+                @Override
+                public File getPeerPersistenceFile() {
+                    // TODO Auto-generated method stub
+                    return null;
+                }
+                
+                @Override
+                public EventReporter getEventReporter() {
+                    // TODO Auto-generated method stub
+                    return null;
+                }
+            };
+               connectionState = connectionStatePool.getEndpointConnectionState(this, transferDirection, config);
         } catch (final PortNotRunningException e) {
             context.yield();
             this.targetRunning.set(false);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1d4b1848/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java b/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
index 8c972f7..f718581 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
@@ -18,10 +18,30 @@ package org.apache.nifi.remote;
 
 import java.util.concurrent.TimeUnit;
 
+
+/**
+ * A model object for referring to a remote destination (i.e., a Port) for site-to-site communications
+ */
 public interface RemoteDestination {
+    /**
+     * Returns the identifier of the remote destination
+     * 
+     * @return
+     */
        String getIdentifier();
        
+       /**
+        * Returns the amount of time that system should pause sending to a particular node if unable to 
+        * send data to or receive data from this endpoint
+        * @param timeUnit
+        * @return
+        */
        long getYieldPeriod(TimeUnit timeUnit);
        
+       /**
+        * Returns whether or not compression should be used when transferring data to or receiving
+        * data from the remote endpoint
+        * @return
+        */
        boolean isUseCompression();
 }

Reply via email to