Repository: nifi
Updated Branches:
  refs/heads/master c614a7443 -> 9583ca99c


NIFI-3414: Added EnforceOrder processor

When used with FirstInFirstOutPrioritizer, it can enforce the original ordering
of 'out-of-order' FlowFiles.

nifi-mock is modified to support FlowFile assertion using Prioritizer.

Signed-off-by: Matt Burgess <[email protected]>

NIFI-3414: Added EnforceOrder processor

Incorporated review comments, added displayNames.

Signed-off-by: Matt Burgess <[email protected]>

NIFI-3414: Added EnforceOrder processor

Incorporate review comments:

- Moved nifi-standard-prioritizers dependency to top level nifi/pom.xml.
- Changed default initial order from 1 to 0.
- Fixed typos.
- Use session.get(batchCount).

Signed-off-by: Matt Burgess <[email protected]>

NIFI-3414: Added EnforceOrder processor

When a FlowFile is transferred to success, remove attributes previously set 
when it was transferred to wait or failure.

Signed-off-by: Matt Burgess <[email protected]>

This closes #1496


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9583ca99
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9583ca99
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9583ca99

Branch: refs/heads/master
Commit: 9583ca99c1e4b2d3c43511939a2a148d20f8e1ac
Parents: c614a74
Author: Koji Kawamura <[email protected]>
Authored: Wed Feb 8 00:13:23 2017 +0900
Committer: Matt Burgess <[email protected]>
Committed: Mon Apr 10 14:03:22 2017 -0400

----------------------------------------------------------------------
 .../java/org/apache/nifi/util/MockFlowFile.java |  16 +-
 .../apache/nifi/util/MockProcessSession.java    |  39 +-
 nifi-nar-bundles/nifi-framework-bundle/pom.xml  |   5 -
 .../nifi-standard-processors/pom.xml            |   5 +
 .../nifi/processors/standard/EnforceOrder.java  | 551 +++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../processors/standard/TestEnforceOrder.java   | 493 +++++++++++++++++
 pom.xml                                         |   5 +
 8 files changed, 1086 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/9583ca99/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
