Repository: incubator-nifi
Updated Branches:
  refs/heads/develop eb5ec703b -> a6740a6e2
NIFI-399 addressed items in the ticket

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/a6740a6e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/a6740a6e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/a6740a6e

Branch: refs/heads/develop
Commit: a6740a6e2c87f4c994d305db55c0777dc4f99976
Parents: ad18853
Author: joewitt <[email protected]>
Authored: Thu Mar 19 01:21:32 2015 -0400
Committer: joewitt <[email protected]>
Committed: Thu Mar 19 10:05:19 2015 -0400

----------------------------------------------------------------------
 .../nifi/processor/util/StandardValidators.java | 22 +++
 .../processor/util/TestStandardValidators.java | 31 ++++
 .../nifi/processors/standard/ExtractText.java | 157 +++++++++++--------
 .../processors/standard/TestExtractText.java | 15 +-
 4 files changed, 146 insertions(+), 79 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a6740a6e/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java b/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java index 10748fe..c9ae609 100644 --- a/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java +++ b/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java @@ -287,6 +287,28 @@ public class StandardValidators { return createAttributeExpressionLanguageValidator(expectedResultType, true); } + public static Validator createDataSizeBoundsValidator(final long minBytesInclusive, final
long maxBytesInclusive) { + return new Validator() { + + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + final ValidationResult vr = DATA_SIZE_VALIDATOR.validate(subject, input, context); + if(!vr.isValid()){ + return vr; + } + final long dataSizeBytes = DataUnit.parseDataSize(input, DataUnit.B).longValue(); + if(dataSizeBytes < minBytesInclusive){ + return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Cannot be smaller than " + minBytesInclusive + " bytes").build(); + } + if(dataSizeBytes > maxBytesInclusive){ + return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Cannot be larger than " + maxBytesInclusive + " bytes").build(); + } + return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); + } + }; + + } + public static Validator createRegexMatchingValidator(final Pattern pattern) { return new Validator() { @Override http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a6740a6e/nifi/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java b/nifi/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java index 2ae50c9..70b8d21 100644 --- a/nifi/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java +++ b/nifi/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java @@ -51,4 +51,35 @@ public class TestStandardValidators { vr = val.validate("TimePeriodTest", "1 sec", null); assertTrue(vr.isValid()); } + + @Test + public void testDataSizeBoundsValidator() { + 
Validator val = StandardValidators.createDataSizeBoundsValidator(100, 1000); + ValidationResult vr; + + vr = val.validate("DataSizeBounds", "5 GB", null); + assertFalse(vr.isValid()); + + vr = val.validate("DataSizeBounds", "0 B", null); + assertFalse(vr.isValid()); + + vr = val.validate("DataSizeBounds", "99 B", null); + assertFalse(vr.isValid()); + + vr = val.validate("DataSizeBounds", "100 B", null); + assertTrue(vr.isValid()); + + vr = val.validate("DataSizeBounds", "999 B", null); + assertTrue(vr.isValid()); + + vr = val.validate("DataSizeBounds", "1000 B", null); + assertTrue(vr.isValid()); + + vr = val.validate("DataSizeBounds", "1001 B", null); + assertFalse(vr.isValid()); + + vr = val.validate("DataSizeBounds", "water", null); + assertFalse(vr.isValid()); + + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a6740a6e/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractText.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractText.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractText.java index 6c914d8..aa1b7a9 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractText.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractText.java @@ -26,6 +26,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -47,7 +48,7 @@ import org.apache.nifi.annotation.documentation.Tags; import 
org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; @EventDriven @SideEffectFree @@ -58,10 +59,14 @@ import org.apache.commons.lang3.StringUtils; + "The results of those Regular Expressions are assigned to FlowFile Attributes. " + "Regular Expressions are entered by adding user-defined properties; " + "the name of the property maps to the Attribute Name into which the result will be placed. " - + "The value of the property must be a valid Regular Expressions with exactly one capturing group. " + + "The first capture group, if any found, will be placed into that attribute name." + + "But all catpure groups, including the matching string sequence itself will also be " + + "provided at that attribute name with an index value provided." + + "The value of the property must be a valid Regular Expressions with one or more capturing groups. " + "If the Regular Expression matches more than once, only the first match will be used. " + "If any provided Regular Expression matches, the FlowFile(s) will be routed to 'matched'. " - + "If no provided Regular Expression matches, the FlowFile will be routed to 'unmatched' and no attributes will be applied to the FlowFile.") + + "If no provided Regular Expression matches, the FlowFile will be routed to 'unmatched' " + + "and no attributes will be applied to the FlowFile.") public class ExtractText extends AbstractProcessor { @@ -78,9 +83,18 @@ public class ExtractText extends AbstractProcessor { .description("Specifies the maximum amount of data to buffer (per file) in order to apply the regular expressions. 
Files larger than the specified maximum will not be fully evaluated.") .required(true) .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .addValidator(StandardValidators.createDataSizeBoundsValidator(0, Integer.MAX_VALUE)) .defaultValue("1 MB") .build(); + public static final PropertyDescriptor MAX_CAPTURE_GROUP_LENGTH = new PropertyDescriptor.Builder() + .name("Maximum Capture Group Length") + .description("Specifies the maximum number of characters a given capture group value can have. Any characters beyond the max will be truncated.") + .required(false) + .defaultValue("1024") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + public static final PropertyDescriptor CANON_EQ = new PropertyDescriptor.Builder() .name("Enable Canonical Equivalence") .description("Indicates that two characters match only when their full canonical decompositions match.") @@ -168,27 +182,29 @@ public class ExtractText extends AbstractProcessor { private Set<Relationship> relationships; private List<PropertyDescriptor> properties; + private final AtomicReference<Map<String, Pattern>> compiledPattersMapRef = new AtomicReference<>(); @Override protected void init(final ProcessorInitializationContext context) { - final Set<Relationship> relationships = new HashSet<>(); - relationships.add(REL_MATCH); - relationships.add(REL_NO_MATCH); - this.relationships = Collections.unmodifiableSet(relationships); - - final List<PropertyDescriptor> properties = new ArrayList<>(); - properties.add(CHARACTER_SET); - properties.add(MAX_BUFFER_SIZE); - properties.add(CANON_EQ); - properties.add(CASE_INSENSITIVE); - properties.add(COMMENTS); - properties.add(DOTALL); - properties.add(LITERAL); - properties.add(MULTILINE); - properties.add(UNICODE_CASE); - properties.add(UNICODE_CHARACTER_CLASS); - properties.add(UNIX_LINES); - this.properties = Collections.unmodifiableList(properties); + final Set<Relationship> rels = new HashSet<>(); + rels.add(REL_MATCH); + 
rels.add(REL_NO_MATCH); + this.relationships = Collections.unmodifiableSet(rels); + + final List<PropertyDescriptor> props = new ArrayList<>(); + props.add(CHARACTER_SET); + props.add(MAX_BUFFER_SIZE); + props.add(MAX_CAPTURE_GROUP_LENGTH); + props.add(CANON_EQ); + props.add(CASE_INSENSITIVE); + props.add(COMMENTS); + props.add(DOTALL); + props.add(LITERAL); + props.add(MULTILINE); + props.add(UNICODE_CASE); + props.add(UNICODE_CHARACTER_CLASS); + props.add(UNIX_LINES); + this.properties = Collections.unmodifiableList(props); } @Override @@ -206,77 +222,80 @@ public class ExtractText extends AbstractProcessor { return new PropertyDescriptor.Builder() .name(propertyDescriptorName) .expressionLanguageSupported(false) - .addValidator(StandardValidators.createRegexValidator(1, 1, true)) + .addValidator(StandardValidators.createRegexValidator(1, 40, true)) .required(false) .dynamic(true) .build(); } - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) { - final List<FlowFile> flowFileBatch = session.get(50); - if (flowFileBatch.isEmpty()) { - return; - } - final ProcessorLog logger = getLogger(); + @OnScheduled + public final void onScheduled(final ProcessContext context) throws IOException { + final Map<String, Pattern> compiledPatternsMap = new HashMap<>(); - // Compile the Regular Expressions - Map<String, Matcher> regexMap = new HashMap<>(); for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) { if (!entry.getKey().isDynamic()) { continue; } final int flags = getCompileFlags(context); - final Matcher matcher = Pattern.compile(entry.getValue(), flags).matcher(""); - regexMap.put(entry.getKey().getName(), matcher); + final Pattern pattern = Pattern.compile(entry.getValue(), flags); + compiledPatternsMap.put(entry.getKey().getName(), pattern); } + compiledPattersMapRef.set(compiledPatternsMap); + } + @Override + public void onTrigger(final ProcessContext context, final ProcessSession 
session) { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + final ProcessorLog logger = getLogger(); final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue()); + final int maxCaptureGroupLength = context.getProperty(MAX_CAPTURE_GROUP_LENGTH).asInteger(); + final long maxBufferSizeL = Math.min(flowFile.getSize(), context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue()); + final byte[] buffer = new byte[(int) maxBufferSizeL]; + + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(InputStream in) throws IOException { + StreamUtils.fillBuffer(in, buffer, false); + } + }); - final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); - - for (FlowFile flowFile : flowFileBatch) { - - final Map<String, String> regexResults = new HashMap<>(); - - final byte[] buffer = new byte[maxBufferSize]; - - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(InputStream in) throws IOException { - StreamUtils.fillBuffer(in, buffer, false); - } - }); - - final int flowFileSize = Math.min((int) flowFile.getSize(), maxBufferSize); - - final String contentString = new String(buffer, 0, flowFileSize, charset); - - for (final Map.Entry<String, Matcher> entry : regexMap.entrySet()) { + final String contentString = new String(buffer, 0, (int) maxBufferSizeL, charset); + final Map<String, String> regexResults = new HashMap<>(); - final Matcher matcher = entry.getValue(); + final Map<String, Pattern> patternMap = compiledPattersMapRef.get(); + for (final Map.Entry<String, Pattern> entry : patternMap.entrySet()) { - matcher.reset(contentString); + final Matcher matcher = entry.getValue().matcher(contentString); - if (matcher.find()) { - final String group = matcher.group(1); - if (!StringUtils.isBlank(group)) { - regexResults.put(entry.getKey(), group); + if (matcher.find()) { + final String baseKey = entry.getKey(); + 
for (int i = 0; i <= matcher.groupCount(); i++) { + final String key = new StringBuilder(baseKey).append(".").append(i).toString(); + String value = matcher.group(i); + if (value.length() > maxCaptureGroupLength) { + value = value.substring(0, maxCaptureGroupLength); + } + regexResults.put(key, value); + if (i == 1) { + regexResults.put(baseKey, value); } } } + } - if (!regexResults.isEmpty()) { - flowFile = session.putAllAttributes(flowFile, regexResults); - session.getProvenanceReporter().modifyAttributes(flowFile); - session.transfer(flowFile, REL_MATCH); - logger.info("Matched {} Regular Expressions and added attributes to FlowFile {}", new Object[]{regexResults.size(), flowFile}); - } else { - session.transfer(flowFile, REL_NO_MATCH); - logger.info("Did not match any Regular Expressions for FlowFile {}", new Object[]{flowFile}); - } + if (!regexResults.isEmpty()) { + flowFile = session.putAllAttributes(flowFile, regexResults); + session.getProvenanceReporter().modifyAttributes(flowFile); + session.transfer(flowFile, REL_MATCH); + logger.info("Matched {} Regular Expressions and added attributes to FlowFile {}", new Object[]{regexResults.size(), flowFile}); + } else { + session.transfer(flowFile, REL_NO_MATCH); + logger.info("Did not match any Regular Expressions for FlowFile {}", new Object[]{flowFile}); + } - } // end flowFileLoop } int getCompileFlags(ProcessContext context) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a6740a6e/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractText.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractText.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractText.java index 355d255..2025767 
100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractText.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractText.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.processors.standard; -import org.apache.nifi.processors.standard.ExtractText; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -43,7 +42,7 @@ public class TestExtractText { testRunner.setProperty("regex.result1", "(?s)(.*)"); testRunner.setProperty("regex.result2", "(?s).*(bar1).*"); testRunner.setProperty("regex.result3", "(?s).*?(bar\\d).*"); // reluctant gets first - testRunner.setProperty("regex.result4", "(?s).*?(?:bar\\d).*?(bar\\d).*"); // reluctant w/ repeated pattern gets second + testRunner.setProperty("regex.result4", "(?s).*?(?:bar\\d).*?(bar\\d).*?(bar3).*"); // reluctant w/ repeated pattern gets second testRunner.setProperty("regex.result5", "(?s).*(bar\\d).*"); // greedy gets last testRunner.setProperty("regex.result6", "(?s)^(.*)$"); testRunner.setProperty("regex.result7", "(?s)(XXX)"); @@ -57,6 +56,10 @@ public class TestExtractText { out.assertAttributeEquals("regex.result2", "bar1"); out.assertAttributeEquals("regex.result3", "bar1"); out.assertAttributeEquals("regex.result4", "bar2"); + out.assertAttributeEquals("regex.result4.0", SAMPLE_STRING); + out.assertAttributeEquals("regex.result4.1", "bar2"); + out.assertAttributeEquals("regex.result4.2", "bar3"); + out.assertAttributeNotExists("regex.result4.3"); out.assertAttributeEquals("regex.result5", "bar3"); out.assertAttributeEquals("regex.result6", SAMPLE_STRING); out.assertAttributeEquals("regex.result7", null); @@ -209,14 +212,6 @@ public class TestExtractText { } - @Test(expected = java.lang.AssertionError.class) - public void testTooManyCaptureGroups() throws UnsupportedEncodingException { - final TestRunner 
testRunner = TestRunners.newTestRunner(new ExtractText()); - testRunner.setProperty("regex.result1", "(.)(.)"); - testRunner.enqueue(SAMPLE_STRING.getBytes("UTF-8")); - testRunner.run(); - } - @Test public void testMatchOutsideBuffer() throws Exception { final TestRunner testRunner = TestRunners.newTestRunner(new ExtractText());
