Repository: incubator-nifi
Updated Branches:
  refs/heads/develop db2360998 -> 73384b23d


NIFI-273: Moved getAvailableRelationships from ProcessSession to ProcessContext


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/3a7b8de0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/3a7b8de0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/3a7b8de0

Branch: refs/heads/develop
Commit: 3a7b8de0e30e7d4dac877611d0aa9bf67bfadffd
Parents: 94a06fc
Author: Mark Payne <[email protected]>
Authored: Fri Jan 16 11:27:22 2015 -0500
Committer: Mark Payne <[email protected]>
Committed: Fri Jan 16 11:27:22 2015 -0500

----------------------------------------------------------------------
 .../apache/nifi/controller/StandardFunnel.java  |  4 +--
 .../repository/BatchingSessionFactory.java      |  5 ----
 .../repository/StandardProcessSession.java      |  5 ----
 .../scheduling/ConnectableProcessContext.java   | 21 +++++++++++++++
 .../nifi/processor/StandardProcessContext.java  | 28 ++++++++++++++++++++
 .../processor/StandardSchedulingContext.java    |  6 +++++
 .../protocol/socket/SocketClientProtocol.java   |  2 +-
 .../socket/SocketFlowFileServerProtocol.java    |  2 +-
 .../processors/standard/DistributeLoad.java     | 16 +++++------
 .../nifi/processors/standard/ListenHTTP.java    |  2 ++
 .../nifi/processors/standard/ListenUDP.java     |  2 +-
 .../processors/standard/PutFileTransfer.java    |  2 +-
 .../standard/servlets/ListenHTTPServlet.java    |  7 ++++-
 .../processors/standard/TestDistributeLoad.java |  1 +
 .../apache/nifi/processor/ProcessContext.java   |  8 ++++++
 .../apache/nifi/processor/ProcessSession.java   |  6 -----
 .../apache/nifi/util/MockProcessContext.java    | 23 ++++++++++++++++
 .../apache/nifi/util/MockProcessSession.java    |  4 ---
 .../apache/nifi/util/SharedSessionState.java    | 21 +--------------
 .../nifi/util/StandardProcessorTestRunner.java  |  8 +++---
 20 files changed, 114 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
 
