Github user mattyb149 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1496#discussion_r110380550
--- Diff:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EnforceOrder.java
---
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+
+@EventDriven
+@Tags({"sort", "order"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@TriggerSerially
+@CapabilityDescription("Enforces expected ordering of FlowFiles those
belong to the same data group. " +
+ " Although PriorityAttributePrioritizer can be used on a
connection to ensure that flow files going through that connection are in
priority order," +
+ " depending on error-handling, branching, and other flow designs,
it is possible for FlowFiles to get out-of-order." +
+ " EnforceOrder can be used to enforce original ordering for those
FlowFiles." +
+ " [IMPORTANT] In order to take effect of EnforceOrder,
FirstInFirstOutPrioritizer should be used at EVERY downstream relationship" +
+ " UNTIL the order of FlowFiles physically get FIXED by operation
such as MergeContent or being stored to the final destination.")
+@Stateful(scopes = Scope.LOCAL, description = "EnforceOrder uses following
states per ordering group:" +
+ " '<groupId>.target' is a order number which is being waited to
arrive next." +
+ " When a FlowFile with a matching order arrives, or a FlowFile
overtakes the FlowFile being waited for because of wait timeout," +
+ " target order will be updated to (FlowFile.order + 1)." +
+ " '<groupId>.max is the maximum order number for a group." +
+ " '<groupId>.updatedAt' is a timestamp when the order of a group
was updated last time." +
+ " These managed states will be removed automatically once a group
is determined as inactive, see 'Inactive Timeout' for detail.")
+@WritesAttributes({
+ @WritesAttribute(attribute = EnforceOrder.ATTR_STARTED_AT,
+ description = "All FlowFiles going through this processor will
have this attribute. This value is used to determine wait timeout."),
+ @WritesAttribute(attribute = EnforceOrder.ATTR_RESULT,
+ description = "All FlowFiles going through this processor will
have this attribute denoting which relationship it was routed to."),
+ @WritesAttribute(attribute = EnforceOrder.ATTR_DETAIL,
+ description = "FlowFiles routed to 'failure' or 'skipped'
relationship will have this attribute describing details."),
+ @WritesAttribute(attribute = EnforceOrder.ATTR_EXPECTED_ORDER,
+ description = "FlowFiles routed to 'wait' or 'skipped'
relationship will have this attribute denoting expected order when the FlowFile
was processed.")
+})
+public class EnforceOrder extends AbstractProcessor {
+
+ public static final String ATTR_STARTED_AT = "EnforceOrder.startedAt";
+ public static final String ATTR_EXPECTED_ORDER =
"EnforceOrder.expectedOrder";
+ public static final String ATTR_RESULT = "EnforceOrder.result";
+ public static final String ATTR_DETAIL = "EnforceOrder.detail";
+ private static final Function<String, String> STATE_TARGET_ORDER =
groupId -> groupId + ".target";
+ private static final String STATE_SUFFIX_UPDATED_AT = ".updatedAt";
+ private static final Function<String, String> STATE_UPDATED_AT =
groupId -> groupId + STATE_SUFFIX_UPDATED_AT;
+ private static final Function<String, String> STATE_MAX_ORDER =
groupId -> groupId + ".max";
+
+ public static final PropertyDescriptor GROUP_IDENTIFIER = new
PropertyDescriptor.Builder()
+ .name("group-id")
+ .displayName("Group Identifier")
+ .description("EnforceOrder is capable of multiple ordering
groups." +
+ " 'Group Identifier' is used to determine which group a
FlowFile belongs to." +
+ " This property will be evaluated with each incoming
FlowFile." +
+ " If evaluated result is empty, the FlowFile will be
routed to failure.")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .defaultValue("${filename}")
+ .build();
+
+ public static final PropertyDescriptor ORDER_ATTRIBUTE = new
PropertyDescriptor.Builder()
+ .name("order-attribute")
+ .displayName("Order Attribute")
+ .description("A name of FlowFile attribute whose value will be
used to enforce order of FlowFiles within a group." +
+ " If a FlowFile does not have this attribute, or its value
is not an integer, the FlowFile will be routed to failure.")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .build();
+
+ public static final PropertyDescriptor INITIAL_ORDER = new
PropertyDescriptor.Builder()
+ .name("initial-order")
+ .displayName("Initial Order")
+ .description("When the first FlowFile of a group arrives, initial
target order will be computed and stored in the managed state." +
+ " After that, target order will start being tracked by
EnforceOrder and stored in the state management store." +
+ " If Expression Language is used but evaluated result was
not an integer, then the FlowFile will be routed to failure," +
+ " and initial order will be left unknown until consecutive
FlowFiles provide a valid initial order.")
+ .required(true)
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .defaultValue("1")
--- End diff --
A number of processors (the Split processors, CaptureChange, etc.) use zero
as the first index / sequence ID by default. For consistency, I think we
should default this property to zero as well.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---