[
https://issues.apache.org/jira/browse/NIFI-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15962900#comment-15962900
]
ASF GitHub Bot commented on NIFI-3414:
--------------------------------------
Github user mattyb149 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1496#discussion_r110662972
--- Diff:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EnforceOrder.java
---
@@ -0,0 +1,543 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+
+@EventDriven
+@Tags({"sort", "order"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@TriggerSerially
+@CapabilityDescription("Enforces expected ordering of FlowFiles those
belong to the same data group. " +
+ " Although PriorityAttributePrioritizer can be used on a
connection to ensure that flow files going through that connection are in
priority order," +
+ " depending on error-handling, branching, and other flow designs,
it is possible for FlowFiles to get out-of-order." +
+ " EnforceOrder can be used to enforce original ordering for those
FlowFiles." +
+ " [IMPORTANT] In order to take effect of EnforceOrder,
FirstInFirstOutPrioritizer should be used at EVERY downstream relationship" +
+ " UNTIL the order of FlowFiles physically get FIXED by operation
such as MergeContent or being stored to the final destination.")
+@Stateful(scopes = Scope.LOCAL, description = "EnforceOrder uses following
states per ordering group:" +
+ " '<groupId>.target' is a order number which is being waited to
arrive next." +
+ " When a FlowFile with a matching order arrives, or a FlowFile
overtakes the FlowFile being waited for because of wait timeout," +
+ " target order will be updated to (FlowFile.order + 1)." +
+ " '<groupId>.max' is the maximum order number for a group." +
+ " '<groupId>.updatedAt' is a timestamp when the order of a group
was updated last time." +
+ " These managed states will be removed automatically once a group
is determined as inactive, see 'Inactive Timeout' for detail.")
+@WritesAttributes({
+ @WritesAttribute(attribute = EnforceOrder.ATTR_STARTED_AT,
+ description = "All FlowFiles going through this processor will
have this attribute. This value is used to determine wait timeout."),
+ @WritesAttribute(attribute = EnforceOrder.ATTR_RESULT,
+ description = "All FlowFiles going through this processor will
have this attribute denoting which relationship it was routed to."),
+ @WritesAttribute(attribute = EnforceOrder.ATTR_DETAIL,
+ description = "FlowFiles routed to 'failure' or 'skipped'
relationship will have this attribute describing details."),
+ @WritesAttribute(attribute = EnforceOrder.ATTR_EXPECTED_ORDER,
+ description = "FlowFiles routed to 'wait' or 'skipped'
relationship will have this attribute denoting expected order when the FlowFile
was processed.")
+})
+public class EnforceOrder extends AbstractProcessor {
+
+ public static final String ATTR_STARTED_AT = "EnforceOrder.startedAt";
+ public static final String ATTR_EXPECTED_ORDER =
"EnforceOrder.expectedOrder";
+ public static final String ATTR_RESULT = "EnforceOrder.result";
+ public static final String ATTR_DETAIL = "EnforceOrder.detail";
+ private static final Function<String, String> STATE_TARGET_ORDER =
groupId -> groupId + ".target";
+ private static final String STATE_SUFFIX_UPDATED_AT = ".updatedAt";
+ private static final Function<String, String> STATE_UPDATED_AT =
groupId -> groupId + STATE_SUFFIX_UPDATED_AT;
+ private static final Function<String, String> STATE_MAX_ORDER =
groupId -> groupId + ".max";
+
+ public static final PropertyDescriptor GROUP_IDENTIFIER = new
PropertyDescriptor.Builder()
+ .name("group-id")
+ .displayName("Group Identifier")
+ .description("EnforceOrder is capable of multiple ordering
groups." +
+ " 'Group Identifier' is used to determine which group a
FlowFile belongs to." +
+ " This property will be evaluated with each incoming
FlowFile." +
+ " If evaluated result is empty, the FlowFile will be
routed to failure.")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .defaultValue("${filename}")
+ .build();
+
+ public static final PropertyDescriptor ORDER_ATTRIBUTE = new
PropertyDescriptor.Builder()
+ .name("order-attribute")
+ .displayName("Order Attribute")
+ .description("A name of FlowFile attribute whose value will be
used to enforce order of FlowFiles within a group." +
+ " If a FlowFile does not have this attribute, or its value
is not an integer, the FlowFile will be routed to failure.")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .build();
+
+ public static final PropertyDescriptor INITIAL_ORDER = new
PropertyDescriptor.Builder()
+ .name("initial-order")
+ .displayName("Initial Order")
+ .description("When the first FlowFile of a group arrives, initial
target order will be computed and stored in the managed state." +
+ " After that, target order will start being tracked by
EnforceOrder and stored in the state management store." +
+ " If Expression Language is used but evaluated result was
not an integer, then the FlowFile will be routed to failure," +
+ " and initial order will be left unknown until consecutive
FlowFiles provide a valid initial order.")
+ .required(true)
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .defaultValue("0")
+ .build();
+
+ public static final PropertyDescriptor MAX_ORDER = new
PropertyDescriptor.Builder()
+ .name("maximum-order")
+ .displayName("Maximum Order")
+ .description("If specified, any FlowFiles that have larger order
will be routed to failure." +
+ " This property is computed only once for a given group." +
+ " After a maximum order is computed, it will be persisted
in the state management store and used for other FlowFiles belonging to the
same group." +
+ " If Expression Language is used but evaluated result was
not an integer, then the FlowFile will be routed to failure," +
+ " and maximum order will be left unknown until consecutive
FlowFiles provide a valid maximum order.")
+ .required(false)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+
+ public static final PropertyDescriptor WAIT_TIMEOUT = new
PropertyDescriptor.Builder()
+ .name("wait-timeout")
+ .displayName("Wait Timeout")
+ .description("Indicates the duration after which waiting FlowFiles
will be routed to the 'overtook' relationship.")
+ .required(true)
+ .defaultValue("10 min")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .build();
+
+ public static final PropertyDescriptor INACTIVE_TIMEOUT = new
PropertyDescriptor.Builder()
+ .name("inactive-timeout")
+ .displayName("Inactive Timeout")
+ .description("Indicates the duration after which state for an
inactive group will be cleared from managed state." +
+ " Group is determined as inactive if any new incoming
FlowFile has not seen for a group for specified duration." +
+ " Inactive Timeout must be longer than Wait Timeout." +
+ " If a FlowFile arrives late after its group is already
cleared, it will be treated as a brand new group," +
+ " but will never match the order since expected preceding
FlowFiles are already gone." +
+ " The FlowFile will eventually timeout for waiting and
routed to 'overtook'." +
+ " To avoid this, group states should be kept long enough,
however, shorter duration would be helpful for reusing the same group
identifier again.")
+ .required(true)
+ .defaultValue("30 min")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .build();
+
+ public static final PropertyDescriptor BATCH_COUNT = new
PropertyDescriptor.Builder()
+ .name("batch-count")
+ .displayName("Batch Count")
+ .description("The maximum number of FlowFiles that EnforceOrder
can process at an execution.")
+ .required(true)
+ .defaultValue("1000")
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .build();
+
+ public static final Relationship REL_SUCCESS = new
Relationship.Builder()
+ .name("success")
+ .description("A FlowFile with a matching order number will be
routed to this relationship.")
+ .build();
+
+ public static final Relationship REL_FAILURE = new
Relationship.Builder()
+ .name("failure")
+ .description("A FlowFiles which does not have required attributes,
or fails to compute those will be routed to this relationship")
+ .build();
+
+ public static final Relationship REL_WAIT = new Relationship.Builder()
+ .name("wait")
+ .description("A FlowFile with non matching order will be routed to
this relationship")
+ .build();
+
+ public static final Relationship REL_OVERTOOK = new
Relationship.Builder()
+ .name("overtook")
+ .description("A FlowFile that waited for preceding FlowFiles
longer than Wait Timeout and overtook those FlowFiles, will be routed to this
relationship.")
+ .build();
+
+ public static final Relationship REL_SKIPPED = new
Relationship.Builder()
+ .name("skipped")
+ .description("A FlowFile that has an order younger than current,
which means arrived too late and skipped, will be routed to this relationship.")
+ .build();
+
+ private final Set<Relationship> relationships;
+
+ public EnforceOrder() {
+ final Set<Relationship> rels = new HashSet<>();
+ rels.add(REL_SUCCESS);
+ rels.add(REL_WAIT);
+ rels.add(REL_OVERTOOK);
+ rels.add(REL_FAILURE);
+ rels.add(REL_SKIPPED);
+ relationships = Collections.unmodifiableSet(rels);
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(GROUP_IDENTIFIER);
+ descriptors.add(ORDER_ATTRIBUTE);
+ descriptors.add(INITIAL_ORDER);
+ descriptors.add(MAX_ORDER);
+ descriptors.add(BATCH_COUNT);
+ descriptors.add(WAIT_TIMEOUT);
+ descriptors.add(INACTIVE_TIMEOUT);
+ return descriptors;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+
+ @Override
+ protected Collection<ValidationResult>
customValidate(ValidationContext validationContext) {
+ final List<ValidationResult> results = new
ArrayList<>(super.customValidate(validationContext));
+
+ final Long waitTimeoutMillis =
validationContext.getProperty(WAIT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+ final Long inactiveTimeoutMillis =
validationContext.getProperty(INACTIVE_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+
+ if (waitTimeoutMillis >= inactiveTimeoutMillis) {
+ results.add(new
ValidationResult.Builder().input(validationContext.getProperty(INACTIVE_TIMEOUT).getValue())
+ .subject(INACTIVE_TIMEOUT.getDisplayName())
+ .explanation(String.format("%s should be longer than
%s",
+ INACTIVE_TIMEOUT.getDisplayName(),
WAIT_TIMEOUT.getDisplayName()))
+ .valid(false)
+ .build());
+ }
+
+ return results;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
+
+
+ final ComponentLog logger = getLogger();
+ final Integer batchCount =
context.getProperty(BATCH_COUNT).asInteger();
+
+ List<FlowFile> flowFiles = session.get(batchCount);
+ if (flowFiles == null || flowFiles.isEmpty()) {
+ return;
+ }
+
+ final StateMap stateMap;
+ try {
+ stateMap = context.getStateManager().getState(Scope.LOCAL);
+ } catch (final IOException e) {
+ logger.error("Failed to retrieve state from StateManager due
to " + e, e);
+ context.yield();
+ return;
+ }
+
+ final OrderingContext oc = new OrderingContext(context, session);
+
+ oc.groupStates.putAll(stateMap.toMap());
+
+ for (FlowFile flowFile : flowFiles) {
+ oc.setFlowFile(flowFile);
+ if (oc.flowFile == null) {
+ break;
+ }
+
+ if (!oc.computeGroupId()
+ || !oc.computeOrder()
+ || !oc.computeInitialOrder()
+ || !oc.computeMaxOrder()) {
+ continue;
+ }
+
+ // At this point, the flow file is confirmed to be valid.
+ oc.markFlowFileValid();
+ }
+
+ oc.transferFlowFiles();
+
+ oc.cleanupInactiveStates();
+
+ try {
+ context.getStateManager().setState(oc.groupStates,
Scope.LOCAL);
+ } catch (final IOException e) {
+ throw new RuntimeException("Failed to update state due to " + e
+ + ". Session will be rolled back and processor will be
yielded for a while.", e);
+ }
+
+ }
+
+ private class OrderingContext {
+
+ private final ComponentLog logger = getLogger();
+ private final ProcessSession processSession;
+ private final ProcessContext processContext;
+
+ // Following properties are static global setting for all groups.
+ private final String orderAttribute;
+ private final Long waitTimeoutMillis;
+ private final Function<FlowFile, Integer> getOrder;
+
+ private final Map<String, String> groupStates = new HashMap<>();
+ private final long now = System.currentTimeMillis();
+
+ // Following properties are computed per flow file.
+ private final PropertyValue groupIdentifierProperty ;
+
+ // Followings are per group objects.
+ private final PropertyValue initOrderProperty;
+ private final PropertyValue maxOrderProperty;
+ private final Map<String, List<FlowFile>> flowFileGroups = new
TreeMap<>();
+
+ // Current variables within incoming FlowFiles loop.
+ private FlowFile flowFile;
+ private String groupId;
+ private Integer order;
+
+ private OrderingContext(final ProcessContext processContext, final
ProcessSession processSession) {
+ this.processContext = processContext;
+ this.processSession = processSession;
+
+ orderAttribute =
processContext.getProperty(ORDER_ATTRIBUTE).getValue();
+ waitTimeoutMillis =
processContext.getProperty(WAIT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+ getOrder = flowFile ->
Integer.parseInt(flowFile.getAttribute(orderAttribute));
+
+
+ groupIdentifierProperty =
processContext.getProperty(GROUP_IDENTIFIER);
+
+ initOrderProperty = processContext.getProperty(INITIAL_ORDER);
+ maxOrderProperty = processContext.getProperty(MAX_ORDER);
+ }
+
+ private void setFlowFile(final FlowFile flowFile) {
+ this.flowFile = flowFile;
+ this.groupId = null;
+ this.order = null;
+ }
+
+ private boolean computeGroupId() {
+ groupId =
groupIdentifierProperty.evaluateAttributeExpressions(flowFile).getValue();
+ if (isBlank(groupId)) {
+ transferToFailure(flowFile, "Failed to get Group
Identifier.");
+ return false;
+ }
+ return true;
+ }
+
+ private boolean computeOrder() {
+ try {
+ order = getOrder.apply(flowFile);
+ } catch (final NumberFormatException e) {
+ transferToFailure(flowFile, "Failed to parse order
attribute due to " + e, e);
+ return false;
+ }
+ return true;
+ }
+
+ private boolean computeMaxOrder() {
+ if (maxOrderProperty.isSet()) {
+ // Compute maxOrder for this group if it's not there yet.
+ final String maxOrderStr =
groupStates.computeIfAbsent(STATE_MAX_ORDER.apply(groupId),
+ k ->
maxOrderProperty.evaluateAttributeExpressions(flowFile).getValue());
+ if (isBlank(maxOrderStr)) {
+ transferToFailure(flowFile, String.format("%s was
specified but result was empty.", MAX_ORDER.getDisplayName()));
+ return false;
+ }
+
+ final Integer maxOrder;
+ try {
+ maxOrder = Integer.parseInt(maxOrderStr);
+ } catch (final NumberFormatException e) {
+ final String msg = String.format("Failed to get
Maximum Order for group [%s] due to %s", groupId, e);
+ transferToFailure(flowFile, msg, e);
+ return false;
+ }
+
+ // Check max order.
+ if (order > maxOrder) {
+ final String msg = String.format("Order (%d) is
greater than the Maximum Order (%d) for Group [%s]", order, maxOrder, groupId);
+ transferToFailure(flowFile, msg);
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean computeInitialOrder() {
+ // Compute initial order. Use asInteger() to check if it's a
valid integer.
+ final String stateKeyOrder = STATE_TARGET_ORDER.apply(groupId);
+ try {
+ final AtomicReference<String> computedInitOrder = new
AtomicReference<>();
+ groupStates.computeIfAbsent(stateKeyOrder, k -> {
+ final String initOrderStr =
initOrderProperty.evaluateAttributeExpressions(flowFile).getValue();
+ // Parse it to check if it is a valid integer.
+ Integer.parseInt(initOrderStr);
+ computedInitOrder.set(initOrderStr);
+ return initOrderStr;
+ });
+ // If these map modification is in the computeIfAbsent
function, it causes this issue.
+ // JDK-8071667 : HashMap.computeIfAbsent() adds entry that
HashMap.get() does not find.
+ //
http://bugs.java.com/bugdatabase/view_bug.do?bug_id=8071667
+ if (!isBlank(computedInitOrder.get())) {
+ groupStates.put(STATE_UPDATED_AT.apply(groupId),
String.valueOf(now));
+ }
+
+ } catch (final NumberFormatException e) {
+ final String msg = String.format("Failed to get Initial
Order for Group [%s] due to %s", groupId, e);
+ transferToFailure(flowFile, msg, e);
+ return false;
+ }
+ return true;
+ }
+
+ private void markFlowFileValid() {
+ final List<FlowFile> groupedFlowFiles =
flowFileGroups.computeIfAbsent(groupId, k -> new ArrayList<>());
+
+ final FlowFile validFlowFile;
+ if (isBlank(flowFile.getAttribute(ATTR_STARTED_AT))) {
+ validFlowFile = processSession.putAttribute(flowFile,
ATTR_STARTED_AT, String.valueOf(now));
+ } else {
+ validFlowFile = flowFile;
+ }
+
+ groupedFlowFiles.add(validFlowFile);
+ }
+
+ private void transferFlowFiles() {
+ flowFileGroups.entrySet().stream().filter(entry ->
!entry.getValue().isEmpty()).map(entry -> {
+ // Sort flow files within each group.
+ final List<FlowFile> groupedFlowFiles = entry.getValue();
+ groupedFlowFiles.sort(Comparator.comparing(getOrder));
+ return entry;
+ }).forEach(entry -> {
+ // Check current state.
+ final String groupId = entry.getKey();
+ final String stateKeyOrder =
STATE_TARGET_ORDER.apply(groupId);
+ final int previousTargetOrder =
Integer.parseInt(groupStates.get(stateKeyOrder));
+ final AtomicInteger targetOrder = new
AtomicInteger(previousTargetOrder);
+ final List<FlowFile> groupedFlowFiles = entry.getValue();
+ final String maxOrderStr =
groupStates.get(STATE_MAX_ORDER.apply(groupId));
+
+ groupedFlowFiles.forEach(f -> {
+ final Integer order = getOrder.apply(f);
+ final boolean isMaxOrder = !isBlank(maxOrderStr) &&
order.equals(Integer.parseInt(maxOrderStr));
+
+ if (order == targetOrder.get()) {
+ transferResult(f, REL_SUCCESS, null, null);
+ if (!isMaxOrder) {
+ // If max order is specified and this FlowFile
has the max order, don't increment target anymore.
+ targetOrder.incrementAndGet();
+ }
+
+ } else if (order > targetOrder.get()) {
+
+ if (now -
Long.parseLong(f.getAttribute(ATTR_STARTED_AT)) > waitTimeoutMillis) {
+ transferResult(f, REL_OVERTOOK, null,
targetOrder.get());
+ targetOrder.set(isMaxOrder ? order : order +
1);
+ } else {
+ transferResult(f, REL_WAIT, null,
targetOrder.get());
+ }
+
+ } else {
+ final String msg = String.format("Skipped,
FlowFile order was %d but current target is %d", order, targetOrder.get());
+ logger.warn(msg + ". {}", new Object[]{f});
+ transferResult(f, REL_SKIPPED, msg,
targetOrder.get());
+ }
+
+ });
+
+ if (previousTargetOrder != targetOrder.get()) {
+ groupStates.put(stateKeyOrder,
String.valueOf(targetOrder.get()));
+ groupStates.put(STATE_UPDATED_AT.apply(groupId),
String.valueOf(now));
+ }
+ });
+ }
+
+ private void transferResult(final FlowFile flowFile, final
Relationship result, final String detail, final Integer expectedOrder) {
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(ATTR_RESULT, result.getName());
+ if (expectedOrder != null) {
--- End diff --
It seems that when the flow file is transferred to SUCCESS, expectedOrder is
null and the EnforceOrder.expectedOrder attribute is not written, but the flow
file may still carry a value set when it was previously transferred to WAIT.
We should either set the attribute to the actual order or, even better, delete
it to save memory, since the stale value won't be useful anyway.
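
A minimal sketch of the suggested behaviour, for illustration only. It models
flow file attributes as a plain Map rather than using the NiFi API; the class
name ExpectedOrderAttributeSketch and the method applyResult are hypothetical
and do not appear in the pull request. In the processor itself the removal
would presumably go through ProcessSession.removeAttribute.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for FlowFile attributes; this is not the NiFi API.
final class ExpectedOrderAttributeSketch {

    static final String ATTR_RESULT = "EnforceOrder.result";
    static final String ATTR_EXPECTED_ORDER = "EnforceOrder.expectedOrder";

    // Returns a copy of the attributes updated for the given routing result.
    // A null expectedOrder removes any value left over from an earlier pass
    // instead of silently keeping it.
    static Map<String, String> applyResult(final Map<String, String> current,
                                           final String result,
                                           final Integer expectedOrder) {
        final Map<String, String> updated = new HashMap<>(current);
        updated.put(ATTR_RESULT, result);
        if (expectedOrder != null) {
            updated.put(ATTR_EXPECTED_ORDER, String.valueOf(expectedOrder));
        } else {
            updated.remove(ATTR_EXPECTED_ORDER);
        }
        return updated;
    }

    public static void main(final String[] args) {
        // First pass: the flow file waits, so the expected order is recorded.
        final Map<String, String> waiting = applyResult(new HashMap<>(), "wait", 3);
        // Second pass: routed to success with no expected order to report.
        final Map<String, String> done = applyResult(waiting, "success", null);
        System.out.println(waiting);
        System.out.println(done);
    }
}
```

With this shape, a flow file that waited once and later succeeds no longer
carries the expected order recorded during the wait.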
> Implement an EnforceOrder processor
> -----------------------------------
>
> Key: NIFI-3414
> URL: https://issues.apache.org/jira/browse/NIFI-3414
> Project: Apache NiFi
> Issue Type: New Feature
> Reporter: Matt Burgess
> Assignee: Koji Kawamura
>
> For some flows, it is imperative that the flow files are processed in a
> certain order. The PriorityAttributePrioritizer can be used on a connection
> to ensure that flow files going through that connection are in priority
> order, but depending on error-handling, branching, and other flow designs, it
> is possible for flow files to get out-of-order.
> I propose an EnforceOrder processor, which would be single-threaded and have
> (at a minimum) the following properties:
> 1) Order Attribute: This would be the name of a flow file attribute from
> which the current value will be retrieved.
> 2) Initial Value: This property specifies an initial value for the order. The
> processor is stateful, however, so this property is only used when there is
> no entry in the state map for current value.
> The processor would store the Initial Value into the state map (if no state
> map entry exists), then for each incoming flow file, it checks the value in
> the Order Attribute against the current value. If the attribute value
> matches the current value, the flow file is transferred to the "success"
> relationship, and the current value is incremented in the state map. If the
> attribute value does not match the current value, the session will be rolled
> back.
> Using this processor, along with a PriorityAttributePrioritizer on the
> incoming connection, will allow for out-of-order flow files to have a sort of
> "barrier", thereby guaranteeing that flow files transferred to the "success"
> relationship are in the specified order.
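
The barrier described in the issue can be sketched roughly as follows. This is
an illustration under stated assumptions, not the processor's implementation:
the state map is a plain HashMap, the key name "current" is made up, and a
false return value stands in for the session rollback in the proposal. The
class name OrderBarrierSketch and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the proposed barrier; it does not use the NiFi API.
final class OrderBarrierSketch {

    // Assumed key name for the current value in the state map.
    private static final String STATE_KEY = "current";

    private final Map<String, String> state = new HashMap<>();
    private final int initialValue;

    OrderBarrierSketch(final int initialValue) {
        this.initialValue = initialValue;
    }

    // Returns true if the flow file with this order may go to "success".
    // Returns false if it must be held back (a rollback in the proposal).
    boolean offer(final int order) {
        // The initial value is only used when the state map has no entry yet.
        state.putIfAbsent(STATE_KEY, String.valueOf(initialValue));
        final int current = Integer.parseInt(state.get(STATE_KEY));
        if (order != current) {
            return false;
        }
        // A match releases the flow file and advances the current value.
        state.put(STATE_KEY, String.valueOf(current + 1));
        return true;
    }

    public static void main(final String[] args) {
        final OrderBarrierSketch barrier = new OrderBarrierSketch(0);
        System.out.println(barrier.offer(1)); // arrives early, held back
        System.out.println(barrier.offer(0)); // matches the current value
        System.out.println(barrier.offer(1)); // now matches after the increment
    }
}
```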