http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java index fa35f28..8a6a91f 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java @@ -16,29 +16,43 @@ */ package org.apache.nifi.remote.client.socket; -import java.io.BufferedReader; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.remote.Peer; +import org.apache.nifi.remote.PeerDescription; +import org.apache.nifi.remote.PeerStatus; +import org.apache.nifi.remote.RemoteDestination; +import org.apache.nifi.remote.RemoteResourceInitiator; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.PeerSelector; +import org.apache.nifi.remote.client.PeerStatusProvider; +import org.apache.nifi.remote.client.SiteInfoProvider; +import org.apache.nifi.remote.client.SiteToSiteClientConfig; +import org.apache.nifi.remote.codec.FlowFileCodec; +import org.apache.nifi.remote.exception.HandshakeException; +import org.apache.nifi.remote.exception.PortNotRunningException; +import org.apache.nifi.remote.exception.TransmissionDisabledException; +import org.apache.nifi.remote.exception.UnknownPortException; +import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession; +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession; +import org.apache.nifi.remote.protocol.CommunicationsSession; +import org.apache.nifi.remote.protocol.socket.SocketClientProtocol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.URI; -import java.net.URISyntaxException; import java.nio.channels.SocketChannel; -import java.nio.charset.StandardCharsets; import java.security.cert.CertificateException; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.BlockingQueue; @@ -49,125 +63,47 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.regex.Pattern; -import javax.net.ssl.SSLContext; -import org.apache.nifi.events.EventReporter; -import org.apache.nifi.remote.Peer; -import org.apache.nifi.remote.PeerDescription; -import org.apache.nifi.remote.PeerStatus; -import org.apache.nifi.remote.RemoteDestination; -import org.apache.nifi.remote.RemoteResourceInitiator; -import org.apache.nifi.remote.TransferDirection; -import org.apache.nifi.remote.client.SiteToSiteClientConfig; -import org.apache.nifi.remote.cluster.ClusterNodeInformation; -import org.apache.nifi.remote.cluster.NodeInformation; -import org.apache.nifi.remote.codec.FlowFileCodec; -import org.apache.nifi.remote.exception.HandshakeException; -import org.apache.nifi.remote.exception.PortNotRunningException; -import org.apache.nifi.remote.exception.TransmissionDisabledException; -import org.apache.nifi.remote.exception.UnknownPortException; -import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession; -import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; -import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession; -import org.apache.nifi.remote.protocol.CommunicationsSession; -import org.apache.nifi.remote.protocol.socket.SocketClientProtocol; -import org.apache.nifi.remote.util.NiFiRestApiUtil; -import org.apache.nifi.remote.util.PeerStatusCache; -import org.apache.nifi.reporting.Severity; -import org.apache.nifi.stream.io.BufferedOutputStream; -import org.apache.nifi.web.api.dto.ControllerDTO; -import org.apache.nifi.web.api.dto.PortDTO; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.slf4j.helpers.MessageFormatter; -public class EndpointConnectionPool { +import static org.apache.nifi.remote.util.EventReportUtil.error; +import static org.apache.nifi.remote.util.EventReportUtil.warn; - public static final long PEER_REFRESH_PERIOD = 60000L; - public static final String CATEGORY = "Site-to-Site"; - public static final long REMOTE_REFRESH_MILLIS = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES); - - private static final long PEER_CACHE_MILLIS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES); +public class EndpointConnectionPool implements PeerStatusProvider { private static final Logger logger = LoggerFactory.getLogger(EndpointConnectionPool.class); private final ConcurrentMap<PeerDescription, BlockingQueue<EndpointConnection>> connectionQueueMap = new ConcurrentHashMap<>(); - private final ConcurrentMap<PeerDescription, Long> peerTimeoutExpirations = new ConcurrentHashMap<>(); private final URI clusterUrl; - private final String apiUri; - - private final AtomicLong peerIndex = new AtomicLong(0L); - private final ReentrantLock peerRefreshLock = new ReentrantLock(); - private volatile List<PeerStatus> peerStatuses; - private volatile long peerRefreshTime = 0L; - private volatile PeerStatusCache peerStatusCache; - private final Set<EndpointConnection> activeConnections = Collections.synchronizedSet(new HashSet<EndpointConnection>()); + private final Set<EndpointConnection> activeConnections = Collections.synchronizedSet(new HashSet<>()); - private final File peersFile; private final EventReporter eventReporter; private final SSLContext sslContext; private final ScheduledExecutorService taskExecutor; private final int idleExpirationMillis; private final RemoteDestination remoteDestination; - private final ReadWriteLock listeningPortRWLock = new ReentrantReadWriteLock(); - private final Lock remoteInfoReadLock = listeningPortRWLock.readLock(); - private final Lock remoteInfoWriteLock = listeningPortRWLock.writeLock(); - private Integer siteToSitePort; - private Boolean siteToSiteSecure; - private long remoteRefreshTime; - private final Map<String, String> inputPortMap = new HashMap<>(); // map input port name to identifier - private final Map<String, String> outputPortMap = new HashMap<>(); // map output port name to identifier - private volatile int commsTimeout; private volatile boolean shutdown = false; - public EndpointConnectionPool(final String clusterUrl, final RemoteDestination remoteDestination, final int commsTimeoutMillis, - final int idleExpirationMillis, final EventReporter eventReporter, final File persistenceFile) { - this(clusterUrl, remoteDestination, commsTimeoutMillis, idleExpirationMillis, null, eventReporter, persistenceFile); - } + private final SiteInfoProvider siteInfoProvider; + private final PeerSelector peerSelector; - public EndpointConnectionPool(final String clusterUrl, final RemoteDestination remoteDestination, final int commsTimeoutMillis, final int idleExpirationMillis, - final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile) { + public EndpointConnectionPool(final URI clusterUrl, final RemoteDestination remoteDestination, final int commsTimeoutMillis, final int idleExpirationMillis, + final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile, final SiteInfoProvider siteInfoProvider) { Objects.requireNonNull(clusterUrl, "URL cannot be null"); Objects.requireNonNull(remoteDestination, "Remote Destination/Port Identifier cannot be null"); - try { - this.clusterUrl = new URI(clusterUrl); - } catch (final URISyntaxException e) { - throw new IllegalArgumentException("Invalid Cluster URL: " + clusterUrl); - } - - // Trim the trailing / - String uriPath = this.clusterUrl.getPath(); - if (uriPath.endsWith("/")) { - uriPath = uriPath.substring(0, uriPath.length() - 1); - } - apiUri = this.clusterUrl.getScheme() + "://" + this.clusterUrl.getHost() + ":" + this.clusterUrl.getPort() + uriPath + "-api"; + this.clusterUrl = clusterUrl; this.remoteDestination = remoteDestination; this.sslContext = sslContext; - this.peersFile = persistenceFile; this.eventReporter = eventReporter; this.commsTimeout = commsTimeoutMillis; this.idleExpirationMillis = idleExpirationMillis; - Set<PeerStatus> recoveredStatuses; - if (persistenceFile != null && persistenceFile.exists()) { - try { - recoveredStatuses = recoverPersistedPeerStatuses(peersFile); - this.peerStatusCache = new PeerStatusCache(recoveredStatuses, peersFile.lastModified()); - } catch (final IOException ioe) { - logger.warn("Failed to recover peer statuses from {} due to {}; will continue without loading information from file", persistenceFile, ioe); - } - } else { - peerStatusCache = null; - } + this.siteInfoProvider = siteInfoProvider; + + peerSelector = new PeerSelector(this, persistenceFile); + peerSelector.setEventReporter(eventReporter); // Initialize a scheduled executor and run some maintenance tasks in the background to kill off old, unused // connections and keep our list of peers up-to-date. @@ -186,7 +122,7 @@ public class EndpointConnectionPool { taskExecutor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { - refreshPeers(); + peerSelector.refreshPeers(); } }, 0, 5, TimeUnit.SECONDS); @@ -198,38 +134,11 @@ public class EndpointConnectionPool { }, 5, 5, TimeUnit.SECONDS); } - void warn(final String msg, final Object... args) { - logger.warn(msg, args); - if (eventReporter != null) { - eventReporter.reportEvent(Severity.WARNING, "Site-to-Site", MessageFormatter.arrayFormat(msg, args).getMessage()); - } - } - - void warn(final String msg, final Throwable t) { - logger.warn(msg, t); - - if (eventReporter != null) { - eventReporter.reportEvent(Severity.WARNING, "Site-to-Site", msg + ": " + t.toString()); - } - } - - void error(final String msg, final Object... args) { - logger.error(msg, args); - if (eventReporter != null) { - eventReporter.reportEvent(Severity.ERROR, "Site-to-Site", MessageFormatter.arrayFormat(msg, args).getMessage()); - } - } - private String getPortIdentifier(final TransferDirection transferDirection) throws IOException { if (remoteDestination.getIdentifier() != null) { return remoteDestination.getIdentifier(); } - - if (transferDirection == TransferDirection.RECEIVE) { - return getOutputPortIdentifier(remoteDestination.getName()); - } else { - return getInputPortIdentifier(remoteDestination.getName()); - } + return siteInfoProvider.getPortIdentifier(remoteDestination.getName(), transferDirection); } public EndpointConnection getEndpointConnection(final TransferDirection direction) throws IOException { @@ -250,7 +159,7 @@ public class EndpointConnectionPool { do { final List<EndpointConnection> addBack = new ArrayList<>(); logger.debug("{} getting next peer status", this); - final PeerStatus peerStatus = getNextPeerStatus(direction); + final PeerStatus peerStatus = peerSelector.getNextPeerStatus(direction); logger.debug("{} next peer status = {}", this, peerStatus); if (peerStatus == null) { return null; @@ -296,7 +205,7 @@ public class EndpointConnectionPool { logger.debug("{} Establishing site-to-site connection with {}", this, peerStatus); commsSession = establishSiteToSiteConnection(peerStatus); } catch (final IOException ioe) { - penalize(peerStatus.getPeerDescription(), penalizationMillis); + peerSelector.penalize(peerStatus.getPeerDescription(), penalizationMillis); throw ioe; } @@ -334,7 +243,7 @@ public class EndpointConnectionPool { logger.warn("{} {} indicates that port {}'s destination is full; penalizing peer", this, peer, config.getPortName() == null ? config.getPortIdentifier() : config.getPortName()); - penalize(peer, penalizationMillis); + peerSelector.penalize(peer, penalizationMillis); try { peer.close(); } catch (final IOException ioe) { @@ -342,11 +251,11 @@ public class EndpointConnectionPool { continue; } else if (protocol.isPortInvalid()) { - penalize(peer, penalizationMillis); + peerSelector.penalize(peer, penalizationMillis); cleanup(protocol, peer); throw new PortNotRunningException(peer.toString() + " indicates that port " + portId + " is not running"); } else if (protocol.isPortUnknown()) { - penalize(peer, penalizationMillis); + peerSelector.penalize(peer, penalizationMillis); cleanup(protocol, peer); throw new UnknownPortException(peer.toString() + " indicates that port " + portId + " is not known"); } @@ -358,11 +267,11 @@ public class EndpointConnectionPool { } catch (final PortNotRunningException | UnknownPortException e) { throw e; } catch (final Exception e) { - penalize(peer, penalizationMillis); + peerSelector.penalize(peer, penalizationMillis); cleanup(protocol, peer); final String message = String.format("%s failed to communicate with %s due to %s", this, peer == null ? clusterUrl : peer, e.toString()); - error(message); + error(logger, eventReporter, message); if (logger.isDebugEnabled()) { logger.error("", e); } @@ -427,27 +336,6 @@ public class EndpointConnectionPool { } } - private void penalize(final PeerDescription peerDescription, final long penalizationMillis) { - Long expiration = peerTimeoutExpirations.get(peerDescription); - if (expiration == null) { - expiration = Long.valueOf(0L); - } - - final long newExpiration = Math.max(expiration, System.currentTimeMillis() + penalizationMillis); - peerTimeoutExpirations.put(peerDescription, Long.valueOf(newExpiration)); - } - - /** - * Updates internal state map to penalize a PeerStatus that points to the - * specified peer - * - * @param peer the peer - * @param penalizationMillis period of time to penalize a given peer - */ - public void penalize(final Peer peer, final long penalizationMillis) { - penalize(peer.getDescription(), penalizationMillis); - } - private void cleanup(final SocketClientProtocol protocol, final Peer peer) { if (protocol != null && peer != null) { try { @@ -470,113 +358,11 @@ public class EndpointConnectionPool { } } - private boolean isPeerRefreshNeeded(final List<PeerStatus> peerList) { - return (peerList == null || peerList.isEmpty() || System.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD); - } - - private PeerStatus getNextPeerStatus(final TransferDirection direction) { - List<PeerStatus> peerList = peerStatuses; - if (isPeerRefreshNeeded(peerList)) { - peerRefreshLock.lock(); - try { - // now that we have the lock, check again that we need to refresh (because another thread - // could have been refreshing while we were waiting for the lock). - peerList = peerStatuses; - if (isPeerRefreshNeeded(peerList)) { - try { - peerList = createPeerStatusList(direction); - } catch (final Exception e) { - final String message = String.format("%s Failed to update list of peers due to %s", this, e.toString()); - warn(message); - if (logger.isDebugEnabled()) { - logger.warn("", e); - } - - if (eventReporter != null) { - eventReporter.reportEvent(Severity.WARNING, CATEGORY, message); - } - } - - this.peerStatuses = peerList; - peerRefreshTime = System.currentTimeMillis(); - } - } finally { - peerRefreshLock.unlock(); - } - } - - if (peerList == null || peerList.isEmpty()) { - return null; - } - - PeerStatus peerStatus; - for (int i = 0; i < peerList.size(); i++) { - final long idx = peerIndex.getAndIncrement(); - final int listIndex = (int) (idx % peerList.size()); - peerStatus = peerList.get(listIndex); - - if (isPenalized(peerStatus)) { - logger.debug("{} {} is penalized; will not communicate with this peer", this, peerStatus); - } else { - return peerStatus; - } - } - - logger.debug("{} All peers appear to be penalized; returning null", this); - return null; - } - - private boolean isPenalized(final PeerStatus peerStatus) { - final Long expirationEnd = peerTimeoutExpirations.get(peerStatus.getPeerDescription()); - return (expirationEnd != null && expirationEnd > System.currentTimeMillis()); - } - - private List<PeerStatus> createPeerStatusList(final TransferDirection direction) throws IOException { - Set<PeerStatus> statuses = getPeerStatuses(); - if (statuses == null) { - refreshPeers(); - statuses = getPeerStatuses(); - if (statuses == null) { - logger.debug("{} found no peers to connect to", this); - return Collections.emptyList(); - } - } - - final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation(); - final List<NodeInformation> nodeInfos = new ArrayList<>(); - for (final PeerStatus peerStatus : statuses) { - final PeerDescription description = peerStatus.getPeerDescription(); - final NodeInformation nodeInfo = new NodeInformation(description.getHostname(), description.getPort(), 0, description.isSecure(), peerStatus.getFlowFileCount()); - nodeInfos.add(nodeInfo); - } - clusterNodeInfo.setNodeInformation(nodeInfos); - return formulateDestinationList(clusterNodeInfo, direction); - } - - private Set<PeerStatus> getPeerStatuses() { - final PeerStatusCache cache = this.peerStatusCache; - if (cache == null || cache.getStatuses() == null || cache.getStatuses().isEmpty()) { - return null; - } - - if (cache.getTimestamp() + PEER_CACHE_MILLIS < System.currentTimeMillis()) { - final Set<PeerStatus> equalizedSet = new HashSet<>(cache.getStatuses().size()); - for (final PeerStatus status : cache.getStatuses()) { - final PeerStatus equalizedStatus = new PeerStatus(status.getPeerDescription(), 1); - equalizedSet.add(equalizedStatus); - } - - return equalizedSet; - } - - return cache.getStatuses(); - } - - private Set<PeerStatus> fetchRemotePeerStatuses() throws IOException { + public Set<PeerStatus> fetchRemotePeerStatuses() throws IOException { final String hostname = clusterUrl.getHost(); - final Integer port = getSiteToSitePort(); + final Integer port = siteInfoProvider.getSiteToSitePort(); if (port == null) { - throw new IOException("Remote instance of NiFi is not configured to allow site-to-site communications"); + throw new IOException("Remote instance of NiFi is not configured to allow RAW Socket site-to-site communications"); } final PeerDescription clusterPeerDescription = new PeerDescription(hostname, port, clusterUrl.toString().startsWith("https://")); @@ -604,13 +390,12 @@ public class EndpointConnectionPool { } final Set<PeerStatus> peerStatuses = clientProtocol.getPeerStatuses(peer); - persistPeerStatuses(peerStatuses); try { clientProtocol.shutdown(peer); } catch (final IOException e) { final String message = String.format("%s Failed to shutdown protocol when updating list of peers due to %s", this, e.toString()); - warn(message); + warn(logger, eventReporter, message); if (logger.isDebugEnabled()) { logger.warn("", e); } @@ -620,7 +405,7 @@ public class EndpointConnectionPool { peer.close(); } catch (final IOException e) { final String message = String.format("%s Failed to close resources when updating list of peers due to %s", this, e.toString()); - warn(message); + warn(logger, eventReporter, message); if (logger.isDebugEnabled()) { logger.warn("", e); } @@ -629,60 +414,13 @@ public class EndpointConnectionPool { return peerStatuses; } - private void persistPeerStatuses(final Set<PeerStatus> statuses) { - if (peersFile == null) { - return; - } - - try (final OutputStream fos = new FileOutputStream(peersFile); - final OutputStream out = new BufferedOutputStream(fos)) { - - for (final PeerStatus status : statuses) { - final PeerDescription description = status.getPeerDescription(); - final String line = description.getHostname() + ":" + description.getPort() + ":" + description.isSecure() + "\n"; - out.write(line.getBytes(StandardCharsets.UTF_8)); - } - - } catch (final IOException e) { - error("Failed to persist list of Peers due to {}; if restarted and peer's NCM is down, may be unable to transfer data until communications with NCM are restored", e.toString()); - logger.error("", e); - } - } - - private Set<PeerStatus> recoverPersistedPeerStatuses(final File file) throws IOException { - if (!file.exists()) { - return null; - } - - final Set<PeerStatus> statuses = new HashSet<>(); - try (final InputStream fis = new FileInputStream(file); - final BufferedReader reader = new BufferedReader(new InputStreamReader(fis))) { - - String line; - while ((line = reader.readLine()) != null) { - final String[] splits = line.split(Pattern.quote(":")); - if (splits.length != 3) { - continue; - } - - final String hostname = splits[0]; - final int port = Integer.parseInt(splits[1]); - final boolean secure = Boolean.parseBoolean(splits[2]); - - statuses.add(new PeerStatus(new PeerDescription(hostname, port, secure), 1)); - } - } - - return statuses; - } - private CommunicationsSession establishSiteToSiteConnection(final PeerStatus peerStatus) throws IOException { final PeerDescription description = peerStatus.getPeerDescription(); return establishSiteToSiteConnection(description.getHostname(), description.getPort()); } private CommunicationsSession establishSiteToSiteConnection(final String hostname, final int port) throws IOException { - final boolean siteToSiteSecure = isSecure(); + final boolean siteToSiteSecure = siteInfoProvider.isSecure(); final String destinationUri = "nifi://" + hostname + ":" + port; CommunicationsSession commsSession = null; @@ -724,66 +462,6 @@ public class EndpointConnectionPool { return commsSession; } - static List<PeerStatus> formulateDestinationList(final ClusterNodeInformation clusterNodeInfo, final TransferDirection direction) { - final Collection<NodeInformation> nodeInfoSet = clusterNodeInfo.getNodeInformation(); - final int numDestinations = Math.max(128, nodeInfoSet.size()); - final Map<NodeInformation, Integer> entryCountMap = new HashMap<>(); - - long totalFlowFileCount = 0L; - for (final NodeInformation nodeInfo : nodeInfoSet) { - totalFlowFileCount += nodeInfo.getTotalFlowFiles(); - } - - int totalEntries = 0; - for (final NodeInformation nodeInfo : nodeInfoSet) { - final int flowFileCount = nodeInfo.getTotalFlowFiles(); - // don't allow any node to get more than 80% of the data - final double percentageOfFlowFiles = Math.min(0.8D, ((double) flowFileCount / (double) totalFlowFileCount)); - final double relativeWeighting = (direction == TransferDirection.SEND) ? (1 - percentageOfFlowFiles) : percentageOfFlowFiles; - final int entries = Math.max(1, (int) (numDestinations * relativeWeighting)); - - entryCountMap.put(nodeInfo, Math.max(1, entries)); - totalEntries += entries; - } - - final List<PeerStatus> destinations = new ArrayList<>(totalEntries); - for (int i = 0; i < totalEntries; i++) { - destinations.add(null); - } - for (final Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet()) { - final NodeInformation nodeInfo = entry.getKey(); - final int numEntries = entry.getValue(); - - int skipIndex = numEntries; - for (int i = 0; i < numEntries; i++) { - int n = (skipIndex * i); - while (true) { - final int index = n % destinations.size(); - PeerStatus status = destinations.get(index); - if (status == null) { - final PeerDescription description = new PeerDescription(nodeInfo.getSiteToSiteHostname(), nodeInfo.getSiteToSitePort(), nodeInfo.isSiteToSiteSecure()); - status = new PeerStatus(description, nodeInfo.getTotalFlowFiles()); - destinations.set(index, status); - break; - } else { - n++; - } - } - } - } - - final StringBuilder distributionDescription = new StringBuilder(); - distributionDescription.append("New Weighted Distribution of Nodes:"); - for (final Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet()) { - final double percentage = entry.getValue() * 100D / destinations.size(); - distributionDescription.append("\n").append(entry.getKey()).append(" will receive ").append(percentage).append("% of data"); - } - logger.info(distributionDescription.toString()); - - // Jumble the list of destinations. - return destinations; - } - private void cleanupExpiredSockets() { for (final BlockingQueue<EndpointConnection> connectionQueue : connectionQueueMap.values()) { final List<EndpointConnection> connections = new ArrayList<>(); @@ -813,7 +491,7 @@ public class EndpointConnectionPool { public void shutdown() { shutdown = true; taskExecutor.shutdown(); - peerTimeoutExpirations.clear(); + peerSelector.clear(); for (final EndpointConnection conn : activeConnections) { conn.getPeer().getCommunicationsSession().interrupt(); @@ -832,138 +510,11 @@ public class EndpointConnectionPool { cleanup(connection.getSocketClientProtocol(), connection.getPeer()); } - private void refreshPeers() { - final PeerStatusCache existingCache = peerStatusCache; - if (existingCache != null && (existingCache.getTimestamp() + PEER_CACHE_MILLIS > System.currentTimeMillis())) { - return; - } - - try { - final Set<PeerStatus> statuses = fetchRemotePeerStatuses(); - peerStatusCache = new PeerStatusCache(statuses); - logger.info("{} Successfully refreshed Peer Status; remote instance consists of {} peers", this, statuses.size()); - } catch (Exception e) { - warn("{} Unable to refresh Remote Group's peers due to {}", this, e); - if (logger.isDebugEnabled()) { - logger.warn("", e); - } - } - } - - public String getInputPortIdentifier(final String portName) throws IOException { - return getPortIdentifier(portName, inputPortMap); - } - - public String getOutputPortIdentifier(final String portName) throws IOException { - return getPortIdentifier(portName, outputPortMap); - } - - private String getPortIdentifier(final String portName, final Map<String, String> portMap) throws IOException { - String identifier; - remoteInfoReadLock.lock(); - try { - identifier = portMap.get(portName); - } finally { - remoteInfoReadLock.unlock(); - } - - if (identifier != null) { - return identifier; - } - - refreshRemoteInfo(); - - remoteInfoReadLock.lock(); - try { - return portMap.get(portName); - } finally { - remoteInfoReadLock.unlock(); - } - } - - private ControllerDTO refreshRemoteInfo() throws IOException { - final boolean webInterfaceSecure = clusterUrl.toString().startsWith("https"); - final NiFiRestApiUtil utils = new NiFiRestApiUtil(webInterfaceSecure ? sslContext : null); - final ControllerDTO controller = utils.getController(apiUri + "/controller", commsTimeout); - - remoteInfoWriteLock.lock(); - try { - this.siteToSitePort = controller.getRemoteSiteListeningPort(); - this.siteToSiteSecure = controller.isSiteToSiteSecure(); - - inputPortMap.clear(); - for (final PortDTO inputPort : controller.getInputPorts()) { - inputPortMap.put(inputPort.getName(), inputPort.getId()); - } - - outputPortMap.clear(); - for (final PortDTO outputPort : controller.getOutputPorts()) { - outputPortMap.put(outputPort.getName(), outputPort.getId()); - } - - this.remoteRefreshTime = System.currentTimeMillis(); - } finally { - remoteInfoWriteLock.unlock(); - } - - return controller; - } - - /** - * @return the port that the remote instance is listening on for - * site-to-site communication, or <code>null</code> if the remote instance - * is not configured to allow site-to-site communications. - * - * @throws IOException if unable to communicate with the remote instance - */ - private Integer getSiteToSitePort() throws IOException { - Integer listeningPort; - remoteInfoReadLock.lock(); - try { - listeningPort = this.siteToSitePort; - if (listeningPort != null && this.remoteRefreshTime > System.currentTimeMillis() - REMOTE_REFRESH_MILLIS) { - return listeningPort; - } - } finally { - remoteInfoReadLock.unlock(); - } - - final ControllerDTO controller = refreshRemoteInfo(); - listeningPort = controller.getRemoteSiteListeningPort(); - - return listeningPort; - } - @Override public String toString() { return "EndpointConnectionPool[Cluster URL=" + clusterUrl + "]"; } - /** - * @return {@code true} if the remote instance is configured for secure - * site-to-site communications, {@code false} otherwise - * @throws IOException if unable to check if secure - */ - public boolean isSecure() throws IOException { - remoteInfoReadLock.lock(); - try { - final Boolean secure = this.siteToSiteSecure; - if (secure != null && this.remoteRefreshTime > System.currentTimeMillis() - REMOTE_REFRESH_MILLIS) { - return secure; - } - } finally { - remoteInfoReadLock.unlock(); - } - - final ControllerDTO controller = refreshRemoteInfo(); - final Boolean isSecure = controller.isSiteToSiteSecure(); - if (isSecure == null) { - throw new IOException("Remote NiFi instance " + clusterUrl + " is not currently configured to accept site-to-site connections"); - } - - return isSecure; - } - private class IdEnrichedRemoteDestination implements RemoteDestination { private final RemoteDestination original; @@ -994,4 +545,6 @@ public class EndpointConnectionPool { return original.isUseCompression(); } } + + }
http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java index 33e4a66..d04234f 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java @@ -16,27 +16,26 @@ */ package org.apache.nifi.remote.client.socket; -import java.io.IOException; -import java.util.Map; -import java.util.concurrent.TimeUnit; - import org.apache.nifi.remote.Communicant; import org.apache.nifi.remote.RemoteDestination; import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.TransactionCompletion; import org.apache.nifi.remote.TransferDirection; -import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.client.AbstractSiteToSiteClient; import org.apache.nifi.remote.client.SiteToSiteClientConfig; import org.apache.nifi.remote.protocol.DataPacket; -import org.apache.nifi.util.ObjectHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SocketClient implements SiteToSiteClient { +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +public class SocketClient extends AbstractSiteToSiteClient { private static final Logger logger = LoggerFactory.getLogger(SocketClient.class); - private final SiteToSiteClientConfig config; private final EndpointConnectionPool pool; private final boolean compress; private final String portName; @@ -45,13 +44,17 @@ public class SocketClient implements SiteToSiteClient { private volatile boolean closed = false; public SocketClient(final SiteToSiteClientConfig config) { - pool = new EndpointConnectionPool(config.getUrl(), + super(config); + + final int commsTimeout = (int) config.getTimeout(TimeUnit.MILLISECONDS); + pool = new EndpointConnectionPool(clusterUrl, createRemoteDestination(config.getPortIdentifier(), config.getPortName()), - (int) config.getTimeout(TimeUnit.MILLISECONDS), + commsTimeout, (int) config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS), - config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile()); + config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile(), + siteInfoProvider + ); - this.config = config; this.compress = config.isUseCompression(); this.portIdentifier = config.getPortIdentifier(); this.portName = config.getPortName(); @@ -59,13 +62,8 @@ public class SocketClient implements SiteToSiteClient { } @Override - public SiteToSiteClientConfig getConfig() { - return config; - } - - @Override public boolean isSecure() throws IOException { - return pool.isSecure(); + return siteInfoProvider.isSecure(); } private String getPortIdentifier(final TransferDirection direction) throws IOException { @@ -76,9 +74,9 @@ public class SocketClient implements SiteToSiteClient { final String portId; if (direction == TransferDirection.SEND) { - portId = pool.getInputPortIdentifier(this.portName); + portId = siteInfoProvider.getInputPortIdentifier(this.portName); } else { - portId = pool.getOutputPortIdentifier(this.portName); + portId = siteInfoProvider.getOutputPortIdentifier(this.portName); } if (portId == null) { @@ -142,7 +140,7 @@ public class SocketClient implements SiteToSiteClient { // Wrap the transaction in a new one that will return the EndpointConnectionState back to the pool whenever // the transaction is either completed or canceled. - final ObjectHolder<EndpointConnection> connectionStateRef = new ObjectHolder<>(connectionState); + final AtomicReference<EndpointConnection> connectionStateRef = new AtomicReference<>(connectionState); return new Transaction() { @Override public void confirm() throws IOException { http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/AdaptedNodeInformation.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/AdaptedNodeInformation.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/AdaptedNodeInformation.java index 6ca5812..61df4cf 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/AdaptedNodeInformation.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/AdaptedNodeInformation.java @@ -20,6 +20,8 @@ public class AdaptedNodeInformation { private String hostname; private Integer siteToSitePort; + private Integer siteToSiteHttpApiPort; + private int apiPort; private boolean isSiteToSiteSecure; private int totalFlowFiles; @@ -63,4 +65,13 @@ public class AdaptedNodeInformation { public void setTotalFlowFiles(int totalFlowFiles) { this.totalFlowFiles = totalFlowFiles; } + + public Integer getSiteToSiteHttpApiPort() { + return siteToSiteHttpApiPort; + } + + public void setSiteToSiteHttpApiPort(Integer siteToSiteHttpApiPort) { + this.siteToSiteHttpApiPort = siteToSiteHttpApiPort; + } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformation.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformation.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformation.java index abfcc85..c348e13 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformation.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformation.java @@ -20,14 +20,16 @@ public class NodeInformation { private final String siteToSiteHostname; private final Integer siteToSitePort; + private final Integer siteToSiteHttpApiPort; private final int apiPort; private final boolean isSiteToSiteSecure; private final int totalFlowFiles; - public NodeInformation(final String siteToSiteHostname, final Integer siteToSitePort, final int apiPort, - final boolean isSiteToSiteSecure, final int totalFlowFiles) { + public NodeInformation(final String siteToSiteHostname, final Integer siteToSitePort, final Integer siteToSiteHttpApiPort, + final int apiPort, final boolean isSiteToSiteSecure, final int totalFlowFiles) { this.siteToSiteHostname = siteToSiteHostname; this.siteToSitePort = siteToSitePort; + this.siteToSiteHttpApiPort = siteToSiteHttpApiPort; this.apiPort = apiPort; this.isSiteToSiteSecure = isSiteToSiteSecure; this.totalFlowFiles = totalFlowFiles; @@ -45,6 +47,10 @@ public class NodeInformation { return siteToSitePort; } + public Integer getSiteToSiteHttpApiPort() { + return siteToSiteHttpApiPort; + } + public boolean isSiteToSiteSecure() { return isSiteToSiteSecure; } @@ -77,6 +83,16 @@ public class NodeInformation { } else if (siteToSitePort != null && siteToSitePort.intValue() != other.siteToSitePort.intValue()) { return false; } + + if (siteToSiteHttpApiPort == null && other.siteToSiteHttpApiPort != null) { + return false; + } + if (siteToSiteHttpApiPort != null && other.siteToSiteHttpApiPort == null) { + return false; + } else if (siteToSiteHttpApiPort != null && siteToSiteHttpApiPort.intValue() != other.siteToSiteHttpApiPort.intValue()) { + return false; + } + if (apiPort != other.apiPort) { return false; } http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformationAdapter.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformationAdapter.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformationAdapter.java index b2dead0..c17849e 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformationAdapter.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformationAdapter.java @@ -24,7 +24,8 @@ public class NodeInformationAdapter extends XmlAdapter<AdaptedNodeInformation, N @Override public NodeInformation unmarshal(final AdaptedNodeInformation adapted) throws Exception { - return new NodeInformation(adapted.getHostname(), adapted.getSiteToSitePort(), adapted.getApiPort(), adapted.isSiteToSiteSecure(), adapted.getTotalFlowFiles()); + return new NodeInformation(adapted.getHostname(), adapted.getSiteToSitePort(), adapted.getSiteToSiteHttpApiPort(), + adapted.getApiPort(), adapted.isSiteToSiteSecure(), adapted.getTotalFlowFiles()); } @Override @@ -32,6 +33,7 @@ public class NodeInformationAdapter extends XmlAdapter<AdaptedNodeInformation, N final AdaptedNodeInformation adapted = new AdaptedNodeInformation(); adapted.setHostname(nodeInformation.getSiteToSiteHostname()); adapted.setSiteToSitePort(nodeInformation.getSiteToSitePort()); + adapted.setSiteToSiteHttpApiPort(nodeInformation.getSiteToSiteHttpApiPort()); adapted.setApiPort(nodeInformation.getAPIPort()); adapted.setSiteToSiteSecure(nodeInformation.isSiteToSiteSecure()); adapted.setTotalFlowFiles(nodeInformation.getTotalFlowFiles()); http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java index 198aaef..ec20e50 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.remote.exception; +import org.apache.nifi.remote.protocol.ResponseCode; + import java.io.IOException; /** @@ -28,11 +30,24 @@ public class HandshakeException extends IOException { private static final long serialVersionUID = 178192341908726L; + private final ResponseCode responseCode; + public HandshakeException(final String message) { super(message); + this.responseCode = null; } public HandshakeException(final Throwable cause) { super(cause); + this.responseCode = null; + } + + public HandshakeException(final ResponseCode responseCode, final String message) { + super(message); + this.responseCode = responseCode; + } + + public ResponseCode getResponseCode() { + return responseCode; } } http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpCommunicationsSession.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpCommunicationsSession.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpCommunicationsSession.java new file mode 100644 index 0000000..d561833 --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpCommunicationsSession.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.io.http; + +import org.apache.nifi.remote.AbstractCommunicationsSession; +import org.apache.nifi.remote.protocol.CommunicationsInput; +import org.apache.nifi.remote.protocol.CommunicationsOutput; + +import java.io.IOException; + +public class HttpCommunicationsSession extends AbstractCommunicationsSession { + + protected int timeout = 30000; + + protected final HttpInput input; + protected final HttpOutput output; + protected String checksum; + + public HttpCommunicationsSession(){ + super(null); + this.input = new HttpInput(); + this.output = new HttpOutput(); + } + + @Override + public void setTimeout(final int millis) throws IOException { + this.timeout = millis; + } + + @Override + public int getTimeout() throws IOException { + return timeout; + } + + @Override + public CommunicationsInput getInput() { + return input; + } + + @Override + public CommunicationsOutput getOutput() { + return output; + } + + @Override + public boolean isDataAvailable() { + return false; + } + + @Override + public long getBytesWritten() { + return output.getBytesWritten(); + } + + @Override + public long getBytesRead() { + return input.getBytesRead(); + } + + @Override + public void interrupt() { + } + + @Override + public boolean isClosed() { + return false; + } + + @Override + public void close() throws IOException { + + } + + public String getChecksum() { + return checksum; + } + + public void setChecksum(String checksum) { + this.checksum = checksum; + } + + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpInput.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpInput.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpInput.java new file mode 100644 index 0000000..5048306 --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpInput.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.io.http; + +import org.apache.nifi.remote.protocol.CommunicationsInput; +import org.apache.nifi.stream.io.ByteCountingInputStream; + +import java.io.IOException; +import java.io.InputStream; + +public class HttpInput implements CommunicationsInput { + + private ByteCountingInputStream countingIn; + + @Override + public InputStream getInputStream() throws IOException { + return countingIn; + } + + @Override + public long getBytesRead() { + if (countingIn != null) { + return countingIn.getBytesRead(); + } + return 0L; + } + + @Override + public void consume() throws IOException { + if (countingIn == null) { + return; + } + + final byte[] b = new byte[4096]; + int bytesRead; + do { + bytesRead = countingIn.read(b); + } while (bytesRead > 0); + } + + public void setInputStream(InputStream inputStream) { + this.countingIn = new ByteCountingInputStream(inputStream); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpOutput.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpOutput.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpOutput.java new file mode 100644 index 0000000..b78be18 --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpOutput.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.io.http; + +import org.apache.nifi.remote.protocol.CommunicationsOutput; +import org.apache.nifi.stream.io.ByteCountingOutputStream; + +import java.io.IOException; +import java.io.OutputStream; + +public class HttpOutput implements CommunicationsOutput { + + private ByteCountingOutputStream countingOut; + + @Override + public OutputStream getOutputStream() throws IOException { + return countingOut; + } + + @Override + public long getBytesWritten() { + if (countingOut != null) { + return countingOut.getBytesWritten(); + } + return 0L; + } + + public void setOutputStream(OutputStream outputStream) { + this.countingOut = new ByteCountingOutputStream(outputStream); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpServerCommunicationsSession.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpServerCommunicationsSession.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpServerCommunicationsSession.java new file mode 100644 index 0000000..ae12c67 --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpServerCommunicationsSession.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.io.http; + +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.protocol.HandshakeProperty; +import org.apache.nifi.remote.protocol.ResponseCode; + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.Map; + +public class HttpServerCommunicationsSession extends HttpCommunicationsSession { + + private final Map<String, String> handshakeParams = new HashMap<>(); + private final String transactionId; + private Transaction.TransactionState status = Transaction.TransactionState.TRANSACTION_STARTED; + private ResponseCode responseCode; + + public HttpServerCommunicationsSession(InputStream inputStream, OutputStream outputStream, String transactionId){ + super(); + input.setInputStream(inputStream); + output.setOutputStream(outputStream); + this.transactionId = transactionId; + } + + // This status is only needed by HttpFlowFileServerProtocol, HttpClientTransaction has its own status. + // Because multiple HttpFlowFileServerProtocol instances have to carry on a single transaction + // throughout multiple HTTP requests, status has to be embedded here. + public Transaction.TransactionState getStatus() { + return status; + } + + public void setStatus(Transaction.TransactionState status) { + this.status = status; + } + + public String getTransactionId() { + return transactionId; + } + + public ResponseCode getResponseCode() { + return responseCode; + } + + public void setResponseCode(ResponseCode responseCode) { + this.responseCode = responseCode; + } + + public void putHandshakeParam(HandshakeProperty key, String value) { + handshakeParams.put(key.name(), value); + } + + public Map<String, String> getHandshakeParams() { + return handshakeParams; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java index 2efea11..1f21faf 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java @@ -16,11 +16,6 @@ */ package org.apache.nifi.remote.protocol; -import java.io.IOException; -import java.util.Set; - -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.PeerStatus; import org.apache.nifi.remote.Transaction; @@ -32,6 +27,9 @@ import org.apache.nifi.remote.exception.PortNotRunningException; import org.apache.nifi.remote.exception.ProtocolException; import org.apache.nifi.remote.exception.UnknownPortException; +import java.io.IOException; +import java.util.Set; + public interface ClientProtocol extends VersionedRemoteResource { void handshake(Peer peer) throws IOException, HandshakeException, UnknownPortException, PortNotRunningException; @@ -40,14 +38,8 @@ public interface ClientProtocol extends VersionedRemoteResource { FlowFileCodec negotiateCodec(Peer peer) throws IOException, ProtocolException; - int receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException; - - int transferFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException; - void shutdown(Peer peer) throws IOException, ProtocolException; - boolean isReadyForFileTransfer(); - Transaction startTransaction(Peer peer, FlowFileCodec codec, TransferDirection direction) throws IOException; /** http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/HandshakeProperty.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/HandshakeProperty.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/HandshakeProperty.java new file mode 100644 index 0000000..34274ae --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/HandshakeProperty.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol; + +/** + * Enumeration of Properties that can be used for the Site-to-Site Socket + * Protocol. + */ +public enum HandshakeProperty { + + /** + * Boolean value indicating whether or not the contents of a FlowFile should + * be GZipped when transferred. + */ + GZIP, + /** + * The unique identifier of the port to communicate with + */ + PORT_IDENTIFIER, + /** + * Indicates the number of milliseconds after the request was made that the + * client will wait for a response. If no response has been received by the + * time this value expires, the server can move on without attempting to + * service the request because the client will have already disconnected. + */ + REQUEST_EXPIRATION_MILLIS, + /** + * The preferred number of FlowFiles that the server should send to the + * client when pulling data. This property was introduced in version 5 of + * the protocol. + */ + BATCH_COUNT, + /** + * The preferred number of bytes that the server should send to the client + * when pulling data. This property was introduced in version 5 of the + * protocol. + */ + BATCH_SIZE, + /** + * The preferred amount of time that the server should send data to the + * client when pulling data. This property was introduced in version 5 of + * the protocol. Value is in milliseconds. + */ + BATCH_DURATION; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/Response.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/Response.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/Response.java new file mode 100644 index 0000000..0dcd1d0 --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/Response.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol; + +import java.io.DataInputStream; +import java.io.IOException; + +import org.apache.nifi.remote.exception.ProtocolException; + +public class Response { + + private final ResponseCode code; + private final String message; + + private Response(final ResponseCode code, final String explanation) { + this.code = code; + this.message = explanation; + } + + public ResponseCode getCode() { + return code; + } + + public String getMessage() { + return message; + } + + public static Response read(final DataInputStream in) throws IOException, ProtocolException { + final ResponseCode code = ResponseCode.readCode(in); + final String message = code.containsMessage() ? in.readUTF() : null; + return new Response(code, message); + } + + @Override + public String toString() { + return code + ": " + message; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ResponseCode.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ResponseCode.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ResponseCode.java new file mode 100644 index 0000000..18594e7 --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ResponseCode.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol; + +import org.apache.nifi.remote.exception.ProtocolException; + +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; + +public enum ResponseCode { + + RESERVED(0, "Reserved for Future Use", false), // This will likely be used if we ever need to expand the length of + // ResponseCode, so that we can indicate a 0 followed by some other bytes + + // handshaking properties + PROPERTIES_OK(1, "Properties OK", false), + UNKNOWN_PROPERTY_NAME(230, "Unknown Property Name", true), + ILLEGAL_PROPERTY_VALUE(231, "Illegal Property Value", true), + MISSING_PROPERTY(232, "Missing Property", true), + // transaction indicators + CONTINUE_TRANSACTION(10, "Continue Transaction", false), + FINISH_TRANSACTION(11, "Finish Transaction", false), + CONFIRM_TRANSACTION(12, "Confirm Transaction", true), // "Explanation" of this code is the checksum + TRANSACTION_FINISHED(13, "Transaction Finished", false), + TRANSACTION_FINISHED_BUT_DESTINATION_FULL(14, "Transaction Finished But Destination is Full", false), + CANCEL_TRANSACTION(15, "Cancel Transaction", true), + BAD_CHECKSUM(19, "Bad Checksum", false), + // data availability indicators + MORE_DATA(20, "More Data Exists", false), + NO_MORE_DATA(21, "No More Data Exists", false), + // port state indicators + UNKNOWN_PORT(200, "Unknown Port", false), + PORT_NOT_IN_VALID_STATE(201, "Port Not in a Valid State", true), + PORTS_DESTINATION_FULL(202, "Port's Destination is Full", false), + // authorization + UNAUTHORIZED(240, "User Not Authorized", true), + // error indicators + ABORT(250, "Abort", true), + UNRECOGNIZED_RESPONSE_CODE(254, "Unrecognized Response Code", false), + END_OF_STREAM(255, "End of Stream", false); + + private static final ResponseCode[] codeArray = new ResponseCode[256]; + + static { + for (final ResponseCode responseCode : ResponseCode.values()) { + codeArray[responseCode.getCode()] = responseCode; + } + } + + private static final byte CODE_SEQUENCE_VALUE_1 = (byte) 'R'; + private static final byte CODE_SEQUENCE_VALUE_2 = (byte) 'C'; + private final int code; + private final byte[] codeSequence; + private final String description; + private final boolean containsMessage; + + private ResponseCode(final int code, final String description, final boolean containsMessage) { + this.codeSequence = new byte[]{CODE_SEQUENCE_VALUE_1, CODE_SEQUENCE_VALUE_2, (byte) code}; + this.code = code; + this.description = description; + this.containsMessage = containsMessage; + } + + public int getCode() { + return code; + } + + public byte[] getCodeSequence() { + return codeSequence; + } + + @Override + public String toString() { + return description; + } + + public boolean containsMessage() { + return containsMessage; + } + + public void writeResponse(final DataOutputStream out) throws IOException { + if (containsMessage()) { + throw new IllegalArgumentException("ResponseCode " + code + " expects an explanation"); + } + + out.write(getCodeSequence()); + out.flush(); + } + + public void writeResponse(final DataOutputStream out, final String explanation) throws IOException { + if (!containsMessage()) { + throw new IllegalArgumentException("ResponseCode " + code + " does not expect an explanation"); + } + + out.write(getCodeSequence()); + out.writeUTF(explanation); + out.flush(); + } + + static ResponseCode readCode(final InputStream in) throws IOException, ProtocolException { + final int byte1 = in.read(); + if (byte1 < 0) { + throw new EOFException(); + } else if (byte1 != CODE_SEQUENCE_VALUE_1) { + throw new ProtocolException("Expected to receive ResponseCode, but the stream did not have a ResponseCode"); + } + + final int byte2 = in.read(); + if (byte2 < 0) { + throw new EOFException(); + } else if (byte2 != CODE_SEQUENCE_VALUE_2) { + throw new ProtocolException("Expected to receive ResponseCode, but the stream did not have a ResponseCode"); + } + + final int byte3 = in.read(); + if (byte3 < 0) { + throw new EOFException(); + } + + final ResponseCode responseCode = codeArray[byte3]; + if (responseCode == null) { + throw new ProtocolException("Received Response Code of " + byte3 + " but do not recognize this code"); + } + return responseCode; + } + + public static ResponseCode fromSequence(final byte[] value) { + final int code = value[3] & 0xFF; + return fromCode(code); + } + + public static ResponseCode fromCode(final int code) { + final ResponseCode responseCode = codeArray[code]; + return (responseCode == null) ? UNRECOGNIZED_RESPONSE_CODE : responseCode; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/SiteToSiteTransportProtocol.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/SiteToSiteTransportProtocol.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/SiteToSiteTransportProtocol.java new file mode 100644 index 0000000..b67506d --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/SiteToSiteTransportProtocol.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol; + +public enum SiteToSiteTransportProtocol { + RAW, + HTTP +} http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpClientTransaction.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpClientTransaction.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpClientTransaction.java new file mode 100644 index 0000000..d4085ca --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpClientTransaction.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol.http; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.remote.AbstractTransaction; +import org.apache.nifi.remote.Peer; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.codec.StandardFlowFileCodec; +import org.apache.nifi.remote.io.http.HttpCommunicationsSession; +import org.apache.nifi.remote.protocol.Response; +import org.apache.nifi.remote.protocol.ResponseCode; +import org.apache.nifi.remote.util.SiteToSiteRestApiClient; +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.apache.nifi.stream.io.ByteArrayOutputStream; +import org.apache.nifi.web.api.entity.TransactionResultEntity; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +public class HttpClientTransaction extends AbstractTransaction { + + private SiteToSiteRestApiClient apiClient; + private String transactionUrl; + + public HttpClientTransaction(final int protocolVersion, final Peer peer, TransferDirection direction, + final boolean useCompression, final String portId, int penaltyMillis, EventReporter eventReporter) throws IOException { + super(peer, direction, useCompression, new StandardFlowFileCodec(), eventReporter, protocolVersion, penaltyMillis, portId); + } + + public void initialize(SiteToSiteRestApiClient apiUtil, String transactionUrl) throws IOException { + this.transactionUrl = transactionUrl; + this.apiClient = apiUtil; + if(TransferDirection.RECEIVE.equals(direction)){ + dataAvailable = apiUtil.openConnectionForReceive(transactionUrl, peer.getCommunicationsSession()); + } else { + apiUtil.openConnectionForSend(transactionUrl, peer.getCommunicationsSession()); + } + } + + @Override + protected Response readTransactionResponse() throws IOException { + HttpCommunicationsSession commSession = (HttpCommunicationsSession) peer.getCommunicationsSession(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + if(TransferDirection.RECEIVE.equals(direction)){ + switch (state){ + case TRANSACTION_STARTED: + case DATA_EXCHANGED: + logger.debug("{} {} readTransactionResponse. checksum={}", this, peer, commSession.getChecksum()); + if(StringUtils.isEmpty(commSession.getChecksum())){ + // We don't know if there's more data to receive, so just continue it. + ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos); + } else { + // We got a checksum to send to server. + if (TransactionState.TRANSACTION_STARTED.equals(state)) { + logger.debug("{} {} There's no transaction to confirm.", this, peer); + ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, ""); + } else { + TransactionResultEntity transactionResult + = apiClient.commitReceivingFlowFiles(transactionUrl, ResponseCode.CONFIRM_TRANSACTION, commSession.getChecksum()); + ResponseCode responseCode = ResponseCode.fromCode(transactionResult.getResponseCode()); + if(responseCode.containsMessage()){ + String message = transactionResult.getMessage(); + responseCode.writeResponse(dos, message == null ? "" : message); + } else { + responseCode.writeResponse(dos); + } + } + } + break; + } + } else { + switch (state){ + case DATA_EXCHANGED: + // Some flow files have been sent via stream, finish transferring. + apiClient.finishTransferFlowFiles(commSession); + ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, commSession.getChecksum()); + break; + case TRANSACTION_CONFIRMED: + TransactionResultEntity resultEntity = apiClient.commitTransferFlowFiles(transactionUrl, ResponseCode.CONFIRM_TRANSACTION); + ResponseCode responseCode = ResponseCode.fromCode(resultEntity.getResponseCode()); + if(responseCode.containsMessage()){ + responseCode.writeResponse(dos, resultEntity.getMessage()); + } else { + responseCode.writeResponse(dos); + } + break; + } + } + ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray()); + return Response.read(new DataInputStream(bis)); + } + + @Override + protected void writeTransactionResponse(ResponseCode response, String explanation) throws IOException { + HttpCommunicationsSession commSession = (HttpCommunicationsSession) peer.getCommunicationsSession(); + if(TransferDirection.RECEIVE.equals(direction)){ + switch (response) { + case CONFIRM_TRANSACTION: + logger.debug("{} Confirming transaction. checksum={}", this, explanation); + commSession.setChecksum(explanation); + break; + case TRANSACTION_FINISHED: + logger.debug("{} Finishing transaction.", this); + break; + case CANCEL_TRANSACTION: + logger.debug("{} Canceling transaction. explanation={}", this, explanation); + TransactionResultEntity resultEntity = apiClient.commitReceivingFlowFiles(transactionUrl, ResponseCode.CANCEL_TRANSACTION, null); + ResponseCode cancelResponse = ResponseCode.fromCode(resultEntity.getResponseCode()); + switch (cancelResponse) { + case CANCEL_TRANSACTION: + logger.debug("{} CANCEL_TRANSACTION, The transaction is canceled on server properly.", this); + break; + default: + logger.warn("{} CANCEL_TRANSACTION, Expected the transaction is canceled on server, but received {}.", this, cancelResponse); + break; + } + break; + } + } else { + switch (response) { + case FINISH_TRANSACTION: + // The actual HTTP request will be sent in readTransactionResponse. + logger.debug("{} Finished sending flow files.", this); + break; + case BAD_CHECKSUM: { + TransactionResultEntity resultEntity = apiClient.commitTransferFlowFiles(transactionUrl, ResponseCode.BAD_CHECKSUM); + ResponseCode badChecksumCancelResponse = ResponseCode.fromCode(resultEntity.getResponseCode()); + switch (badChecksumCancelResponse) { + case CANCEL_TRANSACTION: + logger.debug("{} BAD_CHECKSUM, The transaction is canceled on server properly.", this); + break; + default: + logger.warn("{} BAD_CHECKSUM, Expected the transaction is canceled on server, but received {}.", this, badChecksumCancelResponse); + break; + } + + } + break; + case CONFIRM_TRANSACTION: + // The actual HTTP request will be sent in readTransactionResponse. + logger.debug("{} Transaction is confirmed.", this); + break; + case CANCEL_TRANSACTION: { + logger.debug("{} Canceling transaction.", this); + TransactionResultEntity resultEntity = apiClient.commitTransferFlowFiles(transactionUrl, ResponseCode.CANCEL_TRANSACTION); + ResponseCode cancelResponse = ResponseCode.fromCode(resultEntity.getResponseCode()); + switch (cancelResponse) { + case CANCEL_TRANSACTION: + logger.debug("{} CANCEL_TRANSACTION, The transaction is canceled on server properly.", this); + break; + default: + logger.warn("{} CANCEL_TRANSACTION, Expected the transaction is canceled on server, but received {}.", this, cancelResponse); + break; + } + } + break; + } + } + } + + + @Override + protected void close() throws IOException { + if (apiClient != null) { + apiClient.close(); + } + } +} + http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpHeaders.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpHeaders.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpHeaders.java new file mode 100644 index 0000000..d15b335 --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpHeaders.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol.http; + +public class HttpHeaders { + + public static final String LOCATION_HEADER_NAME = "Location"; + public static final String LOCATION_URI_INTENT_NAME = "x-location-uri-intent"; + public static final String LOCATION_URI_INTENT_VALUE = "transaction-url"; + + public static final String ACCEPT_ENCODING = "Accept-Encoding"; + public static final String CONTENT_ENCODING = "Content-Encoding"; + public static final String PROTOCOL_VERSION = "x-nifi-site-to-site-protocol-version"; + public static final String SERVER_SIDE_TRANSACTION_TTL = "x-nifi-site-to-site-server-transaction-ttl"; + public static final String HANDSHAKE_PROPERTY_USE_COMPRESSION = "x-nifi-site-to-site-use-compression"; + public static final String HANDSHAKE_PROPERTY_REQUEST_EXPIRATION = "x-nifi-site-to-site-request-expiration"; + public static final String HANDSHAKE_PROPERTY_BATCH_COUNT = "x-nifi-site-to-site-batch-count"; + public static final String HANDSHAKE_PROPERTY_BATCH_SIZE = "x-nifi-site-to-site-batch-size"; + public static final String HANDSHAKE_PROPERTY_BATCH_DURATION = "x-nifi-site-to-site-batch-duration"; + +}
