[
https://issues.apache.org/jira/browse/NIFI-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15959766#comment-15959766
]
ASF GitHub Bot commented on NIFI-3414:
--------------------------------------
Github user mattyb149 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1496#discussion_r110269816
--- Diff:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EnforceOrder.java
---
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+
+@EventDriven
+@Tags({"sort", "order"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@TriggerSerially
+@CapabilityDescription("Enforces expected ordering of FlowFiles those
belong to the same data group. " +
+ " Although PriorityAttributePrioritizer can be used on a
connection to ensure that flow files going through that connection are in
priority order," +
+ " depending on error-handling, branching, and other flow designs,
it is possible for FlowFiles to get out-of-order." +
+ " EnforceOrder can be used to enforce original ordering for those
FlowFiles." +
+ " [IMPORTANT] In order to take effect of EnforceOrder,
FirstInFirstOutPrioritizer should be used at EVERY downstream relationship" +
+ " UNTIL the order of FlowFiles physically get FIXED by operation
such as MergeContent or being stored to the final destination.")
+@Stateful(scopes = Scope.LOCAL, description = "EnforceOrder uses following
states per ordering group:" +
+ " '<groupId>.target' is a order number which is being waited to
arrive next." +
+ " When a FlowFile with a matching order arrives, or a FlowFile
overtakes the FlowFile being waited for because of wait timeout," +
+ " target order will be updated to (FlowFile.order + 1)." +
+ " '<groupId>.max is the maximum order number for a group." +
+ " '<groupId>.updatedAt' is a timestamp when the order of a group
was updated last time." +
+ " These managed states will be removed automatically once a group
is determined as inactive, see 'Inactive Timeout' for detail.")
+@WritesAttributes({
+ @WritesAttribute(attribute = EnforceOrder.ATTR_STARTED_AT,
+ description = "All FlowFiles going through this processor will
have this attribute. This value is used to determine wait timeout."),
+ @WritesAttribute(attribute = EnforceOrder.ATTR_RESULT,
+ description = "All FlowFiles going through this processor will
have this attribute denoting which relationship it was routed to."),
+ @WritesAttribute(attribute = EnforceOrder.ATTR_DETAIL,
+ description = "FlowFiles routed to 'failure' or 'skipped'
relationship will have this attribute describing details."),
+ @WritesAttribute(attribute = EnforceOrder.ATTR_EXPECTED_ORDER,
+ description = "FlowFiles routed to 'wait' or 'skipped'
relationship will have this attribute denoting expected order when the FlowFile
was processed.")
+})
+public class EnforceOrder extends AbstractProcessor {
+
+ public static final String ATTR_STARTED_AT = "EnforceOrder.startedAt";
+ public static final String ATTR_EXPECTED_ORDER =
"EnforceOrder.expectedOrder";
+ public static final String ATTR_RESULT = "EnforceOrder.result";
+ public static final String ATTR_DETAIL = "EnforceOrder.detail";
+ private static final Function<String, String> STATE_TARGET_ORDER =
groupId -> groupId + ".target";
+ private static final String STATE_SUFFIX_UPDATED_AT = ".updatedAt";
+ private static final Function<String, String> STATE_UPDATED_AT =
groupId -> groupId + STATE_SUFFIX_UPDATED_AT;
+ private static final Function<String, String> STATE_MAX_ORDER =
groupId -> groupId + ".max";
+
+ public static final PropertyDescriptor GROUP_IDENTIFIER = new
PropertyDescriptor.Builder()
+ .name("group-id")
+ .displayName("Group Identifier")
+ .description("EnforceOrder is capable of multiple ordering
groups." +
+ " 'Group Identifier' is used to determine which group a
FlowFile belongs to." +
+ " This property will be evaluated with each incoming
FlowFile." +
+ " If evaluated result is empty, the FlowFile will be
routed to failure.")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .defaultValue("${filename}")
+ .build();
+
+ public static final PropertyDescriptor ORDER_ATTRIBUTE = new
PropertyDescriptor.Builder()
+ .name("order-attribute")
+ .displayName("Order Attribute")
+ .description("A name of FlowFile attribute whose value will be
used to enforce order of FlowFiles within a group." +
+ " If a FlowFile does not have this attribute, or its value
is not an integer, the FlowFile will be routed to failure.")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .build();
+
+ public static final PropertyDescriptor INITIAL_ORDER = new
PropertyDescriptor.Builder()
+ .name("initial-order")
+ .displayName("Initial Order")
+ .description("When the first FlowFile of a group arrives, initial
target order will be computed and stored in the managed state." +
+ " After that, target order will start being tracked by
EnforceOrder and stored in the state management store." +
+ " If Expression Language is used but evaluated result was
not an integer, then the FlowFile will be routed to failure," +
+ " and initial order will be left unknown until consecutive
FlowFiles provide a valid initial order.")
+ .required(true)
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .defaultValue("1")
+ .build();
+
+ public static final PropertyDescriptor MAX_ORDER = new
PropertyDescriptor.Builder()
+ .name("maximum-order")
+ .displayName("Maximum Order")
+ .description("If specified, any FlowFiles that has larger order
will be routed to failure." +
+ " This property is computed only once for a given group." +
+ " After a maximum order is computed, it will be persisted
in the state management store and used for other FlowFiles belonging to the
same group." +
+ " If Expression Language is used but evaluated result was
not an integer, then the FlowFile will be routed to failure," +
+ " and maximum order will be left unknown until consecutive
FlowFiles provide a valid maximum order.")
+ .required(false)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+
+ public static final PropertyDescriptor WAIT_TIMEOUT = new
PropertyDescriptor.Builder()
+ .name("wait-timeout")
+ .displayName("Wait Timeout")
+ .description("Indicates the duration after which waiting FlowFiles
will be routed to the 'overtook' relationship.")
+ .required(true)
+ .defaultValue("10 min")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .build();
+
+ public static final PropertyDescriptor INACTIVE_TIMEOUT = new
PropertyDescriptor.Builder()
+ .name("inactive-timeout")
+ .displayName("Inactive Timeout")
+ .description("Indicates the duration after which state for an
inactive group will be cleared from managed state." +
+ " Group is determined as inactive if any new incoming
FlowFile has not seen for a group for specified duration." +
+ " Inactive Timeout must be longer than Wait Timeout." +
+ " If a FlowFile arrives late after its group is already
cleared, it will be treated as a brand new group," +
+ " but will never match the order since expected preceding
FlowFiles are already gone." +
+ " The FlowFile will eventually timeout for waiting and
routed to 'overtook'." +
+ " To avoid this, group states should be kept long enough,
however, shorter duration would be helpful for reusing the same group
identifier again.")
+ .required(true)
+ .defaultValue("30 min")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .build();
+
+ public static final PropertyDescriptor BATCH_COUNT = new
PropertyDescriptor.Builder()
+ .name("batch-count")
+ .displayName("Batch Count")
+ .description("The maximum number of FlowFiles that EnforceOrder
can process at an execution.")
+ .required(true)
+ .defaultValue("1000")
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .build();
+
+ public static final Relationship REL_SUCCESS = new
Relationship.Builder()
+ .name("success")
+ .description("A FlowFile with a matching order number will be
routed to this relationship.")
+ .build();
+
+ public static final Relationship REL_FAILURE = new
Relationship.Builder()
+ .name("failure")
+ .description("A FlowFiles which does not have required attributes,
or fails to compute those will be routed to this relationship")
+ .build();
+
+ public static final Relationship REL_WAIT = new Relationship.Builder()
+ .name("wait")
+ .description("A FlowFile with non matching order will be routed to
this relationship")
+ .build();
+
+ public static final Relationship REL_OVERTOOK = new
Relationship.Builder()
+ .name("overtook")
+ .description("A FlowFile that waited for preceding FlowFiles
longer than Wait Timeout and overtook those FlowFiles, will be routed to this
relationship.")
+ .build();
+
+ public static final Relationship REL_SKIPPED = new
Relationship.Builder()
+ .name("skipped")
+ .description("A FlowFile that has an order younger than current,
which means arrived too late and skipped, will be routed to this relationship.")
+ .build();
+
private final Set<Relationship> relationships;

/**
 * Creates the processor and freezes the set of relationships it exposes
 * (success, wait, overtook, failure, skipped).
 */
public EnforceOrder() {
    final Set<Relationship> all = new HashSet<>();
    Collections.addAll(all, REL_SUCCESS, REL_WAIT, REL_OVERTOOK, REL_FAILURE, REL_SKIPPED);
    this.relationships = Collections.unmodifiableSet(all);
}
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(GROUP_IDENTIFIER);
+ descriptors.add(ORDER_ATTRIBUTE);
+ descriptors.add(INITIAL_ORDER);
+ descriptors.add(MAX_ORDER);
+ descriptors.add(BATCH_COUNT);
+ descriptors.add(WAIT_TIMEOUT);
+ descriptors.add(INACTIVE_TIMEOUT);
+ return descriptors;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+
+ @Override
+ protected Collection<ValidationResult>
customValidate(ValidationContext validationContext) {
+ final List<ValidationResult> results = new
ArrayList<>(super.customValidate(validationContext));
+
+ final Long waitTimeoutMillis =
validationContext.getProperty(WAIT_TIMEOUT).asTimePeriod(TimeUnit.MICROSECONDS);
+ final Long inactiveTimeoutMillis =
validationContext.getProperty(INACTIVE_TIMEOUT).asTimePeriod(TimeUnit.MICROSECONDS);
+
+ if (waitTimeoutMillis >= inactiveTimeoutMillis) {
+ results.add(new
ValidationResult.Builder().input(validationContext.getProperty(INACTIVE_TIMEOUT).getValue())
+ .subject(INACTIVE_TIMEOUT.getDisplayName())
+ .explanation(String.format("%s should be longer than
%s",
+ INACTIVE_TIMEOUT.getDisplayName(),
WAIT_TIMEOUT.getDisplayName()))
+ .valid(false)
+ .build());
+ }
+
+ return results;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
+
+ final ComponentLog logger = getLogger();
+ final Integer batchCount =
context.getProperty(BATCH_COUNT).asInteger();
+
+ final StateMap stateMap;
+ try {
+ stateMap = context.getStateManager().getState(Scope.LOCAL);
+ } catch (final IOException e) {
+ logger.error("Failed to retrieve state from StateManager due
to {}" + e, e);
+ context.yield();
+ return;
+ }
+
+ final OrderingContext oc = new OrderingContext(context, session);
+
+ oc.groupStates.putAll(stateMap.toMap());
+
+ for (int i = 0; i < batchCount; i++) {
+
+ oc.setFlowFile(session.get());
+ if (oc.flowFile == null) {
+ break;
+ }
+
+ if (!oc.computeGroupId()
+ || !oc.computeOrder()
+ || !oc.computeInitialOrder()
+ || !oc.computeMaxOrder()) {
+ continue;
+ }
+
+ // At this point, the flow file is confirmed to be valid.
+ oc.markFlowFileValied();
+ }
+
+ oc.transferFlowFiles();
+
+ oc.cleanupInactiveStates();
+
+ try {
+ context.getStateManager().setState(oc.groupStates,
Scope.LOCAL);
+ } catch (final IOException e) {
+ throw new RuntimeException("Failed to update state due to " + e
+ + ". Session will be rollback and processor will be
yielded for a while.", e);
+ }
+
+ }
+
+ private class OrderingContext {
+
// Handles shared by the whole onTrigger invocation.
private final ComponentLog logger = getLogger();
private final ProcessSession processSession;
private final ProcessContext processContext;

// Settings resolved once per invocation and applied to every group.
private final String orderAttribute;
private final Long waitTimeoutMillis;
private final Function<FlowFile, Integer> getOrder;

// Group state map (filled from the StateManager by onTrigger) and the timestamp of this run.
private final Map<String, String> groupStates = new HashMap<>();
private final long now = System.currentTimeMillis();

// Expression Language properties, evaluated later against individual FlowFiles.
private final PropertyValue groupIdentifierProperty;
private final PropertyValue initOrderProperty;
private final PropertyValue maxOrderProperty;

// FlowFile lists keyed by (presumably) group id; TreeMap iterates keys in sorted order.
private final Map<String, List<FlowFile>> flowFileGroups = new TreeMap<>();

// Values for the FlowFile currently being examined; reset by setFlowFile.
private FlowFile flowFile;
private String groupId;
private Integer order;

/**
 * Captures the session/context and resolves the per-invocation property values.
 */
private OrderingContext(final ProcessContext processContext, final ProcessSession processSession) {
    this.processContext = processContext;
    this.processSession = processSession;

    this.orderAttribute = processContext.getProperty(ORDER_ATTRIBUTE).getValue();
    this.waitTimeoutMillis = processContext.getProperty(WAIT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
    // Throws NumberFormatException when the attribute is missing or not an integer.
    this.getOrder = ff -> Integer.parseInt(ff.getAttribute(orderAttribute));

    this.groupIdentifierProperty = processContext.getProperty(GROUP_IDENTIFIER);
    this.initOrderProperty = processContext.getProperty(INITIAL_ORDER);
    this.maxOrderProperty = processContext.getProperty(MAX_ORDER);
}
+
+ private void setFlowFile(final FlowFile flowFile) {
+ this.flowFile = flowFile;
+ this.groupId = null;
+ this.order = null;
+ }
+
+ private boolean computeGroupId() {
+ groupId =
groupIdentifierProperty.evaluateAttributeExpressions(flowFile).getValue();
+ if (isBlank(groupId)) {
+ transferToFailure(flowFile, "Failed to get Group
Identifier.");
+ return false;
+ }
+ return true;
+ }
+
+ private boolean computeOrder() {
+ try {
+ order = getOrder.apply(flowFile);
+ } catch (final NumberFormatException e) {
+ transferToFailure(flowFile, "Failed to parse order
attribute due to " + e, e);
+ return false;
+ }
+ return true;
+ }
+
+ private boolean computeMaxOrder() {
+ if (maxOrderProperty.isSet()) {
+ // Compute maxOrder for this group if it's not there yet.
+ final String maxOrderStr =
groupStates.computeIfAbsent(STATE_MAX_ORDER.apply(groupId),
+ k ->
maxOrderProperty.evaluateAttributeExpressions(flowFile).getValue());
+ if (isBlank(maxOrderStr)) {
+ transferToFailure(flowFile, String.format("%s was
specified but result was empty.", MAX_ORDER.getDisplayName()));
+ return false;
+ }
+
+ final Integer maxOrder;
+ try {
+ maxOrder = Integer.parseInt(maxOrderStr);
+ } catch (final NumberFormatException e) {
+ final String msg = String.format("Failed to get
Maximum Order for group [%s] due to %s", groupId, e);
+ transferToFailure(flowFile, msg, e);
+ return false;
+ }
+
+ // Check max order.
+ if (order > maxOrder) {
+ final String msg = String.format("Order (%d) is
greater than the Maximum Order (%d) for Group [%s]", order, maxOrder, groupId);
+ transferToFailure(flowFile, msg);
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean computeInitialOrder() {
+ // Compute initial order. Use asInteger() to check if it's a
valid integer.
+ final String stateKeyOrder = STATE_TARGET_ORDER.apply(groupId);
+ try {
+ final AtomicReference<String> computedInitOrder = new
AtomicReference<>();
+ groupStates.computeIfAbsent(stateKeyOrder, k -> {
+ final String initOrderStr =
initOrderProperty.evaluateAttributeExpressions(flowFile).getValue();
+ final int initOrder = Integer.parseInt(initOrderStr);
+ computedInitOrder.set(initOrderStr);
+ return initOrderStr;
+ });
+ // If these map modification is in the computeIfAbsent
function, it causes this issue.
+ // JDK-8071667 : HashMap.computeIfAbsent() adds entry that
HashMap.get() does not find.
+ //
http://bugs.java.com/bugdatabase/view_bug.do?bug_id=8071667
+ if (!isBlank(computedInitOrder.get())) {
+ groupStates.put(STATE_UPDATED_AT.apply(groupId),
String.valueOf(now));
+ }
+
+ } catch (final NumberFormatException e) {
+ final String msg = String.format("Failed to get Initial
Order for Group [%s] due to %s", groupId, e);
+ transferToFailure(flowFile, msg, e);
+ return false;
+ }
+ return true;
+ }
+
+ private void markFlowFileValied() {
--- End diff --
Minor typo in the method name: `markFlowFileValied` should be `markFlowFileValid`.
> Implement an EnforceOrder processor
> -----------------------------------
>
> Key: NIFI-3414
> URL: https://issues.apache.org/jira/browse/NIFI-3414
> Project: Apache NiFi
> Issue Type: New Feature
> Reporter: Matt Burgess
> Assignee: Koji Kawamura
>
> For some flows, it is imperative that the flow files are processed in a
> certain order. The PriorityAttributePrioritizer can be used on a connection
> to ensure that flow files going through that connection are in priority
> order, but depending on error-handling, branching, and other flow designs, it
> is possible for flow files to get out-of-order.
> I propose an EnforceOrder processor, which would be single-threaded and have
> (at a minimum) the following properties:
> 1) Order Attribute: This would be the name of a flow file attribute from
> which the current value will be retrieved.
> 2) Initial Value: This property specifies an initial value for the order. The
> processor is stateful, however, so this property is only used when there is
> no entry in the state map for current value.
> The processor would store the Initial Value into the state map (if no state
> map entry exists), then for each incoming flow file, it checks the value in
> the Order Attribute against the current value. If the attribute value
> matches the current value, the flow file is transferred to the "success"
> relationship, and the current value is incremented in the state map. If the
> attribute value does not match the current value, the session will be rolled
> back.
> Using this processor, along with a PriorityAttributePrioritizer on the
> incoming connection, will allow for out-of-order flow files to have a sort of
> "barrier", thereby guaranteeing that flow files transferred to the "success"
> relationship are in the specified order.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)