[ 
https://issues.apache.org/jira/browse/NIFI-2735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15468772#comment-15468772
 ] 

ASF GitHub Bot commented on NIFI-2735:
--------------------------------------

Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/988#discussion_r77727912
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AggregateValues.java
 ---
    @@ -0,0 +1,580 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import com.jayway.jsonpath.Configuration;
    +import com.jayway.jsonpath.DocumentContext;
    +import com.jayway.jsonpath.InvalidJsonException;
    +import com.jayway.jsonpath.JsonPath;
    +import com.jayway.jsonpath.PathNotFoundException;
    +import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
    +import net.sf.saxon.lib.NamespaceConstant;
    +import net.sf.saxon.xpath.XPathEvaluator;
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateManager;
    +import org.apache.nifi.components.state.StateMap;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.BufferedInputStream;
    +import org.xml.sax.InputSource;
    +
    +import javax.xml.transform.Source;
    +import javax.xml.xpath.XPathConstants;
    +import javax.xml.xpath.XPathExpression;
    +import javax.xml.xpath.XPathExpressionException;
    +import javax.xml.xpath.XPathFactory;
    +import javax.xml.xpath.XPathFactoryConfigurationException;
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static javax.xml.xpath.XPathConstants.NODESET;
    +
    +/**
    + * A processor to intermediateAggregate values from incoming flow files. 
The flow files are expected to have certain attributes set to allow for
    + * aggregations over a batch of flow files.
    + */
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@Tags({"json", "xml", "csv", "aggregate"})
    +@SeeAlso({SplitText.class, SplitJson.class, SplitXml.class})
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Aggregates values from incoming flow files based 
on a path to the desired field and an aggregation operation (SUM, AVG, etc.). "
    +        + "The flow files are expected to have mime.type, fragment.count, 
fragment.identifier and fragment.index attributes set, in order for the 
processor to determine "
    +        + "when a fragment/batch is complete. Supported MIME types include 
text/csv, text/plain, application/json, and application/xml. For text/csv and 
text/plain types, "
    +        + "the flow file is expected to contain a single row of 
comma-delimited data (no headers). SplitText can be used for this purpose. For 
JSON files, SplitJson can be "
    +        + "used to split the document (setting the expected attributes, 
etc.). For XML files, SplitXml can be used in a similar manner."
    +        + "Note that all flow files with the same identifier must be 
routed to this processor, otherwise the count will need "
    +        + "need to be decremented for each flow file in order to represent 
the number of flow files with the same identifier that will be routed to this 
processor. "
    +        + "Once all fragments with the same identifier have been 
