Repository: incubator-nifi Updated Branches: refs/heads/site-to-site-client c174d3a60 -> 2aaed7021
NIFI-4: Refactoring protocols to be used with DataPacket instead of FlowFile as a more generic data model Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/2aaed702 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/2aaed702 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/2aaed702 Branch: refs/heads/site-to-site-client Commit: 2aaed7021d0ed0b6cc81415edbef19c95d8ea68c Parents: c174d3a Author: Mark Payne <[email protected]> Authored: Wed Jan 21 07:35:09 2015 -0500 Committer: Mark Payne <[email protected]> Committed: Wed Jan 21 07:35:09 2015 -0500 ---------------------------------------------------------------------- .../apache/nifi/remote/client/Transaction.java | 21 -- .../apache/nifi/remote/codec/FlowFileCodec.java | 32 +-- .../nifi/remote/protocol/ClientProtocol.java | 10 +- .../protocol/socket/SocketClientProtocol.java | 236 +++++++++++++++++-- .../socket/SocketClientTransaction.java | 54 +++-- 5 files changed, 272 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2aaed702/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/Transaction.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/Transaction.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/Transaction.java deleted file mode 100644 index bae6e51..0000000 --- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/Transaction.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.client; - -public interface Transaction { - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2aaed702/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java index b4206b3..1380e1b 100644 --- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java @@ -21,11 +21,10 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.List; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.remote.VersionedRemoteResource; import org.apache.nifi.remote.exception.ProtocolException; import org.apache.nifi.remote.exception.TransmissionDisabledException; +import org.apache.nifi.remote.protocol.DataPacket; /** * <p> @@ -44,36 +43,29 @@ public interface FlowFileCodec extends VersionedRemoteResource { public List<Integer> getSupportedVersions(); /** - * Encodes a FlowFile and its content as a single stream of data and writes - * that stream to the output. If checksum is not null, it will be calculated - * as the stream is read + * Encodes a DataPacket and its content as a single stream of data and writes + * that stream to the output. * - * @param flowFile the FlowFile to encode - * @param session a session that can be used to transactionally create and - * transfer flow files + * @param dataPacket the data to serialize * @param outStream the stream to write the data to * - * @return the updated FlowFile - * - * @throws IOException + * @throws IOException if there is a communications issue + * @throws TransmissionDisabledException if a user terminates the connection */ - FlowFile encode(FlowFile flowFile, ProcessSession session, OutputStream outStream) throws IOException, TransmissionDisabledException; + void encode(DataPacket dataPacket, OutputStream outStream) throws IOException, TransmissionDisabledException; /** * Decodes the contents of the InputStream, interpreting the data to - * determine the next FlowFile's attributes and content, as well as their - * destinations. If not null, checksum will be used to calculate the - * checksum as the data is read. + * determine the next DataPacket's attributes and content. * - * @param stream an InputStream containing FlowFiles' contents, attributes, - * and destinations - * @param session + * @param stream an InputStream containing DataPacket's content and attributes * - * @return the FlowFile that was created, or <code>null</code> if the stream + * @return the DataPacket that was created, or <code>null</code> if the stream * was out of data * * @throws IOException * @throws ProtocolException if the input is malformed + * @throws TransmissionDisabledException if a user terminates the connection */ - FlowFile decode(InputStream stream, ProcessSession session) throws IOException, ProtocolException, TransmissionDisabledException; + DataPacket decode(InputStream stream) throws IOException, ProtocolException, TransmissionDisabledException; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2aaed702/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java index d817425..51d3970 100644 --- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java @@ -50,15 +50,17 @@ public interface ClientProtocol extends VersionedRemoteResource { - void startTransaction(Peer peer, TransferDirection direction) throws IOException; + void startTransaction(Peer peer, TransferDirection direction) throws IOException, ProtocolException; - void completeTransaction(); + void completeTransaction(boolean applyBackPressure) throws IOException, ProtocolException; void rollbackTransaction(); - void transferData(Peer peer, DataPacket dataPacket, FlowFileCodec codec) throws IOException, ProtocolException; + // must be done within a transaction. + void transferData(DataPacket dataPacket, FlowFileCodec codec) throws IOException, ProtocolException; - DataPacket receiveData(Peer peer, FlowFileCodec codec) throws IOException, ProtocolException; + // must be done within a transaction. + DataPacket receiveData(FlowFileCodec codec) throws IOException, ProtocolException; /** http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2aaed702/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java index 6b0c94b..58d26d4 100644 --- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java @@ -36,6 +36,7 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.PeerStatus; import org.apache.nifi.remote.RemoteDestination; @@ -43,7 +44,6 @@ import org.apache.nifi.remote.RemoteResourceInitiator; import org.apache.nifi.remote.StandardVersionNegotiator; import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.VersionNegotiator; -import org.apache.nifi.remote.client.Transaction; import org.apache.nifi.remote.codec.FlowFileCodec; import org.apache.nifi.remote.codec.StandardFlowFileCodec; import org.apache.nifi.remote.exception.HandshakeException; @@ -240,7 +240,7 @@ public class SocketClientProtocol implements ClientProtocol { private SocketClientTransaction transaction; @Override - public void startTransaction(final Peer peer, final TransferDirection direction) throws IOException { + public void startTransaction(final Peer peer, final TransferDirection direction) throws IOException, ProtocolException { if ( !handshakeComplete ) { throw new IllegalStateException("Handshake has not been performed"); } @@ -255,6 +255,21 @@ public class SocketClientProtocol implements ClientProtocol { // Indicate that we would like to have some data RequestType.RECEIVE_FLOWFILES.writeRequestType(dos); dos.flush(); + + final Response dataAvailableCode = Response.read(transaction.getDataInputStream()); + switch (dataAvailableCode.getCode()) { + case MORE_DATA: + logger.debug("{} {} Indicates that data is available", this, peer); + transaction.setDataAvailable(true); + break; + case NO_MORE_DATA: + logger.debug("{} No data available from {}", peer); + transaction.setDataAvailable(false); + return; + default: + throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode); + } + } else { // Indicate that we would like to have some data RequestType.SEND_FLOWFILES.writeRequestType(dos); @@ -268,34 +283,180 @@ public class SocketClientProtocol implements ClientProtocol { throw new IllegalStateException("Cannot receive data because no transaction has been started"); } + if ( transaction.getTransferDirection() == TransferDirection.SEND ) { + throw new IllegalStateException("Attempting to receive data but started a SEND Transaction"); + } + + // if no data available, return null + if ( !transaction.isDataAvailable() ) { + return null; + } + final Peer peer = transaction.getPeer(); - logger.debug("{} Receiving FlowFiles from {}", this, peer); - final CommunicationsSession commsSession = peer.getCommunicationsSession(); - final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); - final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); - String userDn = commsSession.getUserDn(); - if ( userDn == null ) { - userDn = "none"; + logger.debug("{} Receiving data from {}", this, peer); + final DataPacket packet = codec.decode(transaction.createCheckedInputStream()); + + if ( packet != null ) { + transaction.incrementTransferCount(); + + // Determine if Peer will send us data or has no data to send us + final DataInputStream dis = transaction.getDataInputStream(); + final Response dataAvailableCode = Response.read(dis); + switch (dataAvailableCode.getCode()) { + case MORE_DATA: + logger.debug("{} {} Indicates that data is available", this, peer); + transaction.setDataAvailable(true); + break; + case NO_MORE_DATA: + logger.debug("{} No data available from {}", peer); + transaction.setDataAvailable(false); + break; + default: + throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode); + } } - // Determine if Peer will send us data or has no data to send us - final Response dataAvailableCode = Response.read(dis); - switch (dataAvailableCode.getCode()) { - case MORE_DATA: - logger.debug("{} {} Indicates that data is available", this, peer); - break; - case NO_MORE_DATA: - logger.debug("{} No data available from {}", peer); - return null; - default: - throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode); + return packet; + } + + + @Override + public void transferData(final DataPacket dataPacket, final FlowFileCodec codec) throws IOException, ProtocolException { + if ( transaction == null ) { + throw new IllegalStateException("Cannot send data because no transaction has been started"); + } + + if ( transaction.getTransferDirection() == TransferDirection.RECEIVE ) { + throw new IllegalStateException("Attempting to send data but started a RECEIVE Transaction"); + } + + final Peer peer = transaction.getPeer(); + logger.debug("{} Sending data to {}", this, peer); + + if ( transaction.getTransferCount() > 0 ) { + ResponseCode.CONTINUE_TRANSACTION.writeResponse(transaction.getDataOutputStream()); } + final CheckedOutputStream checkedOutStream = transaction.createCheckedOutputStream(); + codec.encode(dataPacket, checkedOutStream); + // need to close the CompressionOutputStream in order to force it write out any remaining bytes. + // Otherwise, do NOT close it because we don't want to close the underlying stream + // (CompressionOutputStream will not close the underlying stream when it's closed) + if ( useCompression ) { + checkedOutStream.close(); + } + + transaction.incrementTransferCount(); } @Override + public void completeTransaction(final boolean applyBackPressure) throws ProtocolException, IOException { + final SocketClientTransaction transaction = this.transaction; + this.transaction = null; + + if ( transaction == null ) { + throw new IllegalStateException("Cannot complete transaction because no transaction has been started"); + } + + final Peer peer = transaction.getPeer(); + + if ( transaction.getTransferDirection() == TransferDirection.RECEIVE ) { + final boolean moreData = transaction.isDataAvailable(); + if ( moreData ) { + throw new IllegalStateException("Cannot complete transaction because the sender has already sent more data than client has consumed."); + } + + // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message + // to peer so that we can verify that the connection is still open. This is a two-phase commit, + // which helps to prevent the chances of data duplication. Without doing this, we may commit the + // session and then when we send the response back to the peer, the peer may have timed out and may not + // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the + // Critical Section involved in this transaction so that rather than the Critical Section being the + // time window involved in the entire transaction, it is reduced to a simple round-trip conversation. + logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer); + final String calculatedCRC = transaction.calculateCRC(); + ResponseCode.CONFIRM_TRANSACTION.writeResponse(transaction.getDataOutputStream(), calculatedCRC); + + final Response confirmTransactionResponse = Response.read(transaction.getDataInputStream()); + logger.trace("{} Received {} from {}", this, confirmTransactionResponse, peer); + + switch (confirmTransactionResponse.getCode()) { + case CONFIRM_TRANSACTION: + break; + case BAD_CHECKSUM: + throw new IOException(this + " Received a BadChecksum response from peer " + peer); + default: + throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code"); + } + + if ( applyBackPressure ) { + // Confirm that we received the data and the peer can now discard it but that the peer should not + // send any more data for a bit + logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer); + ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(transaction.getDataOutputStream()); + } else { + // Confirm that we received the data and the peer can now discard it + logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer); + ResponseCode.TRANSACTION_FINISHED.writeResponse(transaction.getDataOutputStream()); + } + } else { + logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, peer); + ResponseCode.FINISH_TRANSACTION.writeResponse(transaction.getDataOutputStream()); + + final String calculatedCRC = transaction.calculateCRC(); + + // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response + final Response transactionConfirmationResponse = Response.read(transaction.getDataInputStream()); + if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) { + // Confirm checksum and echo back the confirmation. + logger.trace("{} Received {} from {}", this, transactionConfirmationResponse, peer); + final String receivedCRC = transactionConfirmationResponse.getMessage(); + + if ( versionNegotiator.getVersion() > 3 ) { + if ( !receivedCRC.equals(calculatedCRC) ) { + ResponseCode.BAD_CHECKSUM.writeResponse(transaction.getDataOutputStream()); + throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session"); + } + } + + ResponseCode.CONFIRM_TRANSACTION.writeResponse(transaction.getDataOutputStream(), ""); + } else { + throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse); + } + + final Response transactionResponse; + try { + transactionResponse = Response.read(transaction.getDataInputStream()); + } catch (final IOException e) { + throw new IOException(this + " Failed to receive a response from " + peer + " when expecting a TransactionFinished Indicator. " + + "It is unknown whether or not the peer successfully received/processed the data.", e); + } + + logger.debug("{} Received {} from {}", this, transactionResponse, peer); + if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) { + peer.penalize(destination.getYieldPeriod(TimeUnit.MILLISECONDS)); + } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) { + throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse); + } + } + } + + + @Override + public void rollbackTransaction() { + final SocketClientTransaction transaction = this.transaction; + this.transaction = null; + + if ( transaction == null ) { + throw new IllegalStateException("Cannot rollback transaction because no transaction has been started"); + } + + // TODO: IMPLEMENT + } + + @Override public void receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { if ( !handshakeComplete ) { throw new IllegalStateException("Handshake has not been performed"); @@ -344,7 +505,12 @@ public class SocketClientProtocol implements ClientProtocol { final CheckedInputStream checkedIn = new CheckedInputStream(flowFileInputStream, crc); final long startNanos = System.nanoTime(); - FlowFile flowFile = codec.decode(checkedIn, session); + + final DataPacket dataPacket = codec.decode(checkedIn); + FlowFile flowFile = session.create(); + flowFile = session.importFrom(dataPacket.getData(), flowFile); + flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes()); + final long transmissionNanos = System.nanoTime() - startNanos; final long transmissionMillis = TimeUnit.MILLISECONDS.convert(transmissionNanos, TimeUnit.NANOSECONDS); @@ -462,7 +628,33 @@ public class SocketClientProtocol implements ClientProtocol { final CheckedOutputStream checkedOutStream = new CheckedOutputStream(flowFileOutputStream, crc); final long startNanos = System.nanoTime(); - flowFile = codec.encode(flowFile, session, checkedOutStream); + + // call codec.encode within a session callback so that we have the InputStream to read the FlowFile + final FlowFile toWrap = flowFile; + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + final DataPacket dataPacket = new DataPacket() { + @Override + public Map<String, String> getAttributes() { + return toWrap.getAttributes(); + } + + @Override + public InputStream getData() { + return in; + } + + @Override + public long getSize() { + return toWrap.getSize(); + } + }; + + codec.encode(dataPacket, checkedOutStream); + } + }); + final long transferNanos = System.nanoTime() - startNanos; final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2aaed702/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java index 0c4ce05..83522a5 100644 --- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java @@ -19,48 +19,74 @@ package org.apache.nifi.remote.protocol.socket; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -import java.io.InputStream; import java.util.zip.CRC32; import java.util.zip.CheckedInputStream; +import java.util.zip.CheckedOutputStream; import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.TransferDirection; -import org.apache.nifi.remote.client.Transaction; -import org.apache.nifi.remote.io.CompressionInputStream; -public class SocketClientTransaction implements Transaction { +public class SocketClientTransaction { private final long startTime = System.nanoTime(); - private long bytesReceived = 0L; - private CRC32 crc = new CRC32(); + private final CRC32 crc = new CRC32(); private final Peer peer; - private final TransferDirection direction; private final DataInputStream dis; private final DataOutputStream dos; - private final CheckedInputStream checkedInputStream; + private final TransferDirection direction; + + private boolean dataAvailable = false; + private int transfers = 0; SocketClientTransaction(final Peer peer, final TransferDirection direction, final boolean useCompression) throws IOException { this.peer = peer; this.direction = direction; - this.dis = new DataInputStream(peer.getCommunicationsSession().getInput().getInputStream()); this.dos = new DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream()); - - final InputStream dataInputStream = useCompression ? new CompressionInputStream(dis) : dis; - checkedInputStream = new CheckedInputStream(dataInputStream, crc); } - CheckedInputStream getCheckedInputStream() { - return checkedInputStream; + int getTransferCount() { + return transfers; + } + + void incrementTransferCount() { + transfers++; + } + + void setDataAvailable(final boolean available) { + this.dataAvailable = available; + } + + boolean isDataAvailable() { + return dataAvailable; + } + + TransferDirection getTransferDirection() { + return direction; } DataOutputStream getDataOutputStream() { return dos; } + DataInputStream getDataInputStream() { + return dis; + } + + CheckedInputStream createCheckedInputStream() { + return new CheckedInputStream(dis, crc); + } + + CheckedOutputStream createCheckedOutputStream() { + return new CheckedOutputStream(dos, crc); + } + Peer getPeer() { return peer; } + String calculateCRC() { + return String.valueOf(crc.getValue()); + } }
