NIFI-282: Added the ability for a client to request a batch size and duration when pulling data from a remote NiFi instance
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/20557d38 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/20557d38 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/20557d38 Branch: refs/heads/site-to-site-client Commit: 20557d386c6fb049836be0109212a903ed818f54 Parents: bbe335d Author: Mark Payne <[email protected]> Authored: Mon Feb 2 20:42:34 2015 -0500 Committer: Mark Payne <[email protected]> Committed: Mon Feb 2 20:42:34 2015 -0500 ---------------------------------------------------------------------- .../nifi/remote/client/SiteToSiteClient.java | 56 +++++++ .../remote/client/SiteToSiteClientConfig.java | 30 ++++ .../socket/EndpointConnectionStatePool.java | 16 ++ .../protocol/socket/HandshakeProperty.java | 40 ++++- .../protocol/socket/SocketClientProtocol.java | 30 +++- .../socket/SocketFlowFileServerProtocol.java | 157 ++++++++++++------- 6 files changed, 270 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/20557d38/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java index f4d6f17..0a05c58 100644 --- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java @@ -27,6 +27,7 @@ import org.apache.nifi.events.EventReporter; import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.TransferDirection; import 
org.apache.nifi.remote.client.socket.SocketClient; +import org.apache.nifi.remote.protocol.DataPacket; /** * <p> @@ -113,6 +114,9 @@ public interface SiteToSiteClient extends Closeable { private boolean useCompression; private String portName; private String portIdentifier; + private int batchCount; + private long batchSize; + private long batchNanos; /** * Specifies the URL of the remote NiFi instance. If this URL points to the Cluster Manager of @@ -238,6 +242,43 @@ public interface SiteToSiteClient extends Closeable { } /** + * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However, + * the client has the ability to request a particular batch size/duration. This method specifies + * the preferred number of {@link DataPacket}s to include in a Transaction. + * + * @return + */ + public Builder requestBatchCount(final int count) { + this.batchCount = count; + return this; + } + + /** + * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However, + * the client has the ability to request a particular batch size/duration. This method specifies + * the preferred number of bytes to include in a Transaction. + * + * @return + */ + public Builder requestBatchSize(final long bytes) { + this.batchSize = bytes; + return this; + } + + /** + * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However, + * the client has the ability to request a particular batch size/duration. This method specifies + * the preferred amount of time that a Transaction should span. 
+ * + * @return + */ + public Builder requestBatchDuration(final long value, final TimeUnit unit) { + this.batchNanos = unit.toNanos(value); + return this; + } + + + /** * Builds a new SiteToSiteClient that can be used to send and receive data with remote instances of NiFi * @return */ @@ -296,6 +337,21 @@ public interface SiteToSiteClient extends Closeable { public EventReporter getEventReporter() { return Builder.this.getEventReporter(); } + + @Override + public long getPreferredBatchDuration(final TimeUnit timeUnit) { + return timeUnit.convert(Builder.this.batchNanos, TimeUnit.NANOSECONDS); + } + + @Override + public long getPreferredBatchSize() { + return Builder.this.batchSize; + } + + @Override + public int getPreferredBatchCount() { + return Builder.this.batchCount; + } }; return new SocketClient(config); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/20557d38/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java index 6ba2d3f..37c48f8 100644 --- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLContext; import org.apache.nifi.events.EventReporter; +import org.apache.nifi.remote.protocol.DataPacket; public interface SiteToSiteClientConfig { @@ -81,4 +82,33 @@ public interface SiteToSiteClientConfig { * @return */ String getPortIdentifier(); + + /** + * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. 
However, + * the client has the ability to request a particular batch size/duration. This returns the maximum + * amount of time that we will request a NiFi instance to send data to us in a Transaction. + * + * @param timeUnit + * @return + */ + long getPreferredBatchDuration(TimeUnit timeUnit); + + /** + * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However, + * the client has the ability to request a particular batch size/duration. This returns the maximum + * number of bytes that we will request a NiFi instance to send data to us in a Transaction. + * + * @return + */ + long getPreferredBatchSize(); + + + /** + * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However, + * the client has the ability to request a particular batch size/duration. This returns the maximum + * number of {@link DataPacket}s that we will request a NiFi instance to send data to us in a Transaction. + * + * @return + */ + int getPreferredBatchCount(); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/20557d38/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java index e0ed61f..df42efe 100644 --- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java @@ -63,6 +63,7 @@ import org.apache.nifi.remote.PeerStatus; import org.apache.nifi.remote.RemoteDestination; import org.apache.nifi.remote.RemoteResourceInitiator; import 
org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.SiteToSiteClientConfig; import org.apache.nifi.remote.cluster.ClusterNodeInformation; import org.apache.nifi.remote.cluster.NodeInformation; import org.apache.nifi.remote.codec.FlowFileCodec; @@ -186,7 +187,14 @@ public class EndpointConnectionStatePool { }, 5, 5, TimeUnit.SECONDS); } + public EndpointConnectionState getEndpointConnectionState(final RemoteDestination remoteDestination, final TransferDirection direction) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException { + return getEndpointConnectionState(remoteDestination, direction, null); + } + + + + public EndpointConnectionState getEndpointConnectionState(final RemoteDestination remoteDestination, final TransferDirection direction, final SiteToSiteClientConfig config) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException { // // Attempt to get a connection state that already exists for this URL. 
// @@ -229,6 +237,14 @@ public class EndpointConnectionStatePool { final String peerUrl = "nifi://" + peerStatus.getHostname() + ":" + peerStatus.getPort(); peer = new Peer(commsSession, peerUrl, clusterUrl.toString()); + + // set properties based on config + if ( config != null ) { + protocol.setTimeout((int) config.getTimeout(TimeUnit.MILLISECONDS)); + protocol.setPreferredBatchCount(config.getPreferredBatchCount()); + protocol.setPreferredBatchSize(config.getPreferredBatchSize()); + protocol.setPreferredBatchDuration(config.getPreferredBatchDuration(TimeUnit.MILLISECONDS)); + } // perform handshake try { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/20557d38/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java index c4519cd..41dc276 100644 --- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java @@ -16,8 +16,46 @@ */ package org.apache.nifi.remote.protocol.socket; + +/** + * Enumeration of Properties that can be used for the Site-to-Site Socket Protocol. + */ public enum HandshakeProperty { + /** + * Boolean value indicating whether or not the contents of a FlowFile should be + * GZipped when transferred. + */ GZIP, + + /** + * The unique identifier of the port to communicate with + */ PORT_IDENTIFIER, - REQUEST_EXPIRATION_MILLIS; + + /** + * Indicates the number of milliseconds after the request was made that the client + * will wait for a response. 
If no response has been received by the time this value + * expires, the server can move on without attempting to service the request because + * the client will have already disconnected. + */ + REQUEST_EXPIRATION_MILLIS, + + /** + * The preferred number of FlowFiles that the server should send to the client + * when pulling data. This property was introduced in version 5 of the protocol. + */ + BATCH_COUNT, + + /** + * The preferred number of bytes that the server should send to the client when + * pulling data. This property was introduced in version 5 of the protocol. + */ + BATCH_SIZE, + + /** + * The preferred amount of time that the server should send data to the client + * when pulling data. This property was introduced in version 5 of the protocol. + * Value is in milliseconds. + */ + BATCH_DURATION; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/20557d38/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java index 4222edf..6976cd8 100644 --- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java @@ -56,7 +56,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SocketClientProtocol implements ClientProtocol { - private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(4, 3, 2, 1); + private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(5, 4, 3, 2, 1); private RemoteDestination destination; private boolean useCompression = false; @@ 
-70,12 +70,28 @@ public class SocketClientProtocol implements ClientProtocol { private boolean readyForFileTransfer = false; private String transitUriPrefix = null; private int timeoutMillis = 30000; + + private int batchCount; + private long batchSize; + private long batchMillis; private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds public SocketClientProtocol() { } + public void setPreferredBatchCount(final int count) { + this.batchCount = count; + } + + public void setPreferredBatchSize(final long bytes) { + this.batchSize = bytes; + } + + public void setPreferredBatchDuration(final long millis) { + this.batchMillis = millis; + } + public void setDestination(final RemoteDestination destination) { this.destination = destination; this.useCompression = destination.isUseCompression(); @@ -106,6 +122,18 @@ public class SocketClientProtocol implements ClientProtocol { properties.put(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, String.valueOf(timeoutMillis) ); + if ( versionNegotiator.getVersion() >= 5 ) { + if ( batchCount > 0 ) { + properties.put(HandshakeProperty.BATCH_COUNT, String.valueOf(batchCount)); + } + if ( batchSize > 0L ) { + properties.put(HandshakeProperty.BATCH_SIZE, String.valueOf(batchSize)); + } + if ( batchMillis > 0L ) { + properties.put(HandshakeProperty.BATCH_DURATION, String.valueOf(batchMillis)); + } + } + final CommunicationsSession commsSession = peer.getCommunicationsSession(); commsSession.setTimeout(timeoutMillis); final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/20557d38/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java ---------------------------------------------------------------------- diff --git 
a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java index 12c234e..eb22b0e 100644 --- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java +++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java @@ -78,10 +78,14 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { private FlowFileCodec negotiatedFlowFileCodec = null; private String transitUriPrefix = null; - private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(4, 3, 2, 1); + private int requestedBatchCount = 0; + private long requestedBatchBytes = 0L; + private long requestedBatchNanos = 0L; + private static final long DEFAULT_BATCH_NANOS = TimeUnit.SECONDS.toNanos(5L); + + private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(5, 4, 3, 2, 1); private final Logger logger = LoggerFactory.getLogger(SocketFlowFileServerProtocol.class); - private static final long BATCH_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds @Override @@ -137,68 +141,90 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { throw new HandshakeException("Received unknown property: " + propertyName); } - switch (property) { - case GZIP: { - useGzip = Boolean.parseBoolean(value); - break; - } - case REQUEST_EXPIRATION_MILLIS: - requestExpirationMillis = Long.parseLong(value); - break; - case PORT_IDENTIFIER: { - Port receivedPort = rootGroup.getInputPort(value); - if ( receivedPort == null ) { - receivedPort = rootGroup.getOutputPort(value); - } - if ( receivedPort == null ) { - logger.debug("Responding with 
ResponseCode UNKNOWN_PORT for identifier {}", value); - ResponseCode.UNKNOWN_PORT.writeResponse(dos); - throw new HandshakeException("Received unknown port identifier: " + value); - } - if ( !(receivedPort instanceof RootGroupPort) ) { - logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", value); - ResponseCode.UNKNOWN_PORT.writeResponse(dos); - throw new HandshakeException("Received port identifier " + value + ", but this Port is not a RootGroupPort"); - } - - this.port = (RootGroupPort) receivedPort; - final PortAuthorizationResult portAuthResult = this.port.checkUserAuthorization(peer.getCommunicationsSession().getUserDn()); - if ( !portAuthResult.isAuthorized() ) { - logger.debug("Responding with ResponseCode UNAUTHORIZED: ", portAuthResult.getExplanation()); - ResponseCode.UNAUTHORIZED.writeResponse(dos, portAuthResult.getExplanation()); - responseWritten = true; + try { + switch (property) { + case GZIP: { + useGzip = Boolean.parseBoolean(value); break; } - - if ( !receivedPort.isValid() ) { - logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort); - ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port is not valid"); - responseWritten = true; + case REQUEST_EXPIRATION_MILLIS: + requestExpirationMillis = Long.parseLong(value); break; - } - - if ( !receivedPort.isRunning() ) { - logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort); - ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port not running"); - responseWritten = true; + case BATCH_COUNT: + requestedBatchCount = Integer.parseInt(value); + if ( requestedBatchCount < 0 ) { + throw new HandshakeException("Cannot request Batch Count less than 1; requested value: " + value); + } break; - } - - // PORTS_DESTINATION_FULL was introduced in version 2. 
If version 1, just ignore this - // we we will simply not service the request but the sender will timeout - if ( getVersionNegotiator().getVersion() > 1 ) { - for ( final Connection connection : port.getConnections() ) { - if ( connection.getFlowFileQueue().isFull() ) { - logger.debug("Responding with ResponseCode PORTS_DESTINATION_FULL for {}", receivedPort); - ResponseCode.PORTS_DESTINATION_FULL.writeResponse(dos); - responseWritten = true; - break; + case BATCH_SIZE: + requestedBatchBytes = Long.parseLong(value); + if ( requestedBatchBytes < 0 ) { + throw new HandshakeException("Cannot request Batch Size less than 1; requested value: " + value); + } + break; + case BATCH_DURATION: + requestedBatchNanos = TimeUnit.MILLISECONDS.toNanos(Long.parseLong(value)); + if ( requestedBatchNanos < 0 ) { + throw new HandshakeException("Cannot request Batch Duration less than 1; requested value: " + value); + } + break; + case PORT_IDENTIFIER: { + Port receivedPort = rootGroup.getInputPort(value); + if ( receivedPort == null ) { + receivedPort = rootGroup.getOutputPort(value); + } + if ( receivedPort == null ) { + logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", value); + ResponseCode.UNKNOWN_PORT.writeResponse(dos); + throw new HandshakeException("Received unknown port identifier: " + value); + } + if ( !(receivedPort instanceof RootGroupPort) ) { + logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", value); + ResponseCode.UNKNOWN_PORT.writeResponse(dos); + throw new HandshakeException("Received port identifier " + value + ", but this Port is not a RootGroupPort"); + } + + this.port = (RootGroupPort) receivedPort; + final PortAuthorizationResult portAuthResult = this.port.checkUserAuthorization(peer.getCommunicationsSession().getUserDn()); + if ( !portAuthResult.isAuthorized() ) { + logger.debug("Responding with ResponseCode UNAUTHORIZED: ", portAuthResult.getExplanation()); + ResponseCode.UNAUTHORIZED.writeResponse(dos, 
portAuthResult.getExplanation()); + responseWritten = true; + break; + } + + if ( !receivedPort.isValid() ) { + logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort); + ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port is not valid"); + responseWritten = true; + break; + } + + if ( !receivedPort.isRunning() ) { + logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort); + ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port not running"); + responseWritten = true; + break; + } + + // PORTS_DESTINATION_FULL was introduced in version 2. If version 1, just ignore this + // we we will simply not service the request but the sender will timeout + if ( getVersionNegotiator().getVersion() > 1 ) { + for ( final Connection connection : port.getConnections() ) { + if ( connection.getFlowFileQueue().isFull() ) { + logger.debug("Responding with ResponseCode PORTS_DESTINATION_FULL for {}", receivedPort); + ResponseCode.PORTS_DESTINATION_FULL.writeResponse(dos); + responseWritten = true; + break; + } } } + + break; } - - break; } + } catch (final NumberFormatException nfe) { + throw new HandshakeException("Received invalid value for property '" + property + "'; invalid value: " + value); } } @@ -333,8 +359,25 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transmissionMillis, false); session.remove(flowFile); + // determine if we should check for more data on queue. 
final long sendingNanos = System.nanoTime() - startNanos; - if ( sendingNanos < BATCH_NANOS ) { + boolean poll = true; + if ( sendingNanos >= requestedBatchNanos && requestedBatchNanos > 0L ) { + poll = false; + } + if ( bytesSent >= requestedBatchBytes && requestedBatchBytes > 0L ) { + poll = false; + } + if ( flowFilesSent.size() >= requestedBatchCount && requestedBatchCount > 0 ) { + poll = false; + } + + if ( requestedBatchNanos == 0 && requestedBatchBytes == 0 && requestedBatchCount == 0 ) { + poll = (sendingNanos < DEFAULT_BATCH_NANOS); + } + + if ( poll ) { + // no requested batch limit (count, size, or duration; or the default duration if none was requested) has been reached yet, so get more data. flowFile = session.get(); } else { flowFile = null;
