http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java index 629032a..5c4ce55 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java @@ -36,358 +36,383 @@ import org.apache.nifi.remote.protocol.DataPacket; /** * <p> - * The SiteToSiteClient provides a mechanism for sending data to a remote instance of NiFi - * (or NiFi cluster) and retrieving data from a remote instance of NiFi (or NiFi cluster). + * The SiteToSiteClient provides a mechanism for sending data to a remote + * instance of NiFi (or NiFi cluster) and retrieving data from a remote instance + * of NiFi (or NiFi cluster). * </p> - * + * * <p> - * When configuring the client via the {@link SiteToSiteClient.Builder}, the Builder must - * be provided the URL of the remote NiFi instance. If the URL points to a standalone instance - * of NiFi, all interaction will take place with that instance of NiFi. However, if the URL - * points to the NiFi Cluster Manager of a cluster, the client will automatically handle load - * balancing the transactions across the different nodes in the cluster. + * When configuring the client via the {@link SiteToSiteClient.Builder}, the + * Builder must be provided the URL of the remote NiFi instance. If the URL + * points to a standalone instance of NiFi, all interaction will take place with + * that instance of NiFi. 
However, if the URL points to the NiFi Cluster Manager + * of a cluster, the client will automatically handle load balancing the + * transactions across the different nodes in the cluster. * </p> - * + * * <p> - * The SiteToSiteClient provides a {@link Transaction} through which all interaction with the - * remote instance takes place. After data has been exchanged or it is determined that no data - * is available, the Transaction can then be canceled (via the {@link Transaction#cancel(String)} - * method) or can be completed (via the {@link Transaction#complete(boolean)} method). + * The SiteToSiteClient provides a {@link Transaction} through which all + * interaction with the remote instance takes place. After data has been + * exchanged or it is determined that no data is available, the Transaction can + * then be canceled (via the {@link Transaction#cancel(String)} method) or can + * be completed (via the {@link Transaction#complete(boolean)} method). * </p> - * + * * <p> - * An instance of SiteToSiteClient can be obtained by constructing a new instance of the - * {@link SiteToSiteClient.Builder} class, calling the appropriate methods to configured the - * client as desired, and then calling the {@link SiteToSiteClient.Builder#build() build()} method. + * An instance of SiteToSiteClient can be obtained by constructing a new + * instance of the {@link SiteToSiteClient.Builder} class, calling the + * appropriate methods to configure the client as desired, and then calling the + * {@link SiteToSiteClient.Builder#build() build()} method. * </p> * * <p> - * The SiteToSiteClient itself is immutable once constructed and is thread-safe. Many threads can - * share access to the same client. However, the {@link Transaction} that is created by the client - * is not thread safe and should not be shared among threads. + * The SiteToSiteClient itself is immutable once constructed and is thread-safe. + * Many threads can share access to the same client.
However, the + * {@link Transaction} that is created by the client is not thread safe and + * should not be shared among threads. * </p> */ public interface SiteToSiteClient extends Closeable { - /** - * <p> - * Creates a new Transaction that can be used to either send data to a remote NiFi instance - * or receive data from a remote NiFi instance, depending on the value passed for the {@code direction} argument. - * </p> - * - * <p> - * <b>Note:</b> If all of the nodes are penalized (See {@link Builder#nodePenalizationPeriod(long, TimeUnit)}), then - * this method will return <code>null</code>. - * </p> - * - * @param direction specifies which direction the data should be transferred. A value of {@link TransferDirection#SEND} - * indicates that this Transaction will send data to the remote instance; a value of {@link TransferDirection#RECEIVE} indicates - * that this Transaction will be used to receive data from the remote instance. - * - * @return a Transaction to use for sending or receiving data, or <code>null</code> if all nodes are penalized. - * @throws IOException - */ - Transaction createTransaction(TransferDirection direction) throws HandshakeException, PortNotRunningException, ProtocolException, UnknownPortException, IOException; - - /** - * <p> - * Returns {@code true} if site-to-site communications with the remote instance are secure, - * {@code false} if site-to-site communications with the remote instance are not secure. Whether or not - * communications are secure depends on the server, not the client. - * </p> - * - * <p> - * In order to determine whether the server is configured for secure communications, the client may have - * to query the server's RESTful interface. Doing so could result in an IOException. 
- * </p> - * - * @return - * @throws IOException if unable to query the remote instance's RESTful interface or if the remote - * instance is not configured to allow site-to-site communications - */ - boolean isSecure() throws IOException; - - /** - * <p> - * Returns the configuration object that was built by the Builder - * </p> - * @return - */ - SiteToSiteClientConfig getConfig(); - - /** - * <p> - * The Builder is the mechanism by which all configuration is passed to the SiteToSiteClient. - * Once constructed, the SiteToSiteClient cannot be reconfigured (i.e., it is immutable). If - * a change in configuration should be desired, the client should be {@link Closeable#close() closed} - * and a new client created. - * </p> - */ - public static class Builder implements Serializable { + /** + * <p> + * Creates a new Transaction that can be used to either send data to a + * remote NiFi instance or receive data from a remote NiFi instance, + * depending on the value passed for the {@code direction} argument. + * </p> + * + * <p> + * <b>Note:</b> If all of the nodes are penalized (See + * {@link Builder#nodePenalizationPeriod(long, TimeUnit)}), then this method + * will return <code>null</code>. + * </p> + * + * @param direction specifies which direction the data should be + * transferred. A value of {@link TransferDirection#SEND} indicates that + * this Transaction will send data to the remote instance; a value of + * {@link TransferDirection#RECEIVE} indicates that this Transaction will be + * used to receive data from the remote instance. + * + * @return a Transaction to use for sending or receiving data, or + * <code>null</code> if all nodes are penalized. 
+ * @throws org.apache.nifi.remote.exception.HandshakeException he + * @throws org.apache.nifi.remote.exception.PortNotRunningException pnre + * @throws IOException ioe + * @throws org.apache.nifi.remote.exception.UnknownPortException upe + */ + Transaction createTransaction(TransferDirection direction) throws HandshakeException, PortNotRunningException, ProtocolException, UnknownPortException, IOException; + + /** + * <p> + * In order to determine whether the server is configured for secure + * communications, the client may have to query the server's RESTful + * interface. Doing so could result in an IOException. + * </p> + * + * @return {@code true} if site-to-site communications with the remote + * instance are secure, {@code false} if site-to-site communications with + * the remote instance are not secure. Whether or not communications are + * secure depends on the server, not the client + * @throws IOException if unable to query the remote instance's RESTful + * interface or if the remote instance is not configured to allow + * site-to-site communications + */ + boolean isSecure() throws IOException; + + /** + * + * @return the configuration object that was built by the Builder + */ + SiteToSiteClientConfig getConfig(); + + /** + * <p> + * The Builder is the mechanism by which all configuration is passed to the + * SiteToSiteClient. Once constructed, the SiteToSiteClient cannot be + * reconfigured (i.e., it is immutable). If a change in configuration should + * be desired, the client should be {@link Closeable#close() closed} and a + * new client created. 
+ * </p> + */ + public static class Builder implements Serializable { + private static final long serialVersionUID = -4954962284343090219L; - + private String url; - private long timeoutNanos = TimeUnit.SECONDS.toNanos(30); - private long penalizationNanos = TimeUnit.SECONDS.toNanos(3); - private long idleExpirationNanos = TimeUnit.SECONDS.toNanos(30L); - private SSLContext sslContext; - private EventReporter eventReporter; - private File peerPersistenceFile; - private boolean useCompression; - private String portName; - private String portIdentifier; - private int batchCount; - private long batchSize; - private long batchNanos; - - /** - * Populates the builder with values from the provided config - * @param config - * @return - */ - public Builder fromConfig(final SiteToSiteClientConfig config) { - this.url = config.getUrl(); - this.timeoutNanos = config.getTimeout(TimeUnit.NANOSECONDS); - this.penalizationNanos = config.getPenalizationPeriod(TimeUnit.NANOSECONDS); - this.idleExpirationNanos = config.getIdleConnectionExpiration(TimeUnit.NANOSECONDS); - this.sslContext = config.getSslContext(); - this.eventReporter = config.getEventReporter(); - this.peerPersistenceFile = config.getPeerPersistenceFile(); - this.useCompression = config.isUseCompression(); - this.portName = config.getPortName(); - this.portIdentifier = config.getPortIdentifier(); - this.batchCount = config.getPreferredBatchCount(); - this.batchSize = config.getPreferredBatchSize(); - this.batchNanos = config.getPreferredBatchDuration(TimeUnit.NANOSECONDS); - - return this; - } - - /** - * Specifies the URL of the remote NiFi instance. If this URL points to the Cluster Manager of - * a NiFi cluster, data transfer to and from nodes will be automatically load balanced across - * the different nodes. 
- * - * @param url - * @return - */ - public Builder url(final String url) { - this.url = url; - return this; - } - - /** - * Specifies the communications timeouts to use when interacting with the remote instances. The - * default value is 30 seconds. - * - * @param timeout - * @param unit - * @return - */ - public Builder timeout(final long timeout, final TimeUnit unit) { - this.timeoutNanos = unit.toNanos(timeout); - return this; - } - - /** - * Specifies the amount of time that a connection can remain idle in the connection pool before it - * is "expired" and shutdown. The default value is 30 seconds. - * - * @param timeout - * @param unit - * @return - */ - public Builder idleExpiration(final long timeout, final TimeUnit unit) { - this.idleExpirationNanos = unit.toNanos(timeout); - return this; - } - - /** - * If there is a problem communicating with a node (i.e., any node in the remote NiFi cluster - * or the remote instance of NiFi if it is standalone), specifies how long the client should - * wait before attempting to communicate with that node again. While a particular node is penalized, - * all other nodes in the remote cluster (if any) will still be available for communication. - * The default value is 3 seconds. - * - * @param period - * @param unit - * @return - */ - public Builder nodePenalizationPeriod(final long period, final TimeUnit unit) { - this.penalizationNanos = unit.toNanos(period); - return this; - } - - /** - * Specifies the SSL Context to use when communicating with the remote NiFi instance(s). If not - * specified, communications will not be secure. The remote instance of NiFi always determines - * whether or not Site-to-Site communications are secure (i.e., the client will always use - * secure or non-secure communications, depending on what the server dictates). 
- * - * @param sslContext - * @return - */ - public Builder sslContext(final SSLContext sslContext) { - this.sslContext = sslContext; - return this; - } - - - /** - * Provides an EventReporter that can be used by the client in order to report any events that - * could be of interest when communicating with the remote instance. The EventReporter provided - * must be threadsafe. - * - * @param eventReporter - * @return - */ - public Builder eventReporter(final EventReporter eventReporter) { - this.eventReporter = eventReporter; - return this; - } - - - /** - * Specifies a file that the client can write to in order to persist the list of nodes in the - * remote cluster and recover the list of nodes upon restart. This allows the client to function - * if the remote Cluster Manager is unavailable, even after a restart of the client software. - * If not specified, the list of nodes will not be persisted and a failure of the Cluster Manager - * will result in not being able to communicate with the remote instance if a new client - * is created. - * - * @param peerPersistenceFile - * @return - */ - public Builder peerPersistenceFile(final File peerPersistenceFile) { - this.peerPersistenceFile = peerPersistenceFile; - return this; - } - - /** - * Specifies whether or not data should be compressed before being transferred to or from the - * remote instance. - * - * @param compress - * @return - */ - public Builder useCompression(final boolean compress) { - this.useCompression = compress; - return this; - } - - /** - * Specifies the name of the port to communicate with. Either the port name or the port identifier - * must be specified. - * - * @param portName - * @return - */ - public Builder portName(final String portName) { - this.portName = portName; - return this; - } - - /** - * Specifies the unique identifier of the port to communicate with. If it is known, this is preferred over providing - * the port name, as the port name may change. 
- * - * @param portIdentifier - * @return - */ - public Builder portIdentifier(final String portIdentifier) { - this.portIdentifier = portIdentifier; - return this; - } - - /** - * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However, - * the client has the ability to request a particular batch size/duration. This method specifies - * the preferred number of {@link DataPacket}s to include in a Transaction. - * - * @return - */ - public Builder requestBatchCount(final int count) { - this.batchCount = count; - return this; - } - - /** - * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However, - * the client has the ability to request a particular batch size/duration. This method specifies - * the preferred number of bytes to include in a Transaction. - * - * @return - */ - public Builder requestBatchSize(final long bytes) { - this.batchSize = bytes; - return this; - } - + private long timeoutNanos = TimeUnit.SECONDS.toNanos(30); + private long penalizationNanos = TimeUnit.SECONDS.toNanos(3); + private long idleExpirationNanos = TimeUnit.SECONDS.toNanos(30L); + private SSLContext sslContext; + private EventReporter eventReporter; + private File peerPersistenceFile; + private boolean useCompression; + private String portName; + private String portIdentifier; + private int batchCount; + private long batchSize; + private long batchNanos; + + /** + * Populates the builder with values from the provided config + * + * @param config to start with + * @return the builder + */ + public Builder fromConfig(final SiteToSiteClientConfig config) { + this.url = config.getUrl(); + this.timeoutNanos = config.getTimeout(TimeUnit.NANOSECONDS); + this.penalizationNanos = config.getPenalizationPeriod(TimeUnit.NANOSECONDS); + this.idleExpirationNanos = config.getIdleConnectionExpiration(TimeUnit.NANOSECONDS); + this.sslContext = config.getSslContext(); + this.eventReporter = config.getEventReporter(); + 
this.peerPersistenceFile = config.getPeerPersistenceFile(); + this.useCompression = config.isUseCompression(); + this.portName = config.getPortName(); + this.portIdentifier = config.getPortIdentifier(); + this.batchCount = config.getPreferredBatchCount(); + this.batchSize = config.getPreferredBatchSize(); + this.batchNanos = config.getPreferredBatchDuration(TimeUnit.NANOSECONDS); + + return this; + } + + /** + * Specifies the URL of the remote NiFi instance. If this URL points to + * the Cluster Manager of a NiFi cluster, data transfer to and from + * nodes will be automatically load balanced across the different nodes. + * + * @param url url of remote instance + * @return the builder + */ + public Builder url(final String url) { + this.url = url; + return this; + } + + /** + * Specifies the communications timeouts to use when interacting with + * the remote instances. The default value is 30 seconds. + * + * @param timeout to use when interacting with remote instances + * @param unit unit of time over which to interpret the given timeout + * @return the builder + */ + public Builder timeout(final long timeout, final TimeUnit unit) { + this.timeoutNanos = unit.toNanos(timeout); + return this; + } + + /** + * Specifies the amount of time that a connection can remain idle in the + * connection pool before it is "expired" and shutdown. The default + * value is 30 seconds. + * + * @param timeout to use when interacting with remote instances + * @param unit unit of time over which to interpret the given timeout + * @return the builder + */ + public Builder idleExpiration(final long timeout, final TimeUnit unit) { + this.idleExpirationNanos = unit.toNanos(timeout); + return this; + } + + /** + * If there is a problem communicating with a node (i.e., any node in + * the remote NiFi cluster or the remote instance of NiFi if it is + * standalone), specifies how long the client should wait before + * attempting to communicate with that node again. 
While a particular + * node is penalized, all other nodes in the remote cluster (if any) + * will still be available for communication. The default value is 3 + * seconds. + * + * @param period time to wait between communication attempts + * @param unit over which to evaluate the given period + * @return the builder + */ + public Builder nodePenalizationPeriod(final long period, final TimeUnit unit) { + this.penalizationNanos = unit.toNanos(period); + return this; + } + /** - * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However, - * the client has the ability to request a particular batch size/duration. This method specifies - * the preferred amount of time that a Transaction should span. - * - * @return + * Specifies the SSL Context to use when communicating with the remote + * NiFi instance(s). If not specified, communications will not be + * secure. The remote instance of NiFi always determines whether or not + * Site-to-Site communications are secure (i.e., the client will always + * use secure or non-secure communications, depending on what the server + * dictates). + * + * @param sslContext the context + * @return the builder */ - public Builder requestBatchDuration(final long value, final TimeUnit unit) { - this.batchNanos = unit.toNanos(value); - return this; - } - - /** - * Returns a {@link SiteToSiteClientConfig} for the configured values but does not create a SiteToSiteClient - * @return - */ - public SiteToSiteClientConfig buildConfig() { - final SiteToSiteClientConfig config = new SiteToSiteClientConfig() { + public Builder sslContext(final SSLContext sslContext) { + this.sslContext = sslContext; + return this; + } + + /** + * Provides an EventReporter that can be used by the client in order to + * report any events that could be of interest when communicating with + * the remote instance. The EventReporter provided must be threadsafe. 
+ * + * @param eventReporter reporter + * @return the builder + */ + public Builder eventReporter(final EventReporter eventReporter) { + this.eventReporter = eventReporter; + return this; + } + + /** + * Specifies a file that the client can write to in order to persist the + * list of nodes in the remote cluster and recover the list of nodes + * upon restart. This allows the client to function if the remote + * Cluster Manager is unavailable, even after a restart of the client + * software. If not specified, the list of nodes will not be persisted + * and a failure of the Cluster Manager will result in not being able to + * communicate with the remote instance if a new client is created. + * + * @param peerPersistenceFile file + * @return the builder + */ + public Builder peerPersistenceFile(final File peerPersistenceFile) { + this.peerPersistenceFile = peerPersistenceFile; + return this; + } + + /** + * Specifies whether or not data should be compressed before being + * transferred to or from the remote instance. + * + * @param compress true if should compress + * @return the builder + */ + public Builder useCompression(final boolean compress) { + this.useCompression = compress; + return this; + } + + /** + * Specifies the name of the port to communicate with. Either the port + * name or the port identifier must be specified. + * + * @param portName name of port + * @return the builder + */ + public Builder portName(final String portName) { + this.portName = portName; + return this; + } + + /** + * Specifies the unique identifier of the port to communicate with. If + * it is known, this is preferred over providing the port name, as the + * port name may change. + * + * @param portIdentifier identifier of port + * @return the builder + */ + public Builder portIdentifier(final String portIdentifier) { + this.portIdentifier = portIdentifier; + return this; + } + + /** + * When pulling data from a NiFi instance, the sender chooses how large + * a Transaction is. 
However, the client has the ability to request a + * particular batch size/duration. This method specifies the preferred + * number of {@link DataPacket}s to include in a Transaction. + * + * @param count client preferred batch size + * @return the builder + */ + public Builder requestBatchCount(final int count) { + this.batchCount = count; + return this; + } + + /** + * When pulling data from a NiFi instance, the sender chooses how large + * a Transaction is. However, the client has the ability to request a + * particular batch size/duration. This method specifies the preferred + * number of bytes to include in a Transaction. + * + * @param bytes client preferred batch size + * @return the builder + */ + public Builder requestBatchSize(final long bytes) { + this.batchSize = bytes; + return this; + } + + /** + * When pulling data from a NiFi instance, the sender chooses how large + * a Transaction is. However, the client has the ability to request a + * particular batch size/duration. This method specifies the preferred + * amount of time that a Transaction should span. 
+ * + * @param value client preferred batch duration + * @param unit client preferred batch duration unit + * @return the builder + */ + public Builder requestBatchDuration(final long value, final TimeUnit unit) { + this.batchNanos = unit.toNanos(value); + return this; + } + + /** + * @return a {@link SiteToSiteClientConfig} for the configured values + * but does not create a SiteToSiteClient + */ + public SiteToSiteClientConfig buildConfig() { + final SiteToSiteClientConfig config = new SiteToSiteClientConfig() { private static final long serialVersionUID = 1323119754841633818L; @Override public boolean isUseCompression() { return Builder.this.isUseCompression(); } - + @Override public String getUrl() { return Builder.this.getUrl(); } - + @Override public long getTimeout(final TimeUnit timeUnit) { return Builder.this.getTimeout(timeUnit); } - + @Override public long getIdleConnectionExpiration(final TimeUnit timeUnit) { return Builder.this.getIdleConnectionExpiration(timeUnit); } - + @Override public SSLContext getSslContext() { return Builder.this.getSslContext(); } - + @Override public String getPortName() { return Builder.this.getPortName(); } - + @Override public String getPortIdentifier() { return Builder.this.getPortIdentifier(); } - + @Override public long getPenalizationPeriod(final TimeUnit timeUnit) { return Builder.this.getPenalizationPeriod(timeUnit); } - + @Override public File getPeerPersistenceFile() { return Builder.this.getPeerPersistenceFile(); } - + @Override public EventReporter getEventReporter() { return Builder.this.getEventReporter(); @@ -397,123 +422,117 @@ public interface SiteToSiteClient extends Closeable { public long getPreferredBatchDuration(final TimeUnit timeUnit) { return timeUnit.convert(Builder.this.batchNanos, TimeUnit.NANOSECONDS); } - + @Override public long getPreferredBatchSize() { return Builder.this.batchSize; } - + @Override public int getPreferredBatchCount() { return Builder.this.batchCount; } }; - + return config; - } 
- - /** - * Builds a new SiteToSiteClient that can be used to send and receive data with remote instances of NiFi - * @return - * - * @throws IllegalStateException if either the url is not set or neither the port name nor port identifier - * is set. - */ - public SiteToSiteClient build() { - if ( url == null ) { - throw new IllegalStateException("Must specify URL to build Site-to-Site client"); - } - - if ( portName == null && portIdentifier == null ) { - throw new IllegalStateException("Must specify either Port Name or Port Identifier to builder Site-to-Site client"); - } - - return new SocketClient(buildConfig()); - } - - /** - * Returns the configured URL for the remote NiFi instance - * @return - */ - public String getUrl() { - return url; - } - - /** - * Returns the communications timeout - * @return - */ - public long getTimeout(final TimeUnit timeUnit) { - return timeUnit.convert(timeoutNanos, TimeUnit.NANOSECONDS); - } - - /** - * Returns the amount of of time that a connection can remain idle in the connection - * pool before being shutdown - * @param timeUnit - * @return - */ - public long getIdleConnectionExpiration(final TimeUnit timeUnit) { - return timeUnit.convert(idleExpirationNanos, TimeUnit.NANOSECONDS); - } - - /** - * Returns the amount of time that a particular node will be ignored after a - * communications error with that node occurs - * @param timeUnit - * @return - */ - public long getPenalizationPeriod(TimeUnit timeUnit) { - return timeUnit.convert(penalizationNanos, TimeUnit.NANOSECONDS); - } - - /** - * Returns the SSL Context that is configured for this builder - * @return - */ - public SSLContext getSslContext() { - return sslContext; - } - - /** - * Returns the EventReporter that is to be used by clients to report events - * @return - */ - public EventReporter getEventReporter() { - return eventReporter; - } - - /** - * Returns the file that is to be used for persisting the nodes of a remote cluster, if any. 
- * @return - */ - public File getPeerPersistenceFile() { - return peerPersistenceFile; - } - - /** - * Returns a boolean indicating whether or not compression will be used to transfer data - * to and from the remote instance - * @return - */ - public boolean isUseCompression() { - return useCompression; - } - - /** - * Returns the name of the port that the client is to communicate with. - * @return - */ - public String getPortName() { - return portName; - } - - /** - * Returns the identifier of the port that the client is to communicate with. - * @return - */ - public String getPortIdentifier() { - return portIdentifier; - } - } + } + + /** + * @return a new SiteToSiteClient that can be used to send and receive + * data with remote instances of NiFi + * + * @throws IllegalStateException if either the url is not set or neither + * the port name nor port identifier is set. + */ + public SiteToSiteClient build() { + if (url == null) { + throw new IllegalStateException("Must specify URL to build Site-to-Site client"); + } + + if (portName == null && portIdentifier == null) { + throw new IllegalStateException("Must specify either Port Name or Port Identifier to builder Site-to-Site client"); + } + + return new SocketClient(buildConfig()); + } + + /** + * @return the configured URL for the remote NiFi instance + */ + public String getUrl() { + return url; + } + + /** + * @param timeUnit unit over which to interpret the timeout + * @return the communications timeout + */ + public long getTimeout(final TimeUnit timeUnit) { + return timeUnit.convert(timeoutNanos, TimeUnit.NANOSECONDS); + } + + /** + * @param timeUnit unit over which to interpret the time + * @return the amount of time that a connection can remain idle in + * the connection pool before being shutdown + */ + public long getIdleConnectionExpiration(final TimeUnit timeUnit) { + return timeUnit.convert(idleExpirationNanos, TimeUnit.NANOSECONDS); + } + + /** + * @param timeUnit unit of reported time + *
@return the amount of time that a particular node will be ignored + * after a communications error with that node occurs + */ + public long getPenalizationPeriod(TimeUnit timeUnit) { + return timeUnit.convert(penalizationNanos, TimeUnit.NANOSECONDS); + } + + /** + * @return the SSL Context that is configured for this builder + */ + public SSLContext getSslContext() { + return sslContext; + } + + /** + * @return the EventReporter that is to be used by clients to report + * events + */ + public EventReporter getEventReporter() { + return eventReporter; + } + + /** + * @return the file that is to be used for persisting the nodes of a + * remote cluster, if any + */ + public File getPeerPersistenceFile() { + return peerPersistenceFile; + } + + /** + * @return a boolean indicating whether or not compression will be used + * to transfer data to and from the remote instance + */ + public boolean isUseCompression() { + return useCompression; + } + + /** + * @return the name of the port that the client is to communicate with + */ + public String getPortName() { + return portName; + } + + /** + * @return the identifier of the port that the client is to communicate + * with + */ + public String getPortIdentifier() { + return portIdentifier; + } + } }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java index 5e7fbe8..c4b0d22 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java @@ -27,97 +27,91 @@ import org.apache.nifi.remote.protocol.DataPacket; public interface SiteToSiteClientConfig extends Serializable { - /** - * Returns the configured URL for the remote NiFi instance - * @return - */ - String getUrl(); - - /** - * Returns the communications timeout in nanoseconds - * @return - */ - long getTimeout(final TimeUnit timeUnit); - - /** - * Returns the amount of time that a connection can remain idle before it is - * "expired" and shut down - * @param timeUnit - * @return - */ - long getIdleConnectionExpiration(TimeUnit timeUnit); - - /** - * Returns the amount of time that a particular node will be ignored after a - * communications error with that node occurs - * @param timeUnit - * @return - */ - long getPenalizationPeriod(TimeUnit timeUnit); - - /** - * Returns the SSL Context that is configured for this builder - * @return - */ - SSLContext getSslContext(); - - /** - * Returns the file that is to be used for persisting the nodes of a remote cluster, if any. 
- * @return - */ - File getPeerPersistenceFile(); - - /** - * Returns a boolean indicating whether or not compression will be used to transfer data - * to and from the remote instance - * @return - */ - boolean isUseCompression(); - - /** - * Returns the name of the port that the client is to communicate with. - * @return - */ - String getPortName(); - - /** - * Returns the identifier of the port that the client is to communicate with. - * @return - */ - String getPortIdentifier(); - - /** - * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However, - * the client has the ability to request a particular batch size/duration. This returns the maximum - * amount of time that we will request a NiFi instance to send data to us in a Transaction. - * - * @param timeUnit - * @return - */ - long getPreferredBatchDuration(TimeUnit timeUnit); - /** - * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However, - * the client has the ability to request a particular batch size/duration. This returns the maximum - * number of bytes that we will request a NiFi instance to send data to us in a Transaction. - * - * @return + * @return the configured URL for the remote NiFi instance */ - long getPreferredBatchSize(); - - - /** - * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However, - * the client has the ability to request a particular batch size/duration. This returns the maximum - * number of {@link DataPacket}s that we will request a NiFi instance to send data to us in a Transaction. 
- * - * @return + String getUrl(); + + /** + * @param timeUnit unit over which to report the timeout + * @return the communications timeout in given unit + */ + long getTimeout(final TimeUnit timeUnit); + + /** + * @param timeUnit the unit for which to report the time + * @return the amount of time that a connection can remain idle before it is + * "expired" and shut down + */ + long getIdleConnectionExpiration(TimeUnit timeUnit); + + /** + * @param timeUnit unit over which to report the time + * @return the amount of time that a particular node will be ignored after a + * communications error with that node occurs + */ + long getPenalizationPeriod(TimeUnit timeUnit); + + /** + * @return the SSL Context that is configured for this builder + */ + SSLContext getSslContext(); + + /** + * @return the file that is to be used for persisting the nodes of a remote + * cluster, if any + */ + File getPeerPersistenceFile(); + + /** + * @return a boolean indicating whether or not compression will be used to + * transfer data to and from the remote instance */ - int getPreferredBatchCount(); - - /** - * Returns the EventReporter that is to be used by clients to report events - * @return + boolean isUseCompression(); + + /** + * @return the name of the port that the client is to communicate with + */ + String getPortName(); + + /** + * @return the identifier of the port that the client is to communicate with + */ + String getPortIdentifier(); + + /** + * When pulling data from a NiFi instance, the sender chooses how large a + * Transaction is. However, the client has the ability to request a + * particular batch size/duration. + * + * @param timeUnit unit of time over which to report the duration + * @return the maximum amount of time that we will request a NiFi instance + * to send data to us in a Transaction + */ + long getPreferredBatchDuration(TimeUnit timeUnit); + + /** + * When pulling data from a NiFi instance, the sender chooses how large a + * Transaction is. 
However, the client has the ability to request a + * particular batch size/duration. + * + * @return returns the maximum number of bytes that we will request a NiFi + * instance to send data to us in a Transaction + */ + long getPreferredBatchSize(); + + /** + * When pulling data from a NiFi instance, the sender chooses how large a + * Transaction is. However, the client has the ability to request a + * particular batch size/duration. + * + * @return the maximum number of {@link DataPacket}s that we will request a + * NiFi instance to send data to us in a Transaction + */ + int getPreferredBatchCount(); + + /** + * @return the EventReporter that is to be used by clients to report events */ EventReporter getEventReporter(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnection.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnection.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnection.java index 651ae50..1a16b02 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnection.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnection.java @@ -21,33 +21,34 @@ import org.apache.nifi.remote.codec.FlowFileCodec; import org.apache.nifi.remote.protocol.socket.SocketClientProtocol; public class EndpointConnection { - private final Peer peer; + + private final Peer peer; private final SocketClientProtocol socketClientProtocol; private final FlowFileCodec codec; private volatile long lastUsed; - + public EndpointConnection(final Peer peer, final SocketClientProtocol socketClientProtocol, final FlowFileCodec codec) { 
this.peer = peer; this.socketClientProtocol = socketClientProtocol; this.codec = codec; } - + public FlowFileCodec getCodec() { return codec; } - + public SocketClientProtocol getSocketClientProtocol() { return socketClientProtocol; } - + public Peer getPeer() { return peer; } - + public void setLastTimeUsed() { lastUsed = System.currentTimeMillis(); } - + public long getLastTimeUsed() { return lastUsed; }