processed, a flow file will be transferred to the 'aggregate' relationship 
containing an attribute with the "
    +        + "aggregated value")
    +@Stateful(scopes = Scope.LOCAL, description = "The processor will 
temporarily keep state information for each fragment.identifier, including the 
current intermediateAggregate "
    +        + "value, the fragment count, and the current number of fragments 
processed. Once all fragments have been processed, the state information for 
that fragment "
    +        + "will be cleared.")
    +@ReadsAttributes({
    +        @ReadsAttribute(attribute = "mime.type", description = "The MIME 
type of the flow file content."),
    +        @ReadsAttribute(attribute = "fragment.identifier", description = 
"The unique identifier for a collection of flow files."),
    +        @ReadsAttribute(attribute = "fragment.count", description = "The 
total number of incoming flow files with the unique identifier."),
    +        @ReadsAttribute(attribute = "fragment.index", description = "The 
index of this flow file into the collection of flow files with the same 
identifier.")
    +})
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "aggregate.value", description = "The 
aggregated value from applying the specified operation to each flow file with 
the same identifier."),
    +        @WritesAttribute(attribute = "aggregate.operation", description = 
"The name of the aggregation operation applied to the field in the flow 
files."),
    +        @WritesAttribute(attribute = "fragment.identifier", description = 
"The unique identifier for the flow files used in the aggregation."),
    +        @WritesAttribute(attribute = "fragment.count", description = "The 
total number of incoming flow files with the unique identifier."),
    +
    +})
    +public class AggregateValues extends AbstractSessionFactoryProcessor {
    +
    +    public static final String FRAGMENT_ID = "fragment.identifier";
    +    public static final String FRAGMENT_INDEX = "fragment.index";
    +    public static final String FRAGMENT_COUNT = "fragment.count";
    +    public static final String AGGREGATE_VALUE = "aggregate.value";
    +    public static final String AGGREGATE_OPERATION = "aggregate.operation";
    +
    +    public static final String CSV_MIME_TYPE = "text/csv";
    +    public static final String JSON_MIME_TYPE = "application/json";
    +    public static final String XML_MIME_TYPE = "application/xml";
    +
    +    public static final String COUNT = "COUNT";
    +    public static final String SUM = "SUM";
    +    public static final String AVG = "AVERAGE";
    +    public static final String MIN = "MIN";
    +    public static final String MAX = "MAX";
    +    public static final String CONCAT = "CONCAT";
    +
    +    private static final Configuration STRICT_PROVIDER_CONFIGURATION = 
Configuration.builder().jsonProvider(new JacksonJsonProvider()).build();
    +    private final AtomicReference<XPathFactory> factoryRef = new 
AtomicReference<>();
    +
    +    // Properties
    +    public static final PropertyDescriptor PATH = new 
PropertyDescriptor.Builder()
    +            .name("aggregator-path")
    +            .displayName("Path To Field")
    +            .description("An expression for selecting a field from the 
incoming file. For JSON files this will be a JSON Path expression to a field of 
primitive type, "
    +                    + "for XML files this will be a XPath expression to a 
single element, and for CSV files this will be a column name (if a header line 
is present) "
    +                    + "or an column index (0-based).")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor OPERATION = new 
PropertyDescriptor.Builder()
    +            .name("aggregator-operation")
    +            .displayName("Operation")
    +            .description("Which aggregation operation to perform on the 
specified field for a batch of flow files. Note that some operations can/must 
take a parameter, "
    +                    + "which is specified in the Operation Parameter 
property.")
    +            .required(true)
    +            .allowableValues(COUNT, SUM, AVG, MIN, MAX, CONCAT)
    +            .defaultValue(COUNT)
    +            .expressionLanguageSupported(false)
    +            .build();
    +
    +    public static final PropertyDescriptor OPERATION_PARAM = new 
PropertyDescriptor.Builder()
    +            .name("aggregator-operation-parameter")
    +            .displayName("Operation Parameter")
    +            .description("An optional parameter given to the aggregation 
operation. For COUNT, this is an optional integer value that indicates the 
number to "
    +                    + "increment the count by (defaults to 1). For SUM, 
MAX, MIN, AVERAGE, the value is ignored. For CONCAT, the value is a string 
inserted "
    +                    + "between each field's value in the 
intermediateAggregate string.")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    // Relationships
    +    public static final Relationship REL_AGGREGATE = new 
Relationship.Builder()
    +            .name("aggregate")
    +            .description("Successfully created FlowFile with an attribute 
for the aggregated value.")
    +            .build();
    +
    +    public static final Relationship REL_ORIGINAL = new 
Relationship.Builder()
    +            .name("original")
    +            .description("Once an incoming flow file is processed 
successfully, it will be routed to this relationship")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new 
Relationship.Builder()
    +            .name("failure")
    +            .description("If a FlowFile fails processing for any reason 
(for example, the FlowFile is not in a valid format or the specified "
    +                    + "path does not exist), it will be routed to this 
relationship")
    +            .build();
    +
    +    private List<PropertyDescriptor> properties;
    +    private Set<Relationship> relationships;
    +
    +    static {
    +        System.setProperty("javax.xml.xpath.XPathFactory:" + 
NamespaceConstant.OBJECT_MODEL_SAXON, "net.sf.saxon.xpath.XPathFactoryImpl");
    --- End diff --
    
    Not with a different value that I know of, I stole this from SplitXml or 
EvaluateXPath. Probably could move both to a separate place.


> Add processor to perform simple aggregations
> --------------------------------------------
>
>                 Key: NIFI-2735
>                 URL: https://issues.apache.org/jira/browse/NIFI-2735
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>            Reporter: Matt Burgess
>            Assignee: Matt Burgess
>
> This is a proposal for a new processor (AggregateValues, for example) that 
> can perform simple aggregation operations such as count, sum, average, min, 
> max, and concatenate, over a set of "related" flow files. For example, when a 
> JSON file is split on an array (using the SplitJson processor), the total 
> count of the splits, the index of each split, and the unique indentifier 
> (shared by each split) are stored as attributes in each flow file sent to the 
> "splits" relationship:
> https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi.processors.standard.SplitJson/index.html
> These attributes are the "fragment.*" attributes in the documentation for 
> SplitText, SplitXml, and SplitJson, for example.
> Such a processor could perform these operations for each flow file split from 
> the original document, and when all documents from a split have been 
> processed, a flow file could be transferred to an "aggregate" relationship 
> containing attributes for the operation, aggregate value, etc.
> An interesting application of this (besides the actual aggregation 
> operations) is that you can use the "aggregate" relationship as an event 
> trigger. For example if you need to wait until all files from a group are 
> processed, you can use AggregateValues and the "aggregate" relationship to 
> indicate downstream that the entire group has been processed. If there is not 
> a Split processor upstream, then the attributes (fragment.*) would have to be 
> manipulated by the data flow designer, but this can be accomplished with 
> other processors (including the scripting processors if necessary). 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to