NIFI-1857: HTTPS Site-to-Site - Enable HTTP(S) for Site-to-Site communication - Support HTTP Proxy in the middle of local and remote NiFi - Support BASIC and DIGEST auth with Proxy Server - Provide 2-phase style commit same as existing socket version - [WIP] Test with the latest cluster env (without NCM) hasn't tested yet
- Fixed Buffer handling issues at asyc http client POST - Fixed JS error when applying Remote Process Group Port setting from UI - Use compression setting from UI - Removed already finished TODO comments - Added additional buffer draining code after receiving EOF - Added inspection and assert code to make sure Site-to-Site client has written data fully to output stream - Changed default nifi.remote.input.secure from true to false This closes #497. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c120c498 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c120c498 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c120c498 Branch: refs/heads/master Commit: c120c4982d4fc811b06b672e3983b8ca5fb8ae64 Parents: a5fecda Author: Koji Kawamura <[email protected]> Authored: Mon Jun 6 22:19:26 2016 +0900 Committer: Mark Payne <[email protected]> Committed: Thu Jun 9 15:09:57 2016 -0400 ---------------------------------------------------------------------- .../org/apache/nifi/util/NiFiProperties.java | 39 +- nifi-commons/nifi-site-to-site-client/pom.xml | 19 + .../apache/nifi/remote/AbstractTransaction.java | 389 +++++++ .../remote/ClientTransactionCompletion.java | 57 + .../remote/client/AbstractSiteToSiteClient.java | 54 + .../apache/nifi/remote/client/PeerSelector.java | 341 ++++++ .../nifi/remote/client/PeerStatusProvider.java | 27 + .../nifi/remote/client/SiteInfoProvider.java | 231 ++++ .../nifi/remote/client/SiteToSiteClient.java | 95 +- .../remote/client/SiteToSiteClientConfig.java | 13 + .../nifi/remote/client/http/HttpClient.java | 200 ++++ .../TransportProtocolVersionNegotiator.java | 42 + .../client/socket/EndpointConnectionPool.java | 559 +--------- .../nifi/remote/client/socket/SocketClient.java | 40 +- .../remote/cluster/AdaptedNodeInformation.java | 11 + .../nifi/remote/cluster/NodeInformation.java | 20 +- .../remote/cluster/NodeInformationAdapter.java | 4 +- .../remote/exception/HandshakeException.java | 15 + .../io/http/HttpCommunicationsSession.java | 97 ++ .../apache/nifi/remote/io/http/HttpInput.java | 58 + .../apache/nifi/remote/io/http/HttpOutput.java | 45 + .../http/HttpServerCommunicationsSession.java | 72 ++ .../nifi/remote/protocol/ClientProtocol.java | 14 +- .../nifi/remote/protocol/HandshakeProperty.java | 59 + .../apache/nifi/remote/protocol/Response.java | 52 + .../nifi/remote/protocol/ResponseCode.java | 152 +++ .../protocol/SiteToSiteTransportProtocol.java | 22 + .../protocol/http/HttpClientTransaction.java | 187 ++++ .../nifi/remote/protocol/http/HttpHeaders.java | 35 + .../nifi/remote/protocol/http/HttpProxy.java | 59 + .../protocol/socket/HandshakeProperty.java | 59 - .../nifi/remote/protocol/socket/Response.java | 52 - .../remote/protocol/socket/ResponseCode.java | 148 --- .../protocol/socket/SocketClientProtocol.java | 173 +-- .../socket/SocketClientTransaction.java | 345 +----- .../SocketClientTransactionCompletion.java | 57 - .../nifi/remote/util/EventReportUtil.java | 50 + .../nifi/remote/util/NiFiRestApiUtil.java | 100 -- .../remote/util/SiteToSiteRestApiClient.java | 992 +++++++++++++++++ .../nifi/remote/client/TestPeerSelector.java | 125 +++ .../nifi/remote/client/http/TestHttpClient.java | 950 ++++++++++++++++ .../socket/TestEndpointConnectionStatePool.java | 92 -- .../remote/protocol/SiteToSiteTestUtils.java | 237 ++++ .../http/TestHttpClientTransaction.java | 346 ++++++ .../socket/TestSocketClientTransaction.java | 334 ++++++ .../src/main/asciidoc/administration-guide.adoc | 12 +- .../src/main/asciidoc/getting-started.adoc | 1 + .../images/configure-remote-process-group.png | Bin 0 -> 36406 bytes nifi-docs/src/main/asciidoc/user-guide.adoc | 37 +- .../apache/nifi/web/api/dto/ControllerDTO.java | 19 + .../nifi/web/api/dto/RemoteProcessGroupDTO.java | 45 + .../apache/nifi/web/api/dto/remote/PeerDTO.java | 78 ++ .../apache/nifi/web/api/entity/PeersEntity.java | 46 + .../web/api/entity/TransactionResultEntity.java | 53 + .../cluster/protocol/ConnectionResponse.java | 10 +- .../nifi/cluster/protocol/NodeIdentifier.java | 26 +- .../jaxb/message/AdaptedConnectionResponse.java | 9 + .../jaxb/message/AdaptedNodeIdentifier.java | 11 + .../jaxb/message/ConnectionResponseAdapter.java | 3 +- .../jaxb/message/NodeIdentifierAdapter.java | 3 +- .../message/ReconnectionRequestMessage.java | 9 + .../jaxb/message/TestJaxbProtocolUtils.java | 4 +- .../node/NodeClusterCoordinator.java | 9 +- .../heartbeat/TestAbstractHeartbeatMonitor.java | 2 +- .../endpoints/TestProcessorEndpointMerger.java | 4 +- .../http/replication/TestResponseUtils.java | 8 +- .../TestThreadPoolRequestReplicator.java | 24 +- .../node/TestNodeClusterCoordinator.java | 6 +- .../apache/nifi/groups/RemoteProcessGroup.java | 21 + .../org/apache/nifi/remote/RootGroupPort.java | 11 +- .../apache/nifi/controller/FlowController.java | 60 +- .../nifi/controller/StandardFlowService.java | 12 +- .../controller/StandardFlowSynchronizer.java | 24 +- .../apache/nifi/controller/TemplateUtils.java | 1 + .../serialization/FlowFromDOMFactory.java | 11 +- .../serialization/StandardFlowSerializer.java | 11 + .../nifi/remote/StandardRemoteProcessGroup.java | 187 ++-- .../src/main/resources/conf/nifi.properties | 6 +- .../nifi/remote/HttpRemoteSiteListener.java | 243 ++++ .../nifi/remote/SocketRemoteSiteListener.java | 32 +- .../nifi/remote/StandardRemoteGroupPort.java | 4 + .../nifi/remote/StandardRootGroupPort.java | 8 +- .../AbstractFlowFileServerProtocol.java | 559 ++++++++++ .../remote/protocol/FlowFileTransaction.java | 71 ++ .../remote/protocol/HandshakenProperties.java | 96 ++ .../http/HttpFlowFileServerProtocol.java | 30 + .../http/HttpFlowFileServerProtocolImpl.java | 223 ++++ .../socket/ClusterManagerServerProtocol.java | 2 + .../socket/SocketFlowFileServerProtocol.java | 529 +-------- .../nifi/remote/TestHttpRemoteSiteListener.java | 101 ++ .../http/TestHttpFlowFileServerProtocol.java | 589 ++++++++++ .../nifi/web/StandardNiFiServiceFacade.java | 2 + .../nifi/web/api/ApplicationResource.java | 2 +- .../apache/nifi/web/api/SiteToSiteResource.java | 1037 +++++++++++++++++- .../org/apache/nifi/web/api/dto/DtoFactory.java | 22 +- .../nifi/web/controller/ControllerFacade.java | 22 + .../dao/impl/StandardRemoteProcessGroupDAO.java | 85 +- .../nifi/web/api/TestSiteToSiteResource.java | 516 +++++++++ .../resources/access-control/nifi.properties | 2 +- .../test/resources/site-to-site/nifi.properties | 174 +++ .../canvas/new-remote-process-group-dialog.jsp | 72 ++ .../remote-process-group-configuration.jsp | 69 +- .../canvas/remote-process-group-details.jsp | 69 +- .../nifi-web-ui/src/main/webapp/css/dialog.css | 17 +- .../css/remote-process-group-configuration.css | 28 +- .../nf-ng-remote-process-group-component.js | 71 +- .../nf-remote-process-group-configuration.js | 33 +- .../canvas/nf-remote-process-group-details.js | 10 + .../nf/canvas/nf-remote-process-group-ports.js | 3 +- 109 files changed, 10283 insertions(+), 2269 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index 8021cca..bb74b25 100644 --- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -60,9 +60,11 @@ public class NiFiProperties extends Properties { public static final String SENSITIVE_PROPS_ALGORITHM = "nifi.sensitive.props.algorithm"; public static final String SENSITIVE_PROPS_PROVIDER = "nifi.sensitive.props.provider"; public static final String H2_URL_APPEND = "nifi.h2.url.append"; - public static final String REMOTE_INPUT_HOST = "nifi.remote.input.socket.host"; + public static final String REMOTE_INPUT_HOST = "nifi.remote.input.host"; public static final String REMOTE_INPUT_PORT = "nifi.remote.input.socket.port"; public static final String SITE_TO_SITE_SECURE = "nifi.remote.input.secure"; + public static final String SITE_TO_SITE_HTTP_ENABLED = "nifi.remote.input.http.enabled"; + public static final String SITE_TO_SITE_HTTP_TRANSACTION_TTL = "nifi.remote.input.http.transaction.ttl"; public static final String TEMPLATE_DIRECTORY = "nifi.templates.directory"; public static final String ADMINISTRATIVE_YIELD_DURATION = "nifi.administrative.yield.duration"; public static final String PERSISTENT_STATE_DIRECTORY = "nifi.persistent.state.directory"; @@ -210,6 +212,7 @@ public class NiFiProperties extends Properties { public static final String DEFAULT_ZOOKEEPER_CONNECT_TIMEOUT = "3 secs"; public static final String DEFAULT_ZOOKEEPER_SESSION_TIMEOUT = "3 secs"; public static final String DEFAULT_ZOOKEEPER_ROOT_NODE = "/nifi"; + public static final String DEFAULT_SITE_TO_SITE_HTTP_TRANSACTION_TTL = "30 secs"; // cluster common defaults public static final String DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL = "5 sec"; @@ -387,7 +390,7 @@ public class NiFiProperties extends Properties { /** * The socket port to listen on for a Remote Input Port. * - * @return the remote input port + * @return the remote input port for RAW socket communication */ public Integer getRemoteInputPort() { return getPropertyAsPort(REMOTE_INPUT_PORT, DEFAULT_REMOTE_INPUT_PORT); @@ -408,6 +411,38 @@ public class NiFiProperties extends Properties { } /** + * @return True if property value is 'true'; False otherwise. + */ + public Boolean isSiteToSiteHttpEnabled() { + final String remoteInputHttpEnabled = getProperty(SITE_TO_SITE_HTTP_ENABLED, "false"); + + if ("true".equalsIgnoreCase(remoteInputHttpEnabled)) { + return true; + } else { + return false; + } + + } + + /** + * The HTTP or HTTPS Web API port for a Remote Input Port. + * @return the remote input port for HTTP(S) communication, or null if HTTP(S) Site-to-Site is not enabled + */ + public Integer getRemoteInputHttpPort() { + if (!isSiteToSiteHttpEnabled()) { + return null; + } + + String propertyKey = isSiteToSiteSecure() ? NiFiProperties.WEB_HTTPS_PORT : NiFiProperties.WEB_HTTP_PORT; + Integer port = getIntegerProperty(propertyKey, 0); + if (port == 0) { + throw new RuntimeException("Remote input HTTP" + (isSiteToSiteSecure() ? "S" : "") + + " is enabled but " + propertyKey + " is not specified."); + } + return port; + } + + /** * Returns the directory to which Templates are to be persisted * * @return the template directory http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/pom.xml b/nifi-commons/nifi-site-to-site-client/pom.xml index 987e42b..4bd0913 100644 --- a/nifi-commons/nifi-site-to-site-client/pom.xml +++ b/nifi-commons/nifi-site-to-site-client/pom.xml @@ -44,6 +44,15 @@ <artifactId>nifi-client-dto</artifactId> <version>1.0.0-SNAPSHOT</version> </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpasyncclient</artifactId> + <version>4.1.1</version> + </dependency> <dependency> <groupId>junit</groupId> @@ -56,5 +65,15 @@ <version>2.24.0</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-servlet</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java new file mode 100644 index 0000000..3e700aa --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java @@ -0,0 +1,389 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote; + +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.remote.codec.FlowFileCodec; +import org.apache.nifi.remote.exception.ProtocolException; +import org.apache.nifi.remote.io.CompressionInputStream; +import org.apache.nifi.remote.io.CompressionOutputStream; +import org.apache.nifi.remote.protocol.CommunicationsSession; +import org.apache.nifi.remote.protocol.DataPacket; +import org.apache.nifi.remote.protocol.Response; +import org.apache.nifi.remote.protocol.ResponseCode; +import org.apache.nifi.remote.util.StandardDataPacket; +import org.apache.nifi.reporting.Severity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Map; +import java.util.zip.CRC32; +import java.util.zip.CheckedInputStream; +import java.util.zip.CheckedOutputStream; + +public abstract class AbstractTransaction implements Transaction { + + protected final Logger logger = LoggerFactory.getLogger(getClass()); + + protected final Peer peer; + protected final TransferDirection direction; + private final CRC32 crc = new CRC32(); + private final boolean compress; + protected final FlowFileCodec codec; + protected final EventReporter eventReporter; + protected final int protocolVersion; + private final int penaltyMillis; + protected final String destinationId; + protected TransactionState state; + protected boolean dataAvailable = false; + private final long creationNanoTime = System.nanoTime(); + private int transfers = 0; + private long contentBytes = 0; + + public AbstractTransaction(final Peer peer, final TransferDirection direction, final boolean useCompression, + final FlowFileCodec codec, final EventReporter eventReporter, final int protocolVersion, + final int penaltyMillis, final String destinationId) { + this.peer = peer; + this.state = TransactionState.TRANSACTION_STARTED; + this.direction = direction; + this.compress = useCompression; + this.codec = codec; + this.eventReporter = eventReporter; + this.protocolVersion = protocolVersion; + this.penaltyMillis = penaltyMillis; + this.destinationId = destinationId; + } + + protected void close() throws IOException { + } + + @Override + public void send(final byte[] content, final Map<String, String> attributes) throws IOException { + send(new StandardDataPacket(attributes, new ByteArrayInputStream(content), content.length)); + } + + @Override + public void error() { + this.state = TransactionState.ERROR; + try { + close(); + } catch (IOException e) { + logger.warn("Failed to close transaction due to {}", e.getMessage()); + if (logger.isDebugEnabled()) { + logger.warn("", e); + } + } + } + + @Override + public TransactionState getState() { + return state; + } + + @Override + public Communicant getCommunicant() { + return peer; + } + + @Override + public final DataPacket receive() throws IOException { + try { + try { + if (state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) { + throw new IllegalStateException("Cannot receive data from " + peer + " because Transaction State is " + state); + } + + if (direction == TransferDirection.SEND) { + throw new IllegalStateException("Attempting to receive data from " + peer + " but started a SEND Transaction"); + } + + // if we already know there's no data, just return null + if (!dataAvailable) { + return null; + } + + // if we have already received a packet, check if another is available. + if (transfers > 0) { + // Determine if Peer will send us data or has no data to send us + final Response dataAvailableCode = readTransactionResponse(); + switch (dataAvailableCode.getCode()) { + case CONTINUE_TRANSACTION: + logger.debug("{} {} Indicates Transaction should continue", this, peer); + this.dataAvailable = true; + break; + case FINISH_TRANSACTION: + logger.debug("{} {} Indicates Transaction should finish", this, peer); + this.dataAvailable = false; + break; + default: + throw new ProtocolException("Got unexpected response from " + peer + " when asking for data: " + dataAvailableCode); + } + } + + // if no data available, return null + if (!dataAvailable) { + return null; + } + + logger.debug("{} Receiving data from {}", this, peer); + final InputStream is = peer.getCommunicationsSession().getInput().getInputStream(); + final InputStream dataIn = compress ? new CompressionInputStream(is) : is; + final DataPacket packet = codec.decode(new CheckedInputStream(dataIn, crc)); + + if (packet == null) { + this.dataAvailable = false; + } else { + transfers++; + contentBytes += packet.getSize(); + } + + this.state = TransactionState.DATA_EXCHANGED; + return packet; + } catch (final IOException ioe) { + throw new IOException("Failed to receive data from " + peer + " due to " + ioe, ioe); + } + } catch (final Exception e) { + error(); + throw e; + } + } + + abstract protected Response readTransactionResponse() throws IOException; + + protected final void writeTransactionResponse(ResponseCode response) throws IOException { + writeTransactionResponse(response, null); + } + abstract protected void writeTransactionResponse(ResponseCode response, String explanation) throws IOException; + + @Override + public final void confirm() throws IOException { + try { + try { + if (state == TransactionState.TRANSACTION_STARTED && !dataAvailable && direction == TransferDirection.RECEIVE) { + // client requested to receive data but no data available. no need to confirm. + state = TransactionState.TRANSACTION_CONFIRMED; + return; + } + + if (state != TransactionState.DATA_EXCHANGED) { + throw new IllegalStateException("Cannot confirm Transaction because state is " + state + + "; Transaction can only be confirmed when state is " + TransactionState.DATA_EXCHANGED); + } + + final CommunicationsSession commsSession = peer.getCommunicationsSession(); + if (direction == TransferDirection.RECEIVE) { + if (dataAvailable) { + throw new IllegalStateException("Cannot complete transaction because the sender has already sent more data than client has consumed."); + } + + // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message + // to peer so that we can verify that the connection is still open. This is a two-phase commit, + // which helps to prevent the chances of data duplication. Without doing this, we may commit the + // session and then when we send the response back to the peer, the peer may have timed out and may not + // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the + // Critical Section involved in this transaction so that rather than the Critical Section being the + // time window involved in the entire transaction, it is reduced to a simple round-trip conversation. + logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer); + final String calculatedCRC = String.valueOf(crc.getValue()); + writeTransactionResponse(ResponseCode.CONFIRM_TRANSACTION, calculatedCRC); + + final Response confirmTransactionResponse; + try { + confirmTransactionResponse = readTransactionResponse(); + } catch (final IOException ioe) { + logger.error("Failed to receive response code from {} when expecting confirmation of transaction", peer); + if (eventReporter != null) { + eventReporter.reportEvent(Severity.ERROR, "Site-to-Site", "Failed to receive response code from " + peer + " when expecting confirmation of transaction"); + } + throw ioe; + } + + logger.trace("{} Received {} from {}", this, confirmTransactionResponse, peer); + + switch (confirmTransactionResponse.getCode()) { + case CONFIRM_TRANSACTION: + break; + case BAD_CHECKSUM: + throw new IOException(this + " Received a BadChecksum response from peer " + peer); + default: + throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : " + + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code"); + } + + state = TransactionState.TRANSACTION_CONFIRMED; + } else { + logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, peer); + writeTransactionResponse(ResponseCode.FINISH_TRANSACTION); + + final String calculatedCRC = String.valueOf(crc.getValue()); + + // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response + final Response transactionConfirmationResponse = readTransactionResponse(); + if (transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION) { + // Confirm checksum and echo back the confirmation. + logger.trace("{} Received {} from {}", this, transactionConfirmationResponse, peer); + final String receivedCRC = transactionConfirmationResponse.getMessage(); + + // CRC was not used before version 4 + if (protocolVersion > 3) { + if (!receivedCRC.equals(calculatedCRC)) { + writeTransactionResponse(ResponseCode.BAD_CHECKSUM); + throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + + calculatedCRC + " while peer calculated CRC32 Checksum as " + + receivedCRC + "; canceling transaction and rolling back session"); + } + } + + writeTransactionResponse(ResponseCode.CONFIRM_TRANSACTION, ""); + } else { + throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + + peer + " but received " + transactionConfirmationResponse); + } + + state = TransactionState.TRANSACTION_CONFIRMED; + } + } catch (final IOException ioe) { + throw new IOException("Failed to confirm transaction with " + peer + " due to " + ioe, ioe); + } + } catch (final Exception e) { + error(); + throw e; + } + } + + @Override + public final TransactionCompletion complete() throws IOException { + try { + try { + if (state != TransactionState.TRANSACTION_CONFIRMED) { + throw new IllegalStateException("Cannot complete transaction with " + peer + " because state is " + state + + "; Transaction can only be completed when state is " + TransactionState.TRANSACTION_CONFIRMED); + } + + boolean backoff = false; + if (direction == TransferDirection.RECEIVE) { + if (transfers == 0) { + state = TransactionState.TRANSACTION_COMPLETED; + return new ClientTransactionCompletion(false, 0, 0L, System.nanoTime() - creationNanoTime); + } + + // Confirm that we received the data and the peer can now discard it + logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer); + writeTransactionResponse(ResponseCode.TRANSACTION_FINISHED); + + state = TransactionState.TRANSACTION_COMPLETED; + } else { + final Response transactionResponse; + try { + transactionResponse = readTransactionResponse(); + } catch (final IOException e) { + throw new IOException(this + " Failed to receive a response from " + peer + " when expecting a TransactionFinished Indicator. " + + "It is unknown whether or not the peer successfully received/processed the data.", e); + } + + logger.debug("{} Received {} from {}", this, transactionResponse, peer); + if (transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL) { + peer.penalize(destinationId, penaltyMillis); + backoff = true; + } else if (transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED) { + throw new ProtocolException("After sending data to " + peer + ", expected TRANSACTION_FINISHED response but got " + transactionResponse); + } + + state = TransactionState.TRANSACTION_COMPLETED; + } + + return new ClientTransactionCompletion(backoff, transfers, contentBytes, System.nanoTime() - creationNanoTime); + } catch (final IOException ioe) { + throw new IOException("Failed to complete transaction with " + peer + " due to " + ioe, ioe); + } + } catch (final Exception e) { + error(); + throw e; + } finally { + close(); + } + } + + @Override + public final void cancel(final String explanation) throws IOException { + if (state == TransactionState.TRANSACTION_CANCELED || state == TransactionState.TRANSACTION_COMPLETED || state == TransactionState.ERROR) { + throw new IllegalStateException("Cannot cancel transaction because state is already " + state); + } + + try { + writeTransactionResponse(ResponseCode.CANCEL_TRANSACTION, explanation == null ? "<No explanation given>" : explanation); + state = TransactionState.TRANSACTION_CANCELED; + } catch (final IOException ioe) { + error(); + throw new IOException("Failed to send 'cancel transaction' message to " + peer + " due to " + ioe, ioe); + } finally { + close(); + } + } + + @Override + public final String toString() { + return getClass().getSimpleName() + "[Url=" + peer.getUrl() + ", TransferDirection=" + direction + ", State=" + state + "]"; + } + + @Override + public final void send(final DataPacket dataPacket) throws IOException { + try { + try { + if (state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) { + throw new IllegalStateException("Cannot send data to " + peer + " because Transaction State is " + state); + } + + if (direction == TransferDirection.RECEIVE) { + throw new IllegalStateException("Attempting to send data to " + peer + " but started a RECEIVE Transaction"); + } + + if (transfers > 0) { + writeTransactionResponse(ResponseCode.CONTINUE_TRANSACTION); + } + + logger.debug("{} Sending data to {}", this, peer); + + final OutputStream os = peer.getCommunicationsSession().getOutput().getOutputStream(); + final OutputStream dataOut = compress ? new CompressionOutputStream(os) : os; + final OutputStream out = new CheckedOutputStream(dataOut, crc); + + codec.encode(dataPacket, out); + + // need to close the CompressionOutputStream in order to force it write out any remaining bytes. + // Otherwise, do NOT close it because we don't want to close the underlying stream + // (CompressionOutputStream will not close the underlying stream when it's closed) + if (compress) { + out.close(); + } + + transfers++; + contentBytes += dataPacket.getSize(); + this.state = TransactionState.DATA_EXCHANGED; + } catch (final IOException ioe) { + throw new IOException("Failed to send data to " + peer + " due to " + ioe, ioe); + } + } catch (final Exception e) { + error(); + throw e; + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/ClientTransactionCompletion.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/ClientTransactionCompletion.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/ClientTransactionCompletion.java new file mode 100644 index 0000000..9778ab4 --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/ClientTransactionCompletion.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote; + +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.remote.TransactionCompletion; + +public class ClientTransactionCompletion implements TransactionCompletion { + + private final boolean backoff; + private final int dataPacketsTransferred; + private final long bytesTransferred; + private final long durationNanos; + + public ClientTransactionCompletion(final boolean backoff, final int dataPacketsTransferred, final long bytesTransferred, final long durationNanos) { + this.backoff = backoff; + this.dataPacketsTransferred = dataPacketsTransferred; + this.bytesTransferred = bytesTransferred; + this.durationNanos = durationNanos; + } + + @Override + public boolean isBackoff() { + return backoff; + } + + @Override + public int getDataPacketsTransferred() { + return dataPacketsTransferred; + } + + @Override + public long getBytesTransferred() { + return bytesTransferred; + } + + @Override + public long getDuration(final TimeUnit timeUnit) { + return timeUnit.convert(durationNanos, TimeUnit.NANOSECONDS); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractSiteToSiteClient.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractSiteToSiteClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractSiteToSiteClient.java new file mode 100644 index 0000000..0dec3df --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractSiteToSiteClient.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.client; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +public abstract class AbstractSiteToSiteClient implements SiteToSiteClient { + + protected final SiteToSiteClientConfig config; + protected final SiteInfoProvider siteInfoProvider; + protected final URI clusterUrl; + + public AbstractSiteToSiteClient(final SiteToSiteClientConfig config) { + this.config = config; + + try { + Objects.requireNonNull(config.getUrl(), "URL cannot be null"); + clusterUrl = new URI(config.getUrl()); + } catch (final URISyntaxException e) { + throw new IllegalArgumentException("Invalid Cluster URL: " + config.getUrl()); + } + + final int commsTimeout = (int) config.getTimeout(TimeUnit.MILLISECONDS); + siteInfoProvider = new SiteInfoProvider(); + siteInfoProvider.setClusterUrl(clusterUrl); + siteInfoProvider.setSslContext(config.getSslContext()); + siteInfoProvider.setConnectTimeoutMillis(commsTimeout); + siteInfoProvider.setReadTimeoutMillis(commsTimeout); + siteInfoProvider.setProxy(config.getHttpProxy()); + + } + + @Override + public SiteToSiteClientConfig getConfig() { + return config; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java new file mode 100644 index 0000000..b67e014 --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.client; + +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.remote.Peer; +import org.apache.nifi.remote.PeerDescription; +import org.apache.nifi.remote.PeerStatus; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.util.PeerStatusCache; +import org.apache.nifi.stream.io.BufferedOutputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; +import java.util.regex.Pattern; + +import static org.apache.nifi.remote.util.EventReportUtil.error; +import static org.apache.nifi.remote.util.EventReportUtil.warn; + +public class PeerSelector { + + private static final Logger logger = LoggerFactory.getLogger(PeerSelector.class); + private static final long PEER_CACHE_MILLIS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES); + + private static final long PEER_REFRESH_PERIOD = 60000L; + + private final ReentrantLock peerRefreshLock = new ReentrantLock(); + private volatile List<PeerStatus> peerStatuses; + private volatile long peerRefreshTime = 0L; + private final AtomicLong peerIndex = new AtomicLong(0L); + private volatile PeerStatusCache peerStatusCache; + private final File persistenceFile; + + private EventReporter eventReporter; + + private final PeerStatusProvider peerStatusProvider; + private final ConcurrentMap<PeerDescription, Long> peerTimeoutExpirations = new ConcurrentHashMap<>(); + + public PeerSelector(final PeerStatusProvider peerStatusProvider, final File persistenceFile) { + this.peerStatusProvider = peerStatusProvider; + this.persistenceFile = persistenceFile; + Set<PeerStatus> recoveredStatuses; + if (persistenceFile != null && persistenceFile.exists()) { + try { + recoveredStatuses = recoverPersistedPeerStatuses(persistenceFile); + this.peerStatusCache = new PeerStatusCache(recoveredStatuses, persistenceFile.lastModified()); + } catch (final IOException ioe) { + logger.warn("Failed to recover peer statuses from {} due to {}; will continue without loading information from file", persistenceFile, ioe); + } + } else { + peerStatusCache = null; + } + } + + private void persistPeerStatuses(final Set<PeerStatus> statuses) { + if (persistenceFile == null) { + return; + } + + try (final OutputStream fos = new FileOutputStream(persistenceFile); + final OutputStream out = new BufferedOutputStream(fos)) { + + for (final PeerStatus status : statuses) { + final PeerDescription description = status.getPeerDescription(); + final String line = description.getHostname() + ":" + description.getPort() + ":" + description.isSecure() + "\n"; + out.write(line.getBytes(StandardCharsets.UTF_8)); + } + + } catch (final IOException e) { + error(logger, eventReporter, "Failed to persist list of Peers due to {}; if restarted and peer's NCM is down," + + " may be unable to transfer data until communications with NCM are restored", e.toString()); + logger.error("", e); + } + } + + private static Set<PeerStatus> recoverPersistedPeerStatuses(final File file) throws IOException { + if (!file.exists()) { + return null; + } + + final Set<PeerStatus> statuses = new HashSet<>(); + try (final InputStream fis = new FileInputStream(file); + final BufferedReader reader = new BufferedReader(new InputStreamReader(fis))) { + + String line; + while ((line = reader.readLine()) != null) { + final String[] splits = line.split(Pattern.quote(":")); + if (splits.length != 3) { + continue; + } + + final String hostname = splits[0]; + final int port = Integer.parseInt(splits[1]); + final boolean secure = Boolean.parseBoolean(splits[2]); + + statuses.add(new PeerStatus(new PeerDescription(hostname, port, secure), 1)); + } + } + + return statuses; + } + + List<PeerStatus> formulateDestinationList(final Set<PeerStatus> statuses, final TransferDirection direction) { + + final int numDestinations = Math.max(128, statuses.size()); + final Map<PeerStatus, Integer> entryCountMap = new HashMap<>(); + + long totalFlowFileCount = 0L; + for (final PeerStatus nodeInfo : statuses) { + totalFlowFileCount += nodeInfo.getFlowFileCount(); + } + + int totalEntries = 0; + for (final PeerStatus nodeInfo : statuses) { + final int flowFileCount = nodeInfo.getFlowFileCount(); + // don't allow any node to get more than 80% of the data + final double percentageOfFlowFiles = Math.min(0.8D, ((double) flowFileCount / (double) totalFlowFileCount)); + final double relativeWeighting = (direction == TransferDirection.SEND) ? (1 - percentageOfFlowFiles) : percentageOfFlowFiles; + final int entries = Math.max(1, (int) (numDestinations * relativeWeighting)); + + entryCountMap.put(nodeInfo, Math.max(1, entries)); + totalEntries += entries; + } + + final List<PeerStatus> destinations = new ArrayList<>(totalEntries); + for (int i = 0; i < totalEntries; i++) { + destinations.add(null); + } + for (final Map.Entry<PeerStatus, Integer> entry : entryCountMap.entrySet()) { + final PeerStatus nodeInfo = entry.getKey(); + final int numEntries = entry.getValue(); + + int skipIndex = numEntries; + for (int i = 0; i < numEntries; i++) { + int n = (skipIndex * i); + while (true) { + final int index = n % destinations.size(); + PeerStatus status = destinations.get(index); + if (status == null) { + status = new PeerStatus(nodeInfo.getPeerDescription(), nodeInfo.getFlowFileCount()); + destinations.set(index, status); + break; + } else { + n++; + } + } + } + } + + final StringBuilder distributionDescription = new StringBuilder(); + distributionDescription.append("New Weighted Distribution of Nodes:"); + for (final Map.Entry<PeerStatus, Integer> entry : entryCountMap.entrySet()) { + final double percentage = entry.getValue() * 100D / destinations.size(); + distributionDescription.append("\n").append(entry.getKey()).append(" will receive ").append(percentage).append("% of data"); + } + logger.info(distributionDescription.toString()); + + // Jumble the list of destinations. + return destinations; + } + + /** + * Updates internal state map to penalize a PeerStatus that points to the + * specified peer + * + * @param peer the peer + * @param penalizationMillis period of time to penalize a given peer + */ + public void penalize(final Peer peer, final long penalizationMillis) { + penalize(peer.getDescription(), penalizationMillis); + } + + public void penalize(final PeerDescription peerDescription, final long penalizationMillis) { + Long expiration = peerTimeoutExpirations.get(peerDescription); + if (expiration == null) { + expiration = Long.valueOf(0L); + } + + final long newExpiration = Math.max(expiration, System.currentTimeMillis() + penalizationMillis); + peerTimeoutExpirations.put(peerDescription, Long.valueOf(newExpiration)); + } + + public boolean isPenalized(final PeerStatus peerStatus) { + final Long expirationEnd = peerTimeoutExpirations.get(peerStatus.getPeerDescription()); + return (expirationEnd != null && expirationEnd > System.currentTimeMillis()); + } + + public void clear() { + peerTimeoutExpirations.clear(); + } + + private boolean isPeerRefreshNeeded(final List<PeerStatus> peerList) { + return (peerList == null || peerList.isEmpty() || System.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD); + } + + /** + * Return status of a peer that will be used for the next communication. + * The peer with less workload will be selected with higher probability. + * @param direction the amount of workload is calculated based on transaction direction, + * for SEND, a peer with less flow files is preferred, + * for RECEIVE, a peer with more flow files is preferred + * @return a selected peer, if there is no available peer or all peers are penalized, then return null + */ + public PeerStatus getNextPeerStatus(final TransferDirection direction) { + List<PeerStatus> peerList = peerStatuses; + if (isPeerRefreshNeeded(peerList)) { + peerRefreshLock.lock(); + try { + // now that we have the lock, check again that we need to refresh (because another thread + // could have been refreshing while we were waiting for the lock). + peerList = peerStatuses; + if (isPeerRefreshNeeded(peerList)) { + try { + peerList = createPeerStatusList(direction); + } catch (final Exception e) { + final String message = String.format("%s Failed to update list of peers due to %s", this, e.toString()); + warn(logger, eventReporter, message); + if (logger.isDebugEnabled()) { + logger.warn("", e); + } + } + + this.peerStatuses = peerList; + peerRefreshTime = System.currentTimeMillis(); + } + } finally { + peerRefreshLock.unlock(); + } + } + + if (peerList == null || peerList.isEmpty()) { + return null; + } + + PeerStatus peerStatus; + for (int i = 0; i < peerList.size(); i++) { + final long idx = peerIndex.getAndIncrement(); + final int listIndex = (int) (idx % peerList.size()); + peerStatus = peerList.get(listIndex); + + if (isPenalized(peerStatus)) { + logger.debug("{} {} is penalized; will not communicate with this peer", this, peerStatus); + } else { + return peerStatus; + } + } + + logger.debug("{} All peers appear to be penalized; returning null", this); + return null; + } + + private List<PeerStatus> createPeerStatusList(final TransferDirection direction) throws IOException { + Set<PeerStatus> statuses = getPeerStatuses(); + if (statuses == null) { + refreshPeers(); + statuses = getPeerStatuses(); + if (statuses == null) { + logger.debug("{} found no peers to connect to", this); + return Collections.emptyList(); + } + } + return formulateDestinationList(statuses, direction); + } + + private Set<PeerStatus> getPeerStatuses() { + final PeerStatusCache cache = this.peerStatusCache; + if (cache == null || cache.getStatuses() == null || cache.getStatuses().isEmpty()) { + return null; + } + + if (cache.getTimestamp() + PEER_CACHE_MILLIS < System.currentTimeMillis()) { + final Set<PeerStatus> equalizedSet = new HashSet<>(cache.getStatuses().size()); + for (final PeerStatus status : cache.getStatuses()) { + final PeerStatus equalizedStatus = new PeerStatus(status.getPeerDescription(), 1); + equalizedSet.add(equalizedStatus); + } + + return equalizedSet; + } + + return cache.getStatuses(); + } + + public void refreshPeers() { + final PeerStatusCache existingCache = peerStatusCache; + if (existingCache != null && (existingCache.getTimestamp() + PEER_CACHE_MILLIS > System.currentTimeMillis())) { + return; + } + + try { + final Set<PeerStatus> statuses = peerStatusProvider.fetchRemotePeerStatuses(); + persistPeerStatuses(statuses); + peerStatusCache = new PeerStatusCache(statuses); + logger.info("{} Successfully refreshed Peer Status; remote instance consists of {} peers", this, statuses.size()); + } catch (Exception e) { + warn(logger, eventReporter, "{} Unable to refresh Remote Group's peers due to {}", this, e.getMessage()); + if (logger.isDebugEnabled()) { + logger.debug("", e); + } + } + } + + public void setEventReporter(EventReporter eventReporter) { + this.eventReporter = eventReporter; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerStatusProvider.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerStatusProvider.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerStatusProvider.java new file mode 100644 index 0000000..68c30af --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerStatusProvider.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.client; + +import org.apache.nifi.remote.PeerStatus; + +import java.io.IOException; +import java.util.Set; + +public interface PeerStatusProvider { + + Set<PeerStatus> fetchRemotePeerStatuses() throws IOException; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteInfoProvider.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteInfoProvider.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteInfoProvider.java new file mode 100644 index 0000000..740ac3a --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteInfoProvider.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.client; + +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.protocol.http.HttpProxy; +import org.apache.nifi.remote.util.SiteToSiteRestApiClient; +import org.apache.nifi.web.api.dto.ControllerDTO; +import org.apache.nifi.web.api.dto.PortDTO; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class SiteInfoProvider { + + private static final Logger logger = LoggerFactory.getLogger(SiteInfoProvider.class); + + private static final long REMOTE_REFRESH_MILLIS = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES); + + private final ReadWriteLock listeningPortRWLock = new ReentrantReadWriteLock(); + private final Lock remoteInfoReadLock = listeningPortRWLock.readLock(); + private final Lock remoteInfoWriteLock = listeningPortRWLock.writeLock(); + private Integer siteToSitePort; + private Integer siteToSiteHttpPort; + private Boolean siteToSiteSecure; + private long remoteRefreshTime; + private HttpProxy proxy; + + private final Map<String, String> inputPortMap = new HashMap<>(); // map input port name to identifier + private final Map<String, String> outputPortMap = new HashMap<>(); // map output port name to identifier + + private URI clusterUrl; + private SSLContext sslContext; + private int connectTimeoutMillis; + private int readTimeoutMillis; + + private ControllerDTO refreshRemoteInfo() throws IOException { + final ControllerDTO controller; + + try (final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(sslContext, proxy)) { + apiClient.resolveBaseUrl(clusterUrl); + apiClient.setConnectTimeoutMillis(connectTimeoutMillis); + apiClient.setReadTimeoutMillis(readTimeoutMillis); + controller = apiClient.getController(); + } + + remoteInfoWriteLock.lock(); + try { + this.siteToSitePort = controller.getRemoteSiteListeningPort(); + this.siteToSiteHttpPort = controller.getRemoteSiteHttpListeningPort(); + this.siteToSiteSecure = controller.isSiteToSiteSecure(); + + inputPortMap.clear(); + for (final PortDTO inputPort : controller.getInputPorts()) { + inputPortMap.put(inputPort.getName(), inputPort.getId()); + } + + outputPortMap.clear(); + for (final PortDTO outputPort : controller.getOutputPorts()) { + outputPortMap.put(outputPort.getName(), outputPort.getId()); + } + + this.remoteRefreshTime = System.currentTimeMillis(); + } finally { + remoteInfoWriteLock.unlock(); + } + + return controller; + } + + public boolean isWebInterfaceSecure() { + return clusterUrl.toString().startsWith("https"); + } + + /** + * @return the port that the remote instance is listening on for + * RAW Socket site-to-site communication, or <code>null</code> if the remote instance + * is not configured to allow site-to-site communications. + * + * @throws IOException if unable to communicate with the remote instance + */ + public Integer getSiteToSitePort() throws IOException { + Integer listeningPort; + remoteInfoReadLock.lock(); + try { + listeningPort = this.siteToSitePort; + if (listeningPort != null && this.remoteRefreshTime > System.currentTimeMillis() - REMOTE_REFRESH_MILLIS) { + return listeningPort; + } + } finally { + remoteInfoReadLock.unlock(); + } + + final ControllerDTO controller = refreshRemoteInfo(); + listeningPort = controller.getRemoteSiteListeningPort(); + + return listeningPort; + } + + /** + * @return the port that the remote instance is listening on for + * HTTP(S) site-to-site communication, or <code>null</code> if the remote instance + * is not configured to allow site-to-site communications. + * + * @throws IOException if unable to communicate with the remote instance + */ + public Integer getSiteToSiteHttpPort() throws IOException { + Integer listeningHttpPort; + remoteInfoReadLock.lock(); + try { + listeningHttpPort = this.siteToSiteHttpPort; + if (listeningHttpPort != null && this.remoteRefreshTime > System.currentTimeMillis() - REMOTE_REFRESH_MILLIS) { + return listeningHttpPort; + } + } finally { + remoteInfoReadLock.unlock(); + } + + final ControllerDTO controller = refreshRemoteInfo(); + listeningHttpPort = controller.getRemoteSiteHttpListeningPort(); + + return listeningHttpPort; + } + + /** + * @return {@code true} if the remote instance is configured for secure + * site-to-site communications, {@code false} otherwise + * @throws IOException if unable to check if secure + */ + public boolean isSecure() throws IOException { + remoteInfoReadLock.lock(); + try { + final Boolean secure = this.siteToSiteSecure; + if (secure != null && this.remoteRefreshTime > System.currentTimeMillis() - REMOTE_REFRESH_MILLIS) { + return secure; + } + } finally { + remoteInfoReadLock.unlock(); + } + + final ControllerDTO controller = refreshRemoteInfo(); + final Boolean isSecure = controller.isSiteToSiteSecure(); + if (isSecure == null) { + throw new IOException("Remote NiFi instance " + clusterUrl + " is not currently configured to accept site-to-site connections"); + } + + return isSecure; + } + + public String getPortIdentifier(final String portName, final TransferDirection transferDirection) throws IOException { + if (transferDirection == TransferDirection.RECEIVE) { + return getOutputPortIdentifier(portName); + } else { + return getInputPortIdentifier(portName); + } + } + + public String getInputPortIdentifier(final String portName) throws IOException { + return getPortIdentifier(portName, inputPortMap); + } + + public String getOutputPortIdentifier(final String portName) throws IOException { + return getPortIdentifier(portName, outputPortMap); + } + + private String getPortIdentifier(final String portName, final Map<String, String> portMap) throws IOException { + String identifier; + remoteInfoReadLock.lock(); + try { + identifier = portMap.get(portName); + } finally { + remoteInfoReadLock.unlock(); + } + + if (identifier != null) { + return identifier; + } + + refreshRemoteInfo(); + + remoteInfoReadLock.lock(); + try { + return portMap.get(portName); + } finally { + remoteInfoReadLock.unlock(); + } + } + + public void setClusterUrl(URI clusterUrl) { + this.clusterUrl = clusterUrl; + } + + public void setSslContext(SSLContext sslContext) { + this.sslContext = sslContext; + } + + public void setConnectTimeoutMillis(int connectTimeoutMillis) { + this.connectTimeoutMillis = connectTimeoutMillis; + } + + public void setReadTimeoutMillis(int readTimeoutMillis) { + this.readTimeoutMillis = readTimeoutMillis; + } + + public void setProxy(HttpProxy proxy) { + this.proxy = proxy; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java index d982cc4..32d1141 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java @@ -16,29 +16,31 @@ */ package org.apache.nifi.remote.client; -import java.io.Closeable; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.Serializable; -import java.security.KeyStore; -import java.security.SecureRandom; -import java.util.concurrent.TimeUnit; - -import javax.net.ssl.KeyManagerFactory; -import javax.net.ssl.SSLContext; -import javax.net.ssl.TrustManagerFactory; - import org.apache.nifi.events.EventReporter; import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.http.HttpClient; import org.apache.nifi.remote.client.socket.SocketClient; import org.apache.nifi.remote.exception.HandshakeException; import org.apache.nifi.remote.exception.PortNotRunningException; import org.apache.nifi.remote.exception.ProtocolException; import org.apache.nifi.remote.exception.UnknownPortException; import org.apache.nifi.remote.protocol.DataPacket; +import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; +import org.apache.nifi.remote.protocol.http.HttpProxy; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; +import java.io.Closeable; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.Serializable; +import java.security.KeyStore; +import java.security.SecureRandom; +import java.util.concurrent.TimeUnit; /** * <p> @@ -163,6 +165,8 @@ public interface SiteToSiteClient extends Closeable { private int batchCount; private long batchSize; private long batchNanos; + private SiteToSiteTransportProtocol transportProtocol = SiteToSiteTransportProtocol.RAW; + private HttpProxy httpProxy; /** * Populates the builder with values from the provided config @@ -185,11 +189,13 @@ public interface SiteToSiteClient extends Closeable { this.eventReporter = config.getEventReporter(); this.peerPersistenceFile = config.getPeerPersistenceFile(); this.useCompression = config.isUseCompression(); + this.transportProtocol = config.getTransportProtocol(); this.portName = config.getPortName(); this.portIdentifier = config.getPortIdentifier(); this.batchCount = config.getPreferredBatchCount(); this.batchSize = config.getPreferredBatchSize(); this.batchNanos = config.getPreferredBatchDuration(TimeUnit.NANOSECONDS); + this.httpProxy = config.getHttpProxy(); return this; } @@ -441,6 +447,16 @@ public interface SiteToSiteClient extends Closeable { } /** + * Specifies the protocol to use for site to site data transport. + * @param transportProtocol transport protocol + * @return the builder + */ + public Builder transportProtocol(final SiteToSiteTransportProtocol transportProtocol) { + this.transportProtocol = transportProtocol; + return this; + } + + /** * Specifies the name of the port to communicate with. Either the port * name or the port identifier must be specified. * @@ -521,7 +537,8 @@ public interface SiteToSiteClient extends Closeable { * data with remote instances of NiFi * * @throws IllegalStateException if either the url is not set or neither - * the port name nor port identifier is set. + * the port name nor port identifier is set, + * or if the transport protocol is not supported. */ public SiteToSiteClient build() { if (url == null) { @@ -532,7 +549,14 @@ public interface SiteToSiteClient extends Closeable { throw new IllegalStateException("Must specify either Port Name or Port Identifier to build Site-to-Site client"); } - return new SocketClient(buildConfig()); + switch (transportProtocol){ + case RAW: + return new SocketClient(buildConfig()); + case HTTP: + return new HttpClient(buildConfig()); + default: + throw new IllegalStateException("Transport protocol '" + transportProtocol + "' is not supported."); + } } /** @@ -600,6 +624,13 @@ public interface SiteToSiteClient extends Closeable { } /** + * @return the transport protocol to use, defaults to RAW + */ + public SiteToSiteTransportProtocol getTransportProtocol(){ + return transportProtocol; + } + + /** * @return the name of the port that the client is to communicate with */ public String getPortName() { @@ -613,6 +644,22 @@ public interface SiteToSiteClient extends Closeable { public String getPortIdentifier() { return portIdentifier; } + + + /** + * Specify a HTTP proxy information to use with HTTP protocol of Site-to-Site communication. + * @param httpProxy HTTP proxy information + * @return the builder + */ + public Builder httpProxy(final HttpProxy httpProxy) { + this.httpProxy = httpProxy; + return this; + } + + public HttpProxy getHttpProxy() { + return httpProxy; + } + } @@ -634,11 +681,13 @@ public interface SiteToSiteClient extends Closeable { private final EventReporter eventReporter; private final File peerPersistenceFile; private final boolean useCompression; + private final SiteToSiteTransportProtocol transportProtocol; private final String portName; private final String portIdentifier; private final int batchCount; private final long batchSize; private final long batchNanos; + private final HttpProxy httpProxy; // some serialization frameworks require a default constructor private StandardSiteToSiteClientConfig() { @@ -661,6 +710,8 @@ public interface SiteToSiteClient extends Closeable { this.batchCount = 0; this.batchSize = 0; this.batchNanos = 0; + this.transportProtocol = null; + this.httpProxy = null; } private StandardSiteToSiteClientConfig(final SiteToSiteClient.Builder builder) { @@ -683,6 +734,8 @@ public interface SiteToSiteClient extends Closeable { this.batchCount = builder.batchCount; this.batchSize = builder.batchSize; this.batchNanos = builder.batchNanos; + this.transportProtocol = builder.getTransportProtocol(); + this.httpProxy = builder.getHttpProxy(); } @Override @@ -830,5 +883,15 @@ public interface SiteToSiteClient extends Closeable { public KeystoreType getTruststoreType() { return truststoreType; } + + @Override + public SiteToSiteTransportProtocol getTransportProtocol() { + return transportProtocol; + } + + @Override + public HttpProxy getHttpProxy() { + return httpProxy; + } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java index 59891f0..65a7cfc 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java @@ -24,6 +24,8 @@ import javax.net.ssl.SSLContext; import org.apache.nifi.events.EventReporter; import org.apache.nifi.remote.protocol.DataPacket; +import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; +import org.apache.nifi.remote.protocol.http.HttpProxy; public interface SiteToSiteClientConfig extends Serializable { @@ -101,6 +103,11 @@ public interface SiteToSiteClientConfig extends Serializable { boolean isUseCompression(); /** + * @return a transport protocol to use + */ + SiteToSiteTransportProtocol getTransportProtocol(); + + /** * @return the name of the port that the client is to communicate with */ String getPortName(); @@ -146,4 +153,10 @@ public interface SiteToSiteClientConfig extends Serializable { */ EventReporter getEventReporter(); + /** + * Return Proxy for HTTP Transport Protocol. + * @return proxy or null if not specified + */ + HttpProxy getHttpProxy(); + } http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java new file mode 100644 index 0000000..3312e88 --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.client.http; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.remote.Peer; +import org.apache.nifi.remote.PeerDescription; +import org.apache.nifi.remote.PeerStatus; +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.AbstractSiteToSiteClient; +import org.apache.nifi.remote.client.PeerSelector; +import org.apache.nifi.remote.client.PeerStatusProvider; +import org.apache.nifi.remote.client.SiteToSiteClientConfig; +import org.apache.nifi.remote.exception.HandshakeException; +import org.apache.nifi.remote.exception.PortNotRunningException; +import org.apache.nifi.remote.exception.ProtocolException; +import org.apache.nifi.remote.exception.UnknownPortException; +import org.apache.nifi.remote.io.http.HttpCommunicationsSession; +import org.apache.nifi.remote.protocol.CommunicationsSession; +import org.apache.nifi.remote.protocol.http.HttpClientTransaction; +import org.apache.nifi.remote.util.SiteToSiteRestApiClient; +import org.apache.nifi.web.api.dto.remote.PeerDTO; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusProvider { + + private static final Logger logger = LoggerFactory.getLogger(HttpClient.class); + + private final ScheduledExecutorService taskExecutor; + private final PeerSelector peerSelector; + + public HttpClient(final SiteToSiteClientConfig config) { + super(config); + + peerSelector = new PeerSelector(this, config.getPeerPersistenceFile()); + peerSelector.setEventReporter(config.getEventReporter()); + + taskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() { + private final ThreadFactory defaultFactory = Executors.defaultThreadFactory(); + + @Override + public Thread newThread(final Runnable r) { + final Thread thread = defaultFactory.newThread(r); + thread.setName("Http Site-to-Site PeerSelector"); + thread.setDaemon(true); + return thread; + } + }); + + taskExecutor.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + peerSelector.refreshPeers(); + } + }, 0, 5, TimeUnit.SECONDS); + + } + + @Override + public Set<PeerStatus> fetchRemotePeerStatuses() throws IOException { + if (siteInfoProvider.getSiteToSiteHttpPort() == null) { + throw new IOException("Remote instance of NiFi is not configured to allow HTTP site-to-site communications"); + } + + final String scheme = siteInfoProvider.isSecure() ? "https" : "http"; + final URI clusterUrl; + try { + clusterUrl = new URI(config.getUrl()); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("Specified clusterUrl was: " + config.getUrl(), e); + } + + try ( + SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(config.getSslContext(), config.getHttpProxy()) + ) { + String clusterApiUrl = apiClient.resolveBaseUrl(scheme, clusterUrl.getHost(), siteInfoProvider.getSiteToSiteHttpPort()); + + int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS); + apiClient.setConnectTimeoutMillis(timeoutMillis); + apiClient.setReadTimeoutMillis(timeoutMillis); + Collection<PeerDTO> peers = apiClient.getPeers(); + if(peers == null || peers.size() == 0){ + throw new IOException("Couldn't get any peer to communicate with. " + clusterApiUrl + " returned zero peers."); + } + + return peers.stream() + .map(p -> new PeerStatus(new PeerDescription(p.getHostname(), p.getPort(), p.isSecure()), p.getFlowFileCount())) + .collect(Collectors.toSet()); + } + } + + @Override + public Transaction createTransaction(TransferDirection direction) throws HandshakeException, PortNotRunningException, ProtocolException, UnknownPortException, IOException { + + int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS); + + PeerStatus peerStatus; + while ((peerStatus = peerSelector.getNextPeerStatus(direction)) != null) { + logger.debug("peerStatus={}", peerStatus); + + CommunicationsSession commSession = new HttpCommunicationsSession(); + String nodeApiUrl = resolveNodeApiUrl(peerStatus.getPeerDescription()); + commSession.setUri(nodeApiUrl); + String clusterUrl = config.getUrl(); + Peer peer = new Peer(peerStatus.getPeerDescription(), commSession, nodeApiUrl, clusterUrl); + + int penaltyMillis = (int) config.getPenalizationPeriod(TimeUnit.MILLISECONDS); + String portId = config.getPortIdentifier(); + if (StringUtils.isEmpty(portId)) { + portId = siteInfoProvider.getPortIdentifier(config.getPortName(), direction); + if (StringUtils.isEmpty(portId)) { + peer.close(); + throw new IOException("Failed to determine the identifier of port " + config.getPortName()); + } + } + + SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(config.getSslContext(), config.getHttpProxy()); + + apiClient.setBaseUrl(peer.getUrl()); + apiClient.setConnectTimeoutMillis(timeoutMillis); + apiClient.setReadTimeoutMillis(timeoutMillis); + + apiClient.setCompress(config.isUseCompression()); + apiClient.setRequestExpirationMillis(config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS)); + apiClient.setBatchCount(config.getPreferredBatchCount()); + apiClient.setBatchSize(config.getPreferredBatchSize()); + apiClient.setBatchDurationMillis(config.getPreferredBatchDuration(TimeUnit.MILLISECONDS)); + + final String transactionUrl; + try { + transactionUrl = apiClient.initiateTransaction(direction, portId); + commSession.setUserDn(apiClient.getTrustedPeerDn()); + } catch (Exception e) { + logger.debug("Penalizing a peer due to {}", e.getMessage()); + peerSelector.penalize(peer, penaltyMillis); + + if (e instanceof UnknownPortException || e instanceof PortNotRunningException) { + throw e; + } + + logger.debug("Continue trying other peers..."); + continue; + } + + // We found a valid peer to communicate with. + Integer transactionProtocolVersion = apiClient.getTransactionProtocolVersion(); + HttpClientTransaction transaction = new HttpClientTransaction(transactionProtocolVersion, peer, direction, + config.isUseCompression(), portId, penaltyMillis, config.getEventReporter()); + transaction.initialize(apiClient, transactionUrl); + + return transaction; + } + + logger.info("Couldn't find a valid peer to communicate with."); + return null; + + } + + private String resolveNodeApiUrl(PeerDescription description) { + return (description.isSecure() ? "https" : "http") + "://" + description.getHostname() + ":" + description.getPort() + "/nifi-api"; + } + + @Override + public boolean isSecure() throws IOException { + return siteInfoProvider.isWebInterfaceSecure(); + } + + @Override + public void close() throws IOException { + taskExecutor.shutdown(); + peerSelector.clear(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/TransportProtocolVersionNegotiator.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/TransportProtocolVersionNegotiator.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/TransportProtocolVersionNegotiator.java new file mode 100644 index 0000000..d0a6368 --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/TransportProtocolVersionNegotiator.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.client.http; + +import org.apache.nifi.remote.StandardVersionNegotiator; + +public class TransportProtocolVersionNegotiator extends StandardVersionNegotiator { + + public TransportProtocolVersionNegotiator(final int... supportedVersions) { + super(supportedVersions); + } + + /** + * Returns a transaction protocol version for this transport protocol version. + * This method lets transport protocol to move forward independently from transaction protocol. + * @return a transaction protocol version + */ + public int getTransactionProtocolVersion() { + switch (getVersion()) { + case 1: + return 5; + default: + throw new RuntimeException("Transport protocol version " + getVersion() + + " was not configured with any transaction protocol version."); + } + } + +}
