http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java index 021531f..67f28d2 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java @@ -68,10 +68,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class StandardRootGroupPort extends AbstractPort implements RootGroupPort { + private static final String CATEGORY = "Site to Site"; - + private static final Logger logger = LoggerFactory.getLogger(StandardRootGroupPort.class); - + private final AtomicReference<Set<String>> groupAccessControl = new AtomicReference<Set<String>>(new HashSet<String>()); private final AtomicReference<Set<String>> userAccessControl = new AtomicReference<Set<String>>(new HashSet<String>()); private final ProcessScheduler processScheduler; @@ -82,18 +83,18 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort private final EventReporter eventReporter; private final ProcessScheduler scheduler; private final Set<Relationship> relationships; - + private final BlockingQueue<FlowFileRequest> requestQueue = new ArrayBlockingQueue<>(1000); - + private final Set<FlowFileRequest> activeRequests = new HashSet<>(); private final Lock requestLock = new ReentrantLock(); private boolean shutdown = false; // guarded by requestLock - public StandardRootGroupPort(final String id, final String name, final ProcessGroup processGroup, + public StandardRootGroupPort(final String id, final String name, final ProcessGroup processGroup, final TransferDirection direction, final ConnectableType type, final UserService userService, final BulletinRepository bulletinRepository, final ProcessScheduler scheduler, final boolean secure) { super(id, name, processGroup, type, scheduler); - + this.processScheduler = scheduler; setScheduldingPeriod(MINIMUM_SCHEDULING_NANOS + " nanos"); this.userService = userService; @@ -110,20 +111,20 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort bulletinRepository.addBulletin(BulletinFactory.createBulletin(groupId, sourceId, sourceName, category, severity.name(), message)); } }; - + relationships = direction == TransferDirection.RECEIVE ? Collections.singleton(AbstractPort.PORT_RELATIONSHIP) : Collections.<Relationship>emptySet(); } - + @Override public Collection<Relationship> getRelationships() { - return relationships; + return relationships; } - + @Override public boolean isTriggerWhenEmpty() { return true; } - + @Override public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) { final FlowFileRequest flowFileRequest; @@ -132,29 +133,29 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort } catch (final InterruptedException ie) { return; } - - if ( flowFileRequest == null ) { + + if (flowFileRequest == null) { return; } flowFileRequest.setServiceBegin(); - + requestLock.lock(); try { - if ( shutdown ) { + if (shutdown) { final CommunicationsSession commsSession = flowFileRequest.getPeer().getCommunicationsSession(); - if ( commsSession != null ) { + if (commsSession != null) { commsSession.interrupt(); } } - + activeRequests.add(flowFileRequest); } finally { requestLock.unlock(); } - + final ProcessSession session = sessionFactory.createSession(); - + try { onTrigger(context, session, flowFileRequest); // we leave the session open, because we send it back to the caller of #receiveFlowFile or #transmitFlowFile, @@ -162,11 +163,11 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort } catch (final TransmissionDisabledException e) { session.rollback(); } catch (final Exception e) { - logger.error("{} Failed to process data due to {}", new Object[] {this, e}); - if ( logger.isDebugEnabled() ) { + logger.error("{} Failed to process data due to {}", new Object[]{this, e}); + if (logger.isDebugEnabled()) { logger.error("", e); } - + session.rollback(); } finally { requestLock.lock(); @@ -177,50 +178,50 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort } } } - + @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { // nothing to do here -- we will never get called because we override onTrigger(ProcessContext, ProcessSessionFactory) } - + private void onTrigger(final ProcessContext context, final ProcessSession session, final FlowFileRequest flowFileRequest) { final ServerProtocol protocol = flowFileRequest.getProtocol(); final BlockingQueue<ProcessingResult> responseQueue = flowFileRequest.getResponseQueue(); - if ( flowFileRequest.isExpired() ) { + if (flowFileRequest.isExpired()) { final String message = String.format("%s Cannot service request from %s because the request has timed out", this, flowFileRequest.getPeer()); logger.warn(message); eventReporter.reportEvent(Severity.WARNING, CATEGORY, message); - + responseQueue.add(new ProcessingResult(new RequestExpiredException())); return; } - + final Peer peer = flowFileRequest.getPeer(); final CommunicationsSession commsSession = peer.getCommunicationsSession(); final String sourceDn = commsSession.getUserDn(); logger.debug("{} Servicing request for {} (DN={})", this, peer, sourceDn); - + final PortAuthorizationResult authorizationResult = checkUserAuthorization(sourceDn); - if ( !authorizationResult.isAuthorized() ) { - final String message = String.format("%s Cannot service request from %s (DN=%s) because peer is not authorized to communicate with this port: %s", - this, flowFileRequest.getPeer(), flowFileRequest.getPeer().getCommunicationsSession().getUserDn(), authorizationResult.getExplanation()); + if (!authorizationResult.isAuthorized()) { + final String message = String.format("%s Cannot service request from %s (DN=%s) because peer is not authorized to communicate with this port: %s", + this, flowFileRequest.getPeer(), flowFileRequest.getPeer().getCommunicationsSession().getUserDn(), authorizationResult.getExplanation()); logger.error(message); eventReporter.reportEvent(Severity.ERROR, CATEGORY, message); - + responseQueue.add(new ProcessingResult(new NotAuthorizedException(authorizationResult.getExplanation()))); return; } final FlowFileCodec codec = protocol.getPreNegotiatedCodec(); - if ( codec == null ) { + if (codec == null) { responseQueue.add(new ProcessingResult(new BadRequestException("None of the supported FlowFile Codecs supplied is compatible with this instance"))); return; } final int transferCount; - + try { - if ( getConnectableType() == ConnectableType.INPUT_PORT ) { + if (getConnectableType() == ConnectableType.INPUT_PORT) { transferCount = receiveFlowFiles(context, session, codec, flowFileRequest); } else { transferCount = transferFlowFiles(context, session, codec, flowFileRequest); @@ -233,58 +234,55 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort } catch (final Exception e) { session.rollback(); responseQueue.add(new ProcessingResult(e)); - + return; } - + session.commit(); responseQueue.add(new ProcessingResult(transferCount)); } - private int transferFlowFiles(final ProcessContext context, final ProcessSession session, final FlowFileCodec codec, final FlowFileRequest request) throws IOException, ProtocolException { return request.getProtocol().transferFlowFiles(request.getPeer(), context, session, codec); } - private int receiveFlowFiles(final ProcessContext context, final ProcessSession session, final FlowFileCodec codec, final FlowFileRequest receiveRequest) throws IOException, ProtocolException { return receiveRequest.getProtocol().receiveFlowFiles(receiveRequest.getPeer(), context, session, codec); } - + @Override public boolean isValid() { return (getConnectableType() == ConnectableType.INPUT_PORT) ? !getConnections(Relationship.ANONYMOUS).isEmpty() : true; } - + @Override public Collection<ValidationResult> getValidationErrors() { final Collection<ValidationResult> validationErrors = new ArrayList<>(); if (!isValid()) { final ValidationResult error = new ValidationResult.Builder() - .explanation(String.format("Output connection for port '%s' is not defined.", getName())) - .subject(String.format("Port '%s'", getName())) - .valid(false) - .build(); + .explanation(String.format("Output connection for port '%s' is not defined.", getName())) + .subject(String.format("Port '%s'", getName())) + .valid(false) + .build(); validationErrors.add(error); } return validationErrors; } - - + @Override public boolean isTransmitting() { - if ( !isRunning() ) { + if (!isRunning()) { return false; } - - if ( processScheduler.getActiveThreadCount(this) > 0 ) { + + if (processScheduler.getActiveThreadCount(this) > 0) { return true; } - - if ( requestQueue.isEmpty() ) { + + if (requestQueue.isEmpty()) { return false; } - + requestLock.lock(); try { return !activeRequests.isEmpty(); @@ -316,14 +314,14 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort @Override public void shutdown() { super.shutdown(); - + requestLock.lock(); try { this.shutdown = true; - - for ( final FlowFileRequest request : activeRequests ) { + + for (final FlowFileRequest request : activeRequests) { final CommunicationsSession commsSession = request.getPeer().getCommunicationsSession(); - if ( commsSession != null ) { + if (commsSession != null) { commsSession.interrupt(); } } @@ -331,11 +329,11 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort requestLock.unlock(); } } - + @Override public void onSchedulingStart() { super.onSchedulingStart(); - + requestLock.lock(); try { shutdown = false; @@ -343,14 +341,14 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort requestLock.unlock(); } } - + @Override public PortAuthorizationResult checkUserAuthorization(final String dn) { - if ( !secure ) { + if (!secure) { return new StandardPortAuthorizationResult(true, "Site-to-Site is not Secure"); } - if ( dn == null ) { + if (dn == null) { final String message = String.format("%s authorization failed for user %s because the DN is unknown", this, dn); logger.warn(message); eventReporter.reportEvent(Severity.WARNING, CATEGORY, message); @@ -359,9 +357,9 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort try { final NiFiUser user = userService.checkAuthorization(dn); - + final Set<Authority> authorities = user.getAuthorities(); - if ( !authorities.contains(Authority.ROLE_NIFI) ) { + if (!authorities.contains(Authority.ROLE_NIFI)) { final String message = String.format("%s authorization failed for user %s because the user does not have Role NiFi", this, dn); logger.warn(message); eventReporter.reportEvent(Severity.WARNING, CATEGORY, message); @@ -369,12 +367,12 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort } final Set<String> allowedUsers = userAccessControl.get(); - if ( allowedUsers.contains(dn) ) { + if (allowedUsers.contains(dn)) { return new StandardPortAuthorizationResult(true, "User is Authorized"); } final String userGroup = user.getUserGroup(); - if ( userGroup == null ) { + if (userGroup == null) { final String message = String.format("%s authorization failed for user %s because the user does not have a group and is not in the set of Allowed Users for this Port", this, dn); logger.warn(message); eventReporter.reportEvent(Severity.WARNING, CATEGORY, message); @@ -383,13 +381,14 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort final Set<String> allowedGroups = groupAccessControl.get(); final boolean allowed = allowedGroups.contains(userGroup); - if ( !allowed ) { - final String message = String.format("%s authorization failed for user %s because the user is not in the set of Allowed Users, and the user's group is not in the set of Allowed Groups for this Port", this, dn); + if (!allowed) { + final String message = String.format("%s authorization failed for user %s because the user " + + "is not in the set of Allowed Users, and the user's group is not in the set of Allowed Groups for this Port", this, dn); logger.warn(message); eventReporter.reportEvent(Severity.WARNING, CATEGORY, message); return new StandardPortAuthorizationResult(false, "User is not Authorized to communicate with " + this.toString()); } - + return new StandardPortAuthorizationResult(true, "User is part of group '" + userGroup + "', which is Authorized to communicate with " + this.toString()); } catch (final AccountNotFoundException anfe) { final String message = String.format("%s authorization failed for user %s because the DN is unknown", this, dn); @@ -418,145 +417,145 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort return new StandardPortAuthorizationResult(false, "Authorization failed because " + e); } } - + public static class StandardPortAuthorizationResult implements PortAuthorizationResult { + private final boolean isAuthorized; private final String explanation; - + public StandardPortAuthorizationResult(final boolean isAuthorized, final String explanation) { this.isAuthorized = isAuthorized; this.explanation = explanation; } - + @Override public boolean isAuthorized() { return isAuthorized; } - + @Override public String getExplanation() { return explanation; } } - - + private static class ProcessingResult { + private final int fileCount; private final Exception problem; - + public ProcessingResult(final int fileCount) { this.fileCount = fileCount; this.problem = null; } - + public ProcessingResult(final Exception problem) { this.fileCount = 0; this.problem = problem; } - + public Exception getProblem() { return problem; } - + public int getFileCount() { return fileCount; } } - - private static class FlowFileRequest { + private final Peer peer; private final ServerProtocol protocol; private final BlockingQueue<ProcessingResult> queue; private final long creationTime; private final AtomicBoolean beingServiced = new AtomicBoolean(false); - + public FlowFileRequest(final Peer peer, final ServerProtocol protocol) { this.creationTime = System.currentTimeMillis(); this.peer = peer; this.protocol = protocol; this.queue = new ArrayBlockingQueue<>(1); } - - + public void setServiceBegin() { this.beingServiced.set(true); } - + public boolean isBeingServiced() { return beingServiced.get(); } - + public BlockingQueue<ProcessingResult> getResponseQueue() { return queue; } - + public Peer getPeer() { return peer; } - + public ServerProtocol getProtocol() { return protocol; } - + public boolean isExpired() { // use double the protocol's expiration because the sender may send data for a bit before // the timeout starts being counted, and we don't want to timeout before the sender does. // is this a good idea...??? long expiration = protocol.getRequestExpiration() * 2; - if ( expiration <= 0L ) { + if (expiration <= 0L) { return false; } - + if (expiration < 500L) { expiration = 500L; } - + return System.currentTimeMillis() > creationTime + expiration; } } - @Override - public int receiveFlowFiles(final Peer peer, final ServerProtocol serverProtocol, final Map<String, String> requestHeaders) throws NotAuthorizedException, BadRequestException, RequestExpiredException { - if ( getConnectableType() != ConnectableType.INPUT_PORT ) { + public int receiveFlowFiles(final Peer peer, final ServerProtocol serverProtocol, final Map<String, String> requestHeaders) + throws NotAuthorizedException, BadRequestException, RequestExpiredException { + if (getConnectableType() != ConnectableType.INPUT_PORT) { throw new IllegalStateException("Cannot receive FlowFiles because this port is not an Input Port"); } - if ( !this.isRunning() ) { + if (!this.isRunning()) { throw new IllegalStateException("Port not running"); } - + try { final FlowFileRequest request = new FlowFileRequest(peer, serverProtocol); - if ( !this.requestQueue.offer(request) ) { + if (!this.requestQueue.offer(request)) { throw new RequestExpiredException(); } - + // Trigger this port to run. scheduler.registerEvent(this); - + // Get a response from the response queue but don't wait forever if the port is stopped ProcessingResult result = null; - + // wait for the request to start getting serviced... and time out if it doesn't happen // before the request expires - while ( !request.isBeingServiced() ) { - if ( request.isExpired() ) { + while (!request.isBeingServiced()) { + if (request.isExpired()) { throw new SocketTimeoutException("Read timed out"); } else { try { Thread.sleep(100L); - } catch (final InterruptedException e) {} + } catch (final InterruptedException e) { + } } } // we've started to service the request. Now just wait until it's finished result = request.getResponseQueue().take(); - + final Exception problem = result.getProblem(); - if ( problem == null ) { + if (problem == null) { return result.getFileCount(); } else { throw problem; @@ -571,44 +570,46 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort } @Override - public int transferFlowFiles(final Peer peer, final ServerProtocol serverProtocol, final Map<String, String> requestHeaders) throws NotAuthorizedException, BadRequestException, RequestExpiredException { - if ( getConnectableType() != ConnectableType.OUTPUT_PORT ) { + public int transferFlowFiles(final Peer peer, final ServerProtocol serverProtocol, final Map<String, String> requestHeaders) + throws NotAuthorizedException, BadRequestException, RequestExpiredException { + if (getConnectableType() != ConnectableType.OUTPUT_PORT) { throw new IllegalStateException("Cannot send FlowFiles because this port is not an Output Port"); } - - if ( !this.isRunning() ) { + + if (!this.isRunning()) { throw new IllegalStateException("Port not running"); } try { final FlowFileRequest request = new FlowFileRequest(peer, serverProtocol); - if ( !this.requestQueue.offer(request) ) { + if (!this.requestQueue.offer(request)) { throw new RequestExpiredException(); } // Trigger this port to run scheduler.registerEvent(this); - + // Get a response from the response queue but don't wait forever if the port is stopped ProcessingResult result = null; - + // wait for the request to start getting serviced... and time out if it doesn't happen // before the request expires - while ( !request.isBeingServiced() ) { - if ( request.isExpired() ) { + while (!request.isBeingServiced()) { + if (request.isExpired()) { throw new SocketTimeoutException("Read timed out"); - } else { + } else { try { Thread.sleep(100L); - } catch (final InterruptedException e) {} + } catch (final InterruptedException e) { + } } } // we've started to service the request. Now just wait until it's finished result = request.getResponseQueue().take(); - + final Exception problem = result.getProblem(); - if ( problem == null ) { + if (problem == null) { return result.getFileCount(); } else { throw problem; @@ -621,12 +622,12 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort throw new ProcessException(e); } } - + @Override public SchedulingStrategy getSchedulingStrategy() { return SchedulingStrategy.TIMER_DRIVEN; } - + @Override public boolean isSideEffectFree() { return false;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.java index 926809c..4a4f96b 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.java @@ -19,9 +19,10 @@ package org.apache.nifi.remote.exception; import org.apache.nifi.remote.codec.FlowFileCodec; public class UnsupportedCodecException extends RuntimeException { - private static final long serialVersionUID = 198234789237L; - public UnsupportedCodecException(final String codecName) { + private static final long serialVersionUID = 198234789237L; + + public UnsupportedCodecException(final String codecName) { super("Codec " + codecName + " is not supported"); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java index 391d52b..7d0ffab 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java @@ -40,12 +40,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ClusterManagerServerProtocol implements ServerProtocol { + public static final String RESOURCE_NAME = "SocketFlowFileProtocol"; private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1); private final Logger logger = LoggerFactory.getLogger(ClusterManagerServerProtocol.class); private NodeInformant nodeInformant; - + private String commsIdentifier; private boolean shutdown = false; private boolean handshakeCompleted = false; @@ -53,52 +54,51 @@ public class ClusterManagerServerProtocol implements ServerProtocol { public ClusterManagerServerProtocol() { } - - + @Override public void setNodeInformant(final NodeInformant nodeInformant) { this.nodeInformant = nodeInformant; } - + @Override public void handshake(final Peer peer) throws IOException, HandshakeException { - if ( handshakeCompleted ) { + if (handshakeCompleted) { throw new IllegalStateException("Handshake has already been completed"); } - if ( shutdown ) { + if (shutdown) { throw new IllegalStateException("Protocol is shutdown"); } final CommunicationsSession commsSession = peer.getCommunicationsSession(); final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); - + // read communications identifier commsIdentifier = dis.readUTF(); - + // read all of the properties. we don't really care what the properties are. final int numProperties = dis.readInt(); - for (int i=0; i < numProperties; i++) { + for (int i = 0; i < numProperties; i++) { final String propertyName = dis.readUTF(); final String propertyValue = dis.readUTF(); - + final HandshakeProperty property; try { property = HandshakeProperty.valueOf(propertyName); - if ( HandshakeProperty.REQUEST_EXPIRATION_MILLIS.equals(property) ) { + if (HandshakeProperty.REQUEST_EXPIRATION_MILLIS.equals(property)) { requestExpirationMillis = Long.parseLong(propertyValue); } } catch (final Exception e) { } } - + // send "OK" response ResponseCode.PROPERTIES_OK.writeResponse(dos); - + logger.debug("Successfully completed handshake with {}; CommsID={}", peer, commsIdentifier); handshakeCompleted = true; } - + @Override public boolean isHandshakeSuccessful() { return handshakeCompleted; @@ -106,10 +106,10 @@ public class ClusterManagerServerProtocol implements ServerProtocol { @Override public void sendPeerList(final Peer peer) throws IOException { - if ( !handshakeCompleted ) { + if (!handshakeCompleted) { throw new IllegalStateException("Handshake has not been completed"); } - if ( shutdown ) { + if (shutdown) { throw new IllegalStateException("Protocol is shutdown"); } @@ -118,29 +118,29 @@ public class ClusterManagerServerProtocol implements ServerProtocol { final ClusterNodeInformation clusterNodeInfo = nodeInformant.getNodeInformation(); final Collection<NodeInformation> nodeInfos = clusterNodeInfo.getNodeInformation(); - + // determine how many nodes have Site-to-site enabled int numPeers = 0; - for ( final NodeInformation nodeInfo : nodeInfos ) { + for (final NodeInformation nodeInfo : nodeInfos) { if (nodeInfo.getSiteToSitePort() != null) { numPeers++; } } - + dos.writeInt(numPeers); - for ( final NodeInformation nodeInfo : nodeInfos ) { - if ( nodeInfo.getSiteToSitePort() == null ) { + for (final NodeInformation nodeInfo : nodeInfos) { + if (nodeInfo.getSiteToSitePort() == null) { continue; } - + dos.writeUTF(nodeInfo.getHostname()); dos.writeInt(nodeInfo.getSiteToSitePort()); dos.writeBoolean(nodeInfo.isSiteToSiteSecure()); dos.writeInt(nodeInfo.getTotalFlowFiles()); } - + logger.info("Redirected {} to {} nodes", peer, numPeers); - + dos.flush(); } @@ -153,7 +153,7 @@ public class ClusterManagerServerProtocol implements ServerProtocol { public boolean isShutdown() { return shutdown; } - + @Override public FlowFileCodec negotiateCodec(Peer peer) { throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java index 21de646..b931e26 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java @@ -65,43 +65,42 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SocketFlowFileServerProtocol implements ServerProtocol { + public static final String RESOURCE_NAME = "SocketFlowFileProtocol"; - + private ProcessGroup rootGroup; private String commsIdentifier; private boolean handshakeCompleted; - + private Boolean useGzip; private long requestExpirationMillis; private RootGroupPort port; private boolean shutdown = false; private FlowFileCodec negotiatedFlowFileCodec = null; private String transitUriPrefix = null; - + private int requestedBatchCount = 0; private long requestedBatchBytes = 0L; private long requestedBatchNanos = 0L; private static final long DEFAULT_BATCH_NANOS = TimeUnit.SECONDS.toNanos(5L); - + private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(5, 4, 3, 2, 1); private final Logger logger = LoggerFactory.getLogger(SocketFlowFileServerProtocol.class); - - @Override public void setRootProcessGroup(final ProcessGroup group) { - if ( !group.isRootGroup() ) { + if (!group.isRootGroup()) { throw new IllegalArgumentException(); } this.rootGroup = group; } - + @Override public void handshake(final Peer peer) throws IOException, HandshakeException { - if ( handshakeCompleted ) { + if (handshakeCompleted) { throw new IllegalStateException("Handshake has already been completed"); } - if ( shutdown ) { + if (shutdown) { throw new IllegalStateException("Protocol is shutdown"); } @@ -109,30 +108,30 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { final CommunicationsSession commsSession = peer.getCommunicationsSession(); final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); - + commsIdentifier = dis.readUTF(); - - if ( versionNegotiator.getVersion() >= 3 ) { + + if (versionNegotiator.getVersion() >= 3) { transitUriPrefix = dis.readUTF(); - if ( !transitUriPrefix.endsWith("/") ) { + if (!transitUriPrefix.endsWith("/")) { transitUriPrefix = transitUriPrefix + "/"; } } - + final Map<String, String> properties = new HashMap<>(); final int numProperties = dis.readInt(); - for (int i=0; i < numProperties; i++) { + for (int i = 0; i < numProperties; i++) { final String propertyName = dis.readUTF(); final String propertyValue = dis.readUTF(); properties.put(propertyName, propertyValue); } - + // evaluate the properties received boolean responseWritten = false; - for ( final Map.Entry<String, String> entry : properties.entrySet() ) { + for (final Map.Entry<String, String> entry : properties.entrySet()) { final String propertyName = entry.getKey(); final String value = entry.getValue(); - + final HandshakeProperty property; try { property = HandshakeProperty.valueOf(propertyName); @@ -140,7 +139,7 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { ResponseCode.UNKNOWN_PROPERTY_NAME.writeResponse(dos, "Unknown Property Name: " + propertyName); throw new HandshakeException("Received unknown property: " + propertyName); } - + try { switch (property) { case GZIP: { @@ -152,66 +151,66 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { break; case BATCH_COUNT: requestedBatchCount = Integer.parseInt(value); - if ( requestedBatchCount < 0 ) { + if (requestedBatchCount < 0) { throw new HandshakeException("Cannot request Batch Count less than 1; requested value: " + value); } break; case BATCH_SIZE: requestedBatchBytes = Long.parseLong(value); - if ( requestedBatchBytes < 0 ) { + if (requestedBatchBytes < 0) { throw new HandshakeException("Cannot request Batch Size less than 1; requested value: " + value); } break; case BATCH_DURATION: requestedBatchNanos = TimeUnit.MILLISECONDS.toNanos(Long.parseLong(value)); - if ( requestedBatchNanos < 0 ) { + if (requestedBatchNanos < 0) { throw new HandshakeException("Cannot request Batch Duration less than 1; requested value: " + value); } break; case PORT_IDENTIFIER: { Port receivedPort = rootGroup.getInputPort(value); - if ( receivedPort == null ) { + if (receivedPort == null) { receivedPort = rootGroup.getOutputPort(value); } - if ( receivedPort == null ) { + if (receivedPort == null) { logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", value); ResponseCode.UNKNOWN_PORT.writeResponse(dos); throw new HandshakeException("Received unknown port identifier: " + value); } - if ( !(receivedPort instanceof RootGroupPort) ) { + if (!(receivedPort instanceof RootGroupPort)) { logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", value); ResponseCode.UNKNOWN_PORT.writeResponse(dos); throw new HandshakeException("Received port identifier " + value + ", but this Port is not a RootGroupPort"); } - + this.port = (RootGroupPort) receivedPort; final PortAuthorizationResult portAuthResult = this.port.checkUserAuthorization(peer.getCommunicationsSession().getUserDn()); - if ( !portAuthResult.isAuthorized() ) { + if (!portAuthResult.isAuthorized()) { logger.debug("Responding with ResponseCode UNAUTHORIZED: ", portAuthResult.getExplanation()); ResponseCode.UNAUTHORIZED.writeResponse(dos, portAuthResult.getExplanation()); responseWritten = true; break; } - - if ( !receivedPort.isValid() ) { + + if (!receivedPort.isValid()) { logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort); ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port is not valid"); responseWritten = true; break; } - - if ( !receivedPort.isRunning() ) { + + if (!receivedPort.isRunning()) { logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort); ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port not running"); responseWritten = true; break; } - + // PORTS_DESTINATION_FULL was introduced in version 2. If version 1, just ignore this // we we will simply not service the request but the sender will timeout - if ( getVersionNegotiator().getVersion() > 1 ) { - for ( final Connection connection : port.getConnections() ) { - if ( connection.getFlowFileQueue().isFull() ) { + if (getVersionNegotiator().getVersion() > 1) { + for (final Connection connection : port.getConnections()) { + if (connection.getFlowFileQueue().isFull()) { logger.debug("Responding with ResponseCode PORTS_DESTINATION_FULL for {}", receivedPort); ResponseCode.PORTS_DESTINATION_FULL.writeResponse(dos); responseWritten = true; @@ -219,7 +218,7 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { } } } - + break; } } @@ -227,54 +226,54 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { throw new HandshakeException("Received invalid value for property '" + property + "'; invalid value: " + value); } } - - if ( useGzip == null ) { + + if (useGzip == null) { logger.debug("Responding with ResponseCode MISSING_PROPERTY because GZIP Property missing"); ResponseCode.MISSING_PROPERTY.writeResponse(dos, HandshakeProperty.GZIP.name()); throw new HandshakeException("Missing Property " + HandshakeProperty.GZIP.name()); } - + // send "OK" response - if ( !responseWritten ) { + if (!responseWritten) { ResponseCode.PROPERTIES_OK.writeResponse(dos); } - + logger.debug("{} Finished handshake with {}", this, peer); handshakeCompleted = true; } - + @Override public boolean isHandshakeSuccessful() { return handshakeCompleted; } - + @Override public RootGroupPort getPort() { return port; } - + @Override public FlowFileCodec negotiateCodec(final Peer peer) throws IOException, ProtocolException { - if ( !handshakeCompleted ) { + if (!handshakeCompleted) { throw new IllegalStateException("Handshake has not been completed"); } - if ( shutdown ) { + if (shutdown) { throw new IllegalStateException("Protocol is shutdown"); } - logger.debug("{} Negotiating Codec with {} using {}", new Object[] {this, peer, peer.getCommunicationsSession()}); + logger.debug("{} Negotiating Codec with {} using {}", new Object[]{this, peer, peer.getCommunicationsSession()}); final CommunicationsSession commsSession = peer.getCommunicationsSession(); final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); - - if ( port == null ) { - RemoteResourceFactory.rejectCodecNegotiation(dis, dos, "Cannot transfer FlowFiles because no port was specified"); + + if (port == null) { + RemoteResourceFactory.rejectCodecNegotiation(dis, dos, "Cannot transfer FlowFiles because no port was specified"); } - + // Negotiate the FlowFileCodec to use. try { negotiatedFlowFileCodec = RemoteResourceFactory.receiveCodecNegotiation(dis, dos); - logger.debug("{} Negotiated Codec {} with {}", new Object[] {this, negotiatedFlowFileCodec, peer}); + logger.debug("{} Negotiated Codec {} with {}", new Object[]{this, negotiatedFlowFileCodec, peer}); return negotiatedFlowFileCodec; } catch (final HandshakeException e) { throw new ProtocolException(e.toString()); @@ -286,13 +285,12 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { return negotiatedFlowFileCodec; } - @Override public int transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { - if ( !handshakeCompleted ) { + if (!handshakeCompleted) { throw new IllegalStateException("Handshake has not been completed"); } - if ( shutdown ) { + if (shutdown) { throw new IllegalStateException("Protocol is shutdown"); } @@ -301,22 +299,22 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); String remoteDn = commsSession.getUserDn(); - if ( remoteDn == null ) { + if (remoteDn == null) { remoteDn = "none"; } FlowFile flowFile = session.get(); - if ( flowFile == null ) { + if (flowFile == null) { // we have no data to send. Notify the peer. logger.debug("{} No data to send to {}", this, peer); ResponseCode.NO_MORE_DATA.writeResponse(dos); return 0; } - + // we have data to send. logger.debug("{} Data is available to send to {}", this, peer); ResponseCode.MORE_DATA.writeResponse(dos); - + final StopWatch stopWatch = new StopWatch(true); long bytesSent = 0L; final Set<FlowFile> flowFilesSent = new HashSet<>(); @@ -328,27 +326,27 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { String calculatedCRC = ""; while (continueTransaction) { final OutputStream flowFileOutputStream = useGzip ? new CompressionOutputStream(dos) : dos; - logger.debug("{} Sending {} to {}", new Object[] {this, flowFile, peer}); - + logger.debug("{} Sending {} to {}", new Object[]{this, flowFile, peer}); + final CheckedOutputStream checkedOutputStream = new CheckedOutputStream(flowFileOutputStream, crc); final StopWatch transferWatch = new StopWatch(true); - + final FlowFile toSend = flowFile; session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - final DataPacket dataPacket = new StandardDataPacket(toSend.getAttributes(), in, toSend.getSize()); - codec.encode(dataPacket, checkedOutputStream); - } + @Override + public void process(final InputStream in) throws IOException { + final DataPacket dataPacket = new StandardDataPacket(toSend.getAttributes(), in, toSend.getSize()); + codec.encode(dataPacket, checkedOutputStream); + } }); - + final long transmissionMillis = transferWatch.getElapsed(TimeUnit.MILLISECONDS); - + // need to close the CompressionOutputStream in order to force it write out any remaining bytes. // Otherwise, do NOT close it because we don't want to close the underlying stream // (CompressionOutputStream will not close the underlying stream when it's closed) - if ( useGzip ) { + if (useGzip) { checkedOutputStream.close(); } @@ -358,33 +356,33 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key()); session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transmissionMillis, false); session.remove(flowFile); - + // determine if we should check for more data on queue. final long sendingNanos = System.nanoTime() - startNanos; boolean poll = true; - if ( sendingNanos >= requestedBatchNanos && requestedBatchNanos > 0L ) { + if (sendingNanos >= requestedBatchNanos && requestedBatchNanos > 0L) { poll = false; } - if ( bytesSent >= requestedBatchBytes && requestedBatchBytes > 0L ) { + if (bytesSent >= requestedBatchBytes && requestedBatchBytes > 0L) { poll = false; } - if ( flowFilesSent.size() >= requestedBatchCount && requestedBatchCount > 0 ) { + if (flowFilesSent.size() >= requestedBatchCount && requestedBatchCount > 0) { poll = false; } - - if ( requestedBatchNanos == 0 && requestedBatchBytes == 0 && requestedBatchCount == 0 ) { + + if (requestedBatchNanos == 0 && requestedBatchBytes == 0 && requestedBatchCount == 0) { poll = (sendingNanos < DEFAULT_BATCH_NANOS); } - - if ( poll ) { + + if (poll) { // we've not elapsed the requested sending duration, so get more data. flowFile = session.get(); } else { flowFile = null; } - + continueTransaction = (flowFile != null); - if ( continueTransaction ) { + if (continueTransaction) { logger.debug("{} Sending ContinueTransaction indicator to {}", this, peer); ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos); } else { @@ -393,19 +391,21 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { calculatedCRC = String.valueOf(checkedOutputStream.getChecksum().getValue()); } } - + // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response final Response transactionConfirmationResponse = Response.read(dis); - if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) { + if (transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION) { // Confirm Checksum and echo back the confirmation. logger.debug("{} Received {} from {}", this, transactionConfirmationResponse, peer); final String receivedCRC = transactionConfirmationResponse.getMessage(); - if ( versionNegotiator.getVersion() > 3 ) { - if ( !receivedCRC.equals(calculatedCRC) ) { + if (versionNegotiator.getVersion() > 3) { + if (!receivedCRC.equals(calculatedCRC)) { ResponseCode.BAD_CHECKSUM.writeResponse(dos); session.rollback(); - throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session"); + throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + + "; canceling transaction and rolling back session"); } } @@ -415,61 +415,60 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { } final String flowFileDescription = flowFilesSent.size() < 20 ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles"; - + final Response transactionResponse; try { transactionResponse = Response.read(dis); } catch (final IOException e) { - logger.error("{} Failed to receive a response from {} when expecting a TransactionFinished Indicator." + - " It is unknown whether or not the peer successfully received/processed the data." + - " Therefore, {} will be rolled back, possibly resulting in data duplication of {}", - this, peer, session, flowFileDescription); + logger.error("{} Failed to receive a response from {} when expecting a TransactionFinished Indicator." + + " It is unknown whether or not the peer successfully received/processed the data." + + " Therefore, {} will be rolled back, possibly resulting in data duplication of {}", + this, peer, session, flowFileDescription); session.rollback(); throw e; } - - logger.debug("{} received {} from {}", new Object[] {this, transactionResponse, peer}); - if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) { + + logger.debug("{} received {} from {}", new Object[]{this, transactionResponse, peer}); + if (transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL) { peer.penalize(port.getIdentifier(), port.getYieldPeriod(TimeUnit.MILLISECONDS)); - } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) { + } else if (transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED) { throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse); } - + session.commit(); - + stopWatch.stop(); final String uploadDataRate = stopWatch.calculateDataRate(bytesSent); final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); final String dataSize = FormatUtils.formatDataSize(bytesSent); - logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] { + logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[]{ this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate}); return flowFilesSent.size(); } - - + @Override public int receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { - if ( !handshakeCompleted ) { + if (!handshakeCompleted) { throw new IllegalStateException("Handshake has not been completed"); } - if ( shutdown ) { + if (shutdown) { throw new IllegalStateException("Protocol is shutdown"); } logger.debug("{} receiving FlowFiles from {}", this, peer); - + final CommunicationsSession commsSession = peer.getCommunicationsSession(); final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); String remoteDn = commsSession.getUserDn(); - if ( remoteDn == null ) { + if (remoteDn == null) { remoteDn = "none"; } final StopWatch stopWatch = new StopWatch(true); final CRC32 crc = new CRC32(); - + // Peer has data. Otherwise, we would not have been called, because they would not have sent // a SEND_FLOWFILES request to use. Just decode the bytes into FlowFiles until peer says he's // finished sending data. @@ -486,18 +485,19 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { FlowFile flowFile = session.create(); flowFile = session.importFrom(dataPacket.getData(), flowFile); flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes()); - + final long transferNanos = System.nanoTime() - startNanos; final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS); final String sourceSystemFlowFileUuid = dataPacket.getAttributes().get(CoreAttributes.UUID.key()); flowFile = session.putAttribute(flowFile, CoreAttributes.UUID.key(), UUID.randomUUID().toString()); - + final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceSystemFlowFileUuid; - session.getProvenanceReporter().receive(flowFile, transitUri, sourceSystemFlowFileUuid == null ? null : "urn:nifi:" + sourceSystemFlowFileUuid, "Remote Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transferMillis); + session.getProvenanceReporter().receive(flowFile, transitUri, sourceSystemFlowFileUuid == null + ? null : "urn:nifi:" + sourceSystemFlowFileUuid, "Remote Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transferMillis); session.transfer(flowFile, Relationship.ANONYMOUS); flowFilesReceived.add(flowFile); bytesReceived += flowFile.getSize(); - + final Response transactionResponse = Response.read(dis); switch (transactionResponse.getCode()) { case CONTINUE_TRANSACTION: @@ -516,7 +516,7 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { throw new ProtocolException("Received unexpected response from peer: when expecting Continue Transaction or Finish Transaction, received" + transactionResponse); } } - + // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message // to peer so that we can verify that the connection is still open. This is a two-phase commit, // which helps to prevent the chances of data duplication. Without doing this, we may commit the @@ -526,7 +526,7 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { // time window involved in the entire transaction, it is reduced to a simple round-trip conversation. logger.debug("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer); ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC); - + final Response confirmTransactionResponse = Response.read(dis); logger.debug("{} Received {} from {}", this, confirmTransactionResponse, peer); @@ -539,11 +539,11 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { default: throw new ProtocolException(this + " Received unexpected Response Code from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code"); } - + // Commit the session so that we have persisted the data session.commit(); - - if ( context.getAvailableRelationships().isEmpty() ) { + + if (context.getAvailableRelationships().isEmpty()) { // Confirm that we received the data and the peer can now discard it but that the peer should not // send any more data for a bit logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer); @@ -553,30 +553,30 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer); ResponseCode.TRANSACTION_FINISHED.writeResponse(dos); } - + stopWatch.stop(); final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles"; final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived); final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); final String dataSize = FormatUtils.formatDataSize(bytesReceived); - logger.info("{} Successfully received {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] { + logger.info("{} Successfully received {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{ this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate}); return flowFilesReceived.size(); } - + @Override public RequestType getRequestType(final Peer peer) throws IOException { - if ( !handshakeCompleted ) { + if (!handshakeCompleted) { throw new IllegalStateException("Handshake has not been completed"); } - if ( shutdown ) { + if (shutdown) { throw new IllegalStateException("Protocol is shutdown"); } - logger.debug("{} Reading Request Type from {} using {}", new Object[] {this, peer, peer.getCommunicationsSession()}); + logger.debug("{} Reading Request Type from {} using {}", new Object[]{this, peer, peer.getCommunicationsSession()}); final RequestType requestType = RequestType.readRequestType(new DataInputStream(peer.getCommunicationsSession().getInput().getInputStream())); - logger.debug("{} Got Request Type {} from {}", new Object[] {this, requestType, peer}); + logger.debug("{} Got Request Type {} from {}", new Object[]{this, requestType, peer}); return requestType; } @@ -599,10 +599,10 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { @Override public void sendPeerList(final Peer peer) throws IOException { - if ( !handshakeCompleted ) { + if (!handshakeCompleted) { throw new IllegalStateException("Handshake has not been completed"); } - if ( shutdown ) { + if (shutdown) { throw new IllegalStateException("Protocol is shutdown"); } @@ -611,7 +611,7 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); final NiFiProperties properties = NiFiProperties.getInstance(); - + // we have only 1 peer: ourselves. dos.writeInt(1); dos.writeUTF(InetAddress.getLocalHost().getHostName()); @@ -620,12 +620,12 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { dos.writeInt(0); // doesn't matter how many FlowFiles we have, because we're the only host. dos.flush(); } - + @Override public String getResourceName() { return RESOURCE_NAME; } - + @Override public void setNodeInformant(final NodeInformant nodeInformant) { } @@ -634,7 +634,7 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { public long getRequestExpiration() { return requestExpirationMillis; } - + @Override public String toString() { return "SocketFlowFileServerProtocol[CommsID=" + commsIdentifier + "]"; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardSiteToSiteProtocol.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardSiteToSiteProtocol.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardSiteToSiteProtocol.java index b9a567b..8380f8b 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardSiteToSiteProtocol.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardSiteToSiteProtocol.java @@ -39,13 +39,13 @@ package org.apache.nifi.remote; // final Map<NodeInformation, Destination> destinationMap = new LinkedHashMap<>(); // final NodeInformation node1 = new NodeInformation("hostA", 80, 90, true, 3); // final NodeInformation node2 = new NodeInformation("hostB", 80, 90, true, 500); -// +// // final Destination node1Destination = new Destination(createRemoteGroupPort("PortA"), null, node1, TransferDirection.SEND, true, null); // final Destination node2Destination = new Destination(createRemoteGroupPort("PortB"), null, node2, TransferDirection.SEND, true, null); -// +// // destinationMap.put(node1, node1Destination); // destinationMap.put(node2, node2Destination); -// +// // final List<Destination> destinations = StandardSiteToSiteProtocol.formulateDestinationList(destinationMap, TransferDirection.SEND); // int node1Count = 0, node2Count = 0; // for ( final Destination destination : destinations ) { @@ -57,30 +57,30 @@ package org.apache.nifi.remote; // Assert.fail("Got Destination for unknkown NodeInformation"); // } // } -// +// // System.out.println(node1Count); // System.out.println(node2Count); -// +// // final double node1Pct = (double) node1Count / (double) (node1Count + node2Count); // assertEquals(0.80, node1Pct, 0.01); -// // node1 should get the most but is not allowed to have more than approximately 80% of the data. +// // node1 should get the most but is not allowed to have more than approximately 80% of the data. // } -// +// // @Test // public void testWeightedDistributionWithThreeNodes() throws IOException { // final Map<NodeInformation, Destination> destinationMap = new LinkedHashMap<>(); // final NodeInformation node1 = new NodeInformation("hostA", 80, 90, true, 3); // final NodeInformation node2 = new NodeInformation("hostB", 80, 90, true, 500); // final NodeInformation node3 = new NodeInformation("hostC", 80, 90, true, 500); -// +// // final Destination node1Destination = new Destination(createRemoteGroupPort("PortA"), null, node1, TransferDirection.SEND, true, null); // final Destination node2Destination = new Destination(createRemoteGroupPort("PortB"), null, node2, TransferDirection.SEND, true, null); // final Destination node3Destination = new Destination(createRemoteGroupPort("PortC"), null, node3, TransferDirection.SEND, true, null); -// +// // destinationMap.put(node1, node1Destination); // destinationMap.put(node2, node2Destination); // destinationMap.put(node3, node3Destination); -// +// // final List<Destination> destinations = StandardSiteToSiteProtocol.formulateDestinationList(destinationMap, TransferDirection.SEND); // int node1Count = 0, node2Count = 0, node3Count = 0; // for ( final Destination destination : destinations ) { @@ -94,20 +94,20 @@ package org.apache.nifi.remote; // Assert.fail("Got Destination for unknkown NodeInformation"); // } // } -// +// // System.out.println(node1Count); // System.out.println(node2Count); // System.out.println(node3Count); -// +// // final double node1Pct = (double) node1Count / (double) (node1Count + node2Count + node3Count); // final double node2Pct = (double) node2Count / (double) (node1Count + node2Count + node3Count); // final double node3Pct = (double) node3Count / (double) (node1Count + node2Count + node3Count); -// +// // assertEquals(0.5, node1Pct, 0.02); // assertEquals(0.25, node2Pct, 0.02); // assertEquals(node2Pct, node3Pct, 0.02); // } -// +// // private RemoteGroupPort createRemoteGroupPort(final String portName) { // RemoteGroupPort port = Mockito.mock(RemoteGroupPort.class); // Mockito.when(port.getName()).thenReturn(portName); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/io/socket/TestSocketChannelStreams.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/io/socket/TestSocketChannelStreams.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/io/socket/TestSocketChannelStreams.java index 4e55f5f..03f8190 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/io/socket/TestSocketChannelStreams.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/io/socket/TestSocketChannelStreams.java @@ -65,7 +65,7 @@ package org.apache.nifi.remote.io.socket; // System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/nifi.properties"); // final SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", 5000)); // channel.configureBlocking(false); -// +// // final CommunicationsSession commsSession; // commsSession = new SocketChannelCommunicationsSession(channel, "", null); // commsSession.setUri("nifi://localhost:5000"); @@ -74,7 +74,7 @@ package org.apache.nifi.remote.io.socket; // // dos.write(CommunicationsProtocol.MAGIC_BYTES); // dos.flush(); -// +// // final EventReporter eventReporter = Mockito.mock(EventReporter.class); // final StandardSiteToSiteProtocol proposedProtocol = new StandardSiteToSiteProtocol(commsSession, eventReporter, NiFiProperties.getInstance()); // @@ -84,20 +84,20 @@ package org.apache.nifi.remote.io.socket; // final RemoteProcessGroup rpg = Mockito.mock(RemoteProcessGroup.class); // Mockito.when(rpg.getCommunicationsTimeout(Mockito.any(TimeUnit.class))).thenReturn(2000); // Mockito.when(rpg.getTargetUri()).thenReturn( new URI("https://localhost:5050/") ); -// +// // final RemoteGroupPort port = Mockito.mock(RemoteGroupPort.class); // Mockito.when(port.getIdentifier()).thenReturn("90880680-d6da-40be-b2cc-a15423de2e1a"); // Mockito.when(port.getName()).thenReturn("Data In"); // Mockito.when(port.getRemoteProcessGroup()).thenReturn(rpg); -// +// // negotiatedProtocol.initiateHandshake(port, TransferDirection.SEND); // } -// +// // @Test // public void testInputOutputStreams() throws IOException, InterruptedException { // final ServerThread server = new ServerThread(); // server.start(); -// +// // int port = server.getPort(); // while ( port <= 0 ) { // Thread.sleep(10L); @@ -106,11 +106,11 @@ package org.apache.nifi.remote.io.socket; // // final SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", port)); // channel.configureBlocking(false); -// +// // final OutputStream out = new SocketChannelOutputStream(channel); // final InputStream in = new SocketChannelInputStream(channel); // final DataInputStream dataIn = new DataInputStream(in); -// +// // final byte[] sent = new byte[DATA_SIZE]; // for (int i=0; i < sent.length; i++) { // sent[i] = (byte) (i % 255); @@ -125,21 +125,21 @@ package org.apache.nifi.remote.io.socket; // final float megabytes = (float) DATA_SIZE / (1024F * 1024F); // final float MBperS = megabytes / seconds; // System.out.println("Millis: " + millis + "; MB/s: " + MBperS); -// +// // Thread.sleep(2500L); // final byte[] received = server.getReceivedData(); // System.out.println("Server received " + received.length + " bytes"); // server.clearReceivedData(); // assertTrue(Arrays.equals(sent, received)); -// +// // final long val = dataIn.readLong(); // assertEquals(DATA_SIZE, val); // System.out.println(val); // } -// +// // server.shutdown(); // } -// +// // public final long toLong(final byte[] buffer) throws IOException { // return (((long)buffer[0] << 56) + // ((long)(buffer[1] & 255) << 48) + @@ -150,82 +150,82 @@ package org.apache.nifi.remote.io.socket; // ((buffer[6] & 255) << 8) + // ((buffer[7] & 255) << 0)); // } -// +// // private static class ServerThread extends Thread { // private int listeningPort; // private final ByteArrayOutputStream received = new ByteArrayOutputStream(); -// +// // private volatile int readingDelay = 0; // private volatile boolean shutdown = false; -// +// // public ServerThread() { // } -// +// // public int getPort() { // return listeningPort; // } -// +// // public byte[] getReceivedData() { // return received.toByteArray(); // } -// +// // @Override // public void run() { // try { // final ServerSocketFactory serverSocketFactory = ServerSocketFactory.getDefault(); // final ServerSocket serverSocket = serverSocketFactory.createServerSocket(0); // this.listeningPort = serverSocket.getLocalPort(); -// +// // final Socket socket = serverSocket.accept(); // final InputStream stream = socket.getInputStream(); // final DataOutputStream dos = new DataOutputStream(socket.getOutputStream()); -// +// // final byte[] buffer = new byte[4096]; // int len; -// +// // while (!shutdown) { // try { // len = stream.read(buffer); -// +// // System.out.println("Received " + len + " bytes"); -// +// // if ( readingDelay > 0 ) { // try { Thread.sleep(readingDelay); } catch (final InterruptedException e) {} // } // } catch (final SocketTimeoutException e) { // continue; // } -// +// // if ( len < 0 ) { // return; // } -// +// // received.write(buffer, 0, len); -// +// // final long length = received.size(); // if ( length % (DATA_SIZE) == 0 ) { // dos.writeLong(length); // dos.flush(); // } // } -// +// // System.out.println("Server successfully shutdown"); // } catch (final Exception e) { // e.printStackTrace(); // } // } -// +// // public void clearReceivedData() { // this.received.reset(); // } -// +// // public void shutdown() { // this.shutdown = true; // } -// +// // public void delayReading(final int millis) { // this.readingDelay = millis; // } // } -// +// //}
