http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java index 3f0ec4f..e7b6d06 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java @@ -19,27 +19,30 @@ package org.apache.nifi.remote.protocol; import java.io.InputStream; import java.util.Map; - /** - * Represents a piece of data that is to be sent to or that was received from a NiFi instance. + * Represents a piece of data that is to be sent to or that was received from a + * NiFi instance. */ public interface DataPacket { /** * The key-value attributes that are to be associated with the data - * @return + * + * @return all attributes + */ + Map<String, String> getAttributes(); + + /** + * An InputStream from which the content can be read + * + * @return input stream to the data */ - Map<String, String> getAttributes(); - - /** - * An InputStream from which the content can be read - * @return - */ - InputStream getData(); + InputStream getData(); - /** - * The length of the InputStream. - * @return - */ - long getSize(); + /** + * The length of the InputStream. + * + * @return length of the inputstream. + */ + long getSize(); }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java index 41dc276..016690c 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java @@ -16,46 +16,44 @@ */ package org.apache.nifi.remote.protocol.socket; - /** - * Enumeration of Properties that can be used for the Site-to-Site Socket Protocol. + * Enumeration of Properties that can be used for the Site-to-Site Socket + * Protocol. */ public enum HandshakeProperty { + /** - * Boolean value indicating whether or not the contents of a FlowFile should be - * GZipped when transferred. + * Boolean value indicating whether or not the contents of a FlowFile should + * be GZipped when transferred. */ GZIP, - /** * The unique identifier of the port to communicate with */ PORT_IDENTIFIER, - /** - * Indicates the number of milliseconds after the request was made that the client - * will wait for a response. If no response has been received by the time this value - * expires, the server can move on without attempting to service the request because - * the client will have already disconnected. + * Indicates the number of milliseconds after the request was made that the + * client will wait for a response. If no response has been received by the + * time this value expires, the server can move on without attempting to + * service the request because the client will have already disconnected. */ REQUEST_EXPIRATION_MILLIS, - /** - * The preferred number of FlowFiles that the server should send to the client - * when pulling data. This property was introduced in version 5 of the protocol. + * The preferred number of FlowFiles that the server should send to the + * client when pulling data. This property was introduced in version 5 of + * the protocol. */ BATCH_COUNT, - /** - * The preferred number of bytes that the server should send to the client when - * pulling data. This property was introduced in version 5 of the protocol. + * The preferred number of bytes that the server should send to the client + * when pulling data. This property was introduced in version 5 of the + * protocol. */ BATCH_SIZE, - /** - * The preferred amount of time that the server should send data to the client - * when pulling data. This property was introduced in version 5 of the protocol. - * Value is in milliseconds. + * The preferred amount of time that the server should send data to the + * client when pulling data. This property was introduced in version 5 of + * the protocol. Value is in milliseconds. */ BATCH_DURATION; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java index eae1940..6ad2ba0 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java @@ -22,28 +22,29 @@ import java.io.IOException; import org.apache.nifi.remote.exception.ProtocolException; public class Response { + private final ResponseCode code; private final String message; - + private Response(final ResponseCode code, final String explanation) { this.code = code; this.message = explanation; } - + public ResponseCode getCode() { return code; } - + public String getMessage() { return message; } - + public static Response read(final DataInputStream in) throws IOException, ProtocolException { final ResponseCode code = ResponseCode.readCode(in); final String message = code.containsMessage() ? in.readUTF() : null; return new Response(code, message); } - + @Override public String toString() { return code + ": " + message; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java index 8860e73..0e1359e 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java @@ -23,131 +23,126 @@ import java.io.InputStream; import org.apache.nifi.remote.exception.ProtocolException; - public enum ResponseCode { + RESERVED(0, "Reserved for Future Use", false), // This will likely be used if we ever need to expand the length of - // ResponseCode, so that we can indicate a 0 followed by some other bytes - + // ResponseCode, so that we can indicate a 0 followed by some other bytes + // handshaking properties PROPERTIES_OK(1, "Properties OK", false), UNKNOWN_PROPERTY_NAME(230, "Unknown Property Name", true), ILLEGAL_PROPERTY_VALUE(231, "Illegal Property Value", true), MISSING_PROPERTY(232, "Missing Property", true), - // transaction indicators CONTINUE_TRANSACTION(10, "Continue Transaction", false), FINISH_TRANSACTION(11, "Finish Transaction", false), - CONFIRM_TRANSACTION(12, "Confirm Transaction", true), // "Explanation" of this code is the checksum + CONFIRM_TRANSACTION(12, "Confirm Transaction", true), // "Explanation" of this code is the checksum TRANSACTION_FINISHED(13, "Transaction Finished", false), TRANSACTION_FINISHED_BUT_DESTINATION_FULL(14, "Transaction Finished But Destination is Full", false), CANCEL_TRANSACTION(15, "Cancel Transaction", true), BAD_CHECKSUM(19, "Bad Checksum", false), - // data availability indicators MORE_DATA(20, "More Data Exists", false), NO_MORE_DATA(21, "No More Data Exists", false), - // port state indicators UNKNOWN_PORT(200, "Unknown Port", false), PORT_NOT_IN_VALID_STATE(201, "Port Not in a Valid State", true), PORTS_DESTINATION_FULL(202, "Port's Destination is Full", false), - // authorization UNAUTHORIZED(240, "User Not Authorized", true), - // error indicators ABORT(250, "Abort", true), UNRECOGNIZED_RESPONSE_CODE(254, "Unrecognized Response Code", false), END_OF_STREAM(255, "End of Stream", false); - + private static final ResponseCode[] codeArray = new ResponseCode[256]; - + static { - for ( final ResponseCode responseCode : ResponseCode.values() ) { + for (final ResponseCode responseCode : ResponseCode.values()) { codeArray[responseCode.getCode()] = responseCode; } } - + private static final byte CODE_SEQUENCE_VALUE_1 = (byte) 'R'; private static final byte CODE_SEQUENCE_VALUE_2 = (byte) 'C'; private final int code; private final byte[] codeSequence; private final String description; private final boolean containsMessage; - + private ResponseCode(final int code, final String description, final boolean containsMessage) { - this.codeSequence = new byte[] {CODE_SEQUENCE_VALUE_1, CODE_SEQUENCE_VALUE_2, (byte) code}; + this.codeSequence = new byte[]{CODE_SEQUENCE_VALUE_1, CODE_SEQUENCE_VALUE_2, (byte) code}; this.code = code; this.description = description; this.containsMessage = containsMessage; } - + public int getCode() { return code; } - + public byte[] getCodeSequence() { return codeSequence; } - + @Override public String toString() { return description; } - + public boolean containsMessage() { return containsMessage; } - + public void writeResponse(final DataOutputStream out) throws IOException { - if ( containsMessage() ) { + if (containsMessage()) { throw new IllegalArgumentException("ResponseCode " + code + " expects an explanation"); } - + out.write(getCodeSequence()); out.flush(); } - + public void writeResponse(final DataOutputStream out, final String explanation) throws IOException { - if ( !containsMessage() ) { + if (!containsMessage()) { throw new IllegalArgumentException("ResponseCode " + code + " does not expect an explanation"); } - + out.write(getCodeSequence()); out.writeUTF(explanation); out.flush(); } - + static ResponseCode readCode(final InputStream in) throws IOException, ProtocolException { final int byte1 = in.read(); - if ( byte1 < 0 ) { + if (byte1 < 0) { throw new EOFException(); - } else if ( byte1 != CODE_SEQUENCE_VALUE_1 ) { + } else if (byte1 != CODE_SEQUENCE_VALUE_1) { throw new ProtocolException("Expected to receive ResponseCode, but the stream did not have a ResponseCode"); } - + final int byte2 = in.read(); - if ( byte2 < 0 ) { + if (byte2 < 0) { throw new EOFException(); - } else if ( byte2 != CODE_SEQUENCE_VALUE_2 ) { + } else if (byte2 != CODE_SEQUENCE_VALUE_2) { throw new ProtocolException("Expected to receive ResponseCode, but the stream did not have a ResponseCode"); } final int byte3 = in.read(); - if ( byte3 < 0 ) { + if (byte3 < 0) { throw new EOFException(); } - + final ResponseCode responseCode = codeArray[byte3]; if (responseCode == null) { throw new ProtocolException("Received Response Code of " + byte3 + " but do not recognize this code"); } return responseCode; } - + public static ResponseCode fromSequence(final byte[] value) { final int code = value[3] & 0xFF; final ResponseCode responseCode = codeArray[code]; return (responseCode == null) ? UNRECOGNIZED_RESPONSE_CODE : responseCode; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java index 83c5305..de845ee 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java @@ -58,120 +58,121 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SocketClientProtocol implements ClientProtocol { + private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(5, 4, 3, 2, 1); private RemoteDestination destination; private boolean useCompression = false; - + private String commsIdentifier; private boolean handshakeComplete = false; - + private final Logger logger = LoggerFactory.getLogger(SocketClientProtocol.class); - + private Response handshakeResponse = null; private boolean readyForFileTransfer = false; private String transitUriPrefix = null; private int timeoutMillis = 30000; - + private int batchCount; private long batchSize; private long batchMillis; private EventReporter eventReporter; private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds - + public SocketClientProtocol() { } public void setPreferredBatchCount(final int count) { this.batchCount = count; } - + public void setPreferredBatchSize(final long bytes) { this.batchSize = bytes; } - + public void setPreferredBatchDuration(final long millis) { this.batchMillis = millis; } - + public void setEventReporter(final EventReporter eventReporter) { - this.eventReporter = eventReporter; + this.eventReporter = eventReporter; } - + public void setDestination(final RemoteDestination destination) { this.destination = destination; this.useCompression = destination.isUseCompression(); } - + public void setTimeout(final int timeoutMillis) { - this.timeoutMillis = timeoutMillis; + this.timeoutMillis = timeoutMillis; } - + @Override public void handshake(final Peer peer) throws IOException, HandshakeException { - handshake(peer, destination.getIdentifier()); + handshake(peer, destination.getIdentifier()); } - + public void handshake(final Peer peer, final String destinationId) throws IOException, HandshakeException { - if ( handshakeComplete ) { + if (handshakeComplete) { throw new IllegalStateException("Handshake has already been completed"); } commsIdentifier = UUID.randomUUID().toString(); logger.debug("{} handshaking with {}", this, peer); - + final Map<HandshakeProperty, String> properties = new HashMap<>(); properties.put(HandshakeProperty.GZIP, String.valueOf(useCompression)); - - if ( destinationId != null ) { - properties.put(HandshakeProperty.PORT_IDENTIFIER, destinationId); + + if (destinationId != null) { + properties.put(HandshakeProperty.PORT_IDENTIFIER, destinationId); } - - properties.put(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, String.valueOf(timeoutMillis) ); - - if ( versionNegotiator.getVersion() >= 5 ) { - if ( batchCount > 0 ) { + + properties.put(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, String.valueOf(timeoutMillis)); + + if (versionNegotiator.getVersion() >= 5) { + if (batchCount > 0) { properties.put(HandshakeProperty.BATCH_COUNT, String.valueOf(batchCount)); } - if ( batchSize > 0L ) { + if (batchSize > 0L) { properties.put(HandshakeProperty.BATCH_SIZE, String.valueOf(batchSize)); } - if ( batchMillis > 0L ) { + if (batchMillis > 0L) { properties.put(HandshakeProperty.BATCH_DURATION, String.valueOf(batchMillis)); } } - + final CommunicationsSession commsSession = peer.getCommunicationsSession(); commsSession.setTimeout(timeoutMillis); final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); - + dos.writeUTF(commsIdentifier); - - if ( versionNegotiator.getVersion() >= 3 ) { + + if (versionNegotiator.getVersion() >= 3) { dos.writeUTF(peer.getUrl()); transitUriPrefix = peer.getUrl(); - - if ( !transitUriPrefix.endsWith("/") ) { + + if (!transitUriPrefix.endsWith("/")) { transitUriPrefix = transitUriPrefix + "/"; } } - + logger.debug("Handshaking with properties {}", properties); dos.writeInt(properties.size()); - for ( final Map.Entry<HandshakeProperty, String> entry : properties.entrySet() ) { + for (final Map.Entry<HandshakeProperty, String> entry : properties.entrySet()) { dos.writeUTF(entry.getKey().name()); dos.writeUTF(entry.getValue()); } - + dos.flush(); - + try { handshakeResponse = Response.read(dis); } catch (final ProtocolException e) { throw new HandshakeException(e); } - + switch (handshakeResponse.getCode()) { case PORT_NOT_IN_VALID_STATE: case UNKNOWN_PORT: @@ -181,71 +182,75 @@ public class SocketClientProtocol implements ClientProtocol { readyForFileTransfer = true; break; default: - logger.error("{} received unexpected response {} from {} when negotiating Codec", new Object[] { + logger.error("{} received unexpected response {} from {} when negotiating Codec", new Object[]{ this, handshakeResponse, peer}); peer.close(); throw new HandshakeException("Received unexpected response " + handshakeResponse); } - + logger.debug("{} Finished handshake with {}", this, peer); handshakeComplete = true; } - + + @Override public boolean isReadyForFileTransfer() { return readyForFileTransfer; } - + + @Override public boolean isPortInvalid() { - if ( !handshakeComplete ) { + if (!handshakeComplete) { throw new IllegalStateException("Handshake has not completed successfully"); } return handshakeResponse.getCode() == ResponseCode.PORT_NOT_IN_VALID_STATE; } - + + @Override public boolean isPortUnknown() { - if ( !handshakeComplete ) { + if (!handshakeComplete) { throw new IllegalStateException("Handshake has not completed successfully"); } return handshakeResponse.getCode() == ResponseCode.UNKNOWN_PORT; } - + + @Override public boolean isDestinationFull() { - if ( !handshakeComplete ) { + if (!handshakeComplete) { throw new IllegalStateException("Handshake has not completed successfully"); } return handshakeResponse.getCode() == ResponseCode.PORTS_DESTINATION_FULL; } - + @Override public Set<PeerStatus> getPeerStatuses(final Peer peer) throws IOException { - if ( !handshakeComplete ) { + if (!handshakeComplete) { throw new IllegalStateException("Handshake has not been performed"); } - + logger.debug("{} Get Peer Statuses from {}", this, peer); final CommunicationsSession commsSession = peer.getCommunicationsSession(); final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); - + RequestType.REQUEST_PEER_LIST.writeRequestType(dos); dos.flush(); final int numPeers = dis.readInt(); final Set<PeerStatus> peers = new HashSet<>(numPeers); - for (int i=0; i < numPeers; i++) { + for (int i = 0; i < numPeers; i++) { final String hostname = dis.readUTF(); final int port = dis.readInt(); final boolean secure = dis.readBoolean(); final int flowFileCount = dis.readInt(); peers.add(new PeerStatus(new PeerDescription(hostname, port, secure), flowFileCount)); } - + logger.debug("{} Received {} Peer Statuses from {}", this, peers.size(), peer); return peers; } - + @Override public FlowFileCodec negotiateCodec(final Peer peer) throws IOException, ProtocolException { - if ( !handshakeComplete ) { + if (!handshakeComplete) { throw new IllegalStateException("Handshake has not been performed"); } @@ -255,177 +260,174 @@ public class SocketClientProtocol implements ClientProtocol { final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); RequestType.NEGOTIATE_FLOWFILE_CODEC.writeRequestType(dos); - + FlowFileCodec codec = new StandardFlowFileCodec(); try { codec = (FlowFileCodec) RemoteResourceInitiator.initiateResourceNegotiation(codec, dis, dos); } catch (HandshakeException e) { throw new ProtocolException(e.toString()); } - logger.debug("{} negotiated FlowFileCodec {} with {}", new Object[] {this, codec, commsSession}); + logger.debug("{} negotiated FlowFileCodec {} with {}", new Object[]{this, codec, commsSession}); return codec; } - @Override public Transaction startTransaction(final Peer peer, final FlowFileCodec codec, final TransferDirection direction) throws IOException, ProtocolException { - if ( !handshakeComplete ) { + if (!handshakeComplete) { throw new IllegalStateException("Handshake has not been performed"); } - if ( !readyForFileTransfer ) { + if (!readyForFileTransfer) { throw new IllegalStateException("Cannot start transaction; handshake resolution was " + handshakeResponse); } - - return new SocketClientTransaction(versionNegotiator.getVersion(), destination.getIdentifier(), peer, codec, - direction, useCompression, (int) destination.getYieldPeriod(TimeUnit.MILLISECONDS), eventReporter); - } + return new SocketClientTransaction(versionNegotiator.getVersion(), destination.getIdentifier(), peer, codec, + direction, useCompression, (int) destination.getYieldPeriod(TimeUnit.MILLISECONDS), eventReporter); + } @Override public int receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { - final String userDn = peer.getCommunicationsSession().getUserDn(); - final Transaction transaction = startTransaction(peer, codec, TransferDirection.RECEIVE); - - final StopWatch stopWatch = new StopWatch(true); - final Set<FlowFile> flowFilesReceived = new HashSet<>(); - long bytesReceived = 0L; - - while (true) { - final long start = System.nanoTime(); - final DataPacket dataPacket = transaction.receive(); - if ( dataPacket == null ) { - if ( flowFilesReceived.isEmpty() ) { - peer.penalize(destination.getIdentifier(), destination.getYieldPeriod(TimeUnit.MILLISECONDS)); - } - break; - } - - FlowFile flowFile = session.create(); - flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes()); - flowFile = session.importFrom(dataPacket.getData(), flowFile); - final long receiveNanos = System.nanoTime() - start; - - String sourceFlowFileIdentifier = dataPacket.getAttributes().get(CoreAttributes.UUID.key()); - if ( sourceFlowFileIdentifier == null ) { - sourceFlowFileIdentifier = "<Unknown Identifier>"; - } - - final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceFlowFileIdentifier; - session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, TimeUnit.NANOSECONDS.toMillis(receiveNanos)); - - session.transfer(flowFile, Relationship.ANONYMOUS); - bytesReceived += dataPacket.getSize(); - } - - // Confirm that what we received was the correct data. - transaction.confirm(); - - // Commit the session so that we have persisted the data - session.commit(); - - transaction.complete(); - logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer); - - if ( !flowFilesReceived.isEmpty() ) { - stopWatch.stop(); - final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles"; - final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived); - final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); - final String dataSize = FormatUtils.formatDataSize(bytesReceived); - logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] { - this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate }); - } - - return flowFilesReceived.size(); + final String userDn = peer.getCommunicationsSession().getUserDn(); + final Transaction transaction = startTransaction(peer, codec, TransferDirection.RECEIVE); + + final StopWatch stopWatch = new StopWatch(true); + final Set<FlowFile> flowFilesReceived = new HashSet<>(); + long bytesReceived = 0L; + + while (true) { + final long start = System.nanoTime(); + final DataPacket dataPacket = transaction.receive(); + if (dataPacket == null) { + if (flowFilesReceived.isEmpty()) { + peer.penalize(destination.getIdentifier(), destination.getYieldPeriod(TimeUnit.MILLISECONDS)); + } + break; + } + + FlowFile flowFile = session.create(); + flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes()); + flowFile = session.importFrom(dataPacket.getData(), flowFile); + final long receiveNanos = System.nanoTime() - start; + + String sourceFlowFileIdentifier = dataPacket.getAttributes().get(CoreAttributes.UUID.key()); + if (sourceFlowFileIdentifier == null) { + sourceFlowFileIdentifier = "<Unknown Identifier>"; + } + + final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceFlowFileIdentifier; + session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier, "Remote Host=" + + peer.getHost() + ", Remote DN=" + userDn, TimeUnit.NANOSECONDS.toMillis(receiveNanos)); + + session.transfer(flowFile, Relationship.ANONYMOUS); + bytesReceived += dataPacket.getSize(); + } + + // Confirm that what we received was the correct data. + transaction.confirm(); + + // Commit the session so that we have persisted the data + session.commit(); + + transaction.complete(); + logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer); + + if (!flowFilesReceived.isEmpty()) { + stopWatch.stop(); + final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles"; + final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived); + final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); + final String dataSize = FormatUtils.formatDataSize(bytesReceived); + logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{ + this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate}); + } + + return flowFilesReceived.size(); } - @Override public int transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { - FlowFile flowFile = session.get(); - if (flowFile == null) { - return 0; - } - - try { - final String userDn = peer.getCommunicationsSession().getUserDn(); - final long startSendingNanos = System.nanoTime(); - final StopWatch stopWatch = new StopWatch(true); - long bytesSent = 0L; - - final Transaction transaction = startTransaction(peer, codec, TransferDirection.SEND); - - final Set<FlowFile> flowFilesSent = new HashSet<>(); - boolean continueTransaction = true; - while (continueTransaction) { - final long startNanos = System.nanoTime(); - // call codec.encode within a session callback so that we have the InputStream to read the FlowFile - final FlowFile toWrap = flowFile; - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - final DataPacket dataPacket = new StandardDataPacket(toWrap.getAttributes(), in, toWrap.getSize()); - transaction.send(dataPacket); - } - }); - - final long transferNanos = System.nanoTime() - startNanos; - final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS); - - flowFilesSent.add(flowFile); - bytesSent += flowFile.getSize(); - logger.debug("{} Sent {} to {}", this, flowFile, peer); - - final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key()); - session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transferMillis, false); - session.remove(flowFile); - - final long sendingNanos = System.nanoTime() - startSendingNanos; - if ( sendingNanos < BATCH_SEND_NANOS ) { - flowFile = session.get(); - } else { - flowFile = null; - } - - continueTransaction = (flowFile != null); - } - - transaction.confirm(); - - // consume input stream entirely, ignoring its contents. If we - // don't do this, the Connection will not be returned to the pool - stopWatch.stop(); - final String uploadDataRate = stopWatch.calculateDataRate(bytesSent); - final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); - final String dataSize = FormatUtils.formatDataSize(bytesSent); - - session.commit(); - transaction.complete(); - - final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles"; - logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] { - this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate}); - - return flowFilesSent.size(); - } catch (final Exception e) { - session.rollback(); - throw e; - } + FlowFile flowFile = session.get(); + if (flowFile == null) { + return 0; + } + + try { + final String userDn = peer.getCommunicationsSession().getUserDn(); + final long startSendingNanos = System.nanoTime(); + final StopWatch stopWatch = new StopWatch(true); + long bytesSent = 0L; + + final Transaction transaction = startTransaction(peer, codec, TransferDirection.SEND); + + final Set<FlowFile> flowFilesSent = new HashSet<>(); + boolean continueTransaction = true; + while (continueTransaction) { + final long startNanos = System.nanoTime(); + // call codec.encode within a session callback so that we have the InputStream to read the FlowFile + final FlowFile toWrap = flowFile; + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + final DataPacket dataPacket = new StandardDataPacket(toWrap.getAttributes(), in, toWrap.getSize()); + transaction.send(dataPacket); + } + }); + + final long transferNanos = System.nanoTime() - startNanos; + final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS); + + flowFilesSent.add(flowFile); + bytesSent += flowFile.getSize(); + logger.debug("{} Sent {} to {}", this, flowFile, peer); + + final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key()); + session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transferMillis, false); + session.remove(flowFile); + + final long sendingNanos = System.nanoTime() - startSendingNanos; + if (sendingNanos < BATCH_SEND_NANOS) { + flowFile = session.get(); + } else { + flowFile = null; + } + + continueTransaction = (flowFile != null); + } + + transaction.confirm(); + + // consume input stream entirely, ignoring its contents. If we + // don't do this, the Connection will not be returned to the pool + stopWatch.stop(); + final String uploadDataRate = stopWatch.calculateDataRate(bytesSent); + final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); + final String dataSize = FormatUtils.formatDataSize(bytesSent); + + session.commit(); + transaction.complete(); + + final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles"; + logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[]{ + this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate}); + + return flowFilesSent.size(); + } catch (final Exception e) { + session.rollback(); + throw e; + } } - - + @Override public VersionNegotiator getVersionNegotiator() { return versionNegotiator; } - + @Override public void shutdown(final Peer peer) throws IOException { readyForFileTransfer = false; final CommunicationsSession commsSession = peer.getCommunicationsSession(); final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); - + logger.debug("{} Shutting down with {}", this, peer); // Indicate that we would like to have some data RequestType.SHUTDOWN.writeRequestType(dos); @@ -436,7 +438,7 @@ public class SocketClientProtocol implements ClientProtocol { public String getResourceName() { return "SocketFlowFileProtocol"; } - + @Override public String toString() { return "SocketClientProtocol[CommsID=" + commsIdentifier + "]"; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java index e69104f..e83ea28 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java @@ -45,50 +45,51 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SocketClientTransaction implements Transaction { - private static final Logger logger = LoggerFactory.getLogger(SocketClientTransaction.class); - - private final long creationNanoTime = System.nanoTime(); - private final CRC32 crc = new CRC32(); - private final int protocolVersion; - private final FlowFileCodec codec; - private final DataInputStream dis; - private final DataOutputStream dos; - private final TransferDirection direction; - private final boolean compress; - private final Peer peer; - private final int penaltyMillis; - private final String destinationId; - private final EventReporter eventReporter; - - private boolean dataAvailable = false; - private int transfers = 0; - private long contentBytes = 0; - private TransactionState state; - - SocketClientTransaction(final int protocolVersion, final String destinationId, final Peer peer, final FlowFileCodec codec, - final TransferDirection direction, final boolean useCompression, final int penaltyMillis, final EventReporter eventReporter) throws IOException { - this.protocolVersion = protocolVersion; - this.destinationId = destinationId; - this.peer = peer; - this.codec = codec; - this.direction = direction; - this.dis = new DataInputStream(peer.getCommunicationsSession().getInput().getInputStream()); - this.dos = new DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream()); - this.compress = useCompression; - this.state = TransactionState.TRANSACTION_STARTED; - this.penaltyMillis = penaltyMillis; - this.eventReporter = eventReporter; - - initialize(); - } - - private void initialize() throws IOException { - try { - if ( direction == TransferDirection.RECEIVE ) { + + private static final Logger logger = LoggerFactory.getLogger(SocketClientTransaction.class); + + private final long creationNanoTime = System.nanoTime(); + private final CRC32 crc = new CRC32(); + private final int protocolVersion; + private final FlowFileCodec codec; + private final DataInputStream dis; + private final DataOutputStream dos; + private final TransferDirection direction; + private final boolean compress; + private final Peer peer; + private final int penaltyMillis; + private final String destinationId; + private final EventReporter eventReporter; + + private boolean dataAvailable = false; + private int transfers = 0; + private long contentBytes = 0; + private TransactionState state; + + SocketClientTransaction(final int protocolVersion, final String destinationId, final Peer peer, final FlowFileCodec codec, + final TransferDirection direction, final boolean useCompression, final int penaltyMillis, final EventReporter eventReporter) throws IOException { + this.protocolVersion = protocolVersion; + this.destinationId = destinationId; + this.peer = peer; + this.codec = codec; + this.direction = direction; + this.dis = new DataInputStream(peer.getCommunicationsSession().getInput().getInputStream()); + this.dos = new DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream()); + this.compress = useCompression; + this.state = TransactionState.TRANSACTION_STARTED; + this.penaltyMillis = penaltyMillis; + this.eventReporter = eventReporter; + + initialize(); + } + + private void initialize() throws IOException { + try { + if (direction == TransferDirection.RECEIVE) { // Indicate that we would like to have some data RequestType.RECEIVE_FLOWFILES.writeRequestType(dos); dos.flush(); - + final Response dataAvailableCode = Response.read(dis); switch (dataAvailableCode.getCode()) { case MORE_DATA: @@ -102,39 +103,38 @@ public class SocketClientTransaction implements Transaction { default: throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode); } - + } else { // Indicate that we would like to have some data RequestType.SEND_FLOWFILES.writeRequestType(dos); dos.flush(); } - } catch (final Exception e) { - error(); - throw e; - } - } - - - @Override - public DataPacket receive() throws IOException { - try { - try { - if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) { - throw new IllegalStateException("Cannot receive data from " + peer + " because Transaction State is " + state); - } - - if ( direction == TransferDirection.SEND ) { - throw new IllegalStateException("Attempting to receive data from " + peer + " but started a SEND Transaction"); - } - - // if we already know there's no data, just return null - if ( !dataAvailable ) { - return null; - } - - // if we have already received a packet, check if another is available. - if ( transfers > 0 ) { - // Determine if Peer will send us data or has no data to send us + } catch (final Exception e) { + error(); + throw e; + } + } + + @Override + public DataPacket receive() throws IOException { + try { + try { + if (state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) { + throw new IllegalStateException("Cannot receive data from " + peer + " because Transaction State is " + state); + } + + if (direction == TransferDirection.SEND) { + throw new IllegalStateException("Attempting to receive data from " + peer + " but started a SEND Transaction"); + } + + // if we already know there's no data, just return null + if (!dataAvailable) { + return null; + } + + // if we have already received a packet, check if another is available. + if (transfers > 0) { + // Determine if Peer will send us data or has no data to send us final Response dataAvailableCode = Response.read(dis); switch (dataAvailableCode.getCode()) { case CONTINUE_TRANSACTION: @@ -149,170 +149,166 @@ public class SocketClientTransaction implements Transaction { throw new ProtocolException("Got unexpected response from " + peer + " when asking for data: " + dataAvailableCode); } } - - // if no data available, return null - if ( !dataAvailable ) { - return null; - } - + + // if no data available, return null + if (!dataAvailable) { + return null; + } + logger.debug("{} Receiving data from {}", this, peer); final InputStream dataIn = compress ? new CompressionInputStream(dis) : dis; final DataPacket packet = codec.decode(new CheckedInputStream(dataIn, crc)); - - if ( packet == null ) { + + if (packet == null) { this.dataAvailable = false; } else { - transfers++; - contentBytes += packet.getSize(); + transfers++; + contentBytes += packet.getSize(); } - + this.state = TransactionState.DATA_EXCHANGED; return packet; - } catch (final IOException ioe) { - throw new IOException("Failed to receive data from " + peer + " due to " + ioe, ioe); - } - } catch (final Exception e) { - error(); - throw e; - } - } - - - @Override - public void send(final byte[] content, final Map<String, String> attributes) throws IOException { - send(new StandardDataPacket(attributes, new ByteArrayInputStream(content), content.length)); - } - - @Override - public void send(final DataPacket dataPacket) throws IOException { - try { - try { - if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) { - throw new IllegalStateException("Cannot send data to " + peer + " because Transaction State is " + state); - } - - if ( direction == TransferDirection.RECEIVE ) { + } catch (final IOException ioe) { + throw new IOException("Failed to receive data from " + peer + " due to " + ioe, ioe); + } + } catch (final Exception e) { + error(); + throw e; + } + } + + @Override + public void send(final byte[] content, final Map<String, String> attributes) throws IOException { + send(new StandardDataPacket(attributes, new ByteArrayInputStream(content), content.length)); + } + + @Override + public void send(final DataPacket dataPacket) throws IOException { + try { + try { + if (state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) { + throw new IllegalStateException("Cannot send data to " + peer + " because Transaction State is " + state); + } + + if (direction == TransferDirection.RECEIVE) { throw new IllegalStateException("Attempting to send data to " + peer + " but started a RECEIVE Transaction"); } - - if ( transfers > 0 ) { + + if (transfers > 0) { ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos); } - + logger.debug("{} Sending data to {}", this, peer); - + final OutputStream dataOut = compress ? new CompressionOutputStream(dos) : dos; - final OutputStream out = new CheckedOutputStream(dataOut, crc); + final OutputStream out = new CheckedOutputStream(dataOut, crc); codec.encode(dataPacket, out); - + // need to close the CompressionOutputStream in order to force it write out any remaining bytes. // Otherwise, do NOT close it because we don't want to close the underlying stream // (CompressionOutputStream will not close the underlying stream when it's closed) - if ( compress ) { - out.close(); + if (compress) { + out.close(); } - + transfers++; contentBytes += dataPacket.getSize(); this.state = TransactionState.DATA_EXCHANGED; - } catch (final IOException ioe) { - throw new IOException("Failed to send data to " + peer + " due to " + ioe, ioe); - } - } catch (final Exception e) { - error(); - throw e; - } - } - - - @Override - public void cancel(final String explanation) throws IOException { - if ( state == TransactionState.TRANSACTION_CANCELED || state == TransactionState.TRANSACTION_COMPLETED || state == TransactionState.ERROR ) { - throw new IllegalStateException("Cannot cancel transaction because state is already " + state); - } - - try { - ResponseCode.CANCEL_TRANSACTION.writeResponse(dos, explanation == null ? "<No explanation given>" : explanation); - state = TransactionState.TRANSACTION_CANCELED; - } catch (final IOException ioe) { - error(); - throw new IOException("Failed to send 'cancel transaction' message to " + peer + " due to " + ioe, ioe); - } - } - - - @Override - public TransactionCompletion complete() throws IOException { - try { - try { - if ( state != TransactionState.TRANSACTION_CONFIRMED ) { - throw new IllegalStateException("Cannot complete transaction with " + peer + " because state is " + state + - "; Transaction can only be completed when state is " + TransactionState.TRANSACTION_CONFIRMED); - } - - boolean backoff = false; - if ( direction == TransferDirection.RECEIVE ) { - if ( transfers == 0 ) { - state = TransactionState.TRANSACTION_COMPLETED; - return new SocketClientTransactionCompletion(false, 0, 0L, System.nanoTime() - creationNanoTime); - } - + } catch (final IOException ioe) { + throw new IOException("Failed to send data to " + peer + " due to " + ioe, ioe); + } + } catch (final Exception e) { + error(); + throw e; + } + } + + @Override + public void cancel(final String explanation) throws IOException { + if (state == TransactionState.TRANSACTION_CANCELED || state == TransactionState.TRANSACTION_COMPLETED || state == TransactionState.ERROR) { + throw new IllegalStateException("Cannot cancel transaction because state is already " + state); + } + + try { + ResponseCode.CANCEL_TRANSACTION.writeResponse(dos, explanation == null ? "<No explanation given>" : explanation); + state = TransactionState.TRANSACTION_CANCELED; + } catch (final IOException ioe) { + error(); + throw new IOException("Failed to send 'cancel transaction' message to " + peer + " due to " + ioe, ioe); + } + } + + @Override + public TransactionCompletion complete() throws IOException { + try { + try { + if (state != TransactionState.TRANSACTION_CONFIRMED) { + throw new IllegalStateException("Cannot complete transaction with " + peer + " because state is " + state + + "; Transaction can only be completed when state is " + TransactionState.TRANSACTION_CONFIRMED); + } + + boolean backoff = false; + if (direction == TransferDirection.RECEIVE) { + if (transfers == 0) { + state = TransactionState.TRANSACTION_COMPLETED; + return new SocketClientTransactionCompletion(false, 0, 0L, System.nanoTime() - creationNanoTime); + } + // Confirm that we received the data and the peer can now discard it logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer); ResponseCode.TRANSACTION_FINISHED.writeResponse(dos); - + state = TransactionState.TRANSACTION_COMPLETED; } else { final Response transactionResponse; try { transactionResponse = Response.read(dis); } catch (final IOException e) { - throw new IOException(this + " Failed to receive a response from " + peer + " when expecting a TransactionFinished Indicator. " + - "It is unknown whether or not the peer successfully received/processed the data.", e); + throw new IOException(this + " Failed to receive a response from " + peer + " when expecting a TransactionFinished Indicator. " + + "It is unknown whether or not the peer successfully received/processed the data.", e); } - + logger.debug("{} Received {} from {}", this, transactionResponse, peer); - if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) { + if (transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL) { peer.penalize(destinationId, penaltyMillis); backoff = true; - } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) { + } else if (transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED) { throw new ProtocolException("After sending data to " + peer + ", expected TRANSACTION_FINISHED response but got " + transactionResponse); } - + state = TransactionState.TRANSACTION_COMPLETED; } - - return new SocketClientTransactionCompletion(backoff, transfers, contentBytes, System.nanoTime() - creationNanoTime); - } catch (final IOException ioe) { - throw new IOException("Failed to complete transaction with " + peer + " due to " + ioe, ioe); - } - } catch (final Exception e) { - error(); - throw e; - } - } - - - @Override - public void confirm() throws IOException { - try { - try { - if ( state == TransactionState.TRANSACTION_STARTED && !dataAvailable && direction == TransferDirection.RECEIVE ) { - // client requested to receive data but no data available. no need to confirm. - state = TransactionState.TRANSACTION_CONFIRMED; - return; - } - - if ( state != TransactionState.DATA_EXCHANGED ) { - throw new IllegalStateException("Cannot confirm Transaction because state is " + state + - "; Transaction can only be confirmed when state is " + TransactionState.DATA_EXCHANGED ); - } - - if ( direction == TransferDirection.RECEIVE ) { - if ( dataAvailable ) { + + return new SocketClientTransactionCompletion(backoff, transfers, contentBytes, System.nanoTime() - creationNanoTime); + } catch (final IOException ioe) { + throw new IOException("Failed to complete transaction with " + peer + " due to " + ioe, ioe); + } + } catch (final Exception e) { + error(); + throw e; + } + } + + @Override + public void confirm() throws IOException { + try { + try { + if (state == TransactionState.TRANSACTION_STARTED && !dataAvailable && direction == TransferDirection.RECEIVE) { + // client requested to receive data but no data available. no need to confirm. + state = TransactionState.TRANSACTION_CONFIRMED; + return; + } + + if (state != TransactionState.DATA_EXCHANGED) { + throw new IllegalStateException("Cannot confirm Transaction because state is " + state + + "; Transaction can only be confirmed when state is " + TransactionState.DATA_EXCHANGED); + } + + if (direction == TransferDirection.RECEIVE) { + if (dataAvailable) { throw new IllegalStateException("Cannot complete transaction because the sender has already sent more data than client has consumed."); } - + // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message // to peer so that we can verify that the connection is still open. This is a two-phase commit, // which helps to prevent the chances of data duplication. Without doing this, we may commit the @@ -323,84 +319,88 @@ public class SocketClientTransaction implements Transaction { logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer); final String calculatedCRC = String.valueOf(crc.getValue()); ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC); - + final Response confirmTransactionResponse; try { confirmTransactionResponse = Response.read(dis); } catch (final IOException ioe) { logger.error("Failed to receive response code from {} when expecting confirmation of transaction", peer); - if ( eventReporter != null ) { - eventReporter.reportEvent(Severity.ERROR, "Site-to-Site", "Failed to receive response code from " + peer + " when expecting confirmation of transaction"); + if (eventReporter != null) { + eventReporter.reportEvent(Severity.ERROR, "Site-to-Site", "Failed to receive response code from " + peer + " when expecting confirmation of transaction"); } throw ioe; } - + logger.trace("{} Received {} from {}", this, confirmTransactionResponse, peer); - + switch (confirmTransactionResponse.getCode()) { case CONFIRM_TRANSACTION: break; case BAD_CHECKSUM: throw new IOException(this + " Received a BadChecksum response from peer " + peer); default: - throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code"); + throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : " + + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code"); } - + state = TransactionState.TRANSACTION_CONFIRMED; } else { logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, peer); ResponseCode.FINISH_TRANSACTION.writeResponse(dos); - + final String calculatedCRC = String.valueOf(crc.getValue()); - + // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response final Response transactionConfirmationResponse = Response.read(dis); - if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) { + if (transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION) { // Confirm checksum and echo back the confirmation. logger.trace("{} Received {} from {}", this, transactionConfirmationResponse, peer); final String receivedCRC = transactionConfirmationResponse.getMessage(); - + // CRC was not used before version 4 - if ( protocolVersion > 3 ) { - if ( !receivedCRC.equals(calculatedCRC) ) { + if (protocolVersion > 3) { + if (!receivedCRC.equals(calculatedCRC)) { ResponseCode.BAD_CHECKSUM.writeResponse(dos); - throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session"); + throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + + calculatedCRC + " while peer calculated CRC32 Checksum as " + + receivedCRC + "; canceling transaction and rolling back session"); } } - + ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, ""); } else { - throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse); + throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + + peer + " but received " + transactionConfirmationResponse); } - + state = TransactionState.TRANSACTION_CONFIRMED; } - } catch (final IOException ioe) { - throw new IOException("Failed to confirm transaction with " + peer + " due to " + ioe, ioe); - } - } catch (final Exception e) { - error(); - throw e; - } - } - - @Override - public void error() { - this.state = TransactionState.ERROR; - } - - @Override - public TransactionState getState() { - return state; - } - - @Override - public Communicant getCommunicant() { - return peer; - } - + } catch (final IOException ioe) { + throw new IOException("Failed to confirm transaction with " + peer + " due to " + ioe, ioe); + } + } catch (final Exception e) { + error(); + throw e; + } + } + + @Override + public void error() { + this.state = TransactionState.ERROR; + } + + @Override + public TransactionState getState() { + return state; + } + + @Override + public Communicant getCommunicant() { + return peer; + } + @Override public String toString() { - return "SocketClientTransaction[Url=" + peer.getUrl() + ", TransferDirection=" + direction + ", State=" + state + "]"; + return "SocketClientTransaction[Url=" + peer.getUrl() + ", TransferDirection=" + direction + ", State=" + state + "]"; } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransactionCompletion.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransactionCompletion.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransactionCompletion.java index 5eb6c91..bd95013 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransactionCompletion.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransactionCompletion.java @@ -26,7 +26,7 @@ public class SocketClientTransactionCompletion implements TransactionCompletion private final int dataPacketsTransferred; private final long bytesTransferred; private final long durationNanos; - + public SocketClientTransactionCompletion(final boolean backoff, final int dataPacketsTransferred, final long bytesTransferred, final long durationNanos) { this.backoff = backoff; this.dataPacketsTransferred = dataPacketsTransferred; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/NiFiRestApiUtil.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/NiFiRestApiUtil.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/NiFiRestApiUtil.java index 10352ec..d746abf 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/NiFiRestApiUtil.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/NiFiRestApiUtil.java @@ -32,43 +32,44 @@ import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.map.ObjectMapper; public class NiFiRestApiUtil { + public static final int RESPONSE_CODE_OK = 200; - + private final SSLContext sslContext; - + public NiFiRestApiUtil(final SSLContext sslContext) { this.sslContext = sslContext; } - + private HttpURLConnection getConnection(final String connUrl, final int timeoutMillis) throws IOException { final URL url = new URL(connUrl); final HttpURLConnection connection = (HttpURLConnection) url.openConnection(); connection.setConnectTimeout(timeoutMillis); connection.setReadTimeout(timeoutMillis); - + // special handling for https if (sslContext != null && connection instanceof HttpsURLConnection) { HttpsURLConnection secureConnection = (HttpsURLConnection) connection; secureConnection.setSSLSocketFactory(sslContext.getSocketFactory()); // check the trusted hostname property and override the HostnameVerifier - secureConnection.setHostnameVerifier(new OverrideHostnameVerifier(url.getHost(), + secureConnection.setHostnameVerifier(new OverrideHostnameVerifier(url.getHost(), secureConnection.getHostnameVerifier())); } - + return connection; } - + public ControllerDTO getController(final String url, final int timeoutMillis) throws IOException { final HttpURLConnection connection = getConnection(url, timeoutMillis); connection.setRequestMethod("GET"); final int responseCode = connection.getResponseCode(); - + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); StreamUtils.copy(connection.getInputStream(), baos); final String responseMessage = baos.toString(); - - if ( responseCode == RESPONSE_CODE_OK ) { + + if (responseCode == RESPONSE_CODE_OK) { final ObjectMapper mapper = new ObjectMapper(); final JsonNode jsonNode = mapper.readTree(responseMessage); final JsonNode controllerNode = jsonNode.get("controller"); @@ -77,8 +78,9 @@ public class NiFiRestApiUtil { throw new IOException("Got HTTP response Code " + responseCode + ": " + connection.getResponseMessage() + " with explanation: " + responseMessage); } } - + private static class OverrideHostnameVerifier implements HostnameVerifier { + private final String trustedHostname; private final HostnameVerifier delegate; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java index 6dab77b..c52b4b7 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java @@ -21,7 +21,8 @@ import java.util.Set; import org.apache.nifi.remote.PeerStatus; public class PeerStatusCache { - private final Set<PeerStatus> statuses; + + private final Set<PeerStatus> statuses; private final long timestamp; public PeerStatusCache(final Set<PeerStatus> statuses) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java index bd1b50c..70bb324 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java @@ -25,26 +25,26 @@ import org.apache.nifi.stream.io.MinimumLengthInputStream; public class StandardDataPacket implements DataPacket { - private final Map<String, String> attributes; - private final InputStream stream; - private final long size; - - public StandardDataPacket(final Map<String, String> attributes, final InputStream stream, final long size) { - this.attributes = attributes; - this.stream = new MinimumLengthInputStream(new LimitingInputStream(stream, size), size); - this.size = size; - } - - public Map<String, String> getAttributes() { - return attributes; - } - - public InputStream getData() { - return stream; - } - - public long getSize() { - return size; - } - + private final Map<String, String> attributes; + private final InputStream stream; + private final long size; + + public StandardDataPacket(final Map<String, String> attributes, final InputStream stream, final long size) { + this.attributes = attributes; + this.stream = new MinimumLengthInputStream(new LimitingInputStream(stream, size), size); + this.size = size; + } + + public Map<String, String> getAttributes() { + return attributes; + } + + public InputStream getData() { + return stream; + } + + public long getSize() { + return size; + } + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java index c5cca78..8336745 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java @@ -40,11 +40,11 @@ public class TestEndpointConnectionStatePool { clusterNodeInfo.setNodeInformation(collection); final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.RECEIVE); - for ( final PeerStatus peerStatus : destinations ) { + for (final PeerStatus peerStatus : destinations) { System.out.println(peerStatus.getPeerDescription()); } } - + @Test public void testFormulateDestinationListForOutputHugeDifference() throws IOException { final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation(); @@ -54,14 +54,11 @@ public class TestEndpointConnectionStatePool { clusterNodeInfo.setNodeInformation(collection); final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.RECEIVE); - for ( final PeerStatus peerStatus : destinations ) { + for (final PeerStatus peerStatus : destinations) { System.out.println(peerStatus.getPeerDescription()); } } - - - - + @Test public void testFormulateDestinationListForInputPorts() throws IOException { final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation(); @@ -74,11 +71,11 @@ public class TestEndpointConnectionStatePool { clusterNodeInfo.setNodeInformation(collection); final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND); - for ( final PeerStatus peerStatus : destinations ) { + for (final PeerStatus peerStatus : destinations) { System.out.println(peerStatus.getPeerDescription()); } } - + @Test public void testFormulateDestinationListForInputPortsHugeDifference() throws IOException { final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation(); @@ -88,7 +85,7 @@ public class TestEndpointConnectionStatePool { clusterNodeInfo.setNodeInformation(collection); final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND); - for ( final PeerStatus peerStatus : destinations ) { + for (final PeerStatus peerStatus : destinations) { System.out.println(peerStatus.getPeerDescription()); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java index b73e44d..155fc95 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.io.InputStream; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.TimeUnit; import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.TransferDirection; @@ -39,32 +38,32 @@ public class TestSiteToSiteClient { @Ignore("For local testing only; not really a unit test but a manual test") public void testReceive() throws IOException { System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG"); - + final SiteToSiteClient client = new SiteToSiteClient.Builder() - .url("http://localhost:8080/nifi") - .portName("cba") - .requestBatchCount(10) - .build(); - + .url("http://localhost:8080/nifi") + .portName("cba") + .requestBatchCount(10) + .build(); + try { - for (int i=0; i < 1000; i++) { + for (int i = 0; i < 1000; i++) { final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); Assert.assertNotNull(transaction); - + DataPacket packet; while (true) { packet = transaction.receive(); - if ( packet == null ) { + if (packet == null) { break; } final InputStream in = packet.getData(); final long size = packet.getSize(); final byte[] buff = new byte[(int) size]; - + StreamUtils.fillBuffer(in, buff); } - + transaction.confirm(); transaction.complete(); } @@ -72,34 +71,33 @@ public class TestSiteToSiteClient { client.close(); } } - - + @Test @Ignore("For local testing only; not really a unit test but a manual test") public void testSend() throws IOException { System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG"); - + final SiteToSiteClient client = new SiteToSiteClient.Builder() - .url("http://localhost:8080/nifi") - .portName("input") - .build(); - + .url("http://localhost:8080/nifi") + .portName("input") + .build(); + try { final Transaction transaction = client.createTransaction(TransferDirection.SEND); Assert.assertNotNull(transaction); - + final Map<String, String> attrs = new HashMap<>(); attrs.put("site-to-site", "yes, please!"); final byte[] bytes = "Hello".getBytes(); final ByteArrayInputStream bais = new ByteArrayInputStream(bytes); final DataPacket packet = new StandardDataPacket(attrs, bais, bytes.length); transaction.send(packet); - + transaction.confirm(); transaction.complete(); } finally { client.close(); } } - + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java index 172c593..cc24575 100644 --- a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java +++ b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java @@ -33,10 +33,6 @@ import org.apache.commons.lang3.builder.ToStringStyle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * - * @author none - */ public abstract class AbstractChannelReader implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractChannelReader.class); @@ -91,12 +87,12 @@ public abstract class AbstractChannelReader implements Runnable { * Allows a subclass to specifically handle how it reads from the given * key's channel into the given buffer. * - * @param key - * @param buffer + * @param key of channel to read from + * @param buffer to fill * @return the number of bytes read in the final read cycle. A value of zero * or more indicates the channel is still open but a value of -1 indicates * end of stream. - * @throws IOException + * @throws IOException if reading from channel causes failure */ protected abstract int fillBuffer(SelectionKey key, ByteBuffer buffer) throws IOException; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java index a413ad2..007034b 100644 --- a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java +++ b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java @@ -25,10 +25,6 @@ import java.util.concurrent.LinkedBlockingDeque; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * - * @author none - */ public class BufferPool implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(BufferPool.class); @@ -50,9 +46,9 @@ public class BufferPool implements Runnable { /** * Returns the given buffer to the pool - and clears it. * - * @param buffer - * @param bytesProcessed - * @return + * @param buffer buffer to return + * @param bytesProcessed bytes processed for this buffer being returned + * @return true if buffer returned to pool */ public synchronized boolean returnBuffer(ByteBuffer buffer, final int bytesProcessed) { totalBytesExtracted += bytesProcessed; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java index 2ae2c07..824f2df 100644 --- a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java +++ b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java @@ -35,10 +35,6 @@ import org.apache.nifi.io.nio.consumer.StreamConsumerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * - * @author none - */ public final class ChannelDispatcher implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(ChannelDispatcher.class); @@ -81,8 +77,8 @@ public final class ChannelDispatcher implements Runnable { /* * When serverSocketsChannels are registered with the selector, want each invoke of this method to loop through all * channels' keys. - * - * @throws IOException + * + * @throws IOException if unable to select keys */ private void selectServerSocketKeys() throws IOException { int numSelected = serverSocketSelector.select(timeout); @@ -121,8 +117,8 @@ public final class ChannelDispatcher implements Runnable { * When invoking this method, only want to iterate through the selected keys once. When a key is entered into the selectors * selected key set, select will return a positive value. The next select will return 0 if nothing has changed. Note that * the selected key set is not manually changed via a remove operation. - * - * @throws IOException + * + * @throws IOException if unable to select keys */ private void selectSocketChannelKeys() throws IOException { // once a channel associated with a key in this selector is 'ready', it causes this select to immediately return. @@ -138,7 +134,7 @@ public final class ChannelDispatcher implements Runnable { // there are 2 kinds of channels in this selector, both which have their own readers and are executed in their own // threads. We will get here whenever a new SocketChannel is created due to an incoming connection. However, // for a DatagramChannel we don't want to create a new reader unless it is a new DatagramChannel. The only - // way to tell if it's new is the lack of an attachment. + // way to tell if it's new is the lack of an attachment. if (channel instanceof DatagramChannel && socketChannelKey.attachment() == null) { reader = new DatagramChannelReader(UUID.randomUUID().toString(), socketChannelKey, emptyBuffers, factory); socketChannelKey.attach(reader); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java index b0a1cfb..7cbf589 100644 --- a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java +++ b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java @@ -62,7 +62,6 @@ import org.slf4j.LoggerFactory; * All ChannelReaders will get throttled by the unavailability of buffers in the * provided BufferPool. This is designed to create back pressure. * - * @author none */ public final class ChannelListener { @@ -99,7 +98,7 @@ public final class ChannelListener { * @param port - port to bind to * @param receiveBufferSize - size of OS receive buffer to request. If less * than 0 then will not be set and OS default will win. - * @throws IOException + * @throws IOException if unable to add socket */ public void addServerSocket(final InetAddress nicIPAddress, final int port, final int receiveBufferSize) throws IOException { @@ -129,7 +128,7 @@ public final class ChannelListener { * @param port - the port to listen on * @param receiveBufferSize - the number of bytes to request for a receive * buffer from OS - * @throws IOException + * @throws IOException if unable to add channel */ public void addDatagramChannel(final InetAddress nicIPAddress, final int port, final int receiveBufferSize) throws IOException { @@ -156,7 +155,7 @@ public final class ChannelListener { * any network interface on the local host. * @param sendingPort - the port used by the sender of datagrams. Only * datagrams from this port will be received. - * @throws IOException + * @throws IOException if unable to add channel */ public void addDatagramChannel(final InetAddress nicIPAddress, final int port, final int receiveBufferSize, final String sendingHost, final Integer sendingPort) throws IOException {
