http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java index 22ca29f..a2a7223 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java @@ -16,106 +16,50 @@ */ package org.apache.nifi.remote.protocol.socket; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.InetAddress; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.zip.CRC32; -import java.util.zip.CheckedInputStream; -import java.util.zip.CheckedOutputStream; - -import org.apache.nifi.connectable.Connection; -import org.apache.nifi.connectable.Port; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.groups.ProcessGroup; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.remote.Peer; 
-import org.apache.nifi.remote.PortAuthorizationResult; import org.apache.nifi.remote.RemoteResourceFactory; -import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.remote.StandardVersionNegotiator; import org.apache.nifi.remote.VersionNegotiator; -import org.apache.nifi.remote.cluster.NodeInformant; import org.apache.nifi.remote.codec.FlowFileCodec; import org.apache.nifi.remote.exception.HandshakeException; import org.apache.nifi.remote.exception.ProtocolException; -import org.apache.nifi.remote.io.CompressionInputStream; -import org.apache.nifi.remote.io.CompressionOutputStream; +import org.apache.nifi.remote.protocol.AbstractFlowFileServerProtocol; import org.apache.nifi.remote.protocol.CommunicationsSession; -import org.apache.nifi.remote.protocol.DataPacket; +import org.apache.nifi.remote.protocol.HandshakenProperties; import org.apache.nifi.remote.protocol.RequestType; -import org.apache.nifi.remote.protocol.ServerProtocol; -import org.apache.nifi.remote.util.StandardDataPacket; -import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.remote.protocol.ResponseCode; import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.util.StopWatch; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SocketFlowFileServerProtocol implements ServerProtocol { - - public static final String RESOURCE_NAME = "SocketFlowFileProtocol"; - private ProcessGroup rootGroup; - private String commsIdentifier; - private boolean handshakeCompleted; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.InetAddress; +import java.util.HashMap; +import java.util.Map; - private Boolean useGzip; - private long requestExpirationMillis; - private RootGroupPort port; - private boolean shutdown = false; - private FlowFileCodec negotiatedFlowFileCodec = null; - private String transitUriPrefix = null; +public class SocketFlowFileServerProtocol extends AbstractFlowFileServerProtocol { - 
private int requestedBatchCount = 0; - private long requestedBatchBytes = 0L; - private long requestedBatchNanos = 0L; - private static final long DEFAULT_BATCH_NANOS = TimeUnit.SECONDS.toNanos(5L); + public static final String RESOURCE_NAME = "SocketFlowFileProtocol"; private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(5, 4, 3, 2, 1); - private final Logger logger = LoggerFactory.getLogger(SocketFlowFileServerProtocol.class); @Override - public void setRootProcessGroup(final ProcessGroup group) { - if (!group.isRootGroup()) { - throw new IllegalArgumentException(); - } - this.rootGroup = group; - } + protected HandshakenProperties doHandshake(Peer peer) throws IOException, HandshakeException { - @Override - public void handshake(final Peer peer) throws IOException, HandshakeException { - if (handshakeCompleted) { - throw new IllegalStateException("Handshake has already been completed"); - } - if (shutdown) { - throw new IllegalStateException("Protocol is shutdown"); - } + HandshakenProperties confirmed = new HandshakenProperties(); - logger.debug("{} Handshaking with {}", this, peer); final CommunicationsSession commsSession = peer.getCommunicationsSession(); final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); - commsIdentifier = dis.readUTF(); + confirmed.setCommsIdentifier(dis.readUTF()); if (versionNegotiator.getVersion() >= 3) { - transitUriPrefix = dis.readUTF(); + String transitUriPrefix = dis.readUTF(); if (!transitUriPrefix.endsWith("/")) { transitUriPrefix = transitUriPrefix + "/"; } + confirmed.setTransitUriPrefix(transitUriPrefix); } final Map<String, String> properties = new HashMap<>(); @@ -128,128 +72,33 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { // evaluate the properties received boolean responseWritten = false; - for (final Map.Entry<String, String> entry : 
properties.entrySet()) { - final String propertyName = entry.getKey(); - final String value = entry.getValue(); - final HandshakeProperty property; - try { - property = HandshakeProperty.valueOf(propertyName); - } catch (final Exception e) { - ResponseCode.UNKNOWN_PROPERTY_NAME.writeResponse(dos, "Unknown Property Name: " + propertyName); - throw new HandshakeException("Received unknown property: " + propertyName); + try { + validateHandshakeRequest(confirmed, peer, properties); + } catch (HandshakeException e) { + ResponseCode handshakeResult = e.getResponseCode(); + if(handshakeResult.containsMessage()){ + handshakeResult.writeResponse(dos, e.getMessage()); + } else { + handshakeResult.writeResponse(dos); } - - try { - switch (property) { - case GZIP: { - useGzip = Boolean.parseBoolean(value); - break; - } - case REQUEST_EXPIRATION_MILLIS: - requestExpirationMillis = Long.parseLong(value); - break; - case BATCH_COUNT: - requestedBatchCount = Integer.parseInt(value); - if (requestedBatchCount < 0) { - throw new HandshakeException("Cannot request Batch Count less than 1; requested value: " + value); - } - break; - case BATCH_SIZE: - requestedBatchBytes = Long.parseLong(value); - if (requestedBatchBytes < 0) { - throw new HandshakeException("Cannot request Batch Size less than 1; requested value: " + value); - } - break; - case BATCH_DURATION: - requestedBatchNanos = TimeUnit.MILLISECONDS.toNanos(Long.parseLong(value)); - if (requestedBatchNanos < 0) { - throw new HandshakeException("Cannot request Batch Duration less than 1; requested value: " + value); - } - break; - case PORT_IDENTIFIER: { - Port receivedPort = rootGroup.getInputPort(value); - if (receivedPort == null) { - receivedPort = rootGroup.getOutputPort(value); - } - if (receivedPort == null) { - logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", value); - ResponseCode.UNKNOWN_PORT.writeResponse(dos); - throw new HandshakeException("Received unknown port identifier: " + value); - 
} - if (!(receivedPort instanceof RootGroupPort)) { - logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", value); - ResponseCode.UNKNOWN_PORT.writeResponse(dos); - throw new HandshakeException("Received port identifier " + value + ", but this Port is not a RootGroupPort"); - } - - this.port = (RootGroupPort) receivedPort; - final PortAuthorizationResult portAuthResult = this.port.checkUserAuthorization(peer.getCommunicationsSession().getUserDn()); - if (!portAuthResult.isAuthorized()) { - logger.debug("Responding with ResponseCode UNAUTHORIZED: ", portAuthResult.getExplanation()); - ResponseCode.UNAUTHORIZED.writeResponse(dos, portAuthResult.getExplanation()); - responseWritten = true; - break; - } - - if (!receivedPort.isValid()) { - logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort); - ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port is not valid"); - responseWritten = true; - break; - } - - if (!receivedPort.isRunning()) { - logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort); - ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port not running"); - responseWritten = true; - break; - } - - // PORTS_DESTINATION_FULL was introduced in version 2. 
If version 1, just ignore this - // we we will simply not service the request but the sender will timeout - if (getVersionNegotiator().getVersion() > 1) { - for (final Connection connection : port.getConnections()) { - if (connection.getFlowFileQueue().isFull()) { - logger.debug("Responding with ResponseCode PORTS_DESTINATION_FULL for {}", receivedPort); - ResponseCode.PORTS_DESTINATION_FULL.writeResponse(dos); - responseWritten = true; - break; - } - } - } - - break; - } - } - } catch (final NumberFormatException nfe) { - throw new HandshakeException("Received invalid value for property '" + property + "'; invalid value: " + value); + switch (handshakeResult) { + case UNAUTHORIZED: + case PORT_NOT_IN_VALID_STATE: + case PORTS_DESTINATION_FULL: + responseWritten = true; + break; + default: + throw e; } } - if (useGzip == null) { - logger.debug("Responding with ResponseCode MISSING_PROPERTY because GZIP Property missing"); - ResponseCode.MISSING_PROPERTY.writeResponse(dos, HandshakeProperty.GZIP.name()); - throw new HandshakeException("Missing Property " + HandshakeProperty.GZIP.name()); - } - // send "OK" response if (!responseWritten) { ResponseCode.PROPERTIES_OK.writeResponse(dos); } - logger.debug("{} Finished handshake with {}", this, peer); - handshakeCompleted = true; - } - - @Override - public boolean isHandshakeSuccessful() { - return handshakeCompleted; - } - - @Override - public RootGroupPort getPort() { - return port; + return confirmed; } @Override @@ -280,290 +129,6 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { } } - @Override - public FlowFileCodec getPreNegotiatedCodec() { - return negotiatedFlowFileCodec; - } - - @Override - public int transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { - if (!handshakeCompleted) { - throw new IllegalStateException("Handshake has not been completed"); - } - if (shutdown) { - throw 
new IllegalStateException("Protocol is shutdown"); - } - - logger.debug("{} Sending FlowFiles to {}", this, peer); - final CommunicationsSession commsSession = peer.getCommunicationsSession(); - final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); - final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); - String remoteDn = commsSession.getUserDn(); - if (remoteDn == null) { - remoteDn = "none"; - } - - FlowFile flowFile = session.get(); - if (flowFile == null) { - // we have no data to send. Notify the peer. - logger.debug("{} No data to send to {}", this, peer); - ResponseCode.NO_MORE_DATA.writeResponse(dos); - return 0; - } - - // we have data to send. - logger.debug("{} Data is available to send to {}", this, peer); - ResponseCode.MORE_DATA.writeResponse(dos); - - final StopWatch stopWatch = new StopWatch(true); - long bytesSent = 0L; - final Set<FlowFile> flowFilesSent = new HashSet<>(); - final CRC32 crc = new CRC32(); - - // send data until we reach some batch size - boolean continueTransaction = true; - final long startNanos = System.nanoTime(); - String calculatedCRC = ""; - while (continueTransaction) { - final OutputStream flowFileOutputStream = useGzip ? 
new CompressionOutputStream(dos) : dos; - logger.debug("{} Sending {} to {}", new Object[]{this, flowFile, peer}); - - final CheckedOutputStream checkedOutputStream = new CheckedOutputStream(flowFileOutputStream, crc); - - final StopWatch transferWatch = new StopWatch(true); - - final FlowFile toSend = flowFile; - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - final DataPacket dataPacket = new StandardDataPacket(toSend.getAttributes(), in, toSend.getSize()); - codec.encode(dataPacket, checkedOutputStream); - } - }); - - final long transmissionMillis = transferWatch.getElapsed(TimeUnit.MILLISECONDS); - - // need to close the CompressionOutputStream in order to force it write out any remaining bytes. - // Otherwise, do NOT close it because we don't want to close the underlying stream - // (CompressionOutputStream will not close the underlying stream when it's closed) - if (useGzip) { - checkedOutputStream.close(); - } - - flowFilesSent.add(flowFile); - bytesSent += flowFile.getSize(); - - final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key()); - session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transmissionMillis, false); - session.remove(flowFile); - - // determine if we should check for more data on queue. 
- final long sendingNanos = System.nanoTime() - startNanos; - boolean poll = true; - if (sendingNanos >= requestedBatchNanos && requestedBatchNanos > 0L) { - poll = false; - } - if (bytesSent >= requestedBatchBytes && requestedBatchBytes > 0L) { - poll = false; - } - if (flowFilesSent.size() >= requestedBatchCount && requestedBatchCount > 0) { - poll = false; - } - - if (requestedBatchNanos == 0 && requestedBatchBytes == 0 && requestedBatchCount == 0) { - poll = (sendingNanos < DEFAULT_BATCH_NANOS); - } - - if (poll) { - // we've not elapsed the requested sending duration, so get more data. - flowFile = session.get(); - } else { - flowFile = null; - } - - continueTransaction = (flowFile != null); - if (continueTransaction) { - logger.debug("{} Sending ContinueTransaction indicator to {}", this, peer); - ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos); - } else { - logger.debug("{} Sending FinishTransaction indicator to {}", this, peer); - ResponseCode.FINISH_TRANSACTION.writeResponse(dos); - calculatedCRC = String.valueOf(checkedOutputStream.getChecksum().getValue()); - } - } - - // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response - final Response transactionConfirmationResponse = Response.read(dis); - if (transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION) { - // Confirm Checksum and echo back the confirmation. 
- logger.debug("{} Received {} from {}", this, transactionConfirmationResponse, peer); - final String receivedCRC = transactionConfirmationResponse.getMessage(); - - if (versionNegotiator.getVersion() > 3) { - if (!receivedCRC.equals(calculatedCRC)) { - ResponseCode.BAD_CHECKSUM.writeResponse(dos); - session.rollback(); - throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " - + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC - + "; canceling transaction and rolling back session"); - } - } - - ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, ""); - } else { - throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse); - } - - final String flowFileDescription = flowFilesSent.size() < 20 ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles"; - - final Response transactionResponse; - try { - transactionResponse = Response.read(dis); - } catch (final IOException e) { - logger.error("{} Failed to receive a response from {} when expecting a TransactionFinished Indicator." - + " It is unknown whether or not the peer successfully received/processed the data." 
- + " Therefore, {} will be rolled back, possibly resulting in data duplication of {}", - this, peer, session, flowFileDescription); - session.rollback(); - throw e; - } - - logger.debug("{} received {} from {}", new Object[]{this, transactionResponse, peer}); - if (transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL) { - peer.penalize(port.getIdentifier(), port.getYieldPeriod(TimeUnit.MILLISECONDS)); - } else if (transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED) { - throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse); - } - - session.commit(); - - stopWatch.stop(); - final String uploadDataRate = stopWatch.calculateDataRate(bytesSent); - final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); - final String dataSize = FormatUtils.formatDataSize(bytesSent); - logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[]{ - this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate}); - - return flowFilesSent.size(); - } - - @Override - public int receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { - if (!handshakeCompleted) { - throw new IllegalStateException("Handshake has not been completed"); - } - if (shutdown) { - throw new IllegalStateException("Protocol is shutdown"); - } - - logger.debug("{} receiving FlowFiles from {}", this, peer); - - final CommunicationsSession commsSession = peer.getCommunicationsSession(); - final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); - final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); - String remoteDn = commsSession.getUserDn(); - if (remoteDn == null) { - remoteDn = "none"; - } - - final StopWatch stopWatch = new StopWatch(true); - final CRC32 crc = 
new CRC32(); - - // Peer has data. Otherwise, we would not have been called, because they would not have sent - // a SEND_FLOWFILES request to use. Just decode the bytes into FlowFiles until peer says he's - // finished sending data. - final Set<FlowFile> flowFilesReceived = new HashSet<>(); - long bytesReceived = 0L; - boolean continueTransaction = true; - String calculatedCRC = ""; - while (continueTransaction) { - final long startNanos = System.nanoTime(); - final InputStream flowFileInputStream = useGzip ? new CompressionInputStream(dis) : dis; - final CheckedInputStream checkedInputStream = new CheckedInputStream(flowFileInputStream, crc); - - final DataPacket dataPacket = codec.decode(checkedInputStream); - FlowFile flowFile = session.create(); - flowFile = session.importFrom(dataPacket.getData(), flowFile); - flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes()); - - final long transferNanos = System.nanoTime() - startNanos; - final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS); - final String sourceSystemFlowFileUuid = dataPacket.getAttributes().get(CoreAttributes.UUID.key()); - flowFile = session.putAttribute(flowFile, CoreAttributes.UUID.key(), UUID.randomUUID().toString()); - - final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceSystemFlowFileUuid; - session.getProvenanceReporter().receive(flowFile, transitUri, sourceSystemFlowFileUuid == null - ? 
null : "urn:nifi:" + sourceSystemFlowFileUuid, "Remote Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transferMillis); - session.transfer(flowFile, Relationship.ANONYMOUS); - flowFilesReceived.add(flowFile); - bytesReceived += flowFile.getSize(); - - final Response transactionResponse = Response.read(dis); - switch (transactionResponse.getCode()) { - case CONTINUE_TRANSACTION: - logger.debug("{} Received ContinueTransaction indicator from {}", this, peer); - break; - case FINISH_TRANSACTION: - logger.debug("{} Received FinishTransaction indicator from {}", this, peer); - continueTransaction = false; - calculatedCRC = String.valueOf(checkedInputStream.getChecksum().getValue()); - break; - case CANCEL_TRANSACTION: - logger.info("{} Received CancelTransaction indicator from {} with explanation {}", this, peer, transactionResponse.getMessage()); - session.rollback(); - return 0; - default: - throw new ProtocolException("Received unexpected response from peer: when expecting Continue Transaction or Finish Transaction, received" + transactionResponse); - } - } - - // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message - // to peer so that we can verify that the connection is still open. This is a two-phase commit, - // which helps to prevent the chances of data duplication. Without doing this, we may commit the - // session and then when we send the response back to the peer, the peer may have timed out and may not - // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the - // Critical Section involved in this transaction so that rather than the Critical Section being the - // time window involved in the entire transaction, it is reduced to a simple round-trip conversation. 
- logger.debug("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer); - ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC); - - final Response confirmTransactionResponse = Response.read(dis); - logger.debug("{} Received {} from {}", this, confirmTransactionResponse, peer); - - switch (confirmTransactionResponse.getCode()) { - case CONFIRM_TRANSACTION: - break; - case BAD_CHECKSUM: - session.rollback(); - throw new IOException(this + " Received a BadChecksum response from peer " + peer); - default: - throw new ProtocolException(this + " Received unexpected Response Code from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code"); - } - - // Commit the session so that we have persisted the data - session.commit(); - - if (context.getAvailableRelationships().isEmpty()) { - // Confirm that we received the data and the peer can now discard it but that the peer should not - // send any more data for a bit - logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer); - ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(dos); - } else { - // Confirm that we received the data and the peer can now discard it - logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer); - ResponseCode.TRANSACTION_FINISHED.writeResponse(dos); - } - - stopWatch.stop(); - final String flowFileDescription = flowFilesReceived.size() < 20 ? 
flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles"; - final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived); - final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); - final String dataSize = FormatUtils.formatDataSize(bytesReceived); - logger.info("{} Successfully received {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{ - this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate}); - - return flowFilesReceived.size(); - } @Override public RequestType getRequestType(final Peer peer) throws IOException { @@ -582,22 +147,6 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { } @Override - public VersionNegotiator getVersionNegotiator() { - return versionNegotiator; - } - - @Override - public void shutdown(final Peer peer) { - logger.debug("{} Shutting down with {}", this, peer); - shutdown = true; - } - - @Override - public boolean isShutdown() { - return shutdown; - } - - @Override public void sendPeerList(final Peer peer) throws IOException { if (!handshakeCompleted) { throw new IllegalStateException("Handshake has not been completed"); @@ -632,17 +181,9 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { return RESOURCE_NAME; } - @Override - public void setNodeInformant(final NodeInformant nodeInformant) { - } @Override - public long getRequestExpiration() { - return requestExpirationMillis; - } - - @Override - public String toString() { - return "SocketFlowFileServerProtocol[CommsID=" + commsIdentifier + "]"; + public VersionNegotiator getVersionNegotiator() { + return versionNegotiator; } }
http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestHttpRemoteSiteListener.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestHttpRemoteSiteListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestHttpRemoteSiteListener.java
new file mode 100644
index 0000000..ded2042
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestHttpRemoteSiteListener.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.remote.protocol.FlowFileTransaction;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests the transaction bookkeeping exposed by {@link HttpRemoteSiteListener}:
+ * creating a transaction id, holding a {@link FlowFileTransaction} under it and finalizing it.
+ */
+public class TestHttpRemoteSiteListener {
+
+    /** Points NiFi at the test properties file and sets the remote package's simple-logger level to DEBUG. */
+    @BeforeClass
+    public static void setup() {
+        System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/nifi.properties");
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
+    }
+
+    /** Creates a transaction around a mocked session; the remaining constructor arguments are null or zero. */
+    private static FlowFileTransaction newTransaction() {
+        final ProcessSession session = Mockito.mock(ProcessSession.class);
+        return new FlowFileTransaction(session, null, null, 0, null, null);
+    }
+
+    /** Checks that a new id is active, and that finalizing returns a non-null transaction and deactivates the id. */
+    @Test
+    public void testNormalTransactionProgress() {
+        final HttpRemoteSiteListener listener = HttpRemoteSiteListener.getInstance();
+        final String id = listener.createTransaction();
+        assertTrue("Transaction should be active.", listener.isTransactionActive(id));
+
+        listener.holdTransaction(id, newTransaction());
+
+        final FlowFileTransaction finalized = listener.finalizeTransaction(id);
+        assertNotNull(finalized);
+        assertFalse("Transaction should not be active anymore.", listener.isTransactionActive(id));
+    }
+
+    /** Checks that holding a transaction twice under the same id fails with IllegalStateException. */
+    @Test
+    public void testDuplicatedTransactionId() {
+        final HttpRemoteSiteListener listener = HttpRemoteSiteListener.getInstance();
+        final String id = listener.createTransaction();
+        assertTrue("Transaction should be active.", listener.isTransactionActive(id));
+
+        final FlowFileTransaction held = newTransaction();
+        listener.holdTransaction(id, held);
+
+        try {
+            listener.holdTransaction(id, held);
+            fail("The same transaction id can't hold another transaction");
+        } catch (IllegalStateException expected) {
+            // the second hold on the same id was rejected, which is what this test requires
+        }
+    }
+
+    /**
+     * Checks that an id which was never created is reported inactive yet can still hold a transaction,
+     * and that finalizing another never-created id fails with IllegalStateException.
+     */
+    @Test
+    public void testNoneExistingTransaction() {
+        final HttpRemoteSiteListener listener = HttpRemoteSiteListener.getInstance();
+
+        final String heldId = "does-not-exist-1";
+        assertFalse("Transaction should not be active.", listener.isTransactionActive(heldId));
+
+        final FlowFileTransaction transaction = newTransaction();
+        try {
+            listener.holdTransaction(heldId, transaction);
+        } catch (IllegalStateException e) {
+            fail("Transaction can be held even if the transaction id is not valid anymore,"
+                    + " in order to support large file or slow network.");
+        }
+
+        final String unheldId = "does-not-exist-2";
+        try {
+            listener.finalizeTransaction(unheldId);
+            fail("But transaction should not be finalized if it isn't active.");
+        } catch (IllegalStateException expected) {
+            // finalizing an id that holds no transaction in this test was rejected, as required
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java new file mode 100644 index 0000000..a8900c9 --- /dev/null +++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java @@ -0,0 +1,589 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol.http; + +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.provenance.ProvenanceReporter; +import org.apache.nifi.remote.HttpRemoteSiteListener; +import org.apache.nifi.remote.Peer; +import org.apache.nifi.remote.PeerDescription; +import org.apache.nifi.remote.PortAuthorizationResult; +import org.apache.nifi.remote.RootGroupPort; +import org.apache.nifi.remote.StandardVersionNegotiator; +import org.apache.nifi.remote.codec.FlowFileCodec; +import org.apache.nifi.remote.codec.StandardFlowFileCodec; +import org.apache.nifi.remote.exception.HandshakeException; +import 
org.apache.nifi.remote.io.http.HttpInput; +import org.apache.nifi.remote.io.http.HttpServerCommunicationsSession; +import org.apache.nifi.remote.protocol.DataPacket; +import org.apache.nifi.remote.protocol.ResponseCode; +import org.apache.nifi.remote.protocol.HandshakeProperty; +import org.apache.nifi.remote.util.StandardDataPacket; +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.apache.nifi.stream.io.ByteArrayOutputStream; +import org.apache.nifi.util.NiFiProperties; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +/** Unit tests for HttpFlowFileServerProtocol: handshake failure codes, behaviour after shutdown, and transfer/receive transactions, all driven through Mockito mocks and in-memory streams. */ public class TestHttpFlowFileServerProtocol { + + /** Sets the NiFi properties file path to the test resource and requests DEBUG logging for org.apache.nifi.remote before any test runs. */ @BeforeClass + public static void setup() throws Exception { + System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/nifi.properties"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG"); + } + + /** Returns a default peer created with a null transaction id. */ private Peer getDefaultPeer() { + return getDefaultPeer(null); + } + + /** Builds a Peer for "peer-host":8080 whose HttpServerCommunicationsSession wraps empty in-memory streams, carries the given transaction id, and has GZIP="false" and REQUEST_EXPIRATION_MILLIS="1234" handshake params preset. */ private Peer getDefaultPeer(final String transactionId) { + final PeerDescription description = new PeerDescription("peer-host", 8080, false); + final InputStream inputStream = new ByteArrayInputStream(new byte[]{}); + final OutputStream outputStream = new ByteArrayOutputStream(); + final HttpServerCommunicationsSession commsSession = new HttpServerCommunicationsSession(inputStream, outputStream, transactionId); + commsSession.putHandshakeParam(HandshakeProperty.GZIP, "false"); +
commsSession.putHandshakeParam(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, "1234"); + final String peerUrl = "http://peer-host:8080/"; + final String clusterUrl = "cluster-url"; + return new Peer(description, commsSession, peerUrl, clusterUrl); + } + + /** Creates the protocol instance under test, an HttpFlowFileServerProtocolImpl built from a StandardVersionNegotiator(5, 4, 3, 2, 1). */ private HttpFlowFileServerProtocol getDefaultHttpFlowFileServerProtocol() { + final StandardVersionNegotiator versionNegotiator = new StandardVersionNegotiator(5, 4, 3, 2, 1); + return new HttpFlowFileServerProtocolImpl(versionNegotiator); + } + + /** With every handshake param cleared, handshake must throw a HandshakeException carrying MISSING_PROPERTY and leave the handshake unsuccessful. */ @Test + public void testIllegalHandshakeProperty() throws Exception { + final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol(); + final Peer peer = getDefaultPeer(); + ((HttpServerCommunicationsSession)peer.getCommunicationsSession()).getHandshakeParams().clear(); + try { + serverProtocol.handshake(peer); + fail(); + } catch (HandshakeException e) { + assertEquals(ResponseCode.MISSING_PROPERTY, e.getResponseCode()); + } + + assertFalse(serverProtocol.isHandshakeSuccessful()); + } + + /** A PORT_IDENTIFIER for which the mocked root group has no stubbed port must make handshake fail with UNKNOWN_PORT. */ @Test + public void testUnknownPort() throws Exception { + final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol(); + final Peer peer = getDefaultPeer(); + ((HttpServerCommunicationsSession)peer.getCommunicationsSession()) + .putHandshakeParam(HandshakeProperty.PORT_IDENTIFIER, "port-identifier"); + + final ProcessGroup processGroup = mock(ProcessGroup.class); + doReturn(true).when(processGroup).isRootGroup(); + + serverProtocol.setRootProcessGroup(processGroup); + try { + serverProtocol.handshake(peer); + fail(); + } catch (HandshakeException e) { + assertEquals(ResponseCode.UNKNOWN_PORT, e.getResponseCode()); + } + + assertFalse(serverProtocol.isHandshakeSuccessful()); + } + + /** The port is found but the mocked authorization result leaves isAuthorized() unstubbed (Mockito default false), so handshake must fail with UNAUTHORIZED. */ @Test + public void testUnauthorized() throws Exception { + final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol(); + final Peer peer = getDefaultPeer(); +
((HttpServerCommunicationsSession)peer.getCommunicationsSession()) + .putHandshakeParam(HandshakeProperty.PORT_IDENTIFIER, "port-identifier"); + + final ProcessGroup processGroup = mock(ProcessGroup.class); + final RootGroupPort port = mock(RootGroupPort.class); + final PortAuthorizationResult authResult = mock(PortAuthorizationResult.class); + doReturn(true).when(processGroup).isRootGroup(); + doReturn(port).when(processGroup).getOutputPort("port-identifier"); + doReturn(authResult).when(port).checkUserAuthorization(any(String.class)); + + serverProtocol.setRootProcessGroup(processGroup); + try { + serverProtocol.handshake(peer); + fail(); + } catch (HandshakeException e) { + assertEquals(ResponseCode.UNAUTHORIZED, e.getResponseCode()); + } + + assertFalse(serverProtocol.isHandshakeSuccessful()); + } + + /** The user is authorized but the mocked port has isValid()/isRunning() unstubbed, so handshake must fail with PORT_NOT_IN_VALID_STATE. */ @Test + public void testPortNotInValidState() throws Exception { + final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol(); + final Peer peer = getDefaultPeer(); + ((HttpServerCommunicationsSession)peer.getCommunicationsSession()) + .putHandshakeParam(HandshakeProperty.PORT_IDENTIFIER, "port-identifier"); + + final ProcessGroup processGroup = mock(ProcessGroup.class); + final RootGroupPort port = mock(RootGroupPort.class); + final PortAuthorizationResult authResult = mock(PortAuthorizationResult.class); + doReturn(true).when(processGroup).isRootGroup(); + doReturn(port).when(processGroup).getOutputPort("port-identifier"); + doReturn(authResult).when(port).checkUserAuthorization(any(String.class)); + doReturn(true).when(authResult).isAuthorized(); + + serverProtocol.setRootProcessGroup(processGroup); + try { + serverProtocol.handshake(peer); + fail(); + } catch (HandshakeException e) { + assertEquals(ResponseCode.PORT_NOT_IN_VALID_STATE, e.getResponseCode()); + } + + assertFalse(serverProtocol.isHandshakeSuccessful()); + } + + /** The port is authorized, valid and running, but its only connection reports a full FlowFileQueue, so handshake must fail with PORTS_DESTINATION_FULL. */ @Test + public void testPortDestinationFull() throws Exception { + final HttpFlowFileServerProtocol
serverProtocol = getDefaultHttpFlowFileServerProtocol(); + final Peer peer = getDefaultPeer(); + ((HttpServerCommunicationsSession)peer.getCommunicationsSession()) + .putHandshakeParam(HandshakeProperty.PORT_IDENTIFIER, "port-identifier"); + + final ProcessGroup processGroup = mock(ProcessGroup.class); + final RootGroupPort port = mock(RootGroupPort.class); + final PortAuthorizationResult authResult = mock(PortAuthorizationResult.class); + doReturn(true).when(processGroup).isRootGroup(); + doReturn(port).when(processGroup).getOutputPort("port-identifier"); + doReturn(authResult).when(port).checkUserAuthorization(any(String.class)); + doReturn(true).when(authResult).isAuthorized(); + doReturn(true).when(port).isValid(); + doReturn(true).when(port).isRunning(); + Set<Connection> connections = new HashSet<>(); + final Connection connection = mock(Connection.class); + connections.add(connection); + doReturn(connections).when(port).getConnections(); + final FlowFileQueue flowFileQueue = mock(FlowFileQueue.class); + doReturn(flowFileQueue).when(connection).getFlowFileQueue(); + doReturn(true).when(flowFileQueue).isFull(); + + serverProtocol.setRootProcessGroup(processGroup); + try { + serverProtocol.handshake(peer); + fail(); + } catch (HandshakeException e) { + assertEquals(ResponseCode.PORTS_DESTINATION_FULL, e.getResponseCode()); + } + + assertFalse(serverProtocol.isHandshakeSuccessful()); + } + + /** After a successful handshake (StandardFlowFileCodec negotiated, request expiration 1234) and a shutdown, both transferFlowFiles and receiveFlowFiles must throw IllegalStateException. */ @Test + public void testShutdown() throws Exception { + final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol(); + + final Peer peer = getDefaultPeer(); + serverProtocol.handshake(peer); + + assertTrue(serverProtocol.isHandshakeSuccessful()); + + final FlowFileCodec negotiatedCoded = serverProtocol.negotiateCodec(peer); + assertTrue(negotiatedCoded instanceof StandardFlowFileCodec); + + assertEquals(negotiatedCoded, serverProtocol.getPreNegotiatedCodec()); + assertEquals(1234, serverProtocol.getRequestExpiration()); + +
serverProtocol.shutdown(peer); + + final ProcessContext context = null; + final ProcessSession processSession = null; + try { + serverProtocol.transferFlowFiles(peer, context, processSession, negotiatedCoded); + fail("transferFlowFiles should fail since it's already shutdown."); + } catch (IllegalStateException e) { + } + + try { + serverProtocol.receiveFlowFiles(peer, context, processSession, negotiatedCoded); + fail("receiveFlowFiles should fail since it's already shutdown."); + } catch (IllegalStateException e) { + } + } + + /** With a mocked ProcessSession whose get() is unstubbed (no flow file available), transferFlowFiles must report 0 files sent. */ @Test + public void testTransferZeroFile() throws Exception { + final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol(); + + final Peer peer = getDefaultPeer(); + serverProtocol.handshake(peer); + + assertTrue(serverProtocol.isHandshakeSuccessful()); + + final FlowFileCodec negotiatedCoded = serverProtocol.negotiateCodec(peer); + final ProcessContext context = null; + final ProcessSession processSession = mock(ProcessSession.class); + + // Execute test using mock + final int flowFileSent = serverProtocol.transferFlowFiles(peer, context, processSession, negotiatedCoded); + assertEquals(0, flowFileSent); + } + + /** Transfers one file via transferOneFile and commits with checksum "2077607535"; the commit must report 1 file sent. */ @Test + public void testTransferOneFile() throws Exception { + final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol(); + + final String transactionId = "testTransferOneFile"; + final Peer peer = transferOneFile(serverProtocol, transactionId); + + // Commit transaction + final int flowFileSent = serverProtocol.commitTransferTransaction(peer, "2077607535"); + assertEquals(1, flowFileSent); + } + + /** Transfers one file and commits with a non-matching checksum; the commit must throw an IOException whose message mentions "CRC32 Checksum". */ @Test + public void testTransferOneFileBadChecksum() throws Exception { + final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol(); + + final String transactionId = "testTransferOneFileBadChecksum"; + final Peer peer = transferOneFile(serverProtocol, transactionId); + + // Commit transaction + try { + serverProtocol.commitTransferTransaction(peer, 
"client-sent-wrong-checksum"); + fail(); + } catch (IOException e) { + assertTrue(e.getMessage().contains("CRC32 Checksum")); + } + } + + /** Shared helper: handshakes with BATCH_COUNT="1" and user DN "unit-test", stubs the session to yield one flow file with content "Server content", runs transferFlowFiles, asserts one file was sent and the transaction is active, and returns the peer for the commit step. */ private Peer transferOneFile(HttpFlowFileServerProtocol serverProtocol, String transactionId) throws IOException { + final HttpRemoteSiteListener remoteSiteListener = HttpRemoteSiteListener.getInstance(); + final Peer peer = getDefaultPeer(transactionId); + final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession(); + commsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, "1"); + commsSession.setUserDn("unit-test"); + + serverProtocol.handshake(peer); + + assertTrue(serverProtocol.isHandshakeSuccessful()); + + final FlowFileCodec negotiatedCoded = serverProtocol.negotiateCodec(peer); + final ProcessContext context = mock(ProcessContext.class); + final ProcessSession processSession = mock(ProcessSession.class); + final ProvenanceReporter provenanceReporter = mock(ProvenanceReporter.class); + final FlowFile flowFile = mock(FlowFile.class); + doReturn(flowFile).when(processSession).get(); + doReturn(provenanceReporter).when(processSession).getProvenanceReporter(); + doAnswer(invocation -> { + String peerUrl = (String)invocation.getArguments()[1]; + String detail = (String)invocation.getArguments()[2]; + assertEquals("http://peer-host:8080/", peerUrl); + assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail); + return null; + }).when(provenanceReporter).send(eq(flowFile), any(String.class), any(String.class), any(Long.class), any(Boolean.class)); + + doAnswer(invocation -> { + InputStreamCallback callback = (InputStreamCallback)invocation.getArguments()[1]; + callback.process(new java.io.ByteArrayInputStream("Server content".getBytes())); + return null; + }).when(processSession).read(any(FlowFile.class), any(InputStreamCallback.class)); + + // Execute test using mock + int flowFileSent = serverProtocol.transferFlowFiles(peer, context, processSession, 
negotiatedCoded); + assertEquals(1, flowFileSent); + + assertTrue(remoteSiteListener.isTransactionActive(transactionId)); + return peer; + } + + /** Handshakes with BATCH_COUNT="2", stubs the session to yield two flow files in sequence, and expects transferFlowFiles and the commit with checksum "2747386400" to both report 2 files. */ @Test + public void testTransferTwoFiles() throws Exception { + final HttpRemoteSiteListener remoteSiteListener = HttpRemoteSiteListener.getInstance(); + final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol(); + + final String transactionId = "testTransferTwoFiles"; + final Peer peer = getDefaultPeer(transactionId); + final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession(); + commsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, "2"); + commsSession.setUserDn("unit-test"); + + serverProtocol.handshake(peer); + + assertTrue(serverProtocol.isHandshakeSuccessful()); + + final FlowFileCodec negotiatedCoded = serverProtocol.negotiateCodec(peer); + final ProcessContext context = mock(ProcessContext.class); + final ProcessSession processSession = mock(ProcessSession.class); + final ProvenanceReporter provenanceReporter = mock(ProvenanceReporter.class); + final FlowFile flowFile1 = mock(FlowFile.class); + final FlowFile flowFile2 = mock(FlowFile.class); + doReturn(flowFile1) + .doReturn(flowFile2) + .when(processSession).get(); + + doReturn(provenanceReporter).when(processSession).getProvenanceReporter(); + doAnswer(invocation -> { + String peerUrl = (String)invocation.getArguments()[1]; + String detail = (String)invocation.getArguments()[2]; + assertEquals("http://peer-host:8080/", peerUrl); + assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail); + return null; + }).when(provenanceReporter).send(eq(flowFile1), any(String.class), any(String.class), any(Long.class), any(Boolean.class)); + + doReturn(provenanceReporter).when(processSession).getProvenanceReporter(); + doAnswer(invocation -> { + String peerUrl = (String)invocation.getArguments()[1]; + String detail = (String)invocation.getArguments()[2]; + 
assertEquals("http://peer-host:8080/", peerUrl); + assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail); + return null; + }).when(provenanceReporter).send(eq(flowFile2), any(String.class), any(String.class), any(Long.class), any(Boolean.class)); + + doAnswer(invocation -> { + InputStreamCallback callback = (InputStreamCallback)invocation.getArguments()[1]; + callback.process(new java.io.ByteArrayInputStream("Server content".getBytes())); + return null; + }).when(processSession).read(any(FlowFile.class), any(InputStreamCallback.class)); + + // Execute test using mock + int flowFileSent = serverProtocol.transferFlowFiles(peer, context, processSession, negotiatedCoded); + assertEquals(2, flowFileSent); + + assertTrue(remoteSiteListener.isTransactionActive(transactionId)); + + // Commit transaction + flowFileSent = serverProtocol.commitTransferTransaction(peer, "2747386400"); + assertEquals(2, flowFileSent); + } + + /** Builds a StandardDataPacket with content "Content from client." and two attributes (client-attr-1, client-attr-2), standing in for data sent by a client. */ private DataPacket createClientDataPacket() { + final String contents = "Content from client."; + final byte[] bytes = contents.getBytes(); + final InputStream in = new ByteArrayInputStream(bytes); + Map<String, String> attributes = new HashMap<>(); + attributes.put("client-attr-1", "client-attr-1-value"); + attributes.put("client-attr-2", "client-attr-2-value"); + return new StandardDataPacket(attributes, in, bytes.length); + } + + /** With an empty HTTP input stream, receiveFlowFiles must report 0 files received. */ @Test + public void testReceiveZeroFile() throws Exception { + final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol(); + + final Peer peer = getDefaultPeer("testReceiveZeroFile"); + final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession(); + commsSession.setUserDn("unit-test"); + + serverProtocol.handshake(peer); + + assertTrue(serverProtocol.isHandshakeSuccessful()); + + final FlowFileCodec negotiatedCoded = serverProtocol.negotiateCodec(peer); + final ProcessContext context = null; + final ProcessSession processSession 
= mock(ProcessSession.class); + + + final InputStream httpInputStream = new ByteArrayInputStream(new byte[]{}); + + ((HttpInput)commsSession.getInput()).setInputStream(httpInputStream); + + // Execute test using mock + final int flowFileReceived = serverProtocol.receiveFlowFiles(peer, context, processSession, negotiatedCoded); + assertEquals(0, flowFileReceived); + } + + /** Receives one file via receiveOneFile, sets the response code to CONFIRM_TRANSACTION, and expects commitReceiveTransaction to report 1 file. */ @Test + public void testReceiveOneFile() throws Exception { + final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol(); + + final String transactionId = "testReceiveOneFile"; + final Peer peer = getDefaultPeer(transactionId); + final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession(); + receiveOneFile(serverProtocol, transactionId, peer); + + // Commit transaction + commsSession.setResponseCode(ResponseCode.CONFIRM_TRANSACTION); + final int flowFileReceived = serverProtocol.commitReceiveTransaction(peer); + assertEquals(1, flowFileReceived); + } + + /** Receives one file, sets the response code to BAD_CHECKSUM, and expects commitReceiveTransaction to throw an IOException mentioning "Received a BadChecksum response". */ @Test + public void testReceiveOneFileBadChecksum() throws Exception { + final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol(); + + final String transactionId = "testReceiveOneFileBadChecksum"; + final Peer peer = getDefaultPeer(transactionId); + final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession(); + receiveOneFile(serverProtocol, transactionId, peer); + + // Commit transaction + commsSession.setResponseCode(ResponseCode.BAD_CHECKSUM); + try { + serverProtocol.commitReceiveTransaction(peer); + fail(); + } catch (IOException e) { + assertTrue(e.getMessage().contains("Received a BadChecksum response")); + } + } + + /** Shared helper: handshakes with BATCH_COUNT="1" and user DN "unit-test", feeds one codec-encoded client data packet through the HTTP input, runs receiveFlowFiles against mocked session/context, and asserts one file was received and the transaction is active. */ private void receiveOneFile(HttpFlowFileServerProtocol serverProtocol, String transactionId, Peer peer) throws IOException { + final HttpRemoteSiteListener remoteSiteListener = HttpRemoteSiteListener.getInstance(); + final 
HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession(); + commsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, "1"); + commsSession.setUserDn("unit-test"); + + serverProtocol.handshake(peer); + + assertTrue(serverProtocol.isHandshakeSuccessful()); + + final FlowFileCodec negotiatedCoded = serverProtocol.negotiateCodec(peer); + final ProcessContext context = mock(ProcessContext.class); + final ProcessSession processSession = mock(ProcessSession.class); + final ProvenanceReporter provenanceReporter = mock(ProvenanceReporter.class); + final FlowFile flowFile = mock(FlowFile.class); + + DataPacket dataPacket = createClientDataPacket(); + + final ByteArrayOutputStream testDataOs = new ByteArrayOutputStream(); + negotiatedCoded.encode(dataPacket, testDataOs); + final InputStream httpInputStream = new ByteArrayInputStream(testDataOs.toByteArray()); + + ((HttpInput)commsSession.getInput()).setInputStream(httpInputStream); + + doAnswer(invocation -> { + InputStream is = (InputStream) invocation.getArguments()[0]; + for (int b; (b = is.read()) >= 0;) { + // consume stream. + } + return flowFile; + }).when(processSession).importFrom(any(InputStream.class), any(FlowFile.class)); + // AbstractFlowFileServerProtocol adopts builder pattern and putAttribute is the last execution + // which returns flowFile instance used later. 
+ doReturn(flowFile).when(processSession).putAttribute(any(FlowFile.class), any(String.class), any(String.class)); + doReturn(provenanceReporter).when(processSession).getProvenanceReporter(); + doAnswer(invocation -> { + String peerUrl = (String)invocation.getArguments()[1]; + String detail = (String)invocation.getArguments()[3]; + assertEquals("http://peer-host:8080/", peerUrl); + assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail); + return null; + }).when(provenanceReporter) + .receive(any(FlowFile.class), any(String.class), any(String.class), any(String.class), any(Long.class)); + + Set<Relationship> relations = new HashSet<>(); + final Relationship relationship = new Relationship.Builder().build(); + relations.add(relationship); + doReturn(relations).when(context).getAvailableRelationships(); + + // Execute test using mock + int flowFileReceived = serverProtocol.receiveFlowFiles(peer, context, processSession, negotiatedCoded); + assertEquals(1, flowFileReceived); + + assertTrue(remoteSiteListener.isTransactionActive(transactionId)); + } + + /** Handshakes with BATCH_COUNT="2", feeds two codec-encoded client data packets through the HTTP input, and expects receiveFlowFiles and the CONFIRM_TRANSACTION commit to both report 2 files. */ @Test + public void testReceiveTwoFiles() throws Exception { + final HttpRemoteSiteListener remoteSiteListener = HttpRemoteSiteListener.getInstance(); + final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol(); + + final String transactionId = "testReceiveTwoFile"; + final Peer peer = getDefaultPeer(transactionId); + final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession(); + commsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, "2"); + commsSession.setUserDn("unit-test"); + + serverProtocol.handshake(peer); + + assertTrue(serverProtocol.isHandshakeSuccessful()); + + final FlowFileCodec negotiatedCoded = serverProtocol.negotiateCodec(peer); + final ProcessContext context = mock(ProcessContext.class); + final ProcessSession processSession = mock(ProcessSession.class); + final ProvenanceReporter 
provenanceReporter = mock(ProvenanceReporter.class); + final FlowFile flowFile1 = mock(FlowFile.class); + final FlowFile flowFile2 = mock(FlowFile.class); + + final ByteArrayOutputStream testDataOs = new ByteArrayOutputStream(); + negotiatedCoded.encode(createClientDataPacket(), testDataOs); + negotiatedCoded.encode(createClientDataPacket(), testDataOs); + final InputStream httpInputStream = new ByteArrayInputStream(testDataOs.toByteArray()); + + ((HttpInput)commsSession.getInput()).setInputStream(httpInputStream); + + doAnswer(invocation -> { + InputStream is = (InputStream) invocation.getArguments()[0]; + for (int b; (b = is.read()) >= 0;) { + // consume stream. + } + return flowFile1; + }).when(processSession).importFrom(any(InputStream.class), any(FlowFile.class)); + // AbstractFlowFileServerProtocol adopts builder pattern and putAttribute is the last execution + // which returns flowFile instance used later. + doReturn(flowFile1) + .doReturn(flowFile2) + .when(processSession).putAttribute(any(FlowFile.class), any(String.class), any(String.class)); + doReturn(provenanceReporter).when(processSession).getProvenanceReporter(); + doAnswer(invocation -> { + String peerUrl = (String)invocation.getArguments()[1]; + String detail = (String)invocation.getArguments()[3]; + assertEquals("http://peer-host:8080/", peerUrl); + assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail); + return null; + }).when(provenanceReporter) + .receive(any(FlowFile.class), any(String.class), any(String.class), any(String.class), any(Long.class)); + + Set<Relationship> relations = new HashSet<>(); + doReturn(relations).when(context).getAvailableRelationships(); + + // Execute test using mock + int flowFileReceived = serverProtocol.receiveFlowFiles(peer, context, processSession, negotiatedCoded); + assertEquals(2, flowFileReceived); + + assertTrue(remoteSiteListener.isTransactionActive(transactionId)); + + // Commit transaction + 
commsSession.setResponseCode(ResponseCode.CONFIRM_TRANSACTION); + flowFileReceived = serverProtocol.commitReceiveTransaction(peer); + assertEquals(2, flowFileReceived); + } + + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 161be64..864c06e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -2338,9 +2338,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // determine the site to site configuration if (isClustered()) { controllerDTO.setRemoteSiteListeningPort(controllerFacade.getClusterManagerRemoteSiteListeningPort()); + controllerDTO.setRemoteSiteHttpListeningPort(controllerFacade.getClusterManagerRemoteSiteListeningHttpPort()); controllerDTO.setSiteToSiteSecure(controllerFacade.isClusterManagerRemoteSiteCommsSecure()); } else { controllerDTO.setRemoteSiteListeningPort(controllerFacade.getRemoteSiteListeningPort()); + controllerDTO.setRemoteSiteHttpListeningPort(controllerFacade.getRemoteSiteListeningHttpPort()); controllerDTO.setSiteToSiteSecure(controllerFacade.isRemoteSiteCommsSecure()); } 
http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java index 2138264..4dee472 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java @@ -103,7 +103,7 @@ public abstract class ApplicationResource { @Context private HttpContext httpContext; - private NiFiProperties properties; + protected NiFiProperties properties; private RequestReplicator requestReplicator; private ClusterCoordinator clusterCoordinator;