b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
index e516f20..e34e043 100644
--- 
a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
+++ 
b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
@@ -348,7 +348,7 @@ public class StandardFunnel implements Funnel {
     private void onTrigger(final ProcessContext context, final ProcessSession 
session) {
         readLock.lock();
         try {
-            Set<Relationship> available = session.getAvailableRelationships();
+            Set<Relationship> available = context.getAvailableRelationships();
             int transferred = 0;
             while (!available.isEmpty()) {
                 final List<FlowFile> flowFiles = session.get(10);
@@ -359,7 +359,7 @@ public class StandardFunnel implements Funnel {
                 transferred += flowFiles.size();
                 session.transfer(flowFiles, Relationship.ANONYMOUS);
                 session.commit();
-                available = session.getAvailableRelationships();
+                available = context.getAvailableRelationships();
             }
 
             if (transferred == 0) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
 
b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
index eae2550..d5dba82 100644
--- 
a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
+++ 
b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
@@ -98,11 +98,6 @@ public class BatchingSessionFactory implements 
ProcessSessionFactory {
         }
 
         @Override
-        public Set<Relationship> getAvailableRelationships() {
-            return session.getAvailableRelationships();
-        }
-
-        @Override
         public FlowFile create() {
             return session.create();
         }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 
b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 3d3e854..dcb461c 100644
--- 
a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ 
b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -1237,11 +1237,6 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
     }
 
     @Override
-    public Set<Relationship> getAvailableRelationships() {
-        return context.getAvailableRelationships();
-    }
-
-    @Override
     public FlowFile create() {
         final Map<String, String> attrs = new HashMap<>();
         attrs.put(CoreAttributes.FILENAME.key(), 
String.valueOf(System.nanoTime()));

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
 
b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
index 8c60e4b..acb3a01 100644
--- 
a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
+++ 
b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
@@ -16,13 +16,18 @@
  */
 package org.apache.nifi.controller.scheduling;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ControllerServiceLookup;
 import org.apache.nifi.encrypt.StringEncryptor;
@@ -30,6 +35,7 @@ import org.apache.nifi.expression.AttributeValueDecorator;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 
 /**
@@ -170,4 +176,19 @@ public class ConnectableProcessContext implements 
ProcessContext {
     public ControllerServiceLookup getControllerServiceLookup() {
         return null;
     }
+
+    @Override
+    public Set<Relationship> getAvailableRelationships() {
+        for ( final Connection connection : connectable.getConnections() ) {
+            if ( connection.getFlowFileQueue().isFull() ) {
+                return Collections.emptySet();
+            }
+        }
+        
+        final Collection<Relationship> relationships = 
connectable.getRelationships();
+        if ( relationships instanceof Set ) {
+            return (Set<Relationship>) relationships;
+        }
+        return new HashSet<>(connectable.getRelationships());
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
 
b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
index 93a8c6b..cd0d31c 100644
--- 
a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
+++ 
b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
@@ -16,7 +16,9 @@
  */
 package org.apache.nifi.processor;
 
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -24,6 +26,7 @@ import 
org.apache.nifi.attribute.expression.language.PreparedQuery;
 import org.apache.nifi.attribute.expression.language.Query;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ControllerServiceLookup;
 import org.apache.nifi.controller.ProcessorNode;
@@ -142,4 +145,29 @@ public class StandardProcessContext implements 
ProcessContext, ControllerService
     public ControllerServiceLookup getControllerServiceLookup() {
         return this;
     }
+
+    @Override
+    public Set<Relationship> getAvailableRelationships() {
+        final Set<Relationship> set = new HashSet<>();
+        for (final Relationship relationship : procNode.getRelationships()) {
+            final Collection<Connection> connections = 
procNode.getConnections(relationship);
+            if (connections.isEmpty()) {
+                set.add(relationship);
+            } else {
+                boolean available = true;
+                for (final Connection connection : connections) {
+                    if (connection.getFlowFileQueue().isFull()) {
+                        available = false;
+                    }
+                }
+
+                if (available) {
+                    set.add(relationship);
+                }
+            }
+        }
+
+        return set;
+    }
+    
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
 
b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
index 0fe08c9..318901f 100644
--- 
a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
+++ 
b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.processor;
 
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
@@ -104,4 +105,9 @@ public class StandardSchedulingContext implements 
SchedulingContext {
     public ControllerServiceLookup getControllerServiceLookup() {
         return processContext.getControllerServiceLookup();
     }
+
+    @Override
+    public Set<Relationship> getAvailableRelationships() {
+        return processContext.getAvailableRelationships();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
 
b/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
index 22ec983..d4b4f61 100644
--- 
a/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
+++ 
b/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
@@ -327,7 +327,7 @@ public class SocketClientProtocol implements ClientProtocol 
{
         // Commit the session so that we have persisted the data
         session.commit();
         
-        if ( session.getAvailableRelationships().isEmpty() ) {
+        if ( context.getAvailableRelationships().isEmpty() ) {
             // Confirm that we received the data and the peer can now discard 
it but that the peer should not
             // send any more data for a bit
             logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL 
to {}", this, peer);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
 
b/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
index 88b6a41..5edd4f9 100644
--- 
a/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
+++ 
b/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
@@ -482,7 +482,7 @@ public class SocketFlowFileServerProtocol implements 
ServerProtocol {
         // Commit the session so that we have persisted the data
         session.commit();
         
-        if ( session.getAvailableRelationships().isEmpty() ) {
+        if ( context.getAvailableRelationships().isEmpty() ) {
             // Confirm that we received the data and the peer can now discard 
it but that the peer should not
             // send any more data for a bit
             logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL 
to {}", this, peer);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
 
b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
index a755b1a..3ac55d2 100644
--- 
a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
+++ 
b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
@@ -354,14 +354,14 @@ public class DistributeLoad extends AbstractProcessor {
         }
 
         final DistributionStrategy strategy = strategyRef.get();
-        final Set<Relationship> available = 
session.getAvailableRelationships();
+        final Set<Relationship> available = 
context.getAvailableRelationships();
         final int numRelationships = 
context.getProperty(NUM_RELATIONSHIPS).asInteger();
         final boolean allDestinationsAvailable = (available.size() == 
numRelationships);
         if (!allDestinationsAvailable && 
strategy.requiresAllDestinationsAvailable()) {
             return;
         }
 
-        final Relationship relationship = strategy.mapToRelationship(session, 
flowFile);
+        final Relationship relationship = strategy.mapToRelationship(context, 
flowFile);
         if (relationship == null) {
             // can't transfer the FlowFiles. Roll back and yield
             session.rollback();
@@ -403,7 +403,7 @@ public class DistributeLoad extends AbstractProcessor {
          * @param flowFiles
          * @return
          */
-        Relationship mapToRelationship(ProcessSession session, FlowFile 
flowFile);
+        Relationship mapToRelationship(ProcessContext context, FlowFile 
flowFile);
 
         boolean requiresAllDestinationsAvailable();
     }
@@ -413,7 +413,7 @@ public class DistributeLoad extends AbstractProcessor {
         private final AtomicLong counter = new AtomicLong(0L);
 
         @Override
-        public Relationship mapToRelationship(final ProcessSession session, 
final FlowFile flowFile) {
+        public Relationship mapToRelationship(final ProcessContext context, 
final FlowFile flowFile) {
             final List<Relationship> relationshipList = 
DistributeLoad.this.weightedRelationshipListRef.get();
             final int numRelationships = relationshipList.size();
 
@@ -427,7 +427,7 @@ public class DistributeLoad extends AbstractProcessor {
                 final long counterValue = counter.getAndIncrement();
                 final int idx = (int) (counterValue % numRelationships);
                 relationship = relationshipList.get(idx);
-                foundFreeRelationship = 
session.getAvailableRelationships().contains(relationship);
+                foundFreeRelationship = 
context.getAvailableRelationships().contains(relationship);
                 if (++attempts % numRelationships == 0 && 
!foundFreeRelationship) {
                     return null;
                 }
@@ -448,7 +448,7 @@ public class DistributeLoad extends AbstractProcessor {
         private final AtomicLong counter = new AtomicLong(0L);
 
         @Override
-        public Relationship mapToRelationship(final ProcessSession session, 
final FlowFile flowFile) {
+        public Relationship mapToRelationship(final ProcessContext context, 
final FlowFile flowFile) {
             final List<Relationship> relationshipList = 
DistributeLoad.this.weightedRelationshipListRef.get();
             final long counterValue = counter.getAndIncrement();
             final int idx = (int) (counterValue % relationshipList.size());
@@ -467,7 +467,7 @@ public class DistributeLoad extends AbstractProcessor {
         private final AtomicLong counter = new AtomicLong(0L);
 
         @Override
-        public Relationship mapToRelationship(final ProcessSession session, 
final FlowFile flowFile) {
+        public Relationship mapToRelationship(final ProcessContext context, 
final FlowFile flowFile) {
             final List<Relationship> relationshipList = 
DistributeLoad.this.weightedRelationshipListRef.get();
             final int numRelationships = relationshipList.size();
 
@@ -481,7 +481,7 @@ public class DistributeLoad extends AbstractProcessor {
                 final long counterValue = counter.getAndIncrement();
                 final int idx = (int) (counterValue % numRelationships);
                 relationship = relationshipList.get(idx);
-                foundFreeRelationship = 
session.getAvailableRelationships().contains(relationship);
+                foundFreeRelationship = 
context.getAvailableRelationships().contains(relationship);
                 if (++attempts % numRelationships == 0 && 
!foundFreeRelationship) {
                     return null;
                 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
 
b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
index 2b0b437..b7fe97a 100644
--- 
a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
+++ 
b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
@@ -117,6 +117,7 @@ public class ListenHTTP extends 
AbstractSessionFactoryProcessor {
     public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor";
     public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger";
     public static final String CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER = 
"sessionFactoryHolder";
+    public static final String CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER = 
"processContextHolder";
     public static final String CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN = 
"authorityPattern";
     public static final String CONTEXT_ATTRIBUTE_HEADER_PATTERN = 
"headerPattern";
     public static final String CONTEXT_ATTRIBUTE_FLOWFILE_MAP = "flowFileMap";
@@ -240,6 +241,7 @@ public class ListenHTTP extends 
AbstractSessionFactoryProcessor {
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_PROCESSOR, this);
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_LOGGER, getLogger());
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER, 
sessionFactoryReference);
+        contextHandler.setAttribute(CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER, 
context);
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_FLOWFILE_MAP, 
flowFileMap);
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN, 
Pattern.compile(context.getProperty(AUTHORIZED_DN_PATTERN).getValue()));
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, 
streamThrottler);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
 
b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
index 65b3c66..43d8395 100644
--- 
a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
+++ 
b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
@@ -377,7 +377,7 @@ public class ListenUDP extends 
AbstractSessionFactoryProcessor {
                                     // this will throttle the processing of 
the received datagrams. If there are no more
                                     // buffers to read into because none have 
been returned to the pool via consumer.process(),
                                     // then the desired back pressure on the 
channel is created.
-                                    if 
(session.getAvailableRelationships().size() > 0) {
+                                    if 
(context.getAvailableRelationships().size() > 0) {
                                         consumer.process();
                                         if (flowFileCount == 
newFlowFiles.size()) {
                                             // no new datagrams received, need 
to throttle this thread back so it does

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
 
b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
index 31e5105..da80546 100644
--- 
a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
+++ 
b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
@@ -148,7 +148,7 @@ public abstract class PutFileTransfer<T extends 
FileTransfer> extends AbstractPr
 
                 session.transfer(flowFile, conflictResult.getRelationship());
                 session.commit();
-            } while (isScheduled() && (getRelationships().size() == 
session.getAvailableRelationships().size()) && (++fileCount < maxNumberOfFiles) 
&& ((flowFile = session.get()) != null));
+            } while (isScheduled() && (getRelationships().size() == 
context.getAvailableRelationships().size()) && (++fileCount < maxNumberOfFiles) 
&& ((flowFile = session.get()) != null));
         } catch (final IOException e) {
             context.yield();
             logger.error("Unable to transfer {} to remote host {} due to {}", 
new Object[]{flowFile, hostname, e});

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
 
b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
index cae61f0..1cf5f1f 100644
--- 
a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
+++ 
b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
@@ -48,6 +48,7 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.stream.io.BufferedOutputStream;
 import org.apache.nifi.stream.io.StreamThrottler;
 import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.io.OutputStreamCallback;
@@ -87,6 +88,7 @@ public class ListenHTTPServlet extends HttpServlet {
 
     private ProcessorLog logger;
     private AtomicReference<ProcessSessionFactory> sessionFactoryHolder;
+    private volatile ProcessContext processContext;
     private Pattern authorizedPattern;
     private Pattern headerPattern;
     private ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap;
@@ -103,6 +105,7 @@ public class ListenHTTPServlet extends HttpServlet {
         final ServletContext context = config.getServletContext();
         this.logger = (ProcessorLog) 
context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_LOGGER);
         this.sessionFactoryHolder = (AtomicReference<ProcessSessionFactory>) 
context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER);
+        this.processContext = (ProcessContext) 
context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER);
         this.authorizedPattern = (Pattern) 
context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN);
         this.headerPattern = (Pattern) 
context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_HEADER_PATTERN);
         this.flowFileMap = (ConcurrentMap<String, FlowFileEntryTimeWrapper>) 
context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP);
@@ -118,6 +121,8 @@ public class ListenHTTPServlet extends HttpServlet {
 
     @Override
     protected void doPost(final HttpServletRequest request, final 
HttpServletResponse response) throws ServletException, IOException {
+        final ProcessContext context = processContext;
+        
         ProcessSessionFactory sessionFactory;
         do {
             sessionFactory = sessionFactoryHolder.get();
@@ -136,7 +141,7 @@ public class ListenHTTPServlet extends HttpServlet {
         try {
             final long n = filesReceived.getAndIncrement() % 
FILES_BEFORE_CHECKING_DESTINATION_SPACE;
             if (n == 0 || !spaceAvailable.get()) {
-                if (session.getAvailableRelationships().isEmpty()) {
+                if (context.getAvailableRelationships().isEmpty()) {
                     spaceAvailable.set(false);
                     if (logger.isDebugEnabled()) {
                         logger.debug("Received request from " + 
request.getRemoteHost() + " but no space available; Indicating Service 
Unavailable");

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java
 
b/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java
index a6402e4..ab4c978 100644
--- 
a/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java
+++ 
b/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java
@@ -132,6 +132,7 @@ public class TestDistributeLoad {
         testRunner.assertQueueEmpty();
 
         for (int i = 1; i <= 100; i++) {
+            System.out.println(i);
             testRunner.assertTransferCount(String.valueOf(i), (i == 50) ? 0 : 
1);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java 
b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
index 9e04439..7fa183f 100644
--- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
+++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.processor;
 
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
@@ -121,4 +122,11 @@ public interface ProcessContext {
      * @return
      */
     ControllerServiceLookup getControllerServiceLookup();
+    
+    /**
+     * @return the set of all relationships for which space is available to
+     * receive new objects
+     */
+    Set<Relationship> getAvailableRelationships();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java 
b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
index 09d1bd2..d3de916 100644
--- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
+++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
@@ -166,12 +166,6 @@ public interface ProcessSession {
     QueueSize getQueueSize();
 
     /**
-     * @return the set of all relationships for which space is available to
-     * receive new objects
-     */
-    Set<Relationship> getAvailableRelationships();
-
-    /**
      * Creates a new FlowFile in the repository with no content and without any
      * linkage to a parent FlowFile. This method is appropriate only when data
      * is received or created from an external system. Otherwise, this method

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
index 6e5f65d..15591d7 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
@@ -21,10 +21,12 @@ import static java.util.Objects.requireNonNull;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 
 import org.apache.nifi.components.ConfigurableComponent;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -32,6 +34,8 @@ import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.SchedulingContext;
 import org.junit.Assert;
 
@@ -45,6 +49,8 @@ public class MockProcessContext extends 
MockControllerServiceLookup implements S
     private boolean enableExpressionValidation = false;
     private boolean allowExpressionValidation = true;
 
+    private volatile Set<Relationship> unavailableRelationships = new 
HashSet<>();
+
     /**
      * Creates a new MockProcessContext for the given Processor
      *
@@ -258,4 +264,21 @@ public class MockProcessContext extends 
MockControllerServiceLookup implements S
     public void leaseControllerService(final String identifier) {
     }
 
+    public Set<Relationship> getAvailableRelationships() {
+        if ( !(component instanceof Processor) ) {
+            return Collections.emptySet();
+        }
+        
+        final Set<Relationship> relationships = new HashSet<>(((Processor) 
component).getRelationships());
+        relationships.removeAll(unavailableRelationships);
+        return relationships;
+    }
+
+    public void setUnavailableRelationships(final Set<Relationship> 
relationships) {
+        this.unavailableRelationships = Collections.unmodifiableSet(new 
HashSet<>(relationships));
+    }
+
+    public Set<Relationship> getUnavailableRelationships() {
+        return unavailableRelationships;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index 552780c..ea55b34 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -329,10 +329,6 @@ public class MockProcessSession implements ProcessSession {
         return newFlowFile;
     }
 
-    @Override
-    public Set<Relationship> getAvailableRelationships() {
-        return sharedState.getAvailableRelationships();
-    }
 
     @Override
     public MockFlowFile merge(final Collection<FlowFile> sources, final 
FlowFile destination) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java
index 96bef71..13a87de 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java
@@ -16,49 +16,30 @@
  */
 package org.apache.nifi.util;
 
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.nifi.processor.Processor;
-import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.provenance.ProvenanceReporter;
 
 public class SharedSessionState {
 
     private final MockFlowFileQueue flowFileQueue;
     private final ProvenanceReporter provenanceReporter;
+    @SuppressWarnings("unused")
     private final Processor processor;
     private final AtomicLong flowFileIdGenerator;
     private final ConcurrentMap<String, AtomicLong> counterMap = new 
ConcurrentHashMap<>();
 
-    private volatile Set<Relationship> unavailableRelationships;
 
     public SharedSessionState(final Processor processor, final AtomicLong 
flowFileIdGenerator) {
         flowFileQueue = new MockFlowFileQueue();
         provenanceReporter = new MockProvenanceReporter();
-        unavailableRelationships = new HashSet<>();
         this.flowFileIdGenerator = flowFileIdGenerator;
         this.processor = processor;
     }
 
-    public Set<Relationship> getAvailableRelationships() {
-        final Set<Relationship> relationships = new 
HashSet<>(processor.getRelationships());
-        relationships.removeAll(unavailableRelationships);
-        return relationships;
-    }
-
-    public void setUnavailableRelationships(final Set<Relationship> 
relationships) {
-        this.unavailableRelationships = Collections.unmodifiableSet(new 
HashSet<>(relationships));
-    }
-
-    public Set<Relationship> getUnavailableRelationships() {
-        return unavailableRelationships;
-    }
-
     public MockFlowFileQueue getFlowFileQueue() {
         return flowFileQueue;
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 54b611d..40d5035 100644
--- 
a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ 
b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -426,9 +426,9 @@ public class StandardProcessorTestRunner implements 
TestRunner {
 
     @Override
     public void setRelationshipAvailable(final Relationship relationship) {
-        final Set<Relationship> unavailable = new 
HashSet<>(sharedState.getUnavailableRelationships());
+        final Set<Relationship> unavailable = new 
HashSet<>(context.getUnavailableRelationships());
         unavailable.remove(relationship);
-        sharedState.setUnavailableRelationships(unavailable);
+        context.setUnavailableRelationships(unavailable);
     }
 
     @Override
@@ -438,9 +438,9 @@ public class StandardProcessorTestRunner implements 
TestRunner {
 
     @Override
     public void setRelationshipUnavailable(final Relationship relationship) {
-        final Set<Relationship> unavailable = new 
HashSet<>(sharedState.getUnavailableRelationships());
+        final Set<Relationship> unavailable = new 
HashSet<>(context.getUnavailableRelationships());
         unavailable.add(relationship);
-        sharedState.setUnavailableRelationships(unavailable);
+        context.setUnavailableRelationships(unavailable);
     }
 
     @Override

Reply via email to