Repository: nifi Updated Branches: refs/heads/master 5a39d2a81 -> 1803c15bc
NIFI-5231 Added RecordStats processor. NIFI-5231 Minor fixes from code review. NIFI-5231 Added documentation and made record_count a constant. NIFI-5231 Added changes requested in a code review. NIFI-5231 Added changes based on code review. This closes #2737. Signed-off-by: Koji Kawamura <ijokaruma...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1803c15b Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1803c15b Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1803c15b Branch: refs/heads/master Commit: 1803c15bcd552ca05cbb116ed0ffb8a7fe92519a Parents: 5a39d2a Author: Mike Thomsen <mikerthom...@gmail.com> Authored: Wed May 23 19:30:11 2018 -0400 Committer: Koji Kawamura <ijokaruma...@apache.org> Committed: Mon Jun 11 13:35:01 2018 +0900 ---------------------------------------------------------------------- .../standard/CalculateRecordStats.java | 237 +++++++++++++++++++ .../org.apache.nifi.processor.Processor | 1 + .../additionalDetails.html | 47 ++++ .../standard/TestCalculateRecordStats.groovy | 151 ++++++++++++ 4 files changed, 436 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/1803c15b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CalculateRecordStats.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CalculateRecordStats.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CalculateRecordStats.java new file mode 100644 index 0000000..93029a9 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CalculateRecordStats.java @@ 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.processors.standard;

import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathResult;
import org.apache.nifi.record.path.util.RecordPathCache;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

@Tags({ "record", "stats", "metrics" })
@CapabilityDescription("A processor that can count the number of items in a record set, as well as provide counts based on " +
        "user-defined criteria on subsets of the record set.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@WritesAttributes({
    @WritesAttribute(attribute = CalculateRecordStats.RECORD_COUNT_ATTR, description = "A count of the records in the record set in the flowfile."),
    // NOTE: the implementation writes "recordStats.<name>" and "recordStats.<name>.<value>" with no
    // ".count" suffix (see getStats), so the documented attribute names below match what is emitted.
    @WritesAttribute(attribute = "recordStats.<User Defined Property Name>", description = "A count of the records that contain a value for the user defined property."),
    @WritesAttribute(attribute = "recordStats.<User Defined Property Name>.<value>",
        description = "Each value discovered for the user defined property will have its own count attribute. " +
            "Total number of top N value counts to be added is defined by the limit configuration.")
})
public class CalculateRecordStats extends AbstractProcessor {
    /** Attribute name for the total number of records in the record set. */
    static final String RECORD_COUNT_ATTR = "record_count";

    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
        .name("record-stats-reader")
        .displayName("Record Reader")
        .description("A record reader to use for reading the records.")
        .identifiesControllerService(RecordReaderFactory.class)
        .required(true)
        .build();

    static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
        .name("record-stats-limit")
        .displayName("Limit")
        .description("Limit the number of individual stats that are returned for each record path to the top N results.")
        .required(true)
        .defaultValue("10")
        .addValidator(StandardValidators.INTEGER_VALIDATOR)
        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
        .build();

    static final Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description("If a flowfile is successfully processed, it goes here.")
        .build();
    static final Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description("If a flowfile fails to be processed, it goes here.")
        .build();

    // Written by the framework's scheduling thread in onEnabled and read by onTrigger
    // threads; volatile guarantees visibility of the freshly-created cache.
    private volatile RecordPathCache cache;

    static final Set<Relationship> RELATIONSHIPS;
    static final List<PropertyDescriptor> PROPERTIES;

    static {
        Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        RELATIONSHIPS = Collections.unmodifiableSet(relationships);

        List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(RECORD_READER);
        properties.add(LIMIT);
        PROPERTIES = Collections.unmodifiableList(properties);
    }

    /**
     * Each dynamic property maps a stat name to a record path; the value supports
     * flowfile-attribute expression language.
     */
    @Override
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
        return new PropertyDescriptor.Builder()
            .name(propertyDescriptorName)
            .displayName(propertyDescriptorName)
            .dynamic(true)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .build();
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    @OnScheduled
    public void onEnabled(ProcessContext context) {
        cache = new RecordPathCache(25);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile input = session.get();
        if (input == null) {
            return;
        }

        try {
            Map<String, RecordPath> paths = getRecordPaths(context, input);
            Map<String, String> stats = getStats(input, paths, context, session);

            input = session.putAllAttributes(input, stats);

            session.transfer(input, REL_SUCCESS);
        } catch (Exception ex) {
            getLogger().error("Error processing stats.", ex);
            session.transfer(input, REL_FAILURE);
        }
    }

    /**
     * Builds a map of dynamic property name to compiled record path. Property values are
     * evaluated against the flowfile's attributes before compilation, so paths may vary
     * per flowfile.
     */
    protected Map<String, RecordPath> getRecordPaths(ProcessContext context, FlowFile flowFile) {
        return context.getProperties().keySet()
            .stream().filter(PropertyDescriptor::isDynamic)
            .collect(Collectors.toMap(
                PropertyDescriptor::getName,
                descriptor -> {
                    String val = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue();
                    return cache.getCompiled(val);
                })
            );
    }

