Repository: nifi Updated Branches: refs/heads/master ec7f13160 -> b10220439
http://git-wip-us.apache.org/repos/asf/nifi/blob/b1022043/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424Reader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424Reader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424Reader.java new file mode 100644 index 0000000..fdff68b --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424Reader.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.syslog; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaAccessStrategy; +import org.apache.nifi.schema.access.SchemaField; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.SchemaRegistryService; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; +import org.apache.nifi.serialization.record.StandardSchemaIdentifier; +import org.apache.nifi.syslog.attributes.Syslog5424Attributes; +import org.apache.nifi.syslog.attributes.SyslogAttributes; +import org.apache.nifi.syslog.keyproviders.SimpleKeyProvider; +import org.apache.nifi.syslog.parsers.StrictSyslog5424Parser; +import org.apache.nifi.syslog.utils.NifiStructuredDataPolicy; +import org.apache.nifi.syslog.utils.NilHandlingPolicy; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@Tags({"syslog 5424", "syslog", "logs", "logfiles", "parse", "text", "record", 
"reader"}) +@CapabilityDescription("Provides a mechanism for reading RFC 5424 compliant Syslog data, such as log files, and structuring the data" + + " so that it can be processed.") +public class Syslog5424Reader extends SchemaRegistryService implements RecordReaderFactory { + public static final String RFC_5424_SCHEMA_NAME = "default-5424-schema"; + static final AllowableValue RFC_5424_SCHEMA = new AllowableValue(RFC_5424_SCHEMA_NAME, "Use RFC 5424 Schema", + "The schema will be the default schema per RFC 5424."); + public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() + .name("Character Set") + .description("Specifies which character set of the Syslog messages") + .required(true) + .defaultValue("UTF-8") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .build(); + + private volatile StrictSyslog5424Parser parser; + private volatile RecordSchema recordSchema; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(1); + properties.add(CHARSET); + return properties; + } + + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + final String charsetName = context.getProperty(CHARSET).getValue(); + parser = new StrictSyslog5424Parser(Charset.forName(charsetName), NilHandlingPolicy.NULL, NifiStructuredDataPolicy.MAP_OF_MAPS, new SimpleKeyProvider()); + recordSchema = createRecordSchema(); + } + + @Override + protected List<AllowableValue> getSchemaAccessStrategyValues() { + final List<AllowableValue> allowableValues = new ArrayList<>(); + allowableValues.add(RFC_5424_SCHEMA); + return allowableValues; + } + + @Override + protected AllowableValue getDefaultSchemaAccessStrategy() { + return RFC_5424_SCHEMA; + } + + @Override + protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final ConfigurationContext context) { + return createAccessStrategy(); + } + 
+ @Override + protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final ValidationContext context) { + return createAccessStrategy(); + } + + static RecordSchema createRecordSchema() { + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField(SyslogAttributes.PRIORITY.key(), RecordFieldType.STRING.getDataType(), true)); + fields.add(new RecordField(SyslogAttributes.SEVERITY.key(), RecordFieldType.STRING.getDataType(), true)); + fields.add(new RecordField(SyslogAttributes.FACILITY.key(), RecordFieldType.STRING.getDataType(), true)); + fields.add(new RecordField(SyslogAttributes.VERSION.key(), RecordFieldType.STRING.getDataType(), true)); + fields.add(new RecordField(SyslogAttributes.TIMESTAMP.key(), RecordFieldType.TIMESTAMP.getDataType(), true)); + fields.add(new RecordField(SyslogAttributes.HOSTNAME.key(), RecordFieldType.STRING.getDataType(), true)); + fields.add(new RecordField(SyslogAttributes.BODY.key(), RecordFieldType.STRING.getDataType(), true)); + fields.add(new RecordField(Syslog5424Attributes.APP_NAME.key(), RecordFieldType.STRING.getDataType(), true)); + fields.add(new RecordField(Syslog5424Attributes.PROCID.key(), RecordFieldType.STRING.getDataType(), true)); + fields.add(new RecordField(Syslog5424Attributes.MESSAGEID.key(), RecordFieldType.STRING.getDataType(), true)); + fields.add(new RecordField(Syslog5424Attributes.STRUCTURED_BASE.key(), + RecordFieldType.MAP.getMapDataType(RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType())))); + + SchemaIdentifier schemaIdentifier = new StandardSchemaIdentifier.Builder().name(RFC_5424_SCHEMA_NAME).build(); + final RecordSchema schema = new SimpleRecordSchema(fields,schemaIdentifier); + return schema; + } + + private SchemaAccessStrategy createAccessStrategy() { + return new SchemaAccessStrategy() { + private final Set<SchemaField> schemaFields = EnumSet.noneOf(SchemaField.class); + + + @Override + public 
RecordSchema getSchema(Map<String, String> variables, InputStream contentStream, RecordSchema readSchema) throws SchemaNotFoundException { + return recordSchema; + } + + @Override + public Set<SchemaField> getSuppliedSchemaFields() { + return schemaFields; + } + }; + } + + @Override + public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException { + final RecordSchema schema = getSchema(variables, in, null); + return new Syslog5424RecordReader(parser, in, schema); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1022043/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424RecordReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424RecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424RecordReader.java new file mode 100644 index 0000000..5c5d085 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424RecordReader.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.syslog; + +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.syslog.attributes.SyslogAttributes; +import org.apache.nifi.syslog.events.Syslog5424Event; +import org.apache.nifi.syslog.parsers.StrictSyslog5424Parser; +import org.apache.nifi.util.StringUtils; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.ByteBuffer; +import java.sql.Timestamp; +import java.time.Instant; +import java.time.format.DateTimeFormatter; +import java.util.HashMap; +import java.util.Map; + +public class Syslog5424RecordReader implements RecordReader { + private final BufferedReader reader; + private RecordSchema schema; + private final StrictSyslog5424Parser parser; + + public Syslog5424RecordReader(StrictSyslog5424Parser parser, InputStream in, RecordSchema schema){ + this.reader = new BufferedReader(new InputStreamReader(in)); + this.schema = schema; + this.parser = parser; + } + + @Override + public Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws IOException, MalformedRecordException { + String line = reader.readLine(); + + if ( line == null ) { + // a null return from readLine() signals the end of the stream + return null; + } + + if (StringUtils.isBlank(line)) { + // 
while an empty string is an error + throw new MalformedRecordException("Encountered a blank message!"); + } + + + final MalformedRecordException malformedRecordException; + Syslog5424Event event = parser.parseEvent(ByteBuffer.wrap(line.getBytes(parser.getCharsetName()))); + + if (!event.isValid()) { + if (event.getException() != null) { + malformedRecordException = new MalformedRecordException( + String.format("Failed to parse %s as a Syslog message: it does not conform to any of the RFC "+ + "formats supported", line), event.getException()); + } else { + malformedRecordException = new MalformedRecordException( + String.format("Failed to parse %s as a Syslog message: it does not conform to any of the RFC" + + " formats supported", line)); + } + throw malformedRecordException; + } + + Map<String,Object> modifiedMap = new HashMap<>(event.getFieldMap()); + modifiedMap.put(SyslogAttributes.TIMESTAMP.key(),convertTimeStamp((String)event.getFieldMap().get(SyslogAttributes.TIMESTAMP.key()))); + + return new MapRecord(schema,modifiedMap); + } + + @Override + public RecordSchema getSchema() throws MalformedRecordException { + return schema; + } + + @Override + public void close() throws IOException { + this.reader.close(); + } + + private Timestamp convertTimeStamp(String timeString) { + /* + From RFC 5424: https://tools.ietf.org/html/rfc5424#page-11 + + The TIMESTAMP field is a formalized timestamp derived from [RFC3339]. + + Whereas [RFC3339] makes allowances for multiple syntaxes, this + document imposes further restrictions. The TIMESTAMP value MUST + follow these restrictions: + + o The "T" and "Z" characters in this syntax MUST be upper case. + + o Usage of the "T" character is REQUIRED. + + o Leap seconds MUST NOT be used. 
+ */ + + if (timeString == null) { + return null; + } + return Timestamp.from(Instant.from(DateTimeFormatter.ISO_DATE_TIME.parse(timeString))); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1022043/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService index 3ece6c0..6375ef7 100755 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -25,7 +25,7 @@ org.apache.nifi.csv.CSVRecordSetWriter org.apache.nifi.grok.GrokReader -org.apache.nifi.text.FreeFormTextRecordSetWriter +org.apache.nifi.syslog.Syslog5424Reader org.apache.nifi.xml.XMLReader org.apache.nifi.xml.XMLRecordSetWriter \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/b1022043/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.syslog.Syslog5424Reader/additionalDetails.html ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.syslog.Syslog5424Reader/additionalDetails.html b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.syslog.Syslog5424Reader/additionalDetails.html new file mode 100644 index 0000000..06e1f92 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.syslog.Syslog5424Reader/additionalDetails.html @@ -0,0 +1,91 @@ +<!DOCTYPE html> +<html lang="en"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <head> + <meta charset="utf-8"/> + <title>Syslog5424Reader</title> + <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/> + </head> + + <body> + <p> + The Syslog5424Reader Controller Service provides a means for parsing valid <a href="https://tools.ietf.org/html/rfc5424">RFC 5424 Syslog</a> messages. + This service produces records with a set schema to match the specification. 
+ </p> + + <p> + The Required Property of this service is named <code>Character Set</code> and specifies the Character Set of the incoming text. + </p> + + <h2>Schemas</h2> + + <p> + When a record is parsed from incoming data, it is parsed into the RFC 5424 schema. + <h4>The RFC 5424 schema</h4> + <code><pre> + { + "type" : "record", + "name" : "nifiRecord", + "namespace" : "org.apache.nifi", + "fields" : [ { + "name" : "priority", + "type" : [ "null", "string" ] + }, { + "name" : "severity", + "type" : [ "null", "string" ] + }, { + "name" : "facility", + "type" : [ "null", "string" ] + }, { + "name" : "version", + "type" : [ "null", "string" ] + }, { + "name" : "timestamp", + "type" : [ "null", { + "type" : "long", + "logicalType" : "timestamp-millis" + } ] + }, { + "name" : "hostname", + "type" : [ "null", "string" ] + }, { + "name" : "body", + "type" : [ "null", "string" ] + }, { + "name" : "appName", + "type" : [ "null", "string" ] + }, { + "name" : "procid", + "type" : [ "null", "string" ] + }, { + "name" : "messageid", + "type" : [ "null", "string" ] + }, { + "name" : "structuredData", + "type" : [ "null", { + "type" : "map", + "values" : { + "type" : "map", + "values" : "string" + } + } ] + } ] + } + </pre></code> + </p> + + </body> +</html> http://git-wip-us.apache.org/repos/asf/nifi/blob/b1022043/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/syslog/TestSyslog5424RecordReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/syslog/TestSyslog5424RecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/syslog/TestSyslog5424RecordReader.java new file mode 100644 index
0000000..4a9c918 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/syslog/TestSyslog5424RecordReader.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.syslog; + +import org.apache.nifi.avro.AvroTypeUtil; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.syslog.attributes.Syslog5424Attributes; +import org.apache.nifi.syslog.attributes.SyslogAttributes; +import org.apache.nifi.syslog.keyproviders.SimpleKeyProvider; +import org.apache.nifi.syslog.parsers.StrictSyslog5424Parser; +import org.apache.nifi.syslog.utils.NifiStructuredDataPolicy; +import org.apache.nifi.syslog.utils.NilHandlingPolicy; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.sql.Timestamp; +import java.time.Instant; +import java.time.format.DateTimeFormatter; +import java.util.Map; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +public class TestSyslog5424RecordReader { + private static final Charset CHARSET = Charset.forName("UTF-8"); + private static final String expectedVersion = "1"; + private static final String expectedMessage = "Removing instance"; + private static final String expectedAppName = "d0602076-b14a-4c55-852a-981e7afeed38"; + private static final String expectedHostName = "loggregator"; + private static final String expectedPri = "14"; + private static final String expectedProcId = "DEA"; + private static final Timestamp expectedTimestamp = Timestamp.from(Instant.from(DateTimeFormatter.ISO_DATE_TIME.parse("2014-06-20T09:14:07+00:00"))); + private static final String expectedMessageId = "MSG-01"; + private static final String expectedFacility = "1"; + private static final String expectedSeverity = "6"; + + private static final String expectedIUT1 = "3"; + private static final String expectedIUT2 = "4"; + private static final String expectedEventSource1 = "Application"; + private static final String 
expectedEventSource2 = "Other Application"; + private static final String expectedEventID1 = "1011"; + private static final String expectedEventID2 = "2022"; + + @Test + @SuppressWarnings("unchecked") + public void testParseSingleLine() throws IOException, MalformedRecordException { + try (final InputStream fis = new FileInputStream(new File("src/test/resources/syslog/syslog5424/log_all.txt"))) { + StrictSyslog5424Parser parser = new StrictSyslog5424Parser(CHARSET, + NilHandlingPolicy.NULL, + NifiStructuredDataPolicy.MAP_OF_MAPS, + new SimpleKeyProvider()); + final Syslog5424RecordReader deserializer = new Syslog5424RecordReader(parser, fis, Syslog5424Reader.createRecordSchema()); + + final Record record = deserializer.nextRecord(); + assertNotNull(record.getValues()); + + Assert.assertEquals(expectedVersion, record.getAsString(SyslogAttributes.VERSION.key())); + Assert.assertEquals(expectedMessage, record.getAsString(SyslogAttributes.BODY.key())); + Assert.assertEquals(expectedAppName, record.getAsString(Syslog5424Attributes.APP_NAME.key())); + Assert.assertEquals(expectedHostName, record.getAsString(SyslogAttributes.HOSTNAME.key())); + Assert.assertEquals(expectedPri, record.getAsString(SyslogAttributes.PRIORITY.key())); + Assert.assertEquals(expectedSeverity, record.getAsString(SyslogAttributes.SEVERITY.key())); + Assert.assertEquals(expectedFacility, record.getAsString(SyslogAttributes.FACILITY.key())); + Assert.assertEquals(expectedProcId, record.getAsString(Syslog5424Attributes.PROCID.key())); + Assert.assertEquals(expectedTimestamp, (Timestamp)record.getValue(SyslogAttributes.TIMESTAMP.key())); + Assert.assertEquals(expectedMessageId, record.getAsString(Syslog5424Attributes.MESSAGEID.key())); + + Assert.assertNotNull(record.getValue(Syslog5424Attributes.STRUCTURED_BASE.key())); + Map<String,Object> structured = (Map<String,Object>)record.getValue(Syslog5424Attributes.STRUCTURED_BASE.key()); + + Assert.assertTrue(structured.containsKey("exampleSDID@32473")); 
+ Map<String, Object> example1 = (Map<String, Object>) structured.get("exampleSDID@32473"); + + Assert.assertTrue(example1.containsKey("iut")); + Assert.assertTrue(example1.containsKey("eventSource")); + Assert.assertTrue(example1.containsKey("eventID")); + Assert.assertEquals(expectedIUT1, example1.get("iut").toString()); + Assert.assertEquals(expectedEventSource1, example1.get("eventSource").toString()); + Assert.assertEquals(expectedEventID1, example1.get("eventID").toString()); + + Assert.assertTrue(structured.containsKey("exampleSDID@32480")); + Map<String, Object> example2 = (Map<String, Object>) structured.get("exampleSDID@32480"); + + Assert.assertTrue(example2.containsKey("iut")); + Assert.assertTrue(example2.containsKey("eventSource")); + Assert.assertTrue(example2.containsKey("eventID")); + Assert.assertEquals(expectedIUT2, example2.get("iut").toString()); + Assert.assertEquals(expectedEventSource2, example2.get("eventSource").toString()); + Assert.assertEquals(expectedEventID2, example2.get("eventID").toString()); + assertNull(deserializer.nextRecord()); + deserializer.close(); + } + } + + @Test + @SuppressWarnings("unchecked") + public void testParseSingleLineSomeNulls() throws IOException, MalformedRecordException { + try (final InputStream fis = new FileInputStream(new File("src/test/resources/syslog/syslog5424/log.txt"))) { + StrictSyslog5424Parser parser = new StrictSyslog5424Parser(CHARSET, + NilHandlingPolicy.NULL, + NifiStructuredDataPolicy.MAP_OF_MAPS, + new SimpleKeyProvider()); + final Syslog5424RecordReader deserializer = new Syslog5424RecordReader(parser, fis, Syslog5424Reader.createRecordSchema()); + + final Record record = deserializer.nextRecord(); + assertNotNull(record.getValues()); + + Assert.assertEquals(expectedVersion, record.getAsString(SyslogAttributes.VERSION.key())); + Assert.assertEquals(expectedMessage, record.getAsString(SyslogAttributes.BODY.key())); + Assert.assertEquals(expectedAppName, 
record.getAsString(Syslog5424Attributes.APP_NAME.key())); + Assert.assertEquals(expectedHostName, record.getAsString(SyslogAttributes.HOSTNAME.key())); + Assert.assertEquals(expectedPri, record.getAsString(SyslogAttributes.PRIORITY.key())); + Assert.assertEquals(expectedSeverity, record.getAsString(SyslogAttributes.SEVERITY.key())); + Assert.assertEquals(expectedFacility, record.getAsString(SyslogAttributes.FACILITY.key())); + Assert.assertEquals(expectedProcId, record.getAsString(Syslog5424Attributes.PROCID.key())); + Assert.assertEquals(expectedTimestamp, (Timestamp)record.getValue(SyslogAttributes.TIMESTAMP.key())); + Assert.assertNull(record.getAsString(Syslog5424Attributes.MESSAGEID.key())); + + Assert.assertNotNull(record.getValue(Syslog5424Attributes.STRUCTURED_BASE.key())); + Map<String,Object> structured = (Map<String,Object>)record.getValue(Syslog5424Attributes.STRUCTURED_BASE.key()); + + Assert.assertTrue(structured.containsKey("exampleSDID@32473")); + Map<String, Object> example1 = (Map<String, Object>) structured.get("exampleSDID@32473"); + + Assert.assertTrue(example1.containsKey("iut")); + Assert.assertTrue(example1.containsKey("eventSource")); + Assert.assertTrue(example1.containsKey("eventID")); + Assert.assertEquals(expectedIUT1, example1.get("iut").toString()); + Assert.assertEquals(expectedEventSource1, example1.get("eventSource").toString()); + Assert.assertEquals(expectedEventID1, example1.get("eventID").toString()); + + Assert.assertTrue(structured.containsKey("exampleSDID@32480")); + Map<String, Object> example2 = (Map<String, Object>) structured.get("exampleSDID@32480"); + + Assert.assertTrue(example2.containsKey("iut")); + Assert.assertTrue(example2.containsKey("eventSource")); + Assert.assertTrue(example2.containsKey("eventID")); + Assert.assertEquals(expectedIUT2, example2.get("iut").toString()); + Assert.assertEquals(expectedEventSource2, example2.get("eventSource").toString()); + Assert.assertEquals(expectedEventID2, 
example2.get("eventID").toString()); + assertNull(deserializer.nextRecord()); + deserializer.close(); + } + } + + @Test + @SuppressWarnings("unchecked") + public void testParseMultipleLine() throws IOException, MalformedRecordException { + try (final InputStream fis = new FileInputStream(new File("src/test/resources/syslog/syslog5424/log_mix.txt"))) { + StrictSyslog5424Parser parser = new StrictSyslog5424Parser(CHARSET, + NilHandlingPolicy.NULL, + NifiStructuredDataPolicy.MAP_OF_MAPS, + new SimpleKeyProvider()); + final Syslog5424RecordReader deserializer = new Syslog5424RecordReader(parser, fis, Syslog5424Reader.createRecordSchema()); + + Record record = deserializer.nextRecord(); + int count = 0; + while (record != null){ + assertNotNull(record.getValues()); + count++; + record = deserializer.nextRecord(); + } + Assert.assertEquals(count, 3); + deserializer.close(); + } + } + + @Test + @SuppressWarnings("unchecked") + public void testParseMultipleLineWithError() throws IOException, MalformedRecordException { + try (final InputStream fis = new FileInputStream(new File("src/test/resources/syslog/syslog5424/log_mix_in_error.txt"))) { + StrictSyslog5424Parser parser = new StrictSyslog5424Parser(CHARSET, + NilHandlingPolicy.NULL, + NifiStructuredDataPolicy.MAP_OF_MAPS, + new SimpleKeyProvider()); + final Syslog5424RecordReader deserializer = new Syslog5424RecordReader(parser, fis, Syslog5424Reader.createRecordSchema()); + + Record record = deserializer.nextRecord(); + int count = 0; + int exceptionCount = 0; + while (record != null){ + assertNotNull(record.getValues()); + try { + record = deserializer.nextRecord(); + count++; + } catch (Exception e) { + exceptionCount++; + } + } + Assert.assertEquals(count, 3); + Assert.assertEquals(exceptionCount,1); + deserializer.close(); + } + } + + public void writeSchema() { + String s = Syslog5424Reader.createRecordSchema().toString(); + System.out.println(s); + System.out.println(AvroTypeUtil.extractAvroSchema( 
Syslog5424Reader.createRecordSchema() ).toString(true)); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1022043/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/syslog/syslog5424/log.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/syslog/syslog5424/log.txt b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/syslog/syslog5424/log.txt new file mode 100644 index 0000000..8d25f7c --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/syslog/syslog5424/log.txt @@ -0,0 +1 @@ +<14>1 2014-06-20T09:14:07+00:00 loggregator d0602076-b14a-4c55-852a-981e7afeed38 DEA - [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"] [exampleSDID@32480 iut="4" eventSource="Other Application" eventID="2022"] Removing instance \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/b1022043/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/syslog/syslog5424/log_all.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/syslog/syslog5424/log_all.txt b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/syslog/syslog5424/log_all.txt new file mode 100644 index 0000000..e24e87e --- /dev/null +++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/syslog/syslog5424/log_all.txt @@ -0,0 +1 @@ +<14>1 2014-06-20T09:14:07+00:00 loggregator d0602076-b14a-4c55-852a-981e7afeed38 DEA MSG-01 [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"] [exampleSDID@32480 iut="4" eventSource="Other Application" eventID="2022"] Removing instance \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/b1022043/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/syslog/syslog5424/log_mix.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/syslog/syslog5424/log_mix.txt b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/syslog/syslog5424/log_mix.txt new file mode 100644 index 0000000..067277e --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/syslog/syslog5424/log_mix.txt @@ -0,0 +1,3 @@ +<14>1 2014-06-20T09:14:07+00:00 loggregator d0602076-b14a-4c55-852a-981e7afeed38 DEA - - Removing instance +<14>1 2014-06-20T09:14:07Z loggregator d0602076-b14a-4c55-852a-981e7afeed38 DEA MSG-01 [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"] [exampleSDID@32480 iut="4" eventSource="Other Application" eventID="2022"] Removing instance +<14>1 2014-06-20T09:14:07+00:00 loggregator d0602076-b14a-4c55-852a-981e7afeed38 DEA MSG-01 [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"] [exampleSDID@32480 iut="4" eventSource="Other Application" eventID="2022"] Removing instance \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/nifi/blob/b1022043/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/syslog/syslog5424/log_mix_in_error.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/syslog/syslog5424/log_mix_in_error.txt b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/syslog/syslog5424/log_mix_in_error.txt new file mode 100644 index 0000000..c4fa3a4 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/syslog/syslog5424/log_mix_in_error.txt @@ -0,0 +1,4 @@ +<14>1 2014-06-20T09:14:07+00:00 loggregator d0602076-b14a-4c55-852a-981e7afeed38 DEA - - Removing instance +POISONPILL 30303030 +<14>1 2014-06-20T09:14:07+00:00 loggregator d0602076-b14a-4c55-852a-981e7afeed38 DEA MSG-01 [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"] [exampleSDID@32480 iut="4" eventSource="Other Application" eventID="2022"] Removing instance +<14>1 2014-06-20T09:14:07+00:00 loggregator d0602076-b14a-4c55-852a-981e7afeed38 DEA MSG-01 [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"] [exampleSDID@32480 iut="4" eventSource="Other Application" eventID="2022"] Removing instance \ No newline at end of file