index df87de5..b23f2eb 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
@@ -51,10 +51,14 @@ public class MockFlowFile implements FlowFileRecord {
 
     private byte[] data = new byte[0];
 
+    private long lastEnqueuedDate = 0;
+    private long enqueuedIndex = 0;
+
     public MockFlowFile(final long id) {
         this.creationTime = System.nanoTime();
         this.id = id;
         entryDate = System.currentTimeMillis();
+        lastEnqueuedDate = entryDate;
         attributes.put(CoreAttributes.FILENAME.key(), 
String.valueOf(System.nanoTime()) + ".mockFlowFile");
         attributes.put(CoreAttributes.PATH.key(), "target");
 
@@ -290,7 +294,11 @@ public class MockFlowFile implements FlowFileRecord {
 
     @Override
     public Long getLastQueueDate() {
-        return entryDate;
+        return lastEnqueuedDate;
+    }
+
+    public void setLastEnqueuedDate(long lastEnqueuedDate) {
+        this.lastEnqueuedDate = lastEnqueuedDate;
     }
 
     @Override
@@ -315,7 +323,11 @@ public class MockFlowFile implements FlowFileRecord {
 
     @Override
     public long getQueueDateIndex() {
-        return 0;
+        return enqueuedIndex;
+    }
+
+    public void setEnqueuedIndex(long enqueuedIndex) {
+        this.enqueuedIndex = enqueuedIndex;
     }
 
     public boolean isAttributeEqual(final String attributeName, final String 
expectedValue) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/9583ca99/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index 3228e1f..faf6e42 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -38,6 +38,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -79,6 +80,8 @@ public class MockProcessSession implements ProcessSession {
     private boolean rolledback = false;
     private final Set<Long> removedFlowFiles = new HashSet<>();
 
+    private static final AtomicLong enqueuedIndex = new AtomicLong(0L);
+
     public MockProcessSession(final SharedSessionState sharedState, final 
Processor processor) {
         this.processor = processor;
         this.sharedState = sharedState;
@@ -715,8 +718,18 @@ public class MockProcessSession implements ProcessSession {
             throw new IllegalArgumentException("I only accept MockFlowFile");
         }
 
+        final MockFlowFile mockFlowFile = (MockFlowFile) flowFile;
         beingProcessed.remove(flowFile.getId());
-        processorQueue.offer((MockFlowFile) flowFile);
+        processorQueue.offer(mockFlowFile);
+        updateLastQueuedDate(mockFlowFile);
+
+    }
+
+    private void updateLastQueuedDate(MockFlowFile mockFlowFile) {
+        // Simulate StandardProcessSession.updateLastQueuedDate,
+        // which is called when a flow file is transferred to a relationship.
+        mockFlowFile.setLastEnqueuedDate(System.currentTimeMillis());
+        mockFlowFile.setEnqueuedIndex(enqueuedIndex.incrementAndGet());
     }
 
     @Override
@@ -737,14 +750,11 @@ public class MockProcessSession implements ProcessSession 
{
         }
 
         validateState(flowFile);
-        List<MockFlowFile> list = transferMap.get(relationship);
-        if (list == null) {
-            list = new ArrayList<>();
-            transferMap.put(relationship, list);
-        }
+        List<MockFlowFile> list = transferMap.computeIfAbsent(relationship, r 
-> new ArrayList<>());
 
         beingProcessed.remove(flowFile.getId());
         list.add((MockFlowFile) flowFile);
+        updateLastQueuedDate((MockFlowFile) flowFile);
     }
 
     @Override
@@ -753,23 +763,8 @@ public class MockProcessSession implements ProcessSession {
             transfer(flowFiles);
             return;
         }
-        if(!processor.getRelationships().contains(relationship)){
-            throw new IllegalArgumentException("this relationship " + 
relationship.getName() + " is not known");
-        }
-
-        for (final FlowFile flowFile : flowFiles) {
-            validateState(flowFile);
-        }
-
-        List<MockFlowFile> list = transferMap.get(relationship);
-        if (list == null) {
-            list = new ArrayList<>();
-            transferMap.put(relationship, list);
-        }
-
         for (final FlowFile flowFile : flowFiles) {
-            beingProcessed.remove(flowFile.getId());
-            list.add((MockFlowFile) flowFile);
+            transfer(flowFile, relationship);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/9583ca99/nifi-nar-bundles/nifi-framework-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/pom.xml 
b/nifi-nar-bundles/nifi-framework-bundle/pom.xml
index 1ea79c0..1b343d7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/pom.xml
@@ -128,11 +128,6 @@
                 <artifactId>nifi-authorizer</artifactId>
                 <version>1.2.0-SNAPSHOT</version>
             </dependency>
-            <dependency>
-                <groupId>org.apache.nifi</groupId>
-                <artifactId>nifi-standard-prioritizers</artifactId>
-                <version>1.2.0-SNAPSHOT</version>
-            </dependency>
         </dependencies>
     </dependencyManagement>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/9583ca99/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 1a18b08..dc77309 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -215,6 +215,11 @@ language governing permissions and limitations under the 
License. -->
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-prioritizers</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.derby</groupId>
             <artifactId>derby</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/nifi/blob/9583ca99/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EnforceOrder.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EnforceOrder.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EnforceOrder.java
new file mode 100644
index 0000000..fa3d1b6
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EnforceOrder.java
@@ -0,0 +1,551 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+
+@EventDriven
+@Tags({"sort", "order"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@TriggerSerially
+@CapabilityDescription("Enforces expected ordering of FlowFiles those belong 
to the same data group. " +
+        " Although PriorityAttributePrioritizer can be used on a connection to 
ensure that flow files going through that connection are in priority order," +
+        " depending on error-handling, branching, and other flow designs, it 
is possible for FlowFiles to get out-of-order." +
+        " EnforceOrder can be used to enforce original ordering for those 
FlowFiles." +
+        " [IMPORTANT] In order to take effect of EnforceOrder, 
FirstInFirstOutPrioritizer should be used at EVERY downstream relationship" +
+        " UNTIL the order of FlowFiles physically get FIXED by operation such 
as MergeContent or being stored to the final destination.")
+@Stateful(scopes = Scope.LOCAL, description = "EnforceOrder uses following 
states per ordering group:" +
+        " '<groupId>.target' is a order number which is being waited to arrive 
next." +
+        " When a FlowFile with a matching order arrives, or a FlowFile 
overtakes the FlowFile being waited for because of wait timeout," +
+        " target order will be updated to (FlowFile.order + 1)." +
+        " '<groupId>.max is the maximum order number for a group." +
+        " '<groupId>.updatedAt' is a timestamp when the order of a group was 
updated last time." +
+        " These managed states will be removed automatically once a group is 
determined as inactive, see 'Inactive Timeout' for detail.")
+@WritesAttributes({
+    @WritesAttribute(attribute = EnforceOrder.ATTR_STARTED_AT,
+        description = "All FlowFiles going through this processor will have 
this attribute. This value is used to determine wait timeout."),
+    @WritesAttribute(attribute = EnforceOrder.ATTR_RESULT,
+        description = "All FlowFiles going through this processor will have 
this attribute denoting which relationship it was routed to."),
+    @WritesAttribute(attribute = EnforceOrder.ATTR_DETAIL,
+        description = "FlowFiles routed to 'failure' or 'skipped' relationship 
will have this attribute describing details."),
+    @WritesAttribute(attribute = EnforceOrder.ATTR_EXPECTED_ORDER,
+        description = "FlowFiles routed to 'wait' or 'skipped' relationship 
will have this attribute denoting expected order when the FlowFile was 
processed.")
+})
+public class EnforceOrder extends AbstractProcessor {
+
+    public static final String ATTR_STARTED_AT = "EnforceOrder.startedAt";
+    public static final String ATTR_EXPECTED_ORDER = 
"EnforceOrder.expectedOrder";
+    public static final String ATTR_RESULT = "EnforceOrder.result";
+    public static final String ATTR_DETAIL = "EnforceOrder.detail";
+    private static final Function<String, String> STATE_TARGET_ORDER = groupId 
-> groupId + ".target";
+    private static final String STATE_SUFFIX_UPDATED_AT = ".updatedAt";
+    private static final Function<String, String> STATE_UPDATED_AT = groupId 
-> groupId + STATE_SUFFIX_UPDATED_AT;
+    private static final Function<String, String> STATE_MAX_ORDER = groupId -> 
groupId + ".max";
+
+    public static final PropertyDescriptor GROUP_IDENTIFIER = new 
PropertyDescriptor.Builder()
+        .name("group-id")
+        .displayName("Group Identifier")
+        .description("EnforceOrder is capable of multiple ordering groups." +
+                " 'Group Identifier' is used to determine which group a 
FlowFile belongs to." +
+                " This property will be evaluated with each incoming 
FlowFile." +
+                " If evaluated result is empty, the FlowFile will be routed to 
failure.")
+        .required(true)
+        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .defaultValue("${filename}")
+        .build();
+
+    public static final PropertyDescriptor ORDER_ATTRIBUTE = new 
PropertyDescriptor.Builder()
+        .name("order-attribute")
+        .displayName("Order Attribute")
+        .description("A name of FlowFile attribute whose value will be used to 
enforce order of FlowFiles within a group." +
+                " If a FlowFile does not have this attribute, or its value is 
not an integer, the FlowFile will be routed to failure.")
+        .required(true)
+        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .build();
+
+    public static final PropertyDescriptor INITIAL_ORDER = new 
PropertyDescriptor.Builder()
+        .name("initial-order")
+        .displayName("Initial Order")
+        .description("When the first FlowFile of a group arrives, initial 
target order will be computed and stored in the managed state." +
+                " After that, target order will start being tracked by 
EnforceOrder and stored in the state management store." +
+                " If Expression Language is used but evaluated result was not 
an integer, then the FlowFile will be routed to failure," +
+                " and initial order will be left unknown until consecutive 
FlowFiles provide a valid initial order.")
+        .required(true)
+        .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .defaultValue("0")
+        .build();
+
+    public static final PropertyDescriptor MAX_ORDER = new 
PropertyDescriptor.Builder()
+        .name("maximum-order")
+        .displayName("Maximum Order")
+        .description("If specified, any FlowFiles that have larger order will 
be routed to failure." +
+                " This property is computed only once for a given group." +
+                " After a maximum order is computed, it will be persisted in 
the state management store and used for other FlowFiles belonging to the same 
group." +
+                " If Expression Language is used but evaluated result was not 
an integer, then the FlowFile will be routed to failure," +
+                " and maximum order will be left unknown until consecutive 
FlowFiles provide a valid maximum order.")
+        .required(false)
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .build();
+
+    public static final PropertyDescriptor WAIT_TIMEOUT = new 
PropertyDescriptor.Builder()
+        .name("wait-timeout")
+        .displayName("Wait Timeout")
+        .description("Indicates the duration after which waiting FlowFiles 
will be routed to the 'overtook' relationship.")
+        .required(true)
+        .defaultValue("10 min")
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .build();
+
+    public static final PropertyDescriptor INACTIVE_TIMEOUT = new 
PropertyDescriptor.Builder()
+        .name("inactive-timeout")
+        .displayName("Inactive Timeout")
+        .description("Indicates the duration after which state for an inactive 
group will be cleared from managed state." +
+                " Group is determined as inactive if any new incoming FlowFile 
has not seen for a group for specified duration." +
+                " Inactive Timeout must be longer than Wait Timeout." +
+                " If a FlowFile arrives late after its group is already 
cleared, it will be treated as a brand new group," +
+                " but will never match the order since expected preceding 
FlowFiles are already gone." +
+                " The FlowFile will eventually timeout for waiting and routed 
to 'overtook'." +
+                " To avoid this, group states should be kept long enough, 
however, shorter duration would be helpful for reusing the same group 
identifier again.")
+        .required(true)
+        .defaultValue("30 min")
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .build();
+
+    public static final PropertyDescriptor BATCH_COUNT = new 
PropertyDescriptor.Builder()
+        .name("batch-count")
+        .displayName("Batch Count")
+        .description("The maximum number of FlowFiles that EnforceOrder can 
process at an execution.")
+        .required(true)
+        .defaultValue("1000")
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("A FlowFile with a matching order number will be routed 
to this relationship.")
+        .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+        .name("failure")
+        .description("A FlowFiles which does not have required attributes, or 
fails to compute those will be routed to this relationship")
+        .build();
+
+    public static final Relationship REL_WAIT = new Relationship.Builder()
+        .name("wait")
+        .description("A FlowFile with non matching order will be routed to 
this relationship")
+        .build();
+
+    public static final Relationship REL_OVERTOOK = new Relationship.Builder()
+        .name("overtook")
+        .description("A FlowFile that waited for preceding FlowFiles longer 
than Wait Timeout and overtook those FlowFiles, will be routed to this 
relationship.")
+        .build();
+
+    public static final Relationship REL_SKIPPED = new Relationship.Builder()
+        .name("skipped")
+        .description("A FlowFile that has an order younger than current, which 
means arrived too late and skipped, will be routed to this relationship.")
+        .build();
+
+    private final Set<Relationship> relationships;
+
+    /**
+     * Creates the processor and fixes the set of relationships it exposes
+     * (success, wait, overtook, failure and skipped) as an unmodifiable set.
+     */
+    public EnforceOrder() {
+        final Set<Relationship> supported = new HashSet<>();
+        Collections.addAll(supported, REL_SUCCESS, REL_WAIT, REL_OVERTOOK, REL_FAILURE, REL_SKIPPED);
+        relationships = Collections.unmodifiableSet(supported);
+    }
+
+    /**
+     * Lists the properties supported by this processor, in the order they are declared here.
+     *
+     * @return a new list holding the supported property descriptors
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> supported = new ArrayList<>();
+        Collections.addAll(supported,
+                GROUP_IDENTIFIER,
+                ORDER_ATTRIBUTE,
+                INITIAL_ORDER,
+                MAX_ORDER,
+                BATCH_COUNT,
+                WAIT_TIMEOUT,
+                INACTIVE_TIMEOUT);
+        return supported;
+    }
+
+    /**
+     * @return the unmodifiable set of relationships built in the constructor
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+
+    /**
+     * Adds a cross-property check on top of the parent's validation:
+     * 'Inactive Timeout' has to be strictly longer than 'Wait Timeout'.
+     *
+     * @param validationContext gives access to the configured property values
+     * @return the parent's validation results, plus one invalid result when
+     *         Wait Timeout is greater than or equal to Inactive Timeout
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+
+        // Both durations are read in the same unit (microseconds) so they can be compared directly.
+        final PropertyValue inactiveTimeout = validationContext.getProperty(INACTIVE_TIMEOUT);
+        final Long waitTimeoutMicros = validationContext.getProperty(WAIT_TIMEOUT).asTimePeriod(TimeUnit.MICROSECONDS);
+        final Long inactiveTimeoutMicros = inactiveTimeout.asTimePeriod(TimeUnit.MICROSECONDS);
+
+        if (waitTimeoutMicros < inactiveTimeoutMicros) {
+            return results;
+        }
+
+        final String explanation = String.format("%s should be longer than %s",
+                INACTIVE_TIMEOUT.getDisplayName(), WAIT_TIMEOUT.getDisplayName());
+        results.add(new ValidationResult.Builder()
+                .subject(INACTIVE_TIMEOUT.getDisplayName())
+                .input(inactiveTimeout.getValue())
+                .explanation(explanation)
+                .valid(false)
+                .build());
+        return results;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+
+
+        final ComponentLog logger = getLogger();
+        final Integer batchCount = 
context.getProperty(BATCH_COUNT).asInteger();
+
+        List<FlowFile> flowFiles = session.get(batchCount);
+        if (flowFiles == null || flowFiles.isEmpty()) {
+            return;
+        }
+
+        final StateMap stateMap;
+        try {
+            stateMap = context.getStateManager().getState(Scope.LOCAL);
+        } catch (final IOException e) {
+            logger.error("Failed to retrieve state from StateManager due to 
{}" + e, e);
+            context.yield();
+            return;
+        }
+
+        final OrderingContext oc = new OrderingContext(context, session);
+
+        oc.groupStates.putAll(stateMap.toMap());
+
+        for (FlowFile flowFile : flowFiles) {
+            oc.setFlowFile(flowFile);
+            if (oc.flowFile == null) {
+                break;
+            }
+
+            if (!oc.computeGroupId()
+                    || !oc.computeOrder()
+                    || !oc.computeInitialOrder()
+                    || !oc.computeMaxOrder()) {
+                continue;
+            }
+
+            // At this point, the flow file is confirmed to be valid.
+            oc.markFlowFileValid();
+        }
+
+        oc.transferFlowFiles();
+
+        oc.cleanupInactiveStates();
+
+        try {
+            context.getStateManager().setState(oc.groupStates, Scope.LOCAL);
+        } catch (final IOException e) {
+            throw new RuntimeException("Failed to update state due to " + e
+                    + ". Session will be rollback and processor will be 
yielded for a while.", e);
+        }
+
+    }
+
+    private class OrderingContext {
+
+        private final ComponentLog logger = getLogger();
+        private final ProcessSession processSession;
+        private final ProcessContext processContext;
+
+        // Following properties are static global setting for all groups.
+        private final String orderAttribute;
+        private final Long waitTimeoutMillis;
+        private final Function<FlowFile, Integer> getOrder;
+
+        private final Map<String, String> groupStates = new HashMap<>();
+        private final long now = System.currentTimeMillis();
+
+        // Following properties are computed per flow file.
+        private final PropertyValue groupIdentifierProperty ;
+
+        // Followings are per group objects.
+        private final PropertyValue initOrderProperty;
+        private final PropertyValue maxOrderProperty;
+        private final Map<String, List<FlowFile>> flowFileGroups = new 
TreeMap<>();
+
+        // Current variables within incoming FlowFiles loop.
+        private FlowFile flowFile;
+        private String groupId;
+        private Integer order;
+
+        /**
+         * Captures the session, the context and the property values needed while
+         * ordering one batch of FlowFiles.
+         *
+         * @param processContext source of the configured property values
+         * @param processSession session used to update and transfer FlowFiles
+         */
+        private OrderingContext(final ProcessContext processContext, final ProcessSession processSession) {
+            this.processSession = processSession;
+            this.processContext = processContext;
+
+            // Settings shared by every group; read once per context.
+            this.orderAttribute = processContext.getProperty(ORDER_ATTRIBUTE).getValue();
+            this.waitTimeoutMillis = processContext.getProperty(WAIT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+            // Throws NumberFormatException when the attribute is missing or is not an integer.
+            this.getOrder = ff -> Integer.parseInt(ff.getAttribute(orderAttribute));
+
+            // Kept as PropertyValue objects so they can be evaluated against individual FlowFiles later.
+            this.groupIdentifierProperty = processContext.getProperty(GROUP_IDENTIFIER);
+            this.initOrderProperty = processContext.getProperty(INITIAL_ORDER);
+            this.maxOrderProperty = processContext.getProperty(MAX_ORDER);
+        }
+
+        /**
+         * Makes the given FlowFile the current one and clears the group id and order
+         * computed for the previous FlowFile.
+         *
+         * @param flowFile the FlowFile to process next
+         */
+        private void setFlowFile(final FlowFile flowFile) {
+            // Drop per-FlowFile values first so nothing leaks over from the previous iteration.
+            this.order = null;
+            this.groupId = null;
+            this.flowFile = flowFile;
+        }
+
+        private boolean computeGroupId() {
+            groupId = 
groupIdentifierProperty.evaluateAttributeExpressions(flowFile).getValue();
+            if (isBlank(groupId)) {
+                transferToFailure(flowFile, "Failed to get Group Identifier.");
+                return false;
+            }
+            return true;
+        }
+
+        /**
+         * Reads the order number of the current FlowFile from the configured order attribute.
+         *
+         * @return {@code true} when the attribute was parsed as an integer; {@code false} after the
+         *         FlowFile has been passed to {@code transferToFailure}
+         */
+        private boolean computeOrder() {
+            try {
+                order = getOrder.apply(flowFile);
+                return true;
+            } catch (final NumberFormatException e) {
+                transferToFailure(flowFile, "Failed to parse order attribute due to " + e, e);
+                return false;
+            }
+        }
+
+        private boolean computeMaxOrder() {
+            if (maxOrderProperty.isSet()) {
+                // Compute maxOrder for this group if it's not there yet.
+                final String maxOrderStr = 
groupStates.computeIfAbsent(STATE_MAX_ORDER.apply(groupId),
+                        k -> 
maxOrderProperty.evaluateAttributeExpressions(flowFile).getValue());
+                if (isBlank(maxOrderStr)) {
+                    transferToFailure(flowFile, String.format("%s was 
specified but result was empty.", MAX_ORDER.getDisplayName()));
+                    return false;
+                }
+
+                final Integer maxOrder;
+                try {
+                    maxOrder = Integer.parseInt(maxOrderStr);
+                } catch (final NumberFormatException e) {
+                    final String msg = String.format("Failed to get Maximum 
Order for group [%s] due to %s", groupId, e);
+                    transferToFailure(flowFile, msg, e);
+                    return false;
+                }
+
+                // Check max order.
+                if (order > maxOrder) {
+                    final String msg = String.format("Order (%d) is greater 
than the Maximum Order (%d) for Group [%s]", order, maxOrder, groupId);
+                    transferToFailure(flowFile, msg);
+                    return false;
+                }
+            }
+            return true;
+        }
+
+        /**
+         * Makes sure the current group has a target order in the group state.
+         *
+         * <p>When the group has no target order yet, 'Initial Order' is evaluated against the current
+         * FlowFile, checked to be an integer, and stored together with the group's 'updatedAt'
+         * timestamp. Both entries are written with plain {@code put} calls outside of any
+         * {@code computeIfAbsent} function, which avoids JDK-8071667
+         * (HashMap.computeIfAbsent() adds entry that HashMap.get() does not find).</p>
+         *
+         * @return {@code true} when a target order exists or was stored; {@code false} after the
+         *         FlowFile has been passed to {@code transferToFailure}
+         */
+        private boolean computeInitialOrder() {
+            final String stateKeyOrder = STATE_TARGET_ORDER.apply(groupId);
+            if (groupStates.get(stateKeyOrder) != null) {
+                // The target order of this group is already being tracked.
+                return true;
+            }
+
+            final String initOrderStr;
+            try {
+                initOrderStr = initOrderProperty.evaluateAttributeExpressions(flowFile).getValue();
+                // Parse it to check if it is a valid integer.
+                Integer.parseInt(initOrderStr);
+            } catch (final NumberFormatException e) {
+                final String msg = String.format("Failed to get Initial Order for Group [%s] due to %s", groupId, e);
+                transferToFailure(flowFile, msg, e);
+                return false;
+            }
+
+            groupStates.put(stateKeyOrder, initOrderStr);
+            groupStates.put(STATE_UPDATED_AT.apply(groupId), String.valueOf(now));
+            return true;
+        }
+
+        /**
+         * Adds the current FlowFile to the list kept for its group in {@code flowFileGroups},
+         * creating that list on first use. A FlowFile whose {@code ATTR_STARTED_AT} attribute is
+         * blank is stamped with the current value of {@code now} before it is added.
+         */
+        private void markFlowFileValid() {
+            final List<FlowFile> group = flowFileGroups.computeIfAbsent(groupId, k -> new ArrayList<>());
+
+            // Keep an existing start time so repeated passes measure from the first one.
+            final boolean needsStartTime = isBlank(flowFile.getAttribute(ATTR_STARTED_AT));
+            group.add(needsStartTime
+                    ? processSession.putAttribute(flowFile, ATTR_STARTED_AT, String.valueOf(now))
+                    : flowFile);
+        }
+
+        /**
+         * Routes every FlowFile collected in {@code flowFileGroups}, one group at a time.
+         * <p>
+         * Within a group the FlowFiles are sorted by their order and compared with the group's
+         * target order read from {@code groupStates}:
+         * <ul>
+         * <li>order equal to the target: success; the target advances unless the FlowFile carries
+         * the group's max order;</li>
+         * <li>order greater than the target: overtook once the FlowFile's started-at attribute is
+         * more than {@code waitTimeoutMillis} behind {@code now} (the target then moves past that
+         * FlowFile), otherwise wait;</li>
+         * <li>order smaller than the target: skipped, with a warning logged.</li>
+         * </ul>
+         * When the target order of a group changed, the new value and an updated-at entry are
+         * written back to {@code groupStates}.
+         */
+        private void transferFlowFiles() {
+            flowFileGroups.entrySet().stream()
+                    .filter(entry -> !entry.getValue().isEmpty())
+                    .forEach(entry -> {
+                        // Sort flow files within the group. This is done in the terminal operation
+                        // rather than in a map() stage, which should stay free of side effects.
+                        final List<FlowFile> groupedFlowFiles = entry.getValue();
+                        groupedFlowFiles.sort(Comparator.comparing(getOrder));
+
+                        // Check current state.
+                        final String groupId = entry.getKey();
+                        final String stateKeyOrder = STATE_TARGET_ORDER.apply(groupId);
+                        final int previousTargetOrder = Integer.parseInt(groupStates.get(stateKeyOrder));
+
+                        // The max order is the same for the whole group, so parse it once.
+                        // null means no max order is stored for this group.
+                        final String maxOrderStr = groupStates.get(STATE_MAX_ORDER.apply(groupId));
+                        final Integer maxOrder = isBlank(maxOrderStr) ? null : Integer.valueOf(maxOrderStr);
+
+                        int targetOrder = previousTargetOrder;
+                        for (final FlowFile f : groupedFlowFiles) {
+                            final Integer order = getOrder.apply(f);
+                            final boolean isMaxOrder = order.equals(maxOrder);
+
+                            if (order == targetOrder) {
+                                transferResult(f, REL_SUCCESS, null, null);
+                                if (!isMaxOrder) {
+                                    // Once the FlowFile with the max order has passed, the target
+                                    // stays where it is.
+                                    targetOrder++;
+                                }
+
+                            } else if (order > targetOrder) {
+
+                                if (now - Long.parseLong(f.getAttribute(ATTR_STARTED_AT)) > waitTimeoutMillis) {
+                                    // Report the order that was expected before moving the target.
+                                    transferResult(f, REL_OVERTOOK, null, targetOrder);
+                                    targetOrder = isMaxOrder ? order : order + 1;
+                                } else {
+                                    transferResult(f, REL_WAIT, null, targetOrder);
+                                }
+
+                            } else {
+                                final String msg = String.format("Skipped, FlowFile order was %d but current target is %d", order, targetOrder);
+                                logger.warn(msg + ". {}", new Object[]{f});
+                                transferResult(f, REL_SKIPPED, msg, targetOrder);
+                            }
+                        }
+
+                        if (previousTargetOrder != targetOrder) {
+                            groupStates.put(stateKeyOrder, String.valueOf(targetOrder));
+                            groupStates.put(STATE_UPDATED_AT.apply(groupId), String.valueOf(now));
+                        }
+                    });
+        }
+
+        /**
+         * Writes the routing outcome onto a FlowFile and transfers it.
+         * <p>
+         * {@code ATTR_RESULT} is always set to the relationship name. {@code ATTR_EXPECTED_ORDER}
+         * and {@code ATTR_DETAIL} are set when a value is supplied and removed when the
+         * corresponding argument is {@code null}, so values written by an earlier transfer of the
+         * same FlowFile do not linger.
+         *
+         * @param flowFile      the FlowFile to transfer
+         * @param result        the relationship to transfer to; its name becomes the result attribute
+         * @param detail        explanatory message, or {@code null} to clear the detail attribute
+         * @param expectedOrder the order that was expected, or {@code null} to clear that attribute
+         */
+        private void transferResult(final FlowFile flowFile, final Relationship result, final String detail, final Integer expectedOrder) {
+            final boolean hasExpectedOrder = expectedOrder != null;
+
+            final Map<String, String> resultAttributes = new HashMap<>();
+            resultAttributes.put(ATTR_RESULT, result.getName());
+            if (hasExpectedOrder) {
+                resultAttributes.put(ATTR_EXPECTED_ORDER, expectedOrder.toString());
+            }
+            if (!isBlank(detail)) {
+                resultAttributes.put(ATTR_DETAIL, detail);
+            }
+
+            FlowFile updated = processSession.putAllAttributes(flowFile, resultAttributes);
+
+            // Clear attributes for which no value was given. A blank but non-null detail is
+            // neither written nor removed.
+            if (!hasExpectedOrder) {
+                updated = processSession.removeAttribute(updated, ATTR_EXPECTED_ORDER);
+            }
+            if (detail == null) {
+                updated = processSession.removeAttribute(updated, ATTR_DETAIL);
+            }
+
+            processSession.transfer(updated, result);
+        }
+
+        /**
+         * Routes a FlowFile to failure when there is no exception to report.
+         *
+         * @param flowFile the FlowFile to route
+         * @param message  the reason, logged and stored as the detail attribute
+         */
+        private void transferToFailure(final FlowFile flowFile, final String message) {
+            transferToFailure(flowFile, message, null);
+        }
+
+        /**
+         * Logs a warning and routes a FlowFile to failure with the message as its detail.
+         *
+         * @param flowFile the FlowFile to route
+         * @param message  the reason, logged and stored as the detail attribute
+         * @param cause    the exception behind the failure, or {@code null} if there is none
+         */
+        private void transferToFailure(final FlowFile flowFile, final String message, final Throwable cause) {
+            if (cause == null) {
+                getLogger().warn(message + " {}", new Object[]{flowFile});
+            } else {
+                // Pass the cause along so the logger can report it with the message.
+                getLogger().warn(message + " {}", new Object[]{flowFile}, cause);
+            }
+            transferResult(flowFile, REL_FAILURE, message, null);
+        }
+
+        /**
+         * Removes the state entries (target order, updated-at, max order) of every group whose
+         * updated-at value is more than the Inactive Timeout behind {@code now}.
+         */
+        private void cleanupInactiveStates() {
+            final Long inactiveTimeout = processContext.getProperty(INACTIVE_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+
+            // Collect the group ids first and remove afterwards, so the key set is not modified
+            // while it is being iterated.
+            final List<String> inactiveGroups = new ArrayList<>();
+            for (final String stateKey : groupStates.keySet()) {
+                if (!stateKey.endsWith(STATE_SUFFIX_UPDATED_AT)) {
+                    continue;
+                }
+                if (now - Long.parseLong(groupStates.get(stateKey)) > inactiveTimeout) {
+                    // The group id is the key without the updated-at suffix.
+                    inactiveGroups.add(stateKey.substring(0, stateKey.length() - STATE_SUFFIX_UPDATED_AT.length()));
+                }
+            }
+
+            for (final String inactiveGroup : inactiveGroups) {
+                groupStates.remove(STATE_TARGET_ORDER.apply(inactiveGroup));
+                groupStates.remove(STATE_UPDATED_AT.apply(inactiveGroup));
+                groupStates.remove(STATE_MAX_ORDER.apply(inactiveGroup));
+            }
+        }
+
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/9583ca99/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index cfcc85a..9de5ab6 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -23,6 +23,7 @@ org.apache.nifi.processors.standard.DetectDuplicate
 org.apache.nifi.processors.standard.DistributeLoad
 org.apache.nifi.processors.standard.DuplicateFlowFile
 org.apache.nifi.processors.standard.EncryptContent
+org.apache.nifi.processors.standard.EnforceOrder
 org.apache.nifi.processors.standard.EvaluateJsonPath
 org.apache.nifi.processors.standard.EvaluateXPath
 org.apache.nifi.processors.standard.EvaluateXQuery

http://git-wip-us.apache.org/repos/asf/nifi/blob/9583ca99/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEnforceOrder.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEnforceOrder.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEnforceOrder.java
new file mode 100644
index 0000000..0a179a3
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEnforceOrder.java
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer;
+import org.apache.nifi.state.MockStateManager;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestEnforceOrder {
+
+    /**
+     * The processor is invalid with default properties and becomes valid once the
+     * order attribute is configured.
+     */
+    @Test
+    public void testDefaultPropertyValidation() {
+        final TestRunner testRunner = TestRunners.newTestRunner(EnforceOrder.class);
+
+        // Nothing configured yet: validation must fail.
+        testRunner.assertNotValid();
+
+        // Supplying the order attribute is enough to pass validation.
+        testRunner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index");
+        testRunner.assertValid();
+    }
+
+    /**
+     * Inactive Timeout must be strictly longer than Wait Timeout for the processor to be valid.
+     */
+    @Test
+    public void testCustomPropertyValidation() {
+        final TestRunner testRunner = TestRunners.newTestRunner(EnforceOrder.class);
+
+        // Set required properties.
+        testRunner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index");
+        testRunner.assertValid();
+
+        testRunner.setProperty(EnforceOrder.WAIT_TIMEOUT, "30 sec");
+
+        // Shorter than, or equal to, the Wait Timeout: rejected.
+        for (final String tooShort : new String[]{"29 sec", "30 sec"}) {
+            testRunner.setProperty(EnforceOrder.INACTIVE_TIMEOUT, tooShort);
+            testRunner.assertNotValid();
+        }
+
+        // Longer than the Wait Timeout: accepted.
+        testRunner.setProperty(EnforceOrder.INACTIVE_TIMEOUT, "31 sec");
+        testRunner.assertValid();
+    }
+
+
+    /**
+     * Small fluent builder for the attribute map of a test FlowFile. Every instance starts
+     * with an "index" attribute; further attributes can be chained with {@link #put}.
+     */
+    private static class Ordered {
+        private final Map<String, String> attributes = new HashMap<>();
+
+        private Ordered(final int index) {
+            attributes.put("index", String.valueOf(index));
+        }
+
+        /** Creates attributes holding only the given index. */
+        private static Ordered i(final int index) {
+            return new Ordered(index);
+        }
+
+        /** Creates attributes holding the given index and a "group" attribute. */
+        private static Ordered i(final String group, final int index) {
+            final Ordered ordered = new Ordered(index);
+            return ordered.put("group", group);
+        }
+
+        /** Adds or overwrites one attribute and returns this instance for chaining. */
+        private Ordered put(final String key, final String value) {
+            attributes.put(key, value);
+            return this;
+        }
+
+        /** Returns the attribute map built so far. */
+        private Map<String, String> map() {
+            return attributes;
+        }
+
+        /** Enqueues a FlowFile whose content is "group.index" with matching group and index attributes. */
+        private static MockFlowFile enqueue(final TestRunner runner, final String group, final int index) {
+            final String content = group + "." + index;
+            return runner.enqueue(content, i(group, index).map());
+        }
+    }
+
+    /**
+     * FlowFiles enqueued out of order in two groups all succeed, and sorting the result
+     * with the FIFO prioritizer yields them in order within each group.
+     */
+    @Test
+    public void testSort() {
+        final TestRunner testRunner = TestRunners.newTestRunner(EnforceOrder.class);
+        testRunner.setProperty(EnforceOrder.GROUP_IDENTIFIER, "${group}");
+        testRunner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index");
+        testRunner.assertValid();
+
+        Ordered.enqueue(testRunner, "b", 0);
+        Ordered.enqueue(testRunner, "a", 1);
+        Ordered.enqueue(testRunner, "a", 0);
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(EnforceOrder.REL_SUCCESS, 3);
+
+        final List<MockFlowFile> succeeded = testRunner.getFlowFilesForRelationship(EnforceOrder.REL_SUCCESS);
+        succeeded.sort(new FirstInFirstOutPrioritizer());
+        final String[] expectedContents = {"a.0", "a.1", "b.0"};
+        for (int i = 0; i < expectedContents.length; i++) {
+            succeeded.get(i).assertContentEquals(expectedContents[i]);
+        }
+    }
+
+    /**
+     * A second FlowFile with an order that has already passed is routed to skipped.
+     */
+    @Test
+    public void testDuplicatedOrder() {
+        final TestRunner testRunner = TestRunners.newTestRunner(EnforceOrder.class);
+        testRunner.setProperty(EnforceOrder.GROUP_IDENTIFIER, "${group}");
+        testRunner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index");
+        testRunner.setProperty(EnforceOrder.INITIAL_ORDER, "1");
+        testRunner.assertValid();
+
+        Ordered.enqueue(testRunner, "b", 1);
+        Ordered.enqueue(testRunner, "a", 2);
+        Ordered.enqueue(testRunner, "a", 1);
+        Ordered.enqueue(testRunner, "a", 2); // same order as an earlier FlowFile of group a
+        Ordered.enqueue(testRunner, "a", 3);
+
+        testRunner.run();
+
+        final List<MockFlowFile> succeeded = testRunner.getFlowFilesForRelationship(EnforceOrder.REL_SUCCESS);
+        assertEquals(4, succeeded.size());
+        succeeded.sort(new FirstInFirstOutPrioritizer());
+        final String[] expectedContents = {"a.1", "a.2", "a.3", "b.1"};
+        for (int i = 0; i < expectedContents.length; i++) {
+            succeeded.get(i).assertContentEquals(expectedContents[i]);
+        }
+
+        // Skipped and duplicated FlowFiles cannot be told apart, because only the target
+        // order number is tracked.
+        final List<MockFlowFile> skipped = testRunner.getFlowFilesForRelationship(EnforceOrder.REL_SKIPPED);
+        assertEquals(1, skipped.size());
+        final MockFlowFile duplicate = skipped.get(0);
+        duplicate.assertContentEquals("a.2");
+        duplicate.assertAttributeEquals(EnforceOrder.ATTR_EXPECTED_ORDER, "3");
+    }
+
+    /**
+     * A FlowFile lacking the attribute referenced by the group identifier goes to failure
+     * with a detail attribute, while the other FlowFiles are processed normally.
+     */
+    @Test
+    public void testNoGroupIdentifier() {
+        final TestRunner testRunner = TestRunners.newTestRunner(EnforceOrder.class);
+        testRunner.setProperty(EnforceOrder.GROUP_IDENTIFIER, "${group}");
+        testRunner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index");
+        testRunner.setProperty(EnforceOrder.INITIAL_ORDER, "1");
+        testRunner.assertValid();
+
+        Ordered.enqueue(testRunner, "b", 1);
+        Ordered.enqueue(testRunner, "a", 2);
+        // This one carries an index but no group attribute.
+        testRunner.enqueue("no group id", Ordered.i(1).map());
+        Ordered.enqueue(testRunner, "a", 1);
+
+        testRunner.run();
+
+        final List<MockFlowFile> succeeded = testRunner.getFlowFilesForRelationship(EnforceOrder.REL_SUCCESS);
+        assertEquals(3, succeeded.size());
+        succeeded.sort(new FirstInFirstOutPrioritizer());
+        final String[] expectedContents = {"a.1", "a.2", "b.1"};
+        for (int i = 0; i < expectedContents.length; i++) {
+            succeeded.get(i).assertContentEquals(expectedContents[i]);
+        }
+
+        final List<MockFlowFile> failed = testRunner.getFlowFilesForRelationship(EnforceOrder.REL_FAILURE);
+        assertEquals(1, failed.size());
+        failed.get(0).assertAttributeExists(EnforceOrder.ATTR_DETAIL);
+    }
+
+    /**
+     * A FlowFile whose order attribute is not an integer goes to failure with a detail
+     * attribute, while the other FlowFiles are processed normally.
+     */
+    @Test
+    public void testIllegalOrderValue() {
+        final TestRunner testRunner = TestRunners.newTestRunner(EnforceOrder.class);
+        testRunner.setProperty(EnforceOrder.GROUP_IDENTIFIER, "${group}");
+        testRunner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index");
+        testRunner.setProperty(EnforceOrder.INITIAL_ORDER, "1");
+        testRunner.assertValid();
+
+        Ordered.enqueue(testRunner, "b", 1);
+        Ordered.enqueue(testRunner, "a", 2);
+        // Overwrite the index with a value that cannot be parsed.
+        final Map<String, String> illegalOrderAttributes = Ordered.i("a", 1).put("index", "non-integer").map();
+        testRunner.enqueue("illegal order", illegalOrderAttributes);
+        Ordered.enqueue(testRunner, "a", 1);
+
+        testRunner.run();
+
+        final List<MockFlowFile> succeeded = testRunner.getFlowFilesForRelationship(EnforceOrder.REL_SUCCESS);
+        assertEquals(3, succeeded.size());
+        succeeded.sort(new FirstInFirstOutPrioritizer());
+        final String[] expectedContents = {"a.1", "a.2", "b.1"};
+        for (int i = 0; i < expectedContents.length; i++) {
+            succeeded.get(i).assertContentEquals(expectedContents[i]);
+        }
+
+        final List<MockFlowFile> failed = testRunner.getFlowFilesForRelationship(EnforceOrder.REL_FAILURE);
+        assertEquals(1, failed.size());
+        final MockFlowFile illegal = failed.get(0);
+        illegal.assertAttributeExists(EnforceOrder.ATTR_DETAIL);
+        illegal.assertContentEquals("illegal order");
+    }
+
+    /**
+     * Initial Order and Max Order are taken from FlowFile attributes through expression
+     * language; FlowFiles with a missing or non-integer initial order go to failure.
+     */
+    @Test
+    public void testInitialOrderValue() {
+        final TestRunner testRunner = TestRunners.newTestRunner(EnforceOrder.class);
+        testRunner.setProperty(EnforceOrder.GROUP_IDENTIFIER, "${group}");
+        testRunner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index");
+        testRunner.setProperty(EnforceOrder.INITIAL_ORDER, "${index.start}");
+        testRunner.setProperty(EnforceOrder.MAX_ORDER, "${index.max}");
+        testRunner.assertValid();
+
+        final Map<String, String> b0 = Ordered.i("b", 0).put("index.start", "0").put("index.max", "99").map();
+        testRunner.enqueue("b.0", b0);
+        final Map<String, String> a100 = Ordered.i("a", 100).put("index.start", "100").put("index.max", "103").map();
+        testRunner.enqueue("a.100", a100);
+        final Map<String, String> a101 = Ordered.i("a", 101).put("index.start", "100").put("index.max", "103").map();
+        testRunner.enqueue("a.101", a101);
+        final Map<String, String> illegalStart = Ordered.i("c", 1).put("index.start", "non-integer").map();
+        testRunner.enqueue("illegal initial order", illegalStart);
+        testRunner.enqueue("without initial order", Ordered.i("d", 1).map());
+        // b.1 has no initial order attribute either, but it is routed to success because
+        // the target order of group b has already been computed from b.0.
+        Ordered.enqueue(testRunner, "b", 1);
+
+        testRunner.run();
+
+        final List<MockFlowFile> succeeded = testRunner.getFlowFilesForRelationship(EnforceOrder.REL_SUCCESS);
+        assertEquals(4, succeeded.size());
+        succeeded.sort(new FirstInFirstOutPrioritizer());
+        final String[] expectedSuccess = {"a.100", "a.101", "b.0", "b.1"};
+        for (int i = 0; i < expectedSuccess.length; i++) {
+            succeeded.get(i).assertContentEquals(expectedSuccess[i]);
+        }
+
+        final List<MockFlowFile> failed = testRunner.getFlowFilesForRelationship(EnforceOrder.REL_FAILURE);
+        assertEquals(2, failed.size());
+        final String[] expectedFailure = {"illegal initial order", "without initial order"};
+        for (int i = 0; i < expectedFailure.length; i++) {
+            failed.get(i).assertAttributeExists(EnforceOrder.ATTR_DETAIL);
+            failed.get(i).assertContentEquals(expectedFailure[i]);
+        }
+
+        final MockStateManager state = testRunner.getStateManager();
+        state.assertStateEquals("a.target", "102", Scope.LOCAL);
+        state.assertStateEquals("a.max", "103", Scope.LOCAL);
+        state.assertStateEquals("b.target", "2", Scope.LOCAL);
+        state.assertStateEquals("b.max", "99", Scope.LOCAL);
+
+        testRunner.clearTransferState();
+
+    }
+
+    /**
+     * Max Order is taken from a FlowFile attribute; FlowFiles with a missing or non-integer
+     * max order, or with an order beyond the max, go to failure.
+     */
+    @Test
+    public void testMaxOrder() {
+        final TestRunner testRunner = TestRunners.newTestRunner(EnforceOrder.class);
+        testRunner.setProperty(EnforceOrder.GROUP_IDENTIFIER, "${fragment.identifier}");
+        testRunner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index");
+        testRunner.setProperty(EnforceOrder.INITIAL_ORDER, "1");
+        testRunner.setProperty(EnforceOrder.MAX_ORDER, "${fragment.count}");
+        testRunner.assertValid();
+
+        final Map<String, String> b1 = Ordered.i(1).put("fragment.identifier", "b").put("fragment.count", "3").map();
+        testRunner.enqueue("b.1", b1);
+        final Map<String, String> a2 = Ordered.i(2).put("fragment.identifier", "a").put("fragment.count", "2").map();
+        testRunner.enqueue("a.2", a2);
+        final Map<String, String> noMax = Ordered.i(1).put("fragment.identifier", "c").map();
+        testRunner.enqueue("without max order", noMax);
+        final Map<String, String> illegalMax = Ordered.i(1).put("fragment.identifier", "d").put("fragment.count", "X").map();
+        testRunner.enqueue("illegal max order", illegalMax);
+        final Map<String, String> a1 = Ordered.i(1).put("fragment.identifier", "a").put("fragment.count", "2").map();
+        testRunner.enqueue("a.1", a1);
+        // Order 3 is beyond the max order of 2.
+        final Map<String, String> a3 = Ordered.i(3).put("fragment.identifier", "a").put("fragment.count", "2").map();
+        testRunner.enqueue("a.3", a3);
+
+        testRunner.run();
+
+        final List<MockFlowFile> succeeded = testRunner.getFlowFilesForRelationship(EnforceOrder.REL_SUCCESS);
+        succeeded.sort(new FirstInFirstOutPrioritizer());
+        assertEquals(3, succeeded.size());
+        final String[] expectedSuccess = {"a.1", "a.2", "b.1"};
+        for (int i = 0; i < expectedSuccess.length; i++) {
+            succeeded.get(i).assertContentEquals(expectedSuccess[i]);
+        }
+
+        final List<MockFlowFile> failed = testRunner.getFlowFilesForRelationship(EnforceOrder.REL_FAILURE);
+        assertEquals(3, failed.size());
+        final String[] expectedFailure = {"without max order", "illegal max order", "a.3"};
+        for (int i = 0; i < expectedFailure.length; i++) {
+            failed.get(i).assertContentEquals(expectedFailure[i]);
+        }
+
+        final MockStateManager state = testRunner.getStateManager();
+        state.assertStateEquals("a.target", "2", Scope.LOCAL);
+        state.assertStateEquals("a.max", "2", Scope.LOCAL);
+    }
+
+    /**
+     * Walks FlowFiles through wait, overtook and skipped: early arrivals wait, overtake
+     * once the wait timeout has passed, and late arrivals are then skipped.
+     */
+    @Test
+    public void testWaitOvertakeSkip() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(EnforceOrder.class);
+        testRunner.setProperty(EnforceOrder.GROUP_IDENTIFIER, "${group}");
+        testRunner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index");
+        testRunner.setProperty(EnforceOrder.INITIAL_ORDER, "1");
+        testRunner.setProperty(EnforceOrder.MAX_ORDER, "10");
+        testRunner.assertValid();
+
+        Ordered.enqueue(testRunner, "b", 1);
+        Ordered.enqueue(testRunner, "a", 2);
+        Ordered.enqueue(testRunner, "a", 1);
+        Ordered.enqueue(testRunner, "a", 5); // a.3 and a.4 are missing
+        Ordered.enqueue(testRunner, "b", 3); // b.2 is missing
+        Ordered.enqueue(testRunner, "c", 9); // c.1 to c.8 are missing
+        Ordered.enqueue(testRunner, "d", 10); // d.1 to d.9 are missing
+
+        testRunner.run();
+
+        final FirstInFirstOutPrioritizer fifo = new FirstInFirstOutPrioritizer();
+        List<MockFlowFile> succeeded = testRunner.getFlowFilesForRelationship(EnforceOrder.REL_SUCCESS);
+        assertEquals(3, succeeded.size());
+        succeeded.sort(fifo);
+        final String[] firstSuccess = {"a.1", "a.2", "b.1"};
+        for (int i = 0; i < firstSuccess.length; i++) {
+            succeeded.get(i).assertContentEquals(firstSuccess[i]);
+        }
+
+        List<MockFlowFile> waiting = testRunner.getFlowFilesForRelationship(EnforceOrder.REL_WAIT);
+        assertEquals(4, waiting.size());
+        final String[] expectedWaiting = {"a.5", "b.3", "c.9", "d.10"};
+        for (int i = 0; i < expectedWaiting.length; i++) {
+            waiting.get(i).assertContentEquals(expectedWaiting[i]);
+        }
+        for (int i = 0; i < expectedWaiting.length; i++) {
+            waiting.get(i).assertAttributeExists("EnforceOrder.startedAt");
+        }
+
+        final MockStateManager state = testRunner.getStateManager();
+        state.assertStateEquals("a.target", "3", Scope.LOCAL);
+        state.assertStateEquals("b.target", "2", Scope.LOCAL);
+        state.assertStateEquals("c.target", "1", Scope.LOCAL);
+        state.assertStateEquals("d.target", "1", Scope.LOCAL);
+        for (final String group : new String[]{"a", "b", "c", "d"}) {
+            state.assertStateSet(group + ".updatedAt", Scope.LOCAL);
+        }
+
+        // Feed the waiting FlowFiles back in: nothing has changed, so they wait again.
+        testRunner.clearTransferState();
+        waiting.forEach(testRunner::enqueue);
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(EnforceOrder.REL_WAIT, 4);
+        waiting = testRunner.getFlowFilesForRelationship(EnforceOrder.REL_WAIT);
+
+        // Shorten the wait timeout and let it elapse so that overtaking happens.
+        testRunner.clearTransferState();
+        testRunner.setProperty(EnforceOrder.WAIT_TIMEOUT, "10 ms");
+        Thread.sleep(20);
+        waiting.forEach(testRunner::enqueue);
+        Ordered.enqueue(testRunner, "b", 2); // the FlowFile b.3 was waiting for
+        Ordered.enqueue(testRunner, "a", 6); // a.3 and a.4 still have not arrived
+        testRunner.run();
+
+        succeeded = testRunner.getFlowFilesForRelationship(EnforceOrder.REL_SUCCESS);
+        succeeded.sort(fifo);
+        assertEquals(3, succeeded.size());
+        // a.6 succeeds because a.5 overtook in the same run, which moved the target to 6.
+        final String[] secondSuccess = {"a.6", "b.2", "b.3"};
+        for (int i = 0; i < secondSuccess.length; i++) {
+            succeeded.get(i).assertContentEquals(secondSuccess[i]);
+        }
+
+        final List<MockFlowFile> overtook = testRunner.getFlowFilesForRelationship(EnforceOrder.REL_OVERTOOK);
+        assertEquals(3, overtook.size());
+        // a.5 overtook a.3, c.9 overtook c.1 to c.8, d.10 overtook d.1 to d.9.
+        final String[] overtookContents = {"a.5", "c.9", "d.10"};
+        final String[] overtookExpectedOrders = {"3", "1", "1"};
+        for (int i = 0; i < overtookContents.length; i++) {
+            overtook.get(i).assertContentEquals(overtookContents[i]);
+            overtook.get(i).assertAttributeEquals(EnforceOrder.ATTR_EXPECTED_ORDER, overtookExpectedOrders[i]);
+        }
+
+        state.assertStateEquals("a.target", "7", Scope.LOCAL);
+        state.assertStateEquals("b.target", "4", Scope.LOCAL);
+        state.assertStateEquals("c.target", "10", Scope.LOCAL); // c.9 overtook, so 9 + 1
+        state.assertStateEquals("d.target", "10", Scope.LOCAL); // d.10 is the max order, so no + 1
+
+        // a.3 and a.4 finally arrive, but the target has already moved past them.
+        testRunner.clearTransferState();
+        Ordered.enqueue(testRunner, "a", 3);
+        Ordered.enqueue(testRunner, "a", 4);
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(EnforceOrder.REL_SKIPPED, 2);
+        final List<MockFlowFile> skipped = testRunner.getFlowFilesForRelationship(EnforceOrder.REL_SKIPPED);
+        final String[] skippedContents = {"a.3", "a.4"};
+        for (int i = 0; i < skippedContents.length; i++) {
+            skipped.get(i).assertContentEquals(skippedContents[i]);
+            skipped.get(i).assertAttributeExists(EnforceOrder.ATTR_DETAIL);
+        }
+
+    }
+
+    /**
+     * State of a group that received nothing within the inactive timeout is removed, and
+     * the group starts over from the initial order when it shows up again.
+     */
+    @Test
+    public void testCleanInactiveGroups() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(EnforceOrder.class);
+        testRunner.setProperty(EnforceOrder.GROUP_IDENTIFIER, "${group}");
+        testRunner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index");
+        testRunner.setProperty(EnforceOrder.INITIAL_ORDER, "1");
+        testRunner.assertValid();
+
+        Ordered.enqueue(testRunner, "b", 1);
+        Ordered.enqueue(testRunner, "a", 2);
+        Ordered.enqueue(testRunner, "c", 1);
+        Ordered.enqueue(testRunner, "a", 1);
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(EnforceOrder.REL_SUCCESS, 4);
+
+        // Shorten the timeouts and let the inactive timeout elapse.
+        testRunner.clearTransferState();
+        testRunner.setProperty(EnforceOrder.WAIT_TIMEOUT, "5 ms");
+        testRunner.setProperty(EnforceOrder.INACTIVE_TIMEOUT, "10 ms");
+
+        Thread.sleep(15);
+
+        // Groups a and c receive a new FlowFile; group b does not.
+        Ordered.enqueue(testRunner, "a", 3);
+        Ordered.enqueue(testRunner, "c", 2);
+
+        testRunner.run();
+
+        // Group b is considered inactive, so its state entries must be gone.
+        final MockStateManager state = testRunner.getStateManager();
+        state.assertStateEquals("a.target", "4", Scope.LOCAL);
+        state.assertStateNotSet("b.target", Scope.LOCAL);
+        state.assertStateEquals("c.target", "3", Scope.LOCAL);
+        state.assertStateSet("a.updatedAt", Scope.LOCAL);
+        state.assertStateNotSet("b.updatedAt", Scope.LOCAL);
+        state.assertStateSet("c.updatedAt", Scope.LOCAL);
+
+        // When b shows up again it is handled as a brand new group.
+        testRunner.clearTransferState();
+        Ordered.enqueue(testRunner, "b", 2);
+
+        testRunner.run();
+        state.assertStateEquals("b.target", "1", Scope.LOCAL);
+        state.assertStateSet("b.updatedAt", Scope.LOCAL);
+
+        // With the target back at 1 and no b.1, b.2 has to wait. It will eventually overtake.
+        testRunner.assertAllFlowFilesTransferred(EnforceOrder.REL_WAIT, 1);
+        final List<MockFlowFile> waiting = testRunner.getFlowFilesForRelationship(EnforceOrder.REL_WAIT);
+        waiting.get(0).assertContentEquals("b.2");
+
+    }
+
+    /**
+     * Attributes written when a FlowFile was routed to wait are cleared once the same
+     * FlowFile is later routed to success.
+     */
+    @Test
+    public void testClearOldProperties() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(EnforceOrder.class);
+        testRunner.setProperty(EnforceOrder.GROUP_IDENTIFIER, "${group}");
+        testRunner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index");
+        testRunner.setProperty(EnforceOrder.INITIAL_ORDER, "1");
+        testRunner.assertValid();
+
+        Ordered.enqueue(testRunner, "a", 2);
+        Ordered.enqueue(testRunner, "b", 1);
+
+        testRunner.run();
+
+        // a.2 has to wait for a.1 and records the order that was expected.
+        testRunner.assertTransferCount(EnforceOrder.REL_WAIT, 1);
+        MockFlowFile a2 = testRunner.getFlowFilesForRelationship(EnforceOrder.REL_WAIT).get(0);
+        a2.assertAttributeEquals(EnforceOrder.ATTR_RESULT, "wait");
+        a2.assertAttributeExists(EnforceOrder.ATTR_STARTED_AT);
+        a2.assertAttributeNotExists(EnforceOrder.ATTR_DETAIL);
+        a2.assertAttributeEquals(EnforceOrder.ATTR_EXPECTED_ORDER, "1");
+        a2.assertContentEquals("a.2");
+
+        // b.1 matches the initial order and succeeds right away.
+        testRunner.assertTransferCount(EnforceOrder.REL_SUCCESS, 1);
+        final MockFlowFile b1 = testRunner.getFlowFilesForRelationship(EnforceOrder.REL_SUCCESS).get(0);
+        b1.assertAttributeEquals(EnforceOrder.ATTR_RESULT, "success");
+        b1.assertAttributeExists(EnforceOrder.ATTR_STARTED_AT);
+        b1.assertAttributeNotExists(EnforceOrder.ATTR_DETAIL);
+        b1.assertAttributeNotExists(EnforceOrder.ATTR_EXPECTED_ORDER);
+        b1.assertContentEquals("b.1");
+
+        testRunner.clearTransferState();
+
+        // Deliver a.1 together with the a.2 that was waiting.
+        Ordered.enqueue(testRunner, "a", 1);
+        testRunner.enqueue(a2);
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(EnforceOrder.REL_SUCCESS, 2);
+        final List<MockFlowFile> succeeded = testRunner.getFlowFilesForRelationship(EnforceOrder.REL_SUCCESS);
+
+        final MockFlowFile a1 = succeeded.get(0);
+        a1.assertAttributeEquals(EnforceOrder.ATTR_RESULT, "success");
+        a1.assertAttributeExists(EnforceOrder.ATTR_STARTED_AT);
+        a1.assertAttributeNotExists(EnforceOrder.ATTR_DETAIL);
+        a1.assertAttributeNotExists(EnforceOrder.ATTR_EXPECTED_ORDER);
+        a1.assertContentEquals("a.1");
+
+        a2 = succeeded.get(1);
+        a2.assertAttributeEquals(EnforceOrder.ATTR_RESULT, "success");
+        a2.assertAttributeExists(EnforceOrder.ATTR_STARTED_AT);
+        a2.assertAttributeNotExists(EnforceOrder.ATTR_DETAIL);
+        // The expected order recorded while waiting must have been cleared.
+        a2.assertAttributeNotExists(EnforceOrder.ATTR_EXPECTED_ORDER);
+        a2.assertContentEquals("a.2");
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/9583ca99/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4e2b3cd..91aae37 100644
--- a/pom.xml
+++ b/pom.xml
@@ -972,6 +972,11 @@ language governing permissions and limitations under the 
License. -->
             </dependency>
             <dependency>
                 <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-standard-prioritizers</artifactId>
+                <version>1.2.0-SNAPSHOT</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-jetty-bundle</artifactId>
                 <version>1.2.0-SNAPSHOT</version>
                 <type>nar</type>

Reply via email to