This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 359fd3f  NIFI-7476: Implemented FlowFileGating / FlowFileConcurrency 
at the ProcessGroup level Added FlowFileOutboundPolicy to ProcessGroups and 
updated LocalPort to make use of it Persisted FlowFile Concurrency and FlowFile 
Output Policy to flow.xml.gz and included in flow fingerprint Added 
configuration for FlowFile concurrency and outbound policy to UI for 
configuration of Process Groups Added system tests. Fixed a couple of bugs that 
were found Fixed a couple of typos in the  [...]
359fd3f is described below

commit 359fd3ff299c6abb7e7b4d5dfb99e48570aeede5
Author: Mark Payne <marka...@hotmail.com>
AuthorDate: Wed May 20 17:09:40 2020 -0400

    NIFI-7476: Implemented FlowFileGating / FlowFileConcurrency at the 
ProcessGroup level
    Added FlowFileOutboundPolicy to ProcessGroups and updated LocalPort to make 
use of it
    Persisted FlowFile Concurrency and FlowFile Output Policy to flow.xml.gz 
and included in flow fingerprint
    Added configuration for FlowFile concurrency and outbound policy to UI for 
configuration of Process Groups
    Added system tests. Fixed a couple of bugs that were found
    Fixed a couple of typos in the RecordPath guide
    
    Signed-off-by: Pierre Villard <pierre.villard...@gmail.com>
    
    This closes #4306.
---
 .../apache/nifi/web/api/dto/ProcessGroupDTO.java   |  21 +++
 .../apache/nifi/groups/FlowFileConcurrency.java    |  39 ++++++
 .../java/org/apache/nifi/groups/FlowFileGate.java  |  18 +--
 .../apache/nifi/groups/FlowFileOutboundPolicy.java |  35 +++++
 .../java/org/apache/nifi/groups/ProcessGroup.java  |  42 ++++++
 .../org/apache/nifi/connectable/LocalPort.java     | 127 ++++++++++++++---
 .../nifi/controller/StandardFlowSnippet.java       |  13 ++
 .../nifi/controller/StandardFlowSynchronizer.java  |  31 +++++
 .../serialization/FlowFromDOMFactory.java          |   2 +
 .../serialization/StandardFlowSerializer.java      |   2 +
 .../nifi/fingerprint/FingerprintFactory.java       |   2 +
 .../nifi/groups/SingleConcurrencyFlowFileGate.java |  57 ++++++++
 .../apache/nifi/groups/StandardProcessGroup.java   |  93 +++++++++++++
 .../apache/nifi/groups/UnboundedFlowFileGate.java  |  25 ++--
 .../src/main/resources/FlowConfiguration.xsd       |  22 ++-
 .../org/apache/nifi/connectable/TestLocalPort.java |  14 +-
 .../controller/service/mock/MockProcessGroup.java  |  45 ++++++
 .../org/apache/nifi/web/api/dto/DtoFactory.java    |   4 +
 .../nifi/web/dao/impl/StandardProcessGroupDAO.java |  14 +-
 .../canvas/process-group-configuration.jsp         |  18 +++
 .../webapp/css/process-group-configuration.css     |   8 ++
 .../js/nf/canvas/nf-process-group-configuration.js |  45 +++++-
 .../apache/nifi/tests/system/NiFiClientUtil.java   |  72 +++++++++-
 .../org/apache/nifi/tests/system/NiFiSystemIT.java |  31 ++++-
 .../system/pg/SingleFlowFileConcurrencyIT.java     | 152 +++++++++++++++++++++
 .../cli/impl/client/nifi/OutputPortClient.java     |   6 +
 .../client/nifi/impl/JerseyOutputPortClient.java   |   5 +
 27 files changed, 877 insertions(+), 66 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessGroupDTO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessGroupDTO.java
