NIFI-282: Added ability for client to request batch size and duration when 
pulling data from remote NiFi


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/20557d38
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/20557d38
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/20557d38

Branch: refs/heads/site-to-site-client
Commit: 20557d386c6fb049836be0109212a903ed818f54
Parents: bbe335d
Author: Mark Payne <[email protected]>
Authored: Mon Feb 2 20:42:34 2015 -0500
Committer: Mark Payne <[email protected]>
Committed: Mon Feb 2 20:42:34 2015 -0500

----------------------------------------------------------------------
 .../nifi/remote/client/SiteToSiteClient.java    |  56 +++++++
 .../remote/client/SiteToSiteClientConfig.java   |  30 ++++
 .../socket/EndpointConnectionStatePool.java     |  16 ++
 .../protocol/socket/HandshakeProperty.java      |  40 ++++-
 .../protocol/socket/SocketClientProtocol.java   |  30 +++-
 .../socket/SocketFlowFileServerProtocol.java    | 157 ++++++++++++-------
 6 files changed, 270 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/20557d38/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
----------------------------------------------------------------------
diff --git 
a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
 
b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
index f4d6f17..0a05c58 100644
--- 
a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
+++ 
b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
@@ -27,6 +27,7 @@ import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.remote.Transaction;
 import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.client.socket.SocketClient;
+import org.apache.nifi.remote.protocol.DataPacket;
 
 /**
  * <p>
@@ -113,6 +114,9 @@ public interface SiteToSiteClient extends Closeable {
                private boolean useCompression;
                private String portName;
                private String portIdentifier;
+               private int batchCount;
+               private long batchSize;
+               private long batchNanos;
 
                /**
                 * Specifies the URL of the remote NiFi instance. If this URL 
points to the Cluster Manager of
@@ -238,6 +242,43 @@ public interface SiteToSiteClient extends Closeable {
                }
                
                /**
+            * When pulling data from a NiFi instance, the sender chooses how 
large a Transaction is. However,
+            * the client has the ability to request a particular batch 
size/duration. This method specifies
+            * the preferred number of {@link DataPacket}s to include in a 
Transaction.
+            * 
+            * @return this Builder, so that calls can be chained
+            */
+               public Builder requestBatchCount(final int count) {
+                   this.batchCount = count;
+                   return this;
+               }
+
+               /**
+            * When pulling data from a NiFi instance, the sender chooses how 
large a Transaction is. However,
+            * the client has the ability to request a particular batch 
size/duration. This method specifies
+            * the preferred number of bytes to include in a Transaction.
+            * 
+            * @return this Builder, so that calls can be chained
+            */
+               public Builder requestBatchSize(final long bytes) {
+                   this.batchSize = bytes;
+                   return this;
+               }
+               
+        /**
+         * When pulling data from a NiFi instance, the sender chooses how 
large a Transaction is. However,
+         * the client has the ability to request a particular batch 
size/duration. This method specifies
+         * the preferred amount of time that a Transaction should span.
+         * 
+         * @return this Builder, so that calls can be chained
+         */
+               public Builder requestBatchDuration(final long value, final 
TimeUnit unit) {
+                   this.batchNanos = unit.toNanos(value);
+                   return this;
+               }
+               
+               
+               /**
                 * Builds a new SiteToSiteClient that can be used to send and 
receive data with remote instances of NiFi
                 * @return
                 */
@@ -296,6 +337,21 @@ public interface SiteToSiteClient extends Closeable {
                                public EventReporter getEventReporter() {
                                        return Builder.this.getEventReporter();
                                }
+
+                       @Override
+                       public long getPreferredBatchDuration(final TimeUnit 
timeUnit) {
+                           return timeUnit.convert(Builder.this.batchNanos, 
TimeUnit.NANOSECONDS);
+                       }
+                       
+                       @Override
+                       public long getPreferredBatchSize() {
+                           return Builder.this.batchSize;
+                       }
+                       
+                       @Override
+                       public int getPreferredBatchCount() {
+                           return Builder.this.batchCount;
+                       }
                        };
                        
                        return new SocketClient(config);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/20557d38/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
----------------------------------------------------------------------
diff --git 
a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
 
b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
index 6ba2d3f..37c48f8 100644
--- 
a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
+++ 
b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
 import javax.net.ssl.SSLContext;
 
 import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.remote.protocol.DataPacket;
 
 public interface SiteToSiteClientConfig {
 
@@ -81,4 +82,33 @@ public interface SiteToSiteClientConfig {
         * @return
         */
        String getPortIdentifier();
+       
+       /**
+        * When pulling data from a NiFi instance, the sender chooses how large 
a Transaction is. However,
+        * the client has the ability to request a particular batch 
size/duration. This returns the maximum
+        * amount of time that we will request a NiFi instance to send data to 
us in a Transaction.
+        * 
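+        * @param timeUnit the unit in which the duration is returned
+        * @return the preferred batch duration, expressed in the given time unit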
+        */
+       long getPreferredBatchDuration(TimeUnit timeUnit);
+       
+    /**
+     * When pulling data from a NiFi instance, the sender chooses how large a 
Transaction is. However,
+     * the client has the ability to request a particular batch size/duration. 
This returns the maximum
+     * number of bytes that we will request a NiFi instance to send data to us 
in a Transaction.
+     * 
+     * @return the preferred batch size, in bytes
+     */
+       long getPreferredBatchSize();
+       
+       
+       /**
+     * When pulling data from a NiFi instance, the sender chooses how large a 
Transaction is. However,
+     * the client has the ability to request a particular batch size/duration. 
This returns the maximum
+     * number of {@link DataPacket}s that we will request a NiFi instance to 
send data to us in a Transaction.
+     * 
+     * @return the preferred number of DataPackets per Transaction
+     */
+       int getPreferredBatchCount();
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/20557d38/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
----------------------------------------------------------------------
diff --git 
a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
 
b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
index e0ed61f..df42efe 100644
--- 
a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
+++ 
b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
@@ -63,6 +63,7 @@ import org.apache.nifi.remote.PeerStatus;
 import org.apache.nifi.remote.RemoteDestination;
 import org.apache.nifi.remote.RemoteResourceInitiator;
 import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
 import org.apache.nifi.remote.cluster.ClusterNodeInformation;
 import org.apache.nifi.remote.cluster.NodeInformation;
 import org.apache.nifi.remote.codec.FlowFileCodec;
@@ -186,7 +187,14 @@ public class EndpointConnectionStatePool {
        }, 5, 5, TimeUnit.SECONDS);
     }
     
+    
     public EndpointConnectionState getEndpointConnectionState(final 
RemoteDestination remoteDestination, final TransferDirection direction) throws 
IOException, HandshakeException, PortNotRunningException, UnknownPortException, 
ProtocolException {
+        return getEndpointConnectionState(remoteDestination, direction, null);
+    }
+    
+    
+    
+    public EndpointConnectionState getEndpointConnectionState(final 
RemoteDestination remoteDestination, final TransferDirection direction, final 
SiteToSiteClientConfig config) throws IOException, HandshakeException, 
PortNotRunningException, UnknownPortException, ProtocolException {
        //
         // Attempt to get a connection state that already exists for this URL.
         //
@@ -229,6 +237,14 @@ public class EndpointConnectionStatePool {
                 
                 final String peerUrl = "nifi://" + peerStatus.getHostname() + 
":" + peerStatus.getPort();
                 peer = new Peer(commsSession, peerUrl, clusterUrl.toString());
+
+                // set properties based on config
+                if ( config != null ) {
+                    protocol.setTimeout((int) 
config.getTimeout(TimeUnit.MILLISECONDS));
+                    
protocol.setPreferredBatchCount(config.getPreferredBatchCount());
+                    
protocol.setPreferredBatchSize(config.getPreferredBatchSize());
+                    
protocol.setPreferredBatchDuration(config.getPreferredBatchDuration(TimeUnit.MILLISECONDS));
+                }
                 
                 // perform handshake
                 try {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/20557d38/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
----------------------------------------------------------------------
diff --git 
a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
 
b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
index c4519cd..41dc276 100644
--- 
a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
+++ 
b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
@@ -16,8 +16,46 @@
  */
 package org.apache.nifi.remote.protocol.socket;
 
+
+/**
+ * Enumeration of Properties that can be used for the Site-to-Site Socket 
Protocol.
+ */
 public enum HandshakeProperty {
+    /**
+     * Boolean value indicating whether or not the contents of a FlowFile 
should be
+     * GZipped when transferred.
+     */
     GZIP,
+    
+    /**
+     * The unique identifier of the port to communicate with
+     */
     PORT_IDENTIFIER,
-    REQUEST_EXPIRATION_MILLIS;
+    
+    /**
+     * Indicates the number of milliseconds after the request was made that 
the client
+     * will wait for a response. If no response has been received by the time 
this value
+     * expires, the server can move on without attempting to service the 
request because
+     * the client will have already disconnected.
+     */
+    REQUEST_EXPIRATION_MILLIS,
+    
+    /**
+     * The preferred number of FlowFiles that the server should send to the 
client
+     * when pulling data. This property was introduced in version 5 of the 
protocol.
+     */
+    BATCH_COUNT,
+    
+    /**
+     * The preferred number of bytes that the server should send to the client 
when
+     * pulling data. This property was introduced in version 5 of the protocol.
+     */
+    BATCH_SIZE,
+    
+    /**
+     * The preferred amount of time that the server should send data to the 
client
+     * when pulling data. This property was introduced in version 5 of the 
protocol.
+     * Value is in milliseconds.
+     */
+    BATCH_DURATION;
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/20557d38/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
 
b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
index 4222edf..6976cd8 100644
--- 
a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
+++ 
b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
@@ -56,7 +56,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SocketClientProtocol implements ClientProtocol {
-    private final VersionNegotiator versionNegotiator = new 
StandardVersionNegotiator(4, 3, 2, 1);
+    private final VersionNegotiator versionNegotiator = new 
StandardVersionNegotiator(5, 4, 3, 2, 1);
 
     private RemoteDestination destination;
     private boolean useCompression = false;
@@ -70,12 +70,28 @@ public class SocketClientProtocol implements ClientProtocol 
{
     private boolean readyForFileTransfer = false;
     private String transitUriPrefix = null;
     private int timeoutMillis = 30000;
+    
+    private int batchCount;
+    private long batchSize;
+    private long batchMillis;
 
     private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); 
// send batches of up to 5 seconds
     
     public SocketClientProtocol() {
     }
 
+    public void setPreferredBatchCount(final int count) {
+        this.batchCount = count;
+    }
+    
+    public void setPreferredBatchSize(final long bytes) {
+        this.batchSize = bytes;
+    }
+    
+    public void setPreferredBatchDuration(final long millis) {
+        this.batchMillis = millis;
+    }
+    
     public void setDestination(final RemoteDestination destination) {
         this.destination = destination;
         this.useCompression = destination.isUseCompression();
@@ -106,6 +122,18 @@ public class SocketClientProtocol implements 
ClientProtocol {
         
         properties.put(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, 
String.valueOf(timeoutMillis) );
         
+        if ( versionNegotiator.getVersion() >= 5 ) {
+            if ( batchCount > 0 ) {
+                properties.put(HandshakeProperty.BATCH_COUNT, 
String.valueOf(batchCount));
+            }
+            if ( batchSize > 0L ) {
+                properties.put(HandshakeProperty.BATCH_SIZE, 
String.valueOf(batchSize));
+            }
+            if ( batchMillis > 0L ) {
+                properties.put(HandshakeProperty.BATCH_DURATION, 
String.valueOf(batchMillis));
+            }
+        }
+        
         final CommunicationsSession commsSession = 
peer.getCommunicationsSession();
         commsSession.setTimeout(timeoutMillis);
         final DataInputStream dis = new 
DataInputStream(commsSession.getInput().getInputStream());

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/20557d38/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
 
b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
index 12c234e..eb22b0e 100644
--- 
a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
+++ 
b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
@@ -78,10 +78,14 @@ public class SocketFlowFileServerProtocol implements 
ServerProtocol {
     private FlowFileCodec negotiatedFlowFileCodec = null;
     private String transitUriPrefix = null;
     
-    private final VersionNegotiator versionNegotiator = new 
StandardVersionNegotiator(4, 3, 2, 1);
+    private int requestedBatchCount = 0;
+    private long requestedBatchBytes = 0L;
+    private long requestedBatchNanos = 0L;
+    private static final long DEFAULT_BATCH_NANOS = 
TimeUnit.SECONDS.toNanos(5L);
+    
+    private final VersionNegotiator versionNegotiator = new 
StandardVersionNegotiator(5, 4, 3, 2, 1);
     private final Logger logger = 
LoggerFactory.getLogger(SocketFlowFileServerProtocol.class);
     
-    private static final long BATCH_NANOS = TimeUnit.SECONDS.toNanos(5L); // 
send batches of up to 5 seconds
 
     
     @Override
@@ -137,68 +141,90 @@ public class SocketFlowFileServerProtocol implements 
ServerProtocol {
                 throw new HandshakeException("Received unknown property: " + 
propertyName);
             }
             
-            switch (property) {
-                case GZIP: {
-                    useGzip = Boolean.parseBoolean(value);
-                    break;
-                }
-                case REQUEST_EXPIRATION_MILLIS:
-                    requestExpirationMillis = Long.parseLong(value);
-                    break;
-                case PORT_IDENTIFIER: {
-                    Port receivedPort = rootGroup.getInputPort(value);
-                    if ( receivedPort == null ) {
-                        receivedPort = rootGroup.getOutputPort(value);
-                    }
-                    if ( receivedPort == null ) {
-                        logger.debug("Responding with ResponseCode 
UNKNOWN_PORT for identifier {}", value);
-                        ResponseCode.UNKNOWN_PORT.writeResponse(dos);
-                        throw new HandshakeException("Received unknown port 
identifier: " + value);
-                    }
-                    if ( !(receivedPort instanceof RootGroupPort) ) {
-                        logger.debug("Responding with ResponseCode 
UNKNOWN_PORT for identifier {}", value);
-                        ResponseCode.UNKNOWN_PORT.writeResponse(dos);
-                        throw new HandshakeException("Received port identifier 
" + value + ", but this Port is not a RootGroupPort");
-                    }
-                    
-                    this.port = (RootGroupPort) receivedPort;
-                    final PortAuthorizationResult portAuthResult = 
this.port.checkUserAuthorization(peer.getCommunicationsSession().getUserDn());
-                    if ( !portAuthResult.isAuthorized() ) {
-                        logger.debug("Responding with ResponseCode 
UNAUTHORIZED: ", portAuthResult.getExplanation());
-                        ResponseCode.UNAUTHORIZED.writeResponse(dos, 
portAuthResult.getExplanation());
-                        responseWritten = true;
+            try {
+                switch (property) {
+                    case GZIP: {
+                        useGzip = Boolean.parseBoolean(value);
                         break;
                     }
-                    
-                    if ( !receivedPort.isValid() ) {
-                        logger.debug("Responding with ResponseCode 
PORT_NOT_IN_VALID_STATE for {}", receivedPort);
-                        
ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port is not valid");
-                        responseWritten = true;
+                    case REQUEST_EXPIRATION_MILLIS:
+                        requestExpirationMillis = Long.parseLong(value);
                         break;
-                    }
-                    
-                    if ( !receivedPort.isRunning() ) {
-                        logger.debug("Responding with ResponseCode 
PORT_NOT_IN_VALID_STATE for {}", receivedPort);
-                        
ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port not running");
-                        responseWritten = true;
+                    case BATCH_COUNT:
+                        requestedBatchCount = Integer.parseInt(value);
+                        if ( requestedBatchCount < 0 ) {
+                            throw new HandshakeException("Cannot request Batch 
Count less than 1; requested value: " + value);
+                        }
                         break;
-                    }
-                    
-                    // PORTS_DESTINATION_FULL was introduced in version 2. If 
version 1, just ignore this
-                    // we we will simply not service the request but the 
sender will timeout
-                    if ( getVersionNegotiator().getVersion() > 1 ) {
-                        for ( final Connection connection : 
port.getConnections() ) {
-                            if ( connection.getFlowFileQueue().isFull() ) {
-                                logger.debug("Responding with ResponseCode 
PORTS_DESTINATION_FULL for {}", receivedPort);
-                                
ResponseCode.PORTS_DESTINATION_FULL.writeResponse(dos);
-                                responseWritten = true;
-                                break;
+                    case BATCH_SIZE:
+                        requestedBatchBytes = Long.parseLong(value);
+                        if ( requestedBatchBytes < 0 ) {
+                            throw new HandshakeException("Cannot request Batch 
Size less than 1; requested value: " + value);
+                        }
+                        break;
+                    case BATCH_DURATION:
+                        requestedBatchNanos = 
TimeUnit.MILLISECONDS.toNanos(Long.parseLong(value));
+                        if ( requestedBatchNanos < 0 ) {
+                            throw new HandshakeException("Cannot request Batch 
Duration less than 1; requested value: " + value);
+                        }
+                        break;
+                    case PORT_IDENTIFIER: {
+                        Port receivedPort = rootGroup.getInputPort(value);
+                        if ( receivedPort == null ) {
+                            receivedPort = rootGroup.getOutputPort(value);
+                        }
+                        if ( receivedPort == null ) {
+                            logger.debug("Responding with ResponseCode 
UNKNOWN_PORT for identifier {}", value);
+                            ResponseCode.UNKNOWN_PORT.writeResponse(dos);
+                            throw new HandshakeException("Received unknown 
port identifier: " + value);
+                        }
+                        if ( !(receivedPort instanceof RootGroupPort) ) {
+                            logger.debug("Responding with ResponseCode 
UNKNOWN_PORT for identifier {}", value);
+                            ResponseCode.UNKNOWN_PORT.writeResponse(dos);
+                            throw new HandshakeException("Received port 
identifier " + value + ", but this Port is not a RootGroupPort");
+                        }
+                        
+                        this.port = (RootGroupPort) receivedPort;
+                        final PortAuthorizationResult portAuthResult = 
this.port.checkUserAuthorization(peer.getCommunicationsSession().getUserDn());
+                        if ( !portAuthResult.isAuthorized() ) {
+                            logger.debug("Responding with ResponseCode 
UNAUTHORIZED: ", portAuthResult.getExplanation());
+                            ResponseCode.UNAUTHORIZED.writeResponse(dos, 
portAuthResult.getExplanation());
+                            responseWritten = true;
+                            break;
+                        }
+                        
+                        if ( !receivedPort.isValid() ) {
+                            logger.debug("Responding with ResponseCode 
PORT_NOT_IN_VALID_STATE for {}", receivedPort);
+                            
ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port is not valid");
+                            responseWritten = true;
+                            break;
+                        }
+                        
+                        if ( !receivedPort.isRunning() ) {
+                            logger.debug("Responding with ResponseCode 
PORT_NOT_IN_VALID_STATE for {}", receivedPort);
+                            
ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port not running");
+                            responseWritten = true;
+                            break;
+                        }
+                        
+                        // PORTS_DESTINATION_FULL was introduced in version 2. 
If version 1, just ignore this
+                        // we will simply not service the request but the 
sender will timeout
+                        if ( getVersionNegotiator().getVersion() > 1 ) {
+                            for ( final Connection connection : 
port.getConnections() ) {
+                                if ( connection.getFlowFileQueue().isFull() ) {
+                                    logger.debug("Responding with ResponseCode 
PORTS_DESTINATION_FULL for {}", receivedPort);
+                                    
ResponseCode.PORTS_DESTINATION_FULL.writeResponse(dos);
+                                    responseWritten = true;
+                                    break;
+                                }
                             }
                         }
+                        
+                        break;
                     }
-                    
-                    break;
                 }
+            } catch (final NumberFormatException nfe) {
+                throw new HandshakeException("Received invalid value for 
property '" + property + "'; invalid value: " + value);
             }
         }
         
@@ -333,8 +359,25 @@ public class SocketFlowFileServerProtocol implements 
ServerProtocol {
             session.getProvenanceReporter().send(flowFile, transitUri, "Remote 
Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transmissionMillis, false);
             session.remove(flowFile);
             
+            // determine if we should check for more data on queue.
             final long sendingNanos = System.nanoTime() - startNanos;
-            if ( sendingNanos < BATCH_NANOS ) { 
+            boolean poll = true;
+            if ( sendingNanos >= requestedBatchNanos && requestedBatchNanos > 
0L ) {
+                poll = false;
+            }
+            if ( bytesSent >= requestedBatchBytes && requestedBatchBytes > 0L 
) {
+                poll = false;
+            }
+            if ( flowFilesSent.size() >= requestedBatchCount && 
requestedBatchCount > 0 ) {
+                poll = false;
+            }
+            
+            if ( requestedBatchNanos == 0 && requestedBatchBytes == 0 && 
requestedBatchCount == 0 ) {
+                poll = (sendingNanos < DEFAULT_BATCH_NANOS);
+            }
+            
+            if ( poll ) { 
+                // no batch limit (requested duration, size, or count, or the 
default duration) has been reached, so get more data.
                 flowFile = session.get();
             } else {
                 flowFile = null;

Reply via email to