This is an automated email from the ASF dual-hosted git repository.
otto pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new e6e4109 NIFI-9850 Add support for multiple expressions to GrokReader
(#5918)
e6e4109 is described below
commit e6e4109cf901d19e99f5d6b9df7a988750a16065
Author: exceptionfactory <[email protected]>
AuthorDate: Sat Apr 2 12:30:55 2022 -0500
NIFI-9850 Add support for multiple expressions to GrokReader (#5918)
* NIFI-9850 Added support for multiple expressions to GrokReader
- Updated Grok Expression property to support Resources
* NIFI-9850 Updated documentation for Fields from Grok Expression strategy
This closes #5918
Signed-off-by: Otto Fowler <[email protected]>
---
.../main/java/org/apache/nifi/grok/GrokReader.java | 109 +++++++----
.../org/apache/nifi/grok/GrokRecordReader.java | 56 +++---
.../java/org/apache/nifi/grok/TestGrokReader.java | 217 +++++++++++----------
.../org/apache/nifi/grok/TestGrokRecordReader.java | 96 ++++-----
4 files changed, 263 insertions(+), 215 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
index e7459d7..6d4aa18 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
@@ -29,6 +29,7 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.resource.ResourceCardinality;
+import org.apache.nifi.components.resource.ResourceReference;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ConfigurationContext;
@@ -48,6 +49,7 @@ import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
+import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
@@ -60,6 +62,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
+import java.util.stream.Collectors;
@Tags({"grok", "logs", "logfiles", "parse", "unstructured", "text", "record",
"reader", "regex", "pattern", "logstash"})
@CapabilityDescription("Provides a mechanism for reading unstructured text
data, such as log files, and structuring the data "
@@ -71,8 +74,7 @@ import java.util.regex.Matcher;
+ "no stack trace, it will have a NULL value for the stackTrace field
(assuming that the schema does in fact include a stackTrace field of type
String). "
+ "Assuming that the schema includes a '_raw' field of type String, the
raw message will be included in the Record.")
public class GrokReader extends SchemaRegistryService implements
RecordReaderFactory {
- private volatile GrokCompiler grokCompiler;
- private volatile Grok grok;
+ private volatile List<Grok> groks;
private volatile NoMatchStrategy noMatchStrategy;
private volatile RecordSchema recordSchema;
private volatile RecordSchema recordSchemaFromGrok;
@@ -87,8 +89,10 @@ public class GrokReader extends SchemaRegistryService
implements RecordReaderFac
"The line of text that does not match the Grok Expression will
only be added to the _raw field.");
static final AllowableValue STRING_FIELDS_FROM_GROK_EXPRESSION = new
AllowableValue("string-fields-from-grok-expression", "Use String Fields From
Grok Expression",
- "The schema will be derived by using the field names present in the
Grok Expression. All fields will be assumed to be of type String. Additionally,
a field will be included "
- + "with a name of 'stackTrace' and a type of String.");
+ "The schema will be derived using the field names present in all
configured Grok Expressions. "
+ + "All schema fields will have a String type and will be marked as
nullable. "
+ + "The schema will also include a `stackTrace` field, and a `_raw`
field containing the input line string."
+ );
static final PropertyDescriptor PATTERN_FILE = new
PropertyDescriptor.Builder()
.name("Grok Pattern File")
@@ -102,10 +106,14 @@ public class GrokReader extends SchemaRegistryService
implements RecordReaderFac
static final PropertyDescriptor GROK_EXPRESSION = new
PropertyDescriptor.Builder()
.name("Grok Expression")
+ .displayName("Grok Expressions")
.description("Specifies the format of a log line in Grok format. This
allows the Record Reader to understand how to parse each log line. "
- + "If a line in the log file does not match this pattern, the line
will be assumed to belong to the previous log message."
- + "If other Grok expressions are referenced by this expression,
they need to be supplied in the Grok Pattern File")
+ + "The property supports one or more Grok expressions. The Reader
attempts to parse input lines according to the configured order of the
expressions."
+ + "If a line in the log file does not match any expressions, the
line will be assumed to belong to the previous log message."
+ + "If other Grok patterns are referenced by this expression, they
need to be supplied in the Grok Pattern File property."
+ )
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .identifiesExternalResource(ResourceCardinality.SINGLE,
ResourceType.TEXT, ResourceType.URL, ResourceType.FILE)
.required(true)
.build();
@@ -130,11 +138,10 @@ public class GrokReader extends SchemaRegistryService
implements RecordReaderFac
@OnEnabled
public void preCompile(final ConfigurationContext context) throws
GrokException, IOException {
- grokCompiler = GrokCompiler.newInstance();
+ GrokCompiler grokCompiler = GrokCompiler.newInstance();
- try (final InputStream in =
getClass().getResourceAsStream(DEFAULT_PATTERN_NAME);
- final Reader reader = new InputStreamReader(in)) {
- grokCompiler.register(reader);
+ try (final Reader defaultPatterns = getDefaultPatterns()) {
+ grokCompiler.register(defaultPatterns);
}
if (context.getProperty(PATTERN_FILE).isSet()) {
@@ -144,10 +151,11 @@ public class GrokReader extends SchemaRegistryService
implements RecordReaderFac
}
}
- grok =
grokCompiler.compile(context.getProperty(GROK_EXPRESSION).getValue());
-
+ groks = readGrokExpressions(context).stream()
+ .map(grokCompiler::compile)
+ .collect(Collectors.toList());
-
if(context.getProperty(NO_MATCH_BEHAVIOR).getValue().equalsIgnoreCase(APPEND_TO_PREVIOUS_MESSAGE.getValue()))
{
+ if
(context.getProperty(NO_MATCH_BEHAVIOR).getValue().equalsIgnoreCase(APPEND_TO_PREVIOUS_MESSAGE.getValue()))
{
noMatchStrategy = NoMatchStrategy.APPEND;
} else if
(context.getProperty(NO_MATCH_BEHAVIOR).getValue().equalsIgnoreCase(RAW_LINE.getValue()))
{
noMatchStrategy = NoMatchStrategy.RAW;
@@ -155,7 +163,7 @@ public class GrokReader extends SchemaRegistryService
implements RecordReaderFac
noMatchStrategy = NoMatchStrategy.SKIP;
}
- this.recordSchemaFromGrok = createRecordSchema(grok);
+ this.recordSchemaFromGrok = createRecordSchema(groks);
final String schemaAccess =
context.getProperty(getSchemaAcessStrategyDescriptor()).getValue();
if
(STRING_FIELDS_FROM_GROK_EXPRESSION.getValue().equals(schemaAccess)) {
@@ -167,42 +175,61 @@ public class GrokReader extends SchemaRegistryService
implements RecordReaderFac
@Override
protected Collection<ValidationResult> customValidate(final
ValidationContext validationContext) {
- ArrayList<ValidationResult> results = new
ArrayList<>(super.customValidate(validationContext));
- // validate the grok expression against configuration
- GrokCompiler grokCompiler = GrokCompiler.newInstance();
- String subject = GROK_EXPRESSION.getName();
- String input =
validationContext.getProperty(GROK_EXPRESSION).getValue();
- GrokExpressionValidator validator;
-
- try (final InputStream in =
getClass().getResourceAsStream(DEFAULT_PATTERN_NAME);
- final Reader reader = new InputStreamReader(in)) {
- grokCompiler.register(reader);
- } catch (IOException e) {
+ final List<ValidationResult> results = new
ArrayList<>(super.customValidate(validationContext));
+ final GrokCompiler grokCompiler = GrokCompiler.newInstance();
+
+ final String expressionSubject = GROK_EXPRESSION.getDisplayName();
+
+ try (final Reader defaultPatterns = getDefaultPatterns()) {
+ grokCompiler.register(defaultPatterns);
+ } catch (final IOException e) {
results.add(new ValidationResult.Builder()
- .input(input)
- .subject(subject)
+ .input("Default Grok Patterns")
+ .subject(expressionSubject)
.valid(false)
.explanation("Unable to load default patterns: " +
e.getMessage())
.build());
}
- validator = new
GrokExpressionValidator(validationContext.getProperty(PATTERN_FILE).evaluateAttributeExpressions().getValue(),grokCompiler);
- results.add(validator.validate(subject,input,validationContext));
+ final String patternFileName =
validationContext.getProperty(PATTERN_FILE).evaluateAttributeExpressions().getValue();
+ final GrokExpressionValidator validator = new
GrokExpressionValidator(patternFileName, grokCompiler);
+
+ try {
+ final List<String> grokExpressions =
readGrokExpressions(validationContext);
+ final List<ValidationResult> grokExpressionResults =
grokExpressions.stream()
+ .map(grokExpression ->
validator.validate(expressionSubject, grokExpression,
validationContext)).collect(Collectors.toList());
+ results.addAll(grokExpressionResults);
+ } catch (final IOException e) {
+ results.add(new ValidationResult.Builder()
+ .input("Configured Grok Expressions")
+ .subject(expressionSubject)
+ .valid(false)
+ .explanation(String.format("Read Grok Expressions failed:
%s", e.getMessage()))
+ .build());
+ }
+
return results;
}
- static RecordSchema createRecordSchema(final Grok grok) {
+ private List<String> readGrokExpressions(final PropertyContext
propertyContext) throws IOException {
+ final ResourceReference expressionsResource =
propertyContext.getProperty(GROK_EXPRESSION).asResource();
+ try (
+ final InputStream expressionsStream =
expressionsResource.read();
+ final BufferedReader expressionsReader = new
BufferedReader(new InputStreamReader(expressionsStream))
+ ) {
+ return expressionsReader.lines().collect(Collectors.toList());
+ }
+ }
+
+ static RecordSchema createRecordSchema(final List<Grok> groks) {
final Set<RecordField> fields = new LinkedHashSet<>();
- String grokExpression = grok.getOriginalGrokPattern();
- populateSchemaFieldNames(grok, grokExpression, fields);
+ groks.forEach(grok -> populateSchemaFieldNames(grok,
grok.getOriginalGrokPattern(), fields));
fields.add(new RecordField(GrokRecordReader.STACK_TRACE_COLUMN_NAME,
RecordFieldType.STRING.getDataType(), true));
fields.add(new RecordField(GrokRecordReader.RAW_MESSAGE_NAME,
RecordFieldType.STRING.getDataType(), true));
- final RecordSchema schema = new SimpleRecordSchema(new
ArrayList<>(fields));
-
- return schema;
+ return new SimpleRecordSchema(new ArrayList<>(fields));
}
private static void populateSchemaFieldNames(final Grok grok, String
grokExpression, final Collection<RecordField> fields) {
@@ -267,7 +294,7 @@ public class GrokReader extends SchemaRegistryService
implements RecordReaderFac
@Override
- public RecordSchema getSchema(Map<String, String> variables,
InputStream contentStream, RecordSchema readSchema) throws
SchemaNotFoundException {
+ public RecordSchema getSchema(Map<String, String> variables,
InputStream contentStream, RecordSchema readSchema) {
return recordSchema;
}
@@ -281,6 +308,14 @@ public class GrokReader extends SchemaRegistryService
implements RecordReaderFac
@Override
public RecordReader createRecordReader(final Map<String, String>
variables, final InputStream in, final long inputLength, final ComponentLog
logger) throws IOException, SchemaNotFoundException {
final RecordSchema schema = getSchema(variables, in, null);
- return new GrokRecordReader(in, grok, schema, recordSchemaFromGrok,
noMatchStrategy);
+ return new GrokRecordReader(in, groks, schema, recordSchemaFromGrok,
noMatchStrategy);
+ }
+
+ private Reader getDefaultPatterns() throws IOException {
+ final InputStream inputStream =
getClass().getResourceAsStream(DEFAULT_PATTERN_NAME);
+ if (inputStream == null) {
+ throw new IOException(String.format("Default Patterns [%s] not
found", DEFAULT_PATTERN_NAME));
+ }
+ return new InputStreamReader(inputStream);
}
}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
index 48317c9..51a9228 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
@@ -21,6 +21,7 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -40,14 +41,13 @@ import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import io.krakens.grok.api.Grok;
-import io.krakens.grok.api.Match;
public class GrokRecordReader implements RecordReader {
private final BufferedReader reader;
- private final Grok grok;
+ private final List<Grok> groks;
private final NoMatchStrategy noMatchStrategy;
private final RecordSchema schemaFromGrok;
- private RecordSchema schema;
+ private final RecordSchema schema;
private String nextLine;
Map<String, Object> nextMap = null;
@@ -56,15 +56,19 @@ public class GrokRecordReader implements RecordReader {
static final String RAW_MESSAGE_NAME = "_raw";
private static final Pattern STACK_TRACE_PATTERN = Pattern.compile(
- "^\\s*(?:(?: |\\t)+at )|"
- + "(?:(?: |\\t)+\\[CIRCULAR REFERENCE\\:)|"
- + "(?:Caused by\\: )|"
- + "(?:Suppressed\\: )|"
+ "^\\s*(?:(?:\\s{4}|\\t)+at )|"
+ + "(?:(?:\\s{4}|\\t)+\\[CIRCULAR REFERENCE:)|"
+ + "(?:Caused by: )|"
+ + "(?:Suppressed: )|"
+ "(?:\\s+... \\d+ (?:more|common frames? omitted)$)");
public GrokRecordReader(final InputStream in, final Grok grok, final
RecordSchema schema, final RecordSchema schemaFromGrok, final NoMatchStrategy
noMatchStrategy) {
+ this(in, Collections.singletonList(grok), schema, schemaFromGrok,
noMatchStrategy);
+ }
+
+ public GrokRecordReader(final InputStream in, final List<Grok> groks,
final RecordSchema schema, final RecordSchema schemaFromGrok, final
NoMatchStrategy noMatchStrategy) {
this.reader = new BufferedReader(new InputStreamReader(in));
- this.grok = grok;
+ this.groks = groks;
this.schema = schema;
this.noMatchStrategy = noMatchStrategy;
this.schemaFromGrok = schemaFromGrok;
@@ -91,10 +95,8 @@ public class GrokRecordReader implements RecordReader {
return null;
}
- final Match match = grok.match(line);
- valueMap = match.capture();
-
- if((valueMap == null || valueMap.isEmpty()) &&
noMatchStrategy.equals(NoMatchStrategy.RAW)) {
+ valueMap = capture(line);
+ if ((valueMap == null || valueMap.isEmpty()) &&
noMatchStrategy.equals(NoMatchStrategy.RAW)) {
break;
}
}
@@ -108,8 +110,7 @@ public class GrokRecordReader implements RecordReader {
String stackTrace = null;
final StringBuilder trailingText = new StringBuilder();
while ((nextLine = reader.readLine()) != null) {
- final Match nextLineMatch = grok.match(nextLine);
- final Map<String, Object> nextValueMap = nextLineMatch.capture();
+ final Map<String, Object> nextValueMap = capture(nextLine);
if (nextValueMap.isEmpty() &&
!noMatchStrategy.equals(NoMatchStrategy.RAW)) {
// next line did not match. Check if it indicates a Stack
Trace. If so, read until
// the stack trace ends. Otherwise, append the next line to
the last field in the record.
@@ -128,14 +129,13 @@ public class GrokRecordReader implements RecordReader {
}
}
- final Record record = createRecord(valueMap, trailingText, stackTrace,
raw.toString(), coerceTypes, dropUnknownFields);
- return record;
+ return createRecord(valueMap, trailingText, stackTrace,
raw.toString(), coerceTypes);
}
- private Record createRecord(final Map<String, Object> valueMap, final
StringBuilder trailingText, final String stackTrace, final String raw, final
boolean coerceTypes, final boolean dropUnknown) {
+ private Record createRecord(final Map<String, Object> valueMap, final
StringBuilder trailingText, final String stackTrace, final String raw, final
boolean coerceTypes) {
final Map<String, Object> converted = new HashMap<>();
- if(valueMap != null && !valueMap.isEmpty()) {
+ if (valueMap != null && !valueMap.isEmpty()) {
for (final Map.Entry<String, Object> entry : valueMap.entrySet()) {
final String fieldName = entry.getKey();
@@ -203,7 +203,7 @@ public class GrokRecordReader implements RecordReader {
if (value == null) {
converted.put(lastPopulatedFieldName,
trailingText.toString());
} else if (value instanceof String) { // if not a String
it is a List and we will just drop the trailing text
- converted.put(lastPopulatedFieldName, (String) value +
trailingText.toString());
+ converted.put(lastPopulatedFieldName, value +
trailingText.toString());
}
}
}
@@ -238,11 +238,7 @@ public class GrokRecordReader implements RecordReader {
return false;
}
- if (line.indexOf(" ") < index) {
- return false;
- }
-
- return true;
+ return line.indexOf(" ") >= index;
}
private String readStackTrace(final String firstLine) throws IOException {
@@ -291,4 +287,16 @@ public class GrokRecordReader implements RecordReader {
return schema;
}
+ private Map<String, Object> capture(final String log) {
+ Map<String, Object> capture = Collections.emptyMap();
+
+ for (final Grok grok : groks) {
+ capture = grok.capture(log);
+ if (!capture.isEmpty()) {
+ break;
+ }
+ }
+
+ return capture;
+ }
}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokReader.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokReader.java
index 4c29c1b..0a5f3fc 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokReader.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokReader.java
@@ -16,90 +16,58 @@
*/
package org.apache.nifi.grok;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Processor;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
-import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.util.EqualsWrapper;
+import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.io.InputStream;
-import java.util.ArrayList;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.function.Function;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
public class TestGrokReader {
private TestRunner runner;
- private List<Record> records;
- private static final PropertyDescriptor READER = new
PropertyDescriptor.Builder()
- .name("reader")
- .identifiesControllerService(GrokReader.class)
- .build();
+ private static final String TIMESTAMP_FIELD = "timestamp";
+
+ private static final String LEVEL_FIELD = "level";
+
+ private static final String FACILITY_FIELD = "facility";
+
+ private static final String PROGRAM_FIELD = "program";
+
+ private static final String MESSAGE_FIELD = "message";
+
+ private static final String STACKTRACE_FIELD = "stackTrace";
+
+ private static final String RAW_FIELD = "_raw";
@BeforeEach
void setUp() {
- Processor processor = new AbstractProcessor() {
- Relationship SUCCESS = new Relationship.Builder()
- .name("success")
- .build();
-
- @Override
- public void onTrigger(ProcessContext context, ProcessSession
session) throws ProcessException {
- FlowFile flowFile = session.get();
- final RecordReaderFactory readerFactory =
context.getProperty(READER).asControllerService(RecordReaderFactory.class);
-
- try (final InputStream in = session.read(flowFile);
- final RecordReader reader =
readerFactory.createRecordReader(flowFile, in, getLogger())) {
- Record record;
- while ((record = reader.nextRecord()) != null) {
- records.add(record);
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- session.transfer(flowFile, SUCCESS);
- }
-
- @Override
- protected List<PropertyDescriptor>
getSupportedPropertyDescriptors() {
- return Arrays.asList(READER);
- }
-
- @Override
- public Set<Relationship> getRelationships() {
- return new HashSet<>(Arrays.asList(SUCCESS));
- }
- };
-
- runner = TestRunners.newTestRunner(processor);
-
- records = new ArrayList<>();
+ runner = TestRunners.newTestRunner(NoOpProcessor.class);
}
@Test
void testComplexGrokExpression() throws Exception {
- // GIVEN
String input = "1021-09-09 09:03:06 127.0.0.1 nifi[1000]: LogMessage"
+ System.lineSeparator()
+ "October 19 19:13:16 127.0.0.1 nifi[1000]: LogMessage2" +
System.lineSeparator();
@@ -107,65 +75,100 @@ public class TestGrokReader {
String grokExpression = "%{LINE}";
SimpleRecordSchema expectedSchema = new
SimpleRecordSchema(Arrays.asList(
- new RecordField("timestamp", RecordFieldType.STRING.getDataType()),
- new RecordField("facility", RecordFieldType.STRING.getDataType()),
+ new RecordField(TIMESTAMP_FIELD,
RecordFieldType.STRING.getDataType()),
+ new RecordField(FACILITY_FIELD,
RecordFieldType.STRING.getDataType()),
new RecordField("priority", RecordFieldType.STRING.getDataType()),
new RecordField("logsource", RecordFieldType.STRING.getDataType()),
- new RecordField("program", RecordFieldType.STRING.getDataType()),
+ new RecordField(PROGRAM_FIELD,
RecordFieldType.STRING.getDataType()),
new RecordField("pid", RecordFieldType.STRING.getDataType()),
- new RecordField("message", RecordFieldType.STRING.getDataType()),
- new RecordField("stackTrace",
RecordFieldType.STRING.getDataType()),
- new RecordField("_raw", RecordFieldType.STRING.getDataType())
+ new RecordField(MESSAGE_FIELD,
RecordFieldType.STRING.getDataType()),
+ new RecordField(STACKTRACE_FIELD,
RecordFieldType.STRING.getDataType()),
+ new RecordField(RAW_FIELD, RecordFieldType.STRING.getDataType())
));
- List<Record> expectedRecords = Arrays.asList(
- new MapRecord(expectedSchema, new HashMap<String, Object>() {{
- put("timestamp", "1021-09-09 09:03:06");
- put("facility", null);
- put("priority", null);
- put("logsource", "127.0.0.1");
- put("program", "nifi");
- put("pid", "1000");
- put("message", " LogMessage");
- put("stackstrace", null);
- put("_raw", "1021-09-09 09:03:06 127.0.0.1 nifi[1000]:
LogMessage");
- }}),
- new MapRecord(expectedSchema, new HashMap<String, Object>() {{
- put("timestamp", "October 19 19:13:16");
- put("facility", null);
- put("priority", null);
- put("logsource", "127.0.0.1");
- put("program", "nifi");
- put("pid", "1000");
- put("message", " LogMessage2");
- put("stackstrace", null);
- put("_raw", "October 19 19:13:16 127.0.0.1 nifi[1000]:
LogMessage2");
- }})
- );
-
- // WHEN
- GrokReader grokReader = new GrokReader();
-
- runner.addControllerService("grokReader", grokReader);
- runner.setProperty(READER, "grokReader");
-
+ final Record expectedFirstRecord = new MapRecord(expectedSchema, new
HashMap<String, Object>() {{
+ put(TIMESTAMP_FIELD, "1021-09-09 09:03:06");
+ put(FACILITY_FIELD, null);
+ put("priority", null);
+ put("logsource", "127.0.0.1");
+ put(PROGRAM_FIELD, "nifi");
+ put("pid", "1000");
+ put("message", " LogMessage");
+ put(STACKTRACE_FIELD, null);
+ put(RAW_FIELD, "1021-09-09 09:03:06 127.0.0.1 nifi[1000]:
LogMessage");
+ }});
+
+ final Record expectedSecondRecord = new MapRecord(expectedSchema, new
HashMap<String, Object>() {{
+ put(TIMESTAMP_FIELD, "October 19 19:13:16");
+ put(FACILITY_FIELD, null);
+ put("priority", null);
+ put("logsource", "127.0.0.1");
+ put(PROGRAM_FIELD, "nifi");
+ put("pid", "1000");
+ put(MESSAGE_FIELD, " LogMessage2");
+ put(STACKTRACE_FIELD, null);
+ put(RAW_FIELD, "October 19 19:13:16 127.0.0.1 nifi[1000]:
LogMessage2");
+ }});
+
+ final GrokReader grokReader = new GrokReader();
+ runner.addControllerService(GrokReader.class.getSimpleName(),
grokReader);
runner.setProperty(grokReader, GrokReader.PATTERN_FILE,
grokPatternFile);
runner.setProperty(grokReader, GrokReader.GROK_EXPRESSION,
grokExpression);
+ runner.enableControllerService(grokReader);
+
+ final byte[] inputBytes = input.getBytes(StandardCharsets.UTF_8);
+ final ByteArrayInputStream inputStream = new
ByteArrayInputStream(inputBytes);
+ final RecordReader recordReader =
grokReader.createRecordReader(Collections.emptyMap(), inputStream,
inputBytes.length, runner.getLogger());
+
+ final Record firstRecord = recordReader.nextRecord();
+
+ assertArrayEquals(expectedFirstRecord.getValues(),
firstRecord.getValues());
+ assertEquals(expectedSchema, firstRecord.getSchema());
+ final Record secondRecord = recordReader.nextRecord();
+ assertArrayEquals(expectedSecondRecord.getValues(),
secondRecord.getValues());
+ assertEquals(expectedSchema, secondRecord.getSchema());
+
+ assertNull(recordReader.nextRecord());
+ }
+
+ @Test
+ public void testMultipleExpressions() throws InitializationException,
IOException, SchemaNotFoundException, MalformedRecordException {
+ final String program = "NiFi";
+ final String level = "INFO";
+ final String message = "Processing Started";
+ final String timestamp = "Jan 10 12:30:45";
+
+ final String logs = String.format("%s %s %s%n%s %s %s %s%n", program,
level, message, timestamp, program, level, message);
+ final byte[] bytes = logs.getBytes(StandardCharsets.UTF_8);
+
+ final String matchingExpression = "%{PROG:program} %{LOGLEVEL:level}
%{GREEDYDATA:message}";
+ final String firstExpression = "%{SYSLOGTIMESTAMP:timestamp}
%{PROG:program} %{LOGLEVEL:level} %{GREEDYDATA:message}";
+ final String expressions = String.format("%s%n%s", firstExpression,
matchingExpression);
+
+ final GrokReader grokReader = new GrokReader();
+ runner.addControllerService(GrokReader.class.getSimpleName(),
grokReader);
+ runner.setProperty(grokReader, GrokReader.GROK_EXPRESSION,
expressions);
+ runner.setProperty(grokReader,
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY,
GrokReader.STRING_FIELDS_FROM_GROK_EXPRESSION);
runner.enableControllerService(grokReader);
- runner.enqueue(input);
- runner.run();
+ final ByteArrayInputStream inputStream = new
ByteArrayInputStream(bytes);
+ final RecordReader recordReader =
grokReader.createRecordReader(Collections.emptyMap(), inputStream,
bytes.length, runner.getLogger());
- // THEN
- List<Function<Record, Object>> propertyProviders = Arrays.asList(
- Record::getSchema,
- Record::getValues
- );
+ final Record firstRecord = recordReader.nextRecord();
+ assertNotNull(firstRecord);
+ assertEquals(program, firstRecord.getValue(PROGRAM_FIELD));
+ assertEquals(level, firstRecord.getValue(LEVEL_FIELD));
+ assertEquals(message, firstRecord.getValue(MESSAGE_FIELD));
+ assertNull(firstRecord.getValue(TIMESTAMP_FIELD));
- List<EqualsWrapper<Record>> wrappedExpected =
EqualsWrapper.wrapList(expectedRecords, propertyProviders);
- List<EqualsWrapper<Record>> wrappedActual =
EqualsWrapper.wrapList(records, propertyProviders);
+ final Record secondRecord = recordReader.nextRecord();
+ assertNotNull(secondRecord);
+ assertEquals(program, secondRecord.getValue(PROGRAM_FIELD));
+ assertEquals(level, secondRecord.getValue(LEVEL_FIELD));
+ assertEquals(message, secondRecord.getValue(MESSAGE_FIELD));
+ assertEquals(timestamp, secondRecord.getValue(TIMESTAMP_FIELD));
- Assertions.assertEquals(wrappedExpected, wrappedActual);
+ assertNull(recordReader.nextRecord());
}
}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
index 84e93c7..0b34541 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
@@ -28,11 +28,11 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
-import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
+import java.util.Collections;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -45,7 +45,7 @@ public class TestGrokRecordReader {
@BeforeAll
public static void beforeClass() throws Exception {
- try (final InputStream fis = new FileInputStream(new
File("src/main/resources/default-grok-patterns.txt"))) {
+ try (final InputStream fis = new
FileInputStream("src/main/resources/default-grok-patterns.txt")) {
grokCompiler = GrokCompiler.newInstance();
grokCompiler.register(fis);
}
@@ -58,9 +58,8 @@ public class TestGrokRecordReader {
@Test
public void testParseSingleLineLogMessages() throws GrokException,
IOException, MalformedRecordException {
- try (final InputStream fis = new FileInputStream(new
File("src/test/resources/grok/single-line-log-messages.txt"))) {
- final Grok grok =
grokCompiler.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level}
%{GREEDYDATA:message}");
- final GrokRecordReader deserializer = new GrokRecordReader(fis,
grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok),
NoMatchStrategy.APPEND);
+ try (final InputStream fis = new
FileInputStream("src/test/resources/grok/single-line-log-messages.txt")) {
+ final GrokRecordReader deserializer =
getRecordReader("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level}
%{GREEDYDATA:message}", fis);
final String[] logLevels = new String[] {"INFO", "WARN", "ERROR",
"FATAL", "FINE"};
final String[] messages = new String[] {"Test Message 1", "Red",
"Green", "Blue", "Yellow"};
@@ -85,15 +84,13 @@ public class TestGrokRecordReader {
}
}
-
@Test
public void testParseEmptyMessageWithStackTrace() throws GrokException,
IOException, MalformedRecordException {
- final Grok grok = grokCompiler.compile("%{TIMESTAMP_ISO8601:timestamp}
%{LOGLEVEL:level} \\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}");
-
final String msg = "2016-08-04 13:26:32,473 INFO [Leader Election
Notification Thread-1] o.a.n.LoggerClass \n"
+ "org.apache.nifi.exception.UnitTestException: Testing to ensure
we are able to capture stack traces";
final InputStream bais = new
ByteArrayInputStream(msg.getBytes(StandardCharsets.UTF_8));
- final GrokRecordReader deserializer = new GrokRecordReader(bais, grok,
GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok),
NoMatchStrategy.APPEND);
+
+ final GrokRecordReader deserializer =
getRecordReader("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level}
\\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}", bais);
final Object[] values = deserializer.nextRecord().getValues();
@@ -110,21 +107,18 @@ public class TestGrokRecordReader {
deserializer.close();
}
-
-
@Test
public void testParseNiFiSampleLog() throws IOException, GrokException,
MalformedRecordException {
- try (final InputStream fis = new FileInputStream(new
File("src/test/resources/grok/nifi-log-sample.log"))) {
- final Grok grok =
grokCompiler.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level}
\\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}");
- final GrokRecordReader deserializer = new GrokRecordReader(fis,
grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok),
NoMatchStrategy.APPEND);
+ try (final InputStream fis = new
FileInputStream("src/test/resources/grok/nifi-log-sample.log")) {
+ final GrokRecordReader deserializer =
getRecordReader("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level}
\\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}", fis);
final String[] logLevels = new String[] {"INFO", "INFO", "INFO",
"WARN", "WARN"};
- for (int i = 0; i < logLevels.length; i++) {
+ for (String logLevel : logLevels) {
final Object[] values = deserializer.nextRecord().getValues();
assertNotNull(values);
assertEquals(7, values.length); // values[] contains 7
elements: timestamp, level, thread, class, message, STACK_TRACE, RAW_MESSAGE
- assertEquals(logLevels[i], values[1]);
+ assertEquals(logLevel, values[1]);
assertNull(values[5]);
assertNotNull(values[6]);
}
@@ -136,18 +130,17 @@ public class TestGrokRecordReader {
@Test
public void testParseNiFiSampleMultilineWithStackTrace() throws
IOException, GrokException, MalformedRecordException {
- try (final InputStream fis = new FileInputStream(new
File("src/test/resources/grok/nifi-log-sample-multiline-with-stacktrace.log")))
{
- final Grok grok =
grokCompiler.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level}
\\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}?");
- final GrokRecordReader deserializer = new GrokRecordReader(fis,
grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok),
NoMatchStrategy.APPEND);
+ try (final InputStream fis = new
FileInputStream("src/test/resources/grok/nifi-log-sample-multiline-with-stacktrace.log"))
{
+ final GrokRecordReader deserializer =
getRecordReader("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level}
\\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}?", fis);
final String[] logLevels = new String[] {"INFO", "INFO", "ERROR",
"WARN", "WARN"};
- for (int i = 0; i < logLevels.length; i++) {
+ for (String logLevel : logLevels) {
final Record record = deserializer.nextRecord();
final Object[] values = record.getValues();
assertNotNull(values);
assertEquals(7, values.length); // values[] contains 7
elements: timestamp, level, thread, class, message, STACK_TRACE, RAW_MESSAGE
- assertEquals(logLevels[i], values[1]);
+ assertEquals(logLevel, values[1]);
if ("ERROR".equals(values[1])) {
final String msg = (String) values[4];
assertEquals("One\nTwo\nThree", msg);
@@ -166,12 +159,10 @@ public class TestGrokRecordReader {
}
}
-
@Test
public void testParseStackTrace() throws GrokException, IOException,
MalformedRecordException {
- try (final InputStream fis = new FileInputStream(new
File("src/test/resources/grok/error-with-stack-trace.log"))) {
- final Grok grok =
grokCompiler.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level}
%{GREEDYDATA:message}");
- final GrokRecordReader deserializer = new GrokRecordReader(fis,
grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok),
NoMatchStrategy.APPEND);
+ try (final InputStream fis = new
FileInputStream("src/test/resources/grok/error-with-stack-trace.log")) {
+ final GrokRecordReader deserializer =
getRecordReader("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level}
%{GREEDYDATA:message}", fis);
final String[] logLevels = new String[] {"INFO", "ERROR", "INFO"};
final String[] messages = new String[] {"message without stack
trace",
@@ -223,7 +214,7 @@ public class TestGrokRecordReader {
try (final InputStream in = new ByteArrayInputStream(msgBytes)) {
final Grok grok =
grokCompiler.compile("%{SYSLOGBASE}%{GREEDYDATA:message}");
- final RecordSchema schema = GrokReader.createRecordSchema(grok);
+ final RecordSchema schema = getRecordSchema(grok);
final List<String> fieldNames = schema.getFieldNames();
assertEquals(9, fieldNames.size());
assertTrue(fieldNames.contains("timestamp"));
@@ -264,7 +255,7 @@ public class TestGrokRecordReader {
try (final InputStream in = new ByteArrayInputStream(inputBytes)) {
final Grok grok = grokCompiler.compile("%{NUMBER:first}
%{NUMBER:second} %{NUMBER:third} %{NUMBER:fourth} %{NUMBER:fifth}");
- final RecordSchema schema = GrokReader.createRecordSchema(grok);
+ final RecordSchema schema = getRecordSchema(grok);
final List<String> fieldNames = schema.getFieldNames();
assertEquals(7, fieldNames.size());
assertTrue(fieldNames.contains("first"));
@@ -298,7 +289,7 @@ public class TestGrokRecordReader {
try (final InputStream in = new ByteArrayInputStream(inputBytes)) {
final Grok grok = grokCompiler.compile("%{NUMBER:first}
%{NUMBER:second} %{NUMBER:third} %{NUMBER:fourth} %{NUMBER:fifth}");
- final RecordSchema schema = GrokReader.createRecordSchema(grok);
+ final RecordSchema schema = getRecordSchema(grok);
final List<String> fieldNames = schema.getFieldNames();
assertEquals(7, fieldNames.size());
assertTrue(fieldNames.contains("first"));
@@ -311,11 +302,11 @@ public class TestGrokRecordReader {
final GrokRecordReader deserializer = new GrokRecordReader(in,
grok, schema, schema, NoMatchStrategy.RAW);
Record record = deserializer.nextRecord();
- assertEquals(null, record.getValue("first"));
- assertEquals(null, record.getValue("second"));
- assertEquals(null, record.getValue("third"));
- assertEquals(null, record.getValue("fourth"));
- assertEquals(null, record.getValue("fifth"));
+ assertNull(record.getValue("first"));
+ assertNull(record.getValue("second"));
+ assertNull(record.getValue("third"));
+ assertNull(record.getValue("fourth"));
+ assertNull(record.getValue("fifth"));
assertEquals("hello there", record.getValue("_raw"));
record = deserializer.nextRecord();
@@ -343,7 +334,7 @@ public class TestGrokRecordReader {
try (final InputStream in = new ByteArrayInputStream(inputBytes)) {
final Grok grok = grokCompiler.compile("%{NUMBER:first}
%{NUMBER:second} %{NUMBER:third} %{NUMBER:fourth} %{NUMBER:fifth}");
- final RecordSchema schema = GrokReader.createRecordSchema(grok);
+ final RecordSchema schema = getRecordSchema(grok);
final List<String> fieldNames = schema.getFieldNames();
assertEquals(7, fieldNames.size());
assertTrue(fieldNames.contains("first"));
@@ -381,7 +372,7 @@ public class TestGrokRecordReader {
try (final InputStream in = new ByteArrayInputStream(inputBytes)) {
final Grok grok = grokCompiler.compile("%{NUMBER:first}
%{NUMBER:second} %{NUMBER:third} %{NUMBER:fourth} %{NUMBER:fifth}");
- final RecordSchema schema = GrokReader.createRecordSchema(grok);
+ final RecordSchema schema = getRecordSchema(grok);
final List<String> fieldNames = schema.getFieldNames();
assertEquals(7, fieldNames.size());
assertTrue(fieldNames.contains("first"));
@@ -403,11 +394,11 @@ public class TestGrokRecordReader {
record = deserializer.nextRecord();
- assertEquals(null, record.getValue("first"));
- assertEquals(null, record.getValue("second"));
- assertEquals(null, record.getValue("third"));
- assertEquals(null, record.getValue("fourth"));
- assertEquals(null, record.getValue("fifth"));
+ assertNull(record.getValue("first"));
+ assertNull(record.getValue("second"));
+ assertNull(record.getValue("third"));
+ assertNull(record.getValue("fourth"));
+ assertNull(record.getValue("fifth"));
assertEquals("hello there", record.getValue("_raw"));
record = deserializer.nextRecord();
@@ -425,7 +416,7 @@ public class TestGrokRecordReader {
}
@Test
- public void testRawUnmatchedRecordlast() throws GrokException,
IOException, MalformedRecordException {
+ public void testRawUnmatchedRecordLast() throws GrokException,
IOException, MalformedRecordException {
final String nonMatchingRecord = "hello there";
final String matchingRecord = "1 2 3 4 5";
@@ -435,7 +426,7 @@ public class TestGrokRecordReader {
try (final InputStream in = new ByteArrayInputStream(inputBytes)) {
final Grok grok = grokCompiler.compile("%{NUMBER:first}
%{NUMBER:second} %{NUMBER:third} %{NUMBER:fourth} %{NUMBER:fifth}");
- final RecordSchema schema = GrokReader.createRecordSchema(grok);
+ final RecordSchema schema = getRecordSchema(grok);
final List<String> fieldNames = schema.getFieldNames();
assertEquals(7, fieldNames.size());
assertTrue(fieldNames.contains("first"));
@@ -457,15 +448,26 @@ public class TestGrokRecordReader {
record = deserializer.nextRecord();
- assertEquals(null, record.getValue("first"));
- assertEquals(null, record.getValue("second"));
- assertEquals(null, record.getValue("third"));
- assertEquals(null, record.getValue("fourth"));
- assertEquals(null, record.getValue("fifth"));
+ assertNull(record.getValue("first"));
+ assertNull(record.getValue("second"));
+ assertNull(record.getValue("third"));
+ assertNull(record.getValue("fourth"));
+ assertNull(record.getValue("fifth"));
assertEquals("hello there", record.getValue("_raw"));
assertNull(deserializer.nextRecord());
deserializer.close();
}
}
+
+ private RecordSchema getRecordSchema(final Grok grok) {
+ final List<Grok> groks = Collections.singletonList(grok);
+ return GrokReader.createRecordSchema(groks);
+ }
+
+ private GrokRecordReader getRecordReader(final String pattern, final
InputStream inputStream) {
+ final Grok grok = grokCompiler.compile(pattern);
+ final RecordSchema recordSchema = getRecordSchema(grok);
+ return new GrokRecordReader(inputStream, grok, recordSchema,
recordSchema, NoMatchStrategy.APPEND);
+ }
}
\ No newline at end of file