Repository: incubator-nifi Updated Branches: refs/heads/develop db2360998 -> 73384b23d
NIFI-273: Moved getAvailableRelationships from ProcessSession to ProcessContext Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/3a7b8de0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/3a7b8de0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/3a7b8de0 Branch: refs/heads/develop Commit: 3a7b8de0e30e7d4dac877611d0aa9bf67bfadffd Parents: 94a06fc Author: Mark Payne <[email protected]> Authored: Fri Jan 16 11:27:22 2015 -0500 Committer: Mark Payne <[email protected]> Committed: Fri Jan 16 11:27:22 2015 -0500 ---------------------------------------------------------------------- .../apache/nifi/controller/StandardFunnel.java | 4 +-- .../repository/BatchingSessionFactory.java | 5 ---- .../repository/StandardProcessSession.java | 5 ---- .../scheduling/ConnectableProcessContext.java | 21 +++++++++++++++ .../nifi/processor/StandardProcessContext.java | 28 ++++++++++++++++++++ .../processor/StandardSchedulingContext.java | 6 +++++ .../protocol/socket/SocketClientProtocol.java | 2 +- .../socket/SocketFlowFileServerProtocol.java | 2 +- .../processors/standard/DistributeLoad.java | 16 +++++------ .../nifi/processors/standard/ListenHTTP.java | 2 ++ .../nifi/processors/standard/ListenUDP.java | 2 +- .../processors/standard/PutFileTransfer.java | 2 +- .../standard/servlets/ListenHTTPServlet.java | 7 ++++- .../processors/standard/TestDistributeLoad.java | 1 + .../apache/nifi/processor/ProcessContext.java | 8 ++++++ .../apache/nifi/processor/ProcessSession.java | 6 ----- .../apache/nifi/util/MockProcessContext.java | 23 ++++++++++++++++ .../apache/nifi/util/MockProcessSession.java | 4 --- .../apache/nifi/util/SharedSessionState.java | 21 +-------------- .../nifi/util/StandardProcessorTestRunner.java | 8 +++--- 20 files changed, 114 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java index e516f20..e34e043 100644 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java @@ -348,7 +348,7 @@ public class StandardFunnel implements Funnel { private void onTrigger(final ProcessContext context, final ProcessSession session) { readLock.lock(); try { - Set<Relationship> available = session.getAvailableRelationships(); + Set<Relationship> available = context.getAvailableRelationships(); int transferred = 0; while (!available.isEmpty()) { final List<FlowFile> flowFiles = session.get(10); @@ -359,7 +359,7 @@ public class StandardFunnel implements Funnel { transferred += flowFiles.size(); session.transfer(flowFiles, Relationship.ANONYMOUS); session.commit(); - available = session.getAvailableRelationships(); + available = context.getAvailableRelationships(); } if (transferred == 0) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java index eae2550..d5dba82 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java @@ -98,11 +98,6 @@ public class BatchingSessionFactory implements ProcessSessionFactory { } @Override - public Set<Relationship> getAvailableRelationships() { - return session.getAvailableRelationships(); - } - - @Override public FlowFile create() { return session.create(); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 3d3e854..dcb461c 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -1237,11 +1237,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } @Override - public Set<Relationship> getAvailableRelationships() { - return context.getAvailableRelationships(); - } - - @Override public FlowFile create() { final Map<String, String> attrs = new HashMap<>(); attrs.put(CoreAttributes.FILENAME.key(), String.valueOf(System.nanoTime())); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java index 8c60e4b..acb3a01 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java @@ -16,13 +16,18 @@ */ package org.apache.nifi.controller.scheduling; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.connectable.Connectable; +import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.encrypt.StringEncryptor; @@ -30,6 +35,7 @@ import org.apache.nifi.expression.AttributeValueDecorator; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; /** @@ -170,4 +176,19 @@ public class ConnectableProcessContext implements ProcessContext { public ControllerServiceLookup getControllerServiceLookup() { return null; } + + @Override + public Set<Relationship> getAvailableRelationships() { + for ( final Connection connection : connectable.getConnections() ) { + if ( connection.getFlowFileQueue().isFull() ) { + return Collections.emptySet(); + } + } + + final Collection<Relationship> relationships = connectable.getRelationships(); + if ( relationships instanceof Set ) { + return (Set<Relationship>) relationships; + } + return new HashSet<>(connectable.getRelationships()); + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java index 93a8c6b..cd0d31c 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java @@ -16,7 +16,9 @@ */ package org.apache.nifi.processor; +import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -24,6 +26,7 @@ import org.apache.nifi.attribute.expression.language.PreparedQuery; import org.apache.nifi.attribute.expression.language.Query; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.controller.ProcessorNode; @@ -142,4 +145,29 @@ public class StandardProcessContext implements ProcessContext, ControllerService public ControllerServiceLookup getControllerServiceLookup() { return this; } + + @Override + public Set<Relationship> getAvailableRelationships() { + final Set<Relationship> set = new HashSet<>(); + for (final Relationship relationship : procNode.getRelationships()) { + final Collection<Connection> connections = procNode.getConnections(relationship); + if (connections.isEmpty()) { + set.add(relationship); + } else { + boolean available = true; + for (final Connection connection : connections) { + if (connection.getFlowFileQueue().isFull()) { + available = false; + } + } + + if (available) { + set.add(relationship); + } + } + } + + return set; + } + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java index 0fe08c9..318901f 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java @@ -17,6 +17,7 @@ package org.apache.nifi.processor; import java.util.Map; +import java.util.Set; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; @@ -104,4 +105,9 @@ public class StandardSchedulingContext implements SchedulingContext { public ControllerServiceLookup getControllerServiceLookup() { return processContext.getControllerServiceLookup(); } + + @Override + public Set<Relationship> getAvailableRelationships() { + return processContext.getAvailableRelationships(); + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java index 22ec983..d4b4f61 100644 --- a/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java +++ b/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java @@ -327,7 +327,7 @@ public class SocketClientProtocol implements ClientProtocol { // Commit the session so that we have persisted the data session.commit(); - if ( session.getAvailableRelationships().isEmpty() ) { + if ( context.getAvailableRelationships().isEmpty() ) { // Confirm that we received the data and the peer can now discard it but that the peer should not // send any more data for a bit logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java b/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java index 88b6a41..5edd4f9 100644 --- a/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java +++ b/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java @@ -482,7 +482,7 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { // Commit the session so that we have persisted the data session.commit(); - if ( session.getAvailableRelationships().isEmpty() ) { + if ( context.getAvailableRelationships().isEmpty() ) { // Confirm that we received the data and the peer can now discard it but that the peer should not // send any more data for a bit logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java index a755b1a..3ac55d2 100644 --- a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java +++ b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java @@ -354,14 +354,14 @@ public class DistributeLoad extends AbstractProcessor { } final DistributionStrategy strategy = strategyRef.get(); - final Set<Relationship> available = session.getAvailableRelationships(); + final Set<Relationship> available = context.getAvailableRelationships(); final int numRelationships = context.getProperty(NUM_RELATIONSHIPS).asInteger(); final boolean allDestinationsAvailable = (available.size() == numRelationships); if (!allDestinationsAvailable && strategy.requiresAllDestinationsAvailable()) { return; } - final Relationship relationship = strategy.mapToRelationship(session, flowFile); + final Relationship relationship = strategy.mapToRelationship(context, flowFile); if (relationship == null) { // can't transfer the FlowFiles. Roll back and yield session.rollback(); @@ -403,7 +403,7 @@ public class DistributeLoad extends AbstractProcessor { * @param flowFiles * @return */ - Relationship mapToRelationship(ProcessSession session, FlowFile flowFile); + Relationship mapToRelationship(ProcessContext context, FlowFile flowFile); boolean requiresAllDestinationsAvailable(); } @@ -413,7 +413,7 @@ public class DistributeLoad extends AbstractProcessor { private final AtomicLong counter = new AtomicLong(0L); @Override - public Relationship mapToRelationship(final ProcessSession session, final FlowFile flowFile) { + public Relationship mapToRelationship(final ProcessContext context, final FlowFile flowFile) { final List<Relationship> relationshipList = DistributeLoad.this.weightedRelationshipListRef.get(); final int numRelationships = relationshipList.size(); @@ -427,7 +427,7 @@ public class DistributeLoad extends AbstractProcessor { final long counterValue = counter.getAndIncrement(); final int idx = (int) (counterValue % numRelationships); relationship = relationshipList.get(idx); - foundFreeRelationship = session.getAvailableRelationships().contains(relationship); + foundFreeRelationship = context.getAvailableRelationships().contains(relationship); if (++attempts % numRelationships == 0 && !foundFreeRelationship) { return null; } @@ -448,7 +448,7 @@ public class DistributeLoad extends AbstractProcessor { private final AtomicLong counter = new AtomicLong(0L); @Override - public Relationship mapToRelationship(final ProcessSession session, final FlowFile flowFile) { + public Relationship mapToRelationship(final ProcessContext context, final FlowFile flowFile) { final List<Relationship> relationshipList = DistributeLoad.this.weightedRelationshipListRef.get(); final long counterValue = counter.getAndIncrement(); final int idx = (int) (counterValue % relationshipList.size()); @@ -467,7 +467,7 @@ public class DistributeLoad extends AbstractProcessor { private final AtomicLong counter = new AtomicLong(0L); @Override - public Relationship mapToRelationship(final ProcessSession session, final FlowFile flowFile) { + public Relationship mapToRelationship(final ProcessContext context, final FlowFile flowFile) { final List<Relationship> relationshipList = DistributeLoad.this.weightedRelationshipListRef.get(); final int numRelationships = relationshipList.size(); @@ -481,7 +481,7 @@ public class DistributeLoad extends AbstractProcessor { final long counterValue = counter.getAndIncrement(); final int idx = (int) (counterValue % numRelationships); relationship = relationshipList.get(idx); - foundFreeRelationship = session.getAvailableRelationships().contains(relationship); + foundFreeRelationship = context.getAvailableRelationships().contains(relationship); if (++attempts % numRelationships == 0 && !foundFreeRelationship) { return null; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java index 2b0b437..b7fe97a 100644 --- a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java +++ b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java @@ -117,6 +117,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor"; public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger"; public static final String CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER = "sessionFactoryHolder"; + public static final String CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER = "processContextHolder"; public static final String CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN = "authorityPattern"; public static final String CONTEXT_ATTRIBUTE_HEADER_PATTERN = "headerPattern"; public static final String CONTEXT_ATTRIBUTE_FLOWFILE_MAP = "flowFileMap"; @@ -240,6 +241,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { contextHandler.setAttribute(CONTEXT_ATTRIBUTE_PROCESSOR, this); contextHandler.setAttribute(CONTEXT_ATTRIBUTE_LOGGER, getLogger()); contextHandler.setAttribute(CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER, sessionFactoryReference); + contextHandler.setAttribute(CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER, context); contextHandler.setAttribute(CONTEXT_ATTRIBUTE_FLOWFILE_MAP, flowFileMap); contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN, Pattern.compile(context.getProperty(AUTHORIZED_DN_PATTERN).getValue())); contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, streamThrottler); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java index 65b3c66..43d8395 100644 --- a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java +++ b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java @@ -377,7 +377,7 @@ public class ListenUDP extends AbstractSessionFactoryProcessor { // this will throttle the processing of the received datagrams. If there are no more // buffers to read into because none have been returned to the pool via consumer.process(), // then the desired back pressure on the channel is created. - if (session.getAvailableRelationships().size() > 0) { + if (context.getAvailableRelationships().size() > 0) { consumer.process(); if (flowFileCount == newFlowFiles.size()) { // no new datagrams received, need to throttle this thread back so it does http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java index 31e5105..da80546 100644 --- a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java +++ b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java @@ -148,7 +148,7 @@ public abstract class PutFileTransfer<T extends FileTransfer> extends AbstractPr session.transfer(flowFile, conflictResult.getRelationship()); session.commit(); - } while (isScheduled() && (getRelationships().size() == session.getAvailableRelationships().size()) && (++fileCount < maxNumberOfFiles) && ((flowFile = session.get()) != null)); + } while (isScheduled() && (getRelationships().size() == context.getAvailableRelationships().size()) && (++fileCount < maxNumberOfFiles) && ((flowFile = session.get()) != null)); } catch (final IOException e) { context.yield(); logger.error("Unable to transfer {} to remote host {} due to {}", new Object[]{flowFile, hostname, e}); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java index cae61f0..1cf5f1f 100644 --- a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java +++ b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java @@ -48,6 +48,7 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.stream.io.BufferedOutputStream; import org.apache.nifi.stream.io.StreamThrottler; import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.io.OutputStreamCallback; @@ -87,6 +88,7 @@ public class ListenHTTPServlet extends HttpServlet { private ProcessorLog logger; private AtomicReference<ProcessSessionFactory> sessionFactoryHolder; + private volatile ProcessContext processContext; private Pattern authorizedPattern; private Pattern headerPattern; private ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap; @@ -103,6 +105,7 @@ public class ListenHTTPServlet extends HttpServlet { final ServletContext context = config.getServletContext(); this.logger = (ProcessorLog) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_LOGGER); this.sessionFactoryHolder = (AtomicReference<ProcessSessionFactory>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER); + this.processContext = (ProcessContext) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER); this.authorizedPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN); this.headerPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_HEADER_PATTERN); this.flowFileMap = (ConcurrentMap<String, FlowFileEntryTimeWrapper>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP); @@ -118,6 +121,8 @@ public class ListenHTTPServlet extends HttpServlet { @Override protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { + final ProcessContext context = processContext; + ProcessSessionFactory sessionFactory; do { sessionFactory = sessionFactoryHolder.get(); @@ -136,7 +141,7 @@ public class ListenHTTPServlet extends HttpServlet { try { final long n = filesReceived.getAndIncrement() % FILES_BEFORE_CHECKING_DESTINATION_SPACE; if (n == 0 || !spaceAvailable.get()) { - if (session.getAvailableRelationships().isEmpty()) { + if (context.getAvailableRelationships().isEmpty()) { spaceAvailable.set(false); if (logger.isDebugEnabled()) { logger.debug("Received request from " + request.getRemoteHost() + " but no space available; Indicating Service Unavailable"); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java b/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java index a6402e4..ab4c978 100644 --- a/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java +++ b/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java @@ -132,6 +132,7 @@ public class TestDistributeLoad { testRunner.assertQueueEmpty(); for (int i = 1; i <= 100; i++) { + System.out.println(i); testRunner.assertTransferCount(String.valueOf(i), (i == 50) ? 0 : 1); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java index 9e04439..7fa183f 100644 --- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java +++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java @@ -17,6 +17,7 @@ package org.apache.nifi.processor; import java.util.Map; +import java.util.Set; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; @@ -121,4 +122,11 @@ public interface ProcessContext { * @return */ ControllerServiceLookup getControllerServiceLookup(); + + /** + * @return the set of all relationships for which space is available to + * receive new objects + */ + Set<Relationship> getAvailableRelationships(); + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java index 09d1bd2..d3de916 100644 --- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java +++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java @@ -166,12 +166,6 @@ public interface ProcessSession { QueueSize getQueueSize(); /** - * @return the set of all relationships for which space is available to - * receive new objects - */ - Set<Relationship> getAvailableRelationships(); - - /** * Creates a new FlowFile in the repository with no content and without any * linkage to a parent FlowFile. This method is appropriate only when data * is received or created from an external system. Otherwise, this method http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java index 6e5f65d..15591d7 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java @@ -21,10 +21,12 @@ import static java.util.Objects.requireNonNull; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.components.PropertyDescriptor; @@ -32,6 +34,8 @@ import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerServiceLookup; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.SchedulingContext; import org.junit.Assert; @@ -45,6 +49,8 @@ public class MockProcessContext extends MockControllerServiceLookup implements S private boolean enableExpressionValidation = false; private boolean allowExpressionValidation = true; + private volatile Set<Relationship> unavailableRelationships = new HashSet<>(); + /** * Creates a new MockProcessContext for the given Processor * @@ -258,4 +264,21 @@ public class MockProcessContext extends MockControllerServiceLookup implements S public void leaseControllerService(final String identifier) { } + public Set<Relationship> getAvailableRelationships() { + if ( !(component instanceof Processor) ) { + return Collections.emptySet(); + } + + final Set<Relationship> relationships = new HashSet<>(((Processor) component).getRelationships()); + relationships.removeAll(unavailableRelationships); + return relationships; + } + + public void setUnavailableRelationships(final Set<Relationship> relationships) { + this.unavailableRelationships = Collections.unmodifiableSet(new HashSet<>(relationships)); + } + + public Set<Relationship> getUnavailableRelationships() { + return unavailableRelationships; + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java index 552780c..ea55b34 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java @@ -329,10 +329,6 @@ public class MockProcessSession implements ProcessSession { return newFlowFile; } - @Override - public Set<Relationship> getAvailableRelationships() { - return sharedState.getAvailableRelationships(); - } @Override public MockFlowFile merge(final Collection<FlowFile> sources, final FlowFile destination) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java b/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java index 96bef71..13a87de 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java @@ -16,49 +16,30 @@ */ package org.apache.nifi.util; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.nifi.processor.Processor; -import org.apache.nifi.processor.Relationship; import org.apache.nifi.provenance.ProvenanceReporter; public class SharedSessionState { private final MockFlowFileQueue flowFileQueue; private final ProvenanceReporter provenanceReporter; + @SuppressWarnings("unused") private final Processor processor; private final AtomicLong flowFileIdGenerator; private final ConcurrentMap<String, AtomicLong> counterMap = new ConcurrentHashMap<>(); - private volatile Set<Relationship> unavailableRelationships; public SharedSessionState(final Processor processor, final AtomicLong flowFileIdGenerator) { flowFileQueue = new MockFlowFileQueue(); provenanceReporter = new MockProvenanceReporter(); - unavailableRelationships = new HashSet<>(); this.flowFileIdGenerator = flowFileIdGenerator; this.processor = processor; } - public Set<Relationship> getAvailableRelationships() { - final Set<Relationship> relationships = new HashSet<>(processor.getRelationships()); - relationships.removeAll(unavailableRelationships); - return relationships; - } - - public void setUnavailableRelationships(final Set<Relationship> relationships) { - this.unavailableRelationships = Collections.unmodifiableSet(new HashSet<>(relationships)); - } - - public Set<Relationship> getUnavailableRelationships() { - return unavailableRelationships; - } - public MockFlowFileQueue getFlowFileQueue() { return flowFileQueue; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3a7b8de0/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java index 54b611d..40d5035 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@ -426,9 +426,9 @@ public class StandardProcessorTestRunner implements TestRunner { @Override public void setRelationshipAvailable(final Relationship relationship) { - final Set<Relationship> unavailable = new HashSet<>(sharedState.getUnavailableRelationships()); + final Set<Relationship> unavailable = new HashSet<>(context.getUnavailableRelationships()); unavailable.remove(relationship); - sharedState.setUnavailableRelationships(unavailable); + context.setUnavailableRelationships(unavailable); } @Override @@ -438,9 +438,9 @@ public class StandardProcessorTestRunner implements TestRunner { @Override public void setRelationshipUnavailable(final Relationship relationship) { - final Set<Relationship> unavailable = new HashSet<>(sharedState.getUnavailableRelationships()); + final Set<Relationship> unavailable = new HashSet<>(context.getUnavailableRelationships()); unavailable.add(relationship); - sharedState.setUnavailableRelationships(unavailable); + context.setUnavailableRelationships(unavailable); } @Override
