[
https://issues.apache.org/jira/browse/NIFI-2735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15468267#comment-15468267
]
ASF GitHub Bot commented on NIFI-2735:
--------------------------------------
Github user olegz commented on a diff in the pull request:
https://github.com/apache/nifi/pull/988#discussion_r77700099
--- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AggregateValues.java ---
@@ -0,0 +1,580 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import com.jayway.jsonpath.Configuration;
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.InvalidJsonException;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.PathNotFoundException;
+import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
+import net.sf.saxon.lib.NamespaceConstant;
+import net.sf.saxon.xpath.XPathEvaluator;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.BufferedInputStream;
+import org.xml.sax.InputSource;
+
+import javax.xml.transform.Source;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpression;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
+import javax.xml.xpath.XPathFactoryConfigurationException;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static javax.xml.xpath.XPathConstants.NODESET;
+
+/**
+ * A processor to aggregate values from incoming flow files. The flow files are expected to have certain attributes set to allow for
+ * aggregations over a batch of flow files.
+ */
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"json", "xml", "csv", "aggregate"})
+@SeeAlso({SplitText.class, SplitJson.class, SplitXml.class})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Aggregates values from incoming flow files based on a path to the desired field and an aggregation operation (SUM, AVERAGE, etc.). "
+ + "The flow files are expected to have mime.type, fragment.count, fragment.identifier and fragment.index attributes set, in order for the processor to determine "
+ + "when a fragment/batch is complete. Supported MIME types include text/csv, text/plain, application/json, and application/xml. For text/csv and text/plain types, "
+ + "the flow file is expected to contain a single row of comma-delimited data (no headers). SplitText can be used for this purpose. For JSON files, SplitJson can be "
+ + "used to split the document (setting the expected attributes, etc.). For XML files, SplitXml can be used in a similar manner. "
+ + "Note that all flow files with the same identifier must be routed to this processor; otherwise the fragment.count attribute will need "
+ + "to be decremented for each flow file that is not routed here, so that it represents the number of flow files with the same identifier that will reach this processor. "
+ + "Once all fragments with the same identifier have been processed, a flow file will be transferred to the 'aggregate' relationship containing an attribute with the "
+ + "aggregated value.")
+@Stateful(scopes = Scope.LOCAL, description = "The processor will temporarily keep state information for each fragment.identifier, including the current aggregate "
+ + "value, the fragment count, and the current number of fragments processed. Once all fragments have been processed, the state information for that fragment "
+ + "will be cleared.")
+@ReadsAttributes({
+ @ReadsAttribute(attribute = "mime.type", description = "The MIME type of the flow file content."),
+ @ReadsAttribute(attribute = "fragment.identifier", description = "The unique identifier for a collection of flow files."),
+ @ReadsAttribute(attribute = "fragment.count", description = "The total number of incoming flow files with the unique identifier."),
+ @ReadsAttribute(attribute = "fragment.index", description = "The index of this flow file within the collection of flow files with the same identifier.")
+})
+@WritesAttributes({
+ @WritesAttribute(attribute = "aggregate.value", description = "The aggregated value from applying the specified operation to each flow file with the same identifier."),
+ @WritesAttribute(attribute = "aggregate.operation", description = "The name of the aggregation operation applied to the field in the flow files."),
+ @WritesAttribute(attribute = "fragment.identifier", description = "The unique identifier for the flow files used in the aggregation."),
+ @WritesAttribute(attribute = "fragment.count", description = "The total number of incoming flow files with the unique identifier."),
+
+})
+public class AggregateValues extends AbstractSessionFactoryProcessor {
+
+ public static final String FRAGMENT_ID = "fragment.identifier";
+ public static final String FRAGMENT_INDEX = "fragment.index";
+ public static final String FRAGMENT_COUNT = "fragment.count";
+ public static final String AGGREGATE_VALUE = "aggregate.value";
+ public static final String AGGREGATE_OPERATION = "aggregate.operation";
+
+ public static final String CSV_MIME_TYPE = "text/csv";
+ public static final String JSON_MIME_TYPE = "application/json";
+ public static final String XML_MIME_TYPE = "application/xml";
+
+ public static final String COUNT = "COUNT";
+ public static final String SUM = "SUM";
+ public static final String AVG = "AVERAGE";
+ public static final String MIN = "MIN";
+ public static final String MAX = "MAX";
+ public static final String CONCAT = "CONCAT";
+
+ private static final Configuration STRICT_PROVIDER_CONFIGURATION = Configuration.builder().jsonProvider(new JacksonJsonProvider()).build();
+ private final AtomicReference<XPathFactory> factoryRef = new AtomicReference<>();
+
+ // Properties
+ public static final PropertyDescriptor PATH = new PropertyDescriptor.Builder()
+ .name("aggregator-path")
+ .displayName("Path To Field")
+ .description("An expression for selecting a field from the incoming file. For JSON files this will be a JSON Path expression to a field of primitive type, "
+ + "for XML files this will be an XPath expression to a single element, and for CSV files this will be a column name (if a header line is present) "
+ + "or a column index (0-based).")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor OPERATION = new PropertyDescriptor.Builder()
+ .name("aggregator-operation")
+ .displayName("Operation")
+ .description("Which aggregation operation to perform on the specified field for a batch of flow files. Note that some operations can/must take a parameter, "
+ + "which is specified in the Operation Parameter property.")
+ .required(true)
+ .allowableValues(COUNT, SUM, AVG, MIN, MAX, CONCAT)
+ .defaultValue(COUNT)
+ .expressionLanguageSupported(false)
+ .build();
+
+ public static final PropertyDescriptor OPERATION_PARAM = new PropertyDescriptor.Builder()
+ .name("aggregator-operation-parameter")
+ .displayName("Operation Parameter")
+ .description("An optional parameter given to the aggregation operation. For COUNT, this is an optional integer value that indicates the number to "
+ + "increment the count by (defaults to 1). For SUM, MAX, MIN, and AVERAGE, the value is ignored. For CONCAT, the value is a string inserted "
+ + "between each field's value in the aggregate string.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+
+ // Relationships
+ public static final Relationship REL_AGGREGATE = new Relationship.Builder()
+ .name("aggregate")
+ .description("Successfully created FlowFile with an attribute for the aggregated value.")
+ .build();
+
+ public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+ .name("original")
+ .description("Once an incoming flow file is processed successfully, it will be routed to this relationship.")
+ .build();
+
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("If a FlowFile fails processing for any reason (for example, the FlowFile is not in a valid format or the specified "
+ + "path does not exist), it will be routed to this relationship.")
+ .build();
+
+ private List<PropertyDescriptor> properties;
+ private Set<Relationship> relationships;
+
+ static {
+ System.setProperty("javax.xml.xpath.XPathFactory:" + NamespaceConstant.OBJECT_MODEL_SAXON, "net.sf.saxon.xpath.XPathFactoryImpl");
--- End diff --
Given that NiFi operates in a single JVM instance, do we have any other processor that may be setting the same system property, but to a different value?
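A minimal plain-JDK sketch of the concern: system properties are shared by every component in the JVM, so two static initializers that write the same javax.xml.xpath.XPathFactory:<uri> key silently overwrite each other, and whichever runs last wins. The key suffix, the factory class names, and the XPathPropertyClash class with its setIfCompatible helper are hypothetical placeholders for illustration; the helper is just one possible defensive pattern, not an existing NiFi utility.

```java
import java.util.Properties;

public class XPathPropertyClash {

    // Hypothetical key: the PR builds the real one from NamespaceConstant.OBJECT_MODEL_SAXON.
    public static final String KEY = "javax.xml.xpath.XPathFactory:urn:example:object-model";

    // Sets the property only if it is unset or already holds the same value.
    // Returns false, leaving the existing value untouched, when another value is present.
    public static boolean setIfCompatible(String key, String value) {
        Properties props = System.getProperties();
        // putIfAbsent on the JVM-wide Properties map is atomic, so two callers cannot both "win".
        Object previous = props.putIfAbsent(key, value);
        return previous == null || previous.equals(value);
    }

    public static void main(String[] args) {
        // Plain setProperty: the last writer silently wins for the whole JVM.
        System.setProperty(KEY, "com.example.FactoryA");
        System.setProperty(KEY, "com.example.FactoryB");
        System.out.println(System.getProperty(KEY)); // com.example.FactoryB

        // Defensive variant: a conflicting write is detected instead of applied.
        System.clearProperty(KEY);
        System.out.println(setIfCompatible(KEY, "com.example.FactoryA")); // true
        System.out.println(setIfCompatible(KEY, "com.example.FactoryA")); // true
        System.out.println(setIfCompatible(KEY, "com.example.FactoryB")); // false
        System.out.println(System.getProperty(KEY)); // com.example.FactoryA
    }
}
```

An alternative that avoids the global property altogether, assuming the Saxon version on the classpath exposes a public no-arg constructor on net.sf.saxon.xpath.XPathFactoryImpl, would be to create that factory directly and keep it in the processor's existing factoryRef rather than resolving it through a system property.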
> Add processor to perform simple aggregations
> --------------------------------------------
>
> Key: NIFI-2735
> URL: https://issues.apache.org/jira/browse/NIFI-2735
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Reporter: Matt Burgess
> Assignee: Matt Burgess
>
> This is a proposal for a new processor (AggregateValues, for example) that
> can perform simple aggregation operations such as count, sum, average, min,
> max, and concatenate, over a set of "related" flow files. For example, when a
> JSON file is split on an array (using the SplitJson processor), the total
> count of the splits, the index of each split, and the unique identifier
> (shared by each split) are stored as attributes in each flow file sent to the
> "splits" relationship:
> https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi.processors.standard.SplitJson/index.html
> These attributes are the "fragment.*" attributes in the documentation for
> SplitText, SplitXml, and SplitJson, for example.
> Such a processor could perform these operations for each flow file split from
> the original document, and when all documents from a split have been
> processed, a flow file could be transferred to an "aggregate" relationship
> containing attributes for the operation, aggregate value, etc.
> An interesting application of this (besides the actual aggregation
> operations) is that you can use the "aggregate" relationship as an event
> trigger. For example, if you need to wait until all files from a group are
> processed, you can use AggregateValues and the "aggregate" relationship to
> indicate downstream that the entire group has been processed. If there is not
> a Split processor upstream, then the attributes (fragment.*) would have to be
> manipulated by the data flow designer, but this can be accomplished with
> other processors (including the scripting processors if necessary).
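The per-fragment bookkeeping described in this proposal (and in the @Stateful description in the diff) can be sketched in plain Java as below. This is a hypothetical, single-threaded illustration rather than the PR's implementation: it keeps partial results in a HashMap keyed by fragment.identifier instead of NiFi's StateManager, assumes the field value has already been extracted from the flow file, and follows the Operation Parameter semantics from the diff (COUNT increments by the parameter, defaulting to 1; CONCAT inserts the parameter between values). FragmentAggregator and accept are illustrative names only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class FragmentAggregator {

    // Running state for one fragment.identifier (stand-in for NiFi's StateManager).
    private static final class Partial {
        final int expected;   // value of the fragment.count attribute
        int seen;             // fragments folded in so far
        long count;
        double sum;
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;
        final StringBuilder concat = new StringBuilder();

        Partial(int expected) {
            this.expected = expected;
        }
    }

    private final String operation; // COUNT, SUM, AVERAGE, MIN, MAX or CONCAT
    private final String param;     // COUNT increment or CONCAT separator; may be null
    private final Map<String, Partial> state = new HashMap<>();

    public FragmentAggregator(String operation, String param) {
        this.operation = operation;
        this.param = param;
    }

    // Folds one field value into its fragment's state. Returns the aggregate, and
    // drops the state, once fragment.count values have been seen; otherwise empty.
    public Optional<String> accept(String fragmentId, int fragmentCount, String value) {
        Partial p = state.computeIfAbsent(fragmentId, id -> new Partial(fragmentCount));
        switch (operation) {
            case "COUNT":
                p.count += (param == null) ? 1 : Long.parseLong(param);
                break;
            case "CONCAT":
                if (p.seen > 0 && param != null) {
                    p.concat.append(param); // separator goes between values only
                }
                p.concat.append(value);
                break;
            default: { // SUM, AVERAGE, MIN and MAX all need a numeric value
                double d = Double.parseDouble(value);
                p.sum += d;
                p.min = Math.min(p.min, d);
                p.max = Math.max(p.max, d);
            }
        }
        p.seen++;
        if (p.seen < p.expected) {
            return Optional.empty();
        }
        state.remove(fragmentId); // batch complete: clear its state
        switch (operation) {
            case "COUNT":   return Optional.of(Long.toString(p.count));
            case "CONCAT":  return Optional.of(p.concat.toString());
            case "SUM":     return Optional.of(Double.toString(p.sum));
            case "AVERAGE": return Optional.of(Double.toString(p.sum / p.seen));
            case "MIN":     return Optional.of(Double.toString(p.min));
            case "MAX":     return Optional.of(Double.toString(p.max));
            default: throw new IllegalArgumentException("Unknown operation: " + operation);
        }
    }

    public static void main(String[] args) {
        // Three splits sharing fragment.identifier "abc" and fragment.count 3.
        FragmentAggregator avg = new FragmentAggregator("AVERAGE", null);
        System.out.println(avg.accept("abc", 3, "2")); // Optional.empty
        System.out.println(avg.accept("abc", 3, "4")); // Optional.empty
        System.out.println(avg.accept("abc", 3, "9")); // Optional[5.0]

        FragmentAggregator concat = new FragmentAggregator("CONCAT", ",");
        concat.accept("xyz", 2, "a");
        System.out.println(concat.accept("xyz", 2, "b")); // Optional[a,b]
    }
}
```

A non-empty result marks the point where the processor would transfer a flow file to the "aggregate" relationship, which is also what makes that relationship usable as the "whole group processed" event trigger described above.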
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)