Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2371#discussion_r159700110 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java --- @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StringUtils; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"count", "text", "line", "word", "character"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Counts various metrics on incoming text. The requested results will be recorded as attributes. 
" + + "The resulting flowfile will not have its content modified.") +@WritesAttributes({ + @WritesAttribute(attribute = "text.line.count", description = "The number of lines of text present in the FlowFile content"), + @WritesAttribute(attribute = "text.line.nonempty.count", description = "The number of lines of text (with at least one non-whitespace character) present in the original FlowFile"), + @WritesAttribute(attribute = "text.word.count", description = "The number of words present in the original FlowFile"), + @WritesAttribute(attribute = "text.character.count", description = "The number of characters (given the specified character encoding) present in the original FlowFile"), +}) +@SeeAlso(SplitText.class) +public class CountText extends AbstractProcessor { + private static final List<Charset> STANDARD_CHARSETS = Arrays.asList( + StandardCharsets.UTF_8, + StandardCharsets.US_ASCII, + StandardCharsets.ISO_8859_1, + StandardCharsets.UTF_16, + StandardCharsets.UTF_16LE, + StandardCharsets.UTF_16BE); + + private static final String SYMBOL_REGEX = "[\\s-\\._]"; + private static final String WHITESPACE_ONLY_REGEX = "\\s"; + + // Attribute keys + public static final String TEXT_LINE_COUNT = "text.line.count"; + public static final String TEXT_LINE_NONEMPTY_COUNT = "text.line.nonempty.count"; + public static final String TEXT_WORD_COUNT = "text.word.count"; + public static final String TEXT_CHARACTER_COUNT = "text.character.count"; + + + public static final PropertyDescriptor TEXT_LINE_COUNT_PD = new PropertyDescriptor.Builder() + .name("text-line-count") + .displayName("Text Line Count") + .description("If enabled, will count the number of lines present in the incoming text.") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + public static final PropertyDescriptor TEXT_LINE_NONEMPTY_COUNT_PD = new PropertyDescriptor.Builder() + .name("text-line-nonempty-count") + 
.displayName("Text Line (non-empty) Count") + .description("If enabled, will count the number of lines that contain a non-whitespace character present in the incoming text.") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + public static final PropertyDescriptor TEXT_WORD_COUNT_PD = new PropertyDescriptor.Builder() + .name("text-word-count") + .displayName("Text Word Count") + .description("If enabled, will count the number of words (alphanumeric character groups bounded by whitespace)" + + " present in the incoming text. Common logical delimiters [_-.] do not bound a word unless 'Split Words on Symbols' is true.") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + public static final PropertyDescriptor TEXT_CHARACTER_COUNT_PD = new PropertyDescriptor.Builder() + .name("text-character-count") + .displayName("Text Character Count") + .description("If enabled, will count the number of characters (including whitespace and symbols) present in the incoming text.") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + public static final PropertyDescriptor SPLIT_WORDS_ON_SYMBOLS_PD = new PropertyDescriptor.Builder() + .name("split-words-on-symbols") + .displayName("Split Words on Symbols") + .description("If enabled, the word count will identify strings separated by common logical delimiters [_-.] as independent words (ex. split-words-on-symbols = 4 words).") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + // TODO: Stream map allowable values to name/key pair? 
+ public static final PropertyDescriptor CHARACTER_ENCODING_PD = new PropertyDescriptor.Builder() + .name("character-encoding") + .displayName("Character Encoding") + .description("Specifies a character encoding to use.") + .required(true) + .allowableValues(getStandardCharsetNames()) + .defaultValue(StandardCharsets.UTF_8.displayName()) + .build(); + + private static Set<String> getStandardCharsetNames() { + return STANDARD_CHARSETS.stream().map(c -> c.displayName()).collect(Collectors.toSet()); + } + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("The flowfile contains the original content with one or more attributes added containing the respective counts") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("If the flowfile text cannot be counted for some reason, the original file will be routed to this destination and nothing will be routed elsewhere") + .build(); + + private static final List<PropertyDescriptor> properties; + private static final Set<Relationship> relationships; + + static { + properties = Collections.unmodifiableList(Arrays.asList(TEXT_LINE_COUNT_PD, + TEXT_LINE_NONEMPTY_COUNT_PD, + TEXT_WORD_COUNT_PD, + TEXT_CHARACTER_COUNT_PD, + SPLIT_WORDS_ON_SYMBOLS_PD, + CHARACTER_ENCODING_PD)); + + relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, + REL_FAILURE))); + } + + private volatile boolean countLines; + private volatile boolean countLinesNonEmpty; + private volatile boolean countWords; + private volatile boolean countCharacters; + private volatile boolean splitWordsOnSymbols; + private volatile String characterEncoding; + + private volatile int lineCount; + private volatile int lineNonEmptyCount; + private volatile int wordCount; + private volatile int characterCount; + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @OnScheduled + 
public void onSchedule(ProcessContext context) { + this.countLines = context.getProperty(TEXT_LINE_COUNT_PD).isSet() + ? context.getProperty(TEXT_LINE_COUNT_PD).asBoolean() : false; + this.countLinesNonEmpty = context.getProperty(TEXT_LINE_NONEMPTY_COUNT_PD).isSet() + ? context.getProperty(TEXT_LINE_NONEMPTY_COUNT_PD).asBoolean() : false; + this.countWords = context.getProperty(TEXT_WORD_COUNT_PD).isSet() + ? context.getProperty(TEXT_WORD_COUNT_PD).asBoolean() : false; + this.countCharacters = context.getProperty(TEXT_CHARACTER_COUNT_PD).isSet() + ? context.getProperty(TEXT_CHARACTER_COUNT_PD).asBoolean() : false; + this.splitWordsOnSymbols = context.getProperty(SPLIT_WORDS_ON_SYMBOLS_PD).isSet() + ? context.getProperty(SPLIT_WORDS_ON_SYMBOLS_PD).asBoolean() : false; + this.characterEncoding = context.getProperty(CHARACTER_ENCODING_PD).getValue(); + } + + /** + * Will count text attributes of the incoming stream. + */ + @Override + public void onTrigger(ProcessContext context, ProcessSession processSession) throws ProcessException { + FlowFile sourceFlowFile = processSession.get(); + if (sourceFlowFile == null) { + return; + } + AtomicBoolean error = new AtomicBoolean(); + + lineCount = 0; + lineNonEmptyCount = 0; + wordCount = 0; + characterCount = 0; + + processSession.read(sourceFlowFile, in -> { + long start = System.nanoTime(); + + // Iterate over the lines in the text input + try { + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in, characterEncoding)); + String line; + while ((line = bufferedReader.readLine()) != null) { + if (countLines) { + lineCount++; + } + + if (countLinesNonEmpty) { + if (line.trim().length() > 0) { + lineNonEmptyCount++; + } + } + + if (countWords) { + wordCount += countWordsInLine(line, splitWordsOnSymbols); + } + + if (countCharacters) { + characterCount += line.length(); + } + } + long stop = System.nanoTime(); + if (getLogger().isDebugEnabled()) { + final long durationNanos = stop - start; + 
DecimalFormat df = new DecimalFormat("#.###"); + getLogger().debug("Computed metrics in " + durationNanos + " nanoseconds (" + df.format(durationNanos / 1_000_000_000.0) + " seconds)."); + } + String message = generateMetricsMessage(); + getLogger().info(message); + } catch (IllegalStateException e) { + error.set(true); + getLogger().error(e.getMessage() + " Routing to failure.", e); + } + }); + + if (error.get()) { + processSession.transfer(sourceFlowFile, REL_FAILURE); + } else { + Map<String, String> metricAttributes = new HashMap<>(); + if (countLines) { + metricAttributes.put(TEXT_LINE_COUNT, String.valueOf(lineCount)); + } + if (countLinesNonEmpty) { + metricAttributes.put(TEXT_LINE_NONEMPTY_COUNT, String.valueOf(lineNonEmptyCount)); + } + if (countWords) { + metricAttributes.put(TEXT_WORD_COUNT, String.valueOf(wordCount)); + } + if (countCharacters) { + metricAttributes.put(TEXT_CHARACTER_COUNT, String.valueOf(characterCount)); + } + FlowFile updatedFlowFile = processSession.putAllAttributes(sourceFlowFile, metricAttributes); + processSession.transfer(updatedFlowFile, REL_SUCCESS); + } + } + + private String generateMetricsMessage() { + StringBuilder sb = new StringBuilder("Counted "); + List<String> metrics = new ArrayList<>(); + if (countLines) { + metrics.add(lineCount + " lines"); + } + if (countLinesNonEmpty) { + metrics.add(lineNonEmptyCount + " non-empty lines"); + } + if (countWords) { + metrics.add(wordCount + " words"); + } + if (countCharacters) { + metrics.add(characterCount + " characters"); + } + sb.append(StringUtils.join(metrics, ", ")); + return sb.toString(); + } + + private int countWordsInLine(String line, boolean splitWordsOnSymbols) { + if (line == null || line.trim().length() == 0) { + return 0; + } else { + String regex = splitWordsOnSymbols ? 
SYMBOL_REGEX : WHITESPACE_ONLY_REGEX; --- End diff -- We should pre-compile these regexes and use a Pattern object as the member variable, then call regex.split(line) -- this way we don't re-compile the regex for every line and instead only compile it once for the processor lifecycle.
---