Repository: nifi
Updated Branches:
  refs/heads/master 35f4f48f3 -> a5d630672


NIFI-3635: This closes #1631. Avoid using a static member variable for the 
'Grok' object. Code cleanup

Signed-off-by: joewitt <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a5d63067
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a5d63067
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a5d63067

Branch: refs/heads/master
Commit: a5d630672a7b24ca1f60073182209b998172c0b3
Parents: 35f4f48
Author: Mark Payne <[email protected]>
Authored: Tue Mar 28 15:59:02 2017 -0400
Committer: joewitt <[email protected]>
Committed: Fri Apr 7 13:22:46 2017 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/ExtractGrok.java   | 114 ++++++++-----------
 .../processors/standard/TestExtractGrok.java    |  16 +--
 2 files changed, 49 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a5d63067/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java
index 116513c..2790dc9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java
@@ -43,7 +43,6 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.io.StreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.stream.io.BufferedOutputStream;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.StopWatch;
 
@@ -65,61 +64,59 @@ import java.util.concurrent.TimeUnit;
 
 @Tags({"grok", "log", "text", "parse", "delimit", "extract"})
 @CapabilityDescription("Evaluates one or more Grok Expressions against the 
content of a FlowFile, " +
-        "adding the results as attributes or replacing the content of the 
FlowFile with a JSON " +
-        "notation of the matched content")
+    "adding the results as attributes or replacing the content of the FlowFile 
with a JSON " +
+    "notation of the matched content")
 @WritesAttributes({
-        @WritesAttribute(attribute = "grok.XXX", description = "When operating 
in flowfile-attribute mode, each of the Grok identifier that is matched in the 
flowfile " +
-                                                               "will be added 
as an attribute, prefixed with \"grok.\" For example," +
-                                                               "if the grok 
identifier \"timestamp\" is matched, then the value will be added to an 
attribute named \"grok.timestamp\"")})
+    @WritesAttribute(attribute = "grok.XXX", description = "When operating in 
flowfile-attribute mode, each of the Grok identifier that is matched in the 
flowfile " +
+        "will be added as an attribute, prefixed with \"grok.\" For example," +
+        "if the grok identifier \"timestamp\" is matched, then the value will 
be added to an attribute named \"grok.timestamp\"")})
 public class ExtractGrok extends AbstractProcessor {
 
-
     public static final String FLOWFILE_ATTRIBUTE = "flowfile-attribute";
     public static final String FLOWFILE_CONTENT = "flowfile-content";
     private static final String APPLICATION_JSON = "application/json";
 
-    public static final PropertyDescriptor GROK_EXPRESSION = new 
PropertyDescriptor
-            .Builder().name("Grok Expression")
-            .description("Grok expression")
-            .required(true)
-            .addValidator(validateGrokExpression())
-            .build();
+    public static final PropertyDescriptor GROK_EXPRESSION = new 
PropertyDescriptor.Builder()
+        .name("Grok Expression")
+        .description("Grok expression")
+        .required(true)
+        .addValidator(validateGrokExpression())
+        .build();
 
-    public static final PropertyDescriptor GROK_PATTERN_FILE = new 
PropertyDescriptor
-            .Builder().name("Grok Pattern file")
-            .description("Grok Pattern file definition")
-            .required(true)
-            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
-            .build();
+    public static final PropertyDescriptor GROK_PATTERN_FILE = new 
PropertyDescriptor.Builder()
+        .name("Grok Pattern file")
+        .description("Grok Pattern file definition")
+        .required(true)
+        .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+        .build();
 
     public static final PropertyDescriptor DESTINATION = new 
PropertyDescriptor.Builder()
-            .name("Destination")
-            .description("Control if Grok output value is written as a new 
flowfile attributes, in this case " +
-                    "each of the Grok identifier that is matched in the 
flowfile will be added as an attribute, " +
-                    "prefixed with \"grok.\" or written in the flowfile 
content. Writing to flowfile content " +
-                    "will overwrite any existing flowfile content.")
-
-            .required(true)
-            .allowableValues(FLOWFILE_ATTRIBUTE, FLOWFILE_CONTENT)
-            .defaultValue(FLOWFILE_ATTRIBUTE)
-            .build();
-
-    public static final PropertyDescriptor CHARACTER_SET = new 
PropertyDescriptor
-            .Builder().name("Character Set")
-            .description("The Character Set in which the file is encoded")
-            .required(true)
-            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
-            .defaultValue("UTF-8")
-            .build();
-
-    public static final PropertyDescriptor MAX_BUFFER_SIZE = new 
PropertyDescriptor
-            .Builder().name("Maximum Buffer Size")
-            .description("Specifies the maximum amount of data to buffer (per 
file) in order to apply the Grok expressions. Files larger than the specified 
maximum will not be fully evaluated.")
-            .required(true)
-            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .addValidator(StandardValidators.createDataSizeBoundsValidator(0, 
Integer.MAX_VALUE))
-            .defaultValue("1 MB")
-            .build();
+        .name("Destination")
+        .description("Control if Grok output value is written as a new 
flowfile attributes, in this case " +
+            "each of the Grok identifier that is matched in the flowfile will 
be added as an attribute, " +
+            "prefixed with \"grok.\" or written in the flowfile content. 
Writing to flowfile content " +
+            "will overwrite any existing flowfile content.")
+        .required(true)
+        .allowableValues(FLOWFILE_ATTRIBUTE, FLOWFILE_CONTENT)
+        .defaultValue(FLOWFILE_ATTRIBUTE)
+        .build();
+
+    public static final PropertyDescriptor CHARACTER_SET = new 
PropertyDescriptor.Builder()
+        .name("Character Set")
+        .description("The Character Set in which the file is encoded")
+        .required(true)
+        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+        .defaultValue("UTF-8")
+        .build();
+
+    public static final PropertyDescriptor MAX_BUFFER_SIZE = new 
PropertyDescriptor.Builder()
+        .name("Maximum Buffer Size")
+        .description("Specifies the maximum amount of data to buffer (per 
file) in order to apply the Grok expressions. Files larger than the specified 
maximum will not be fully evaluated.")
+        .required(true)
+        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+        .addValidator(StandardValidators.createDataSizeBoundsValidator(0, 
Integer.MAX_VALUE))
+        .defaultValue("1 MB")
+        .build();
 
     public static final Relationship REL_MATCH = new Relationship.Builder()
             .name("matched")
@@ -134,11 +131,9 @@ public class ExtractGrok extends AbstractProcessor {
     private final static List<PropertyDescriptor> descriptors;
     private final static Set<Relationship> relationships;
 
-
-    private final static Grok grok = Grok.EMPTY;
+    private volatile Grok grok = new Grok();
     private final BlockingQueue<byte[]> bufferQueue = new LinkedBlockingQueue<>();
 
-
     static {
         final Set<Relationship> _relationships = new HashSet<>();
         _relationships.add(REL_MATCH);
@@ -154,7 +149,6 @@ public class ExtractGrok extends AbstractProcessor {
         descriptors = Collections.unmodifiableList(_descriptors);
     }
 
-
     @Override
     public Set<Relationship> getRelationships() {
         return relationships;
@@ -165,7 +159,6 @@ public class ExtractGrok extends AbstractProcessor {
         return descriptors;
     }
 
-
     @OnStopped
     public void onStopped() {
         bufferQueue.clear();
@@ -173,18 +166,15 @@ public class ExtractGrok extends AbstractProcessor {
 
     @OnScheduled
     public void onScheduled(final ProcessContext context) throws GrokException {
-
-
         for (int i = 0; i < context.getMaxConcurrentTasks(); i++) {
             final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
             final byte[] buffer = new byte[maxBufferSize];
             bufferQueue.add(buffer);
         }
 
-
+        grok = new Grok();
         grok.addPatternFromFile(context.getProperty(GROK_PATTERN_FILE).getValue());
         grok.compile(context.getProperty(GROK_EXPRESSION).getValue());
-
     }
 
     @Override
@@ -216,20 +206,18 @@ public class ExtractGrok extends AbstractProcessor {
             bufferQueue.offer(buffer);
         }
 
-
         final Match gm = grok.match(contentString);
         gm.captures();
 
-
         if (gm.toMap().isEmpty()) {
             session.transfer(flowFile, REL_NO_MATCH);
             getLogger().info("Did not match any Grok Expressions for FlowFile 
{}", new Object[]{flowFile});
             return;
         }
+
         final ObjectMapper objectMapper = new ObjectMapper();
         switch (context.getProperty(DESTINATION).getValue()) {
             case FLOWFILE_ATTRIBUTE:
-
                 Map<String, String> grokResults = new HashMap<>();
                 for (Map.Entry<String, Object> entry : gm.toMap().entrySet()) {
                     if (null != entry.getValue()) {
@@ -244,13 +232,10 @@ public class ExtractGrok extends AbstractProcessor {
 
                 break;
             case FLOWFILE_CONTENT:
-
                 FlowFile conFlowfile = session.write(flowFile, new 
StreamCallback() {
                     @Override
                     public void process(InputStream in, OutputStream out) 
throws IOException {
-                        try (OutputStream outputStream = new BufferedOutputStream(out)) {
-                            outputStream.write(objectMapper.writeValueAsBytes(gm.toMap()));
-                        }
+                        out.write(objectMapper.writeValueAsBytes(gm.toMap()));
                     }
                 });
                 conFlowfile = session.putAttribute(conFlowfile, 
CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
@@ -258,15 +243,12 @@ public class ExtractGrok extends AbstractProcessor {
                 session.transfer(conFlowfile, REL_MATCH);
 
                 break;
-
         }
-
     }
 
 
     public static final Validator validateGrokExpression() {
         return new Validator() {
-
             @Override
             public ValidationResult validate(String subject, String input, 
ValidationContext context) {
 
@@ -290,10 +272,8 @@ public class ExtractGrok extends AbstractProcessor {
                 }
 
                 return new 
ValidationResult.Builder().subject(subject).input(input).valid(true).build();
-
             }
         };
     }
 
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a5d63067/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java
index 580b308..b503c40 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java
@@ -31,9 +31,8 @@ import java.nio.file.Paths;
 public class TestExtractGrok {
 
     private TestRunner testRunner;
-    final static Path GROK_LOG_INPUT = 
Paths.get("src/test/resources/TestExtractGrok/apache.log");
-    final static Path GROK_TEXT_INPUT = 
Paths.get("src/test/resources/TestExtractGrok/simple_text.log");
-
+    private final static Path GROK_LOG_INPUT = 
Paths.get("src/test/resources/TestExtractGrok/apache.log");
+    private final static Path GROK_TEXT_INPUT = 
Paths.get("src/test/resources/TestExtractGrok/simple_text.log");
 
     @Before
     public void init() {
@@ -42,8 +41,6 @@ public class TestExtractGrok {
 
     @Test
     public void testExtractGrokWithMatchedContent() throws IOException {
-
-
         testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, 
"%{COMMONAPACHELOG}");
         testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, 
"src/test/resources/TestExtractGrok/patterns");
         testRunner.enqueue(GROK_LOG_INPUT);
@@ -59,13 +56,10 @@ public class TestExtractGrok {
         matched.assertAttributeEquals("grok.timestamp","07/Mar/2004:16:05:49 
-0800");
         
matched.assertAttributeEquals("grok.request","/twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables");
         matched.assertAttributeEquals("grok.httpversion","1.1");
-
     }
 
     @Test
     public void testExtractGrokWithUnMatchedContent() throws IOException {
-
-
         testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{ADDRESS}");
         testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, 
"src/test/resources/TestExtractGrok/patterns");
         testRunner.enqueue(GROK_TEXT_INPUT);
@@ -73,29 +67,23 @@ public class TestExtractGrok {
         testRunner.assertAllFlowFilesTransferred(ExtractGrok.REL_NO_MATCH);
         final MockFlowFile notMatched = 
testRunner.getFlowFilesForRelationship(ExtractGrok.REL_NO_MATCH).get(0);
         notMatched.assertContentEquals(GROK_TEXT_INPUT);
-
     }
 
     @Test
     public void testExtractGrokWithNotFoundPatternFile() throws IOException {
-
         testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, 
"%{COMMONAPACHELOG}");
         testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, 
"src/test/resources/TestExtractGrok/toto_file");
         testRunner.enqueue(GROK_LOG_INPUT);
         testRunner.assertNotValid();
-
     }
 
 
     @Test
     public void testExtractGrokWithBadGrokExpression() throws IOException {
-
         testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{TOTO");
         testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, 
"src/test/resources/TestExtractGrok/patterns");
         testRunner.enqueue(GROK_LOG_INPUT);
         testRunner.assertNotValid();
-
-
     }
 
 }

Reply via email to