This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new f4ad658 NIFI-8389 - Grok Reader - raw data strategy when no match
f4ad658 is described below
commit f4ad658faee24b48a266e9efc53d2b275a473cdd
Author: Pierre Villard <[email protected]>
AuthorDate: Thu Apr 1 01:53:12 2021 +0200
NIFI-8389 - Grok Reader - raw data strategy when no match
added allowable value in property
added docs in additionalDetails
This closes #4966.
Signed-off-by: Peter Turcsanyi <[email protected]>
---
.../main/java/org/apache/nifi/grok/GrokReader.java | 17 ++-
.../org/apache/nifi/grok/GrokRecordReader.java | 117 ++++++++-------
.../java/org/apache/nifi/grok/NoMatchStrategy.java | 24 +++
.../additionalDetails.html | 95 +++++++++++-
.../org/apache/nifi/grok/TestGrokRecordReader.java | 162 ++++++++++++++++++++-
5 files changed, 347 insertions(+), 68 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
index ff24211..7dc6737 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
@@ -71,7 +71,7 @@ import java.util.regex.Matcher;
public class GrokReader extends SchemaRegistryService implements
RecordReaderFactory {
private volatile GrokCompiler grokCompiler;
private volatile Grok grok;
- private volatile boolean appendUnmatchedLine;
+ private volatile NoMatchStrategy noMatchStrategy;
private volatile RecordSchema recordSchema;
private volatile RecordSchema recordSchemaFromGrok;
@@ -81,6 +81,8 @@ public class GrokReader extends SchemaRegistryService
implements RecordReaderFac
"The line of text that does not match the Grok Expression will be
appended to the last field of the prior message.");
static final AllowableValue SKIP_LINE = new AllowableValue("skip-line",
"Skip Line",
"The line of text that does not match the Grok Expression will be
skipped.");
+ static final AllowableValue RAW_LINE = new AllowableValue("raw-line", "Raw
Line",
+ "The line of text that does not match the Grok Expression will
only be added to the _raw field.");
static final AllowableValue STRING_FIELDS_FROM_GROK_EXPRESSION = new
AllowableValue("string-fields-from-grok-expression", "Use String Fields From
Grok Expression",
"The schema will be derived by using the field names present in the
Grok Expression. All fields will be assumed to be of type String. Additionally,
a field will be included "
@@ -110,7 +112,7 @@ public class GrokReader extends SchemaRegistryService
implements RecordReaderFac
.displayName("No Match Behavior")
.description("If a line of text is encountered and it does not match
the given Grok Expression, and it is not part of a stack trace, "
+ "this property specifies how the text should be processed.")
- .allowableValues(APPEND_TO_PREVIOUS_MESSAGE, SKIP_LINE)
+ .allowableValues(APPEND_TO_PREVIOUS_MESSAGE, SKIP_LINE, RAW_LINE)
.defaultValue(APPEND_TO_PREVIOUS_MESSAGE.getValue())
.required(true)
.build();
@@ -142,7 +144,14 @@ public class GrokReader extends SchemaRegistryService
implements RecordReaderFac
grok =
grokCompiler.compile(context.getProperty(GROK_EXPRESSION).getValue());
- appendUnmatchedLine =
context.getProperty(NO_MATCH_BEHAVIOR).getValue().equalsIgnoreCase(APPEND_TO_PREVIOUS_MESSAGE.getValue());
+
+
if(context.getProperty(NO_MATCH_BEHAVIOR).getValue().equalsIgnoreCase(APPEND_TO_PREVIOUS_MESSAGE.getValue()))
{
+ noMatchStrategy = NoMatchStrategy.APPEND;
+ } else if
(context.getProperty(NO_MATCH_BEHAVIOR).getValue().equalsIgnoreCase(RAW_LINE.getValue()))
{
+ noMatchStrategy = NoMatchStrategy.RAW;
+ } else {
+ noMatchStrategy = NoMatchStrategy.SKIP;
+ }
this.recordSchemaFromGrok = createRecordSchema(grok);
@@ -269,6 +278,6 @@ public class GrokReader extends SchemaRegistryService
implements RecordReaderFac
@Override
public RecordReader createRecordReader(final Map<String, String>
variables, final InputStream in, final long inputLength, final ComponentLog
logger) throws IOException, SchemaNotFoundException {
final RecordSchema schema = getSchema(variables, in, null);
- return new GrokRecordReader(in, grok, schema, recordSchemaFromGrok,
appendUnmatchedLine);
+ return new GrokRecordReader(in, grok, schema, recordSchemaFromGrok,
noMatchStrategy);
}
}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
index 0c82fb4..ebcefe5 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
@@ -43,7 +43,7 @@ import io.krakens.grok.api.Match;
public class GrokRecordReader implements RecordReader {
private final BufferedReader reader;
private final Grok grok;
- private final boolean append;
+ private final NoMatchStrategy noMatchStrategy;
private final RecordSchema schemaFromGrok;
private RecordSchema schema;
@@ -60,11 +60,11 @@ public class GrokRecordReader implements RecordReader {
+ "(?:Suppressed\\: )|"
+ "(?:\\s+... \\d+ (?:more|common frames? omitted)$)");
- public GrokRecordReader(final InputStream in, final Grok grok, final
RecordSchema schema, final RecordSchema schemaFromGrok, final boolean append) {
+ public GrokRecordReader(final InputStream in, final Grok grok, final
RecordSchema schema, final RecordSchema schemaFromGrok, final NoMatchStrategy
noMatchStrategy) {
this.reader = new BufferedReader(new InputStreamReader(in));
this.grok = grok;
this.schema = schema;
- this.append = append;
+ this.noMatchStrategy = noMatchStrategy;
this.schemaFromGrok = schemaFromGrok;
}
@@ -91,6 +91,10 @@ public class GrokRecordReader implements RecordReader {
final Match match = grok.match(line);
valueMap = match.capture();
+
+ if((valueMap == null || valueMap.isEmpty()) &&
noMatchStrategy.equals(NoMatchStrategy.RAW)) {
+ break;
+ }
}
if (iterations == 0 && nextLine != null) {
@@ -104,14 +108,14 @@ public class GrokRecordReader implements RecordReader {
while ((nextLine = reader.readLine()) != null) {
final Match nextLineMatch = grok.match(nextLine);
final Map<String, Object> nextValueMap = nextLineMatch.capture();
- if (nextValueMap.isEmpty()) {
+ if (nextValueMap.isEmpty() &&
!noMatchStrategy.equals(NoMatchStrategy.RAW)) {
// next line did not match. Check if it indicates a Stack
Trace. If so, read until
// the stack trace ends. Otherwise, append the next line to
the last field in the record.
if (isStartOfStackTrace(nextLine)) {
stackTrace = readStackTrace(nextLine);
raw.append("\n").append(stackTrace);
break;
- } else if (append) {
+ } else if (noMatchStrategy.equals(NoMatchStrategy.APPEND)) {
trailingText.append("\n").append(nextLine);
raw.append("\n").append(nextLine);
}
@@ -128,68 +132,73 @@ public class GrokRecordReader implements RecordReader {
private Record createRecord(final Map<String, Object> valueMap, final
StringBuilder trailingText, final String stackTrace, final String raw, final
boolean coerceTypes, final boolean dropUnknown) {
final Map<String, Object> converted = new HashMap<>();
- for (final Map.Entry<String, Object> entry : valueMap.entrySet()) {
- final String fieldName = entry.getKey();
- final Object rawValue = entry.getValue();
-
- final Object normalizedValue;
- if (rawValue instanceof List) {
- final List<?> list = (List<?>) rawValue;
- final String[] array = new String[list.size()];
- for (int i = 0; i < list.size(); i++) {
- final Object rawObject = list.get(i);
- array[i] = rawObject == null ? null : rawObject.toString();
- }
- normalizedValue = array;
- } else {
- normalizedValue = rawValue == null ? null :
rawValue.toString();
- }
- final Optional<RecordField> optionalRecordField =
schema.getField(fieldName);
+ if(valueMap != null && !valueMap.isEmpty()) {
- final Object coercedValue;
- if (coerceTypes && optionalRecordField.isPresent()) {
- final RecordField field = optionalRecordField.get();
- final DataType fieldType = field.getDataType();
- coercedValue = convert(fieldType, normalizedValue, fieldName);
- } else {
- coercedValue = normalizedValue;
- }
+ for (final Map.Entry<String, Object> entry : valueMap.entrySet()) {
+ final String fieldName = entry.getKey();
+ final Object rawValue = entry.getValue();
- converted.put(fieldName, coercedValue);
- }
+ final Object normalizedValue;
+ if (rawValue instanceof List) {
+ final List<?> list = (List<?>) rawValue;
+ final String[] array = new String[list.size()];
+ for (int i = 0; i < list.size(); i++) {
+ final Object rawObject = list.get(i);
+ array[i] = rawObject == null ? null :
rawObject.toString();
+ }
+ normalizedValue = array;
+ } else {
+ normalizedValue = rawValue == null ? null :
rawValue.toString();
+ }
- // If there is any trailing text, determine the last column from the
grok schema
- // and then append the trailing text to it.
- if (append && trailingText.length() > 0) {
- String lastPopulatedFieldName = null;
- final List<RecordField> schemaFields = schemaFromGrok.getFields();
- for (int i = schemaFields.size() - 1; i >= 0; i--) {
- final RecordField field = schemaFields.get(i);
-
- Object value = converted.get(field.getFieldName());
- if (value != null) {
- lastPopulatedFieldName = field.getFieldName();
- break;
+ final Optional<RecordField> optionalRecordField =
schema.getField(fieldName);
+
+ final Object coercedValue;
+ if (coerceTypes && optionalRecordField.isPresent()) {
+ final RecordField field = optionalRecordField.get();
+ final DataType fieldType = field.getDataType();
+ coercedValue = convert(fieldType, normalizedValue,
fieldName);
+ } else {
+ coercedValue = normalizedValue;
}
- for (final String alias : field.getAliases()) {
- value = converted.get(alias);
+ converted.put(fieldName, coercedValue);
+ }
+
+ // If there is any trailing text, determine the last column from
the grok schema
+ // and then append the trailing text to it.
+ if (noMatchStrategy.equals(NoMatchStrategy.APPEND) &&
trailingText.length() > 0) {
+ String lastPopulatedFieldName = null;
+ final List<RecordField> schemaFields =
schemaFromGrok.getFields();
+ for (int i = schemaFields.size() - 1; i >= 0; i--) {
+ final RecordField field = schemaFields.get(i);
+
+ Object value = converted.get(field.getFieldName());
if (value != null) {
- lastPopulatedFieldName = alias;
+ lastPopulatedFieldName = field.getFieldName();
break;
}
+
+ for (final String alias : field.getAliases()) {
+ value = converted.get(alias);
+ if (value != null) {
+ lastPopulatedFieldName = alias;
+ break;
+ }
+ }
}
- }
- if (lastPopulatedFieldName != null) {
- final Object value = converted.get(lastPopulatedFieldName);
- if (value == null) {
- converted.put(lastPopulatedFieldName,
trailingText.toString());
- } else if (value instanceof String) { // if not a String it is
a List and we will just drop the trailing text
- converted.put(lastPopulatedFieldName, (String) value +
trailingText.toString());
+ if (lastPopulatedFieldName != null) {
+ final Object value = converted.get(lastPopulatedFieldName);
+ if (value == null) {
+ converted.put(lastPopulatedFieldName,
trailingText.toString());
+ } else if (value instanceof String) { // if not a String
it is a List and we will just drop the trailing text
+ converted.put(lastPopulatedFieldName, (String) value +
trailingText.toString());
+ }
}
}
+
}
converted.put(STACK_TRACE_COLUMN_NAME, stackTrace);
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/NoMatchStrategy.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/NoMatchStrategy.java
new file mode 100644
index 0000000..cc1defa
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/NoMatchStrategy.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.grok;
+
+public enum NoMatchStrategy {
+ APPEND,
+ SKIP,
+ RAW
+}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.grok.GrokReader/additionalDetails.html
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.grok.GrokReader/additionalDetails.html
index a9059f7..e800fc2 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.grok.GrokReader/additionalDetails.html
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.grok.GrokReader/additionalDetails.html
@@ -35,6 +35,10 @@
compatible type, such as number. Additionally, if the schema
does not contain one of the fields in the
parsed data, that field will be ignored. This can be used to
filter out fields that are not of interest.
</p>
+
+ <p>
+ Note: a <code>_raw</code> field is also added to preserve the
original message.
+ </p>
<p>
The Required Property is named <code>Grok Expression</code> and
specifies how to parse each
@@ -47,11 +51,98 @@
produced by this Controller Service.
</p>
+ <h2>No Match Behavior</h2>
+
<p>
If a line is encountered in the FlowFile that does not match
the configured Grok Expression, it is assumed that the line
is part of the previous message. If the line is the start of a
stack trace, then the entire stack trace is read in and assigned
- to a field named <code>STACK_TRACE</code>. Otherwise, the line
is appended to the last field defined in the Grok Expression. This
- is done because typically the last field is a 'message' type of
field, which can consist of new-lines.
+ to a field named <code>stackTrace</code>. Otherwise, the line
will be handled according to the value of the "No Match
+ Behavior" property, as described below.
+ </p>
+
+ <h3>Append to Previous Message</h3>
+
+ <p>
+ The line is appended to the last populated field defined in the Grok
Expression (and to the <code>_raw</code> field). This is done because typically the last field is a
+ 'message' type of field, which can consist of new-lines.
+ </p>
+
+ <h3>Skip Line</h3>
+
+ <p>
+ The line is discarded: it is neither appended to the previous record nor emitted as a record of its own.
+ </p>
+
+ <h3>Raw Line</h3>
+
+ <p>
+ The line is emitted as a separate record in which all fields are null except the <code>_raw</code> field,
which contains the line, allowing further processing downstream.
+ </p>
+
+ <h2>
+ Examples
+ </h2>
+
+ <p>
+ Assuming two input lines (<code>&lt;6&gt;Feb 28 12:00:00 192.168.0.1
aliyun[11111]: [error] Syslog test</code> and <code>This is a
+ bad message...</code>) with the Grok Expression
<code>&lt;%{POSINT:priority}&gt;%{SYSLOGTIMESTAMP:timestamp} %{SYSLOGHOST:hostname}
+ %{WORD:ident}%{GREEDYDATA:message}</code>, and assuming a JSON
Writer, the following results will be generated:
+ </p>
+
+ <h3>Append to Previous Message</h3>
+
+ <p>
+ <pre><code>
+[ {
+ "priority" : "6",
+ "timestamp" : "Feb 28 12:00:00",
+ "hostname" : "192.168.0.1",
+ "ident" : "aliyun",
+ "message" : "[11111]: [error] Syslog test\nThis is a bad message...",
+ "stackTrace" : null,
+ "_raw" : "&lt;6&gt;Feb 28 12:00:00 192.168.0.1 aliyun[11111]: [error] Syslog
test\nThis is a bad message..."
+} ]
+ </code></pre>
+ </p>
+
+ <h3>Skip Line</h3>
+
+ <p>
+ <pre><code>
+[ {
+ "priority" : "6",
+ "timestamp" : "Feb 28 12:00:00",
+ "hostname" : "192.168.0.1",
+ "ident" : "aliyun",
+ "message" : "[11111]: [error] Syslog test",
+ "stackTrace" : null,
+ "_raw" : "&lt;6&gt;Feb 28 12:00:00 192.168.0.1 aliyun[11111]: [error] Syslog test"
+} ]
+ </code></pre>
+ </p>
+
+ <h3>Raw Line</h3>
+
+ <p>
+ <pre><code>
+[ {
+ "priority" : "6",
+ "timestamp" : "Feb 28 12:00:00",
+ "hostname" : "192.168.0.1",
+ "ident" : "aliyun",
+ "message" : "[11111]: [error] Syslog test",
+ "stackTrace" : null,
+ "_raw" : "&lt;6&gt;Feb 28 12:00:00 192.168.0.1 aliyun[11111]: [error] Syslog test"
+}, {
+ "priority" : null,
+ "timestamp" : null,
+ "hostname" : null,
+ "ident" : null,
+ "message" : null,
+ "stackTrace" : null,
+ "_raw" : "This is a bad message..."
+} ]
+ </code></pre>
</p>
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
index 538faa8..f267028 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
@@ -60,7 +60,7 @@ public class TestGrokRecordReader {
public void testParseSingleLineLogMessages() throws GrokException,
IOException, MalformedRecordException {
try (final InputStream fis = new FileInputStream(new
File("src/test/resources/grok/single-line-log-messages.txt"))) {
final Grok grok =
grokCompiler.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level}
%{GREEDYDATA:message}");
- final GrokRecordReader deserializer = new GrokRecordReader(fis,
grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok),
true);
+ final GrokRecordReader deserializer = new GrokRecordReader(fis,
grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok),
NoMatchStrategy.APPEND);
final String[] logLevels = new String[] {"INFO", "WARN", "ERROR",
"FATAL", "FINE"};
final String[] messages = new String[] {"Test Message 1", "Red",
"Green", "Blue", "Yellow"};
@@ -93,7 +93,7 @@ public class TestGrokRecordReader {
final String msg = "2016-08-04 13:26:32,473 INFO [Leader Election
Notification Thread-1] o.a.n.LoggerClass \n"
+ "org.apache.nifi.exception.UnitTestException: Testing to ensure
we are able to capture stack traces";
final InputStream bais = new
ByteArrayInputStream(msg.getBytes(StandardCharsets.UTF_8));
- final GrokRecordReader deserializer = new GrokRecordReader(bais, grok,
GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), true);
+ final GrokRecordReader deserializer = new GrokRecordReader(bais, grok,
GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok),
NoMatchStrategy.APPEND);
final Object[] values = deserializer.nextRecord().getValues();
@@ -116,7 +116,7 @@ public class TestGrokRecordReader {
public void testParseNiFiSampleLog() throws IOException, GrokException,
MalformedRecordException {
try (final InputStream fis = new FileInputStream(new
File("src/test/resources/grok/nifi-log-sample.log"))) {
final Grok grok =
grokCompiler.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level}
\\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}");
- final GrokRecordReader deserializer = new GrokRecordReader(fis,
grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok),
true);
+ final GrokRecordReader deserializer = new GrokRecordReader(fis,
grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok),
NoMatchStrategy.APPEND);
final String[] logLevels = new String[] {"INFO", "INFO", "INFO",
"WARN", "WARN"};
for (int i = 0; i < logLevels.length; i++) {
@@ -138,7 +138,7 @@ public class TestGrokRecordReader {
public void testParseNiFiSampleMultilineWithStackTrace() throws
IOException, GrokException, MalformedRecordException {
try (final InputStream fis = new FileInputStream(new
File("src/test/resources/grok/nifi-log-sample-multiline-with-stacktrace.log")))
{
final Grok grok =
grokCompiler.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level}
\\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}?");
- final GrokRecordReader deserializer = new GrokRecordReader(fis,
grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok),
true);
+ final GrokRecordReader deserializer = new GrokRecordReader(fis,
grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok),
NoMatchStrategy.APPEND);
final String[] logLevels = new String[] {"INFO", "INFO", "ERROR",
"WARN", "WARN"};
for (int i = 0; i < logLevels.length; i++) {
@@ -171,7 +171,7 @@ public class TestGrokRecordReader {
public void testParseStackTrace() throws GrokException, IOException,
MalformedRecordException {
try (final InputStream fis = new FileInputStream(new
File("src/test/resources/grok/error-with-stack-trace.log"))) {
final Grok grok =
grokCompiler.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level}
%{GREEDYDATA:message}");
- final GrokRecordReader deserializer = new GrokRecordReader(fis,
grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok),
true);
+ final GrokRecordReader deserializer = new GrokRecordReader(fis,
grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok),
NoMatchStrategy.APPEND);
final String[] logLevels = new String[] {"INFO", "ERROR", "INFO"};
final String[] messages = new String[] {"message without stack
trace",
@@ -236,7 +236,7 @@ public class TestGrokRecordReader {
assertTrue(fieldNames.contains("stackTrace")); // always
implicitly there
assertTrue(fieldNames.contains("_raw")); // always implicitly
there
- final GrokRecordReader deserializer = new GrokRecordReader(in,
grok, schema, schema, true);
+ final GrokRecordReader deserializer = new GrokRecordReader(in,
grok, schema, schema, NoMatchStrategy.APPEND);
final Record record = deserializer.nextRecord();
assertEquals("May 22 15:58:23", record.getValue("timestamp"));
@@ -273,7 +273,7 @@ public class TestGrokRecordReader {
assertTrue(fieldNames.contains("fourth"));
assertTrue(fieldNames.contains("fifth"));
- final GrokRecordReader deserializer = new GrokRecordReader(in,
grok, schema, schema, false);
+ final GrokRecordReader deserializer = new GrokRecordReader(in,
grok, schema, schema, NoMatchStrategy.SKIP);
final Record record = deserializer.nextRecord();
assertEquals("1", record.getValue("first"));
@@ -288,6 +288,51 @@ public class TestGrokRecordReader {
}
@Test
+ public void testRawUnmatchedRecordFirstLine() throws GrokException,
IOException, MalformedRecordException {
+ final String nonMatchingRecord = "hello there";
+ final String matchingRecord = "1 2 3 4 5";
+
+ final String input = nonMatchingRecord + "\n" + matchingRecord;
+ final byte[] inputBytes = input.getBytes(StandardCharsets.UTF_8);
+
+ try (final InputStream in = new ByteArrayInputStream(inputBytes)) {
+ final Grok grok = grokCompiler.compile("%{NUMBER:first}
%{NUMBER:second} %{NUMBER:third} %{NUMBER:fourth} %{NUMBER:fifth}");
+
+ final RecordSchema schema = GrokReader.createRecordSchema(grok);
+ final List<String> fieldNames = schema.getFieldNames();
+ assertEquals(7, fieldNames.size());
+ assertTrue(fieldNames.contains("first"));
+ assertTrue(fieldNames.contains("second"));
+ assertTrue(fieldNames.contains("third"));
+ assertTrue(fieldNames.contains("fourth"));
+ assertTrue(fieldNames.contains("fifth"));
+ assertTrue(fieldNames.contains("_raw"));
+
+ final GrokRecordReader deserializer = new GrokRecordReader(in,
grok, schema, schema, NoMatchStrategy.RAW);
+ Record record = deserializer.nextRecord();
+
+ assertEquals(null, record.getValue("first"));
+ assertEquals(null, record.getValue("second"));
+ assertEquals(null, record.getValue("third"));
+ assertEquals(null, record.getValue("fourth"));
+ assertEquals(null, record.getValue("fifth"));
+ assertEquals("hello there", record.getValue("_raw"));
+
+ record = deserializer.nextRecord();
+
+ assertEquals("1", record.getValue("first"));
+ assertEquals("2", record.getValue("second"));
+ assertEquals("3", record.getValue("third"));
+ assertEquals("4", record.getValue("fourth"));
+ assertEquals("5", record.getValue("fifth"));
+ assertEquals("1 2 3 4 5", record.getValue("_raw"));
+
+ assertNull(deserializer.nextRecord());
+ deserializer.close();
+ }
+ }
+
+ @Test
public void testSkipUnmatchedRecordMiddle() throws GrokException,
IOException, MalformedRecordException {
final String nonMatchingRecord = "hello there";
final String matchingRecord = "1 2 3 4 5";
@@ -306,8 +351,9 @@ public class TestGrokRecordReader {
assertTrue(fieldNames.contains("third"));
assertTrue(fieldNames.contains("fourth"));
assertTrue(fieldNames.contains("fifth"));
+ assertTrue(fieldNames.contains("_raw"));
- final GrokRecordReader deserializer = new GrokRecordReader(in,
grok, schema, schema, false);
+ final GrokRecordReader deserializer = new GrokRecordReader(in,
grok, schema, schema, NoMatchStrategy.SKIP);
for (int i = 0; i < 2; i++) {
final Record record = deserializer.nextRecord();
@@ -316,10 +362,110 @@ public class TestGrokRecordReader {
assertEquals("3", record.getValue("third"));
assertEquals("4", record.getValue("fourth"));
assertEquals("5", record.getValue("fifth"));
+ assertEquals("1 2 3 4 5", record.getValue("_raw"));
}
assertNull(deserializer.nextRecord());
deserializer.close();
}
}
+
+ @Test
+ public void testRawUnmatchedRecordMiddle() throws GrokException,
IOException, MalformedRecordException {
+ final String nonMatchingRecord = "hello there";
+ final String matchingRecord = "1 2 3 4 5";
+
+ final String input = matchingRecord + "\n" + nonMatchingRecord + "\n"
+ matchingRecord;
+ final byte[] inputBytes = input.getBytes(StandardCharsets.UTF_8);
+
+ try (final InputStream in = new ByteArrayInputStream(inputBytes)) {
+ final Grok grok = grokCompiler.compile("%{NUMBER:first}
%{NUMBER:second} %{NUMBER:third} %{NUMBER:fourth} %{NUMBER:fifth}");
+
+ final RecordSchema schema = GrokReader.createRecordSchema(grok);
+ final List<String> fieldNames = schema.getFieldNames();
+ assertEquals(7, fieldNames.size());
+ assertTrue(fieldNames.contains("first"));
+ assertTrue(fieldNames.contains("second"));
+ assertTrue(fieldNames.contains("third"));
+ assertTrue(fieldNames.contains("fourth"));
+ assertTrue(fieldNames.contains("fifth"));
+ assertTrue(fieldNames.contains("_raw"));
+
+ final GrokRecordReader deserializer = new GrokRecordReader(in,
grok, schema, schema, NoMatchStrategy.RAW);
+ Record record = deserializer.nextRecord();
+
+ assertEquals("1", record.getValue("first"));
+ assertEquals("2", record.getValue("second"));
+ assertEquals("3", record.getValue("third"));
+ assertEquals("4", record.getValue("fourth"));
+ assertEquals("5", record.getValue("fifth"));
+ assertEquals("1 2 3 4 5", record.getValue("_raw"));
+
+ record = deserializer.nextRecord();
+
+ assertEquals(null, record.getValue("first"));
+ assertEquals(null, record.getValue("second"));
+ assertEquals(null, record.getValue("third"));
+ assertEquals(null, record.getValue("fourth"));
+ assertEquals(null, record.getValue("fifth"));
+ assertEquals("hello there", record.getValue("_raw"));
+
+ record = deserializer.nextRecord();
+
+ assertEquals("1", record.getValue("first"));
+ assertEquals("2", record.getValue("second"));
+ assertEquals("3", record.getValue("third"));
+ assertEquals("4", record.getValue("fourth"));
+ assertEquals("5", record.getValue("fifth"));
+ assertEquals("1 2 3 4 5", record.getValue("_raw"));
+
+ assertNull(deserializer.nextRecord());
+ deserializer.close();
+ }
+ }
+
+ @Test
+ public void testRawUnmatchedRecordlast() throws GrokException,
IOException, MalformedRecordException {
+ final String nonMatchingRecord = "hello there";
+ final String matchingRecord = "1 2 3 4 5";
+
+ final String input = matchingRecord + "\n" + nonMatchingRecord;
+ final byte[] inputBytes = input.getBytes(StandardCharsets.UTF_8);
+
+ try (final InputStream in = new ByteArrayInputStream(inputBytes)) {
+ final Grok grok = grokCompiler.compile("%{NUMBER:first}
%{NUMBER:second} %{NUMBER:third} %{NUMBER:fourth} %{NUMBER:fifth}");
+
+ final RecordSchema schema = GrokReader.createRecordSchema(grok);
+ final List<String> fieldNames = schema.getFieldNames();
+ assertEquals(7, fieldNames.size());
+ assertTrue(fieldNames.contains("first"));
+ assertTrue(fieldNames.contains("second"));
+ assertTrue(fieldNames.contains("third"));
+ assertTrue(fieldNames.contains("fourth"));
+ assertTrue(fieldNames.contains("fifth"));
+ assertTrue(fieldNames.contains("_raw"));
+
+ final GrokRecordReader deserializer = new GrokRecordReader(in,
grok, schema, schema, NoMatchStrategy.RAW);
+ Record record = deserializer.nextRecord();
+
+ assertEquals("1", record.getValue("first"));
+ assertEquals("2", record.getValue("second"));
+ assertEquals("3", record.getValue("third"));
+ assertEquals("4", record.getValue("fourth"));
+ assertEquals("5", record.getValue("fifth"));
+ assertEquals("1 2 3 4 5", record.getValue("_raw"));
+
+ record = deserializer.nextRecord();
+
+ assertEquals(null, record.getValue("first"));
+ assertEquals(null, record.getValue("second"));
+ assertEquals(null, record.getValue("third"));
+ assertEquals(null, record.getValue("fourth"));
+ assertEquals(null, record.getValue("fifth"));
+ assertEquals("hello there", record.getValue("_raw"));
+
+ assertNull(deserializer.nextRecord());
+ deserializer.close();
+ }
+ }
}
\ No newline at end of file