This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new f4ad658  NIFI-8389 - Grok Reader - raw data strategy when no match
f4ad658 is described below

commit f4ad658faee24b48a266e9efc53d2b275a473cdd
Author: Pierre Villard <[email protected]>
AuthorDate: Thu Apr 1 01:53:12 2021 +0200

    NIFI-8389 - Grok Reader - raw data strategy when no match
    
    added allowable value in property
    added docs in additionalDetails
    
    This closes #4966.
    
    Signed-off-by: Peter Turcsanyi <[email protected]>
---
 .../main/java/org/apache/nifi/grok/GrokReader.java |  17 ++-
 .../org/apache/nifi/grok/GrokRecordReader.java     | 117 ++++++++-------
 .../java/org/apache/nifi/grok/NoMatchStrategy.java |  24 +++
 .../additionalDetails.html                         |  95 +++++++++++-
 .../org/apache/nifi/grok/TestGrokRecordReader.java | 162 ++++++++++++++++++++-
 5 files changed, 347 insertions(+), 68 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
index ff24211..7dc6737 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
@@ -71,7 +71,7 @@ import java.util.regex.Matcher;
 public class GrokReader extends SchemaRegistryService implements 
RecordReaderFactory {
     private volatile GrokCompiler grokCompiler;
     private volatile Grok grok;
-    private volatile boolean appendUnmatchedLine;
+    private volatile NoMatchStrategy noMatchStrategy;
     private volatile RecordSchema recordSchema;
     private volatile RecordSchema recordSchemaFromGrok;
 
@@ -81,6 +81,8 @@ public class GrokReader extends SchemaRegistryService 
implements RecordReaderFac
         "The line of text that does not match the Grok Expression will be 
appended to the last field of the prior message.");
     static final AllowableValue SKIP_LINE = new AllowableValue("skip-line", 
"Skip Line",
         "The line of text that does not match the Grok Expression will be 
skipped.");
+    static final AllowableValue RAW_LINE = new AllowableValue("raw-line", "Raw 
Line",
+            "The line of text that does not match the Grok Expression will 
only be added to the _raw field.");
 
     static final AllowableValue STRING_FIELDS_FROM_GROK_EXPRESSION = new 
AllowableValue("string-fields-from-grok-expression", "Use String Fields From 
Grok Expression",
         "The schema will be derived by using the field names present in the 
Grok Expression. All fields will be assumed to be of type String. Additionally, 
a field will be included "
@@ -110,7 +112,7 @@ public class GrokReader extends SchemaRegistryService 
implements RecordReaderFac
         .displayName("No Match Behavior")
         .description("If a line of text is encountered and it does not match 
the given Grok Expression, and it is not part of a stack trace, "
             + "this property specifies how the text should be processed.")
-        .allowableValues(APPEND_TO_PREVIOUS_MESSAGE, SKIP_LINE)
+        .allowableValues(APPEND_TO_PREVIOUS_MESSAGE, SKIP_LINE, RAW_LINE)
         .defaultValue(APPEND_TO_PREVIOUS_MESSAGE.getValue())
         .required(true)
         .build();
@@ -142,7 +144,14 @@ public class GrokReader extends SchemaRegistryService 
implements RecordReaderFac
 
         grok = 
grokCompiler.compile(context.getProperty(GROK_EXPRESSION).getValue());
 
-        appendUnmatchedLine = 
context.getProperty(NO_MATCH_BEHAVIOR).getValue().equalsIgnoreCase(APPEND_TO_PREVIOUS_MESSAGE.getValue());
+
+        
if(context.getProperty(NO_MATCH_BEHAVIOR).getValue().equalsIgnoreCase(APPEND_TO_PREVIOUS_MESSAGE.getValue()))
 {
+            noMatchStrategy = NoMatchStrategy.APPEND;
+        } else if 
(context.getProperty(NO_MATCH_BEHAVIOR).getValue().equalsIgnoreCase(RAW_LINE.getValue()))
 {
+            noMatchStrategy = NoMatchStrategy.RAW;
+        } else {
+            noMatchStrategy = NoMatchStrategy.SKIP;
+        }
 
         this.recordSchemaFromGrok = createRecordSchema(grok);
 
@@ -269,6 +278,6 @@ public class GrokReader extends SchemaRegistryService 
implements RecordReaderFac
     @Override
     public RecordReader createRecordReader(final Map<String, String> 
variables, final InputStream in, final long inputLength, final ComponentLog 
logger) throws IOException, SchemaNotFoundException {
         final RecordSchema schema = getSchema(variables, in, null);
-        return new GrokRecordReader(in, grok, schema, recordSchemaFromGrok, 
appendUnmatchedLine);
+        return new GrokRecordReader(in, grok, schema, recordSchemaFromGrok, 
noMatchStrategy);
     }
 }
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
index 0c82fb4..ebcefe5 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
@@ -43,7 +43,7 @@ import io.krakens.grok.api.Match;
 public class GrokRecordReader implements RecordReader {
     private final BufferedReader reader;
     private final Grok grok;
-    private final boolean append;
+    private final NoMatchStrategy noMatchStrategy;
     private final RecordSchema schemaFromGrok;
     private RecordSchema schema;
 
@@ -60,11 +60,11 @@ public class GrokRecordReader implements RecordReader {
             + "(?:Suppressed\\: )|"
             + "(?:\\s+... \\d+ (?:more|common frames? omitted)$)");
 
-    public GrokRecordReader(final InputStream in, final Grok grok, final 
RecordSchema schema, final RecordSchema schemaFromGrok, final boolean append) {
+    public GrokRecordReader(final InputStream in, final Grok grok, final 
RecordSchema schema, final RecordSchema schemaFromGrok, final NoMatchStrategy 
noMatchStrategy) {
         this.reader = new BufferedReader(new InputStreamReader(in));
         this.grok = grok;
         this.schema = schema;
-        this.append = append;
+        this.noMatchStrategy = noMatchStrategy;
         this.schemaFromGrok = schemaFromGrok;
     }
 
@@ -91,6 +91,10 @@ public class GrokRecordReader implements RecordReader {
 
             final Match match = grok.match(line);
             valueMap = match.capture();
+
+            if((valueMap == null || valueMap.isEmpty()) && 
noMatchStrategy.equals(NoMatchStrategy.RAW)) {
+                break;
+            }
         }
 
         if (iterations == 0 && nextLine != null) {
@@ -104,14 +108,14 @@ public class GrokRecordReader implements RecordReader {
         while ((nextLine = reader.readLine()) != null) {
             final Match nextLineMatch = grok.match(nextLine);
             final Map<String, Object> nextValueMap = nextLineMatch.capture();
-            if (nextValueMap.isEmpty()) {
+            if (nextValueMap.isEmpty() && 
!noMatchStrategy.equals(NoMatchStrategy.RAW)) {
                 // next line did not match. Check if it indicates a Stack 
Trace. If so, read until
                 // the stack trace ends. Otherwise, append the next line to 
the last field in the record.
                 if (isStartOfStackTrace(nextLine)) {
                     stackTrace = readStackTrace(nextLine);
                     raw.append("\n").append(stackTrace);
                     break;
-                } else if (append) {
+                } else if (noMatchStrategy.equals(NoMatchStrategy.APPEND)) {
                     trailingText.append("\n").append(nextLine);
                     raw.append("\n").append(nextLine);
                 }
@@ -128,68 +132,73 @@ public class GrokRecordReader implements RecordReader {
 
     private Record createRecord(final Map<String, Object> valueMap, final 
StringBuilder trailingText, final String stackTrace, final String raw, final 
boolean coerceTypes, final boolean dropUnknown) {
         final Map<String, Object> converted = new HashMap<>();
-        for (final Map.Entry<String, Object> entry : valueMap.entrySet()) {
-            final String fieldName = entry.getKey();
-            final Object rawValue = entry.getValue();
-
-            final Object normalizedValue;
-            if (rawValue instanceof List) {
-                final List<?> list = (List<?>) rawValue;
-                final String[] array = new String[list.size()];
-                for (int i = 0; i < list.size(); i++) {
-                    final Object rawObject = list.get(i);
-                    array[i] = rawObject == null ? null : rawObject.toString();
-                }
-                normalizedValue = array;
-            } else {
-                normalizedValue = rawValue == null ? null : 
rawValue.toString();
-            }
 
-            final Optional<RecordField> optionalRecordField = 
schema.getField(fieldName);
+        if(valueMap != null && !valueMap.isEmpty()) {
 
-            final Object coercedValue;
-            if (coerceTypes && optionalRecordField.isPresent()) {
-                final RecordField field = optionalRecordField.get();
-                final DataType fieldType = field.getDataType();
-                coercedValue = convert(fieldType, normalizedValue, fieldName);
-            } else {
-                coercedValue = normalizedValue;
-            }
+            for (final Map.Entry<String, Object> entry : valueMap.entrySet()) {
+                final String fieldName = entry.getKey();
+                final Object rawValue = entry.getValue();
 
-            converted.put(fieldName, coercedValue);
-        }
+                final Object normalizedValue;
+                if (rawValue instanceof List) {
+                    final List<?> list = (List<?>) rawValue;
+                    final String[] array = new String[list.size()];
+                    for (int i = 0; i < list.size(); i++) {
+                        final Object rawObject = list.get(i);
+                        array[i] = rawObject == null ? null : 
rawObject.toString();
+                    }
+                    normalizedValue = array;
+                } else {
+                    normalizedValue = rawValue == null ? null : 
rawValue.toString();
+                }
 
-        // If there is any trailing text, determine the last column from the 
grok schema
-        // and then append the trailing text to it.
-        if (append && trailingText.length() > 0) {
-            String lastPopulatedFieldName = null;
-            final List<RecordField> schemaFields = schemaFromGrok.getFields();
-            for (int i = schemaFields.size() - 1; i >= 0; i--) {
-                final RecordField field = schemaFields.get(i);
-
-                Object value = converted.get(field.getFieldName());
-                if (value != null) {
-                    lastPopulatedFieldName = field.getFieldName();
-                    break;
+                final Optional<RecordField> optionalRecordField = 
schema.getField(fieldName);
+
+                final Object coercedValue;
+                if (coerceTypes && optionalRecordField.isPresent()) {
+                    final RecordField field = optionalRecordField.get();
+                    final DataType fieldType = field.getDataType();
+                    coercedValue = convert(fieldType, normalizedValue, 
fieldName);
+                } else {
+                    coercedValue = normalizedValue;
                 }
 
-                for (final String alias : field.getAliases()) {
-                    value = converted.get(alias);
+                converted.put(fieldName, coercedValue);
+            }
+
+            // If there is any trailing text, determine the last column from 
the grok schema
+            // and then append the trailing text to it.
+            if (noMatchStrategy.equals(NoMatchStrategy.APPEND) && 
trailingText.length() > 0) {
+                String lastPopulatedFieldName = null;
+                final List<RecordField> schemaFields = 
schemaFromGrok.getFields();
+                for (int i = schemaFields.size() - 1; i >= 0; i--) {
+                    final RecordField field = schemaFields.get(i);
+
+                    Object value = converted.get(field.getFieldName());
                     if (value != null) {
-                        lastPopulatedFieldName = alias;
+                        lastPopulatedFieldName = field.getFieldName();
                         break;
                     }
+
+                    for (final String alias : field.getAliases()) {
+                        value = converted.get(alias);
+                        if (value != null) {
+                            lastPopulatedFieldName = alias;
+                            break;
+                        }
+                    }
                 }
-            }
 
-            if (lastPopulatedFieldName != null) {
-                final Object value = converted.get(lastPopulatedFieldName);
-                if (value == null) {
-                    converted.put(lastPopulatedFieldName, 
trailingText.toString());
-                } else if (value instanceof String) { // if not a String it is 
a List and we will just drop the trailing text
-                    converted.put(lastPopulatedFieldName, (String) value + 
trailingText.toString());
+                if (lastPopulatedFieldName != null) {
+                    final Object value = converted.get(lastPopulatedFieldName);
+                    if (value == null) {
+                        converted.put(lastPopulatedFieldName, 
trailingText.toString());
+                    } else if (value instanceof String) { // if not a String 
it is a List and we will just drop the trailing text
+                        converted.put(lastPopulatedFieldName, (String) value + 
trailingText.toString());
+                    }
                 }
             }
+
         }
 
         converted.put(STACK_TRACE_COLUMN_NAME, stackTrace);
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/NoMatchStrategy.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/NoMatchStrategy.java
new file mode 100644
index 0000000..cc1defa
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/NoMatchStrategy.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.grok;
+
+public enum NoMatchStrategy {
+    APPEND,
+    SKIP,
+    RAW
+}
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.grok.GrokReader/additionalDetails.html
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.grok.GrokReader/additionalDetails.html
index a9059f7..e800fc2 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.grok.GrokReader/additionalDetails.html
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.grok.GrokReader/additionalDetails.html
@@ -35,6 +35,10 @@
                compatible type, such as number. Additionally, if the schema 
does not contain one of the fields in the
                parsed data, that field will be ignored. This can be used to 
filter out fields that are not of interest.
                </p>
+
+        <p>
+               Note: a <code>_raw</code> field is also added to preserve the 
original message.
+               </p>
                
                <p>
                The Required Property is named <code>Grok Expression</code> and 
specifies how to parse each
@@ -47,11 +51,98 @@
                produced by this Controller Service.
         </p>
         
+        <h2>No Match Behavior</h2>
+        
         <p>
                If a line is encountered in the FlowFile that does not match 
the configured Grok Expression, it is assumed that the line
                is part of the previous message. If the line is the start of a 
stack trace, then the entire stack trace is read in and assigned
-               to a field named <code>STACK_TRACE</code>. Otherwise, the line 
is appended to the last field defined in the Grok Expression. This
-               is done because typically the last field is a 'message' type of 
field, which can consist of new-lines.
+               to a field named <code>STACK_TRACE</code>. Otherwise, the line 
will be processed according to the value defined in the "No Match 
+               Behavior" property.
+        </p>
+        
+        <h3>Append to Previous Message</h3>
+        
+        <p>
+               The line is appended to the last field defined in the Grok 
Expression. This is typically done because the last field is a 
+               'message' type of field, which can consist of new-lines.
+        </p>
+        
+        <h3>Skip Line</h3>
+        
+        <p>
+               The line is discarded entirely.
+        </p>
+        
+        <h3>Raw Line</h3>
+        
+        <p>
+               All fields will be null except the <code>_raw</code> field, which 
will contain the unmatched line, allowing further processing downstream.
+        </p>
+        
+        <h2>
+               Examples
+               </h2>
+               
+               <p>
+               Assuming two messages (<code>&lt;6&gt;Feb 28 12:00:00 192.168.0.1 
aliyun[11111]: [error] Syslog test</code> and <code>This is a 
+               bad message...</code>) with the Grok Expression 
<code>&lt;%{POSINT:priority}&gt;%{SYSLOGTIMESTAMP:timestamp} %{SYSLOGHOST:hostname} 
+               %{WORD:ident}%{GREEDYDATA:message}</code>, and assuming a JSON 
Writer, the following results will be generated:
+        </p>
+        
+        <h3>Append to Previous Message</h3>
+        
+        <p>
+               <code><pre>
+[ {
+  "priority" : "6",
+  "timestamp" : "Feb 28 12:00:00",
+  "hostname" : "192.168.0.1",
+  "ident" : "aliyun",
+  "message" : "[11111]: [error] Syslog test\nThis is a bad message...",
+  "stackTrace" : null,
+  "_raw" : "&lt;6&gt;Feb 28 12:00:00 192.168.0.1 aliyun[11111]: [error] Syslog 
test\nThis is a bad message..."
+} ]
+               </pre></code>
+        </p>
+        
+        <h3>Skip Line</h3>
+        
+        <p>
+               <code><pre>
+[ {
+  "priority" : "6",
+  "timestamp" : "Feb 28 12:00:00",
+  "hostname" : "192.168.0.1",
+  "ident" : "aliyun",
+  "message" : "[11111]: [error] Syslog test",
+  "stackTrace" : null,
+  "_raw" : "&lt;6&gt;Feb 28 12:00:00 192.168.0.1 aliyun[11111]: [error] Syslog test"
+} ]
+               </pre></code>
+        </p>
+        
+        <h3>Raw Line</h3>
+        
+        <p>
+               <code><pre>
+[ {
+  "priority" : "6",
+  "timestamp" : "Feb 28 12:00:00",
+  "hostname" : "192.168.0.1",
+  "ident" : "aliyun",
+  "message" : "[11111]: [error] Syslog test",
+  "stackTrace" : null,
+  "_raw" : "&lt;6&gt;Feb 28 12:00:00 192.168.0.1 aliyun[11111]: [error] Syslog test"
+}, {
+  "priority" : null,
+  "timestamp" : null,
+  "hostname" : null,
+  "ident" : null,
+  "message" : null,
+  "stackTrace" : null,
+  "_raw" : "This is a bad message..."
+} ]
+               </pre></code>
         </p>
 
 
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
index 538faa8..f267028 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
@@ -60,7 +60,7 @@ public class TestGrokRecordReader {
     public void testParseSingleLineLogMessages() throws GrokException, 
IOException, MalformedRecordException {
         try (final InputStream fis = new FileInputStream(new 
File("src/test/resources/grok/single-line-log-messages.txt"))) {
             final Grok grok = 
grokCompiler.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} 
%{GREEDYDATA:message}");
-            final GrokRecordReader deserializer = new GrokRecordReader(fis, 
grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), 
true);
+            final GrokRecordReader deserializer = new GrokRecordReader(fis, 
grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), 
NoMatchStrategy.APPEND);
 
             final String[] logLevels = new String[] {"INFO", "WARN", "ERROR", 
"FATAL", "FINE"};
             final String[] messages = new String[] {"Test Message 1", "Red", 
"Green", "Blue", "Yellow"};
@@ -93,7 +93,7 @@ public class TestGrokRecordReader {
         final String msg = "2016-08-04 13:26:32,473 INFO [Leader Election 
Notification Thread-1] o.a.n.LoggerClass \n"
             + "org.apache.nifi.exception.UnitTestException: Testing to ensure 
we are able to capture stack traces";
         final InputStream bais = new 
ByteArrayInputStream(msg.getBytes(StandardCharsets.UTF_8));
-        final GrokRecordReader deserializer = new GrokRecordReader(bais, grok, 
GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), true);
+        final GrokRecordReader deserializer = new GrokRecordReader(bais, grok, 
GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), 
NoMatchStrategy.APPEND);
 
         final Object[] values = deserializer.nextRecord().getValues();
 
@@ -116,7 +116,7 @@ public class TestGrokRecordReader {
     public void testParseNiFiSampleLog() throws IOException, GrokException, 
MalformedRecordException {
         try (final InputStream fis = new FileInputStream(new 
File("src/test/resources/grok/nifi-log-sample.log"))) {
             final Grok grok = 
grokCompiler.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} 
\\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}");
-            final GrokRecordReader deserializer = new GrokRecordReader(fis, 
grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), 
true);
+            final GrokRecordReader deserializer = new GrokRecordReader(fis, 
grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), 
NoMatchStrategy.APPEND);
             final String[] logLevels = new String[] {"INFO", "INFO", "INFO", 
"WARN", "WARN"};
 
             for (int i = 0; i < logLevels.length; i++) {
@@ -138,7 +138,7 @@ public class TestGrokRecordReader {
     public void testParseNiFiSampleMultilineWithStackTrace() throws 
IOException, GrokException, MalformedRecordException {
         try (final InputStream fis = new FileInputStream(new 
File("src/test/resources/grok/nifi-log-sample-multiline-with-stacktrace.log"))) 
{
             final Grok grok = 
grokCompiler.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} 
\\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}?");
-            final GrokRecordReader deserializer = new GrokRecordReader(fis, 
grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), 
true);
+            final GrokRecordReader deserializer = new GrokRecordReader(fis, 
grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), 
NoMatchStrategy.APPEND);
             final String[] logLevels = new String[] {"INFO", "INFO", "ERROR", 
"WARN", "WARN"};
 
             for (int i = 0; i < logLevels.length; i++) {
@@ -171,7 +171,7 @@ public class TestGrokRecordReader {
     public void testParseStackTrace() throws GrokException, IOException, 
MalformedRecordException {
         try (final InputStream fis = new FileInputStream(new 
File("src/test/resources/grok/error-with-stack-trace.log"))) {
             final Grok grok = 
grokCompiler.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} 
%{GREEDYDATA:message}");
-            final GrokRecordReader deserializer = new GrokRecordReader(fis, 
grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), 
true);
+            final GrokRecordReader deserializer = new GrokRecordReader(fis, 
grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), 
NoMatchStrategy.APPEND);
 
             final String[] logLevels = new String[] {"INFO", "ERROR", "INFO"};
             final String[] messages = new String[] {"message without stack 
trace",
@@ -236,7 +236,7 @@ public class TestGrokRecordReader {
             assertTrue(fieldNames.contains("stackTrace"));  // always 
implicitly there
             assertTrue(fieldNames.contains("_raw"));  // always implicitly 
there
 
-            final GrokRecordReader deserializer = new GrokRecordReader(in, 
grok, schema, schema, true);
+            final GrokRecordReader deserializer = new GrokRecordReader(in, 
grok, schema, schema, NoMatchStrategy.APPEND);
             final Record record = deserializer.nextRecord();
 
             assertEquals("May 22 15:58:23", record.getValue("timestamp"));
@@ -273,7 +273,7 @@ public class TestGrokRecordReader {
             assertTrue(fieldNames.contains("fourth"));
             assertTrue(fieldNames.contains("fifth"));
 
-            final GrokRecordReader deserializer = new GrokRecordReader(in, 
grok, schema, schema, false);
+            final GrokRecordReader deserializer = new GrokRecordReader(in, 
grok, schema, schema, NoMatchStrategy.SKIP);
             final Record record = deserializer.nextRecord();
 
             assertEquals("1", record.getValue("first"));
@@ -288,6 +288,51 @@ public class TestGrokRecordReader {
     }
 
     @Test
+    public void testRawUnmatchedRecordFirstLine() throws GrokException, 
IOException, MalformedRecordException {
+        final String nonMatchingRecord = "hello there";
+        final String matchingRecord = "1 2 3 4 5";
+
+        final String input = nonMatchingRecord + "\n" + matchingRecord;
+        final byte[] inputBytes = input.getBytes(StandardCharsets.UTF_8);
+
+        try (final InputStream in = new ByteArrayInputStream(inputBytes)) {
+            final Grok grok = grokCompiler.compile("%{NUMBER:first} 
%{NUMBER:second} %{NUMBER:third} %{NUMBER:fourth} %{NUMBER:fifth}");
+
+            final RecordSchema schema = GrokReader.createRecordSchema(grok);
+            final List<String> fieldNames = schema.getFieldNames();
+            assertEquals(7, fieldNames.size());
+            assertTrue(fieldNames.contains("first"));
+            assertTrue(fieldNames.contains("second"));
+            assertTrue(fieldNames.contains("third"));
+            assertTrue(fieldNames.contains("fourth"));
+            assertTrue(fieldNames.contains("fifth"));
+            assertTrue(fieldNames.contains("_raw"));
+
+            final GrokRecordReader deserializer = new GrokRecordReader(in, 
grok, schema, schema, NoMatchStrategy.RAW);
+            Record record = deserializer.nextRecord();
+
+            assertEquals(null, record.getValue("first"));
+            assertEquals(null, record.getValue("second"));
+            assertEquals(null, record.getValue("third"));
+            assertEquals(null, record.getValue("fourth"));
+            assertEquals(null, record.getValue("fifth"));
+            assertEquals("hello there", record.getValue("_raw"));
+
+            record = deserializer.nextRecord();
+
+            assertEquals("1", record.getValue("first"));
+            assertEquals("2", record.getValue("second"));
+            assertEquals("3", record.getValue("third"));
+            assertEquals("4", record.getValue("fourth"));
+            assertEquals("5", record.getValue("fifth"));
+            assertEquals("1 2 3 4 5", record.getValue("_raw"));
+
+            assertNull(deserializer.nextRecord());
+            deserializer.close();
+        }
+    }
+
+    @Test
     public void testSkipUnmatchedRecordMiddle() throws GrokException, 
IOException, MalformedRecordException {
         final String nonMatchingRecord = "hello there";
         final String matchingRecord = "1 2 3 4 5";
@@ -306,8 +351,9 @@ public class TestGrokRecordReader {
             assertTrue(fieldNames.contains("third"));
             assertTrue(fieldNames.contains("fourth"));
             assertTrue(fieldNames.contains("fifth"));
+            assertTrue(fieldNames.contains("_raw"));
 
-            final GrokRecordReader deserializer = new GrokRecordReader(in, 
grok, schema, schema, false);
+            final GrokRecordReader deserializer = new GrokRecordReader(in, 
grok, schema, schema, NoMatchStrategy.SKIP);
             for (int i = 0; i < 2; i++) {
                 final Record record = deserializer.nextRecord();
 
@@ -316,10 +362,110 @@ public class TestGrokRecordReader {
                 assertEquals("3", record.getValue("third"));
                 assertEquals("4", record.getValue("fourth"));
                 assertEquals("5", record.getValue("fifth"));
+                assertEquals("1 2 3 4 5", record.getValue("_raw"));
             }
 
             assertNull(deserializer.nextRecord());
             deserializer.close();
         }
     }
+
+    @Test
+    public void testRawUnmatchedRecordMiddle() throws GrokException, IOException, MalformedRecordException {
+        final String nonMatchingRecord = "hello there";
+        final String matchingRecord = "1 2 3 4 5";
+
+        final String input = matchingRecord + "\n" + nonMatchingRecord + "\n" + matchingRecord;
+        final byte[] inputBytes = input.getBytes(StandardCharsets.UTF_8);
+
+        try (final InputStream in = new ByteArrayInputStream(inputBytes)) {
+            final Grok grok = grokCompiler.compile("%{NUMBER:first} %{NUMBER:second} %{NUMBER:third} %{NUMBER:fourth} %{NUMBER:fifth}");
+
+            final RecordSchema schema = GrokReader.createRecordSchema(grok);
+            final List<String> fieldNames = schema.getFieldNames();
+            assertEquals(7, fieldNames.size());
+            assertTrue(fieldNames.contains("first"));
+            assertTrue(fieldNames.contains("second"));
+            assertTrue(fieldNames.contains("third"));
+            assertTrue(fieldNames.contains("fourth"));
+            assertTrue(fieldNames.contains("fifth"));
+            assertTrue(fieldNames.contains("_raw"));
+
+            final GrokRecordReader deserializer = new GrokRecordReader(in, grok, schema, schema, NoMatchStrategy.RAW);
+            Record record = deserializer.nextRecord();
+
+            assertEquals("1", record.getValue("first"));
+            assertEquals("2", record.getValue("second"));
+            assertEquals("3", record.getValue("third"));
+            assertEquals("4", record.getValue("fourth"));
+            assertEquals("5", record.getValue("fifth"));
+            assertEquals("1 2 3 4 5", record.getValue("_raw"));
+
+            record = deserializer.nextRecord();
+
+            assertEquals(null, record.getValue("first"));
+            assertEquals(null, record.getValue("second"));
+            assertEquals(null, record.getValue("third"));
+            assertEquals(null, record.getValue("fourth"));
+            assertEquals(null, record.getValue("fifth"));
+            assertEquals("hello there", record.getValue("_raw"));
+
+            record = deserializer.nextRecord();
+
+            assertEquals("1", record.getValue("first"));
+            assertEquals("2", record.getValue("second"));
+            assertEquals("3", record.getValue("third"));
+            assertEquals("4", record.getValue("fourth"));
+            assertEquals("5", record.getValue("fifth"));
+            assertEquals("1 2 3 4 5", record.getValue("_raw"));
+
+            assertNull(deserializer.nextRecord());
+            deserializer.close();
+        }
+    }
+
+    @Test
+    public void testRawUnmatchedRecordlast() throws GrokException, IOException, MalformedRecordException {
+        final String nonMatchingRecord = "hello there";
+        final String matchingRecord = "1 2 3 4 5";
+
+        final String input = matchingRecord + "\n" + nonMatchingRecord;
+        final byte[] inputBytes = input.getBytes(StandardCharsets.UTF_8);
+
+        try (final InputStream in = new ByteArrayInputStream(inputBytes)) {
+            final Grok grok = grokCompiler.compile("%{NUMBER:first} %{NUMBER:second} %{NUMBER:third} %{NUMBER:fourth} %{NUMBER:fifth}");
+
+            final RecordSchema schema = GrokReader.createRecordSchema(grok);
+            final List<String> fieldNames = schema.getFieldNames();
+            assertEquals(7, fieldNames.size());
+            assertTrue(fieldNames.contains("first"));
+            assertTrue(fieldNames.contains("second"));
+            assertTrue(fieldNames.contains("third"));
+            assertTrue(fieldNames.contains("fourth"));
+            assertTrue(fieldNames.contains("fifth"));
+            assertTrue(fieldNames.contains("_raw"));
+
+            final GrokRecordReader deserializer = new GrokRecordReader(in, grok, schema, schema, NoMatchStrategy.RAW);
+            Record record = deserializer.nextRecord();
+
+            assertEquals("1", record.getValue("first"));
+            assertEquals("2", record.getValue("second"));
+            assertEquals("3", record.getValue("third"));
+            assertEquals("4", record.getValue("fourth"));
+            assertEquals("5", record.getValue("fifth"));
+            assertEquals("1 2 3 4 5", record.getValue("_raw"));
+
+            record = deserializer.nextRecord();
+
+            assertEquals(null, record.getValue("first"));
+            assertEquals(null, record.getValue("second"));
+            assertEquals(null, record.getValue("third"));
+            assertEquals(null, record.getValue("fourth"));
+            assertEquals(null, record.getValue("fifth"));
+            assertEquals("hello there", record.getValue("_raw"));
+
+            assertNull(deserializer.nextRecord());
+            deserializer.close();
+        }
+    }
 }
\ No newline at end of file

Reply via email to