Repository: nifi
Updated Branches:
  refs/heads/master 6937a6cf6 -> a1b07b1e9


NIFI-3949: Updated Grok Reader to allow for sub-patterns to be used when determining the schema

This closes #1839.

Signed-off-by: Bryan Bende <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a1b07b1e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a1b07b1e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a1b07b1e

Branch: refs/heads/master
Commit: a1b07b1e9c388d8642699b1d8b101a606dc5bd6a
Parents: 6937a6c
Author: Mark Payne <[email protected]>
Authored: Mon May 22 16:04:34 2017 -0400
Committer: Bryan Bende <[email protected]>
Committed: Mon May 22 16:30:47 2017 -0400

----------------------------------------------------------------------
 .../java/org/apache/nifi/grok/GrokReader.java   | 45 ++++++++++++++------
 .../apache/nifi/grok/TestGrokRecordReader.java  | 40 +++++++++++++++++
 2 files changed, 73 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a1b07b1e/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
index a874632..dcf8b5a 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
@@ -133,35 +133,56 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
 
         appendUnmatchedLine = context.getProperty(NO_MATCH_BEHAVIOR).getValue().equalsIgnoreCase(APPEND_TO_PREVIOUS_MESSAGE.getValue());
 
-        this.recordSchema = createRecordSchema(grok);
+        final String schemaAccess = context.getProperty(getSchemaAcessStrategyDescriptor()).getValue();
+        if (STRING_FIELDS_FROM_GROK_EXPRESSION.getValue().equals(schemaAccess)) {
+            this.recordSchema = createRecordSchema(grok);
+        } else {
+            this.recordSchema = null;
+        }
     }
 
     static RecordSchema createRecordSchema(final Grok grok) {
         final List<RecordField> fields = new ArrayList<>();
 
         String grokExpression = grok.getOriginalGrokPattern();
+        populateSchemaFieldNames(grok, grokExpression, fields);
+
+        fields.add(new RecordField(GrokRecordReader.STACK_TRACE_COLUMN_NAME, RecordFieldType.STRING.getDataType()));
+
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+        return schema;
+    }
+
+    private static void populateSchemaFieldNames(final Grok grok, String grokExpression, final List<RecordField> fields) {
         while (grokExpression.length() > 0) {
             final Matcher matcher = GrokUtils.GROK_PATTERN.matcher(grokExpression);
             if (matcher.find()) {
-                final Map<String, String> namedGroups = GrokUtils.namedGroups(matcher, grokExpression);
-                final String fieldName = namedGroups.get("subname");
+                final Map<String, String> extractedGroups = GrokUtils.namedGroups(matcher, grokExpression);
+                final String subName = extractedGroups.get("subname");
 
-                DataType dataType = RecordFieldType.STRING.getDataType();
-                final RecordField recordField = new RecordField(fieldName, dataType);
-                fields.add(recordField);
+                if (subName == null) {
+                    // %{PATTERN} with no ":name": recurse so that names declared inside that pattern become fields.
+                    // Never `continue` from here: grokExpression is only advanced below, so skipping it would loop forever.
+                    final String subPatternName = extractedGroups.get("pattern");
+                    final String subExpression = subPatternName == null ? null : grok.getPatterns().get(subPatternName);
+                    if (subExpression != null) {
+                        populateSchemaFieldNames(grok, subExpression, fields);
+                    }
+                } else {
+                    DataType dataType = RecordFieldType.STRING.getDataType();
+                    final RecordField recordField = new RecordField(subName, dataType);
+                    fields.add(recordField);
+                }
 
                 if (grokExpression.length() > matcher.end() + 1) {
-                    grokExpression = grokExpression.substring(matcher.end() + 1);
+                    grokExpression = grokExpression.substring(matcher.end());
                 } else {
                     break;
                 }
+            } else {
+                break;
             }
         }
-
-        fields.add(new RecordField(GrokRecordReader.STACK_TRACE_COLUMN_NAME, RecordFieldType.STRING.getDataType()));
-
-        final RecordSchema schema = new SimpleRecordSchema(fields);
-        return schema;
     }
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/a1b07b1e/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
index ae5d433..1f9d572 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
@@ -25,12 +25,15 @@ import static org.junit.Assert.assertTrue;
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.List;
 
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
 import org.junit.Test;
 
 import io.thekraken.grok.api.Grok;
@@ -186,4 +189,41 @@ public class TestGrokRecordReader {
         }
     }
 
+    @Test
+    public void testInheritNamedParameters() throws FileNotFoundException, IOException, GrokException, MalformedRecordException {
+        final String syslogMsg = "May 22 15:58:23 my-host nifi[12345]:My Message";
+        final byte[] msgBytes = syslogMsg.getBytes(StandardCharsets.UTF_8);
+
+        try (final InputStream in = new ByteArrayInputStream(msgBytes)) {
+            final Grok grok = new Grok();
+            grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt");
+            grok.compile("%{SYSLOGBASE}%{GREEDYDATA:message}");
+
+            final RecordSchema schema = GrokReader.createRecordSchema(grok);
+            final List<String> fieldNames = schema.getFieldNames();
+            assertEquals(8, fieldNames.size());
+            assertTrue(fieldNames.contains("timestamp"));
+            assertTrue(fieldNames.contains("logsource"));
+            assertTrue(fieldNames.contains("facility"));
+            assertTrue(fieldNames.contains("priority"));
+            assertTrue(fieldNames.contains("program"));
+            assertTrue(fieldNames.contains("pid"));
+            assertTrue(fieldNames.contains("message"));
+            assertTrue(fieldNames.contains("stackTrace"));  // always implicitly there
+
+            final GrokRecordReader deserializer = new GrokRecordReader(in, grok, schema, true);
+            final Record record = deserializer.nextRecord();
+
+            assertEquals("May 22 15:58:23", record.getValue("timestamp"));
+            assertEquals("my-host", record.getValue("logsource"));
+            assertNull(record.getValue("facility"));
+            assertNull(record.getValue("priority"));
+            assertEquals("nifi", record.getValue("program"));
+            assertEquals("12345", record.getValue("pid"));
+            assertEquals("My Message", record.getValue("message"));
+
+            assertNull(deserializer.nextRecord());
+        }
+    }
+
 }

Reply via email to