exceptionfactory commented on a change in pull request #5530:
URL: https://github.com/apache/nifi/pull/5530#discussion_r793722602



##########
File path: nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
##########
@@ -2306,4 +2307,47 @@ public static boolean isFittingNumberType(final Object value, final RecordFieldT
 
         return false;
     }
+
+    public static DataType inferSimpleDataType(final String value) {
+        if (value == null || value.isEmpty()) {
+            return null;
+        }
+
+        if (NumberUtils.isParsable(value)) {

Review comment:
       As mentioned in relation to the commons-lang3 dependency declaration, this reference should be replaced with a direct check.  Perhaps implementing a simple check for null and empty, together with moving the Boolean string check prior to this one, would be sufficient?  It would also be helpful to add a unit test for this method.
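    A minimal sketch of what a dependency-free version might look like, together with a small unit test. The Boolean check ordering follows the suggestion above; the concrete return types (LONG/DOUBLE rather than INT/FLOAT) and the test expectations are illustrative assumptions, not the behaviour of this PR:
   ```java
       public static DataType inferSimpleDataType(final String value) {
           if (value == null || value.isEmpty()) {
               return null;
           }

           // Check Boolean values first so that "true"/"false" never reach the numeric checks
           if (value.equalsIgnoreCase("true") || value.equalsIgnoreCase("false")) {
               return RecordFieldType.BOOLEAN.getDataType();
           }

           // Direct numeric checks instead of NumberUtils.isParsable()
           try {
               Long.parseLong(value);
               return RecordFieldType.LONG.getDataType();
           } catch (final NumberFormatException e) {
               // not an integral value, fall through
           }

           try {
               Double.parseDouble(value);
               return RecordFieldType.DOUBLE.getDataType();
           } catch (final NumberFormatException e) {
               // not a floating point value either
           }

           return RecordFieldType.STRING.getDataType();
       }
   ```
    A corresponding test method in a DataTypeUtils test class (assuming the usual `org.junit.jupiter.api.Assertions` static imports, and expectations matching the sketch above rather than the PR's actual logic) might look like this:
   ```java
       @Test
       public void testInferSimpleDataType() {
           assertNull(DataTypeUtils.inferSimpleDataType(null));
           assertNull(DataTypeUtils.inferSimpleDataType(""));
           assertEquals(RecordFieldType.BOOLEAN.getDataType(), DataTypeUtils.inferSimpleDataType("true"));
           assertEquals(RecordFieldType.LONG.getDataType(), DataTypeUtils.inferSimpleDataType("123"));
           assertEquals(RecordFieldType.DOUBLE.getDataType(), DataTypeUtils.inferSimpleDataType("1.23"));
           assertEquals(RecordFieldType.STRING.getDataType(), DataTypeUtils.inferSimpleDataType("abc"));
       }
   ```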

##########
File path: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/cef/TestCEFUtil.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cef;
+
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.junit.Assert;
+
+import java.sql.Timestamp;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+public class TestCEFUtil {
+    static final String RAW_FIELD = "raw";
+    static final String RAW_VALUE = "Oct 12 04:16:11 localhost 
CEF:0|Company|Product|1.2.3|audit-login|Successful login|3|";
+
+    static final String INPUT_SINGLE_ROW_HEADER_FIELDS_ONLY = 
"src/test/resources/cef/single-row-header-fields-only.txt";
+    static final String INPUT_SINGLE_ROW_WITH_EXTENSIONS = 
"src/test/resources/cef/single-row-with-extensions.txt";
+    static final String INPUT_SINGLE_ROW_WITH_EMPTY_EXTENSION = 
"src/test/resources/cef/single-row-with-empty-extension.txt";
+    static final String INPUT_SINGLE_ROW_WITH_CUSTOM_EXTENSIONS = 
"src/test/resources/cef/single-row-with-custom-extensions.txt";
+    static final String INPUT_SINGLE_ROW_WITH_EMPTY_CUSTOM_EXTENSIONS = 
"src/test/resources/cef/single-row-with-empty-custom-extensions.txt";
+    static final String INPUT_SINGLE_ROW_WITH_INCORRECT_HEADER_FIELD = 
"src/test/resources/cef/single-row-with-incorrect-header-field.txt";
+    static final String INPUT_SINGLE_ROW_WITH_INCORRECT_CUSTOM_EXTENSIONS = 
"src/test/resources/cef/single-row-with-incorrect-custom-extensions.txt";
+    static final String INPUT_EMPTY_ROW = 
"src/test/resources/cef/empty-row.txt";
+    static final String INPUT_MISFORMATTED_ROW = 
"src/test/resources/cef/misformatted-row.txt";
+    static final String INPUT_MULTIPLE_IDENTICAL_ROWS = 
"src/test/resources/cef/multiple-rows.txt";
+    static final String INPUT_MULTIPLE_ROWS_WITH_DIFFERENT_CUSTOM_TYPES = 
"src/test/resources/cef/multiple-rows-with-different-custom-types.txt";
+    static final String INPUT_MULTIPLE_ROWS_STARTING_WITH_EMPTY_ROW = 
"src/test/resources/cef/multiple-rows-starting-with-empty-row.txt";
+    static final String INPUT_MULTIPLE_ROWS_WITH_EMPTY_ROWS = 
"src/test/resources/cef/multiple-rows-with-empty-rows.txt";
+    static final String 
INPUT_MULTIPLE_ROWS_WITH_DECREASING_NUMBER_OF_EXTENSIONS = 
"src/test/resources/cef/multiple-rows-decreasing-number-of-extensions.txt";
+    static final String 
INPUT_MULTIPLE_ROWS_WITH_INCREASING_NUMBER_OF_EXTENSIONS = 
"src/test/resources/cef/multiple-rows-increasing-number-of-extensions.txt";
+
+    static final Map<String, Object> EXPECTED_HEADER_VALUES = new HashMap<>();
+    static final Map<String, Object> EXPECTED_EXTENSION_VALUES = new 
HashMap<>();
+
+    static final String CUSTOM_EXTENSION_FIELD_NAME = "loginsequence";
+    static final RecordField CUSTOM_EXTENSION_FIELD = new 
RecordField(TestCEFUtil.CUSTOM_EXTENSION_FIELD_NAME, 
RecordFieldType.INT.getDataType());
+    static final RecordField CUSTOM_EXTENSION_FIELD_AS_STRING = new 
RecordField(TestCEFUtil.CUSTOM_EXTENSION_FIELD_NAME, 
RecordFieldType.STRING.getDataType());
+    static final RecordField CUSTOM_EXTENSION_FIELD_AS_CHOICE = new 
RecordField(TestCEFUtil.CUSTOM_EXTENSION_FIELD_NAME, 
RecordFieldType.CHOICE.getChoiceDataType(
+            RecordFieldType.FLOAT.getDataType(), 
RecordFieldType.STRING.getDataType()
+    ));
+
+    static {
+        EXPECTED_HEADER_VALUES.put("version", Integer.valueOf(0));

Review comment:
       `Integer.valueOf()` is unnecessary as the compiler will handle boxing.

##########
File path: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/CEFRecordReader.java
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cef;
+
+import com.fluenda.parcefone.event.CEFHandlingException;
+import com.fluenda.parcefone.event.CommonEvent;
+import com.fluenda.parcefone.parser.CEFParser;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.InetAddress;
+import java.text.DateFormat;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Supplier;
+
+/**
+ * CEF (Common Event Format) based implementation for {@code 
org.apache.nifi.serialization.RecordReader}. The implementation
+ * builds on top of the {@code com.fluenda.parcefone.parser.CEFParser} which 
is responsible to deal with the textual representation
+ * of the events. This implementation intends to fill the gap between the CEF 
parser's {@code com.fluenda.parcefone.event.CommonEvent}
+ * data objects and the NiFi's internal Record representation.
+ */
+final class CEFRecordReader implements RecordReader {
+    /**
+     * Currently these are not used but set the way it follows the expected 
CEF format.
+     */
+    private static final String DATE_FORMAT = "MMM dd yyyy";
+    private static final String TIME_FORMAT = "HH:mm:ss";
+    private static final String DATETIME_FORMAT = DATE_FORMAT + " " + 
TIME_FORMAT;
+
+    private static final Supplier<DateFormat> DATE_FORMAT_SUPPLIER = () -> 
DataTypeUtils.getDateFormat(DATE_FORMAT);
+    private static final Supplier<DateFormat> TIME_FORMAT_SUPPLIER = () -> 
DataTypeUtils.getDateFormat(TIME_FORMAT);
+    private static final Supplier<DateFormat> DATETIME_FORMAT_SUPPLIER = () -> 
DataTypeUtils.getDateFormat(DATETIME_FORMAT);
+
+    private final RecordSchema schema;
+    private final BufferedReader reader;
+    private final CEFParser parser;
+    private final ComponentLog logger;
+    private final Locale locale;
+    private final String rawMessageField;
+    private final String invalidField;
+
+    /**
+     * It would not cause any functional drawback to acquire the custom 
extensions all the time, but there are some cases
+     * (inferred schema when custom extension fields are not included) where 
we can be sure about that these fields are not
+     * required. For better performance, in these cases we do not work with 
these fields.
+     */
+    private final boolean includeCustomExtensions;
+    private final boolean acceptEmptyExtensions;
+
+    CEFRecordReader(
+        final InputStream inputStream,
+        final RecordSchema recordSchema,
+        final CEFParser parser,
+        final ComponentLog logger,
+        final Locale locale,
+        final String rawMessageField,
+        final String invalidField,
+        final boolean includeCustomExtensions,
+        final boolean acceptEmptyExtensions
+    ) {
+        this.reader = new BufferedReader(new InputStreamReader(inputStream));
+        this.schema = recordSchema;
+        this.parser = parser;
+        this.logger = logger;
+        this.locale = locale;
+        this.rawMessageField = rawMessageField;
+        this.invalidField = invalidField;
+        this.includeCustomExtensions = includeCustomExtensions;
+        this.acceptEmptyExtensions = acceptEmptyExtensions;
+    }
+
+    @Override
+    public Record nextRecord(final boolean coerceTypes, final boolean 
dropUnknownFields) throws IOException, MalformedRecordException {
+        final String line = nextLine();
+
+        if (line == null) {
+            return null;
+        }
+
+        final CommonEvent event = parser.parse(line, true, 
acceptEmptyExtensions, locale);
+
+        if (event == null) {
+            logger.debug("Event parsing resulted no event");
+
+            if (invalidField != null && !invalidField.isEmpty()) {
+                return new MapRecord(schema, 
Collections.singletonMap(invalidField, line));
+            } else {
+                throw new MalformedRecordException("The following line could 
not be parsed by the CEF parser: " + line);
+            }
+        }
+
+        final Map<String, Object> values = new HashMap<>();
+
+        try {
+            event.getHeader().entrySet().forEach(field -> 
values.put(field.getKey(), convertValue(field.getKey(), field.getValue(), 
coerceTypes)));
+            event.getExtension(true, 
includeCustomExtensions).entrySet().forEach(field -> values.put(field.getKey(), 
convertValue(field.getKey() ,field.getValue(), coerceTypes)));
+
+            for (final String fieldName : schema.getFieldNames()) {
+                if (!values.containsKey(fieldName)) {
+                    values.put(fieldName, null);
+                }
+            }
+
+        } catch (final CEFHandlingException e) {
+            throw new MalformedRecordException("Error during extracting 
information from CEF event", e);
+        }
+
+        if (rawMessageField != null) {
+            values.put(rawMessageField, line);
+        }
+
+        return new MapRecord(schema, values, true, dropUnknownFields);
+    }
+
+    private String nextLine() throws IOException {
+        String line;
+
+        while((line = reader.readLine()) != null) {
+            if (!line.isEmpty()) {
+                break;
+            }
+        }
+
+        return line;
+    }
+
+    private Object convertValue(final String fieldName, final Object 
fieldValue, final boolean coerceType) {
+        final DataType dataType = 
schema.getDataType(fieldName).orElse(RecordFieldType.STRING.getDataType());
+
+        return coerceType
+            ? convert(fieldValue, dataType, fieldName)
+            : convertSimpleIfPossible(fieldValue, dataType, fieldName);
+    }
+
+    private final Object convert(final Object value, final DataType dataType, final String fieldName) {

Review comment:
       The `final` keyword is unnecessary on private methods.
   ```suggestion
       private Object convert(final Object value, final DataType dataType, final String fieldName) {
   ```

##########
File path: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/cef/TestCEFReader.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cef;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.schema.inference.SchemaInferenceUtil;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MockSchemaRegistry;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class TestCEFReader {
+    private TestRunner runner;
+    private TestCEFProcessor processor;
+    private CEFReader reader;
+
+    @BeforeEach
+    public void setUp() {
+        runner = null;
+        processor = null;
+        reader = null;
+    }
+
+    @Test
+    public void testValidatingReaderWhenRawFieldValueIsInvalid() throws 
Exception {
+        setUpReader();
+        setRawField("dst"); // Invalid because there is an extension with the 
same name
+        setSchemaIsInferred(CEFReader.HEADERS_ONLY);
+
+        assertReaderIsInvalid();
+    }
+
+    @Test
+    public void testReadingSingleRowWithHeaderFields() throws Exception {
+        setUpReader();
+        setSchemaIsInferred(CEFReader.HEADERS_ONLY);
+        enableReader();
+
+        triggerProcessor(TestCEFUtil.INPUT_SINGLE_ROW_HEADER_FIELDS_ONLY);
+
+        assertNumberOfResults(1);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES);
+    }
+
+    @Test
+    public void testReadingSingleRowWithHeaderFieldsAndRaw() throws Exception {
+        setUpReader();
+        setRawField(TestCEFUtil.RAW_FIELD);
+        setSchemaIsInferred(CEFReader.HEADERS_ONLY);
+        enableReader();
+
+        triggerProcessor(TestCEFUtil.INPUT_SINGLE_ROW_HEADER_FIELDS_ONLY);
+
+        assertNumberOfResults(1);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES, 
Collections.singletonMap(TestCEFUtil.RAW_FIELD, TestCEFUtil.RAW_VALUE));
+    }
+
+    @Test
+    public void 
testReadingSingleRowWithExtensionFieldsWhenSchemaIsHeadersOnly() throws 
Exception {
+        setUpReader();
+        setSchemaIsInferred(CEFReader.HEADERS_ONLY);
+        enableReader();
+
+        triggerProcessor(TestCEFUtil.INPUT_SINGLE_ROW_WITH_EXTENSIONS);
+
+        assertNumberOfResults(1);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES);
+    }
+
+    @Test
+    public void testReadingSingleRowWithCustomExtensionFieldsAsStrings() 
throws Exception {
+        setUpReader();
+        setSchemaIsInferred(CEFReader.CUSTOM_EXTENSIONS_AS_STRINGS);
+        enableReader();
+
+        triggerProcessor(TestCEFUtil.INPUT_SINGLE_ROW_WITH_CUSTOM_EXTENSIONS);
+
+        assertNumberOfResults(1);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES, 
TestCEFUtil.EXPECTED_EXTENSION_VALUES, 
Collections.singletonMap(TestCEFUtil.CUSTOM_EXTENSION_FIELD_NAME, "123"));
+    }
+
+    @Test
+    public void testMisformattedRowsWithoutInvalidFieldIsBeingSet() throws 
Exception {
+        setUpReader();
+        setSchemaIsInferred(CEFReader.CUSTOM_EXTENSIONS_INFERRED);
+        enableReader();
+
+        triggerProcessorWithError(TestCEFUtil.INPUT_MISFORMATTED_ROW);
+    }
+
+    @Test
+    public void testMisformattedRowsWithInvalidFieldIsSet() throws Exception {
+        setUpReader();
+        setInvalidField();
+        setSchemaIsInferred(CEFReader.CUSTOM_EXTENSIONS_INFERRED);
+        enableReader();
+
+        triggerProcessor(TestCEFUtil.INPUT_MISFORMATTED_ROW);
+
+        assertNumberOfResults(3);
+        assertFieldIsSet(1, "invalid", "Oct 12 04:16:11 localhost 
CEF:0|nxlog.org|nxlog|2.7.1243|");
+    }
+
+    @Test
+    public void testReadingSingleRowWithCustomExtensionFields() throws 
Exception {
+        setUpReader();
+        setSchemaIsInferred(CEFReader.CUSTOM_EXTENSIONS_INFERRED);
+        enableReader();
+
+        triggerProcessor(TestCEFUtil.INPUT_SINGLE_ROW_WITH_CUSTOM_EXTENSIONS);
+
+        assertNumberOfResults(1);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES, 
TestCEFUtil.EXPECTED_EXTENSION_VALUES, 
Collections.singletonMap(TestCEFUtil.CUSTOM_EXTENSION_FIELD_NAME, 123));
+    }
+
+    @Test
+    public void testReadingMultipleRows() throws Exception {
+        setUpReader();
+        setSchemaIsInferred(CEFReader.HEADERS_ONLY);
+        enableReader();
+
+        triggerProcessor(TestCEFUtil.INPUT_MULTIPLE_IDENTICAL_ROWS);
+
+        assertNumberOfResults(3);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES);
+    }
+
+    @Test
+    public void testReadingMultipleRowsWithEmptyRows() throws Exception {
+        setUpReader();
+        setSchemaIsInferred(CEFReader.HEADERS_ONLY);
+        enableReader();
+
+        triggerProcessor(TestCEFUtil.INPUT_MULTIPLE_ROWS_WITH_EMPTY_ROWS);
+
+        assertNumberOfResults(3);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES);
+    }
+
+    @Test
+    public void testReadingMultipleRowsStartingWithEmptyRow() throws Exception 
{
+        setUpReader();
+        setSchemaIsInferred(CEFReader.HEADERS_ONLY);
+        enableReader();
+
+        
triggerProcessor(TestCEFUtil.INPUT_MULTIPLE_ROWS_STARTING_WITH_EMPTY_ROW);
+
+        assertNumberOfResults(3);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES);
+    }
+
+    @Test
+    public void testWithPredefinedSchema() throws Exception {
+        setUpReader();
+        setSchema(CEFSchemaUtil.getHeaderFields());
+        enableReader();
+
+        triggerProcessor(TestCEFUtil.INPUT_SINGLE_ROW_WITH_EXTENSIONS);
+
+        assertNumberOfResults(1);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES);
+    }
+
+    @Test
+    public void testReadingSingleRowWithEmptyExtensionFields() throws 
Exception {
+        setUpReader();
+        setAcceptEmptyExtensions();
+        setSchemaIsInferred(CEFReader.CUSTOM_EXTENSIONS_INFERRED);
+        enableReader();
+
+        
triggerProcessor(TestCEFUtil.INPUT_SINGLE_ROW_WITH_EMPTY_CUSTOM_EXTENSIONS);
+
+        assertNumberOfResults(1);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES, 
TestCEFUtil.EXPECTED_EXTENSION_VALUES, 
Collections.singletonMap(TestCEFUtil.CUSTOM_EXTENSION_FIELD_NAME, ""));
+    }
+
+    private void setUpReader() throws InitializationException {
+        processor = new TestCEFProcessor();
+        runner = TestRunners.newTestRunner(processor);
+        reader = new CEFReader();
+        runner.addControllerService("reader", reader);
+        runner.setProperty(TestCEFProcessor.READER, "reader");
+    }
+
+    private void setAcceptEmptyExtensions() {
+        runner.setProperty(reader, CEFReader.ACCEPT_EMPTY_EXTENSIONS, "true");
+    }
+
+    private void setSchemaIsInferred(final AllowableValue value) {
+        runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaInferenceUtil.INFER_SCHEMA);
+        runner.setProperty(reader, CEFReader.INFERENCE_STRATEGY, value);
+    }
+
+    private void setInvalidField() {
+        runner.setProperty(reader, CEFReader.INVALID_FIELD, "invalid");
+    }
+
+    private void setSchema(List<RecordField> fields) throws 
InitializationException {
+        final MockSchemaRegistry registry = new MockSchemaRegistry();
+        registry.addSchema("predefinedSchema", new SimpleRecordSchema(fields));
+        runner.addControllerService("registry", registry);
+        runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.SCHEMA_NAME_PROPERTY);
+        runner.setProperty(reader, SchemaAccessUtils.SCHEMA_NAME, 
"predefinedSchema");
+        runner.setProperty(reader, SchemaAccessUtils.SCHEMA_REGISTRY, 
"registry");
+        runner.enableControllerService(registry);
+    }
+
+    private void setRawField(final String value) {
+        runner.setProperty(reader, CEFReader.RAW_FIELD, value);
+    }
+
+    private void enableReader() {
+        runner.assertValid(reader);
+        runner.enableControllerService(reader);
+    }
+
+    private void triggerProcessor(final String input) throws 
FileNotFoundException {
+        runner.enqueue(new FileInputStream(input));
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TestCEFProcessor.SUCCESS);
+    }
+
+    private void triggerProcessorWithError(final String input) {
+        try {
+            runner.enqueue(new FileInputStream(input));
+            runner.run();
+            Assertions.fail();
+        } catch (final Throwable e) {
+            // the TestCEFProcessor wraps the original exception into a RuntimeException
+            Assertions.assertTrue(e.getCause() instanceof RuntimeException);
+            Assertions.assertTrue(e.getCause().getCause() instanceof IOException);
+        }

Review comment:
       This should be changed to use `assertThrows`; additional checks on the returned exception can use other assertions.
   
   ```suggestion
           runner.enqueue(new FileInputStream(input));
           final RuntimeException exception = assertThrows(RuntimeException.class, () -> runner.run());
   ```
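    For the additional checks, the exception returned by `assertThrows` can then be inspected directly; a sketch, assuming (as the original comment notes) that the reader's `IOException` ends up somewhere in the cause chain of the wrapped `RuntimeException`:
   ```java
       private void triggerProcessorWithError(final String input) throws FileNotFoundException {
           runner.enqueue(new FileInputStream(input));

           // assertThrows verifies the failure and returns the exception for further checks
           final RuntimeException exception = Assertions.assertThrows(RuntimeException.class, () -> runner.run());

           // Walk the cause chain; the exact nesting depends on how TestCEFProcessor and TestRunner wrap the failure
           Throwable cause = exception.getCause();
           boolean ioExceptionFound = false;
           while (cause != null) {
               if (cause instanceof IOException) {
                   ioExceptionFound = true;
                   break;
               }
               cause = cause.getCause();
           }
           Assertions.assertTrue(ioExceptionFound, "Expected an IOException in the cause chain");
       }
   ```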

##########
File path: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/CEFReader.java
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cef;
+
+import com.fluenda.parcefone.parser.CEFParser;
+import org.apache.bval.jsr.ApacheValidationProvider;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaAccessStrategy;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schema.inference.SchemaInferenceUtil;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SchemaRegistryService;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import javax.validation.Validation;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
+import static 
org.apache.nifi.schema.inference.SchemaInferenceUtil.SCHEMA_CACHE;
+
+@Tags({"cef", "record", "reader", "parser"})
+@CapabilityDescription("Parses CEF (Common Event Format) events, returning 
each row as a record. "
+    + "This reader allows for inferring a schema based on the first event in 
the FlowFile or providing an explicit schema for interpreting the values.")
+public final class CEFReader extends SchemaRegistryService implements 
RecordReaderFactory {
+
+    static final AllowableValue HEADERS_ONLY = new 
AllowableValue("headers-only", "Headers only", "Includes only CEF header fields 
into the inferred schema.");
+    static final AllowableValue HEADERS_AND_EXTENSIONS = new 
AllowableValue("headers-and-extensions", "Headers and extensions",
+            "Includes the CEF header and extension fields to the schema, but 
not the custom extensions.");
+    static final AllowableValue CUSTOM_EXTENSIONS_AS_STRINGS = new 
AllowableValue("custom-extensions-as-string", "With custom extensions as 
strings",
+            "Includes all fields into the inferred schema, involving custom 
extension fields as string values.");
+    static final AllowableValue CUSTOM_EXTENSIONS_INFERRED = new 
AllowableValue("custom-extensions-inferred", "With custom extensions inferred",
+            "Includes all fields into the inferred schema, involving custom 
extension fields with inferred data types. " +
+            "The inference works based on the values in the FlowFile. In some 
scenarios this might result unsatisfiable behaviour. " +
+            "In these cases it is suggested to use \"" + 
CUSTOM_EXTENSIONS_AS_STRINGS.getDisplayName() + "\" Inference Strategy or 
predefined schema.");
+
+    static final PropertyDescriptor INFERENCE_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("inference-strategy")
+            .displayName("Inference Strategy")
+            .description("Defines the set of fields should be included in the 
schema and the way the fields are being interpreted.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .allowableValues(HEADERS_ONLY, HEADERS_AND_EXTENSIONS, 
CUSTOM_EXTENSIONS_AS_STRINGS, CUSTOM_EXTENSIONS_INFERRED)
+            .defaultValue(CUSTOM_EXTENSIONS_INFERRED.getValue())
+            .dependsOn(SCHEMA_ACCESS_STRATEGY, 
SchemaInferenceUtil.INFER_SCHEMA)
+            .build();
+
+    static final PropertyDescriptor RAW_FIELD = new 
PropertyDescriptor.Builder()
+            .name("raw-message-field")
+            .displayName("Raw Message Field")
+            .description("If set the raw message will be added to the record 
using the property value as field name. This is not the same as the 
\"rawEvent\" extension field!")
+            .addValidator(new ValidateRawField())
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor INVALID_FIELD = new 
PropertyDescriptor.Builder()
+            .name("invalid-message-field")
+            .displayName("Invalid Field")
+            .description("Used when a line in the FlowFile cannot be parsed by 
the CEF parser. " +
+                    "If set, instead of failing to process the FlowFile, a 
record is being added with one field. " +
+                    "This record contains one field with the name specified by 
the property and the raw message as value.")
+            .addValidator(new ValidateRawField())
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor DATETIME_REPRESENTATION = new 
PropertyDescriptor.Builder()
+            .name("datetime-representation")
+            .displayName("DateTime Locale")
+            .description("The IETF BCP 47 representation of the Locale to be 
used when parsing date " +
+                    "fields with long or short month names (e.g. may <en-US> 
vs. mai. <fr-FR>. The default" +
+                    "value is generally safe. Only change if having issues 
parsing CEF messages")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(new ValidateLocale())
+            .defaultValue("en-US")
+            .build();
+
+    static final PropertyDescriptor ACCEPT_EMPTY_EXTENSIONS = new 
PropertyDescriptor.Builder()
+            .name("accept-empty-extensions")
+            .displayName("Accept empty extensions")
+            .description("If set to true, empty extensions will be accepted 
and will be associated to a null value.")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .required(true)
+            .defaultValue("false")
+            .allowableValues("true", "false")
+            .build();
+
+    private final javax.validation.Validator validator = 
Validation.byProvider(ApacheValidationProvider.class).configure().buildValidatorFactory().getValidator();
+    private final CEFParser parser = new CEFParser(validator);
+
+    private volatile String rawMessageField;
+    private volatile String invalidField;
+    private volatile Locale parcefoneLocale;
+    private volatile boolean includeCustomExtensions;
+    private volatile boolean acceptEmptyExtensions;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new 
ArrayList<>(super.getSupportedPropertyDescriptors());
+        properties.add(RAW_FIELD);
+        properties.add(INVALID_FIELD);
+        properties.add(DATETIME_REPRESENTATION);
+        properties.add(INFERENCE_STRATEGY);
+
+        properties.add(new PropertyDescriptor.Builder()
+                .fromPropertyDescriptor(SCHEMA_CACHE)
+                .dependsOn(SCHEMA_ACCESS_STRATEGY, 
SchemaInferenceUtil.INFER_SCHEMA)
+                .build());
+
+        properties.add(ACCEPT_EMPTY_EXTENSIONS);
+        return properties;
+    }
+
+    @Override
+    protected List<AllowableValue> getSchemaAccessStrategyValues() {
+        final List<AllowableValue> allowableValues = new ArrayList<>();
+        allowableValues.addAll(super.getSchemaAccessStrategyValues());
+        allowableValues.add(SchemaInferenceUtil.INFER_SCHEMA);
+        return allowableValues;
+    }
+
+    @Override
+    protected AllowableValue getDefaultSchemaAccessStrategy() {
+        return SchemaInferenceUtil.INFER_SCHEMA;
+    }
+
+    @Override
+    protected SchemaAccessStrategy getSchemaAccessStrategy(final String 
strategy, final SchemaRegistry schemaRegistry, final PropertyContext context) {
+        if (strategy.equals(SchemaInferenceUtil.INFER_SCHEMA.getValue())) {
+            final String inferenceStrategy = 
context.getProperty(INFERENCE_STRATEGY).getValue();
+            final CEFSchemaInferenceBuilder builder = new 
CEFSchemaInferenceBuilder();
+
+            if (inferenceStrategy.equals(HEADERS_AND_EXTENSIONS.getValue())) {
+                builder.withExtensions();
+            } else if 
(inferenceStrategy.equals(CUSTOM_EXTENSIONS_AS_STRINGS.getValue())) {
+                
builder.withCustomExtensions(CEFCustomExtensionTypeResolver.STRING_RESOLVER);
+            } else if 
(inferenceStrategy.equals(CUSTOM_EXTENSIONS_INFERRED.getValue())) {
+                
builder.withCustomExtensions(CEFCustomExtensionTypeResolver.SIMPLE_RESOLVER);
+            }
+
+            if (rawMessageField != null) {
+                builder.withRawMessage(rawMessageField);
+            }
+
+            if (invalidField != null) {
+                builder.withInvalidField(invalidField);
+            }
+
+            final boolean failFast = invalidField == null || 
invalidField.isEmpty();
+            final CEFSchemaInference inference = builder.build();
+            return SchemaInferenceUtil.getSchemaAccessStrategy(
+                strategy,
+                context,
+                getLogger(),
+                (variables, in) -> new CEFRecordSource(in, parser, 
parcefoneLocale, acceptEmptyExtensions, failFast),
+                () -> inference,
+                () -> super.getSchemaAccessStrategy(strategy, schemaRegistry, 
context));
+        }
+
+        return super.getSchemaAccessStrategy(strategy, schemaRegistry, 
context);
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        rawMessageField = 
context.getProperty(RAW_FIELD).evaluateAttributeExpressions().getValue();
+        invalidField = 
context.getProperty(INVALID_FIELD).evaluateAttributeExpressions().getValue();
+        parcefoneLocale = 
Locale.forLanguageTag(context.getProperty(DATETIME_REPRESENTATION).evaluateAttributeExpressions().getValue());
+
+        final String inferenceStrategy = context.getProperty(INFERENCE_STRATEGY).getValue();
+        final boolean inferenceNeedsCustomExtensions = !inferenceStrategy.equals(HEADERS_ONLY.getValue()) && !inferenceStrategy.equals(HEADERS_AND_EXTENSIONS.getValue());
+        final boolean isInferSchema = context.getProperty(SCHEMA_ACCESS_STRATEGY).getValue().equals(SchemaInferenceUtil.INFER_SCHEMA.getValue());
+        includeCustomExtensions = !isInferSchema || (isInferSchema && inferenceNeedsCustomExtensions);

Review comment:
       This can be simplified as follows:
   ```suggestion
           includeCustomExtensions = !isInferSchema || inferenceNeedsCustomExtensions;
   ```

##########
File path: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/CEFReader.java
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cef;
+
+import com.fluenda.parcefone.parser.CEFParser;
+import org.apache.bval.jsr.ApacheValidationProvider;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaAccessStrategy;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schema.inference.SchemaInferenceUtil;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SchemaRegistryService;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import javax.validation.Validation;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
+import static 
org.apache.nifi.schema.inference.SchemaInferenceUtil.SCHEMA_CACHE;
+
+@Tags({"cef", "record", "reader", "parser"})
+@CapabilityDescription("Parses CEF (Common Event Format) events, returning 
each row as a record. "
+    + "This reader allows for inferring a schema based on the first event in 
the FlowFile or providing an explicit schema for interpreting the values.")
+public final class CEFReader extends SchemaRegistryService implements 
RecordReaderFactory {
+
+    static final AllowableValue HEADERS_ONLY = new 
AllowableValue("headers-only", "Headers only", "Includes only CEF header fields 
into the inferred schema.");
+    static final AllowableValue HEADERS_AND_EXTENSIONS = new 
AllowableValue("headers-and-extensions", "Headers and extensions",
+            "Includes the CEF header and extension fields to the schema, but 
not the custom extensions.");
+    static final AllowableValue CUSTOM_EXTENSIONS_AS_STRINGS = new 
AllowableValue("custom-extensions-as-string", "With custom extensions as 
strings",
+            "Includes all fields into the inferred schema, involving custom 
extension fields as string values.");
+    static final AllowableValue CUSTOM_EXTENSIONS_INFERRED = new 
AllowableValue("custom-extensions-inferred", "With custom extensions inferred",
+            "Includes all fields into the inferred schema, involving custom 
extension fields with inferred data types. " +
+            "The inference works based on the values in the FlowFile. In some 
scenarios this might result unsatisfiable behaviour. " +
+            "In these cases it is suggested to use \"" + 
CUSTOM_EXTENSIONS_AS_STRINGS.getDisplayName() + "\" Inference Strategy or 
predefined schema.");
+
+    static final PropertyDescriptor INFERENCE_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("inference-strategy")
+            .displayName("Inference Strategy")
+            .description("Defines the set of fields should be included in the 
schema and the way the fields are being interpreted.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .allowableValues(HEADERS_ONLY, HEADERS_AND_EXTENSIONS, 
CUSTOM_EXTENSIONS_AS_STRINGS, CUSTOM_EXTENSIONS_INFERRED)
+            .defaultValue(CUSTOM_EXTENSIONS_INFERRED.getValue())
+            .dependsOn(SCHEMA_ACCESS_STRATEGY, 
SchemaInferenceUtil.INFER_SCHEMA)
+            .build();
+
+    static final PropertyDescriptor RAW_FIELD = new 
PropertyDescriptor.Builder()
+            .name("raw-message-field")
+            .displayName("Raw Message Field")
+            .description("If set the raw message will be added to the record 
using the property value as field name. This is not the same as the 
\"rawEvent\" extension field!")
+            .addValidator(new ValidateRawField())
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor INVALID_FIELD = new 
PropertyDescriptor.Builder()
+            .name("invalid-message-field")
+            .displayName("Invalid Field")
+            .description("Used when a line in the FlowFile cannot be parsed by 
the CEF parser. " +
+                    "If set, instead of failing to process the FlowFile, a 
record is being added with one field. " +
+                    "This record contains one field with the name specified by 
the property and the raw message as value.")
+            .addValidator(new ValidateRawField())
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor DATETIME_REPRESENTATION = new 
PropertyDescriptor.Builder()
+            .name("datetime-representation")
+            .displayName("DateTime Locale")
+            .description("The IETF BCP 47 representation of the Locale to be 
used when parsing date " +
+                    "fields with long or short month names (e.g. may <en-US> 
vs. mai. <fr-FR>. The default" +
+                    "value is generally safe. Only change if having issues 
parsing CEF messages")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(new ValidateLocale())
+            .defaultValue("en-US")
+            .build();
+
+    static final PropertyDescriptor ACCEPT_EMPTY_EXTENSIONS = new 
PropertyDescriptor.Builder()
+            .name("accept-empty-extensions")
+            .displayName("Accept empty extensions")
+            .description("If set to true, empty extensions will be accepted 
and will be associated to a null value.")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .required(true)
+            .defaultValue("false")
+            .allowableValues("true", "false")
+            .build();
+
+    private final javax.validation.Validator validator = 
Validation.byProvider(ApacheValidationProvider.class).configure().buildValidatorFactory().getValidator();
+    private final CEFParser parser = new CEFParser(validator);
+
+    private volatile String rawMessageField;
+    private volatile String invalidField;
+    private volatile Locale parcefoneLocale;
+    private volatile boolean includeCustomExtensions;
+    private volatile boolean acceptEmptyExtensions;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new 
ArrayList<>(super.getSupportedPropertyDescriptors());
+        properties.add(RAW_FIELD);
+        properties.add(INVALID_FIELD);
+        properties.add(DATETIME_REPRESENTATION);
+        properties.add(INFERENCE_STRATEGY);
+
+        properties.add(new PropertyDescriptor.Builder()
+                .fromPropertyDescriptor(SCHEMA_CACHE)
+                .dependsOn(SCHEMA_ACCESS_STRATEGY, 
SchemaInferenceUtil.INFER_SCHEMA)
+                .build());
+
+        properties.add(ACCEPT_EMPTY_EXTENSIONS);
+        return properties;
+    }
+
+    @Override
+    protected List<AllowableValue> getSchemaAccessStrategyValues() {
+        final List<AllowableValue> allowableValues = new ArrayList<>();
+        allowableValues.addAll(super.getSchemaAccessStrategyValues());

Review comment:
       This can be collapsed into one declaration:
   ```suggestion
           final List<AllowableValue> allowableValues = new ArrayList<>(super.getSchemaAccessStrategyValues());
   ```

##########
File path: nifi-commons/nifi-record/pom.xml
##########
@@ -15,6 +15,14 @@
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.12.0</version>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>

Review comment:
       Based on the comments on this module, it should not have any external dependencies.  Although `NumberUtils.isParsable()` is convenient, the approach should be refactored to avoid this dependency.

##########
File path: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/CEFRecordReader.java
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cef;
+
+import com.fluenda.parcefone.event.CEFHandlingException;
+import com.fluenda.parcefone.event.CommonEvent;
+import com.fluenda.parcefone.parser.CEFParser;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.InetAddress;
+import java.text.DateFormat;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Supplier;
+
+/**
+ * CEF (Common Event Format) based implementation for {@code 
org.apache.nifi.serialization.RecordReader}. The implementation
+ * builds on top of the {@code com.fluenda.parcefone.parser.CEFParser} which 
is responsible to deal with the textual representation
+ * of the events. This implementation intends to fill the gap between the CEF 
parser's {@code com.fluenda.parcefone.event.CommonEvent}
+ * data objects and the NiFi's internal Record representation.
+ */
+final class CEFRecordReader implements RecordReader {
+    /**
+     * Currently these are not used but set the way it follows the expected 
CEF format.
+     */
+    private static final String DATE_FORMAT = "MMM dd yyyy";
+    private static final String TIME_FORMAT = "HH:mm:ss";
+    private static final String DATETIME_FORMAT = DATE_FORMAT + " " + 
TIME_FORMAT;
+
+    private static final Supplier<DateFormat> DATE_FORMAT_SUPPLIER = () -> 
DataTypeUtils.getDateFormat(DATE_FORMAT);
+    private static final Supplier<DateFormat> TIME_FORMAT_SUPPLIER = () -> 
DataTypeUtils.getDateFormat(TIME_FORMAT);
+    private static final Supplier<DateFormat> DATETIME_FORMAT_SUPPLIER = () -> 
DataTypeUtils.getDateFormat(DATETIME_FORMAT);
+
+    private final RecordSchema schema;
+    private final BufferedReader reader;
+    private final CEFParser parser;
+    private final ComponentLog logger;
+    private final Locale locale;
+    private final String rawMessageField;
+    private final String invalidField;
+
+    /**
+     * It would not cause any functional drawback to acquire the custom 
extensions all the time, but there are some cases
+     * (inferred schema when custom extension fields are not included) where 
we can be sure about that these fields are not
+     * required. For better performance, in these cases we do not work with 
these fields.
+     */
+    private final boolean includeCustomExtensions;
+    private final boolean acceptEmptyExtensions;
+
+    CEFRecordReader(
+        final InputStream inputStream,
+        final RecordSchema recordSchema,
+        final CEFParser parser,
+        final ComponentLog logger,
+        final Locale locale,
+        final String rawMessageField,
+        final String invalidField,
+        final boolean includeCustomExtensions,
+        final boolean acceptEmptyExtensions
+    ) {
+        this.reader = new BufferedReader(new InputStreamReader(inputStream));
+        this.schema = recordSchema;
+        this.parser = parser;
+        this.logger = logger;
+        this.locale = locale;
+        this.rawMessageField = rawMessageField;
+        this.invalidField = invalidField;
+        this.includeCustomExtensions = includeCustomExtensions;
+        this.acceptEmptyExtensions = acceptEmptyExtensions;
+    }
+
+    @Override
+    public Record nextRecord(final boolean coerceTypes, final boolean 
dropUnknownFields) throws IOException, MalformedRecordException {
+        final String line = nextLine();
+
+        if (line == null) {
+            return null;
+        }
+
+        final CommonEvent event = parser.parse(line, true, 
acceptEmptyExtensions, locale);
+
+        if (event == null) {
+            logger.debug("Event parsing resulted no event");
+
+            if (invalidField != null && !invalidField.isEmpty()) {
+                return new MapRecord(schema, 
Collections.singletonMap(invalidField, line));
+            } else {
+                throw new MalformedRecordException("The following line could 
not be parsed by the CEF parser: " + line);
+            }
+        }
+
+        final Map<String, Object> values = new HashMap<>();
+
+        try {
+            event.getHeader().entrySet().forEach(field -> 
values.put(field.getKey(), convertValue(field.getKey(), field.getValue(), 
coerceTypes)));
+            event.getExtension(true, 
includeCustomExtensions).entrySet().forEach(field -> values.put(field.getKey(), 
convertValue(field.getKey() ,field.getValue(), coerceTypes)));
+
+            for (final String fieldName : schema.getFieldNames()) {
+                if (!values.containsKey(fieldName)) {
+                    values.put(fieldName, null);
+                }
+            }
+
+        } catch (final CEFHandlingException e) {
+            throw new MalformedRecordException("Error during extracting 
information from CEF event", e);
+        }
+
+        if (rawMessageField != null) {
+            values.put(rawMessageField, line);
+        }
+
+        return new MapRecord(schema, values, true, dropUnknownFields);
+    }
+
+    private String nextLine() throws IOException {
+        String line;
+
+        while((line = reader.readLine()) != null) {
+            if (!line.isEmpty()) {
+                break;
+            }
+        }
+
+        return line;
+    }
+
+    private Object convertValue(final String fieldName, final Object 
fieldValue, final boolean coerceType) {
+        final DataType dataType = 
schema.getDataType(fieldName).orElse(RecordFieldType.STRING.getDataType());
+
+        return coerceType
+            ? convert(fieldValue, dataType, fieldName)
+            : convertSimpleIfPossible(fieldValue, dataType, fieldName);
+    }
+
+    private final Object convert(final Object value, final DataType dataType, final String fieldName) {
+        return DataTypeUtils.convertType(prepareValue(value), dataType, DATE_FORMAT_SUPPLIER, TIME_FORMAT_SUPPLIER, DATETIME_FORMAT_SUPPLIER, fieldName);
+    }
+
+    private final Object convertSimpleIfPossible(final Object value, final DataType dataType, final String fieldName) {

Review comment:
       The `final` keyword is unnecessary on private methods.
   ```suggestion
       private Object convertSimpleIfPossible(final Object value, final DataType dataType, final String fieldName) {
   ```

##########
File path: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/cef/TestCEFUtil.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cef;
+
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.junit.Assert;
+
+import java.sql.Timestamp;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+public class TestCEFUtil {
+    static final String RAW_FIELD = "raw";
+    static final String RAW_VALUE = "Oct 12 04:16:11 localhost 
CEF:0|Company|Product|1.2.3|audit-login|Successful login|3|";
+
+    static final String INPUT_SINGLE_ROW_HEADER_FIELDS_ONLY = 
"src/test/resources/cef/single-row-header-fields-only.txt";
+    static final String INPUT_SINGLE_ROW_WITH_EXTENSIONS = 
"src/test/resources/cef/single-row-with-extensions.txt";
+    static final String INPUT_SINGLE_ROW_WITH_EMPTY_EXTENSION = 
"src/test/resources/cef/single-row-with-empty-extension.txt";
+    static final String INPUT_SINGLE_ROW_WITH_CUSTOM_EXTENSIONS = 
"src/test/resources/cef/single-row-with-custom-extensions.txt";
+    static final String INPUT_SINGLE_ROW_WITH_EMPTY_CUSTOM_EXTENSIONS = 
"src/test/resources/cef/single-row-with-empty-custom-extensions.txt";
+    static final String INPUT_SINGLE_ROW_WITH_INCORRECT_HEADER_FIELD = 
"src/test/resources/cef/single-row-with-incorrect-header-field.txt";
+    static final String INPUT_SINGLE_ROW_WITH_INCORRECT_CUSTOM_EXTENSIONS = 
"src/test/resources/cef/single-row-with-incorrect-custom-extensions.txt";
+    static final String INPUT_EMPTY_ROW = 
"src/test/resources/cef/empty-row.txt";
+    static final String INPUT_MISFORMATTED_ROW = 
"src/test/resources/cef/misformatted-row.txt";
+    static final String INPUT_MULTIPLE_IDENTICAL_ROWS = 
"src/test/resources/cef/multiple-rows.txt";
+    static final String INPUT_MULTIPLE_ROWS_WITH_DIFFERENT_CUSTOM_TYPES = 
"src/test/resources/cef/multiple-rows-with-different-custom-types.txt";
+    static final String INPUT_MULTIPLE_ROWS_STARTING_WITH_EMPTY_ROW = 
"src/test/resources/cef/multiple-rows-starting-with-empty-row.txt";
+    static final String INPUT_MULTIPLE_ROWS_WITH_EMPTY_ROWS = 
"src/test/resources/cef/multiple-rows-with-empty-rows.txt";
+    static final String 
INPUT_MULTIPLE_ROWS_WITH_DECREASING_NUMBER_OF_EXTENSIONS = 
"src/test/resources/cef/multiple-rows-decreasing-number-of-extensions.txt";
+    static final String 
INPUT_MULTIPLE_ROWS_WITH_INCREASING_NUMBER_OF_EXTENSIONS = 
"src/test/resources/cef/multiple-rows-increasing-number-of-extensions.txt";
+
+    static final Map<String, Object> EXPECTED_HEADER_VALUES = new HashMap<>();
+    static final Map<String, Object> EXPECTED_EXTENSION_VALUES = new 
HashMap<>();
+
+    static final String CUSTOM_EXTENSION_FIELD_NAME = "loginsequence";
+    static final RecordField CUSTOM_EXTENSION_FIELD = new 
RecordField(TestCEFUtil.CUSTOM_EXTENSION_FIELD_NAME, 
RecordFieldType.INT.getDataType());
+    static final RecordField CUSTOM_EXTENSION_FIELD_AS_STRING = new 
RecordField(TestCEFUtil.CUSTOM_EXTENSION_FIELD_NAME, 
RecordFieldType.STRING.getDataType());
+    static final RecordField CUSTOM_EXTENSION_FIELD_AS_CHOICE = new 
RecordField(TestCEFUtil.CUSTOM_EXTENSION_FIELD_NAME, 
RecordFieldType.CHOICE.getChoiceDataType(
+            RecordFieldType.FLOAT.getDataType(), 
RecordFieldType.STRING.getDataType()
+    ));
+
+    static {
+        EXPECTED_HEADER_VALUES.put("version", Integer.valueOf(0));
+        EXPECTED_HEADER_VALUES.put("deviceVendor", "Company");
+        EXPECTED_HEADER_VALUES.put("deviceProduct", "Product");
+        EXPECTED_HEADER_VALUES.put("deviceVersion", "1.2.3");
+        EXPECTED_HEADER_VALUES.put("deviceEventClassId", "audit-login");
+        EXPECTED_HEADER_VALUES.put("name", "Successful login");
+        EXPECTED_HEADER_VALUES.put("severity", "3");
+
+        EXPECTED_EXTENSION_VALUES.put("cn1Label", "userid");
+        EXPECTED_EXTENSION_VALUES.put("spt", Integer.valueOf(46117));
+        EXPECTED_EXTENSION_VALUES.put("cn1", Long.valueOf(99999));
+        EXPECTED_EXTENSION_VALUES.put("cfp1", Float.valueOf(1.23F));

Review comment:
       The `valueOf()` wrapping calls are unnecessary since the compiler will 
handle boxing.
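    For instance, relying on autoboxing directly; note that the `L` and `F` literal suffixes keep the boxed types as `Long` and `Float`, matching the original expectations:
   ```java
           EXPECTED_HEADER_VALUES.put("version", 0);

           EXPECTED_EXTENSION_VALUES.put("spt", 46117);
           EXPECTED_EXTENSION_VALUES.put("cn1", 99999L);
           EXPECTED_EXTENSION_VALUES.put("cfp1", 1.23F);
   ```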




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

