Repository: nifi Updated Branches: refs/heads/master 4196140e4 -> a7f1eb89c
NIFI-4727 Added CountText processor and unit test. This closes #2371. Signed-off-by: Kevin Doran <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a7f1eb89 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a7f1eb89 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a7f1eb89 Branch: refs/heads/master Commit: a7f1eb89c23e90f813be12adc4dca99de36c7c15 Parents: 4196140 Author: Andy LoPresto <[email protected]> Authored: Tue Jan 2 14:47:33 2018 -0500 Committer: Andy LoPresto <[email protected]> Committed: Mon Jan 8 14:26:14 2018 -0800 ---------------------------------------------------------------------- .../nifi-standard-processors/pom.xml | 1 + .../nifi/processors/standard/CountText.java | 327 ++++++++++++++++++ .../org.apache.nifi.processor.Processor | 1 + .../processors/standard/CountTextTest.groovy | 332 +++++++++++++++++++ .../resources/TestCountText/jabberwocky.txt | 34 ++ 5 files changed, 695 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/a7f1eb89/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 2557d1a..89021c6 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -395,6 +395,7 @@ <exclude>src/test/resources/TestConvertJSONToSQL/person-with-null-code.json</exclude> <exclude>src/test/resources/TestConvertJSONToSQL/person-without-id.json</exclude> <exclude>src/test/resources/TestConvertJSONToSQL/person-with-bool.json</exclude> + <exclude>src/test/resources/TestCountText/jabberwocky.txt</exclude> 
<exclude>src/test/resources/TestModifyBytes/noFooter.txt</exclude> <exclude>src/test/resources/TestModifyBytes/noFooter_noHeader.txt</exclude> <exclude>src/test/resources/TestModifyBytes/noHeader.txt</exclude> http://git-wip-us.apache.org/repos/asf/nifi/blob/a7f1eb89/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java new file mode 100644 index 0000000..20195bd --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StringUtils; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"count", "text", "line", "word", "character"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Counts various metrics on incoming text. 
The requested results will be recorded as attributes. " + + "The resulting flowfile will not have its content modified.") +@WritesAttributes({ + @WritesAttribute(attribute = "text.line.count", description = "The number of lines of text present in the FlowFile content"), + @WritesAttribute(attribute = "text.line.nonempty.count", description = "The number of lines of text (with at least one non-whitespace character) present in the original FlowFile"), + @WritesAttribute(attribute = "text.word.count", description = "The number of words present in the original FlowFile"), + @WritesAttribute(attribute = "text.character.count", description = "The number of characters (given the specified character encoding) present in the original FlowFile"), +}) +@SeeAlso(SplitText.class) +public class CountText extends AbstractProcessor { + private static final List<Charset> STANDARD_CHARSETS = Arrays.asList( + StandardCharsets.UTF_8, + StandardCharsets.US_ASCII, + StandardCharsets.ISO_8859_1, + StandardCharsets.UTF_16, + StandardCharsets.UTF_16LE, + StandardCharsets.UTF_16BE); + + private static final Pattern SYMBOL_PATTERN = Pattern.compile("[\\s-\\._]"); + private static final Pattern WHITESPACE_ONLY_PATTERN = Pattern.compile("\\s"); + + // Attribute keys + public static final String TEXT_LINE_COUNT = "text.line.count"; + public static final String TEXT_LINE_NONEMPTY_COUNT = "text.line.nonempty.count"; + public static final String TEXT_WORD_COUNT = "text.word.count"; + public static final String TEXT_CHARACTER_COUNT = "text.character.count"; + + + public static final PropertyDescriptor TEXT_LINE_COUNT_PD = new PropertyDescriptor.Builder() + .name("text-line-count") + .displayName("Count Lines") + .description("If enabled, will count the number of lines present in the incoming text.") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + public static final PropertyDescriptor 
TEXT_LINE_NONEMPTY_COUNT_PD = new PropertyDescriptor.Builder() + .name("text-line-nonempty-count") + .displayName("Count Non-Empty Lines") + .description("If enabled, will count the number of lines that contain a non-whitespace character present in the incoming text.") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + public static final PropertyDescriptor TEXT_WORD_COUNT_PD = new PropertyDescriptor.Builder() + .name("text-word-count") + .displayName("Count Words") + .description("If enabled, will count the number of words (alphanumeric character groups bounded by whitespace)" + + " present in the incoming text. Common logical delimiters [_-.] do not bound a word unless 'Split Words on Symbols' is true.") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + public static final PropertyDescriptor TEXT_CHARACTER_COUNT_PD = new PropertyDescriptor.Builder() + .name("text-character-count") + .displayName("Count Characters") + .description("If enabled, will count the number of characters (including whitespace and symbols, but not including newlines and carriage returns) present in the incoming text.") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + public static final PropertyDescriptor SPLIT_WORDS_ON_SYMBOLS_PD = new PropertyDescriptor.Builder() + .name("split-words-on-symbols") + .displayName("Split Words on Symbols") + .description("If enabled, the word count will identify strings separated by common logical delimiters [ _ - . ] as independent words (ex. 
split-words-on-symbols = 4 words).") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + public static final PropertyDescriptor CHARACTER_ENCODING_PD = new PropertyDescriptor.Builder() + .name("character-encoding") + .displayName("Character Encoding") + .description("Specifies a character encoding to use.") + .required(true) + .allowableValues(getStandardCharsetNames()) + .defaultValue(StandardCharsets.UTF_8.displayName()) + .build(); + + private static Set<String> getStandardCharsetNames() { + return STANDARD_CHARSETS.stream().map(c -> c.displayName()).collect(Collectors.toSet()); + } + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("The flowfile contains the original content with one or more attributes added containing the respective counts") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("If the flowfile text cannot be counted for some reason, the original file will be routed to this destination and nothing will be routed elsewhere") + .build(); + + private static final List<PropertyDescriptor> properties; + private static final Set<Relationship> relationships; + + static { + properties = Collections.unmodifiableList(Arrays.asList(TEXT_LINE_COUNT_PD, + TEXT_LINE_NONEMPTY_COUNT_PD, + TEXT_WORD_COUNT_PD, + TEXT_CHARACTER_COUNT_PD, + SPLIT_WORDS_ON_SYMBOLS_PD, + CHARACTER_ENCODING_PD)); + + relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, + REL_FAILURE))); + } + + private volatile boolean countLines; + private volatile boolean countLinesNonEmpty; + private volatile boolean countWords; + private volatile boolean countCharacters; + private volatile boolean splitWordsOnSymbols; + private volatile String characterEncoding = StandardCharsets.UTF_8.name(); + + private volatile int lineCount; + private 
volatile int lineNonEmptyCount; + private volatile int wordCount; + private volatile int characterCount; + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @OnScheduled + public void onSchedule(ProcessContext context) { + this.countLines = context.getProperty(TEXT_LINE_COUNT_PD).isSet() + ? context.getProperty(TEXT_LINE_COUNT_PD).asBoolean() : false; + this.countLinesNonEmpty = context.getProperty(TEXT_LINE_NONEMPTY_COUNT_PD).isSet() + ? context.getProperty(TEXT_LINE_NONEMPTY_COUNT_PD).asBoolean() : false; + this.countWords = context.getProperty(TEXT_WORD_COUNT_PD).isSet() + ? context.getProperty(TEXT_WORD_COUNT_PD).asBoolean() : false; + this.countCharacters = context.getProperty(TEXT_CHARACTER_COUNT_PD).isSet() + ? context.getProperty(TEXT_CHARACTER_COUNT_PD).asBoolean() : false; + this.splitWordsOnSymbols = context.getProperty(SPLIT_WORDS_ON_SYMBOLS_PD).isSet() + ? context.getProperty(SPLIT_WORDS_ON_SYMBOLS_PD).asBoolean() : false; + this.characterEncoding = context.getProperty(CHARACTER_ENCODING_PD).getValue(); + } + + /** + * Will count text attributes of the incoming stream. 
+ */ + @Override + public void onTrigger(ProcessContext context, ProcessSession processSession) throws ProcessException { + FlowFile sourceFlowFile = processSession.get(); + if (sourceFlowFile == null) { + return; + } + AtomicBoolean error = new AtomicBoolean(); + + lineCount = 0; + lineNonEmptyCount = 0; + wordCount = 0; + characterCount = 0; + + processSession.read(sourceFlowFile, in -> { + long start = System.nanoTime(); + + // Iterate over the lines in the text input + try { + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in, characterEncoding)); + String line; + while ((line = bufferedReader.readLine()) != null) { + if (countLines) { + lineCount++; + } + + if (countLinesNonEmpty) { + if (line.trim().length() > 0) { + lineNonEmptyCount++; + } + } + + if (countWords) { + wordCount += countWordsInLine(line, splitWordsOnSymbols); + } + + if (countCharacters) { + characterCount += line.length(); + } + } + long stop = System.nanoTime(); + if (getLogger().isDebugEnabled()) { + final long durationNanos = stop - start; + DecimalFormat df = new DecimalFormat("#.###"); + getLogger().debug("Computed metrics in " + durationNanos + " nanoseconds (" + df.format(durationNanos / 1_000_000_000.0) + " seconds)."); + } + if (getLogger().isInfoEnabled()) { + String message = generateMetricsMessage(); + getLogger().info(message); + } + + // Update session counters + processSession.adjustCounter("Lines Counted", (long) lineCount, false); + processSession.adjustCounter("Lines (non-empty) Counted", (long) lineNonEmptyCount, false); + processSession.adjustCounter("Words Counted", (long) wordCount, false); + processSession.adjustCounter("Characters Counted", (long) characterCount, false); + } catch (IOException e) { + error.set(true); + getLogger().error(e.getMessage() + " Routing to failure.", e); + } + }); + + if (error.get()) { + processSession.transfer(sourceFlowFile, REL_FAILURE); + } else { + Map<String, String> metricAttributes = new HashMap<>(); + if 
(countLines) { + metricAttributes.put(TEXT_LINE_COUNT, String.valueOf(lineCount)); + } + if (countLinesNonEmpty) { + metricAttributes.put(TEXT_LINE_NONEMPTY_COUNT, String.valueOf(lineNonEmptyCount)); + } + if (countWords) { + metricAttributes.put(TEXT_WORD_COUNT, String.valueOf(wordCount)); + } + if (countCharacters) { + metricAttributes.put(TEXT_CHARACTER_COUNT, String.valueOf(characterCount)); + } + FlowFile updatedFlowFile = processSession.putAllAttributes(sourceFlowFile, metricAttributes); + processSession.transfer(updatedFlowFile, REL_SUCCESS); + } + } + + private String generateMetricsMessage() { + StringBuilder sb = new StringBuilder("Counted "); + List<String> metrics = new ArrayList<>(); + if (countLines) { + metrics.add(lineCount + " lines"); + } + if (countLinesNonEmpty) { + metrics.add(lineNonEmptyCount + " non-empty lines"); + } + if (countWords) { + metrics.add(wordCount + " words"); + } + if (countCharacters) { + metrics.add(characterCount + " characters"); + } + sb.append(StringUtils.join(metrics, ", ")); + return sb.toString(); + } + + int countWordsInLine(String line, boolean splitWordsOnSymbols) throws IOException { + if (line == null || line.trim().length() == 0) { + return 0; + } else { + Pattern regex = splitWordsOnSymbols ? SYMBOL_PATTERN : WHITESPACE_ONLY_PATTERN; + final String[] words = regex.split(line); + // TODO: Trim individual words before counting to eliminate whitespace words? 
+ if (getLogger().isDebugEnabled()) { + getLogger().debug("Split [" + line + "] to [" + StringUtils.join(Arrays.asList(words), ", ") + "] (" + words.length + ")"); + } + return words.length; + } + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/a7f1eb89/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 3fb0de3..a93b37f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -19,6 +19,7 @@ org.apache.nifi.processors.standard.ControlRate org.apache.nifi.processors.standard.ConvertCharacterSet org.apache.nifi.processors.standard.ConvertJSONToSQL org.apache.nifi.processors.standard.ConvertRecord +org.apache.nifi.processors.standard.CountText org.apache.nifi.processors.standard.DebugFlow org.apache.nifi.processors.standard.DetectDuplicate org.apache.nifi.processors.standard.DistributeLoad http://git-wip-us.apache.org/repos/asf/nifi/blob/a7f1eb89/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/CountTextTest.groovy ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/CountTextTest.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/CountTextTest.groovy new file mode 100644 index 0000000..52b6908 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/CountTextTest.groovy @@ -0,0 +1,332 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.standard

import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.util.MockProcessSession
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
import org.bouncycastle.jce.provider.BouncyCastleProvider
import org.junit.After
import org.junit.Before
import org.junit.BeforeClass
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.mockito.Mockito
import org.slf4j.Logger
import org.slf4j.LoggerFactory

import java.nio.charset.StandardCharsets
import java.security.Security

import static org.mockito.Matchers.anyBoolean
import static org.mockito.Matchers.anyString
import static org.mockito.Mockito.when

/**
 * Unit tests for {@link CountText}, driven by the "Jabberwocky" fixture
 * (34 lines, 28 of them non-empty, 166 words, 900 characters).
 */
@RunWith(JUnit4.class)
class CountTextTest extends GroovyTestCase {
    private static final Logger logger = LoggerFactory.getLogger(CountTextTest.class)

    private static final String TLC = "text.line.count"
    private static final String TLNEC = "text.line.nonempty.count"
    private static final String TWC = "text.word.count"
    private static final String TCC = "text.character.count"

    // Expected attribute values for the Jabberwocky fixture, keyed by attribute name.
    private static final Map<String, Integer> EXPECTED_VALUES = [
            (TLC)  : 34,
            (TLNEC): 28,
            (TWC)  : 166,
            (TCC)  : 900,
    ]

    private static final String JABBERWOCKY_PATH = "src/test/resources/TestCountText/jabberwocky.txt"

    @BeforeClass
    static void setUpOnce() throws Exception {
        Security.addProvider(new BouncyCastleProvider())

        logger.metaClass.methodMissing = { String name, args ->
            logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
        }
    }

    @Before
    void setUp() throws Exception {
    }

    @After
    void tearDown() throws Exception {
    }

    /**
     * Asserts that every entry of {@code expected} is present on the flowfile with the
     * expected value. A missing attribute is a failure, not a silently skipped check.
     */
    static void assertMetricAttributes(FlowFile flowFile, Map<String, Integer> expected) {
        expected.each { String key, Integer value ->
            assert flowFile.attributes.containsKey(key)
            assert flowFile.attributes.get(key) == (value as String)
        }
    }

    /** Enables all four count properties on the runner. */
    static void enableAllMetrics(TestRunner runner) {
        runner.setProperty(CountText.TEXT_LINE_COUNT_PD, "true")
        runner.setProperty(CountText.TEXT_LINE_NONEMPTY_COUNT_PD, "true")
        runner.setProperty(CountText.TEXT_WORD_COUNT_PD, "true")
        runner.setProperty(CountText.TEXT_CHARACTER_COUNT_PD, "true")
    }

    @Test
    void testShouldCountAllMetrics() throws Exception {
        // Arrange
        final TestRunner runner = TestRunners.newTestRunner(CountText.class)
        enableAllMetrics(runner)

        // This text is the same as in src/test/resources/TestCountText/jabberwocky.txt but is copied here
        // to ensure that reading from a file vs. static text doesn't cause line break issues
        String INPUT_TEXT = """’Twas brillig, and the slithy toves
Did gyre and gimble in the wade;
All mimsy were the borogoves,
And the mome raths outgrabe.

"Beware the Jabberwock, my son!
The jaws that bite, the claws that catch!
Beware the Jubjub bird, and shun
The frumious Bandersnatch!"

He took his vorpal sword in hand:
Long time the manxome foe he sought—
So rested he by the Tumtum tree,
And stood awhile in thought.

And as in uffish thought he stood,
The Jabberwock, with eyes of flame,
Came whiffling through the tulgey wood.
And burbled as it came!

One, two! One, two! And through and through
The vorpal blade went snicker-snack!
He left it dead, and with its head
He went galumphing back.

"And hast thou slain the Jabberwock?
Come to my arms, my beamish boy!
O frabjous day! Callooh! Callay!"
He chortled in his joy.

’Twas brillig, and the slithy toves
Did gyre and gimble in the wabe;
All mimsy were the borogoves,
And the mome raths outgrabe."""

        runner.enqueue(INPUT_TEXT.bytes)

        // Act
        runner.run()

        // Assert
        runner.assertAllFlowFilesTransferred(CountText.REL_SUCCESS, 1)
        FlowFile flowFile = runner.getFlowFilesForRelationship(CountText.REL_SUCCESS).first()
        assertMetricAttributes(flowFile, EXPECTED_VALUES)
    }

    @Test
    void testShouldCountEachMetric() throws Exception {
        // Arrange
        final TestRunner runner = TestRunners.newTestRunner(CountText.class)
        String INPUT_TEXT = new File(JABBERWOCKY_PATH).text

        // Each scenario enables exactly one property; the key is the attribute it must produce
        final Map<String, PropertyDescriptor> SCENARIOS = [
                (TLC)  : CountText.TEXT_LINE_COUNT_PD,
                (TLNEC): CountText.TEXT_LINE_NONEMPTY_COUNT_PD,
                (TWC)  : CountText.TEXT_WORD_COUNT_PD,
                (TCC)  : CountText.TEXT_CHARACTER_COUNT_PD,
        ]

        SCENARIOS.each { String enabledKey, PropertyDescriptor enabledProperty ->
            // Reset the processor properties
            SCENARIOS.values().each { PropertyDescriptor pd ->
                runner.setProperty(pd, "false")
            }

            // Apply the scenario-specific property
            runner.setProperty(enabledProperty, "true")

            runner.clearProvenanceEvents()
            runner.clearTransferState()
            runner.enqueue(INPUT_TEXT.bytes)

            // Act
            runner.run()

            // Assert
            runner.assertAllFlowFilesTransferred(CountText.REL_SUCCESS, 1)
            FlowFile flowFile = runner.getFlowFilesForRelationship(CountText.REL_SUCCESS).first()
            logger.info("Generated flowfile: ${flowFile} | ${flowFile.attributes}")

            // The enabled metric must be written with the expected value...
            assert flowFile.attributes.containsKey(enabledKey)
            assert flowFile.attributes.get(enabledKey) == (EXPECTED_VALUES[enabledKey] as String)

            // ...and the disabled metrics must not be written at all
            EXPECTED_VALUES.keySet().findAll { String key -> key != enabledKey }.each { String disabledKey ->
                assert !flowFile.attributes.containsKey(disabledKey)
            }
        }
    }

    @Test
    void testShouldCountWordsSplitOnSymbol() throws Exception {
        // Arrange
        final TestRunner runner = TestRunners.newTestRunner(CountText.class)
        String INPUT_TEXT = new File(JABBERWOCKY_PATH).text

        // "snicker-snack" becomes two words when splitting on symbols
        final int EXPECTED_WORD_COUNT = 167

        // Reset the processor properties
        runner.setProperty(CountText.TEXT_LINE_COUNT_PD, "false")
        runner.setProperty(CountText.TEXT_LINE_NONEMPTY_COUNT_PD, "false")
        runner.setProperty(CountText.TEXT_WORD_COUNT_PD, "true")
        runner.setProperty(CountText.TEXT_CHARACTER_COUNT_PD, "false")
        runner.setProperty(CountText.SPLIT_WORDS_ON_SYMBOLS_PD, "true")

        runner.clearProvenanceEvents()
        runner.clearTransferState()
        runner.enqueue(INPUT_TEXT.bytes)

        // Act
        runner.run()

        // Assert
        runner.assertAllFlowFilesTransferred(CountText.REL_SUCCESS, 1)
        FlowFile flowFile = runner.getFlowFilesForRelationship(CountText.REL_SUCCESS).first()
        logger.info("Generated flowfile: ${flowFile} | ${flowFile.attributes}")
        assert flowFile.attributes.get(CountText.TEXT_WORD_COUNT) == (EXPECTED_WORD_COUNT as String)
    }

    @Test
    void testShouldCountIndependentlyPerFlowFile() throws Exception {
        // Arrange
        final TestRunner runner = TestRunners.newTestRunner(CountText.class)
        String INPUT_TEXT = new File(JABBERWOCKY_PATH).text

        enableAllMetrics(runner)

        2.times { int i ->
            runner.clearProvenanceEvents()
            runner.clearTransferState()
            runner.enqueue(INPUT_TEXT.bytes)

            // Act
            runner.run()

            // Assert: the second flowfile must not inherit counts from the first
            runner.assertAllFlowFilesTransferred(CountText.REL_SUCCESS, 1)
            FlowFile flowFile = runner.getFlowFilesForRelationship(CountText.REL_SUCCESS).first()
            logger.info("Generated flowfile: ${flowFile} | ${flowFile.attributes}")
            assertMetricAttributes(flowFile, EXPECTED_VALUES)
        }
    }

    @Test
    void testShouldTrackSessionCountersAcrossMultipleFlowfiles() throws Exception {
        // Arrange
        final TestRunner runner = TestRunners.newTestRunner(CountText.class)
        String INPUT_TEXT = new File(JABBERWOCKY_PATH).text

        enableAllMetrics(runner)

        MockProcessSession mockPS = runner.processSessionFactory.createSession()
        def sessionCounters = mockPS.sharedState.counterMap
        logger.info("Session counters (0): ${sessionCounters}")

        int n = 2

        n.times { int i ->
            runner.clearTransferState()
            runner.enqueue(INPUT_TEXT.bytes)

            // Act
            runner.run()
            logger.info("Session counters (${i + 1}): ${sessionCounters}")

            // Assert
            runner.assertAllFlowFilesTransferred(CountText.REL_SUCCESS, 1)
            FlowFile flowFile = runner.getFlowFilesForRelationship(CountText.REL_SUCCESS).first()
            logger.info("Generated flowfile: ${flowFile} | ${flowFile.attributes}")
            assertMetricAttributes(flowFile, EXPECTED_VALUES)
        }

        // Counters accumulate across flowfiles, unlike the per-flowfile attributes
        assert sessionCounters.get("Lines Counted").get() == ((EXPECTED_VALUES[TLC] * n) as long)
        assert sessionCounters.get("Lines (non-empty) Counted").get() == ((EXPECTED_VALUES[TLNEC] * n) as long)
        assert sessionCounters.get("Words Counted").get() == ((EXPECTED_VALUES[TWC] * n) as long)
        assert sessionCounters.get("Characters Counted").get() == ((EXPECTED_VALUES[TCC] * n) as long)
    }

    @Test
    void testShouldHandleInternalError() throws Exception {
        // Arrange
        CountText ct = new CountText()
        ct.countLines = true
        ct.countLinesNonEmpty = true
        ct.countWords = true
        ct.countCharacters = true

        // Force the word counter to fail so the failure route is exercised
        CountText ctSpy = Mockito.spy(ct)
        when(ctSpy.countWordsInLine(anyString(), anyBoolean())).thenThrow(new IOException("Expected exception"))

        final TestRunner runner = TestRunners.newTestRunner(ctSpy)
        final String INPUT_TEXT = "This flowfile should throw an error"

        // Reset the processor properties
        enableAllMetrics(runner)
        runner.setProperty(CountText.CHARACTER_ENCODING_PD, StandardCharsets.US_ASCII.displayName())

        runner.enqueue(INPUT_TEXT.bytes)

        // Act
        // Need initialize = true to run #onScheduled()
        runner.run(1, true, true)

        // Assert
        runner.assertAllFlowFilesTransferred(CountText.REL_FAILURE, 1)
        FlowFile flowFile = runner.getFlowFilesForRelationship(CountText.REL_FAILURE).first()
        logger.info("Generated flowfile: ${flowFile} | ${flowFile.attributes}")
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a7f1eb89/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestCountText/jabberwocky.txt ---------------------------------------------------------------------- diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestCountText/jabberwocky.txt b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestCountText/jabberwocky.txt new file mode 100644 index 0000000..9d76b37 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestCountText/jabberwocky.txt @@ -0,0 +1,34 @@ +’Twas brillig, and the slithy toves +Did gyre and gimble in the wade; +All mimsy were the borogoves, +And the mome raths outgrabe. + +"Beware the Jabberwock, my son! +The jaws that bite, the claws that catch! +Beware the Jubjub bird, and shun +The frumious Bandersnatch!" + +He took his vorpal sword in hand: +Long time the manxome foe he sought— +So rested he by the Tumtum tree, +And stood awhile in thought. + +And as in uffish thought he stood, +The Jabberwock, with eyes of flame, +Came whiffling through the tulgey wood. +And burbled as it came! + +One, two! One, two! And through and through +The vorpal blade went snicker-snack! +He left it dead, and with its head +He went galumphing back. + +"And hast thou slain the Jabberwock? +Come to my arms, my beamish boy! +O frabjous day! Callooh! Callay!" +He chortled in his joy. + +’Twas brillig, and the slithy toves +Did gyre and gimble in the wabe; +All mimsy were the borogoves, +And the mome raths outgrabe. \ No newline at end of file
