http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java index 809147e..a5d4bbe 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java @@ -16,6 +16,19 @@ */ package org.apache.nifi.remote; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.remote.cluster.NodeInformant; +import org.apache.nifi.remote.exception.HandshakeException; +import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession; +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession; +import org.apache.nifi.remote.protocol.CommunicationsSession; +import org.apache.nifi.remote.protocol.RequestType; +import org.apache.nifi.remote.protocol.ServerProtocol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.EOFException; @@ -30,24 +43,9 @@ import java.net.SocketTimeoutException; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Arrays; -import java.util.HashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import 
javax.net.ssl.SSLContext; - -import org.apache.nifi.groups.ProcessGroup; -import org.apache.nifi.remote.cluster.NodeInformant; -import org.apache.nifi.remote.exception.HandshakeException; -import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession; -import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; -import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession; -import org.apache.nifi.remote.protocol.CommunicationsSession; -import org.apache.nifi.remote.protocol.RequestType; -import org.apache.nifi.remote.protocol.ServerProtocol; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class SocketRemoteSiteListener implements RemoteSiteListener { public static final String DEFAULT_FLOWFILE_PATH = "./"; @@ -261,11 +259,11 @@ public class SocketRemoteSiteListener implements RemoteSiteListener { break; case RECEIVE_FLOWFILES: // peer wants to receive FlowFiles, so we will transfer FlowFiles. - protocol.getPort().transferFlowFiles(peer, protocol, new HashMap<String, String>()); + protocol.getPort().transferFlowFiles(peer, protocol); break; case SEND_FLOWFILES: // Peer wants to send FlowFiles, so we will receive. - protocol.getPort().receiveFlowFiles(peer, protocol, new HashMap<String, String>()); + protocol.getPort().receiveFlowFiles(peer, protocol); break; case REQUEST_PEER_LIST: protocol.sendPeerList(peer);
http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java index 02a44b7..3f59b50 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java @@ -47,6 +47,7 @@ import org.apache.nifi.remote.exception.PortNotRunningException; import org.apache.nifi.remote.exception.ProtocolException; import org.apache.nifi.remote.exception.UnknownPortException; import org.apache.nifi.remote.protocol.DataPacket; +import org.apache.nifi.remote.protocol.http.HttpProxy; import org.apache.nifi.remote.util.StandardDataPacket; import org.apache.nifi.reporting.Severity; import org.apache.nifi.scheduling.SchedulingStrategy; @@ -137,10 +138,13 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { .url(remoteGroup.getTargetUri().toString()) .portIdentifier(getIdentifier()) .sslContext(sslContext) + .useCompression(isUseCompression()) .eventReporter(remoteGroup.getEventReporter()) .peerPersistenceFile(getPeerPersistenceFile(getIdentifier())) .nodePenalizationPeriod(penalizationMillis, TimeUnit.MILLISECONDS) .timeout(remoteGroup.getCommunicationsTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) + .transportProtocol(remoteGroup.getTransportProtocol()) + .httpProxy(new HttpProxy(remoteGroup.getProxyHost(), remoteGroup.getProxyPort(), remoteGroup.getProxyUser(), 
remoteGroup.getProxyPassword())) .build(); clientRef.set(client); } http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java index 7507935..58f4804 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java @@ -50,7 +50,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; -import java.util.Map; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -236,7 +235,8 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort return; } - session.commit(); + // TODO: Confirm this. Session.commit here is not required since it has been committed inside receiveFlowFiles/transferFlowFiles.
+ // session.commit(); responseQueue.add(new ProcessingResult(transferCount)); } @@ -451,7 +451,7 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort } @Override - public int receiveFlowFiles(final Peer peer, final ServerProtocol serverProtocol, final Map<String, String> requestHeaders) + public int receiveFlowFiles(final Peer peer, final ServerProtocol serverProtocol) throws NotAuthorizedException, BadRequestException, RequestExpiredException { if (getConnectableType() != ConnectableType.INPUT_PORT) { throw new IllegalStateException("Cannot receive FlowFiles because this port is not an Input Port"); @@ -505,7 +505,7 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort } @Override - public int transferFlowFiles(final Peer peer, final ServerProtocol serverProtocol, final Map<String, String> requestHeaders) + public int transferFlowFiles(final Peer peer, final ServerProtocol serverProtocol) throws NotAuthorizedException, BadRequestException, RequestExpiredException { if (getConnectableType() != ConnectableType.OUTPUT_PORT) { throw new IllegalStateException("Cannot send FlowFiles because this port is not an Output Port"); http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java new file mode 100644 index 0000000..43428e0 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java @@ -0,0 +1,559 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol; + +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.connectable.Port; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.remote.Peer; +import org.apache.nifi.remote.PortAuthorizationResult; +import org.apache.nifi.remote.RootGroupPort; +import org.apache.nifi.remote.cluster.NodeInformant; +import org.apache.nifi.remote.codec.FlowFileCodec; +import org.apache.nifi.remote.exception.HandshakeException; +import org.apache.nifi.remote.exception.ProtocolException; +import org.apache.nifi.remote.io.CompressionInputStream; +import org.apache.nifi.remote.io.CompressionOutputStream; +import 
org.apache.nifi.remote.util.StandardDataPacket; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.StopWatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.zip.CRC32; +import java.util.zip.CheckedInputStream; +import java.util.zip.CheckedOutputStream; + +public abstract class AbstractFlowFileServerProtocol implements ServerProtocol { + + protected ProcessGroup rootGroup; + protected RootGroupPort port; + + protected boolean handshakeCompleted; + protected boolean shutdown = false; + protected FlowFileCodec negotiatedFlowFileCodec = null; + + protected HandshakenProperties handshakenProperties; + + protected static final long DEFAULT_BATCH_NANOS = TimeUnit.SECONDS.toNanos(5L); + + protected final Logger logger = LoggerFactory.getLogger(getClass()); + + @Override + public void setRootProcessGroup(final ProcessGroup group) { + if (!group.isRootGroup()) { + throw new IllegalArgumentException("Specified group was not a root group."); + } + this.rootGroup = group; + } + + @Override + public boolean isHandshakeSuccessful() { + return handshakeCompleted; + } + + protected void validateHandshakeRequest(HandshakenProperties confirmed, final Peer peer, final Map<String, String> properties) throws HandshakeException { + Boolean useGzip = null; + for (final Map.Entry<String, String> entry : properties.entrySet()) { + final String propertyName = entry.getKey(); + final String value = entry.getValue(); + + final HandshakeProperty property; + try { + property = HandshakeProperty.valueOf(propertyName); + } catch (final Exception e) { + throw new HandshakeException(ResponseCode.UNKNOWN_PROPERTY_NAME, "Received unknown property: " + 
propertyName); + } + + try { + switch (property) { + case GZIP: { + useGzip = Boolean.parseBoolean(value); + confirmed.setUseGzip(useGzip); + break; + } + case REQUEST_EXPIRATION_MILLIS: + confirmed.setExpirationMillis(Long.parseLong(value)); + break; + case BATCH_COUNT: + confirmed.setBatchCount(Integer.parseInt(value)); + break; + case BATCH_SIZE: + confirmed.setBatchBytes(Long.parseLong(value)); + break; + case BATCH_DURATION: + confirmed.setBatchDurationNanos(TimeUnit.MILLISECONDS.toNanos(Long.parseLong(value))); + break; + case PORT_IDENTIFIER: { + checkPortStatus(peer, value); + } + } + } catch (final NumberFormatException nfe) { + throw new HandshakeException(ResponseCode.ILLEGAL_PROPERTY_VALUE, "Received invalid value for property '" + property + "'; invalid value: " + value); + } + } + + if (useGzip == null) { + logger.debug("Responding with ResponseCode MISSING_PROPERTY because GZIP Property missing"); + throw new HandshakeException(ResponseCode.MISSING_PROPERTY, "Missing Property " + HandshakeProperty.GZIP.name()); + } + + } + + protected void checkPortStatus(final Peer peer, String portId) throws HandshakeException { + Port receivedPort = rootGroup.getInputPort(portId); + if (receivedPort == null) { + receivedPort = rootGroup.getOutputPort(portId); + } + if (receivedPort == null) { + logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", portId); + throw new HandshakeException(ResponseCode.UNKNOWN_PORT, "Received unknown port identifier: " + portId); + } + if (!(receivedPort instanceof RootGroupPort)) { + logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", portId); + throw new HandshakeException(ResponseCode.UNKNOWN_PORT, "Received port identifier " + portId + ", but this Port is not a RootGroupPort"); + } + + this.port = (RootGroupPort) receivedPort; + final PortAuthorizationResult portAuthResult = this.port.checkUserAuthorization(peer.getCommunicationsSession().getUserDn()); + if 
(!portAuthResult.isAuthorized()) { + logger.debug("Responding with ResponseCode UNAUTHORIZED: ", portAuthResult.getExplanation()); + throw new HandshakeException(ResponseCode.UNAUTHORIZED, portAuthResult.getExplanation()); + } + + if (!receivedPort.isValid()) { + logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort); + throw new HandshakeException(ResponseCode.PORT_NOT_IN_VALID_STATE, "Port is not valid"); + } + + if (!receivedPort.isRunning()) { + logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort); + throw new HandshakeException(ResponseCode.PORT_NOT_IN_VALID_STATE, "Port not running"); + } + + // PORTS_DESTINATION_FULL was introduced in version 2. If version 1, just ignore this; + // we will simply not service the request, but the sender will time out + if (getVersionNegotiator().getVersion() > 1) { + for (final Connection connection : port.getConnections()) { + if (connection.getFlowFileQueue().isFull()) { + logger.debug("Responding with ResponseCode PORTS_DESTINATION_FULL for {}", port); + throw new HandshakeException(ResponseCode.PORTS_DESTINATION_FULL, "Received port identifier " + portId + ", but its destination is full"); + } + } + } + + } + + @Override + public RootGroupPort getPort() { + return port; + } + + @Override + public FlowFileCodec getPreNegotiatedCodec() { + return negotiatedFlowFileCodec; + } + + @Override + public final void handshake(final Peer peer) throws IOException, HandshakeException { + if (handshakeCompleted) { + throw new IllegalStateException("Handshake has already been completed"); + } + if (shutdown) { + throw new IllegalStateException("Protocol is shutdown"); + } + + logger.debug("{} Handshaking with {}", this, peer); + + this.handshakenProperties = doHandshake(peer); + + logger.debug("{} Finished handshake with {}", this, peer); + handshakeCompleted = true; + } + + abstract protected HandshakenProperties doHandshake(final Peer peer) throws IOException,
HandshakeException; + + @Override + public int transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { + if (!handshakeCompleted) { + throw new IllegalStateException("Handshake has not been completed"); + } + if (shutdown) { + throw new IllegalStateException("Protocol is shutdown"); + } + + logger.debug("{} Sending FlowFiles to {}", this, peer); + final CommunicationsSession commsSession = peer.getCommunicationsSession(); + String remoteDn = commsSession.getUserDn(); + if (remoteDn == null) { + remoteDn = "none"; + } + + FlowFile flowFile = session.get(); + if (flowFile == null) { + // we have no data to send. Notify the peer. + logger.debug("{} No data to send to {}", this, peer); + writeTransactionResponse(true, ResponseCode.NO_MORE_DATA, commsSession); + return 0; + } + + // we have data to send. + logger.debug("{} Data is available to send to {}", this, peer); + writeTransactionResponse(true, ResponseCode.MORE_DATA, commsSession); + + final StopWatch stopWatch = new StopWatch(true); + long bytesSent = 0L; + final Set<FlowFile> flowFilesSent = new HashSet<>(); + final CRC32 crc = new CRC32(); + + // send data until we reach some batch size + boolean continueTransaction = true; + final long startNanos = System.nanoTime(); + String calculatedCRC = ""; + OutputStream os = new DataOutputStream(commsSession.getOutput().getOutputStream()); + while (continueTransaction) { + final boolean useGzip = handshakenProperties.isUseGzip(); + final OutputStream flowFileOutputStream = useGzip ? 
new CompressionOutputStream(os) : os; + logger.debug("{} Sending {} to {}", new Object[]{this, flowFile, peer}); + + final CheckedOutputStream checkedOutputStream = new CheckedOutputStream(flowFileOutputStream, crc); + + final StopWatch transferWatch = new StopWatch(true); + + final FlowFile toSend = flowFile; + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + final DataPacket dataPacket = new StandardDataPacket(toSend.getAttributes(), in, toSend.getSize()); + codec.encode(dataPacket, checkedOutputStream); + } + }); + + final long transmissionMillis = transferWatch.getElapsed(TimeUnit.MILLISECONDS); + + // need to close the CompressionOutputStream in order to force it to write out any remaining bytes. + // Otherwise, do NOT close it because we don't want to close the underlying stream + // (CompressionOutputStream will not close the underlying stream when it's closed) + if (useGzip) { + checkedOutputStream.close(); + } + + flowFilesSent.add(flowFile); + bytesSent += flowFile.getSize(); + + String transitUriPrefix = handshakenProperties.getTransitUriPrefix(); + final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key()); + session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transmissionMillis, false); + session.remove(flowFile); + + // determine if we should check for more data on queue.
+ final long sendingNanos = System.nanoTime() - startNanos; + boolean poll = true; + double batchDurationNanos = handshakenProperties.getBatchDurationNanos(); + if (sendingNanos >= batchDurationNanos && batchDurationNanos > 0L) { + poll = false; + } + double batchBytes = handshakenProperties.getBatchBytes(); + if (bytesSent >= batchBytes && batchBytes > 0L) { + poll = false; + } + double batchCount = handshakenProperties.getBatchCount(); + if (flowFilesSent.size() >= batchCount && batchCount > 0) { + poll = false; + } + + if (batchDurationNanos == 0 && batchBytes == 0 && batchCount == 0) { + poll = (sendingNanos < DEFAULT_BATCH_NANOS); + } + + if (poll) { + // we've not elapsed the requested sending duration, so get more data. + flowFile = session.get(); + } else { + flowFile = null; + } + + continueTransaction = (flowFile != null); + if (continueTransaction) { + logger.debug("{} Sending ContinueTransaction indicator to {}", this, peer); + writeTransactionResponse(true, ResponseCode.CONTINUE_TRANSACTION, commsSession); + } else { + logger.debug("{} Sending FinishTransaction indicator to {}", this, peer); + writeTransactionResponse(true, ResponseCode.FINISH_TRANSACTION, commsSession); + calculatedCRC = String.valueOf(checkedOutputStream.getChecksum().getValue()); + } + } + + FlowFileTransaction transaction = new FlowFileTransaction(session, context, stopWatch, bytesSent, flowFilesSent, calculatedCRC); + return commitTransferTransaction(peer, transaction); + + } + + protected int commitTransferTransaction(Peer peer, FlowFileTransaction transaction) throws IOException { + ProcessSession session = transaction.getSession(); + Set<FlowFile> flowFilesSent = transaction.getFlowFilesSent(); + + // we've sent a FINISH_TRANSACTION. 
Now we'll wait for the peer to send a 'Confirm Transaction' response + CommunicationsSession commsSession = peer.getCommunicationsSession(); + final Response transactionConfirmationResponse = readTransactionResponse(true, commsSession); + if (transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION) { + // Confirm Checksum and echo back the confirmation. + logger.debug("{} Received {} from {}", this, transactionConfirmationResponse, peer); + final String receivedCRC = transactionConfirmationResponse.getMessage(); + + if (getVersionNegotiator().getVersion() > 3) { + String calculatedCRC = transaction.getCalculatedCRC(); + if (!receivedCRC.equals(calculatedCRC)) { + writeTransactionResponse(true, ResponseCode.BAD_CHECKSUM, commsSession); + session.rollback(); + throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + + "; canceling transaction and rolling back session"); + } + } + + writeTransactionResponse(true, ResponseCode.CONFIRM_TRANSACTION, commsSession, ""); + + } else { + throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse); + } + + final String flowFileDescription = flowFilesSent.size() < 20 ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles"; + + final Response transactionResponse; + try { + transactionResponse = readTransactionResponse(true, commsSession); + } catch (final IOException e) { + logger.error("{} Failed to receive a response from {} when expecting a TransactionFinished Indicator." + + " It is unknown whether or not the peer successfully received/processed the data." 
+ + " Therefore, {} will be rolled back, possibly resulting in data duplication of {}", + this, peer, session, flowFileDescription); + session.rollback(); + throw e; + } + + logger.debug("{} received {} from {}", new Object[]{this, transactionResponse, peer}); + if (transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL) { + peer.penalize(port.getIdentifier(), port.getYieldPeriod(TimeUnit.MILLISECONDS)); + } else if (transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED) { + throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse); + } + + session.commit(); + + StopWatch stopWatch = transaction.getStopWatch(); + long bytesSent = transaction.getBytesSent(); + stopWatch.stop(); + final String uploadDataRate = stopWatch.calculateDataRate(bytesSent); + final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); + final String dataSize = FormatUtils.formatDataSize(bytesSent); + logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[]{ + this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate}); + + return flowFilesSent.size(); + } + + protected Response readTransactionResponse(boolean isTransfer, CommunicationsSession commsSession) throws IOException { + final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); + return Response.read(dis); + } + + protected final void writeTransactionResponse(boolean isTransfer, ResponseCode response, CommunicationsSession commsSession) throws IOException { + writeTransactionResponse(isTransfer, response, commsSession, null); + } + protected void writeTransactionResponse(boolean isTransfer, ResponseCode response, CommunicationsSession commsSession, String explanation) throws IOException { + final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); + if(explanation == null){ + 
response.writeResponse(dos); + } else { + response.writeResponse(dos, explanation); + } + } + + @Override + public int receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { + if (!handshakeCompleted) { + throw new IllegalStateException("Handshake has not been completed"); + } + if (shutdown) { + throw new IllegalStateException("Protocol is shutdown"); + } + + logger.debug("{} receiving FlowFiles from {}", this, peer); + + final CommunicationsSession commsSession = peer.getCommunicationsSession(); + final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); + String remoteDn = commsSession.getUserDn(); + if (remoteDn == null) { + remoteDn = "none"; + } + + final StopWatch stopWatch = new StopWatch(true); + final CRC32 crc = new CRC32(); + + // Peer has data. Otherwise, we would not have been called, because they would not have sent + // a SEND_FLOWFILES request to us. Just decode the bytes into FlowFiles until peer says he's + // finished sending data. + final Set<FlowFile> flowFilesReceived = new HashSet<>(); + long bytesReceived = 0L; + boolean continueTransaction = true; + while (continueTransaction) { + final long startNanos = System.nanoTime(); + final InputStream flowFileInputStream = handshakenProperties.isUseGzip() ?
new CompressionInputStream(dis) : dis; + final CheckedInputStream checkedInputStream = new CheckedInputStream(flowFileInputStream, crc); + + final DataPacket dataPacket = codec.decode(checkedInputStream); + if(dataPacket == null){ + logger.debug("{} Received null dataPacket indicating the end of transaction from {}", this, peer); + break; + } + FlowFile flowFile = session.create(); + flowFile = session.importFrom(dataPacket.getData(), flowFile); + flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes()); + + final long transferNanos = System.nanoTime() - startNanos; + final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS); + final String sourceSystemFlowFileUuid = dataPacket.getAttributes().get(CoreAttributes.UUID.key()); + flowFile = session.putAttribute(flowFile, CoreAttributes.UUID.key(), UUID.randomUUID().toString()); + + String transitUriPrefix = handshakenProperties.getTransitUriPrefix(); + final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceSystemFlowFileUuid; + session.getProvenanceReporter().receive(flowFile, transitUri, sourceSystemFlowFileUuid == null + ? 
null : "urn:nifi:" + sourceSystemFlowFileUuid, "Remote Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transferMillis); + session.transfer(flowFile, Relationship.ANONYMOUS); + flowFilesReceived.add(flowFile); + bytesReceived += flowFile.getSize(); + + final Response transactionResponse = readTransactionResponse(false, commsSession); + switch (transactionResponse.getCode()) { + case CONTINUE_TRANSACTION: + logger.debug("{} Received ContinueTransaction indicator from {}", this, peer); + break; + case FINISH_TRANSACTION: + logger.debug("{} Received FinishTransaction indicator from {}", this, peer); + continueTransaction = false; + break; + case CANCEL_TRANSACTION: + logger.info("{} Received CancelTransaction indicator from {} with explanation {}", this, peer, transactionResponse.getMessage()); + session.rollback(); + return 0; + default: + throw new ProtocolException("Received unexpected response from peer: when expecting Continue Transaction or Finish Transaction, received" + transactionResponse); + } + } + + // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message + // to peer so that we can verify that the connection is still open. This is a two-phase commit, + // which helps to prevent the chances of data duplication. Without doing this, we may commit the + // session and then when we send the response back to the peer, the peer may have timed out and may not + // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the + // Critical Section involved in this transaction so that rather than the Critical Section being the + // time window involved in the entire transaction, it is reduced to a simple round-trip conversation. 
+ logger.debug("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer); + String calculatedCRC = String.valueOf(crc.getValue()); + writeTransactionResponse(false, ResponseCode.CONFIRM_TRANSACTION, commsSession, calculatedCRC); + + FlowFileTransaction transaction = new FlowFileTransaction(session, context, stopWatch, bytesReceived, flowFilesReceived, calculatedCRC); + return commitReceiveTransaction(peer, transaction); + } + + protected int commitReceiveTransaction(Peer peer, FlowFileTransaction transaction) throws IOException { + CommunicationsSession commsSession = peer.getCommunicationsSession(); + ProcessSession session = transaction.getSession(); + final Response confirmTransactionResponse = readTransactionResponse(false, commsSession); + logger.debug("{} Received {} from {}", this, confirmTransactionResponse, peer); + + switch (confirmTransactionResponse.getCode()) { + case CONFIRM_TRANSACTION: + break; + case BAD_CHECKSUM: + session.rollback(); + throw new IOException(this + " Received a BadChecksum response from peer " + peer); + default: + throw new ProtocolException(this + " Received unexpected Response Code from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code"); + } + + // Commit the session so that we have persisted the data + session.commit(); + + if (transaction.getContext().getAvailableRelationships().isEmpty()) { + // Confirm that we received the data and the peer can now discard it but that the peer should not + // send any more data for a bit + logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer); + writeTransactionResponse(false, ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL, commsSession); + } else { + // Confirm that we received the data and the peer can now discard it + logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer); + writeTransactionResponse(false, ResponseCode.TRANSACTION_FINISHED, commsSession); + } + + Set<FlowFile> 
flowFilesReceived = transaction.getFlowFilesSent(); + long bytesReceived = transaction.getBytesSent(); + StopWatch stopWatch = transaction.getStopWatch(); + stopWatch.stop(); + final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles"; + final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived); + final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); + final String dataSize = FormatUtils.formatDataSize(bytesReceived); + logger.info("{} Successfully received {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{ + this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate}); + + return flowFilesReceived.size(); + } + + @Override + public void shutdown(final Peer peer) { + logger.debug("{} Shutting down with {}", this, peer); + shutdown = true; + } + + @Override + public boolean isShutdown() { + return shutdown; + } + + @Override + public void setNodeInformant(final NodeInformant nodeInformant) { + } + + @Override + public long getRequestExpiration() { + return handshakenProperties.getExpirationMillis(); + } + + @Override + public String toString() { + String commid = handshakenProperties != null ? 
handshakenProperties.getCommsIdentifier() : null; + return getClass().getSimpleName() + "[CommsID=" + commid + "]"; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/FlowFileTransaction.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/FlowFileTransaction.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/FlowFileTransaction.java new file mode 100644 index 0000000..560cbaf --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/FlowFileTransaction.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.remote.protocol; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.util.StopWatch; + +import java.util.Set; + /** + * Holder for the state of one FlowFile transfer: the process session and context, a stopwatch, + * a byte count, a set of FlowFiles and the calculated CRC string. + * Every field is final and assigned once by the six-argument constructor. + */ +public class FlowFileTransaction { + + private final ProcessSession session; + private final ProcessContext context; + private final StopWatch stopWatch; + private final long bytesSent; + private final Set<FlowFile> flowFilesSent; + private final String calculatedCRC; + /** + * Creates a transaction with a null session, context, FlowFile set and CRC, a byte count of 0, + * and a stopwatch built with new StopWatch(true). + * NOTE(review): getFlowFilesSent() returns null for such an instance, so callers that + * dereference the set presumably never receive one built this way; confirm. + */ + public FlowFileTransaction() { + this(null, null, new StopWatch(true), 0, null, null); + } + /** + * Stores the given values as they are; no validation and no defensive copy of the set is made. + */ + public FlowFileTransaction(ProcessSession session, ProcessContext context, StopWatch stopWatch, long bytesSent, Set<FlowFile> flowFilesSent, String calculatedCRC) { + this.session = session; + this.context = context; + this.stopWatch = stopWatch; + this.bytesSent = bytesSent; + this.flowFilesSent = flowFilesSent; + this.calculatedCRC = calculatedCRC; + } + + public ProcessSession getSession() { + return session; + } + + public StopWatch getStopWatch() { + return stopWatch; + } + + public long getBytesSent() { + return bytesSent; + } + /** + * Returns the same Set instance that was given to the constructor, which may be null. + */ + public Set<FlowFile> getFlowFilesSent() { + return flowFilesSent; + } + + public String getCalculatedCRC() { + return calculatedCRC; + } + + public ProcessContext getContext() { + return context; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/HandshakenProperties.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/HandshakenProperties.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/HandshakenProperties.java new file mode 100644 index 0000000..816689b --- 
/dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/HandshakenProperties.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol; + +import org.apache.nifi.remote.exception.HandshakeException; + /** + * Mutable holder for handshake values: comms identifier, transit URI prefix, gzip flag, + * expiration in milliseconds, and three batch limits (count, bytes, duration in nanoseconds). + * The batch limits start at 0 and their setters throw HandshakeException for negative values. + */ +public class HandshakenProperties { + + private String commsIdentifier; + private String transitUriPrefix = null; + private boolean useGzip; + private long expirationMillis; + private int batchCount = 0; + private long batchBytes = 0L; + private long batchDurationNanos = 0L; + + + public String getCommsIdentifier() { + return commsIdentifier; + } + + public void setCommsIdentifier(String commsIdentifier) { + this.commsIdentifier = commsIdentifier; + } + + public String getTransitUriPrefix() { + return transitUriPrefix; + } + + public void setTransitUriPrefix(String transitUriPrefix) { + this.transitUriPrefix = transitUriPrefix; + } + + public boolean isUseGzip() { + return useGzip; + } + /** + * NOTE(review): the parameter is a boxed Boolean assigned to a primitive boolean field, + * so a null argument fails with a NullPointerException when it is unboxed. + */ + public void setUseGzip(Boolean useGzip) { + this.useGzip = useGzip; + } + + public long getExpirationMillis() { + return expirationMillis; + } + + public void setExpirationMillis(long expirationMillis) { 
+ this.expirationMillis = expirationMillis; + } + + public int getBatchCount() { + return batchCount; + } + /** + * Stores the batch count; negative values are rejected with a HandshakeException and 0 is accepted. + * NOTE(review): the exception message says less than 1 while the check is less than 0; + * confirm which of the two is intended. + */ + public void setBatchCount(int batchCount) throws HandshakeException { + if (batchCount < 0) { + throw new HandshakeException("Cannot request Batch Count less than 1; requested value: " + batchCount); + } + this.batchCount = batchCount; + } + + public long getBatchBytes() { + return batchBytes; + } + /** + * Stores the batch size in bytes; negative values are rejected and 0 is accepted. + * NOTE(review): same mismatch between the less-than-1 message and the less-than-0 check. + */ + public void setBatchBytes(long batchBytes) throws HandshakeException { + if (batchBytes < 0) { + throw new HandshakeException("Cannot request Batch Size less than 1; requested value: " + batchBytes); + } + this.batchBytes = batchBytes; + } + + public long getBatchDurationNanos() { + return batchDurationNanos; + } + /** + * Stores the batch duration in nanoseconds; negative values are rejected and 0 is accepted. + * NOTE(review): same mismatch between the less-than-1 message and the less-than-0 check. + */ + public void setBatchDurationNanos(long batchDurationNanos) throws HandshakeException { + if (batchDurationNanos < 0) { + throw new HandshakeException("Cannot request Batch Duration less than 1; requested value: " + batchDurationNanos); + } + this.batchDurationNanos = batchDurationNanos; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/HttpFlowFileServerProtocol.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/HttpFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/HttpFlowFileServerProtocol.java new file mode 100644 index 0000000..69c0e66 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/HttpFlowFileServerProtocol.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol.http; + +import org.apache.nifi.remote.Peer; +import org.apache.nifi.remote.protocol.ServerProtocol; + +import java.io.IOException; + /** + * ServerProtocol extension that adds two commit operations, one for a transfer transaction + * and one for a receive transaction. Both take the Peer and return an int. + */ +public interface HttpFlowFileServerProtocol extends ServerProtocol { + /** + * Commits the transfer transaction of the given peer using the checksum supplied by the client. + * Declared to throw IOException and IllegalStateException; the conditions are left to the implementation. + */ + int commitTransferTransaction(Peer peer, String clientChecksum) throws IOException, IllegalStateException; + /** + * Commits the receive transaction of the given peer. + * Declared to throw IOException and IllegalStateException; the conditions are left to the implementation. + */ + int commitReceiveTransaction(Peer peer) throws IOException, IllegalStateException; + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/HttpFlowFileServerProtocolImpl.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/HttpFlowFileServerProtocolImpl.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/HttpFlowFileServerProtocolImpl.java new file mode 100644 index 0000000..f187625 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/HttpFlowFileServerProtocolImpl.java @@ 
-0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol.http; + +import org.apache.nifi.remote.HttpRemoteSiteListener; +import org.apache.nifi.remote.Peer; +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.VersionNegotiator; +import org.apache.nifi.remote.codec.FlowFileCodec; +import org.apache.nifi.remote.codec.StandardFlowFileCodec; +import org.apache.nifi.remote.exception.HandshakeException; +import org.apache.nifi.remote.io.http.HttpServerCommunicationsSession; +import org.apache.nifi.remote.protocol.AbstractFlowFileServerProtocol; +import org.apache.nifi.remote.protocol.CommunicationsSession; +import org.apache.nifi.remote.protocol.FlowFileTransaction; +import org.apache.nifi.remote.protocol.HandshakenProperties; +import org.apache.nifi.remote.protocol.RequestType; +import org.apache.nifi.remote.protocol.Response; +import org.apache.nifi.remote.protocol.ResponseCode; +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.apache.nifi.stream.io.ByteArrayOutputStream; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + /** + * HTTP implementation of the FlowFile server protocol, built on AbstractFlowFileServerProtocol. + * + * Responses are kept on the HttpServerCommunicationsSession rather than exchanged on a stream: + * writeTransactionResponse stores the response code, status and checksum there, and + * readTransactionResponse builds a Response from the status currently stored there. + * NOTE(review): readTransactionResponse writes nothing when the stored status matches no case, + * so Response.read is then given an empty stream; confirm how that is expected to behave. + * + * The protected commitTransferTransaction and commitReceiveTransaction overrides do not commit. + * They call holdTransaction, which registers the FlowFileTransaction with the HttpRemoteSiteListener + * under the session transaction id and returns transaction.getFlowFilesSent().size(). + * The public overloads later fetch it back with finalizeTransaction, set the session status + * (DATA_EXCHANGED plus the client checksum for transfer, TRANSACTION_CONFIRMED for receive), + * and delegate to the superclass commit method. + * + * getRequestType returns null and sendPeerList has an empty body in this class. + */ +public class HttpFlowFileServerProtocolImpl extends 
AbstractFlowFileServerProtocol implements HttpFlowFileServerProtocol { + + public static final String RESOURCE_NAME = "HttpFlowFileProtocol"; + + private final FlowFileCodec codec = new StandardFlowFileCodec(); + private final VersionNegotiator versionNegotiator; + private final HttpRemoteSiteListener transactionManager = HttpRemoteSiteListener.getInstance(); + + public HttpFlowFileServerProtocolImpl(VersionNegotiator versionNegotiator) { + super(); + this.versionNegotiator = versionNegotiator; + } + /** + * Returns the StandardFlowFileCodec created with this object; the peer is not consulted. + */ + @Override + public FlowFileCodec negotiateCodec(final Peer peer) throws IOException { + return codec; + } + /** + * Returns the same codec instance as negotiateCodec. + */ + @Override + public FlowFileCodec getPreNegotiatedCodec() { + return codec; + } + /** + * Builds the handshake result from the HTTP communications session of the peer: + * the session transaction id becomes the comms identifier, and the session handshake + * parameters are passed to validateHandshakeRequest together with the new properties object. + */ + @Override + protected HandshakenProperties doHandshake(Peer peer) throws IOException, HandshakeException { + HandshakenProperties confirmed = new HandshakenProperties(); + + HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession(); + confirmed.setCommsIdentifier(commsSession.getTransactionId()); + validateHandshakeRequest(confirmed, peer, commsSession.getHandshakeParams()); + + logger.debug("{} Done handshake, confirmed={}", this, confirmed); + return confirmed; + } + /** + * Records the response on the HTTP communications session; nothing is written to a stream here. + * The response code is always stored. For the codes that have a case below, the session status + * is updated as well, and for a receive-side CONFIRM_TRANSACTION the explanation is stored as the checksum. + * Codes without a case (and NO_MORE_DATA, which only logs) leave the status unchanged. + */ + @Override + protected void writeTransactionResponse(boolean isTransfer, ResponseCode response, CommunicationsSession commsSession, String explanation) throws IOException { + HttpServerCommunicationsSession commSession = (HttpServerCommunicationsSession) commsSession; + + commSession.setResponseCode(response); + if(isTransfer){ + switch (response) { + case NO_MORE_DATA: + logger.debug("{} There's no data to send.", this); + break; + case CONTINUE_TRANSACTION: + logger.debug("{} Continue transaction... 
expecting more flow files.", this); + commSession.setStatus(Transaction.TransactionState.DATA_EXCHANGED); + break; + case BAD_CHECKSUM: + logger.debug("{} Received BAD_CHECKSUM.", this); + commSession.setStatus(Transaction.TransactionState.ERROR); + break; + case CONFIRM_TRANSACTION: + logger.debug("{} Transaction is confirmed.", this); + commSession.setStatus(Transaction.TransactionState.TRANSACTION_CONFIRMED); + break; + case FINISH_TRANSACTION: + logger.debug("{} transaction is completed.", this); + commSession.setStatus(Transaction.TransactionState.TRANSACTION_COMPLETED); + break; + } + } else { + switch (response) { + case CONFIRM_TRANSACTION: + logger.debug("{} Confirming transaction. checksum={}", this, explanation); + commSession.setChecksum(explanation); + commSession.setStatus(Transaction.TransactionState.DATA_EXCHANGED); + break; + case TRANSACTION_FINISHED: + case TRANSACTION_FINISHED_BUT_DESTINATION_FULL: + logger.debug("{} Transaction is completed. responseCode={}", this, response); + commSession.setStatus(Transaction.TransactionState.TRANSACTION_COMPLETED); + break; + } + } + } + + @Override + protected Response readTransactionResponse(boolean isTransfer, CommunicationsSession commsSession) throws IOException { + // Returns Response based on current status. + HttpServerCommunicationsSession commSession = (HttpServerCommunicationsSession) commsSession; + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + Transaction.TransactionState currentStatus = commSession.getStatus(); + if(isTransfer){ + switch (currentStatus){ + case DATA_EXCHANGED: + String clientChecksum = commSession.getChecksum(); + logger.debug("readTransactionResponse. clientChecksum={}", clientChecksum); + ResponseCode.CONFIRM_TRANSACTION.writeResponse(new DataOutputStream(bos), clientChecksum); + break; + case TRANSACTION_CONFIRMED: + logger.debug("readTransactionResponse. 
finishing."); + ResponseCode.TRANSACTION_FINISHED.writeResponse(new DataOutputStream(bos)); + break; + } + } else { + switch (currentStatus){ + case TRANSACTION_STARTED: + logger.debug("readTransactionResponse. returning CONTINUE_TRANSACTION."); + // We don't know if there's more data to receive, so just continue it. + ResponseCode.CONTINUE_TRANSACTION.writeResponse(new DataOutputStream(bos)); + break; + case TRANSACTION_CONFIRMED: + // Checksum was successfully validated at client side, or BAD_CHECKSUM is returned. + ResponseCode responseCode = commSession.getResponseCode(); + logger.debug("readTransactionResponse. responseCode={}", responseCode); + if(responseCode.containsMessage()){ + responseCode.writeResponse(new DataOutputStream(bos), ""); + } else { + responseCode.writeResponse(new DataOutputStream(bos)); + } + break; + } + } + + ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray()); + return Response.read(new DataInputStream(bis)); + } + + private int holdTransaction(Peer peer, FlowFileTransaction transaction) { + // We don't commit the session here yet, + // to avoid losing sent flow files in case some issue happens at client side while it is processing, + // hold the transaction until we confirm additional request from client. + HttpServerCommunicationsSession commSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession(); + String transactionId = commSession.getTransactionId(); + logger.debug("{} Holding transaction. transactionId={}", this, transactionId); + transactionManager.holdTransaction(transactionId, transaction); + + return transaction.getFlowFilesSent().size(); + } + + @Override + protected int commitTransferTransaction(Peer peer, FlowFileTransaction transaction) throws IOException { + return holdTransaction(peer, transaction); + } + + public int commitTransferTransaction(Peer peer, String clientChecksum) throws IOException, IllegalStateException { + logger.debug("{} Committing the transfer transaction. 
peer={} clientChecksum={}", this, peer, clientChecksum); + HttpServerCommunicationsSession commSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession(); + String transactionId = commSession.getTransactionId(); + FlowFileTransaction transaction = transactionManager.finalizeTransaction(transactionId); + commSession.setChecksum(clientChecksum); + commSession.setStatus(Transaction.TransactionState.DATA_EXCHANGED); + return super.commitTransferTransaction(peer, transaction); + } + + @Override + protected int commitReceiveTransaction(Peer peer, FlowFileTransaction transaction) throws IOException { + return holdTransaction(peer, transaction); + } + + public int commitReceiveTransaction(Peer peer) throws IOException, IllegalStateException { + logger.debug("{} Committing the receive transaction. peer={}", this, peer); + HttpServerCommunicationsSession commSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession(); + String transactionId = commSession.getTransactionId(); + FlowFileTransaction transaction = transactionManager.finalizeTransaction(transactionId); + commSession.setStatus(Transaction.TransactionState.TRANSACTION_CONFIRMED); + return super.commitReceiveTransaction(peer, transaction); + } + + @Override + public RequestType getRequestType(final Peer peer) throws IOException { + return null; + } + + @Override + public VersionNegotiator getVersionNegotiator() { + return versionNegotiator; + } + + @Override + public void sendPeerList(final Peer peer) throws IOException { + } + + @Override + public String getResourceName() { + return RESOURCE_NAME; + } + + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java index ef7a61c..af6860b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java @@ -34,7 +34,9 @@ import org.apache.nifi.remote.cluster.NodeInformation; import org.apache.nifi.remote.codec.FlowFileCodec; import org.apache.nifi.remote.exception.HandshakeException; import org.apache.nifi.remote.protocol.CommunicationsSession; +import org.apache.nifi.remote.protocol.HandshakeProperty; import org.apache.nifi.remote.protocol.RequestType; +import org.apache.nifi.remote.protocol.ResponseCode; import org.apache.nifi.remote.protocol.ServerProtocol; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