index 349d0bb..1b3bfe2 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessGroupDTO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessGroupDTO.java
@@ -34,6 +34,8 @@ public class ProcessGroupDTO extends ComponentDTO {
     private Map<String, String> variables;
     private VersionControlInformationDTO versionControlInformation;
     private ParameterContextReferenceEntity parameterContext;
+    private String flowfileConcurrency;
+    private String flowfileOutboundPolicy;
 
     private Integer runningCount;
     private Integer stoppedCount;
@@ -352,4 +354,23 @@ public class ProcessGroupDTO extends ComponentDTO {
     public void setParameterContext(final ParameterContextReferenceEntity 
parameterContext) {
         this.parameterContext = parameterContext;
     }
+
+    @ApiModelProperty(value = "The FlowFile Concurrency for this Process 
Group.", allowableValues = "UNBOUNDED, SINGLE_FLOWFILE_PER_NODE")
+    public String getFlowfileConcurrency() {
+        return flowfileConcurrency;
+    }
+
+    public void setFlowfileConcurrency(final String flowfileConcurrency) {
+        this.flowfileConcurrency = flowfileConcurrency;
+    }
+
+    @ApiModelProperty(value = "The Oubound Policy that is used for determining 
how FlowFiles should be transferred out of the Process Group.",
+        allowableValues = "STREAM_WHEN_AVAILABLE, BATCH_OUTPUT")
+    public String getFlowfileOutboundPolicy() {
+        return flowfileOutboundPolicy;
+    }
+
+    public void setFlowfileOutboundPolicy(final String flowfileOutboundPolicy) 
{
+        this.flowfileOutboundPolicy = flowfileOutboundPolicy;
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowFileConcurrency.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowFileConcurrency.java
new file mode 100644
index 0000000..6c91c66
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowFileConcurrency.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.groups;
+
+/**
+ * Specifies the concurrency level of a Process Group
+ */
+public enum FlowFileConcurrency {
+
+    /**
+     * Only a single FlowFile is to be allowed to enter the Process Group at a 
time.
+     * While that FlowFile may be split into many or spawn many children, no 
additional FlowFiles will be
+     * allowed to enter the Process Group through a Local Input Port until the 
previous FlowFile - and all of its
+     * child/descendent FlowFiles - have been processed. In a clustered 
instance, each node may allow through
+     * a single FlowFile at a time, so multiple FlowFiles may still be 
processed concurrently across the cluster.
+     */
+    SINGLE_FLOWFILE_PER_NODE,
+
+    /**
+     * The number of FlowFiles that can be processed concurrently is unbounded.
+     */
+    UNBOUNDED;
+
+}
diff --git 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/OutputPortClient.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowFileGate.java
similarity index 52%
copy from 
nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/OutputPortClient.java
copy to 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowFileGate.java
index b635404..e700851 100644
--- 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/OutputPortClient.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowFileGate.java
@@ -14,23 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.toolkit.cli.impl.client.nifi;
 
-import org.apache.nifi.web.api.entity.PortEntity;
+package org.apache.nifi.groups;
 
-import java.io.IOException;
+public interface FlowFileGate {
 
-public interface OutputPortClient {
+    boolean tryClaim();
 
-    PortEntity createOutputPort(String parentGroupId, PortEntity entity) 
throws NiFiClientException, IOException;
+    void releaseClaim();
 
-    PortEntity getOutputPort(String id) throws NiFiClientException, 
IOException;
-
-    PortEntity updateOutputPort(PortEntity entity) throws NiFiClientException, 
IOException;
-
-    PortEntity deleteOutputPort(PortEntity entity) throws NiFiClientException, 
IOException;
-
-    PortEntity startInpuOutputPort(PortEntity entity) throws 
NiFiClientException, IOException;
-
-    PortEntity stopOutputPort(PortEntity entity) throws NiFiClientException, 
IOException;
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowFileOutboundPolicy.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowFileOutboundPolicy.java
new file mode 100644
index 0000000..767dfae
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowFileOutboundPolicy.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.groups;
+
+public enum FlowFileOutboundPolicy {
+
+    /**
+     * FlowFiles that are queued up to be transferred out of a ProcessGroup by 
an Output Port will be transferred
+     * out of the Process Group as soon as they are available.
+     */
+    STREAM_WHEN_AVAILABLE,
+
+    /**
+     * FlowFiles that are queued up to be transferred out of a Process Group 
by an Output Port will remain queued until
+     * all FlowFiles in the Process Group are ready to be transferred out of 
the group. The FlowFiles will then be transferred
+     * out of the group. I.e., the FlowFiles will be batched together and 
transferred at the same time (not necessarily in a single
+     * Process Session) but no FlowFile will be transferred until all 
FlowFiles in the group are ready to be transferred.
+     */
+    BATCH_OUTPUT;
+}
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
index a386b49..3e111cf 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
@@ -1062,4 +1062,46 @@ public interface ProcessGroup extends 
ComponentAuthorizable, Positionable, Versi
      * @param updatedParameters a Map of parameter name to the ParameterUpdate 
that describes how the Parameter was updated
      */
     void onParameterContextUpdated(Map<String, ParameterUpdate> 
updatedParameters);
+
+    /**
+     * @return the FlowFileGate that must be used for obtaining a claim before 
an InputPort is allowed to bring data into a ProcessGroup
+     */
+    FlowFileGate getFlowFileGate();
+
+    /**
+     * @return the FlowFileConcurrency that is currently configured for the 
ProcessGroup
+     */
+    FlowFileConcurrency getFlowFileConcurrency();
+
+    /**
+     * Sets the FlowFileConcurrency to use for this ProcessGroup
+     * @param flowFileConcurrency the FlowFileConcurrency to use
+     */
+    void setFlowFileConcurrency(FlowFileConcurrency flowFileConcurrency);
+
+    /**
+     * @return the FlowFile Outbound Policy that governs the behavior of this 
Process Group
+     */
+    FlowFileOutboundPolicy getFlowFileOutboundPolicy();
+
+    /**
+     * Specifies the FlowFile Outbound Policy that should be applied to this 
Process Group
+     * @param outboundPolicy the policy to enforce.
+     */
+    void setFlowFileOutboundPolicy(FlowFileOutboundPolicy outboundPolicy);
+
+    /**
+     * @return true if at least one FlowFile resides in a FlowFileQueue in 
this Process Group or a child ProcessGroup, false otherwise
+     */
+    boolean isDataQueued();
+
+    /**
+     * Indicates whether or not data is queued for Processing. Data is 
considered queued for processing if it is enqueued in a Connection and
+     * the destination of that Connection is not an Output Port, OR if the 
data is enqueued within a child group, regardless of whether or not it is
+     * queued before an Output Port. I.e., any data that is enqueued in this 
Process Group is enqueued for Processing unless it is ready to be transferred
+     * out of this Process Group.
+     *
+     * @return <code>true</code> if there is data that is queued for 
Processing, <code>false</code> otherwise
+     */
+    boolean isDataQueuedForProcessing();
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java
index 7f1173e..c636f92 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java
@@ -20,11 +20,16 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.AbstractPort;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.groups.FlowFileConcurrency;
+import org.apache.nifi.groups.FlowFileGate;
+import org.apache.nifi.groups.FlowFileOutboundPolicy;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -39,6 +44,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  * NiFi.
  */
 public class LocalPort extends AbstractPort {
+    private static final Logger logger = 
LoggerFactory.getLogger(LocalPort.class);
 
     // "_nifi.funnel.max.concurrent.tasks" is an experimental NiFi property 
allowing users to configure
     // the number of concurrent tasks to schedule for local ports and funnels.
@@ -51,7 +57,7 @@ public class LocalPort extends AbstractPort {
     private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
     private final Lock readLock = rwLock.readLock();
     private final Lock writeLock = rwLock.writeLock();
-    final int maxIterations;
+    private final int maxIterations;
 
     public LocalPort(final String id, final String name, final ConnectableType 
type, final ProcessScheduler scheduler, final NiFiProperties nifiProperties) {
         super(id, name, type, scheduler);
@@ -61,8 +67,12 @@ public class LocalPort extends AbstractPort {
 
         int maxTransferredFlowFiles = 
Integer.parseInt(nifiProperties.getProperty(MAX_TRANSFERRED_FLOWFILES_PROP_NAME,
 "10000"));
         maxIterations = Math.max(1, (int) Math.ceil(maxTransferredFlowFiles / 
1000.0));
+        setYieldPeriod(nifiProperties.getBoredYieldDuration());
     }
 
+    protected int getMaxIterations() {
+        return maxIterations;
+    }
 
     private boolean[] validateConnections() {
         // LocalPort requires both in/out.
@@ -109,28 +119,104 @@ public class LocalPort extends AbstractPort {
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
         readLock.lock();
         try {
-            Set<Relationship> available = context.getAvailableRelationships();
-            int iterations = 0;
-            while (!available.isEmpty()) {
-                final List<FlowFile> flowFiles = session.get(1000);
-                if (flowFiles.isEmpty()) {
-                    break;
-                }
+            if (getConnectableType() == ConnectableType.OUTPUT_PORT) {
+                triggerOutputPort(context, session);
+            } else {
+                triggerInputPort(context, session);
+            }
+        } finally {
+            readLock.unlock();
+        }
+    }
 
-                session.transfer(flowFiles, Relationship.ANONYMOUS);
-                session.commit();
+    private void triggerOutputPort(final ProcessContext context, final 
ProcessSession session) {
+        final boolean shouldTransfer = isTransferDataOut();
+        if (shouldTransfer) {
+            transferUnboundedConcurrency(context, session);
+        } else {
+            context.yield();
+        }
+    }
 
-                // If there are fewer than 1,000 FlowFiles available to 
transfer, or if we
-                // have hit the configured FlowFile cap, we want to stop. This 
prevents us from
-                // holding the Timer-Driven Thread for an excessive amount of 
time.
-                if (flowFiles.size() < 1000 || ++iterations >= maxIterations) {
-                    break;
-                }
+    private void triggerInputPort(final ProcessContext context, final 
ProcessSession session) {
+        final FlowFileGate flowFileGate = getProcessGroup().getFlowFileGate();
+        final boolean obtainedClaim = flowFileGate.tryClaim();
+        if (!obtainedClaim) {
+            logger.trace("{} failed to obtain claim for FlowFileGate. Will 
yield and will not transfer any FlowFiles", this);
+            context.yield();
+            return;
+        }
+
+        try {
+            logger.trace("{} obtained claim for FlowFileGate", this);
 
-                available = context.getAvailableRelationships();
+            final FlowFileConcurrency flowFileConcurrency = 
getProcessGroup().getFlowFileConcurrency();
+            switch (flowFileConcurrency) {
+                case UNBOUNDED:
+                    transferUnboundedConcurrency(context, session);
+                    break;
+                case SINGLE_FLOWFILE_PER_NODE:
+                    transferSingleFlowFile(session);
+                    break;
             }
         } finally {
-            readLock.unlock();
+            flowFileGate.releaseClaim();
+            logger.trace("{} released claim for FlowFileGate", this);
+        }
+    }
+
+    private boolean isTransferDataOut() {
+        final FlowFileConcurrency flowFileConcurrency = 
getProcessGroup().getFlowFileConcurrency();
+        if (flowFileConcurrency == FlowFileConcurrency.UNBOUNDED) {
+            logger.trace("{} will transfer data out of Process Group because 
FlowFile Concurrency is Unbounded", this);
+            return true;
+        }
+
+        final FlowFileOutboundPolicy outboundPolicy = 
getProcessGroup().getFlowFileOutboundPolicy();
+        if (outboundPolicy == FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE) {
+            logger.trace("{} will transfer data out of Process Group because 
FlowFile Outbound Policy is Stream When Available", this);
+            return true;
+        }
+
+        final boolean queuedForProcessing = 
getProcessGroup().isDataQueuedForProcessing();
+        if (queuedForProcessing) {
+            logger.trace("{} will not transfer data out of Process Group 
because FlowFile Outbound Policy is Batch Output and there is data queued for 
Processing", this);
+            return false;
+        }
+
+        logger.trace("{} will transfer data out of Process Group because there 
is no data queued for processing", this);
+        return true;
+    }
+
+    private void transferSingleFlowFile(final ProcessSession session) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        session.transfer(flowFile, Relationship.ANONYMOUS);
+    }
+
+    protected void transferUnboundedConcurrency(final ProcessContext context, 
final ProcessSession session) {
+        Set<Relationship> available = context.getAvailableRelationships();
+        int iterations = 0;
+        while (!available.isEmpty()) {
+            final List<FlowFile> flowFiles = session.get(1000);
+            if (flowFiles.isEmpty()) {
+                break;
+            }
+
+            session.transfer(flowFiles, Relationship.ANONYMOUS);
+            session.commit();
+
+            // If there are fewer than 1,000 FlowFiles available to transfer, 
or if we
+            // have hit the configured FlowFile cap, we want to stop. This 
prevents us from
+            // holding the Timer-Driven Thread for an excessive amount of time.
+            if (flowFiles.size() < 1000 || ++iterations >= maxIterations) {
+                break;
+            }
+
+            available = context.getAvailableRelationships();
         }
     }
 
@@ -223,4 +309,9 @@ public class LocalPort extends AbstractPort {
     public String getComponentType() {
         return "Local Port";
     }
+
+    @Override
+    public String toString() {
+        return "LocalPort[id=" + getIdentifier() + ", type=" + 
getConnectableType() + "]";
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java
index a65583f..85e5984 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java
@@ -31,6 +31,8 @@ import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.queue.LoadBalanceStrategy;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.groups.FlowFileConcurrency;
+import org.apache.nifi.groups.FlowFileOutboundPolicy;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
@@ -454,6 +456,17 @@ public class StandardFlowSnippet implements FlowSnippet {
             childGroup.setPosition(toPosition(groupDTO.getPosition()));
             childGroup.setComments(groupDTO.getComments());
             childGroup.setName(groupDTO.getName());
+
+            final String flowfileConcurrentName = 
groupDTO.getFlowfileConcurrency();
+            if (flowfileConcurrentName != null) {
+                
childGroup.setFlowFileConcurrency(FlowFileConcurrency.valueOf(flowfileConcurrentName));
+            }
+
+            final String outboundPolicyName = 
groupDTO.getFlowfileOutboundPolicy();
+            if (outboundPolicyName != null) {
+                
childGroup.setFlowFileOutboundPolicy(FlowFileOutboundPolicy.valueOf(outboundPolicyName));
+            }
+
             if (groupDTO.getVariables() != null) {
                 childGroup.setVariables(groupDTO.getVariables());
             }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index c9460f8..7bb5052 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -57,6 +57,8 @@ import 
org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.events.BulletinFactory;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.groups.FlowFileConcurrency;
+import org.apache.nifi.groups.FlowFileOutboundPolicy;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
@@ -1156,6 +1158,8 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
         final String name = dto.getName();
         final PositionDTO position = dto.getPosition();
         final String comments = dto.getComments();
+        final String flowfileConcurrencyName = dto.getFlowfileConcurrency();
+        final String flowfileOutboundPolicyName = 
dto.getFlowfileOutboundPolicy();
 
         if (name != null) {
             group.setName(name);
@@ -1167,6 +1171,18 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
             group.setComments(comments);
         }
 
+        if (flowfileConcurrencyName == null) {
+            group.setFlowFileConcurrency(FlowFileConcurrency.UNBOUNDED);
+        } else {
+            
group.setFlowFileConcurrency(FlowFileConcurrency.valueOf(flowfileConcurrencyName));
+        }
+
+        if (flowfileOutboundPolicyName == null) {
+            
group.setFlowFileOutboundPolicy(FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE);
+        } else {
+            
group.setFlowFileOutboundPolicy(FlowFileOutboundPolicy.valueOf(flowfileOutboundPolicyName));
+        }
+
         final ParameterContextReferenceEntity parameterContextReference = 
dto.getParameterContext();
         if (parameterContextReference != null && 
parameterContextReference.getId() != null) {
             final String parameterContextId = 
parameterContextReference.getId();
@@ -1274,6 +1290,21 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
             parentGroup.addProcessGroup(processGroup);
         }
 
+        final String flowfileConcurrencyName = 
processGroupDTO.getFlowfileConcurrency();
+        final String flowfileOutboundPolicyName = 
processGroupDTO.getFlowfileOutboundPolicy();
+        if (flowfileConcurrencyName == null) {
+            processGroup.setFlowFileConcurrency(FlowFileConcurrency.UNBOUNDED);
+        } else {
+            
processGroup.setFlowFileConcurrency(FlowFileConcurrency.valueOf(flowfileConcurrencyName));
+        }
+
+        if (flowfileOutboundPolicyName == null) {
+            
processGroup.setFlowFileOutboundPolicy(FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE);
+        } else {
+            
processGroup.setFlowFileOutboundPolicy(FlowFileOutboundPolicy.valueOf(flowfileOutboundPolicyName));
+        }
+
+
         final String parameterContextId = getString(processGroupElement, 
"parameterContextId");
         if (parameterContextId != null) {
             final ParameterContext parameterContext = 
controller.getFlowManager().getParameterContextManager().getParameterContext(parameterContextId);
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
index e64e573..84d8cf4 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
@@ -183,6 +183,8 @@ public class FlowFromDOMFactory {
         dto.setName(getString(element, "name"));
         dto.setPosition(getPosition(DomUtils.getChild(element, "position")));
         dto.setComments(getString(element, "comment"));
+        dto.setFlowfileConcurrency(getString(element, "flowfileConcurrency"));
+        dto.setFlowfileOutboundPolicy(getString(element, 
"flowfileOutboundPolicy"));
 
         final Map<String, String> variables = new HashMap<>();
         final NodeList variableList = DomUtils.getChildNodesByTagName(element, 
"variable");
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
index a9b203e..920ccf5 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
@@ -234,6 +234,8 @@ public class StandardFlowSerializer implements 
FlowSerializer<Document> {
         addTextElement(element, "name", group.getName());
         addPosition(element, group.getPosition());
         addTextElement(element, "comment", group.getComments());
+        addTextElement(element, "flowfileConcurrency", 
group.getFlowFileConcurrency().name());
+        addTextElement(element, "flowfileOutboundPolicy", 
group.getFlowFileOutboundPolicy().name());
 
         final VersionControlInformation versionControlInfo = 
group.getVersionControlInformation();
         if (versionControlInfo != null) {
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
index c3c95e6..7a11c51 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
@@ -350,6 +350,8 @@ public class FingerprintFactory {
         appendFirstValue(builder, 
DomUtils.getChildNodesByTagName(processGroupElem, "id"));
         appendFirstValue(builder, 
DomUtils.getChildNodesByTagName(processGroupElem, "versionedComponentId"));
         appendFirstValue(builder, 
DomUtils.getChildNodesByTagName(processGroupElem, "parameterContextId"));
+        appendFirstValue(builder, 
DomUtils.getChildNodesByTagName(processGroupElem, "flowfileConcurrency"));
+        appendFirstValue(builder, 
DomUtils.getChildNodesByTagName(processGroupElem, "flowfileOutboundPolicy"));
 
         final Element versionControlInfo = DomUtils.getChild(processGroupElem, 
"versionControlInformation");
         if (versionControlInfo == null) {
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/SingleConcurrencyFlowFileGate.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/SingleConcurrencyFlowFileGate.java
new file mode 100644
index 0000000..da1041a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/SingleConcurrencyFlowFileGate.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.groups;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BooleanSupplier;
+
+public class SingleConcurrencyFlowFileGate implements FlowFileGate {
+    private final BooleanSupplier groupEmptyCheck;
+    private final AtomicBoolean claimed = new AtomicBoolean(false);
+
+    public SingleConcurrencyFlowFileGate(final BooleanSupplier 
groupEmptyCheck) {
+        this.groupEmptyCheck = groupEmptyCheck;
+    }
+
+    @Override
+    public boolean tryClaim() {
+        // Check if the claim is already held and atomically set it to being 
held.
+        final boolean alreadyClaimed = claimed.getAndSet(true);
+        if (alreadyClaimed) {
+            // If claim was already held, then this thread failed to obtain 
the claim.
+            return false;
+        }
+
+        // The claim is now held by this thread. Check if the ProcessGroup is 
empty.
+        final boolean empty = groupEmptyCheck.getAsBoolean();
+        if (empty) {
+            // Process Group is empty so return true indicating that the claim 
is now held.
+            return true;
+        }
+
+        // Process Group was not empty, so we cannot allow any more FlowFiles 
through. Reset claimed to false and return false,
+        // indicating that the caller did not obtain the claim.
+        claimed.set(false);
+        return false;
+    }
+
+    @Override
+    public void releaseClaim() {
+        claimed.set(false);
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 0c091d8..da53527 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -196,6 +196,10 @@ public final class StandardProcessGroup implements 
ProcessGroup {
     private final VersionControlFields versionControlFields = new 
VersionControlFields();
     private volatile ParameterContext parameterContext;
 
+    private FlowFileConcurrency flowFileConcurrency = 
FlowFileConcurrency.UNBOUNDED;
+    private volatile FlowFileGate flowFileGate = new UnboundedFlowFileGate();
+    private volatile FlowFileOutboundPolicy flowFileOutboundPolicy = 
FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE;
+
     private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
     private final Lock readLock = rwLock.readLock();
     private final Lock writeLock = rwLock.writeLock();
@@ -5280,4 +5284,93 @@ public final class StandardProcessGroup implements 
ProcessGroup {
             }
         }
     }
+
+    @Override
+    public FlowFileGate getFlowFileGate() {
+        return flowFileGate;
+    }
+
+    @Override
+    public FlowFileConcurrency getFlowFileConcurrency() {
+        readLock.lock();
+        try {
+            return flowFileConcurrency;
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    @Override
+    public void setFlowFileConcurrency(final FlowFileConcurrency 
flowFileConcurrency) {
+        writeLock.lock();
+        try {
+            if (this.flowFileConcurrency == flowFileConcurrency) {
+                return;
+            }
+
+            this.flowFileConcurrency = flowFileConcurrency;
+            switch (flowFileConcurrency) {
+                case UNBOUNDED:
+                    flowFileGate = new UnboundedFlowFileGate();
+                    break;
+                case SINGLE_FLOWFILE_PER_NODE:
+                    flowFileGate = new SingleConcurrencyFlowFileGate(() -> 
!isDataQueued());
+                    break;
+            }
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    @Override
+    public boolean isDataQueued() {
+        return isDataQueued(connection -> true);
+    }
+
+    @Override
+    public boolean isDataQueuedForProcessing() {
+        // Data is queued for processing if a connection has data queued and 
the connection's destination is NOT an Output Port.
+        return isDataQueued(connection -> 
connection.getDestination().getConnectableType() != 
ConnectableType.OUTPUT_PORT);
+    }
+
+    private boolean isDataQueued(final Predicate<Connection> connectionFilter) 
{
+        readLock.lock();
+        try {
+            for (final Connection connection : this.connections.values()) {
+                // If the connection doesn't pass the filter, just skip over 
it.
+                if (!connectionFilter.test(connection)) {
+                    continue;
+                }
+
+                final boolean queueEmpty = 
connection.getFlowFileQueue().isEmpty();
+                if (!queueEmpty) {
+                    return true;
+                }
+            }
+
+            for (final ProcessGroup child : this.processGroups.values()) {
+                // Check if the child Process Group has any data enqueued. 
Note that we call #isDataQueued here and NOT
+                // #isDataQueeudForProcesing. I.e., regardless of whether this 
is called from #isDataQueued or #isDataQueuedForProcessing,
+                // for child groups, we only call #isDataQueued. This is 
because if data is queued up for the Output Port of a child group,
+                // it is still considered to be data that is being processed 
by this Process Group.
+                if (child.isDataQueued()) {
+                    return true;
+                }
+            }
+
+            return false;
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    @Override
+    public FlowFileOutboundPolicy getFlowFileOutboundPolicy() {
+        return flowFileOutboundPolicy;
+    }
+
+    @Override
+    public void setFlowFileOutboundPolicy(final FlowFileOutboundPolicy 
flowFileOutboundPolicy) {
+        this.flowFileOutboundPolicy = flowFileOutboundPolicy;
+    }
 }
diff --git 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/OutputPortClient.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/UnboundedFlowFileGate.java
similarity index 52%
copy from 
nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/OutputPortClient.java
copy to 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/UnboundedFlowFileGate.java
index b635404..352053b 100644
--- 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/OutputPortClient.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/UnboundedFlowFileGate.java
@@ -14,23 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.toolkit.cli.impl.client.nifi;
 
-import org.apache.nifi.web.api.entity.PortEntity;
+package org.apache.nifi.groups;
 
-import java.io.IOException;
+public class UnboundedFlowFileGate implements FlowFileGate {
+    @Override
+    public boolean tryClaim() {
+        return true;
+    }
 
-public interface OutputPortClient {
-
-    PortEntity createOutputPort(String parentGroupId, PortEntity entity) 
throws NiFiClientException, IOException;
-
-    PortEntity getOutputPort(String id) throws NiFiClientException, 
IOException;
-
-    PortEntity updateOutputPort(PortEntity entity) throws NiFiClientException, 
IOException;
-
-    PortEntity deleteOutputPort(PortEntity entity) throws NiFiClientException, 
IOException;
-
-    PortEntity startInpuOutputPort(PortEntity entity) throws 
NiFiClientException, IOException;
-
-    PortEntity stopOutputPort(PortEntity entity) throws NiFiClientException, 
IOException;
+    @Override
+    public void releaseClaim() {
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
index ca43ebc..36d7d92 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
@@ -178,6 +178,8 @@
             <xs:element name="name" type="NonEmptyStringType" />
             <xs:element name="position" type="PositionType" />
             <xs:element name="comment" type="xs:string" />
+            <xs:element name="flowfileConcurrency" 
type="FlowFileConcurrencyType" minOccurs="0" maxOccurs="1" />
+            <xs:element name="flowfileOutboundPolicy" 
type="FlowFileOutboundPolicyType" minOccurs="0" maxOccurs="1" />
                <xs:element name="versionControlInformation" 
type="VersionControlInformation" minOccurs="0" maxOccurs="1" />
 
             <!-- Each "processor" defines the actual dataflow work horses that 
make dataflow happen-->
@@ -203,6 +205,21 @@
         <xs:attribute name="value" />
     </xs:complexType>
 
+    <xs:simpleType name="FlowFileConcurrencyType">
+        <xs:restriction base="xs:string">
+            <xs:enumeration value="SINGLE_FLOWFILE_PER_NODE" />
+            <xs:enumeration value="UNBOUNDED" />
+        </xs:restriction>
+    </xs:simpleType>
+
+    <xs:simpleType name="FlowFileOutboundPolicyType">
+        <xs:restriction base="xs:string">
+            <xs:enumeration value="STREAM_WHEN_AVAILABLE" />
+            <xs:enumeration value="BATCH_OUTPUT" />
+        </xs:restriction>
+    </xs:simpleType>
+
+
     <xs:complexType name="VersionControlInformation">
         <xs:sequence>
             <xs:element name="registryId" type="NonEmptyStringType" />
@@ -216,8 +233,7 @@
     </xs:complexType>
 
     <!-- Same as ProcessGroupType except:
-         - RootProcessGroupType doesn't have versionControlInformation
-         - RootProcessGroupType doesn't have variable -->
+         - RootProcessGroupType doesn't have versionControlInformation -->
     <xs:complexType name="RootProcessGroupType">
         <xs:sequence>
             <xs:element name="id" type="NonEmptyStringType" />
@@ -225,6 +241,8 @@
             <xs:element name="name" type="NonEmptyStringType" />
             <xs:element name="position" type="PositionType" />
             <xs:element name="comment" type="xs:string" />
+            <xs:element name="flowfileConcurrency" 
type="FlowFileConcurrencyType" minOccurs="0" maxOccurs="1" />
+            <xs:element name="flowfileOutboundPolicy" 
type="FlowFileOutboundPolicyType" minOccurs="0" maxOccurs="1" />
 
             <!-- Each "processor" defines the actual dataflow work horses that 
make dataflow happen-->
             <xs:element name="processor" type="ProcessorType" minOccurs="0" 
maxOccurs="unbounded"/>
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/connectable/TestLocalPort.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/connectable/TestLocalPort.java
index 25b8929..aa6ffe4 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/connectable/TestLocalPort.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/connectable/TestLocalPort.java
@@ -37,14 +37,14 @@ public class TestLocalPort {
     public void testDefaultValues() {
         LocalPort port = getLocalInputPort();
         assertEquals(1, port.getMaxConcurrentTasks());
-        assertEquals(10, port.maxIterations);
+        assertEquals(10, port.getMaxIterations());
     }
 
     @Test
     public void testSetConcurrentTasks() {
         LocalPort port = 
getLocalInputPort(LocalPort.MAX_CONCURRENT_TASKS_PROP_NAME, "2");
         assertEquals(2, port.getMaxConcurrentTasks());
-        assertEquals(10, port.maxIterations);
+        assertEquals(10, port.getMaxIterations());
     }
 
     @Test
@@ -52,27 +52,27 @@ public class TestLocalPort {
         {
             LocalPort port = 
getLocalInputPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "100000");
             assertEquals(1, port.getMaxConcurrentTasks());
-            assertEquals(100, port.maxIterations);
+            assertEquals(100, port.getMaxIterations());
         }
         {
             LocalPort port = 
getLocalInputPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "100001");
             assertEquals(1, port.getMaxConcurrentTasks());
-            assertEquals(101, port.maxIterations);
+            assertEquals(101, port.getMaxIterations());
         }
         {
             LocalPort port = 
getLocalInputPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "99999");
             assertEquals(1, port.getMaxConcurrentTasks());
-            assertEquals(100, port.maxIterations);
+            assertEquals(100, port.getMaxIterations());
         }
         {
             LocalPort port = 
getLocalInputPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "0");
             assertEquals(1, port.getMaxConcurrentTasks());
-            assertEquals(1, port.maxIterations);
+            assertEquals(1, port.getMaxIterations());
         }
         {
             LocalPort port = 
getLocalInputPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "1");
             assertEquals(1, port.getMaxConcurrentTasks());
-            assertEquals(1, port.maxIterations);
+            assertEquals(1, port.getMaxIterations());
         }
     }
 
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
index 2955838..c5f05bc 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
@@ -32,6 +32,9 @@ import org.apache.nifi.controller.Snippet;
 import org.apache.nifi.controller.Template;
 import org.apache.nifi.controller.label.Label;
 import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.groups.FlowFileConcurrency;
+import org.apache.nifi.groups.FlowFileGate;
+import org.apache.nifi.groups.FlowFileOutboundPolicy;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.ProcessGroupCounts;
 import org.apache.nifi.groups.RemoteProcessGroup;
@@ -720,6 +723,48 @@ public class MockProcessGroup implements ProcessGroup {
     }
 
     @Override
+    public FlowFileGate getFlowFileGate() {
+        return new FlowFileGate() {
+            @Override
+            public boolean tryClaim() {
+                return true;
+            }
+
+            @Override
+            public void releaseClaim() {
+            }
+        };
+    }
+
+    @Override
+    public FlowFileConcurrency getFlowFileConcurrency() {
+        return FlowFileConcurrency.UNBOUNDED;
+    }
+
+    @Override
+    public void setFlowFileConcurrency(final FlowFileConcurrency 
flowFileConcurrency) {
+    }
+
+    @Override
+    public FlowFileOutboundPolicy getFlowFileOutboundPolicy() {
+        return FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE;
+    }
+
+    @Override
+    public void setFlowFileOutboundPolicy(final FlowFileOutboundPolicy 
outboundPolicy) {
+    }
+
+    @Override
+    public boolean isDataQueued() {
+        return false;
+    }
+
+    @Override
+    public boolean isDataQueuedForProcessing() {
+        return false;
+    }
+
+    @Override
     public void terminateProcessor(ProcessorNode processor) {
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 2557241..9d70f9e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -2477,6 +2477,8 @@ public final class DtoFactory {
         dto.setName(group.getName());
         
dto.setVersionedComponentId(group.getVersionedComponentId().orElse(null));
         
dto.setVersionControlInformation(createVersionControlInformationDto(group));
+        dto.setFlowfileConcurrency(group.getFlowFileConcurrency().name());
+        
dto.setFlowfileOutboundPolicy(group.getFlowFileOutboundPolicy().name());
 
         final ParameterContext parameterContext = group.getParameterContext();
         if (parameterContext != null) {
@@ -4284,6 +4286,8 @@ public final class DtoFactory {
         copy.setOutputPortCount(original.getOutputPortCount());
         copy.setParentGroupId(original.getParentGroupId());
         copy.setVersionedComponentId(original.getVersionedComponentId());
+        copy.setFlowfileConcurrency(original.getFlowfileConcurrency());
+        copy.setFlowfileOutboundPolicy(original.getFlowfileOutboundPolicy());
 
         copy.setRunningCount(original.getRunningCount());
         copy.setStoppedCount(original.getStoppedCount());
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
index b5f5bcf..ba2c6c2 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
@@ -25,6 +25,8 @@ import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.groups.FlowFileConcurrency;
+import org.apache.nifi.groups.FlowFileOutboundPolicy;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.parameter.ParameterContext;
@@ -335,6 +337,11 @@ public class StandardProcessGroupDAO extends ComponentDAO 
implements ProcessGrou
 
         final String name = processGroupDTO.getName();
         final String comments = processGroupDTO.getComments();
+        final String concurrencyName = 
processGroupDTO.getFlowfileConcurrency();
+        final FlowFileConcurrency flowFileConcurrency = concurrencyName == 
null ? null : FlowFileConcurrency.valueOf(concurrencyName);
+
+        final String outboundPolicyName = 
processGroupDTO.getFlowfileOutboundPolicy();
+        final FlowFileOutboundPolicy flowFileOutboundPolicy = 
outboundPolicyName == null ? null : 
FlowFileOutboundPolicy.valueOf(outboundPolicyName);
 
         final ParameterContextReferenceEntity parameterContextReference = 
processGroupDTO.getParameterContext();
         if (parameterContextReference != null) {
@@ -364,7 +371,12 @@ public class StandardProcessGroupDAO extends ComponentDAO 
implements ProcessGrou
         if (isNotNull(comments)) {
             group.setComments(comments);
         }
-
+        if (flowFileConcurrency != null) {
+            group.setFlowFileConcurrency(flowFileConcurrency);
+        }
+        if (flowFileOutboundPolicy != null) {
+            group.setFlowFileOutboundPolicy(flowFileOutboundPolicy);
+        }
         group.onComponentModified();
         return group;
     }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/process-group-configuration.jsp
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/process-group-configuration.jsp
index b21ba0c..eb35513 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/process-group-configuration.jsp
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/process-group-configuration.jsp
@@ -54,6 +54,24 @@
                             <span id="read-only-process-group-comments" 
class="unset"></span>
                         </div>
                     </div>
+                    <div class="setting">
+                        <div class="setting-name">Process group FlowFile 
concurrency</div>
+                        <div class="editable setting-field">
+                            <div 
id="process-group-flowfile-concurrency-combo"></div>
+                        </div>
+                        <div class="read-only setting-field">
+                            <span 
id="read-only-process-group-flowfile-concurrency" class="unset"></span>
+                        </div>
+                    </div>
+                    <div class="setting">
+                        <div class="setting-name">Process group outbound 
policy</div>
+                        <div class="editable setting-field">
+                            <div 
id="process-group-outbound-policy-combo"></div>
+                        </div>
+                        <div class="read-only setting-field">
+                            <span id="read-only-process-group-outbound-policy" 
class="unset"></span>
+                        </div>
+                    </div>
                     <div class="editable settings-buttons">
                         <div id="process-group-configuration-save" 
class="button">Apply</div>
                         <div class="clear"></div>
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/process-group-configuration.css
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/process-group-configuration.css
index ac968b4..b6322f6 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/process-group-configuration.css
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/process-group-configuration.css
@@ -85,6 +85,14 @@
     width: 328px;
 }
 
+#process-group-flowfile-concurrency-combo {
+    width: 328px;
+}
+
+#process-group-outbound-policy-combo {
+    width: 328px;
+}
+
 #process-group-comments {
     height: 100px;
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-process-group-configuration.js
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-process-group-configuration.js
index c4dbfd8..79af2ab 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-process-group-configuration.js
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-process-group-configuration.js
@@ -105,7 +105,9 @@
                 'comments': $('#process-group-comments').val(),
                 'parameterContext': {
                     'id': 
$('#process-group-parameter-context-combo').combo('getSelectedOption').value
-                }
+                },
+                'flowfileConcurrency': 
$('#process-group-flowfile-concurrency-combo').combo('getSelectedOption').value,
+                'flowfileOutboundPolicy': 
$('#process-group-outbound-policy-combo').combo('getSelectedOption').value
             }
         };
 
@@ -212,6 +214,41 @@
                     // populate the process group settings
                     
$('#process-group-name').removeClass('unset').val(processGroup.name);
                     
$('#process-group-comments').removeClass('unset').val(processGroup.comments);
+                    
$('#process-group-flowfile-concurrency-combo').removeClass('unset').combo({
+                        options: [{
+                                text: 'Single FlowFile Per Node',
+                                value: 'SINGLE_FLOWFILE_PER_NODE',
+                                description: 'Only a single FlowFile is to be 
allowed to enter the Process Group at a time on each node in the cluster. While 
that FlowFile may be split into many or '
+                                    + 'spawn many children, no additional 
FlowFiles will be allowed to enter the Process Group through a Local Input Port 
until the previous FlowFile '
+                                    + '- and all of its child/descendent 
FlowFiles - have been processed.'
+                            }, {
+                                text: 'Unbounded',
+                                value: 'UNBOUNDED',
+                                description: 'The number of FlowFiles that can 
be processed concurrently is unbounded.'
+                            }],
+                        selectedOption: {
+                            value: processGroup.flowfileConcurrency
+                        }
+                    });
+
+                    
$('#process-group-outbound-policy-combo').removeClass('unset').combo({
+                        options: [{
+                                text: 'Stream When Available',
+                                value: 'STREAM_WHEN_AVAILABLE',
+                                description: 'FlowFiles that are queued up to 
be transferred out of a ProcessGroup by an Output Port will be transferred out '
+                                        + 'of the Process Group as soon as 
they are available.'
+                            }, {
+                                text: 'Batch Output',
+                                value: 'BATCH_OUTPUT',
+                                description: 'FlowFiles that are queued up to 
be transferred out of a Process Group by an Output Port will remain queued 
until '
+                                        + 'all FlowFiles in the Process Group 
are ready to be transferred out of the group. The FlowFiles will then be 
transferred '
+                                        + 'out of the group. This setting will 
be ignored if the FlowFile Concurrency is Unbounded.'
+                            }],
+                        selectedOption: {
+                            value: processGroup.flowfileOutboundPolicy
+                        }
+                    });
+
 
                     // populate the header
                     
$('#process-group-configuration-header-text').text(processGroup.name + ' 
Configuration');
@@ -228,6 +265,12 @@
                         
$('#read-only-process-group-name').text(processGroup.name);
                         
$('#read-only-process-group-comments').text(processGroup.comments);
 
+                        var concurrencyName = processGroup.flowfileConcurrency 
== "UNBOUNDED" ? "Unbounded" : "Single FlowFile Per Node";
+                        
$('#read-only-process-group-flowfile-concurrency').text(concurrencyName);
+
+                        var outboundPolicyName = 
processGroup.flowfileOutboundPolicy == "BATCH_OUTPUT" ? "Batch Output" : 
"Stream When Available";
+                        
$('#read-only-process-group-outbound-policy').text(outboundPolicyName);
+
                         // populate the header
                         
$('#process-group-configuration-header-text').text(processGroup.name + ' 
Configuration');
                     } else {
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index a9806a0..3c5d4f9 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.tests.system;
 
 import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.controller.AbstractPort;
 import org.apache.nifi.controller.queue.LoadBalanceCompression;
 import org.apache.nifi.controller.queue.LoadBalanceStrategy;
 import org.apache.nifi.controller.queue.QueueSize;
@@ -380,12 +381,21 @@ public class NiFiClientUtil {
         return counterValues;
     }
 
+    public ScheduleComponentsEntity startProcessGroupComponents(final String 
groupId) throws NiFiClientException, IOException {
+        final ScheduleComponentsEntity scheduleComponentsEntity = new 
ScheduleComponentsEntity();
+        scheduleComponentsEntity.setId(groupId);
+        scheduleComponentsEntity.setState("RUNNING");
+        final ScheduleComponentsEntity scheduleEntity = 
nifiClient.getFlowClient().scheduleProcessGroupComponents("root", 
scheduleComponentsEntity);
+
+        return scheduleEntity;
+    }
+
     public ScheduleComponentsEntity stopProcessGroupComponents(final String 
groupId) throws NiFiClientException, IOException {
         final ScheduleComponentsEntity scheduleComponentsEntity = new 
ScheduleComponentsEntity();
-        scheduleComponentsEntity.setId("root");
+        scheduleComponentsEntity.setId(groupId);
         scheduleComponentsEntity.setState("STOPPED");
         final ScheduleComponentsEntity scheduleEntity = 
nifiClient.getFlowClient().scheduleProcessGroupComponents("root", 
scheduleComponentsEntity);
-        waitForProcessorsStopped("root");
+        waitForProcessorsStopped(groupId);
 
         return scheduleEntity;
     }
@@ -536,6 +546,18 @@ public class NiFiClientUtil {
         }
     }
 
+    public ConnectionEntity createConnection(final PortEntity source, final 
PortEntity destination) throws NiFiClientException, IOException {
+        return createConnection(source, destination, 
Collections.singleton(AbstractPort.PORT_RELATIONSHIP.getName()));
+    }
+
+    public ConnectionEntity createConnection(final PortEntity source, final 
ProcessorEntity destination) throws NiFiClientException, IOException {
+        return createConnection(source, destination, 
Collections.singleton(AbstractPort.PORT_RELATIONSHIP.getName()));
+    }
+
+    public ConnectionEntity createConnection(final ProcessorEntity source, 
final PortEntity destination, final String relationship) throws 
NiFiClientException, IOException {
+        return createConnection(source, destination, 
Collections.singleton(relationship));
+    }
+
     public ConnectionEntity createConnection(final ProcessorEntity source, 
final ProcessorEntity destination, final String relationship) throws 
NiFiClientException, IOException {
         return createConnection(source, destination, 
Collections.singleton(relationship));
     }
@@ -548,27 +570,41 @@ public class NiFiClientUtil {
         return createConnection(createConnectableDTO(source), 
createConnectableDTO(destination), relationships);
     }
 
+    public ConnectionEntity createConnection(final ProcessorEntity source, 
final PortEntity destination, final Set<String> relationships) throws 
NiFiClientException, IOException {
+        return createConnection(createConnectableDTO(source), 
createConnectableDTO(destination), relationships);
+    }
+
+    public ConnectionEntity createConnection(final PortEntity source, final 
PortEntity destination, final Set<String> relationships) throws 
NiFiClientException, IOException {
+        return createConnection(createConnectableDTO(source), 
createConnectableDTO(destination), relationships);
+    }
+
+    public ConnectionEntity createConnection(final PortEntity source, final 
ProcessorEntity destination, final Set<String> relationships) throws 
NiFiClientException, IOException {
+        return createConnection(createConnectableDTO(source), 
createConnectableDTO(destination), relationships);
+    }
+
     public ConnectionEntity createConnection(final ConnectableDTO source, 
final ConnectableDTO destination, final Set<String> relationships) throws 
NiFiClientException, IOException {
+        final String groupId = "OUTPUT_PORT".equals(source.getType()) ? 
destination.getGroupId() : source.getGroupId();
+
         final ConnectionDTO connectionDto = new ConnectionDTO();
         connectionDto.setSelectedRelationships(relationships);
         connectionDto.setDestination(destination);
         connectionDto.setSource(source);
-        connectionDto.setParentGroupId(source.getGroupId());
+        connectionDto.setParentGroupId(groupId);
 
         final ConnectionEntity connectionEntity = new ConnectionEntity();
         connectionEntity.setComponent(connectionDto);
 
         connectionEntity.setDestinationGroupId(destination.getGroupId());
         connectionEntity.setDestinationId(destination.getId());
-        connectionEntity.setDestinationType("PROCESSOR");
+        connectionEntity.setDestinationType(destination.getType());
 
         connectionEntity.setSourceGroupId(source.getGroupId());
         connectionEntity.setSourceId(source.getId());
-        connectionEntity.setDestinationType("PROCESSOR");
+        connectionEntity.setSourceType(source.getType());
 
         connectionEntity.setRevision(createNewRevision());
 
-        return 
nifiClient.getConnectionClient().createConnection(source.getGroupId(), 
connectionEntity);
+        return nifiClient.getConnectionClient().createConnection(groupId, 
connectionEntity);
     }
 
     public ConnectableDTO createConnectableDTO(final ProcessorEntity 
processor) {
@@ -778,6 +814,30 @@ public class NiFiClientUtil {
         return childGroup;
     }
 
+    public PortEntity createInputPort(final String name, final String groupId) 
throws NiFiClientException, IOException {
+        final PortDTO component = new PortDTO();
+        component.setName(name);
+        component.setParentGroupId(groupId);
+
+        final PortEntity inputPortEntity = new PortEntity();
+        inputPortEntity.setRevision(createNewRevision());
+        inputPortEntity.setComponent(component);
+
+        return nifiClient.getInputPortClient().createInputPort(groupId, 
inputPortEntity);
+    }
+
+    public PortEntity createOutputPort(final String name, final String 
groupId) throws NiFiClientException, IOException {
+        final PortDTO component = new PortDTO();
+        component.setName(name);
+        component.setParentGroupId(groupId);
+
+        final PortEntity outputPortEntity = new PortEntity();
+        outputPortEntity.setRevision(createNewRevision());
+        outputPortEntity.setComponent(component);
+
+        return nifiClient.getOutputPortClient().createOutputPort(groupId, 
outputPortEntity);
+    }
+
     public ProvenanceEntity queryProvenance(final Map<SearchableField, String> 
searchTerms, final Long startTime, final Long endTime) throws 
NiFiClientException, IOException {
         final Map<String, String> searchTermsAsStrings = 
searchTerms.entrySet().stream()
             .collect(Collectors.toMap(entry -> 
entry.getKey().getSearchableFieldName(), Map.Entry::getValue));
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
index 1e37b01..a57d4ea 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
@@ -30,18 +30,26 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.rules.TestName;
 import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BooleanSupplier;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 public abstract class NiFiSystemIT {
+    private static final Logger logger = 
LoggerFactory.getLogger(NiFiSystemIT.class);
+    private final ConcurrentMap<String, Long> lastLogTimestamps = new 
ConcurrentHashMap<>();
+
     public static final int CLIENT_API_PORT = 5671;
     public static final String NIFI_GROUP_ID = "org.apache.nifi";
     public static final String TEST_EXTENSIONS_ARTIFACT_ID = 
"nifi-system-test-extensions-nar";
@@ -131,6 +139,8 @@ public abstract class NiFiSystemIT {
     }
 
     protected void waitForAllNodesConnected(final int expectedNumberOfNodes, 
final long sleepMillis) {
+        logger.info("Waiting for {} nodes to connect", expectedNumberOfNodes);
+
         final NiFiClient client = getNifiClient();
 
         final long maxTime = System.currentTimeMillis() + 
TimeUnit.SECONDS.toMillis(60);
@@ -142,6 +152,8 @@ public abstract class NiFiSystemIT {
                     return;
                 }
 
+                logEverySecond("Waiting for {} nodes to connect but currently 
on {} nodes are connected", expectedNumberOfNodes, connectedNodeCount);
+
                 if (System.currentTimeMillis() > maxTime) {
                     throw new RuntimeException("Waited up to 60 seconds for 
both nodes to connect but only " + connectedNodeCount + " nodes connected");
                 }
@@ -262,10 +274,27 @@ public abstract class NiFiSystemIT {
     }
 
     protected void waitForQueueCount(final String connectionId, final int 
queueSize) throws InterruptedException {
+        logger.info("Waiting for Queue Count of {} on Connection {}", 
queueSize, connectionId);
+
         waitFor(() -> {
             final ConnectionStatusEntity statusEntity = 
getConnectionStatus(connectionId);
-            return 
statusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued() 
== queueSize;
+            final int currentSize = 
statusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued();
+            final String sourceName = 
statusEntity.getConnectionStatus().getSourceName();
+            final String destinationName = 
statusEntity.getConnectionStatus().getDestinationName();
+            logEverySecond("Current Queue Size for Connection from {} to {} = 
{}, Waiting for {}", sourceName, destinationName, currentSize, queueSize);
+
+            return currentSize == queueSize;
         });
+
+        logger.info("Queue Count for Connection {} is now {}", connectionId, 
queueSize);
+    }
+
+    private void logEverySecond(final String message, final Object... args) {
+        final Long lastLogTime = lastLogTimestamps.get(message);
+        if (lastLogTime == null || lastLogTime < System.currentTimeMillis() - 
1000L) {
+            logger.info(message, args);
+            lastLogTimestamps.put(message, System.currentTimeMillis());
+        }
     }
 
     private ConnectionStatusEntity getConnectionStatus(final String 
connectionId) {
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java
new file mode 100644
index 0000000..02e8bbc
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.pg;
+
+import org.apache.nifi.groups.FlowFileConcurrency;
+import org.apache.nifi.groups.FlowFileOutboundPolicy;
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.PortEntity;
+import org.apache.nifi.web.api.entity.ProcessGroupEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class SingleFlowFileConcurrencyIT extends NiFiSystemIT {
+
+
+    @Test
+    public void testSingleConcurrency() throws NiFiClientException, 
IOException, InterruptedException {
+        final ProcessGroupEntity processGroupEntity = 
getClientUtil().createProcessGroup("My Group", "root");
+        final PortEntity inputPort = getClientUtil().createInputPort("In", 
processGroupEntity.getId());
+        final PortEntity outputPort = getClientUtil().createOutputPort("Out", 
processGroupEntity.getId());
+
+        final ProcessorEntity generate = 
getClientUtil().createProcessor("GenerateFlowFile");
+        getClientUtil().updateProcessorProperties(generate, 
Collections.singletonMap("Batch Size", "3"));
+        getClientUtil().updateProcessorSchedulingPeriod(generate, "10 mins");
+
+        final ProcessorEntity sleep = getClientUtil().createProcessor("Sleep", 
processGroupEntity.getId());
+        getClientUtil().updateProcessorProperties(sleep, 
Collections.singletonMap("onTrigger Sleep Time", "5 sec"));
+
+        final ProcessorEntity terminate = 
getClientUtil().createProcessor("TerminateFlowFile");
+
+        // Connect Generate -> Input Port -> Sleep -> Output Port -> Terminate
+        // Since we will use Single FlowFile at a time concurrency, we should 
see that the connection between Input Port and Sleep
+        // never has more than 1 FlowFile even though the Sleep processor 
takes a long time.
+        final ConnectionEntity generateToInput = 
getClientUtil().createConnection(generate, inputPort, "success");
+        final ConnectionEntity inputToSleep = 
getClientUtil().createConnection(inputPort, sleep);
+        getClientUtil().createConnection(sleep, outputPort, "success");
+        final ConnectionEntity outputToTerminate = 
getClientUtil().createConnection(outputPort, terminate);
+
+        
processGroupEntity.getComponent().setFlowfileConcurrency(FlowFileConcurrency.SINGLE_FLOWFILE_PER_NODE.name());
+        
getNifiClient().getProcessGroupClient().updateProcessGroup(processGroupEntity);
+
+        // Start all components except for Terminate. We want the data to 
queue up before terminate so we can ensure that the
+        // correct number of FlowFiles are queued up.
+        getClientUtil().startProcessGroupComponents("root");
+        getNifiClient().getProcessorClient().stopProcessor(terminate);
+
+        // Wait for 1 FlowFile to queue up for the Sleep Processor. This 
should leave 2 FlowFiles queued up for the input port.
+        waitForQueueCount(inputToSleep.getId(), 1);
+        assertEquals(2, getConnectionQueueSize(generateToInput.getId()));
+
+        // Wait until only 1 FlowFile is queued up for the input port. Because 
Sleep should take 5 seconds to complete its job,
+        // It should take 5 seconds for this to happen. But it won't be exact. 
So we'll ensure that it takes at least 3 seconds. We could
+        // put an upper bound such as 6 or 7 seconds as well, but it's a good 
idea to avoid that because the tests may run in some environments
+        // with constrained resources that may take a lot longer to run.
+        final long startTime = System.currentTimeMillis();
+        waitForQueueCount(generateToInput.getId(), 1);
+        final long endTime = System.currentTimeMillis();
+        final long delay = endTime - startTime;
+        assertTrue(delay > 3000L);
+
+        assertEquals(1, getConnectionQueueSize(inputToSleep.getId()));
+
+        waitForQueueCount(outputToTerminate.getId(), 2);
+
+        // Wait until all FlowFiles have been ingested.
+        waitForQueueCount(generateToInput.getId(), 0);
+        assertEquals(1, getConnectionQueueSize(inputToSleep.getId()));
+
+        // Ensure that 3 FlowFiles are queued up for Terminate
+        waitForQueueCount(outputToTerminate.getId(), 3);
+    }
+
+
+    @Test
+    public void testSingleConcurrencyAndBatchOutput() throws 
NiFiClientException, IOException, InterruptedException {
+        final ProcessGroupEntity processGroupEntity = 
getClientUtil().createProcessGroup("My Group", "root");
+        final PortEntity inputPort = getClientUtil().createInputPort("In", 
processGroupEntity.getId());
+        final PortEntity outputPort = getClientUtil().createOutputPort("Out", 
processGroupEntity.getId());
+        final PortEntity secondOut = getClientUtil().createOutputPort("Out2", 
processGroupEntity.getId());
+
+        final ProcessorEntity generate = 
getClientUtil().createProcessor("GenerateFlowFile");
+        getClientUtil().updateProcessorSchedulingPeriod(generate, "10 mins");
+
+        final ProcessorEntity sleep = getClientUtil().createProcessor("Sleep", 
processGroupEntity.getId()); // sleep with default configuration is just a 
simple pass-through
+
+        final ProcessorEntity terminate = 
getClientUtil().createProcessor("TerminateFlowFile");
+
+        // Connect Generate -> Input Port -> Count -> Output Port -> Terminate
+        // Also connect InputPort -> Out2 -> Terminate
+        final ConnectionEntity generateToInput = 
getClientUtil().createConnection(generate, inputPort, "success");
+        final ConnectionEntity inputToSleep = 
getClientUtil().createConnection(inputPort, sleep);
+        final ConnectionEntity sleepToOutput = 
getClientUtil().createConnection(sleep, outputPort, "success");
+        final ConnectionEntity inputToSecondOut = 
getClientUtil().createConnection(inputPort, secondOut);
+        final ConnectionEntity outputToTerminate = 
getClientUtil().createConnection(outputPort, terminate);
+        final ConnectionEntity secondOutToTerminate = 
getClientUtil().createConnection(secondOut, terminate);
+
+        
processGroupEntity.getComponent().setFlowfileConcurrency(FlowFileConcurrency.SINGLE_FLOWFILE_PER_NODE.name());
+        
processGroupEntity.getComponent().setFlowfileOutboundPolicy(FlowFileOutboundPolicy.BATCH_OUTPUT.name());
+        
getNifiClient().getProcessGroupClient().updateProcessGroup(processGroupEntity);
+
+        // Start generate so that data is created. Start Input Port so that 
the data is ingested.
+        // Start Output Ports but not the Sleep processor. This will keep data 
queued up for the Sleep processor,
+        // and that should prevent data from being transferred by Output Port 
"Out2" also.
+        getNifiClient().getProcessorClient().startProcessor(generate);
+        getNifiClient().getInputPortClient().startInputPort(inputPort);
+        getNifiClient().getOutputPortClient().startOutputPort(outputPort);
+        getNifiClient().getOutputPortClient().startOutputPort(secondOut);
+
+        waitForQueueCount(inputToSleep.getId(), 1);
+        assertEquals(1, getConnectionQueueSize(inputToSecondOut.getId()));
+
+        // Wait 3 seconds to ensure that data is never transferred
+        for (int i=0; i < 3; i++) {
+            Thread.sleep(1000L);
+            assertEquals(1, getConnectionQueueSize(inputToSleep.getId()));
+            assertEquals(1, getConnectionQueueSize(inputToSecondOut.getId()));
+        }
+
+        // Start Sleep
+        getNifiClient().getProcessorClient().startProcessor(sleep);
+
+        // Data should now flow from both output ports.
+        waitForQueueCount(inputToSleep.getId(), 0);
+        waitForQueueCount(inputToSecondOut.getId(), 0);
+
+        assertEquals(1, getConnectionQueueSize(outputToTerminate.getId()));
+        assertEquals(1, getConnectionQueueSize(secondOutToTerminate.getId()));
+    }
+}
diff --git 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/OutputPortClient.java
 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/OutputPortClient.java
index b635404..1bfab87 100644
--- 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/OutputPortClient.java
+++ 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/OutputPortClient.java
@@ -30,7 +30,13 @@ public interface OutputPortClient {
 
     PortEntity deleteOutputPort(PortEntity entity) throws NiFiClientException, 
IOException;
 
+    /**
+     * @deprecated use startOutputPort
+     */
+    @Deprecated
     PortEntity startInpuOutputPort(PortEntity entity) throws 
NiFiClientException, IOException;
 
+    PortEntity startOutputPort(PortEntity entity) throws NiFiClientException, 
IOException;
+
     PortEntity stopOutputPort(PortEntity entity) throws NiFiClientException, 
IOException;
 }
diff --git 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyOutputPortClient.java
 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyOutputPortClient.java
index 46637fc..645c95f 100644
--- 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyOutputPortClient.java
+++ 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyOutputPortClient.java
@@ -62,6 +62,11 @@ public class JerseyOutputPortClient extends 
CRUDJerseyClient<PortEntity> impleme
 
     @Override
     public PortEntity startInpuOutputPort(final PortEntity entity) throws 
NiFiClientException, IOException {
+        return startOutputPort(entity);
+    }
+
+    @Override
+    public PortEntity startOutputPort(final PortEntity entity) throws 
NiFiClientException, IOException {
         final PortEntity startEntity = createStateEntity(entity, "RUNNING");
         return updateOutputPort(startEntity);
     }

Reply via email to