    /**
     * Reads the record set and produces the stats attributes: the overall record count
     * plus, for each record path, a per-path total ("recordStats.<name>") and per-value
     * counts ("recordStats.<name>.<value>") trimmed to the configured top-N limit.
     *
     * @throws ProcessException if the flowfile content cannot be read as records
     */
    protected Map<String, String> getStats(FlowFile flowFile, Map<String, RecordPath> paths, ProcessContext context, ProcessSession session) {
        final RecordReaderFactory factory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        final Integer limit = context.getProperty(LIMIT).evaluateAttributeExpressions(flowFile).asInteger();

        // RecordReader is Closeable; close it (and the underlying stream) deterministically.
        try (InputStream is = session.read(flowFile);
             RecordReader reader = factory.createRecordReader(flowFile, is, getLogger())) {

            Map<String, Integer> retVal = new HashMap<>();
            List<String> baseKeys = new ArrayList<>();
            int recordCount = 0;
            Record record;

            while ((record = reader.nextRecord()) != null) {
                for (Map.Entry<String, RecordPath> entry : paths.entrySet()) {
                    RecordPathResult result = entry.getValue().evaluate(record);
                    // Only the first selected field is counted for each record.
                    Optional<FieldValue> value = result.getSelectedFields().findFirst();
                    if (value.isPresent() && value.get().getValue() != null) {
                        String approxValue = value.get().getValue().toString();
                        String baseKey = String.format("recordStats.%s", entry.getKey());
                        String key = String.format("%s.%s", baseKey, approxValue);

                        retVal.merge(key, 1, Integer::sum);
                        retVal.merge(baseKey, 1, Integer::sum);

                        if (!baseKeys.contains(baseKey)) {
                            baseKeys.add(baseKey);
                        }
                    }
                }

                // All records count toward record_count, even if no path matched.
                recordCount++;
            }

            retVal = filterBySize(retVal, limit, baseKeys);
            retVal.put(RECORD_COUNT_ATTR, recordCount);

            return retVal.entrySet().stream()
                .collect(Collectors.toMap(
                    Map.Entry::getKey,
                    e -> e.getValue().toString()
                ));
        } catch (Exception e) {
            // Let onTrigger log once and route to failure; avoid double logging here.
            throw new ProcessException("Could not read flowfile", e);
        }
    }

