Repository: incubator-nifi Updated Branches: refs/heads/site-to-site-client 2aaed7021 -> 77fd8e5ec
NIFI-282: Refactoring to extract client util Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/77fd8e5e Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/77fd8e5e Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/77fd8e5e Branch: refs/heads/site-to-site-client Commit: 77fd8e5ec7e86095d6235deae91939db6412eeb1 Parents: 2aaed70 Author: Mark Payne <[email protected]> Authored: Wed Jan 21 20:04:36 2015 -0500 Committer: Mark Payne <[email protected]> Committed: Wed Jan 21 20:04:36 2015 -0500 ---------------------------------------------------------------------- .../nifi/stream/io/LimitingInputStream.java | 111 ++++ .../stream/io/MinimumLengthInputStream.java | 93 +++ .../org/apache/nifi/remote/Transaction.java | 44 ++ .../nifi/remote/client/SiteToSiteClient.java | 7 +- .../nifi/remote/client/socket/SocketClient.java | 54 +- .../remote/codec/StandardFlowFileCodec.java | 78 +-- .../remote/exception/ProtocolException.java | 4 +- .../nifi/remote/protocol/ClientProtocol.java | 13 +- .../protocol/socket/SocketClientProtocol.java | 618 +++++-------------- .../socket/SocketClientTransaction.java | 260 +++++++- .../nifi/remote/util/StandardDataPacket.java | 50 ++ .../nifi/remote/StandardRemoteGroupPort.java | 2 +- .../socket/SocketFlowFileServerProtocol.java | 20 +- 13 files changed, 758 insertions(+), 596 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java ---------------------------------------------------------------------- diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java new file mode 100644 index 0000000..421d579 --- /dev/null +++ b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.stream.io; + +import java.io.IOException; +import java.io.InputStream; + +public class LimitingInputStream extends InputStream { + + private final InputStream in; + private final long limit; + private long bytesRead = 0; + + public LimitingInputStream(final InputStream in, final long limit) { + this.in = in; + this.limit = limit; + } + + @Override + public int read() throws IOException { + if (bytesRead >= limit) { + return -1; + } + + final int val = in.read(); + if (val > -1) { + bytesRead++; + } + return val; + } + + @Override + public int read(final byte[] b) throws IOException { + if (bytesRead >= limit) { + return -1; + } + + final int maxToRead = (int) Math.min(b.length, limit - bytesRead); + + final int val = in.read(b, 0, maxToRead); + if (val > 0) { + bytesRead += val; + } + return val; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (bytesRead >= limit) { + return -1; + } + + final int maxToRead = (int) Math.min(len, limit - bytesRead); + + final int val = in.read(b, off, maxToRead); + if (val > 0) { + bytesRead += val; + } + return val; + } + + @Override + public long skip(final long n) throws IOException { + final long skipped = in.skip(Math.min(n, limit - bytesRead)); + bytesRead += skipped; + return skipped; + } + + @Override + public int available() throws IOException { + return in.available(); + } + + @Override + public void close() throws IOException { + in.close(); + } + + @Override + public void mark(int readlimit) { + in.mark(readlimit); + } + + @Override + public boolean markSupported() { + return in.markSupported(); + } + + @Override + public void reset() throws IOException { + in.reset(); + } + + public long getLimit() { + return limit; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/MinimumLengthInputStream.java ---------------------------------------------------------------------- diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/MinimumLengthInputStream.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/MinimumLengthInputStream.java new file mode 100644 index 0000000..2e93599 --- /dev/null +++ b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/MinimumLengthInputStream.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.stream.io; + +import java.io.EOFException; +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; + +/** + * An InputStream that will throw EOFException if the underlying InputStream runs out of data before reaching the + * configured minimum amount of data + */ +public class MinimumLengthInputStream extends FilterInputStream { + + private final long minLength; + private long consumedCount = 0L; + + public MinimumLengthInputStream(final InputStream in, final long minLength) { + super(in); + this.minLength = minLength; + } + + + @Override + public int read() throws IOException { + final int b = super.read(); + if ( b < 0 && consumedCount < minLength ) { + throw new EOFException(); + } + + if ( b >= 0 ) { + consumedCount++; + } + + return b; + } + + @Override + public int read(byte[] b) throws IOException { + return read(b, 0, b.length); + } + + public int read(byte[] b, int off, int len) throws IOException { + final int num = super.read(b, off, len); + + if ( num < 0 && consumedCount < minLength ) { + throw new EOFException(); + } + + if ( num >= 0 ) { + consumedCount += num; + } + + return num; + } + + @Override + public long skip(final long n) throws IOException { + long skipped = super.skip(n); + if ( skipped < 1 ) { + final int b = super.read(); + if ( b >= 0 ) { + skipped = 1; + } + } + + if ( skipped < 0 && consumedCount < minLength ) { + throw new EOFException(); + } + + if ( skipped >= 0 ) { + consumedCount += skipped; + } + + return skipped; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java new file mode 100644 index 0000000..6c136fc --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote; + +import java.io.IOException; + +import org.apache.nifi.remote.protocol.DataPacket; + +public interface Transaction { + + void confirm() throws IOException; + + void complete(boolean applyBackpressure) throws IOException; + + void cancel() throws IOException; + + void send(DataPacket dataPacket) throws IOException; + + DataPacket receive() throws IOException; + + TransactionState getState() throws IOException; + + public enum TransactionState { + TRANSACTION_STARTED, + DATA_EXCHANGED, + TRANSACTION_CONFIRMED, + TRANSACTION_COMPLETED, + TRANSACTION_CANCELED; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java index 34cb56a..164a63c 100644 --- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java @@ -19,12 +19,11 @@ package org.apache.nifi.remote.client; import java.io.Closeable; import java.io.IOException; -import org.apache.nifi.remote.protocol.DataPacket; +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransferDirection; public interface SiteToSiteClient extends Closeable { - void send(DataPacket dataPacket) throws IOException; - - DataPacket receive() throws IOException; + Transaction createTransaction(TransferDirection direction) throws IOException; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java index b81b425..88eb5e8 100644 --- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java @@ -24,6 +24,7 @@ import javax.net.ssl.SSLContext; import org.apache.nifi.events.EventReporter; import org.apache.nifi.remote.RemoteDestination; +import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.client.SiteToSiteClient; import org.apache.nifi.remote.exception.HandshakeException; @@ -65,7 +66,7 @@ public class SocketClient implements SiteToSiteClient { @Override - public void send(final DataPacket dataPacket) throws IOException { + public Transaction createTransaction(final TransferDirection direction) throws IOException { final String portId = getPortIdentifier(TransferDirection.SEND); if ( portId == null ) { @@ -91,19 +92,58 @@ public class SocketClient implements SiteToSiteClient { final EndpointConnectionState connectionState; try { - connectionState = pool.getEndpointConnectionState(remoteDestination, TransferDirection.SEND); + connectionState = pool.getEndpointConnectionState(remoteDestination, direction); } catch (final ProtocolException | HandshakeException | PortNotRunningException | UnknownPortException e) { throw new IOException(e); } + final Transaction transaction = connectionState.getSocketClientProtocol().startTransaction( + connectionState.getPeer(), connectionState.getCodec(), direction); - } + // Wrap the transaction in a new one that will return the EndpointConnectionState back to the pool whenever + // the transaction is either completed or canceled. + return new Transaction() { + @Override + public void confirm() throws IOException { + transaction.confirm(); + } - @Override - public DataPacket receive() throws IOException { - // TODO Auto-generated method stub - return null; + @Override + public void complete(final boolean applyBackpressure) throws IOException { + try { + transaction.complete(applyBackpressure); + } finally { + pool.offer(connectionState); + } + } + + @Override + public void cancel() throws IOException { + try { + transaction.cancel(); + } finally { + pool.offer(connectionState); + } + } + + @Override + public void send(final DataPacket dataPacket) throws IOException { + transaction.send(dataPacket); + } + + @Override + public DataPacket receive() throws IOException { + return transaction.receive(); + } + + @Override + public TransactionState getState() throws IOException { + return transaction.getState(); + } + + }; } + @Override public void close() throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java index d18a4ee..6fd92de 100644 --- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java @@ -26,14 +26,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.stream.io.StreamUtils; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.io.InputStreamCallback; -import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.remote.StandardVersionNegotiator; import org.apache.nifi.remote.VersionNegotiator; import org.apache.nifi.remote.exception.ProtocolException; +import org.apache.nifi.remote.protocol.DataPacket; +import org.apache.nifi.remote.util.StandardDataPacket; +import org.apache.nifi.stream.io.StreamUtils; public class StandardFlowFileCodec implements FlowFileCodec { public static final int MAX_NUM_ATTRIBUTES = 25000; @@ -47,37 +45,26 @@ public class StandardFlowFileCodec implements FlowFileCodec { } @Override - public FlowFile encode(final FlowFile flowFile, final ProcessSession session, final OutputStream encodedOut) throws IOException { + public void encode(final DataPacket dataPacket, final OutputStream encodedOut) throws IOException { final DataOutputStream out = new DataOutputStream(encodedOut); - final Map<String, String> attributes = flowFile.getAttributes(); + final Map<String, String> attributes = dataPacket.getAttributes(); out.writeInt(attributes.size()); for ( final Map.Entry<String, String> entry : attributes.entrySet() ) { writeString(entry.getKey(), out); writeString(entry.getValue(), out); } - out.writeLong(flowFile.getSize()); - - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - final byte[] buffer = new byte[8192]; - int len; - while ( (len = in.read(buffer)) > 0 ) { - encodedOut.write(buffer, 0, len); - } - - encodedOut.flush(); - } - }); + out.writeLong(dataPacket.getSize()); - return flowFile; + final InputStream in = dataPacket.getData(); + StreamUtils.copy(in, encodedOut); + encodedOut.flush(); } @Override - public FlowFile decode(final InputStream stream, final ProcessSession session) throws IOException, ProtocolException { + public DataPacket decode(final InputStream stream) throws IOException, ProtocolException { final DataInputStream in = new DataInputStream(stream); final int numAttributes; @@ -94,43 +81,16 @@ public class StandardFlowFileCodec implements FlowFileCodec { throw new ProtocolException("FlowFile exceeds maximum number of attributes with a total of " + numAttributes); } - try { - final Map<String, String> attributes = new HashMap<>(numAttributes); - for (int i=0; i < numAttributes; i++) { - final String attrName = readString(in); - final String attrValue = readString(in); - attributes.put(attrName, attrValue); - } - - final long numBytes = in.readLong(); - - FlowFile flowFile = session.create(); - flowFile = session.putAllAttributes(flowFile, attributes); - flowFile = session.write(flowFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - int len; - long size = 0; - final byte[] buffer = new byte[8192]; - - while ( size < numBytes && (len = in.read(buffer, 0, (int) Math.min(buffer.length, numBytes - size))) > 0 ) { - out.write(buffer, 0, len); - size += len; - } - - if ( size != numBytes ) { - throw new EOFException("Expected " + numBytes + " bytes but received only " + size); - } - } - }); - - return flowFile; - } catch (final EOFException e) { - session.rollback(); - - // we throw the general IOException here because we did not expect to hit EOFException - throw e; + final Map<String, String> attributes = new HashMap<>(numAttributes); + for (int i=0; i < numAttributes; i++) { + final String attrName = readString(in); + final String attrValue = readString(in); + attributes.put(attrName, attrValue); } + + final long numBytes = in.readLong(); + + return new StandardDataPacket(attributes, stream, numBytes); } private void writeString(final String val, final DataOutputStream out) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java index 0f50b98..e12348a 100644 --- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java @@ -16,7 +16,9 @@ */ package org.apache.nifi.remote.exception; -public class ProtocolException extends Exception { +import java.io.IOException; + +public class ProtocolException extends IOException { private static final long serialVersionUID = 5763900324505818495L; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java index 51d3970..befbdaa 100644 --- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java @@ -23,6 +23,7 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.PeerStatus; +import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.VersionedRemoteResource; import org.apache.nifi.remote.codec.FlowFileCodec; @@ -50,17 +51,7 @@ public interface ClientProtocol extends VersionedRemoteResource { - void startTransaction(Peer peer, TransferDirection direction) throws IOException, ProtocolException; - - void completeTransaction(boolean applyBackPressure) throws IOException, ProtocolException; - - void rollbackTransaction(); - - // must be done within a transaction. - void transferData(DataPacket dataPacket, FlowFileCodec codec) throws IOException, ProtocolException; - - // must be done within a transaction. - DataPacket receiveData(FlowFileCodec codec) throws IOException, ProtocolException; + Transaction startTransaction(Peer peer, FlowFileCodec codec, TransferDirection direction) throws IOException; /** http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java index 58d26d4..b4d1e5d 100644 --- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java @@ -20,16 +20,12 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; -import java.util.zip.CRC32; -import java.util.zip.CheckedInputStream; -import java.util.zip.CheckedOutputStream; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -42,18 +38,18 @@ import org.apache.nifi.remote.PeerStatus; import org.apache.nifi.remote.RemoteDestination; import org.apache.nifi.remote.RemoteResourceInitiator; import org.apache.nifi.remote.StandardVersionNegotiator; +import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.VersionNegotiator; import org.apache.nifi.remote.codec.FlowFileCodec; import org.apache.nifi.remote.codec.StandardFlowFileCodec; import org.apache.nifi.remote.exception.HandshakeException; import org.apache.nifi.remote.exception.ProtocolException; -import org.apache.nifi.remote.io.CompressionInputStream; -import org.apache.nifi.remote.io.CompressionOutputStream; import org.apache.nifi.remote.protocol.ClientProtocol; import org.apache.nifi.remote.protocol.CommunicationsSession; import org.apache.nifi.remote.protocol.DataPacket; import org.apache.nifi.remote.protocol.RequestType; +import org.apache.nifi.remote.util.StandardDataPacket; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.StopWatch; import org.slf4j.Logger; @@ -74,6 +70,8 @@ public class SocketClientProtocol implements ClientProtocol { private boolean readyForFileTransfer = false; private String transitUriPrefix = null; private int timeoutMillis = 30000; + + private SocketClientTransaction transaction; private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds @@ -236,11 +234,8 @@ public class SocketClientProtocol implements ClientProtocol { } - // TODO: move up to top with member variables - private SocketClientTransaction transaction; - @Override - public void startTransaction(final Peer peer, final TransferDirection direction) throws IOException, ProtocolException { + public Transaction startTransaction(final Peer peer, final FlowFileCodec codec, final TransferDirection direction) throws IOException, ProtocolException { if ( !handshakeComplete ) { throw new IllegalStateException("Handshake has not been performed"); } @@ -248,204 +243,29 @@ public class SocketClientProtocol implements ClientProtocol { throw new IllegalStateException("Cannot start transaction; handshake resolution was " + handshakeResponse); } - transaction = new SocketClientTransaction(peer, direction, useCompression); - - final DataOutputStream dos = transaction.getDataOutputStream(); - if ( direction == TransferDirection.RECEIVE ) { - // Indicate that we would like to have some data - RequestType.RECEIVE_FLOWFILES.writeRequestType(dos); - dos.flush(); - - final Response dataAvailableCode = Response.read(transaction.getDataInputStream()); - switch (dataAvailableCode.getCode()) { - case MORE_DATA: - logger.debug("{} {} Indicates that data is available", this, peer); - transaction.setDataAvailable(true); - break; - case NO_MORE_DATA: - logger.debug("{} No data available from {}", peer); - transaction.setDataAvailable(false); - return; - default: - throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode); - } - - } else { - // Indicate that we would like to have some data - RequestType.SEND_FLOWFILES.writeRequestType(dos); - dos.flush(); - } + return new SocketClientTransaction(versionNegotiator.getVersion(), peer, codec, + direction, useCompression, (int) destination.getYieldPeriod(TimeUnit.MILLISECONDS)); } - - @Override - public DataPacket receiveData(final FlowFileCodec codec) throws IOException, ProtocolException { - if ( transaction == null ) { - throw new IllegalStateException("Cannot receive data because no transaction has been started"); - } - - if ( transaction.getTransferDirection() == TransferDirection.SEND ) { - throw new IllegalStateException("Attempting to receive data but started a SEND Transaction"); - } - // if no data available, return null - if ( !transaction.isDataAvailable() ) { - return null; - } - - final Peer peer = transaction.getPeer(); - logger.debug("{} Receiving data from {}", this, peer); - final DataPacket packet = codec.decode(transaction.createCheckedInputStream()); - - if ( packet != null ) { - transaction.incrementTransferCount(); - - // Determine if Peer will send us data or has no data to send us - final DataInputStream dis = transaction.getDataInputStream(); - final Response dataAvailableCode = Response.read(dis); - switch (dataAvailableCode.getCode()) { - case MORE_DATA: - logger.debug("{} {} Indicates that data is available", this, peer); - transaction.setDataAvailable(true); - break; - case NO_MORE_DATA: - logger.debug("{} No data available from {}", peer); - transaction.setDataAvailable(false); - break; - default: - throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode); - } - } - - return packet; - } - @Override - public void transferData(final DataPacket dataPacket, final FlowFileCodec codec) throws IOException, ProtocolException { - if ( transaction == null ) { - throw new IllegalStateException("Cannot send data because no transaction has been started"); - } - - if ( transaction.getTransferDirection() == TransferDirection.RECEIVE ) { - throw new IllegalStateException("Attempting to send data but started a RECEIVE Transaction"); - } - - final Peer peer = transaction.getPeer(); - logger.debug("{} Sending data to {}", this, peer); - - if ( transaction.getTransferCount() > 0 ) { - ResponseCode.CONTINUE_TRANSACTION.writeResponse(transaction.getDataOutputStream()); - } + // TODO: Transaction should be pulled out into its own class. + // Flow of execution: + // - start transaction + // - send/receive data + // - confirm contents + // - complete / rollback + // + // - this class should validate transaction state before each step. + // We need to confirm transaction to ensure that data is correct. Yes, it is sent via TCP, which should ensure that the + // data is correct, but things happen. Humans make mistakes. There could easily be a bug on our end, for example. And this + // will ensure that we guard against that. It's a good defensive programming strategy. + public void confirmTransaction() throws IOException { - final CheckedOutputStream checkedOutStream = transaction.createCheckedOutputStream(); - codec.encode(dataPacket, checkedOutStream); - - // need to close the CompressionOutputStream in order to force it write out any remaining bytes. - // Otherwise, do NOT close it because we don't want to close the underlying stream - // (CompressionOutputStream will not close the underlying stream when it's closed) - if ( useCompression ) { - checkedOutStream.close(); - } - - transaction.incrementTransferCount(); } - @Override - public void completeTransaction(final boolean applyBackPressure) throws ProtocolException, IOException { - final SocketClientTransaction transaction = this.transaction; - this.transaction = null; - - if ( transaction == null ) { - throw new IllegalStateException("Cannot complete transaction because no transaction has been started"); - } - - final Peer peer = transaction.getPeer(); - - if ( transaction.getTransferDirection() == TransferDirection.RECEIVE ) { - final boolean moreData = transaction.isDataAvailable(); - if ( moreData ) { - throw new IllegalStateException("Cannot complete transaction because the sender has already sent more data than client has consumed."); - } - - // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message - // to peer so that we can verify that the connection is still open. This is a two-phase commit, - // which helps to prevent the chances of data duplication. Without doing this, we may commit the - // session and then when we send the response back to the peer, the peer may have timed out and may not - // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the - // Critical Section involved in this transaction so that rather than the Critical Section being the - // time window involved in the entire transaction, it is reduced to a simple round-trip conversation. - logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer); - final String calculatedCRC = transaction.calculateCRC(); - ResponseCode.CONFIRM_TRANSACTION.writeResponse(transaction.getDataOutputStream(), calculatedCRC); - - final Response confirmTransactionResponse = Response.read(transaction.getDataInputStream()); - logger.trace("{} Received {} from {}", this, confirmTransactionResponse, peer); - - switch (confirmTransactionResponse.getCode()) { - case CONFIRM_TRANSACTION: - break; - case BAD_CHECKSUM: - throw new IOException(this + " Received a BadChecksum response from peer " + peer); - default: - throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code"); - } - - if ( applyBackPressure ) { - // Confirm that we received the data and the peer can now discard it but that the peer should not - // send any more data for a bit - logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer); - ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(transaction.getDataOutputStream()); - } else { - // Confirm that we received the data and the peer can now discard it - logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer); - ResponseCode.TRANSACTION_FINISHED.writeResponse(transaction.getDataOutputStream()); - } - } else { - logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, peer); - ResponseCode.FINISH_TRANSACTION.writeResponse(transaction.getDataOutputStream()); - - final String calculatedCRC = transaction.calculateCRC(); - - // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response - final Response transactionConfirmationResponse = Response.read(transaction.getDataInputStream()); - if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) { - // Confirm checksum and echo back the confirmation. - logger.trace("{} Received {} from {}", this, transactionConfirmationResponse, peer); - final String receivedCRC = transactionConfirmationResponse.getMessage(); - - if ( versionNegotiator.getVersion() > 3 ) { - if ( !receivedCRC.equals(calculatedCRC) ) { - ResponseCode.BAD_CHECKSUM.writeResponse(transaction.getDataOutputStream()); - throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session"); - } - } - - ResponseCode.CONFIRM_TRANSACTION.writeResponse(transaction.getDataOutputStream(), ""); - } else { - throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse); - } - - final Response transactionResponse; - try { - transactionResponse = Response.read(transaction.getDataInputStream()); - } catch (final IOException e) { - throw new IOException(this + " Failed to receive a response from " + peer + " when expecting a TransactionFinished Indicator. " + - "It is unknown whether or not the peer successfully received/processed the data.", e); - } - - logger.debug("{} Received {} from {}", this, transactionResponse, peer); - if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) { - peer.penalize(destination.getYieldPeriod(TimeUnit.MILLISECONDS)); - } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) { - throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse); - } - } - } - - - @Override - public void rollbackTransaction() { + public void cancelTransaction() { final SocketClientTransaction transaction = this.transaction; this.transaction = null; @@ -456,296 +276,134 @@ public class SocketClientProtocol implements ClientProtocol { // TODO: IMPLEMENT } + @Override public void receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { - if ( !handshakeComplete ) { - throw new IllegalStateException("Handshake has not been performed"); - } - if ( !readyForFileTransfer ) { - throw new IllegalStateException("Cannot receive files; handshake resolution was " + handshakeResponse); - } - - logger.debug("{} Receiving FlowFiles from {}", this, peer); - final CommunicationsSession commsSession = peer.getCommunicationsSession(); - final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); - final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); - String userDn = commsSession.getUserDn(); - if ( userDn == null ) { - userDn = "none"; - } - - // Indicate that we would like to have some data - RequestType.RECEIVE_FLOWFILES.writeRequestType(dos); - dos.flush(); - - // Determine if Peer will send us data or has no data to send us - final Response dataAvailableCode = Response.read(dis); - switch (dataAvailableCode.getCode()) { - case MORE_DATA: - logger.debug("{} {} Indicates that data is available", this, peer); - break; - case NO_MORE_DATA: - context.yield(); - logger.debug("{} No data available from {}", peer); - return; - default: - throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode); - } - - final StopWatch stopWatch = new StopWatch(true); - final Set<FlowFile> flowFilesReceived = new HashSet<>(); - long bytesReceived = 0L; - final CRC32 crc = new CRC32(); - - // Peer has data. Decode the bytes into FlowFiles until peer says he's finished sending data. - boolean continueTransaction = true; - String calculatedCRC = ""; - while (continueTransaction) { - final InputStream flowFileInputStream = useCompression ? new CompressionInputStream(dis) : dis; - final CheckedInputStream checkedIn = new CheckedInputStream(flowFileInputStream, crc); - - final long startNanos = System.nanoTime(); - - final DataPacket dataPacket = codec.decode(checkedIn); - FlowFile flowFile = session.create(); - flowFile = session.importFrom(dataPacket.getData(), flowFile); - flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes()); - - final long transmissionNanos = System.nanoTime() - startNanos; - final long transmissionMillis = TimeUnit.MILLISECONDS.convert(transmissionNanos, TimeUnit.NANOSECONDS); - - final String sourceFlowFileIdentifier = flowFile.getAttribute(CoreAttributes.UUID.key()); - flowFile = session.putAttribute(flowFile, CoreAttributes.UUID.key(), UUID.randomUUID().toString()); - - final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceFlowFileIdentifier; - session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transmissionMillis); - - session.transfer(flowFile, Relationship.ANONYMOUS); - bytesReceived += flowFile.getSize(); - flowFilesReceived.add(flowFile); - logger.debug("{} Received {} from {}", this, flowFile, peer); - - final Response transactionCode = Response.read(dis); - switch (transactionCode.getCode()) { - case CONTINUE_TRANSACTION: - logger.trace("{} Received ContinueTransaction indicator from {}", this, peer); - break; - case FINISH_TRANSACTION: - logger.trace("{} Received FinishTransaction indicator from {}", this, peer); - continueTransaction = false; - calculatedCRC = String.valueOf(checkedIn.getChecksum().getValue()); - break; - default: - throw new ProtocolException("Received unexpected response from peer: when expecting Continue Transaction or Finish Transaction, received" + transactionCode); - } - } - - // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message - // to peer so that we can verify that the connection is still open. This is a two-phase commit, - // which helps to prevent the chances of data duplication. Without doing this, we may commit the - // session and then when we send the response back to the peer, the peer may have timed out and may not - // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the - // Critical Section involved in this transaction so that rather than the Critical Section being the - // time window involved in the entire transaction, it is reduced to a simple round-trip conversation. - logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer); - ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC); - - final Response confirmTransactionResponse = Response.read(dis); - logger.trace("{} Received {} from {}", this, confirmTransactionResponse, peer); - - switch (confirmTransactionResponse.getCode()) { - case CONFIRM_TRANSACTION: - break; - case BAD_CHECKSUM: - session.rollback(); - throw new IOException(this + " Received a BadChecksum response from peer " + peer); - default: - throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code"); - } - - // Commit the session so that we have persisted the data - session.commit(); - - if ( context.getAvailableRelationships().isEmpty() ) { - // Confirm that we received the data and the peer can now discard it but that the peer should not - // send any more data for a bit - logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer); - ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(dos); - } else { - // Confirm that we received the data and the peer can now discard it - logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer); - ResponseCode.TRANSACTION_FINISHED.writeResponse(dos); - } - - stopWatch.stop(); - final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles"; - final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived); - final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); - final String dataSize = FormatUtils.formatDataSize(bytesReceived); - logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] { - this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate}); - } + final String userDn = peer.getCommunicationsSession().getUserDn(); + final Transaction transaction = startTransaction(peer, codec, TransferDirection.RECEIVE); + + final StopWatch stopWatch = new StopWatch(true); + final Set<FlowFile> flowFilesReceived = new HashSet<>(); + long bytesReceived = 0L; + + while (true) { + final long start = System.nanoTime(); + final DataPacket dataPacket = transaction.receive(); + if ( dataPacket == null ) { + break; + } + + FlowFile flowFile = session.create(); + flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes()); + flowFile = session.importFrom(dataPacket.getData(), flowFile); + final long receiveNanos = System.nanoTime() - start; + + String sourceFlowFileIdentifier = dataPacket.getAttributes().get(CoreAttributes.UUID.key()); + if ( sourceFlowFileIdentifier == null ) { + sourceFlowFileIdentifier = "<Unknown Identifier>"; + } + + final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceFlowFileIdentifier; + session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, TimeUnit.NANOSECONDS.toMillis(receiveNanos)); - @Override - public void transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { - if ( !handshakeComplete ) { - throw new IllegalStateException("Handshake has not been performed"); - } - if ( !readyForFileTransfer ) { - throw new IllegalStateException("Cannot transfer files; handshake resolution was " + handshakeResponse); - } + session.transfer(flowFile, Relationship.ANONYMOUS); + bytesReceived += dataPacket.getSize(); + } - FlowFile flowFile = session.get(); - if ( flowFile == null ) { - return; - } + // Confirm that what we received was the correct data. + transaction.confirm(); + + // Commit the session so that we have persisted the data + session.commit(); - logger.debug("{} Sending FlowFiles to {}", this, peer); - final CommunicationsSession commsSession = peer.getCommunicationsSession(); - final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); - final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); - String userDn = commsSession.getUserDn(); - if ( userDn == null ) { - userDn = "none"; - } - - // Indicate that we would like to have some data - RequestType.SEND_FLOWFILES.writeRequestType(dos); - dos.flush(); - - final StopWatch stopWatch = new StopWatch(true); - final CRC32 crc = new CRC32(); - - long bytesSent = 0L; - final Set<FlowFile> flowFilesSent = new HashSet<>(); - boolean continueTransaction = true; - String calculatedCRC = ""; - final long startSendingNanos = System.nanoTime(); - while (continueTransaction) { - final OutputStream flowFileOutputStream = useCompression ? new CompressionOutputStream(dos) : dos; - logger.debug("{} Sending {} to {}", this, flowFile, peer); - - final CheckedOutputStream checkedOutStream = new CheckedOutputStream(flowFileOutputStream, crc); - - final long startNanos = System.nanoTime(); - - // call codec.encode within a session callback so that we have the InputStream to read the FlowFile - final FlowFile toWrap = flowFile; - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - final DataPacket dataPacket = new DataPacket() { - @Override - public Map<String, String> getAttributes() { - return toWrap.getAttributes(); - } + // We want to apply backpressure if the outgoing connections are full. I.e., there are no available relationships. + final boolean applyBackpressure = context.getAvailableRelationships().isEmpty(); - @Override - public InputStream getData() { - return in; - } + transaction.complete(applyBackpressure); + logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer); - @Override - public long getSize() { - return toWrap.getSize(); - } - }; - - codec.encode(dataPacket, checkedOutStream); - } - }); - - final long transferNanos = System.nanoTime() - startNanos; - final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS); - - // need to close the CompressionOutputStream in order to force it write out any remaining bytes. - // Otherwise, do NOT close it because we don't want to close the underlying stream - // (CompressionOutputStream will not close the underlying stream when it's closed) - if ( useCompression ) { - checkedOutStream.close(); - } - - flowFilesSent.add(flowFile); - bytesSent += flowFile.getSize(); - logger.debug("{} Sent {} to {}", this, flowFile, peer); - - final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key()); - session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transferMillis, false); - session.remove(flowFile); - - final long sendingNanos = System.nanoTime() - startSendingNanos; - if ( sendingNanos < BATCH_SEND_NANOS ) { - flowFile = session.get(); - } else { - flowFile = null; - } - - continueTransaction = (flowFile != null); - if ( continueTransaction ) { - logger.debug("{} Sent CONTINUE_TRANSACTION indicator to {}", this, peer); - ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos); - } else { - logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, peer); - ResponseCode.FINISH_TRANSACTION.writeResponse(dos); - - calculatedCRC = String.valueOf( checkedOutStream.getChecksum().getValue() ); - } - } - - // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response - final Response transactionConfirmationResponse = Response.read(dis); - if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) { - // Confirm checksum and echo back the confirmation. - logger.trace("{} Received {} from {}", this, transactionConfirmationResponse, peer); - final String receivedCRC = transactionConfirmationResponse.getMessage(); - - if ( versionNegotiator.getVersion() > 3 ) { - if ( !receivedCRC.equals(calculatedCRC) ) { - ResponseCode.BAD_CHECKSUM.writeResponse(dos); - session.rollback(); - throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session"); - } - } - - ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, ""); - } else { - throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse); - } + stopWatch.stop(); + final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles"; + final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived); + final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); + final String dataSize = FormatUtils.formatDataSize(bytesReceived); + logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] { + this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate }); + } - final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles"; + + @Override + public void transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } - final Response transactionResponse; - try { - transactionResponse = Response.read(dis); - } catch (final IOException e) { - logger.error("{} Failed to receive a response from {} when expecting a TransactionFinished Indicator." + - " It is unknown whether or not the peer successfully received/processed the data." + - " Therefore, {} will be rolled back, possibly resulting in data duplication of {}", - this, peer, session, flowFileDescription); - session.rollback(); - throw e; - } - - logger.debug("{} Received {} from {}", this, transactionResponse, peer); - if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) { - peer.penalize(destination.getYieldPeriod(TimeUnit.MILLISECONDS)); - } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) { - throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse); - } - - // consume input stream entirely, ignoring its contents. If we - // don't do this, the Connection will not be returned to the pool - stopWatch.stop(); - final String uploadDataRate = stopWatch.calculateDataRate(bytesSent); - final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); - final String dataSize = FormatUtils.formatDataSize(bytesSent); - - session.commit(); - - logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] { - this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate}); + try { + final String userDn = peer.getCommunicationsSession().getUserDn(); + final long startSendingNanos = System.nanoTime(); + final StopWatch stopWatch = new StopWatch(true); + long bytesSent = 0L; + + final Transaction transaction = startTransaction(peer, codec, TransferDirection.SEND); + + final Set<FlowFile> flowFilesSent = new HashSet<>(); + boolean continueTransaction = true; + while (continueTransaction) { + final long startNanos = System.nanoTime(); + // call codec.encode within a session callback so that we have the InputStream to read the FlowFile + final FlowFile toWrap = flowFile; + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + final DataPacket dataPacket = new StandardDataPacket(toWrap.getAttributes(), in, toWrap.getSize()); + transaction.send(dataPacket); + } + }); + + final long transferNanos = System.nanoTime() - startNanos; + final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS); + + flowFilesSent.add(flowFile); + bytesSent += flowFile.getSize(); + logger.debug("{} Sent {} to {}", this, flowFile, peer); + + final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key()); + session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transferMillis, false); + session.remove(flowFile); + + final long sendingNanos = System.nanoTime() - startSendingNanos; + if ( sendingNanos < BATCH_SEND_NANOS ) { + flowFile = session.get(); + } else { + flowFile = null; + } + + continueTransaction = (flowFile != null); + } + + transaction.confirm(); + + // consume input stream entirely, ignoring its contents. If we + // don't do this, the Connection will not be returned to the pool + stopWatch.stop(); + final String uploadDataRate = stopWatch.calculateDataRate(bytesSent); + final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); + final String dataSize = FormatUtils.formatDataSize(bytesSent); + + session.commit(); + transaction.complete(false); + + final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles"; + logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] { + this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate}); + } catch (final Exception e) { + session.rollback(); + throw e; + } } - + + @Override public VersionNegotiator getVersionNegotiator() { return versionNegotiator; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java index 83522a5..129e5aa 100644 --- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java @@ -19,74 +19,272 @@ package org.apache.nifi.remote.protocol.socket; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.util.zip.CRC32; import java.util.zip.CheckedInputStream; import java.util.zip.CheckedOutputStream; import org.apache.nifi.remote.Peer; +import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.codec.FlowFileCodec; +import org.apache.nifi.remote.exception.ProtocolException; +import org.apache.nifi.remote.protocol.DataPacket; +import org.apache.nifi.remote.protocol.RequestType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class SocketClientTransaction { - private final long startTime = System.nanoTime(); - private final CRC32 crc = new CRC32(); +public class SocketClientTransaction implements Transaction { + private static final Logger logger = LoggerFactory.getLogger(SocketClientTransaction.class); - private final Peer peer; + private final CRC32 crc = new CRC32(); + private final int protocolVersion; + private final FlowFileCodec codec; private final DataInputStream dis; private final DataOutputStream dos; private final TransferDirection direction; + private final boolean compress; + private final Peer peer; + private final int penaltyMillis; private boolean dataAvailable = false; private int transfers = 0; + private TransactionState state; - SocketClientTransaction(final Peer peer, final TransferDirection direction, final boolean useCompression) throws IOException { + SocketClientTransaction(final int protocolVersion, final Peer peer, final FlowFileCodec codec, + final TransferDirection direction, final boolean useCompression, final int penaltyMillis) throws IOException { + this.protocolVersion = protocolVersion; this.peer = peer; + this.codec = codec; this.direction = direction; this.dis = new DataInputStream(peer.getCommunicationsSession().getInput().getInputStream()); this.dos = new DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream()); + this.compress = useCompression; + this.state = TransactionState.TRANSACTION_STARTED; + this.penaltyMillis = penaltyMillis; + + initialize(); } - int getTransferCount() { - return transfers; + // TODO: UPDATE STATE + private void initialize() throws IOException { + if ( direction == TransferDirection.RECEIVE ) { + // Indicate that we would like to have some data + RequestType.RECEIVE_FLOWFILES.writeRequestType(dos); + dos.flush(); + + final Response dataAvailableCode = Response.read(dis); + switch (dataAvailableCode.getCode()) { + case MORE_DATA: + logger.debug("{} {} Indicates that data is available", this, peer); + this.dataAvailable = true; + break; + case NO_MORE_DATA: + logger.debug("{} No data available from {}", peer); + this.dataAvailable = false; + return; + default: + throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode); + } + + } else { + // Indicate that we would like to have some data + RequestType.SEND_FLOWFILES.writeRequestType(dos); + dos.flush(); + } } - void incrementTransferCount() { - transfers++; - } - void setDataAvailable(final boolean available) { - this.dataAvailable = available; + // TODO: UPDATE STATE + @Override + public DataPacket receive() throws IOException { + if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) { + throw new IllegalStateException("Cannot receive data because Transaction State is " + state); + } + + if ( direction == TransferDirection.SEND ) { + throw new IllegalStateException("Attempting to receive data but started a SEND Transaction"); + } + + // if no data available, return null + if ( !dataAvailable ) { + return null; + } + + logger.debug("{} Receiving data from {}", this, peer); + final DataPacket packet = codec.decode(new CheckedInputStream(dis, crc)); + + if ( packet != null ) { + transfers++; + + // Determine if Peer will send us data or has no data to send us + final Response dataAvailableCode = Response.read(dis); + switch (dataAvailableCode.getCode()) { + case MORE_DATA: + logger.debug("{} {} Indicates that data is available", this, peer); + this.dataAvailable = true; + break; + case NO_MORE_DATA: + logger.debug("{} No data available from {}", peer); + this.dataAvailable = false; + break; + default: + throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode); + } + } + + return packet; } - boolean isDataAvailable() { - return dataAvailable; - } - TransferDirection getTransferDirection() { - return direction; + // TODO: UPDATE STATE + @Override + public void send(DataPacket dataPacket) throws IOException { + if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) { + throw new IllegalStateException("Cannot send data because Transaction State is " + state); + } + + if ( direction == TransferDirection.RECEIVE ) { + throw new IllegalStateException("Attempting to send data but started a RECEIVE Transaction"); + } + + if ( transfers > 0 ) { + ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos); + } + + logger.debug("{} Sending data to {}", this, peer); + + final OutputStream out = new CheckedOutputStream(dos, crc); + codec.encode(dataPacket, out); + + // need to close the CompressionOutputStream in order to force it write out any remaining bytes. + // Otherwise, do NOT close it because we don't want to close the underlying stream + // (CompressionOutputStream will not close the underlying stream when it's closed) + if ( compress ) { + out.close(); + } + + transfers++; } - DataOutputStream getDataOutputStream() { - return dos; - } - DataInputStream getDataInputStream() { - return dis; + // TODO: UPDATE STATE + @Override + public void cancel() { + if ( state == TransactionState.TRANSACTION_CANCELED || state == TransactionState.TRANSACTION_COMPLETED ) { + throw new IllegalStateException("Cannot cancel transaction because state is already " + state); + } + + // TODO: implement } - CheckedInputStream createCheckedInputStream() { - return new CheckedInputStream(dis, crc); - } - CheckedOutputStream createCheckedOutputStream() { - return new CheckedOutputStream(dos, crc); + // TODO: UPDATE STATE + @Override + public void complete(boolean applyBackPressure) throws IOException { + if ( state != TransactionState.TRANSACTION_CONFIRMED ) { + throw new IllegalStateException("Cannot complete transaction because state is " + state + + "; Transaction can only be completed when state is " + TransactionState.TRANSACTION_CONFIRMED); + } + + if ( direction == TransferDirection.RECEIVE ) { + if ( applyBackPressure ) { + // Confirm that we received the data and the peer can now discard it but that the peer should not + // send any more data for a bit + logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer); + ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(dos); + } else { + // Confirm that we received the data and the peer can now discard it + logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer); + ResponseCode.TRANSACTION_FINISHED.writeResponse(dos); + } + } else { + final Response transactionResponse; + try { + transactionResponse = Response.read(dis); + } catch (final IOException e) { + throw new IOException(this + " Failed to receive a response from " + peer + " when expecting a TransactionFinished Indicator. " + + "It is unknown whether or not the peer successfully received/processed the data.", e); + } + + logger.debug("{} Received {} from {}", this, transactionResponse, peer); + if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) { + peer.penalize(penaltyMillis); + } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) { + throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse); + } + } } - Peer getPeer() { - return peer; + + // TODO: UPDATE STATE + @Override + public void confirm() throws IOException { + if ( state != TransactionState.DATA_EXCHANGED ) { + throw new IllegalStateException("Cannot confirm Transaction because state is " + state + + "; Transaction can only be confirmed when state is " + TransactionState.DATA_EXCHANGED ); + } + + if ( direction == TransferDirection.RECEIVE ) { + if ( dataAvailable ) { + throw new IllegalStateException("Cannot complete transaction because the sender has already sent more data than client has consumed."); + } + + // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message + // to peer so that we can verify that the connection is still open. This is a two-phase commit, + // which helps to prevent the chances of data duplication. Without doing this, we may commit the + // session and then when we send the response back to the peer, the peer may have timed out and may not + // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the + // Critical Section involved in this transaction so that rather than the Critical Section being the + // time window involved in the entire transaction, it is reduced to a simple round-trip conversation. + logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer); + final String calculatedCRC = String.valueOf(crc.getValue()); + ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC); + + final Response confirmTransactionResponse = Response.read(dis); + logger.trace("{} Received {} from {}", this, confirmTransactionResponse, peer); + + switch (confirmTransactionResponse.getCode()) { + case CONFIRM_TRANSACTION: + break; + case BAD_CHECKSUM: + throw new IOException(this + " Received a BadChecksum response from peer " + peer); + default: + throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code"); + } + } else { + logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, peer); + ResponseCode.FINISH_TRANSACTION.writeResponse(dos); + + final String calculatedCRC = String.valueOf(crc.getValue()); + + // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response + final Response transactionConfirmationResponse = Response.read(dis); + if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) { + // Confirm checksum and echo back the confirmation. + logger.trace("{} Received {} from {}", this, transactionConfirmationResponse, peer); + final String receivedCRC = transactionConfirmationResponse.getMessage(); + + // CRC was not used before version 4 + if ( protocolVersion > 3 ) { + if ( !receivedCRC.equals(calculatedCRC) ) { + ResponseCode.BAD_CHECKSUM.writeResponse(dos); + throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session"); + } + } + + ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, ""); + } else { + throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse); + } + } } + - String calculateCRC() { - return String.valueOf(crc.getValue()); + // TODO: UPDATE STATE + @Override + public TransactionState getState() { + return state; } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java new file mode 100644 index 0000000..bd1b50c --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.util; + +import java.io.InputStream; +import java.util.Map; + +import org.apache.nifi.remote.protocol.DataPacket; +import org.apache.nifi.stream.io.LimitingInputStream; +import org.apache.nifi.stream.io.MinimumLengthInputStream; + +public class StandardDataPacket implements DataPacket { + + private final Map<String, String> attributes; + private final InputStream stream; + private final long size; + + public StandardDataPacket(final Map<String, String> attributes, final InputStream stream, final long size) { + this.attributes = attributes; + this.stream = new MinimumLengthInputStream(new LimitingInputStream(stream, size), size); + this.size = size; + } + + public Map<String, String> getAttributes() { + return attributes; + } + + public InputStream getData() { + return stream; + } + + public long getSize() { + return size; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java index 82d8206..a51cdba 100644 --- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java +++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java @@ -157,7 +157,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { logger.error(message); remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message); return; - } catch (final ProtocolException | HandshakeException | IOException e) { + } catch (final HandshakeException | IOException e) { final String message = String.format("%s failed to communicate with %s due to %s", this, url, e.toString()); logger.error(message); if ( logger.isDebugEnabled() ) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java index 887429c..d4b9c2f 100644 --- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java +++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java @@ -40,6 +40,7 @@ import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.PortAuthorizationResult; import org.apache.nifi.remote.RemoteResourceFactory; @@ -53,8 +54,10 @@ import org.apache.nifi.remote.exception.ProtocolException; import org.apache.nifi.remote.io.CompressionInputStream; import org.apache.nifi.remote.io.CompressionOutputStream; import org.apache.nifi.remote.protocol.CommunicationsSession; +import org.apache.nifi.remote.protocol.DataPacket; import org.apache.nifi.remote.protocol.RequestType; import org.apache.nifi.remote.protocol.ServerProtocol; +import org.apache.nifi.remote.util.StandardDataPacket; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.StopWatch; @@ -304,7 +307,16 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { final CheckedOutputStream checkedOutputStream = new CheckedOutputStream(flowFileOutputStream, crc); final StopWatch transferWatch = new StopWatch(true); - flowFile = codec.encode(flowFile, session, checkedOutputStream); + + final FlowFile toSend = flowFile; + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + final DataPacket dataPacket = new StandardDataPacket(toSend.getAttributes(), in, toSend.getSize()); + codec.encode(dataPacket, checkedOutputStream); + } + }); + final long transmissionMillis = transferWatch.getElapsed(TimeUnit.MILLISECONDS); // need to close the CompressionOutputStream in order to force it write out any remaining bytes. @@ -427,7 +439,11 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { final InputStream flowFileInputStream = useGzip ? new CompressionInputStream(dis) : dis; final CheckedInputStream checkedInputStream = new CheckedInputStream(flowFileInputStream, crc); - FlowFile flowFile = codec.decode(checkedInputStream, session); + final DataPacket dataPacket = codec.decode(checkedInputStream); + FlowFile flowFile = session.create(); + flowFile = session.importFrom(dataPacket.getData(), flowFile); + flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes()); + final long transferNanos = System.nanoTime() - startNanos; final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS); final String sourceSystemFlowFileUuid = flowFile.getAttribute(CoreAttributes.UUID.key());
