Repository: nifi Updated Branches: refs/heads/master 6937a6cf6 -> a1b07b1e9
NIFI-3949: Updated Grok Reader to allow for sub-patterns to be used when determining the schema This closes #1839. Signed-off-by: Bryan Bende <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a1b07b1e Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a1b07b1e Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a1b07b1e Branch: refs/heads/master Commit: a1b07b1e9c388d8642699b1d8b101a606dc5bd6a Parents: 6937a6c Author: Mark Payne <[email protected]> Authored: Mon May 22 16:04:34 2017 -0400 Committer: Bryan Bende <[email protected]> Committed: Mon May 22 16:30:47 2017 -0400 ---------------------------------------------------------------------- .../java/org/apache/nifi/grok/GrokReader.java | 45 ++++++++++++++------ .../apache/nifi/grok/TestGrokRecordReader.java | 40 +++++++++++++++++ 2 files changed, 73 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/a1b07b1e/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java index a874632..dcf8b5a 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java @@ -133,35 +133,56 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac appendUnmatchedLine = context.getProperty(NO_MATCH_BEHAVIOR).getValue().equalsIgnoreCase(APPEND_TO_PREVIOUS_MESSAGE.getValue()); - this.recordSchema = createRecordSchema(grok); + final String schemaAccess = context.getProperty(getSchemaAcessStrategyDescriptor()).getValue(); + if (STRING_FIELDS_FROM_GROK_EXPRESSION.getValue().equals(schemaAccess)) { + this.recordSchema = createRecordSchema(grok); + } else { + this.recordSchema = null; + } } static RecordSchema createRecordSchema(final Grok grok) { final List<RecordField> fields = new ArrayList<>(); String grokExpression = grok.getOriginalGrokPattern(); + populateSchemaFieldNames(grok, grokExpression, fields); + + fields.add(new RecordField(GrokRecordReader.STACK_TRACE_COLUMN_NAME, RecordFieldType.STRING.getDataType())); + + final RecordSchema schema = new SimpleRecordSchema(fields); + return schema; + } + + private static void populateSchemaFieldNames(final Grok grok, String grokExpression, final List<RecordField> fields) { while (grokExpression.length() > 0) { final Matcher matcher = GrokUtils.GROK_PATTERN.matcher(grokExpression); if (matcher.find()) { - final Map<String, String> namedGroups = GrokUtils.namedGroups(matcher, grokExpression); - final String fieldName = namedGroups.get("subname"); + final Map<String, String> extractedGroups = GrokUtils.namedGroups(matcher, grokExpression); + final String subName = extractedGroups.get("subname"); - DataType dataType = RecordFieldType.STRING.getDataType(); - final RecordField recordField = new RecordField(fieldName, dataType); - fields.add(recordField); + if (subName == null) { + final String subPatternName = extractedGroups.get("pattern"); + if (subPatternName == null) { + continue; + } + + final String subExpression = grok.getPatterns().get(subPatternName); + populateSchemaFieldNames(grok, subExpression, fields); + } else { + DataType dataType = RecordFieldType.STRING.getDataType(); + final RecordField recordField = new RecordField(subName, dataType); + fields.add(recordField); + } if (grokExpression.length() > matcher.end() + 1) { - grokExpression = grokExpression.substring(matcher.end() + 1); + grokExpression = grokExpression.substring(matcher.end()); } else { break; } + } else { + break; } } - - fields.add(new RecordField(GrokRecordReader.STACK_TRACE_COLUMN_NAME, RecordFieldType.STRING.getDataType())); - - final RecordSchema schema = new SimpleRecordSchema(fields); - return schema; } http://git-wip-us.apache.org/repos/asf/nifi/blob/a1b07b1e/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java index ae5d433..1f9d572 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java @@ -25,12 +25,15 @@ import static org.junit.Assert.assertTrue; import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.util.List; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; import org.junit.Test; import io.thekraken.grok.api.Grok; @@ -186,4 +189,41 @@ public class TestGrokRecordReader { } } + @Test + public void testInheritNamedParameters() throws FileNotFoundException, IOException, GrokException, MalformedRecordException { + final String syslogMsg = "May 22 15:58:23 my-host nifi[12345]:My Message"; + final byte[] msgBytes = syslogMsg.getBytes(); + + try (final InputStream in = new ByteArrayInputStream(msgBytes)) { + final Grok grok = new Grok(); + grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt"); + grok.compile("%{SYSLOGBASE}%{GREEDYDATA:message}"); + + final RecordSchema schema = GrokReader.createRecordSchema(grok); + final List<String> fieldNames = schema.getFieldNames(); + assertEquals(8, fieldNames.size()); + assertTrue(fieldNames.contains("timestamp")); + assertTrue(fieldNames.contains("logsource")); + assertTrue(fieldNames.contains("facility")); + assertTrue(fieldNames.contains("priority")); + assertTrue(fieldNames.contains("program")); + assertTrue(fieldNames.contains("pid")); + assertTrue(fieldNames.contains("message")); + assertTrue(fieldNames.contains("stackTrace")); // always implicitly there + + final GrokRecordReader deserializer = new GrokRecordReader(in, grok, schema, true); + final Record record = deserializer.nextRecord(); + + assertEquals("May 22 15:58:23", record.getValue("timestamp")); + assertEquals("my-host", record.getValue("logsource")); + assertNull(record.getValue("facility")); + assertNull(record.getValue("priority")); + assertEquals("nifi", record.getValue("program")); + assertEquals("12345", record.getValue("pid")); + assertEquals("My Message", record.getValue("message")); + + assertNull(deserializer.nextRecord()); + } + } + }