    /**
     * Keeps all per-path totals (the base keys) and only the top {@code limit}
     * per-value counts, sorted by descending count.
     */
    protected Map<String, Integer> filterBySize(Map<String, Integer> values, Integer limit, List<String> baseKeys) {
        // Base keys (per-path totals) are never trimmed.
        Map<String, Integer> retVal = values.entrySet().stream()
            .filter(e -> baseKeys.contains(e.getKey()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        // Per-value counts: keep only the top N by count.
        values.entrySet().stream()
            .filter(e -> !baseKeys.contains(e.getKey()))
            .sorted(Map.Entry.<String, Integer>comparingByValue(Comparator.reverseOrder()))
            .limit(limit)
            .forEachOrdered(e -> retVal.put(e.getKey(), e.getValue()));

        return retVal;
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1803c15b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 8195963..72f0768 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -15,6 +15,7 @@ org.apache.nifi.processors.standard.AttributesToJSON org.apache.nifi.processors.standard.AttributesToCSV org.apache.nifi.processors.standard.Base64EncodeContent +org.apache.nifi.processors.standard.CalculateRecordStats org.apache.nifi.processors.standard.CompressContent org.apache.nifi.processors.standard.ControlRate org.apache.nifi.processors.standard.ConvertCharacterSet http://git-wip-us.apache.org/repos/asf/nifi/blob/1803c15b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.CalculateRecordStats/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.CalculateRecordStats/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.CalculateRecordStats/additionalDetails.html new file mode 100644 index 0000000..411d7a3 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.CalculateRecordStats/additionalDetails.html @@ -0,0 +1,47 @@ +<!DOCTYPE html> +<html lang="en"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<head> + <meta charset="utf-8" /> + <title>CalculateRecordStats</title> + + <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" /> +</head> +<body> + <p>This processor takes in a record set and counts both the overall count and counts that are defined as dynamic properties + that map a property name to a record path. 
Record path counts are provided at two levels:</p> + <ul> + <li>The overall count of all records that successfully evaluated a record path.</li> + <li>A breakdown of counts of unique values that matched the record path operation.</li> + </ul> + <p>Consider the following record structure:</p> + <pre> + { + "sport": "Soccer", + "name": "John Smith" + } + </pre> + <p>A valid mapping here would be <em>sport => /sport</em>.</p> + <p>For a record set with JSON like that, five entries and three instances of soccer and two instances of football, it would set the following + attributes:</p> + <ul> + <li>record_count: 5</li> + <li>recordStats.sport: 5</li> + <li>recordStats.sport.Soccer: 3</li> + <li>recordStats.sport.Football: 2</li> + </ul> +</body> +</html> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/1803c15b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestCalculateRecordStats.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestCalculateRecordStats.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestCalculateRecordStats.groovy new file mode 100644 index 0000000..6e3b1fc --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestCalculateRecordStats.groovy @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard + +import org.apache.nifi.serialization.SimpleRecordSchema +import org.apache.nifi.serialization.record.MapRecord +import org.apache.nifi.serialization.record.MockRecordParser +import org.apache.nifi.serialization.record.RecordField +import org.apache.nifi.serialization.record.RecordFieldType +import org.apache.nifi.serialization.record.RecordSchema +import org.apache.nifi.util.TestRunner +import org.apache.nifi.util.TestRunners +import org.junit.Assert +import org.junit.Before +import org.junit.Test + +class TestCalculateRecordStats { + TestRunner runner + MockRecordParser recordParser + RecordSchema personSchema + + @Before + void setup() { + runner = TestRunners.newTestRunner(CalculateRecordStats.class) + recordParser = new MockRecordParser() + runner.addControllerService("recordReader", recordParser) + runner.setProperty(CalculateRecordStats.RECORD_READER, "recordReader") + runner.enableControllerService(recordParser) + runner.assertValid() + + recordParser.addSchemaField("id", RecordFieldType.INT) + List<RecordField> personFields = new ArrayList<>() + RecordField nameField = new RecordField("name", RecordFieldType.STRING.getDataType()) + RecordField ageField = new RecordField("age", RecordFieldType.INT.getDataType()) + RecordField sportField = new RecordField("sport", RecordFieldType.STRING.getDataType()) + personFields.add(nameField) + personFields.add(ageField) + personFields.add(sportField) + personSchema = new SimpleRecordSchema(personFields) + recordParser.addSchemaField("person", 
RecordFieldType.RECORD) + } + + @Test + void testNoNullOrEmptyRecordFields() { + def sports = [ "Soccer", "Soccer", "Soccer", "Football", "Football", "Basketball" ] + def expectedAttributes = [ + "recordStats.sport.Soccer": "3", + "recordStats.sport.Football": "2", + "recordStats.sport.Basketball": "1", + "recordStats.sport": "6", + "record_count": "6" + ] + + commonTest([ "sport": "/person/sport"], sports, expectedAttributes) + } + + @Test + void testWithNullFields() { + def sports = [ "Soccer", null, null, "Football", null, "Basketball" ] + def expectedAttributes = [ + "recordStats.sport.Soccer": "1", + "recordStats.sport.Football": "1", + "recordStats.sport.Basketball": "1", + "recordStats.sport": "3", + "record_count": "6" + ] + + commonTest([ "sport": "/person/sport"], sports, expectedAttributes) + } + + @Test + void testWithFilters() { + def sports = [ "Soccer", "Soccer", "Soccer", "Football", "Football", "Basketball" ] + def expectedAttributes = [ + "recordStats.sport.Soccer": "3", + "recordStats.sport.Basketball": "1", + "recordStats.sport": "4", + "record_count": "6" + ] + + def propz = [ + "sport": "/person/sport[. 
!= 'Football']" + ] + + commonTest(propz, sports, expectedAttributes) + } + + @Test + void testWithSizeLimit() { + runner.setProperty(CalculateRecordStats.LIMIT, "3") + def sports = [ "Soccer", "Soccer", "Soccer", "Football", "Football", + "Basketball", "Baseball", "Baseball", "Baseball", "Baseball", + "Skiing", "Skiing", "Skiing", "Snowboarding" + ] + def expectedAttributes = [ + "recordStats.sport.Skiing": "3", + "recordStats.sport.Soccer": "3", + "recordStats.sport.Baseball": "4", + "recordStats.sport": String.valueOf(sports.size()), + "record_count": String.valueOf(sports.size()) + ] + + def propz = [ + "sport": "/person/sport" + ] + + commonTest(propz, sports, expectedAttributes) + } + + private void commonTest(Map procProperties, List sports, Map expectedAttributes) { + int index = 1 + sports.each { sport -> + recordParser.addRecord(index++, new MapRecord(personSchema, [ + "name" : "John Doe", + "age" : 48, + "sport": sport + ])) + } + + procProperties.each { kv -> + runner.setProperty(kv.key, kv.value) + } + + runner.enqueue("") + runner.run() + runner.assertTransferCount(CalculateRecordStats.REL_FAILURE, 0) + runner.assertTransferCount(CalculateRecordStats.REL_SUCCESS, 1) + + def flowFiles = runner.getFlowFilesForRelationship(CalculateRecordStats.REL_SUCCESS) + def ff = flowFiles[0] + expectedAttributes.each { kv -> + Assert.assertNotNull("Missing ${kv.key}", ff.getAttribute(kv.key)) + Assert.assertEquals(kv.value, ff.getAttribute(kv.key)) + } + } +}