Repository: incubator-nifi Updated Branches: refs/heads/site-to-site-client 1a08fead3 -> 1d4b18480
NIFI-282: Add SiteToSiteConfig object Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/bbe335dc Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/bbe335dc Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/bbe335dc Branch: refs/heads/site-to-site-client Commit: bbe335dc6d1b9ffcdf973600aff7712444937a49 Parents: 1a08fea Author: Mark Payne <[email protected]> Authored: Tue Jan 27 21:15:33 2015 -0500 Committer: Mark Payne <[email protected]> Committed: Tue Jan 27 21:15:33 2015 -0500 ---------------------------------------------------------------------- .../nifi/remote/client/SiteToSiteClient.java | 50 +++++++++++- .../remote/client/SiteToSiteClientConfig.java | 84 ++++++++++++++++++++ .../nifi/remote/client/socket/SocketClient.java | 15 ++-- 3 files changed, 141 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bbe335dc/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java index 9a63e5b..f4d6f17 100644 --- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java @@ -250,7 +250,55 @@ public interface SiteToSiteClient extends Closeable { throw new IllegalStateException("Must specify either Port Name or Port Identifier to builder Site-to-Site client"); } - return new SocketClient(this); + final SiteToSiteClientConfig config = new SiteToSiteClientConfig() { + + @Override + public 
boolean isUseCompression() { + return Builder.this.isUseCompression(); + } + + @Override + public String getUrl() { + return Builder.this.getUrl(); + } + + @Override + public long getTimeout(final TimeUnit timeUnit) { + return Builder.this.getTimeout(timeUnit); + } + + @Override + public SSLContext getSslContext() { + return Builder.this.getSslContext(); + } + + @Override + public String getPortName() { + return Builder.this.getPortName(); + } + + @Override + public String getPortIdentifier() { + return Builder.this.getPortIdentifier(); + } + + @Override + public long getPenalizationPeriod(final TimeUnit timeUnit) { + return Builder.this.getPenalizationPeriod(timeUnit); + } + + @Override + public File getPeerPersistenceFile() { + return Builder.this.getPeerPersistenceFile(); + } + + @Override + public EventReporter getEventReporter() { + return Builder.this.getEventReporter(); + } + }; + + return new SocketClient(config); } /** http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bbe335dc/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java new file mode 100644 index 0000000..6ba2d3f --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.client; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +import javax.net.ssl.SSLContext; + +import org.apache.nifi.events.EventReporter; + +public interface SiteToSiteClientConfig { + + /** + * Returns the configured URL for the remote NiFi instance + * @return + */ + String getUrl(); + + /** + * Returns the communications timeout, expressed in the given time unit + * @return + */ + long getTimeout(final TimeUnit timeUnit); + + /** + * Returns the amount of time that a particular node will be ignored after a + * communications error with that node occurs + * @param timeUnit + * @return + */ + long getPenalizationPeriod(TimeUnit timeUnit); + + /** + * Returns the SSL Context that is configured for this client configuration + * @return + */ + SSLContext getSslContext(); + + /** + * Returns the EventReporter that is to be used by clients to report events + * @return + */ + EventReporter getEventReporter(); + + /** + * Returns the file that is to be used for persisting the nodes of a remote cluster, if any. + * @return + */ + File getPeerPersistenceFile(); + + /** + * Returns a boolean indicating whether or not compression will be used to transfer data + * to and from the remote instance + * @return + */ + boolean isUseCompression(); + + /** + * Returns the name of the port that the client is to communicate with. + * @return + */ + String getPortName(); + + /** + * Returns the identifier of the port that the client is to communicate with.
+ * @return + */ + String getPortIdentifier(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bbe335dc/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java index 3314c52..c04a90b 100644 --- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java @@ -23,6 +23,7 @@ import org.apache.nifi.remote.RemoteDestination; import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.client.SiteToSiteClientConfig; import org.apache.nifi.remote.exception.HandshakeException; import org.apache.nifi.remote.exception.PortNotRunningException; import org.apache.nifi.remote.exception.ProtocolException; @@ -37,14 +38,14 @@ public class SocketClient implements SiteToSiteClient { private final long penalizationNanos; private volatile String portIdentifier; - public SocketClient(final Builder builder) { - pool = new EndpointConnectionStatePool(builder.getUrl(), (int) builder.getTimeout(TimeUnit.MILLISECONDS), - builder.getSslContext(), builder.getEventReporter(), builder.getPeerPersistenceFile()); + public SocketClient(final SiteToSiteClientConfig config) { + pool = new EndpointConnectionStatePool(config.getUrl(), (int) config.getTimeout(TimeUnit.MILLISECONDS), + config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile()); - this.compress = builder.isUseCompression(); - this.portIdentifier = builder.getPortIdentifier(); - this.portName = 
builder.getPortName(); - this.penalizationNanos = builder.getPenalizationPeriod(TimeUnit.NANOSECONDS); + this.compress = config.isUseCompression(); + this.portIdentifier = config.getPortIdentifier(); + this.portName = config.getPortName(); + this.penalizationNanos = config.getPenalizationPeriod(TimeUnit.NANOSECONDS); }
