Repository: nifi
Updated Branches:
  refs/heads/master ec7f13160 -> b10220439


http://git-wip-us.apache.org/repos/asf/nifi/blob/b1022043/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424Reader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424Reader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424Reader.java
new file mode 100644
index 0000000..fdff68b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424Reader.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.syslog;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaAccessStrategy;
+import org.apache.nifi.schema.access.SchemaField;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SchemaRegistryService;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.apache.nifi.serialization.record.StandardSchemaIdentifier;
+import org.apache.nifi.syslog.attributes.Syslog5424Attributes;
+import org.apache.nifi.syslog.attributes.SyslogAttributes;
+import org.apache.nifi.syslog.keyproviders.SimpleKeyProvider;
+import org.apache.nifi.syslog.parsers.StrictSyslog5424Parser;
+import org.apache.nifi.syslog.utils.NifiStructuredDataPolicy;
+import org.apache.nifi.syslog.utils.NilHandlingPolicy;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@Tags({"syslog 5424", "syslog", "logs", "logfiles", "parse", "text", "record", 
"reader"})
+@CapabilityDescription("Provides a mechanism for reading RFC 5424 compliant 
Syslog data, such as log files, and structuring the data" +
+        " so that it can be processed.")
+public class Syslog5424Reader extends SchemaRegistryService implements 
RecordReaderFactory {
+    public static final String RFC_5424_SCHEMA_NAME = "default-5424-schema";
+    static final AllowableValue RFC_5424_SCHEMA = new 
AllowableValue(RFC_5424_SCHEMA_NAME, "Use RFC 5424 Schema",
+            "The schema will be the default schema per RFC 5424.");
+    public static final PropertyDescriptor CHARSET = new 
PropertyDescriptor.Builder()
+            .name("Character Set")
+            .description("Specifies which character set of the Syslog 
messages")
+            .required(true)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+
+    private volatile StrictSyslog5424Parser parser;
+    private volatile RecordSchema recordSchema;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>(1);
+        properties.add(CHARSET);
+        return properties;
+    }
+
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        final String charsetName = context.getProperty(CHARSET).getValue();
+        parser = new StrictSyslog5424Parser(Charset.forName(charsetName), 
NilHandlingPolicy.NULL, NifiStructuredDataPolicy.MAP_OF_MAPS, new 
SimpleKeyProvider());
+        recordSchema = createRecordSchema();
+    }
+
+    @Override
+    protected List<AllowableValue> getSchemaAccessStrategyValues() {
+        final List<AllowableValue> allowableValues = new ArrayList<>();
+        allowableValues.add(RFC_5424_SCHEMA);
+        return allowableValues;
+    }
+
+    @Override
+    protected AllowableValue getDefaultSchemaAccessStrategy() {
+        return RFC_5424_SCHEMA;
+    }
+
+    @Override
+    protected SchemaAccessStrategy getSchemaAccessStrategy(final String 
strategy, final SchemaRegistry schemaRegistry, final ConfigurationContext 
context) {
+        return createAccessStrategy();
+    }
+
+    @Override
+    protected SchemaAccessStrategy getSchemaAccessStrategy(final String 
strategy, final SchemaRegistry schemaRegistry, final ValidationContext context) 
{
+        return createAccessStrategy();
+    }
+
+    static RecordSchema createRecordSchema() {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField(SyslogAttributes.PRIORITY.key(), 
RecordFieldType.STRING.getDataType(), true));
+        fields.add(new RecordField(SyslogAttributes.SEVERITY.key(), 
RecordFieldType.STRING.getDataType(), true));
+        fields.add(new RecordField(SyslogAttributes.FACILITY.key(), 
RecordFieldType.STRING.getDataType(), true));
+        fields.add(new RecordField(SyslogAttributes.VERSION.key(), 
RecordFieldType.STRING.getDataType(), true));
+        fields.add(new RecordField(SyslogAttributes.TIMESTAMP.key(), 
RecordFieldType.TIMESTAMP.getDataType(), true));
+        fields.add(new RecordField(SyslogAttributes.HOSTNAME.key(), 
RecordFieldType.STRING.getDataType(), true));
+        fields.add(new RecordField(SyslogAttributes.BODY.key(), 
RecordFieldType.STRING.getDataType(), true));
+        fields.add(new RecordField(Syslog5424Attributes.APP_NAME.key(), 
RecordFieldType.STRING.getDataType(), true));
+        fields.add(new RecordField(Syslog5424Attributes.PROCID.key(), 
RecordFieldType.STRING.getDataType(), true));
+        fields.add(new RecordField(Syslog5424Attributes.MESSAGEID.key(), 
RecordFieldType.STRING.getDataType(), true));
+        fields.add(new RecordField(Syslog5424Attributes.STRUCTURED_BASE.key(),
+                
RecordFieldType.MAP.getMapDataType(RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()))));
+
+        SchemaIdentifier schemaIdentifier = new 
StandardSchemaIdentifier.Builder().name(RFC_5424_SCHEMA_NAME).build();
+        final RecordSchema schema = new 
SimpleRecordSchema(fields,schemaIdentifier);
+        return schema;
+    }
+
+    private SchemaAccessStrategy createAccessStrategy() {
+        return new SchemaAccessStrategy() {
+            private final Set<SchemaField> schemaFields = 
EnumSet.noneOf(SchemaField.class);
+
+
+            @Override
+            public RecordSchema getSchema(Map<String, String> variables, 
InputStream contentStream, RecordSchema readSchema) throws 
SchemaNotFoundException {
+                return recordSchema;
+            }
+
+            @Override
+            public Set<SchemaField> getSuppliedSchemaFields() {
+                return schemaFields;
+            }
+        };
+    }
+
+    @Override
+    public RecordReader createRecordReader(final Map<String, String> 
variables, final InputStream in, final ComponentLog logger) throws IOException, 
SchemaNotFoundException {
+        final RecordSchema schema = getSchema(variables, in, null);
+        return new Syslog5424RecordReader(parser, in, schema);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1022043/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424RecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424RecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424RecordReader.java
new file mode 100644
index 0000000..5c5d085
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424RecordReader.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.syslog;
+
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.syslog.attributes.SyslogAttributes;
+import org.apache.nifi.syslog.events.Syslog5424Event;
+import org.apache.nifi.syslog.parsers.StrictSyslog5424Parser;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.Map;
+
+public class Syslog5424RecordReader implements RecordReader {
+    private final BufferedReader reader;
+    private RecordSchema schema;
+    private final StrictSyslog5424Parser parser;
+
+    public Syslog5424RecordReader(StrictSyslog5424Parser parser, InputStream 
in, RecordSchema schema){
+        this.reader = new BufferedReader(new InputStreamReader(in));
+        this.schema = schema;
+        this.parser = parser;
+    }
+
+    @Override
+    public Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) 
throws IOException, MalformedRecordException {
+        String line = reader.readLine();
+
+        if ( line == null ) {
+            // a null return from readLine() signals the end of the stream
+            return null;
+        }
+
+        if (StringUtils.isBlank(line)) {
+            // while an empty string is an error
+            throw new MalformedRecordException("Encountered a blank message!");
+        }
+
+
+        final MalformedRecordException malformedRecordException;
+        Syslog5424Event event = 
parser.parseEvent(ByteBuffer.wrap(line.getBytes(parser.getCharsetName())));
+
+        if (!event.isValid()) {
+            if (event.getException() != null) {
+                malformedRecordException = new MalformedRecordException(
+                        String.format("Failed to parse %s as a Syslog message: 
it does not conform to any of the RFC "+
+                                "formats supported", line), 
event.getException());
+            } else {
+                malformedRecordException = new MalformedRecordException(
+                        String.format("Failed to parse %s as a Syslog message: 
it does not conform to any of the RFC" +
+                                " formats supported", line));
+            }
+            throw malformedRecordException;
+        }
+
+        Map<String,Object> modifiedMap = new HashMap<>(event.getFieldMap());
+        
modifiedMap.put(SyslogAttributes.TIMESTAMP.key(),convertTimeStamp((String)event.getFieldMap().get(SyslogAttributes.TIMESTAMP.key())));
+
+        return new MapRecord(schema,modifiedMap);
+    }
+
+    @Override
+    public RecordSchema getSchema() throws MalformedRecordException {
+        return schema;
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.reader.close();
+    }
+
+    private Timestamp convertTimeStamp(String timeString) {
+        /*
+        From RFC 5424: https://tools.ietf.org/html/rfc5424#page-11
+
+        The TIMESTAMP field is a formalized timestamp derived from [RFC3339].
+
+                Whereas [RFC3339] makes allowances for multiple syntaxes, this
+        document imposes further restrictions.  The TIMESTAMP value MUST
+        follow these restrictions:
+
+        o  The "T" and "Z" characters in this syntax MUST be upper case.
+
+        o  Usage of the "T" character is REQUIRED.
+
+        o  Leap seconds MUST NOT be used.
+        */
+
+        if (timeString == null) {
+            return null;
+        }
+        return 
Timestamp.from(Instant.from(DateTimeFormatter.ISO_DATE_TIME.parse(timeString)));
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1022043/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 3ece6c0..6375ef7 100755
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -25,7 +25,7 @@ org.apache.nifi.csv.CSVRecordSetWriter
 
 org.apache.nifi.grok.GrokReader
 
-org.apache.nifi.text.FreeFormTextRecordSetWriter
+org.apache.nifi.syslog.Syslog5424Reader
 
 org.apache.nifi.xml.XMLReader
 org.apache.nifi.xml.XMLRecordSetWriter
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1022043/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.syslog.Syslog5424Reader/additionalDetails.html
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.syslog.Syslog5424Reader/additionalDetails.html
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.syslog.Syslog5424Reader/additionalDetails.html
new file mode 100644
index 0000000..06e1f92
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.syslog.Syslog5424Reader/additionalDetails.html
@@ -0,0 +1,91 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8"/>
+        <title>Syslog5424Reader</title>
+        <link rel="stylesheet" href="../../../../../css/component-usage.css" 
type="text/css"/>
+    </head>
+
+    <body>
+        <p>
+               The Syslog5424Reader Controller Service provides a means for 
parsing valid <a href="https://tools.ietf.org/html/rfc5424">RFC 5424 Syslog</a> 
 messages.
+                       This service produces records with a set schema to 
match the specification.
+               </p>
+               
+               <p>
+               The Required Property of this service is named <code>Character 
Set</code> and specifies the Character Set of the incoming text.
+        </p>
+
+               <h2>Schemas</h2>
+               
+               <p>
+                       When a record is parsed from incoming data, it is 
parsed into the RFC 5424 schema.
+                       <h4>The RFC 5424 schema</h4>
+                       <code><pre>
+                               {
+                                 "type" : "record",
+                                 "name" : "nifiRecord",
+                                 "namespace" : "org.apache.nifi",
+                                 "fields" : [ {
+                                       "name" : "priority",
+                                       "type" : [ "null", "string" ]
+                                 }, {
+                                       "name" : "severity",
+                                       "type" : [ "null", "string" ]
+                                 }, {
+                                       "name" : "facility",
+                                       "type" : [ "null", "string" ]
+                                 }, {
+                                       "name" : "version",
+                                       "type" : [ "null", "string" ]
+                                 }, {
+                                       "name" : "timestamp",
+                                       "type" : [ "null", {
+                                         "type" : "long",
+                                         "logicalType" : "timestamp-millis"
+                                       } ]
+                                 }, {
+                                       "name" : "hostname",
+                                       "type" : [ "null", "string" ]
+                                 }, {
+                                       "name" : "body",
+                                       "type" : [ "null", "string" ]
+                                 }, {
+                                       "name" : "appName",
+                                       "type" : [ "null", "string" ]
+                                 }, {
+                                       "name" : "procid",
+                                       "type" : [ "null", "string" ]
+                                 }, {
+                                       "name" : "messageid",
+                                       "type" : [ "null", "string" ]
+                                 }, {
+                                       "name" : "structuredData",
+                                       "type" : [ "null", {
+                                         "type" : "map",
+                                         "values" : {
+                                               "type" : "map",
+                                               "values" : "string"
+                                         }
+                                       } ]
+                                 } ]
+                               }
+                       </pre></code>
+               </p>
+
+    </body>
+</html>

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1022043/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/syslog/TestSyslog5424RecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/syslog/TestSyslog5424RecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/syslog/TestSyslog5424RecordReader.java
new file mode 100644
index 0000000..4a9c918
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/syslog/TestSyslog5424RecordReader.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.syslog;
+
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.syslog.attributes.Syslog5424Attributes;
+import org.apache.nifi.syslog.attributes.SyslogAttributes;
+import org.apache.nifi.syslog.keyproviders.SimpleKeyProvider;
+import org.apache.nifi.syslog.parsers.StrictSyslog5424Parser;
+import org.apache.nifi.syslog.utils.NifiStructuredDataPolicy;
+import org.apache.nifi.syslog.utils.NilHandlingPolicy;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.util.Map;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class TestSyslog5424RecordReader {
+    private static final Charset CHARSET = Charset.forName("UTF-8");
+    private static final String expectedVersion = "1";
+    private static final String expectedMessage = "Removing instance";
+    private static final String expectedAppName = 
"d0602076-b14a-4c55-852a-981e7afeed38";
+    private static final String expectedHostName = "loggregator";
+    private static final String expectedPri = "14";
+    private static final String expectedProcId = "DEA";
+    private static final Timestamp expectedTimestamp = 
Timestamp.from(Instant.from(DateTimeFormatter.ISO_DATE_TIME.parse("2014-06-20T09:14:07+00:00")));
+    private static final String expectedMessageId = "MSG-01";
+    private static final String expectedFacility = "1";
+    private static final String expectedSeverity = "6";
+
+    private static final String expectedIUT1 = "3";
+    private static final String expectedIUT2 = "4";
+    private static final String expectedEventSource1 = "Application";
+    private static final String expectedEventSource2 = "Other Application";
+    private static final String expectedEventID1 = "1011";
+    private static final String expectedEventID2 = "2022";
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testParseSingleLine() throws IOException, 
MalformedRecordException {
+        try (final InputStream fis = new FileInputStream(new 
File("src/test/resources/syslog/syslog5424/log_all.txt"))) {
+            StrictSyslog5424Parser parser = new StrictSyslog5424Parser(CHARSET,
+                    NilHandlingPolicy.NULL,
+                    NifiStructuredDataPolicy.MAP_OF_MAPS,
+                    new SimpleKeyProvider());
+            final Syslog5424RecordReader deserializer = new 
Syslog5424RecordReader(parser, fis, Syslog5424Reader.createRecordSchema());
+
+            final Record record = deserializer.nextRecord();
+            assertNotNull(record.getValues());
+
+            Assert.assertEquals(expectedVersion, 
record.getAsString(SyslogAttributes.VERSION.key()));
+            Assert.assertEquals(expectedMessage, 
record.getAsString(SyslogAttributes.BODY.key()));
+            Assert.assertEquals(expectedAppName, 
record.getAsString(Syslog5424Attributes.APP_NAME.key()));
+            Assert.assertEquals(expectedHostName, 
record.getAsString(SyslogAttributes.HOSTNAME.key()));
+            Assert.assertEquals(expectedPri, 
record.getAsString(SyslogAttributes.PRIORITY.key()));
+            Assert.assertEquals(expectedSeverity, 
record.getAsString(SyslogAttributes.SEVERITY.key()));
+            Assert.assertEquals(expectedFacility, 
record.getAsString(SyslogAttributes.FACILITY.key()));
+            Assert.assertEquals(expectedProcId, 
record.getAsString(Syslog5424Attributes.PROCID.key()));
+            Assert.assertEquals(expectedTimestamp, 
(Timestamp)record.getValue(SyslogAttributes.TIMESTAMP.key()));
+            Assert.assertEquals(expectedMessageId, 
record.getAsString(Syslog5424Attributes.MESSAGEID.key()));
+
+            
Assert.assertNotNull(record.getValue(Syslog5424Attributes.STRUCTURED_BASE.key()));
+            Map<String,Object> structured = 
(Map<String,Object>)record.getValue(Syslog5424Attributes.STRUCTURED_BASE.key());
+
+            Assert.assertTrue(structured.containsKey("exampleSDID@32473"));
+            Map<String, Object> example1 = (Map<String, Object>) 
structured.get("exampleSDID@32473");
+
+            Assert.assertTrue(example1.containsKey("iut"));
+            Assert.assertTrue(example1.containsKey("eventSource"));
+            Assert.assertTrue(example1.containsKey("eventID"));
+            Assert.assertEquals(expectedIUT1, example1.get("iut").toString());
+            Assert.assertEquals(expectedEventSource1, 
example1.get("eventSource").toString());
+            Assert.assertEquals(expectedEventID1, 
example1.get("eventID").toString());
+
+            Assert.assertTrue(structured.containsKey("exampleSDID@32480"));
+            Map<String, Object> example2 = (Map<String, Object>) 
structured.get("exampleSDID@32480");
+
+            Assert.assertTrue(example2.containsKey("iut"));
+            Assert.assertTrue(example2.containsKey("eventSource"));
+            Assert.assertTrue(example2.containsKey("eventID"));
+            Assert.assertEquals(expectedIUT2, example2.get("iut").toString());
+            Assert.assertEquals(expectedEventSource2, 
example2.get("eventSource").toString());
+            Assert.assertEquals(expectedEventID2, 
example2.get("eventID").toString());
+            assertNull(deserializer.nextRecord());
+            deserializer.close();
+        }
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testParseSingleLineSomeNulls() throws IOException, 
MalformedRecordException {
+        try (final InputStream fis = new FileInputStream(new 
File("src/test/resources/syslog/syslog5424/log.txt"))) {
+            StrictSyslog5424Parser parser = new StrictSyslog5424Parser(CHARSET,
+                    NilHandlingPolicy.NULL,
+                    NifiStructuredDataPolicy.MAP_OF_MAPS,
+                    new SimpleKeyProvider());
+            final Syslog5424RecordReader deserializer = new 
Syslog5424RecordReader(parser, fis, Syslog5424Reader.createRecordSchema());
+
+            final Record record = deserializer.nextRecord();
+            assertNotNull(record.getValues());
+
+            Assert.assertEquals(expectedVersion, 
record.getAsString(SyslogAttributes.VERSION.key()));
+            Assert.assertEquals(expectedMessage, 
record.getAsString(SyslogAttributes.BODY.key()));
+            Assert.assertEquals(expectedAppName, 
record.getAsString(Syslog5424Attributes.APP_NAME.key()));
+            Assert.assertEquals(expectedHostName, 
record.getAsString(SyslogAttributes.HOSTNAME.key()));
+            Assert.assertEquals(expectedPri, 
record.getAsString(SyslogAttributes.PRIORITY.key()));
+            Assert.assertEquals(expectedSeverity, 
record.getAsString(SyslogAttributes.SEVERITY.key()));
+            Assert.assertEquals(expectedFacility, 
record.getAsString(SyslogAttributes.FACILITY.key()));
+            Assert.assertEquals(expectedProcId, 
record.getAsString(Syslog5424Attributes.PROCID.key()));
+            Assert.assertEquals(expectedTimestamp, 
(Timestamp)record.getValue(SyslogAttributes.TIMESTAMP.key()));
+            
Assert.assertNull(record.getAsString(Syslog5424Attributes.MESSAGEID.key()));
+
+            
Assert.assertNotNull(record.getValue(Syslog5424Attributes.STRUCTURED_BASE.key()));
+            Map<String,Object> structured = 
(Map<String,Object>)record.getValue(Syslog5424Attributes.STRUCTURED_BASE.key());
+
+            Assert.assertTrue(structured.containsKey("exampleSDID@32473"));
+            Map<String, Object> example1 = (Map<String, Object>) 
structured.get("exampleSDID@32473");
+
+            Assert.assertTrue(example1.containsKey("iut"));
+            Assert.assertTrue(example1.containsKey("eventSource"));
+            Assert.assertTrue(example1.containsKey("eventID"));
+            Assert.assertEquals(expectedIUT1, example1.get("iut").toString());
+            Assert.assertEquals(expectedEventSource1, 
example1.get("eventSource").toString());
+            Assert.assertEquals(expectedEventID1, 
example1.get("eventID").toString());
+
+            Assert.assertTrue(structured.containsKey("exampleSDID@32480"));
+            Map<String, Object> example2 = (Map<String, Object>) 
structured.get("exampleSDID@32480");
+
+            Assert.assertTrue(example2.containsKey("iut"));
+            Assert.assertTrue(example2.containsKey("eventSource"));
+            Assert.assertTrue(example2.containsKey("eventID"));
+            Assert.assertEquals(expectedIUT2, example2.get("iut").toString());
+            Assert.assertEquals(expectedEventSource2, 
example2.get("eventSource").toString());
+            Assert.assertEquals(expectedEventID2, 
example2.get("eventID").toString());
+            assertNull(deserializer.nextRecord());
+            deserializer.close();
+        }
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testParseMultipleLine() throws IOException, 
MalformedRecordException {
+        try (final InputStream fis = new FileInputStream(new 
File("src/test/resources/syslog/syslog5424/log_mix.txt"))) {
+            StrictSyslog5424Parser parser = new StrictSyslog5424Parser(CHARSET,
+                    NilHandlingPolicy.NULL,
+                    NifiStructuredDataPolicy.MAP_OF_MAPS,
+                    new SimpleKeyProvider());
+            final Syslog5424RecordReader deserializer = new 
Syslog5424RecordReader(parser, fis, Syslog5424Reader.createRecordSchema());
+
+            Record record = deserializer.nextRecord();
+            int count = 0;
+            while (record != null){
+                assertNotNull(record.getValues());
+                count++;
+                record = deserializer.nextRecord();
+            }
+            Assert.assertEquals(count, 3);
+            deserializer.close();
+        }
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testParseMultipleLineWithError() throws IOException, 
MalformedRecordException {
+        try (final InputStream fis = new FileInputStream(new 
File("src/test/resources/syslog/syslog5424/log_mix_in_error.txt"))) {
+            StrictSyslog5424Parser parser = new StrictSyslog5424Parser(CHARSET,
+                    NilHandlingPolicy.NULL,
+                    NifiStructuredDataPolicy.MAP_OF_MAPS,
+                    new SimpleKeyProvider());
+            final Syslog5424RecordReader deserializer = new 
Syslog5424RecordReader(parser, fis, Syslog5424Reader.createRecordSchema());
+
+            Record record = deserializer.nextRecord();
+            int count = 0;
+            int exceptionCount = 0;
+            while (record != null){
+                assertNotNull(record.getValues());
+                try {
+                    record = deserializer.nextRecord();
+                    count++;
+                } catch (Exception e) {
+                    exceptionCount++;
+                }
+            }
+            Assert.assertEquals(count, 3);
+            Assert.assertEquals(exceptionCount,1);
+            deserializer.close();
+        }
+    }
+
+    /**
+     * Debugging helper (not annotated as a test): prints the record schema built by
+     * {@link Syslog5424Reader#createRecordSchema()} to standard output, followed by the
+     * string form of the Avro schema extracted from it.
+     */
+    public void writeSchema() {
+        System.out.println(Syslog5424Reader.createRecordSchema().toString());
+        final String avroSchemaText =
+                AvroTypeUtil.extractAvroSchema(Syslog5424Reader.createRecordSchema()).toString(true);
+        System.out.println(avroSchemaText);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1022043/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/syslog/syslog5424/log.txt
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/syslog/syslog5424/log.txt
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/syslog/syslog5424/log.txt
new file mode 100644
index 0000000..8d25f7c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/syslog/syslog5424/log.txt
@@ -0,0 +1 @@
+<14>1 2014-06-20T09:14:07+00:00 loggregator 
d0602076-b14a-4c55-852a-981e7afeed38 DEA - [exampleSDID@32473 iut="3" 
eventSource="Application" eventID="1011"] [exampleSDID@32480 iut="4" 
eventSource="Other Application" eventID="2022"] Removing instance
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1022043/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/syslog/syslog5424/log_all.txt
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/syslog/syslog5424/log_all.txt
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/syslog/syslog5424/log_all.txt
new file mode 100644
index 0000000..e24e87e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/syslog/syslog5424/log_all.txt
@@ -0,0 +1 @@
+<14>1 2014-06-20T09:14:07+00:00 loggregator 
d0602076-b14a-4c55-852a-981e7afeed38 DEA MSG-01 [exampleSDID@32473 iut="3" 
eventSource="Application" eventID="1011"] [exampleSDID@32480 iut="4" 
eventSource="Other Application" eventID="2022"] Removing instance
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1022043/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/syslog/syslog5424/log_mix.txt
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/syslog/syslog5424/log_mix.txt
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/syslog/syslog5424/log_mix.txt
new file mode 100644
index 0000000..067277e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/syslog/syslog5424/log_mix.txt
@@ -0,0 +1,3 @@
+<14>1 2014-06-20T09:14:07+00:00 loggregator 
d0602076-b14a-4c55-852a-981e7afeed38 DEA - - Removing instance
+<14>1 2014-06-20T09:14:07Z loggregator d0602076-b14a-4c55-852a-981e7afeed38 
DEA MSG-01 [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"] 
[exampleSDID@32480 iut="4" eventSource="Other Application" eventID="2022"] 
Removing instance
+<14>1 2014-06-20T09:14:07+00:00 loggregator 
d0602076-b14a-4c55-852a-981e7afeed38 DEA MSG-01 [exampleSDID@32473 iut="3" 
eventSource="Application" eventID="1011"] [exampleSDID@32480 iut="4" 
eventSource="Other Application" eventID="2022"] Removing instance
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1022043/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/syslog/syslog5424/log_mix_in_error.txt
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/syslog/syslog5424/log_mix_in_error.txt
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/syslog/syslog5424/log_mix_in_error.txt
new file mode 100644
index 0000000..c4fa3a4
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/syslog/syslog5424/log_mix_in_error.txt
@@ -0,0 +1,4 @@
+<14>1 2014-06-20T09:14:07+00:00 loggregator 
d0602076-b14a-4c55-852a-981e7afeed38 DEA - - Removing instance
+POISONPILL 30303030
+<14>1 2014-06-20T09:14:07+00:00 loggregator 
d0602076-b14a-4c55-852a-981e7afeed38 DEA MSG-01 [exampleSDID@32473 iut="3" 
eventSource="Application" eventID="1011"] [exampleSDID@32480 iut="4" 
eventSource="Other Application" eventID="2022"] Removing instance
+<14>1 2014-06-20T09:14:07+00:00 loggregator 
d0602076-b14a-4c55-852a-981e7afeed38 DEA MSG-01 [exampleSDID@32473 iut="3" 
eventSource="Application" eventID="1011"] [exampleSDID@32480 iut="4" 
eventSource="Other Application" eventID="2022"] Removing instance
\ No newline at end of file

Reply via email to