This is an automated email from the ASF dual-hosted git repository.

otto pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new e6e4109  NIFI-9850 Add support for multiple expressions to GrokReader 
(#5918)
e6e4109 is described below

commit e6e4109cf901d19e99f5d6b9df7a988750a16065
Author: exceptionfactory <[email protected]>
AuthorDate: Sat Apr 2 12:30:55 2022 -0500

    NIFI-9850 Add support for multiple expressions to GrokReader (#5918)
    
    * NIFI-9850 Added support for multiple expressions to GrokReader
    
    - Updated Grok Expression property to support Resources
    
    * NIFI-9850 Updated documentation for Fields from Grok Expression strategy
    
    This closes #5918
    Signed-off-by: Otto Fowler <[email protected]>
---
 .../main/java/org/apache/nifi/grok/GrokReader.java | 109 +++++++----
 .../org/apache/nifi/grok/GrokRecordReader.java     |  56 +++---
 .../java/org/apache/nifi/grok/TestGrokReader.java  | 217 +++++++++++----------
 .../org/apache/nifi/grok/TestGrokRecordReader.java |  96 ++++-----
 4 files changed, 263 insertions(+), 215 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
index e7459d7..6d4aa18 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
@@ -29,6 +29,7 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.resource.ResourceCardinality;
+import org.apache.nifi.components.resource.ResourceReference;
 import org.apache.nifi.components.resource.ResourceType;
 import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.controller.ConfigurationContext;
@@ -48,6 +49,7 @@ import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 
+import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
@@ -60,6 +62,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.regex.Matcher;
+import java.util.stream.Collectors;
 
 @Tags({"grok", "logs", "logfiles", "parse", "unstructured", "text", "record", 
"reader", "regex", "pattern", "logstash"})
 @CapabilityDescription("Provides a mechanism for reading unstructured text 
data, such as log files, and structuring the data "
@@ -71,8 +74,7 @@ import java.util.regex.Matcher;
     + "no stack trace, it will have a NULL value for the stackTrace field 
(assuming that the schema does in fact include a stackTrace field of type 
String). "
     + "Assuming that the schema includes a '_raw' field of type String, the 
raw message will be included in the Record.")
 public class GrokReader extends SchemaRegistryService implements 
RecordReaderFactory {
-    private volatile GrokCompiler grokCompiler;
-    private volatile Grok grok;
+    private volatile List<Grok> groks;
     private volatile NoMatchStrategy noMatchStrategy;
     private volatile RecordSchema recordSchema;
     private volatile RecordSchema recordSchemaFromGrok;
@@ -87,8 +89,10 @@ public class GrokReader extends SchemaRegistryService 
implements RecordReaderFac
             "The line of text that does not match the Grok Expression will 
only be added to the _raw field.");
 
     static final AllowableValue STRING_FIELDS_FROM_GROK_EXPRESSION = new 
AllowableValue("string-fields-from-grok-expression", "Use String Fields From 
Grok Expression",
-        "The schema will be derived by using the field names present in the 
Grok Expression. All fields will be assumed to be of type String. Additionally, 
a field will be included "
-            + "with a name of 'stackTrace' and a type of String.");
+            "The schema will be derived using the field names present in all 
configured Grok Expressions. "
+            + "All schema fields will have a String type and will be marked as 
nullable. "
+            + "The schema will also include a `stackTrace` field, and a `_raw` 
field containing the input line string."
+    );
 
     static final PropertyDescriptor PATTERN_FILE = new 
PropertyDescriptor.Builder()
         .name("Grok Pattern File")
@@ -102,10 +106,14 @@ public class GrokReader extends SchemaRegistryService 
implements RecordReaderFac
 
     static final PropertyDescriptor GROK_EXPRESSION = new 
PropertyDescriptor.Builder()
         .name("Grok Expression")
+        .displayName("Grok Expressions")
         .description("Specifies the format of a log line in Grok format. This 
allows the Record Reader to understand how to parse each log line. "
-            + "If a line in the log file does not match this pattern, the line 
will be assumed to belong to the previous log message."
-            + "If other Grok expressions are referenced by this expression, 
they need to be supplied in the Grok Pattern File")
+            + "The property supports one or more Grok expressions. The Reader 
attempts to parse input lines according to the configured order of the 
expressions."
+            + "If a line in the log file does not match any expressions, the 
line will be assumed to belong to the previous log message."
+            + "If other Grok patterns are referenced by this expression, they 
need to be supplied in the Grok Pattern File property."
+        )
         .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+        .identifiesExternalResource(ResourceCardinality.SINGLE, 
ResourceType.TEXT, ResourceType.URL, ResourceType.FILE)
         .required(true)
         .build();
 
@@ -130,11 +138,10 @@ public class GrokReader extends SchemaRegistryService 
implements RecordReaderFac
 
     @OnEnabled
     public void preCompile(final ConfigurationContext context) throws 
GrokException, IOException {
-        grokCompiler = GrokCompiler.newInstance();
+        GrokCompiler grokCompiler = GrokCompiler.newInstance();
 
-        try (final InputStream in = 
getClass().getResourceAsStream(DEFAULT_PATTERN_NAME);
-            final Reader reader = new InputStreamReader(in)) {
-            grokCompiler.register(reader);
+        try (final Reader defaultPatterns = getDefaultPatterns()) {
+            grokCompiler.register(defaultPatterns);
         }
 
         if (context.getProperty(PATTERN_FILE).isSet()) {
@@ -144,10 +151,11 @@ public class GrokReader extends SchemaRegistryService 
implements RecordReaderFac
             }
         }
 
-        grok = 
grokCompiler.compile(context.getProperty(GROK_EXPRESSION).getValue());
-
+        groks = readGrokExpressions(context).stream()
+                .map(grokCompiler::compile)
+                .collect(Collectors.toList());
 
-        
if(context.getProperty(NO_MATCH_BEHAVIOR).getValue().equalsIgnoreCase(APPEND_TO_PREVIOUS_MESSAGE.getValue()))
 {
+        if 
(context.getProperty(NO_MATCH_BEHAVIOR).getValue().equalsIgnoreCase(APPEND_TO_PREVIOUS_MESSAGE.getValue()))
 {
             noMatchStrategy = NoMatchStrategy.APPEND;
         } else if 
(context.getProperty(NO_MATCH_BEHAVIOR).getValue().equalsIgnoreCase(RAW_LINE.getValue()))
 {
             noMatchStrategy = NoMatchStrategy.RAW;
@@ -155,7 +163,7 @@ public class GrokReader extends SchemaRegistryService 
implements RecordReaderFac
             noMatchStrategy = NoMatchStrategy.SKIP;
         }
 
-        this.recordSchemaFromGrok = createRecordSchema(grok);
+        this.recordSchemaFromGrok = createRecordSchema(groks);
 
         final String schemaAccess = 
context.getProperty(getSchemaAcessStrategyDescriptor()).getValue();
         if 
(STRING_FIELDS_FROM_GROK_EXPRESSION.getValue().equals(schemaAccess)) {
@@ -167,42 +175,61 @@ public class GrokReader extends SchemaRegistryService 
implements RecordReaderFac
 
     @Override
     protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
-        ArrayList<ValidationResult> results = new 
ArrayList<>(super.customValidate(validationContext));
-        // validate the grok expression against configuration
-        GrokCompiler grokCompiler = GrokCompiler.newInstance();
-        String subject = GROK_EXPRESSION.getName();
-        String input = 
validationContext.getProperty(GROK_EXPRESSION).getValue();
-        GrokExpressionValidator validator;
-
-        try (final InputStream in = 
getClass().getResourceAsStream(DEFAULT_PATTERN_NAME);
-             final Reader reader = new InputStreamReader(in)) {
-            grokCompiler.register(reader);
-        } catch (IOException e) {
+        final List<ValidationResult> results = new 
ArrayList<>(super.customValidate(validationContext));
+        final GrokCompiler grokCompiler = GrokCompiler.newInstance();
+
+        final String expressionSubject = GROK_EXPRESSION.getDisplayName();
+
+        try (final Reader defaultPatterns = getDefaultPatterns()) {
+            grokCompiler.register(defaultPatterns);
+        } catch (final IOException e) {
             results.add(new ValidationResult.Builder()
-                    .input(input)
-                    .subject(subject)
+                    .input("Default Grok Patterns")
+                    .subject(expressionSubject)
                     .valid(false)
                     .explanation("Unable to load default patterns: " + 
e.getMessage())
                     .build());
         }
 
-        validator = new 
GrokExpressionValidator(validationContext.getProperty(PATTERN_FILE).evaluateAttributeExpressions().getValue(),grokCompiler);
-        results.add(validator.validate(subject,input,validationContext));
+        final String patternFileName = 
validationContext.getProperty(PATTERN_FILE).evaluateAttributeExpressions().getValue();
+        final GrokExpressionValidator validator = new 
GrokExpressionValidator(patternFileName, grokCompiler);
+
+        try {
+            final List<String> grokExpressions = 
readGrokExpressions(validationContext);
+            final List<ValidationResult> grokExpressionResults = 
grokExpressions.stream()
+                    .map(grokExpression -> 
validator.validate(expressionSubject, grokExpression, 
validationContext)).collect(Collectors.toList());
+            results.addAll(grokExpressionResults);
+        } catch (final IOException e) {
+            results.add(new ValidationResult.Builder()
+                    .input("Configured Grok Expressions")
+                    .subject(expressionSubject)
+                    .valid(false)
+                    .explanation(String.format("Read Grok Expressions failed: 
%s", e.getMessage()))
+                    .build());
+        }
+
         return results;
     }
 
-    static RecordSchema createRecordSchema(final Grok grok) {
+    private List<String> readGrokExpressions(final PropertyContext 
propertyContext) throws IOException {
+        final ResourceReference expressionsResource = 
propertyContext.getProperty(GROK_EXPRESSION).asResource();
+        try (
+                final InputStream expressionsStream = 
expressionsResource.read();
+                final BufferedReader expressionsReader = new 
BufferedReader(new InputStreamReader(expressionsStream))
+        ) {
+            return expressionsReader.lines().collect(Collectors.toList());
+        }
+    }
+
+    static RecordSchema createRecordSchema(final List<Grok> groks) {
         final Set<RecordField> fields = new LinkedHashSet<>();
 
-        String grokExpression = grok.getOriginalGrokPattern();
-        populateSchemaFieldNames(grok, grokExpression, fields);
+        groks.forEach(grok -> populateSchemaFieldNames(grok, 
grok.getOriginalGrokPattern(), fields));
 
         fields.add(new RecordField(GrokRecordReader.STACK_TRACE_COLUMN_NAME, 
RecordFieldType.STRING.getDataType(), true));
         fields.add(new RecordField(GrokRecordReader.RAW_MESSAGE_NAME, 
RecordFieldType.STRING.getDataType(), true));
 
-        final RecordSchema schema = new SimpleRecordSchema(new 
ArrayList<>(fields));
-
-        return schema;
+        return new SimpleRecordSchema(new ArrayList<>(fields));
     }
 
     private static void populateSchemaFieldNames(final Grok grok, String 
grokExpression, final Collection<RecordField> fields) {
@@ -267,7 +294,7 @@ public class GrokReader extends SchemaRegistryService 
implements RecordReaderFac
 
 
             @Override
-            public RecordSchema getSchema(Map<String, String> variables, 
InputStream contentStream, RecordSchema readSchema) throws 
SchemaNotFoundException {
+            public RecordSchema getSchema(Map<String, String> variables, 
InputStream contentStream, RecordSchema readSchema) {
                 return recordSchema;
             }
 
@@ -281,6 +308,14 @@ public class GrokReader extends SchemaRegistryService 
implements RecordReaderFac
     @Override
     public RecordReader createRecordReader(final Map<String, String> 
variables, final InputStream in, final long inputLength, final ComponentLog 
logger) throws IOException, SchemaNotFoundException {
         final RecordSchema schema = getSchema(variables, in, null);
-        return new GrokRecordReader(in, grok, schema, recordSchemaFromGrok, 
noMatchStrategy);
+        return new GrokRecordReader(in, groks, schema, recordSchemaFromGrok, 
noMatchStrategy);
+    }
+
+    private Reader getDefaultPatterns() throws IOException {
+        final InputStream inputStream = 
getClass().getResourceAsStream(DEFAULT_PATTERN_NAME);
+        if (inputStream == null) {
+            throw new IOException(String.format("Default Patterns [%s] not 
found", DEFAULT_PATTERN_NAME));
+        }
+        return new InputStreamReader(inputStream);
     }
 }
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
index 48317c9..51a9228 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
@@ -21,6 +21,7 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -40,14 +41,13 @@ import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 
 import io.krakens.grok.api.Grok;
-import io.krakens.grok.api.Match;
 
 public class GrokRecordReader implements RecordReader {
     private final BufferedReader reader;
-    private final Grok grok;
+    private final List<Grok> groks;
     private final NoMatchStrategy noMatchStrategy;
     private final RecordSchema schemaFromGrok;
-    private RecordSchema schema;
+    private final RecordSchema schema;
 
     private String nextLine;
     Map<String, Object> nextMap = null;
@@ -56,15 +56,19 @@ public class GrokRecordReader implements RecordReader {
     static final String RAW_MESSAGE_NAME = "_raw";
 
     private static final Pattern STACK_TRACE_PATTERN = Pattern.compile(
-        "^\\s*(?:(?:    |\\t)+at )|"
-            + "(?:(?:    |\\t)+\\[CIRCULAR REFERENCE\\:)|"
-            + "(?:Caused by\\: )|"
-            + "(?:Suppressed\\: )|"
+        "^\\s*(?:(?:\\s{4}|\\t)+at )|"
+            + "(?:(?:\\s{4}|\\t)+\\[CIRCULAR REFERENCE:)|"
+            + "(?:Caused by: )|"
+            + "(?:Suppressed: )|"
             + "(?:\\s+... \\d+ (?:more|common frames? omitted)$)");
 
     public GrokRecordReader(final InputStream in, final Grok grok, final 
RecordSchema schema, final RecordSchema schemaFromGrok, final NoMatchStrategy 
noMatchStrategy) {
+        this(in, Collections.singletonList(grok), schema, schemaFromGrok, 
noMatchStrategy);
+    }
+
+    public GrokRecordReader(final InputStream in, final List<Grok> groks, 
final RecordSchema schema, final RecordSchema schemaFromGrok, final 
NoMatchStrategy noMatchStrategy) {
         this.reader = new BufferedReader(new InputStreamReader(in));
-        this.grok = grok;
+        this.groks = groks;
         this.schema = schema;
         this.noMatchStrategy = noMatchStrategy;
         this.schemaFromGrok = schemaFromGrok;
@@ -91,10 +95,8 @@ public class GrokRecordReader implements RecordReader {
                 return null;
             }
 
-            final Match match = grok.match(line);
-            valueMap = match.capture();
-
-            if((valueMap == null || valueMap.isEmpty()) && 
noMatchStrategy.equals(NoMatchStrategy.RAW)) {
+            valueMap = capture(line);
+            if ((valueMap == null || valueMap.isEmpty()) && 
noMatchStrategy.equals(NoMatchStrategy.RAW)) {
                 break;
             }
         }
@@ -108,8 +110,7 @@ public class GrokRecordReader implements RecordReader {
         String stackTrace = null;
         final StringBuilder trailingText = new StringBuilder();
         while ((nextLine = reader.readLine()) != null) {
-            final Match nextLineMatch = grok.match(nextLine);
-            final Map<String, Object> nextValueMap = nextLineMatch.capture();
+            final Map<String, Object> nextValueMap = capture(nextLine);
             if (nextValueMap.isEmpty() && 
!noMatchStrategy.equals(NoMatchStrategy.RAW)) {
                 // next line did not match. Check if it indicates a Stack 
Trace. If so, read until
                 // the stack trace ends. Otherwise, append the next line to 
the last field in the record.
@@ -128,14 +129,13 @@ public class GrokRecordReader implements RecordReader {
             }
         }
 
-        final Record record = createRecord(valueMap, trailingText, stackTrace, 
raw.toString(), coerceTypes, dropUnknownFields);
-        return record;
+        return createRecord(valueMap, trailingText, stackTrace, 
raw.toString(), coerceTypes);
     }
 
-    private Record createRecord(final Map<String, Object> valueMap, final 
StringBuilder trailingText, final String stackTrace, final String raw, final 
boolean coerceTypes, final boolean dropUnknown) {
+    private Record createRecord(final Map<String, Object> valueMap, final 
StringBuilder trailingText, final String stackTrace, final String raw, final 
boolean coerceTypes) {
         final Map<String, Object> converted = new HashMap<>();
 
-        if(valueMap != null && !valueMap.isEmpty()) {
+        if (valueMap != null && !valueMap.isEmpty()) {
 
             for (final Map.Entry<String, Object> entry : valueMap.entrySet()) {
                 final String fieldName = entry.getKey();
@@ -203,7 +203,7 @@ public class GrokRecordReader implements RecordReader {
                     if (value == null) {
                         converted.put(lastPopulatedFieldName, 
trailingText.toString());
                     } else if (value instanceof String) { // if not a String 
it is a List and we will just drop the trailing text
-                        converted.put(lastPopulatedFieldName, (String) value + 
trailingText.toString());
+                        converted.put(lastPopulatedFieldName, value + 
trailingText.toString());
                     }
                 }
             }
@@ -238,11 +238,7 @@ public class GrokRecordReader implements RecordReader {
             return false;
         }
 
-        if (line.indexOf(" ") < index) {
-            return false;
-        }
-
-        return true;
+        return line.indexOf(" ") >= index;
     }
 
     private String readStackTrace(final String firstLine) throws IOException {
@@ -291,4 +287,16 @@ public class GrokRecordReader implements RecordReader {
         return schema;
     }
 
+    private Map<String, Object> capture(final String log) {
+        Map<String, Object> capture = Collections.emptyMap();
+
+        for (final Grok grok : groks) {
+            capture = grok.capture(log);
+            if (!capture.isEmpty()) {
+                break;
+            }
+        }
+
+        return capture;
+    }
 }
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokReader.java
index 4c29c1b..0a5f3fc 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokReader.java
@@ -16,90 +16,58 @@
  */
 package org.apache.nifi.grok;
 
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Processor;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
-import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.util.EqualsWrapper;
+import org.apache.nifi.util.NoOpProcessor;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.io.InputStream;
-import java.util.ArrayList;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.function.Function;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 
 public class TestGrokReader {
     private TestRunner runner;
-    private List<Record> records;
 
-    private static final PropertyDescriptor READER = new 
PropertyDescriptor.Builder()
-        .name("reader")
-        .identifiesControllerService(GrokReader.class)
-        .build();
+    private static final String TIMESTAMP_FIELD = "timestamp";
+
+    private static final String LEVEL_FIELD = "level";
+
+    private static final String FACILITY_FIELD = "facility";
+
+    private static final String PROGRAM_FIELD = "program";
+
+    private static final String MESSAGE_FIELD = "message";
+
+    private static final String STACKTRACE_FIELD = "stackTrace";
+
+    private static final String RAW_FIELD = "_raw";
 
     @BeforeEach
     void setUp() {
-        Processor processor = new AbstractProcessor() {
-            Relationship SUCCESS = new Relationship.Builder()
-                .name("success")
-                .build();
-
-            @Override
-            public void onTrigger(ProcessContext context, ProcessSession 
session) throws ProcessException {
-                FlowFile flowFile = session.get();
-                final RecordReaderFactory readerFactory = 
context.getProperty(READER).asControllerService(RecordReaderFactory.class);
-
-                try (final InputStream in = session.read(flowFile);
-                     final RecordReader reader = 
readerFactory.createRecordReader(flowFile, in, getLogger())) {
-                    Record record;
-                    while ((record = reader.nextRecord()) != null) {
-                        records.add(record);
-                    }
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-
-                session.transfer(flowFile, SUCCESS);
-            }
-
-            @Override
-            protected List<PropertyDescriptor> 
getSupportedPropertyDescriptors() {
-                return Arrays.asList(READER);
-            }
-
-            @Override
-            public Set<Relationship> getRelationships() {
-                return new HashSet<>(Arrays.asList(SUCCESS));
-            }
-        };
-
-        runner = TestRunners.newTestRunner(processor);
-
-        records = new ArrayList<>();
+        runner = TestRunners.newTestRunner(NoOpProcessor.class);
     }
 
     @Test
     void testComplexGrokExpression() throws Exception {
-        // GIVEN
         String input = "1021-09-09 09:03:06 127.0.0.1 nifi[1000]: LogMessage" 
+ System.lineSeparator()
             + "October 19 19:13:16 127.0.0.1 nifi[1000]: LogMessage2" + 
System.lineSeparator();
 
@@ -107,65 +75,100 @@ public class TestGrokReader {
         String grokExpression = "%{LINE}";
 
         SimpleRecordSchema expectedSchema = new 
SimpleRecordSchema(Arrays.asList(
-            new RecordField("timestamp", RecordFieldType.STRING.getDataType()),
-            new RecordField("facility", RecordFieldType.STRING.getDataType()),
+            new RecordField(TIMESTAMP_FIELD, 
RecordFieldType.STRING.getDataType()),
+            new RecordField(FACILITY_FIELD, 
RecordFieldType.STRING.getDataType()),
             new RecordField("priority", RecordFieldType.STRING.getDataType()),
             new RecordField("logsource", RecordFieldType.STRING.getDataType()),
-            new RecordField("program", RecordFieldType.STRING.getDataType()),
+            new RecordField(PROGRAM_FIELD, 
RecordFieldType.STRING.getDataType()),
             new RecordField("pid", RecordFieldType.STRING.getDataType()),
-            new RecordField("message", RecordFieldType.STRING.getDataType()),
-            new RecordField("stackTrace", 
RecordFieldType.STRING.getDataType()),
-            new RecordField("_raw", RecordFieldType.STRING.getDataType())
+            new RecordField(MESSAGE_FIELD, 
RecordFieldType.STRING.getDataType()),
+            new RecordField(STACKTRACE_FIELD, 
RecordFieldType.STRING.getDataType()),
+            new RecordField(RAW_FIELD, RecordFieldType.STRING.getDataType())
         ));
 
-        List<Record> expectedRecords = Arrays.asList(
-            new MapRecord(expectedSchema, new HashMap<String, Object>() {{
-                put("timestamp", "1021-09-09 09:03:06");
-                put("facility", null);
-                put("priority", null);
-                put("logsource", "127.0.0.1");
-                put("program", "nifi");
-                put("pid", "1000");
-                put("message", " LogMessage");
-                put("stackstrace", null);
-                put("_raw", "1021-09-09 09:03:06 127.0.0.1 nifi[1000]: 
LogMessage");
-            }}),
-            new MapRecord(expectedSchema, new HashMap<String, Object>() {{
-                put("timestamp", "October 19 19:13:16");
-                put("facility", null);
-                put("priority", null);
-                put("logsource", "127.0.0.1");
-                put("program", "nifi");
-                put("pid", "1000");
-                put("message", " LogMessage2");
-                put("stackstrace", null);
-                put("_raw", "October 19 19:13:16 127.0.0.1 nifi[1000]: 
LogMessage2");
-            }})
-        );
-
-        // WHEN
-        GrokReader grokReader = new GrokReader();
-
-        runner.addControllerService("grokReader", grokReader);
-        runner.setProperty(READER, "grokReader");
-
+        final Record expectedFirstRecord = new MapRecord(expectedSchema, new 
HashMap<String, Object>() {{
+            put(TIMESTAMP_FIELD, "1021-09-09 09:03:06");
+            put(FACILITY_FIELD, null);
+            put("priority", null);
+            put("logsource", "127.0.0.1");
+            put(PROGRAM_FIELD, "nifi");
+            put("pid", "1000");
+            put("message", " LogMessage");
+            put(STACKTRACE_FIELD, null);
+            put(RAW_FIELD, "1021-09-09 09:03:06 127.0.0.1 nifi[1000]: 
LogMessage");
+        }});
+
+        final Record expectedSecondRecord = new MapRecord(expectedSchema, new 
HashMap<String, Object>() {{
+            put(TIMESTAMP_FIELD, "October 19 19:13:16");
+            put(FACILITY_FIELD, null);
+            put("priority", null);
+            put("logsource", "127.0.0.1");
+            put(PROGRAM_FIELD, "nifi");
+            put("pid", "1000");
+            put(MESSAGE_FIELD, " LogMessage2");
+            put(STACKTRACE_FIELD, null);
+            put(RAW_FIELD, "October 19 19:13:16 127.0.0.1 nifi[1000]: 
LogMessage2");
+        }});
+
+        final GrokReader grokReader = new GrokReader();
+        runner.addControllerService(GrokReader.class.getSimpleName(), 
grokReader);
         runner.setProperty(grokReader, GrokReader.PATTERN_FILE, 
grokPatternFile);
         runner.setProperty(grokReader, GrokReader.GROK_EXPRESSION, 
grokExpression);
+        runner.enableControllerService(grokReader);
+
+        final byte[] inputBytes = input.getBytes(StandardCharsets.UTF_8);
+        final ByteArrayInputStream inputStream = new 
ByteArrayInputStream(inputBytes);
+        final RecordReader recordReader = 
grokReader.createRecordReader(Collections.emptyMap(), inputStream, 
inputBytes.length, runner.getLogger());
+
+        final Record firstRecord = recordReader.nextRecord();
+
+        assertArrayEquals(expectedFirstRecord.getValues(), 
firstRecord.getValues());
+        assertEquals(expectedSchema, firstRecord.getSchema());
 
+        final Record secondRecord = recordReader.nextRecord();
+        assertArrayEquals(expectedSecondRecord.getValues(), 
secondRecord.getValues());
+        assertEquals(expectedSchema, secondRecord.getSchema());
+
+        assertNull(recordReader.nextRecord());
+    }
+
+    @Test
+    public void testMultipleExpressions() throws InitializationException, 
IOException, SchemaNotFoundException, MalformedRecordException {
+        final String program = "NiFi";
+        final String level = "INFO";
+        final String message = "Processing Started";
+        final String timestamp = "Jan 10 12:30:45";
+
+        final String logs = String.format("%s %s %s%n%s %s %s %s%n", program, 
level, message, timestamp, program, level, message);
+        final byte[] bytes = logs.getBytes(StandardCharsets.UTF_8);
+
+        final String matchingExpression = "%{PROG:program} %{LOGLEVEL:level} 
%{GREEDYDATA:message}";
+        final String firstExpression = "%{SYSLOGTIMESTAMP:timestamp} 
%{PROG:program} %{LOGLEVEL:level} %{GREEDYDATA:message}";
+        final String expressions = String.format("%s%n%s", firstExpression, 
matchingExpression);
+
+        final GrokReader grokReader = new GrokReader();
+        runner.addControllerService(GrokReader.class.getSimpleName(), 
grokReader);
+        runner.setProperty(grokReader, GrokReader.GROK_EXPRESSION, 
expressions);
+        runner.setProperty(grokReader, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
GrokReader.STRING_FIELDS_FROM_GROK_EXPRESSION);
         runner.enableControllerService(grokReader);
 
-        runner.enqueue(input);
-        runner.run();
+        final ByteArrayInputStream inputStream = new 
ByteArrayInputStream(bytes);
+        final RecordReader recordReader = 
grokReader.createRecordReader(Collections.emptyMap(), inputStream, 
bytes.length, runner.getLogger());
 
-        // THEN
-        List<Function<Record, Object>> propertyProviders = Arrays.asList(
-            Record::getSchema,
-            Record::getValues
-        );
+        final Record firstRecord = recordReader.nextRecord();
+        assertNotNull(firstRecord);
+        assertEquals(program, firstRecord.getValue(PROGRAM_FIELD));
+        assertEquals(level, firstRecord.getValue(LEVEL_FIELD));
+        assertEquals(message, firstRecord.getValue(MESSAGE_FIELD));
+        assertNull(firstRecord.getValue(TIMESTAMP_FIELD));
 
-        List<EqualsWrapper<Record>> wrappedExpected = 
EqualsWrapper.wrapList(expectedRecords, propertyProviders);
-        List<EqualsWrapper<Record>> wrappedActual = 
EqualsWrapper.wrapList(records, propertyProviders);
+        final Record secondRecord = recordReader.nextRecord();
+        assertNotNull(secondRecord);
+        assertEquals(program, secondRecord.getValue(PROGRAM_FIELD));
+        assertEquals(level, secondRecord.getValue(LEVEL_FIELD));
+        assertEquals(message, secondRecord.getValue(MESSAGE_FIELD));
+        assertEquals(timestamp, secondRecord.getValue(TIMESTAMP_FIELD));
 
-        Assertions.assertEquals(wrappedExpected, wrappedActual);
+        assertNull(recordReader.nextRecord());
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
index 84e93c7..0b34541 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
@@ -28,11 +28,11 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
 import java.io.ByteArrayInputStream;
-import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.Collections;
 import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -45,7 +45,7 @@ public class TestGrokRecordReader {
 
     @BeforeAll
     public static void beforeClass() throws Exception {
-        try (final InputStream fis = new FileInputStream(new 
File("src/main/resources/default-grok-patterns.txt"))) {
+        try (final InputStream fis = new 
FileInputStream("src/main/resources/default-grok-patterns.txt")) {
             grokCompiler = GrokCompiler.newInstance();
             grokCompiler.register(fis);
         }
@@ -58,9 +58,8 @@ public class TestGrokRecordReader {
 
     @Test
     public void testParseSingleLineLogMessages() throws GrokException, 
IOException, MalformedRecordException {
-        try (final InputStream fis = new FileInputStream(new 
File("src/test/resources/grok/single-line-log-messages.txt"))) {
-            final Grok grok = 
grokCompiler.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} 
%{GREEDYDATA:message}");
-            final GrokRecordReader deserializer = new GrokRecordReader(fis, 
grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), 
NoMatchStrategy.APPEND);
+        try (final InputStream fis = new 
FileInputStream("src/test/resources/grok/single-line-log-messages.txt")) {
+            final GrokRecordReader deserializer = 
getRecordReader("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} 
%{GREEDYDATA:message}", fis);
 
             final String[] logLevels = new String[] {"INFO", "WARN", "ERROR", 
"FATAL", "FINE"};
             final String[] messages = new String[] {"Test Message 1", "Red", 
"Green", "Blue", "Yellow"};
@@ -85,15 +84,13 @@ public class TestGrokRecordReader {
         }
     }
 
-
     @Test
     public void testParseEmptyMessageWithStackTrace() throws GrokException, 
IOException, MalformedRecordException {
-        final Grok grok = grokCompiler.compile("%{TIMESTAMP_ISO8601:timestamp} 
%{LOGLEVEL:level} \\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}");
-
         final String msg = "2016-08-04 13:26:32,473 INFO [Leader Election 
Notification Thread-1] o.a.n.LoggerClass \n"
             + "org.apache.nifi.exception.UnitTestException: Testing to ensure 
we are able to capture stack traces";
         final InputStream bais = new 
ByteArrayInputStream(msg.getBytes(StandardCharsets.UTF_8));
-        final GrokRecordReader deserializer = new GrokRecordReader(bais, grok, 
GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), 
NoMatchStrategy.APPEND);
+
+        final GrokRecordReader deserializer = 
getRecordReader("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} 
\\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}", bais);
 
         final Object[] values = deserializer.nextRecord().getValues();
 
@@ -110,21 +107,18 @@ public class TestGrokRecordReader {
         deserializer.close();
     }
 
-
-
     @Test
     public void testParseNiFiSampleLog() throws IOException, GrokException, 
MalformedRecordException {
-        try (final InputStream fis = new FileInputStream(new 
File("src/test/resources/grok/nifi-log-sample.log"))) {
-            final Grok grok = 
grokCompiler.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} 
\\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}");
-            final GrokRecordReader deserializer = new GrokRecordReader(fis, 
grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), 
NoMatchStrategy.APPEND);
+        try (final InputStream fis = new 
FileInputStream("src/test/resources/grok/nifi-log-sample.log")) {
+            final GrokRecordReader deserializer = 
getRecordReader("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} 
\\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}", fis);
             final String[] logLevels = new String[] {"INFO", "INFO", "INFO", 
"WARN", "WARN"};
 
-            for (int i = 0; i < logLevels.length; i++) {
+            for (String logLevel : logLevels) {
                 final Object[] values = deserializer.nextRecord().getValues();
 
                 assertNotNull(values);
                 assertEquals(7, values.length); // values[] contains 6 
elements: timestamp, level, thread, class, message, STACK_TRACE, RAW_MESSAGE
-                assertEquals(logLevels[i], values[1]);
+                assertEquals(logLevel, values[1]);
                 assertNull(values[5]);
                 assertNotNull(values[6]);
             }
@@ -136,18 +130,17 @@ public class TestGrokRecordReader {
 
     @Test
     public void testParseNiFiSampleMultilineWithStackTrace() throws 
IOException, GrokException, MalformedRecordException {
-        try (final InputStream fis = new FileInputStream(new 
File("src/test/resources/grok/nifi-log-sample-multiline-with-stacktrace.log"))) 
{
-            final Grok grok = 
grokCompiler.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} 
\\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}?");
-            final GrokRecordReader deserializer = new GrokRecordReader(fis, 
grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), 
NoMatchStrategy.APPEND);
+        try (final InputStream fis = new 
FileInputStream("src/test/resources/grok/nifi-log-sample-multiline-with-stacktrace.log"))
 {
+            final GrokRecordReader deserializer = 
getRecordReader("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} 
\\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}?", fis);
             final String[] logLevels = new String[] {"INFO", "INFO", "ERROR", 
"WARN", "WARN"};
 
-            for (int i = 0; i < logLevels.length; i++) {
+            for (String logLevel : logLevels) {
                 final Record record = deserializer.nextRecord();
                 final Object[] values = record.getValues();
 
                 assertNotNull(values);
                 assertEquals(7, values.length); // values[] contains 6 
elements: timestamp, level, thread, class, message, STACK_TRACE, RAW_MESSAGE
-                assertEquals(logLevels[i], values[1]);
+                assertEquals(logLevel, values[1]);
                 if ("ERROR".equals(values[1])) {
                     final String msg = (String) values[4];
                     assertEquals("One\nTwo\nThree", msg);
@@ -166,12 +159,10 @@ public class TestGrokRecordReader {
         }
     }
 
-
     @Test
     public void testParseStackTrace() throws GrokException, IOException, 
MalformedRecordException {
-        try (final InputStream fis = new FileInputStream(new 
File("src/test/resources/grok/error-with-stack-trace.log"))) {
-            final Grok grok = 
grokCompiler.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} 
%{GREEDYDATA:message}");
-            final GrokRecordReader deserializer = new GrokRecordReader(fis, 
grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), 
NoMatchStrategy.APPEND);
+        try (final InputStream fis = new 
FileInputStream("src/test/resources/grok/error-with-stack-trace.log")) {
+            final GrokRecordReader deserializer = 
getRecordReader("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} 
%{GREEDYDATA:message}", fis);
 
             final String[] logLevels = new String[] {"INFO", "ERROR", "INFO"};
             final String[] messages = new String[] {"message without stack 
trace",
@@ -223,7 +214,7 @@ public class TestGrokRecordReader {
         try (final InputStream in = new ByteArrayInputStream(msgBytes)) {
             final Grok grok = 
grokCompiler.compile("%{SYSLOGBASE}%{GREEDYDATA:message}");
 
-            final RecordSchema schema = GrokReader.createRecordSchema(grok);
+            final RecordSchema schema = getRecordSchema(grok);
             final List<String> fieldNames = schema.getFieldNames();
             assertEquals(9, fieldNames.size());
             assertTrue(fieldNames.contains("timestamp"));
@@ -264,7 +255,7 @@ public class TestGrokRecordReader {
         try (final InputStream in = new ByteArrayInputStream(inputBytes)) {
             final Grok grok = grokCompiler.compile("%{NUMBER:first} 
%{NUMBER:second} %{NUMBER:third} %{NUMBER:fourth} %{NUMBER:fifth}");
 
-            final RecordSchema schema = GrokReader.createRecordSchema(grok);
+            final RecordSchema schema = getRecordSchema(grok);
             final List<String> fieldNames = schema.getFieldNames();
             assertEquals(7, fieldNames.size());
             assertTrue(fieldNames.contains("first"));
@@ -298,7 +289,7 @@ public class TestGrokRecordReader {
         try (final InputStream in = new ByteArrayInputStream(inputBytes)) {
             final Grok grok = grokCompiler.compile("%{NUMBER:first} 
%{NUMBER:second} %{NUMBER:third} %{NUMBER:fourth} %{NUMBER:fifth}");
 
-            final RecordSchema schema = GrokReader.createRecordSchema(grok);
+            final RecordSchema schema = getRecordSchema(grok);
             final List<String> fieldNames = schema.getFieldNames();
             assertEquals(7, fieldNames.size());
             assertTrue(fieldNames.contains("first"));
@@ -311,11 +302,11 @@ public class TestGrokRecordReader {
             final GrokRecordReader deserializer = new GrokRecordReader(in, 
grok, schema, schema, NoMatchStrategy.RAW);
             Record record = deserializer.nextRecord();
 
-            assertEquals(null, record.getValue("first"));
-            assertEquals(null, record.getValue("second"));
-            assertEquals(null, record.getValue("third"));
-            assertEquals(null, record.getValue("fourth"));
-            assertEquals(null, record.getValue("fifth"));
+            assertNull(record.getValue("first"));
+            assertNull(record.getValue("second"));
+            assertNull(record.getValue("third"));
+            assertNull(record.getValue("fourth"));
+            assertNull(record.getValue("fifth"));
             assertEquals("hello there", record.getValue("_raw"));
 
             record = deserializer.nextRecord();
@@ -343,7 +334,7 @@ public class TestGrokRecordReader {
         try (final InputStream in = new ByteArrayInputStream(inputBytes)) {
             final Grok grok = grokCompiler.compile("%{NUMBER:first} 
%{NUMBER:second} %{NUMBER:third} %{NUMBER:fourth} %{NUMBER:fifth}");
 
-            final RecordSchema schema = GrokReader.createRecordSchema(grok);
+            final RecordSchema schema = getRecordSchema(grok);
             final List<String> fieldNames = schema.getFieldNames();
             assertEquals(7, fieldNames.size());
             assertTrue(fieldNames.contains("first"));
@@ -381,7 +372,7 @@ public class TestGrokRecordReader {
         try (final InputStream in = new ByteArrayInputStream(inputBytes)) {
             final Grok grok = grokCompiler.compile("%{NUMBER:first} 
%{NUMBER:second} %{NUMBER:third} %{NUMBER:fourth} %{NUMBER:fifth}");
 
-            final RecordSchema schema = GrokReader.createRecordSchema(grok);
+            final RecordSchema schema = getRecordSchema(grok);
             final List<String> fieldNames = schema.getFieldNames();
             assertEquals(7, fieldNames.size());
             assertTrue(fieldNames.contains("first"));
@@ -403,11 +394,11 @@ public class TestGrokRecordReader {
 
             record = deserializer.nextRecord();
 
-            assertEquals(null, record.getValue("first"));
-            assertEquals(null, record.getValue("second"));
-            assertEquals(null, record.getValue("third"));
-            assertEquals(null, record.getValue("fourth"));
-            assertEquals(null, record.getValue("fifth"));
+            assertNull(record.getValue("first"));
+            assertNull(record.getValue("second"));
+            assertNull(record.getValue("third"));
+            assertNull(record.getValue("fourth"));
+            assertNull(record.getValue("fifth"));
             assertEquals("hello there", record.getValue("_raw"));
 
             record = deserializer.nextRecord();
@@ -425,7 +416,7 @@ public class TestGrokRecordReader {
     }
 
     @Test
-    public void testRawUnmatchedRecordlast() throws GrokException, 
IOException, MalformedRecordException {
+    public void testRawUnmatchedRecordLast() throws GrokException, 
IOException, MalformedRecordException {
         final String nonMatchingRecord = "hello there";
         final String matchingRecord = "1 2 3 4 5";
 
@@ -435,7 +426,7 @@ public class TestGrokRecordReader {
         try (final InputStream in = new ByteArrayInputStream(inputBytes)) {
             final Grok grok = grokCompiler.compile("%{NUMBER:first} 
%{NUMBER:second} %{NUMBER:third} %{NUMBER:fourth} %{NUMBER:fifth}");
 
-            final RecordSchema schema = GrokReader.createRecordSchema(grok);
+            final RecordSchema schema = getRecordSchema(grok);
             final List<String> fieldNames = schema.getFieldNames();
             assertEquals(7, fieldNames.size());
             assertTrue(fieldNames.contains("first"));
@@ -457,15 +448,26 @@ public class TestGrokRecordReader {
 
             record = deserializer.nextRecord();
 
-            assertEquals(null, record.getValue("first"));
-            assertEquals(null, record.getValue("second"));
-            assertEquals(null, record.getValue("third"));
-            assertEquals(null, record.getValue("fourth"));
-            assertEquals(null, record.getValue("fifth"));
+            assertNull(record.getValue("first"));
+            assertNull(record.getValue("second"));
+            assertNull(record.getValue("third"));
+            assertNull(record.getValue("fourth"));
+            assertNull(record.getValue("fifth"));
             assertEquals("hello there", record.getValue("_raw"));
 
             assertNull(deserializer.nextRecord());
             deserializer.close();
         }
     }
+
+    private RecordSchema getRecordSchema(final Grok grok) {
+        final List<Grok> groks = Collections.singletonList(grok);
+        return GrokReader.createRecordSchema(groks);
+    }
+
+    private GrokRecordReader getRecordReader(final String pattern, final 
InputStream inputStream) {
+        final Grok grok = grokCompiler.compile(pattern);
+        final RecordSchema recordSchema = getRecordSchema(grok);
+        return new GrokRecordReader(inputStream, grok, recordSchema, 
recordSchema, NoMatchStrategy.APPEND);
+    }
 }
\ No newline at end of file

Reply via email to