Repository: incubator-nifi
Updated Branches:
  refs/heads/develop eb5ec703b -> a6740a6e2


NIFI-399 addressed items in the ticket


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/a6740a6e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/a6740a6e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/a6740a6e

Branch: refs/heads/develop
Commit: a6740a6e2c87f4c994d305db55c0777dc4f99976
Parents: ad18853
Author: joewitt <[email protected]>
Authored: Thu Mar 19 01:21:32 2015 -0400
Committer: joewitt <[email protected]>
Committed: Thu Mar 19 10:05:19 2015 -0400

----------------------------------------------------------------------
 .../nifi/processor/util/StandardValidators.java |  22 +++
 .../processor/util/TestStandardValidators.java  |  31 ++++
 .../nifi/processors/standard/ExtractText.java   | 157 +++++++++++--------
 .../processors/standard/TestExtractText.java    |  15 +-
 4 files changed, 146 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a6740a6e/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java b/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
index 10748fe..c9ae609 100644
--- a/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
+++ b/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
@@ -287,6 +287,28 @@ public class StandardValidators {
         return createAttributeExpressionLanguageValidator(expectedResultType, 
true);
     }
 
+    public static Validator createDataSizeBoundsValidator(final long 
minBytesInclusive, final long maxBytesInclusive) {
+        return new Validator() {
+
+            @Override
+            public ValidationResult validate(final String subject, final 
String input, final ValidationContext context) {
+                final ValidationResult vr = 
DATA_SIZE_VALIDATOR.validate(subject, input, context);
+                if(!vr.isValid()){
+                    return vr;
+                }
+                final long dataSizeBytes = DataUnit.parseDataSize(input, 
DataUnit.B).longValue();
+                if(dataSizeBytes < minBytesInclusive){
+                    return new 
ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Cannot
 be smaller than " + minBytesInclusive + " bytes").build();
+                }
+                if(dataSizeBytes > maxBytesInclusive){
+                    return new 
ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Cannot
 be larger than " + maxBytesInclusive + " bytes").build();
+                }
+                return new 
ValidationResult.Builder().subject(subject).input(input).valid(true).build();
+            }
+        };
+
+    }
+
     public static Validator createRegexMatchingValidator(final Pattern 
pattern) {
         return new Validator() {
             @Override

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a6740a6e/nifi/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java b/nifi/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java
index 2ae50c9..70b8d21 100644
--- a/nifi/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java
+++ b/nifi/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java
@@ -51,4 +51,35 @@ public class TestStandardValidators {
         vr = val.validate("TimePeriodTest", "1 sec", null);
         assertTrue(vr.isValid());
     }
+    
+    @Test
+    public void testDataSizeBoundsValidator() {
+        Validator val = StandardValidators.createDataSizeBoundsValidator(100, 
1000);
+        ValidationResult vr; 
+        
+        vr = val.validate("DataSizeBounds", "5 GB", null);
+        assertFalse(vr.isValid());
+        
+        vr = val.validate("DataSizeBounds", "0 B", null);
+        assertFalse(vr.isValid());
+
+        vr = val.validate("DataSizeBounds", "99 B", null);
+        assertFalse(vr.isValid());
+        
+        vr = val.validate("DataSizeBounds", "100 B", null);
+        assertTrue(vr.isValid());
+
+        vr = val.validate("DataSizeBounds", "999 B", null);
+        assertTrue(vr.isValid());
+
+        vr = val.validate("DataSizeBounds", "1000 B", null);
+        assertTrue(vr.isValid());
+        
+        vr = val.validate("DataSizeBounds", "1001 B", null);
+        assertFalse(vr.isValid());
+        
+        vr = val.validate("DataSizeBounds", "water", null);
+        assertFalse(vr.isValid());
+        
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a6740a6e/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractText.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractText.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractText.java
index 6c914d8..aa1b7a9 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractText.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractText.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -47,7 +48,7 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 
-import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
 
 @EventDriven
 @SideEffectFree
@@ -58,10 +59,14 @@ import org.apache.commons.lang3.StringUtils;
         + "The results of those Regular Expressions are assigned to FlowFile 
Attributes.  "
         + "Regular Expressions are entered by adding user-defined properties; "
         + "the name of the property maps to the Attribute Name into which the 
result will be placed.  "
-        + "The value of the property must be a valid Regular Expressions with 
exactly one capturing group.  "
+        + "The first capture group, if any found, will be placed into that 
attribute name."
+        + "But all catpure groups, including the matching string sequence 
itself will also be "
+        + "provided at that attribute name with an index value provided."
+        + "The value of the property must be a valid Regular Expressions with 
one or more capturing groups.  "
         + "If the Regular Expression matches more than once, only the first 
match will be used.  "
         + "If any provided Regular Expression matches, the FlowFile(s) will be 
routed to 'matched'. "
-        + "If no provided Regular Expression matches, the FlowFile will be 
routed to 'unmatched' and no attributes will be applied to the FlowFile.")
+        + "If no provided Regular Expression matches, the FlowFile will be 
routed to 'unmatched' "
+        + "and no attributes will be applied to the FlowFile.")
 
 public class ExtractText extends AbstractProcessor {
 
@@ -78,9 +83,18 @@ public class ExtractText extends AbstractProcessor {
             .description("Specifies the maximum amount of data to buffer (per 
file) in order to apply the regular expressions.  Files larger than the 
specified maximum will not be fully evaluated.")
             .required(true)
             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .addValidator(StandardValidators.createDataSizeBoundsValidator(0, 
Integer.MAX_VALUE))
             .defaultValue("1 MB")
             .build();
 
+    public static final PropertyDescriptor MAX_CAPTURE_GROUP_LENGTH = new 
PropertyDescriptor.Builder()
+            .name("Maximum Capture Group Length")
+            .description("Specifies the maximum number of characters a given 
capture group value can have.  Any characters beyond the max will be 
truncated.")
+            .required(false)
+            .defaultValue("1024")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
     public static final PropertyDescriptor CANON_EQ = new 
PropertyDescriptor.Builder()
             .name("Enable Canonical Equivalence")
             .description("Indicates that two characters match only when their 
full canonical decompositions match.")
@@ -168,27 +182,29 @@ public class ExtractText extends AbstractProcessor {
 
     private Set<Relationship> relationships;
     private List<PropertyDescriptor> properties;
+    private final AtomicReference<Map<String, Pattern>> compiledPattersMapRef 
= new AtomicReference<>();
 
     @Override
     protected void init(final ProcessorInitializationContext context) {
-        final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_MATCH);
-        relationships.add(REL_NO_MATCH);
-        this.relationships = Collections.unmodifiableSet(relationships);
-
-        final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.add(CHARACTER_SET);
-        properties.add(MAX_BUFFER_SIZE);
-        properties.add(CANON_EQ);
-        properties.add(CASE_INSENSITIVE);
-        properties.add(COMMENTS);
-        properties.add(DOTALL);
-        properties.add(LITERAL);
-        properties.add(MULTILINE);
-        properties.add(UNICODE_CASE);
-        properties.add(UNICODE_CHARACTER_CLASS);
-        properties.add(UNIX_LINES);
-        this.properties = Collections.unmodifiableList(properties);
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_MATCH);
+        rels.add(REL_NO_MATCH);
+        this.relationships = Collections.unmodifiableSet(rels);
+
+        final List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(CHARACTER_SET);
+        props.add(MAX_BUFFER_SIZE);
+        props.add(MAX_CAPTURE_GROUP_LENGTH);
+        props.add(CANON_EQ);
+        props.add(CASE_INSENSITIVE);
+        props.add(COMMENTS);
+        props.add(DOTALL);
+        props.add(LITERAL);
+        props.add(MULTILINE);
+        props.add(UNICODE_CASE);
+        props.add(UNICODE_CHARACTER_CLASS);
+        props.add(UNIX_LINES);
+        this.properties = Collections.unmodifiableList(props);
     }
 
     @Override
@@ -206,77 +222,80 @@ public class ExtractText extends AbstractProcessor {
         return new PropertyDescriptor.Builder()
                 .name(propertyDescriptorName)
                 .expressionLanguageSupported(false)
-                .addValidator(StandardValidators.createRegexValidator(1, 1, 
true))
+                .addValidator(StandardValidators.createRegexValidator(1, 40, 
true))
                 .required(false)
                 .dynamic(true)
                 .build();
     }
 
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
-        final List<FlowFile> flowFileBatch = session.get(50);
-        if (flowFileBatch.isEmpty()) {
-            return;
-        }
-        final ProcessorLog logger = getLogger();
+    @OnScheduled
+    public final void onScheduled(final ProcessContext context) throws 
IOException {
+        final Map<String, Pattern> compiledPatternsMap = new HashMap<>();
 
-        // Compile the Regular Expressions
-        Map<String, Matcher> regexMap = new HashMap<>();
         for (final Map.Entry<PropertyDescriptor, String> entry : 
context.getProperties().entrySet()) {
             if (!entry.getKey().isDynamic()) {
                 continue;
             }
             final int flags = getCompileFlags(context);
-            final Matcher matcher = Pattern.compile(entry.getValue(), 
flags).matcher("");
-            regexMap.put(entry.getKey().getName(), matcher);
+            final Pattern pattern = Pattern.compile(entry.getValue(), flags);
+            compiledPatternsMap.put(entry.getKey().getName(), pattern);
         }
+        compiledPattersMapRef.set(compiledPatternsMap);
+    }
 
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        final ProcessorLog logger = getLogger();
         final Charset charset = 
Charset.forName(context.getProperty(CHARACTER_SET).getValue());
+        final int maxCaptureGroupLength = 
context.getProperty(MAX_CAPTURE_GROUP_LENGTH).asInteger();
+        final long maxBufferSizeL = Math.min(flowFile.getSize(), 
context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue());
+        final byte[] buffer = new byte[(int) maxBufferSizeL];
+
+        session.read(flowFile, new InputStreamCallback() {
+            @Override
+            public void process(InputStream in) throws IOException {
+                StreamUtils.fillBuffer(in, buffer, false);
+            }
+        });
 
-        final int maxBufferSize = 
context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
-
-        for (FlowFile flowFile : flowFileBatch) {
-
-            final Map<String, String> regexResults = new HashMap<>();
-
-            final byte[] buffer = new byte[maxBufferSize];
-
-            session.read(flowFile, new InputStreamCallback() {
-                @Override
-                public void process(InputStream in) throws IOException {
-                    StreamUtils.fillBuffer(in, buffer, false);
-                }
-            });
-
-            final int flowFileSize = Math.min((int) flowFile.getSize(), 
maxBufferSize);
-
-            final String contentString = new String(buffer, 0, flowFileSize, 
charset);
-
-            for (final Map.Entry<String, Matcher> entry : regexMap.entrySet()) 
{
+        final String contentString = new String(buffer, 0, (int) 
maxBufferSizeL, charset);
+        final Map<String, String> regexResults = new HashMap<>();
 
-                final Matcher matcher = entry.getValue();
+        final Map<String, Pattern> patternMap = compiledPattersMapRef.get();
+        for (final Map.Entry<String, Pattern> entry : patternMap.entrySet()) {
 
-                matcher.reset(contentString);
+            final Matcher matcher = entry.getValue().matcher(contentString);
 
-                if (matcher.find()) {
-                    final String group = matcher.group(1);
-                    if (!StringUtils.isBlank(group)) {
-                        regexResults.put(entry.getKey(), group);
+            if (matcher.find()) {
+                final String baseKey = entry.getKey();
+                for (int i = 0; i <= matcher.groupCount(); i++) {
+                    final String key = new 
StringBuilder(baseKey).append(".").append(i).toString();
+                    String value = matcher.group(i);
+                    if (value.length() > maxCaptureGroupLength) {
+                        value = value.substring(0, maxCaptureGroupLength);
+                    }
+                    regexResults.put(key, value);
+                    if (i == 1) {
+                        regexResults.put(baseKey, value);
                     }
                 }
             }
+        }
 
-            if (!regexResults.isEmpty()) {
-                flowFile = session.putAllAttributes(flowFile, regexResults);
-                session.getProvenanceReporter().modifyAttributes(flowFile);
-                session.transfer(flowFile, REL_MATCH);
-                logger.info("Matched {} Regular Expressions and added 
attributes to FlowFile {}", new Object[]{regexResults.size(), flowFile});
-            } else {
-                session.transfer(flowFile, REL_NO_MATCH);
-                logger.info("Did not match any Regular Expressions for  
FlowFile {}", new Object[]{flowFile});
-            }
+        if (!regexResults.isEmpty()) {
+            flowFile = session.putAllAttributes(flowFile, regexResults);
+            session.getProvenanceReporter().modifyAttributes(flowFile);
+            session.transfer(flowFile, REL_MATCH);
+            logger.info("Matched {} Regular Expressions and added attributes 
to FlowFile {}", new Object[]{regexResults.size(), flowFile});
+        } else {
+            session.transfer(flowFile, REL_NO_MATCH);
+            logger.info("Did not match any Regular Expressions for  FlowFile 
{}", new Object[]{flowFile});
+        }
 
-        } // end flowFileLoop
     }
 
     int getCompileFlags(ProcessContext context) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a6740a6e/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractText.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractText.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractText.java
index 355d255..2025767 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractText.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractText.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.processors.standard;
 
-import org.apache.nifi.processors.standard.ExtractText;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -43,7 +42,7 @@ public class TestExtractText {
         testRunner.setProperty("regex.result1", "(?s)(.*)");
         testRunner.setProperty("regex.result2", "(?s).*(bar1).*");
         testRunner.setProperty("regex.result3", "(?s).*?(bar\\d).*");  // 
reluctant gets first
-        testRunner.setProperty("regex.result4", 
"(?s).*?(?:bar\\d).*?(bar\\d).*"); // reluctant w/ repeated pattern gets second
+        testRunner.setProperty("regex.result4", 
"(?s).*?(?:bar\\d).*?(bar\\d).*?(bar3).*"); // reluctant w/ repeated pattern 
gets second
         testRunner.setProperty("regex.result5", "(?s).*(bar\\d).*");   // 
greedy gets last
         testRunner.setProperty("regex.result6", "(?s)^(.*)$");
         testRunner.setProperty("regex.result7", "(?s)(XXX)");
@@ -57,6 +56,10 @@ public class TestExtractText {
         out.assertAttributeEquals("regex.result2", "bar1");
         out.assertAttributeEquals("regex.result3", "bar1");
         out.assertAttributeEquals("regex.result4", "bar2");
+        out.assertAttributeEquals("regex.result4.0", SAMPLE_STRING);
+        out.assertAttributeEquals("regex.result4.1", "bar2");
+        out.assertAttributeEquals("regex.result4.2", "bar3");
+        out.assertAttributeNotExists("regex.result4.3");
         out.assertAttributeEquals("regex.result5", "bar3");
         out.assertAttributeEquals("regex.result6", SAMPLE_STRING);
         out.assertAttributeEquals("regex.result7", null);
@@ -209,14 +212,6 @@ public class TestExtractText {
 
     }
 
-    @Test(expected = java.lang.AssertionError.class)
-    public void testTooManyCaptureGroups() throws UnsupportedEncodingException 
{
-        final TestRunner testRunner = TestRunners.newTestRunner(new 
ExtractText());
-        testRunner.setProperty("regex.result1", "(.)(.)");
-        testRunner.enqueue(SAMPLE_STRING.getBytes("UTF-8"));
-        testRunner.run();
-    }
-
     @Test
     public void testMatchOutsideBuffer() throws Exception {
         final TestRunner testRunner = TestRunners.newTestRunner(new 
ExtractText());

Reply via email to