[ 
https://issues.apache.org/jira/browse/NIFI-2735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15470471#comment-15470471
 ] 

ASF GitHub Bot commented on NIFI-2735:
--------------------------------------

Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/988#discussion_r77811419
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AggregateValues.java
 ---
    @@ -0,0 +1,580 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import com.jayway.jsonpath.Configuration;
    +import com.jayway.jsonpath.DocumentContext;
    +import com.jayway.jsonpath.InvalidJsonException;
    +import com.jayway.jsonpath.JsonPath;
    +import com.jayway.jsonpath.PathNotFoundException;
    +import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
    +import net.sf.saxon.lib.NamespaceConstant;
    +import net.sf.saxon.xpath.XPathEvaluator;
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateManager;
    +import org.apache.nifi.components.state.StateMap;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.BufferedInputStream;
    +import org.xml.sax.InputSource;
    +
    +import javax.xml.transform.Source;
    +import javax.xml.xpath.XPathConstants;
    +import javax.xml.xpath.XPathExpression;
    +import javax.xml.xpath.XPathExpressionException;
    +import javax.xml.xpath.XPathFactory;
    +import javax.xml.xpath.XPathFactoryConfigurationException;
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static javax.xml.xpath.XPathConstants.NODESET;
    +
    +/**
    + * A processor to aggregate values from incoming flow files. 
The flow files are expected to have certain attributes set to allow for
    + * aggregations over a batch of flow files.
    + */
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@Tags({"json", "xml", "csv", "aggregate"})
    +@SeeAlso({SplitText.class, SplitJson.class, SplitXml.class})
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Aggregates values from incoming flow files based 
on a path to the desired field and an aggregation operation (SUM, AVG, etc.). "
    +        + "The flow files are expected to have mime.type, fragment.count, 
fragment.identifier and fragment.index attributes set, in order for the 
processor to determine "
    +        + "when a fragment/batch is complete. Supported MIME types include 
text/csv, text/plain, application/json, and application/xml. For text/csv and 
text/plain types, "
    +        + "the flow file is expected to contain a single row of 
comma-delimited data (no headers). SplitText can be used for this purpose. For 
JSON files, SplitJson can be "
    +        + "used to split the document (setting the expected attributes, 
etc.). For XML files, SplitXml can be used in a similar manner."
    +        + "Note that all flow files with the same identifier must be 
routed to this processor, otherwise the count will need "
    +        + "need to be decremented for each flow file in order to represent 
the number of flow files with the same identifier that will be routed to this 
processor. "
    +        + "Once all fragments with the same identifier have been 
processed, a flow file will be transferred to the 'aggregate' relationship 
containing an attribute with the "
    +        + "aggregated value")
    +@Stateful(scopes = Scope.LOCAL, description = "The processor will 
temporarily keep state information for each fragment.identifier, including the 
current intermediateAggregate "
    --- End diff --
    
    This should be CLUSTER scope; I will change it.


> Add processor to perform simple aggregations
> --------------------------------------------
>
>                 Key: NIFI-2735
>                 URL: https://issues.apache.org/jira/browse/NIFI-2735
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>            Reporter: Matt Burgess
>            Assignee: Matt Burgess
>
> This is a proposal for a new processor (AggregateValues, for example) that 
> can perform simple aggregation operations such as count, sum, average, min, 
> max, and concatenate, over a set of "related" flow files. For example, when a 
> JSON file is split on an array (using the SplitJson processor), the total 
> count of the splits, the index of each split, and the unique identifier 
> (shared by each split) are stored as attributes in each flow file sent to the 
> "splits" relationship:
> https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi.processors.standard.SplitJson/index.html
> These attributes are the "fragment.*" attributes in the documentation for 
> SplitText, SplitXml, and SplitJson, for example.
> Such a processor could perform these operations for each flow file split from 
> the original document, and when all documents from a split have been 
> processed, a flow file could be transferred to an "aggregate" relationship 
> containing attributes for the operation, aggregate value, etc.
> An interesting application of this (besides the actual aggregation 
> operations) is that you can use the "aggregate" relationship as an event 
> trigger. For example if you need to wait until all files from a group are 
> processed, you can use AggregateValues and the "aggregate" relationship to 
> indicate downstream that the entire group has been processed. If there is not 
> a Split processor upstream, then the attributes (fragment.*) would have to be 
> manipulated by the data flow designer, but this can be accomplished with 
> other processors (including the scripting processors if necessary). 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to