This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 9bd7b91514 NIFI-12163 This closes #7835. Improved Syslog 5424 Line
Handling
9bd7b91514 is described below
commit 9bd7b91514d066ee997b79fc70af367719c0a348
Author: exceptionfactory <[email protected]>
AuthorDate: Tue Oct 3 15:26:09 2023 -0500
NIFI-12163 This closes #7835. Improved Syslog 5424 Line Handling
- Eliminated unused parseEvent method signatures from
StrictSyslog5424Parser in favor of a single String line method
- Eliminated intermediate conversion from String to byte array and back to
String for Syslog Parser
Signed-off-by: Joseph Witt <[email protected]>
---
.../syslog/parsers/StrictSyslog5424Parser.java | 83 +++----------------
.../syslog/BaseStrictSyslog5424ParserTest.java | 93 ++--------------------
.../nifi/processors/standard/ParseSyslog5424.java | 26 ++----
.../org/apache/nifi/syslog/Syslog5424Reader.java | 16 ++--
.../apache/nifi/syslog/Syslog5424RecordReader.java | 18 ++---
.../nifi/syslog/TestSyslog5424RecordReader.java | 44 +++++-----
6 files changed, 61 insertions(+), 219 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-syslog-utils/src/main/java/org/apache/nifi/syslog/parsers/StrictSyslog5424Parser.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-syslog-utils/src/main/java/org/apache/nifi/syslog/parsers/StrictSyslog5424Parser.java
index ed36c9d34f..d2abe2d972 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-syslog-utils/src/main/java/org/apache/nifi/syslog/parsers/StrictSyslog5424Parser.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-syslog-utils/src/main/java/org/apache/nifi/syslog/parsers/StrictSyslog5424Parser.java
@@ -21,29 +21,18 @@ import com.github.palindromicity.syslog.NilPolicy;
import com.github.palindromicity.syslog.StructuredDataPolicy;
import com.github.palindromicity.syslog.SyslogParserBuilder;
import org.apache.nifi.syslog.events.Syslog5424Event;
-import org.apache.nifi.syslog.keyproviders.SyslogPrefixedKeyProvider;
import org.apache.nifi.syslog.utils.NifiStructuredDataPolicy;
import org.apache.nifi.syslog.utils.NilHandlingPolicy;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-
/**
* Parses a Syslog message from a ByteBuffer into a Syslog5424Event instance.
* For 5424 we use simple-syslog-5424 since it parsers out structured data.
*/
public class StrictSyslog5424Parser {
- private Charset charset;
- private com.github.palindromicity.syslog.SyslogParser parser;
-
- public StrictSyslog5424Parser() {
- this(StandardCharsets.UTF_8, NilHandlingPolicy.NULL,
NifiStructuredDataPolicy.FLATTEN, new SyslogPrefixedKeyProvider());
- }
+ private final com.github.palindromicity.syslog.SyslogParser parser;
- public StrictSyslog5424Parser(final Charset charset, final
NilHandlingPolicy nilPolicy,
- NifiStructuredDataPolicy
structuredDataPolicy, KeyProvider keyProvider) {
- this.charset = charset;
+ public StrictSyslog5424Parser(final NilHandlingPolicy nilPolicy,
+ final NifiStructuredDataPolicy
structuredDataPolicy, final KeyProvider keyProvider) {
parser = new SyslogParserBuilder()
.withNilPolicy(NilPolicy.valueOf(nilPolicy.name()))
.withStructuredDataPolicy(StructuredDataPolicy.valueOf(structuredDataPolicy.name()))
@@ -52,54 +41,17 @@ public class StrictSyslog5424Parser {
}
/**
- * Parses a Syslog5424Event from a {@code ByteBuffer}.
- *
- * @param buffer a {@code ByteBuffer} containing a syslog message
- * @return a Syslog5424Event parsed from the {@code {@code byte array}}
- */
- public Syslog5424Event parseEvent(final ByteBuffer buffer) {
- return parseEvent(buffer, null);
- }
-
- /**
- * Parses a Syslog5424Event from a {@code ByteBuffer}.
- *
- * @param buffer a {@code ByteBuffer} containing a syslog message
- * @param sender the hostname of the syslog server that sent the message
- * @return a Syslog5424Event parsed from the {@code byte array}
- */
- public Syslog5424Event parseEvent(final ByteBuffer buffer, final String
sender) {
- if (buffer == null) {
- return null;
- }
- return parseEvent(bufferToBytes(buffer), sender);
- }
-
- /**
- * Parses a Syslog5424Event from a {@code byte array}.
+ * Parses a Syslog5424Event from a String
*
- * @param bytes a {@code byte array} containing a syslog message
- * @param sender the hostname of the syslog server that sent the message
- * @return a Syslog5424Event parsed from the {@code byte array}
+ * @param line a {@code String} containing a syslog message
+ * @return a Syslog5424Event parsed from the input line
*/
- public Syslog5424Event parseEvent(final byte[] bytes, final String sender)
{
- if (bytes == null || bytes.length == 0) {
- return null;
- }
-
- // remove trailing new line before parsing
- int length = bytes.length;
- if (bytes[length - 1] == '\n') {
- length = length - 1;
- }
-
- final String message = new String(bytes, 0, length, charset);
-
+ public Syslog5424Event parseEvent(final String line) {
final Syslog5424Event.Builder builder = new Syslog5424Event.Builder()
-
.valid(false).fullMessage(message).rawMessage(bytes).sender(sender);
+ .valid(false).fullMessage(line);
try {
- parser.parseLine(message, builder::fieldMap);
+ parser.parseLine(line, builder::fieldMap);
builder.valid(true);
} catch (Exception e) {
// this is not a valid 5424 message
@@ -110,21 +62,4 @@ public class StrictSyslog5424Parser {
// either invalid w/original msg, or fully parsed event
return builder.build();
}
-
- public String getCharsetName() {
- return charset == null ? StandardCharsets.UTF_8.name() :
charset.name();
- }
-
-
- private byte[] bufferToBytes(ByteBuffer buffer) {
- if (buffer == null) {
- return null;
- }
- if (buffer.position() != 0) {
- buffer.flip();
- }
- byte bytes[] = new byte[buffer.limit()];
- buffer.get(bytes, 0, buffer.limit());
- return bytes;
- }
}
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-syslog-utils/src/test/java/org/apache/nifi/syslog/BaseStrictSyslog5424ParserTest.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-syslog-utils/src/test/java/org/apache/nifi/syslog/BaseStrictSyslog5424ParserTest.java
index cb5feb49ef..81ca409448 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-syslog-utils/src/test/java/org/apache/nifi/syslog/BaseStrictSyslog5424ParserTest.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-syslog-utils/src/test/java/org/apache/nifi/syslog/BaseStrictSyslog5424ParserTest.java
@@ -26,9 +26,6 @@ import org.apache.nifi.syslog.utils.NilHandlingPolicy;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.nio.Buffer;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -42,27 +39,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public abstract class BaseStrictSyslog5424ParserTest {
- private static final Charset CHARSET = Charset.forName("UTF-8");
- private static final String NIL_VALUE = "-";
private StrictSyslog5424Parser parser;
protected abstract NilHandlingPolicy getPolicy();
- protected void validateForPolicy(String expected, Object actual) {
- switch (getPolicy()) {
- case DASH:
- assertEquals(actual, NIL_VALUE);
- break;
- case OMIT:
- case NULL:
- assertNull(actual);
-
- }
- }
-
@BeforeEach
public void setup() {
- parser = new StrictSyslog5424Parser(CHARSET, getPolicy(),
NifiStructuredDataPolicy.FLATTEN, new SyslogPrefixedKeyProvider());
+ parser = new StrictSyslog5424Parser(getPolicy(),
NifiStructuredDataPolicy.FLATTEN, new SyslogPrefixedKeyProvider());
}
@Test
@@ -74,18 +57,12 @@ public abstract class BaseStrictSyslog5424ParserTest {
final String appName = "su";
final String procId = "-";
final String msgId = "ID17";
- final String structuredData = "-";
final String body = "'su root' failed for lonvick on /dev/pts/8";
final String message = "<" + pri + ">" + version + " " + stamp + " " +
host + " "
+ appName + " " + procId + " " + msgId + " " + "-" + " " +
body;
- final byte[] bytes = message.getBytes(CHARSET);
- final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
- buffer.clear();
- buffer.put(bytes);
-
- final Syslog5424Event event = parser.parseEvent(buffer);
+ final Syslog5424Event event = parser.parseEvent(message);
assertNotNull(event);
assertTrue(event.isValid());
assertFalse(event.getFieldMap().isEmpty());
@@ -97,7 +74,6 @@ public abstract class BaseStrictSyslog5424ParserTest {
assertEquals(stamp,
fieldMap.get(SyslogAttributes.SYSLOG_TIMESTAMP.key()));
assertEquals(host,
fieldMap.get(SyslogAttributes.SYSLOG_HOSTNAME.key()));
assertEquals(appName,
fieldMap.get(Syslog5424Attributes.SYSLOG_APP_NAME.key()));
- validateForPolicy(procId,
fieldMap.get(Syslog5424Attributes.SYSLOG_PROCID.key()));
assertEquals(msgId,
fieldMap.get(Syslog5424Attributes.SYSLOG_MESSAGEID.key()));
Pattern structuredPattern = new
SyslogPrefixedKeyProvider().getStructuredElementIdParamNamePattern();
@@ -121,18 +97,12 @@ public abstract class BaseStrictSyslog5424ParserTest {
final String appName = "su";
final String procId = "-";
final String msgId = "ID17";
- final String structuredData = "-";
final String body = "'su root' failed for lonvick on /dev/pts/8";
final String message = "<" + pri + ">" + version + " " + stamp + " " +
host + " "
+ appName + " " + procId + " " + msgId + " " + "-" + " " +
body;
- final byte[] bytes = message.getBytes(CHARSET);
- final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
- buffer.clear();
- buffer.put(bytes);
-
- final Syslog5424Event event = parser.parseEvent(buffer);
+ final Syslog5424Event event = parser.parseEvent(message);
assertFalse(event.isValid());
}
@@ -141,12 +111,7 @@ public abstract class BaseStrictSyslog5424ParserTest {
final String message = "<34>1 2003-10-11T22:14:15.003Z
mymachine.example.com su - " +
"ID47 - 'su root' failed for lonvick on /dev/pts/8\n";
- final byte[] bytes = message.getBytes(CHARSET);
- final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
- buffer.clear();
- buffer.put(bytes);
-
- final Syslog5424Event event = parser.parseEvent(buffer);
+ final Syslog5424Event event = parser.parseEvent(message);
assertNotNull(event);
assertTrue(event.isValid());
}
@@ -166,12 +131,7 @@ public abstract class BaseStrictSyslog5424ParserTest {
+ "[exampleSDID@32480 iut=\"4\" eventSource=\"Other
Application\" eventID=\"2022\"] Removing instance");
for (final String message : messages) {
- final byte[] bytes = message.getBytes(CHARSET);
- final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
- buffer.clear();
- buffer.put(bytes);
-
- final Syslog5424Event event = parser.parseEvent(buffer);
+ final Syslog5424Event event = parser.parseEvent(message);
assertTrue(event.isValid());
}
}
@@ -190,54 +150,22 @@ public abstract class BaseStrictSyslog5424ParserTest {
+ "[exampleSDID@32480 iut=\"4\" eventSource=\"Other Application\"
eventID=\"2022\"]");
for (final String message : messages) {
- final byte[] bytes = message.getBytes(CHARSET);
- final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
- buffer.clear();
- buffer.put(bytes);
-
- final Syslog5424Event event = parser.parseEvent(buffer);
+ final Syslog5424Event event = parser.parseEvent(message);
assertTrue(event.isValid());
assertNull(event.getFieldMap().get(SyslogAttributes.SYSLOG_BODY.key()));
}
-
-
}
@Test
public void testInvalidPriority() {
final String message = "10 Oct 13 14:14:43 localhost some body of the
message";
- final byte[] bytes = message.getBytes(CHARSET);
- final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
- buffer.clear();
- buffer.put(bytes);
-
- final Syslog5424Event event = parser.parseEvent(buffer);
+ final Syslog5424Event event = parser.parseEvent(message);
assertNotNull(event);
assertFalse(event.isValid());
assertEquals(message, event.getFullMessage());
}
- @Test
- public void testParseWithSender() {
- final String sender = "127.0.0.1";
- final String message = "<14>1 2014-06-20T09:14:07+00:00 loggregator"
- + " d0602076-b14a-4c55-852a-981e7afeed38 DEA MSG-01"
- + " [exampleSDID@32473 iut=\"3\" eventSource=\"Application\"
eventID=\"1011\"]"
- + "[exampleSDID@32480 iut=\"4\" eventSource=\"Other
Application\" eventID=\"2022\"] Removing instance";
-
- final byte[] bytes = message.getBytes(CHARSET);
- final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
- buffer.clear();
- buffer.put(bytes);
-
- final Syslog5424Event event = parser.parseEvent(buffer, sender);
- assertNotNull(event);
- assertTrue(event.isValid());
- assertEquals(sender, event.getSender());
- assertEquals("Removing instance",
event.getFieldMap().get(SyslogAttributes.SYSLOG_BODY.key()));
- }
-
@Test
public void testParseWithBOM() {
final String message = "<14>1 2014-06-20T09:14:07+00:00 loggregator"
@@ -245,12 +173,7 @@ public abstract class BaseStrictSyslog5424ParserTest {
+ " [exampleSDID@32473 iut=\"3\" eventSource=\"Application\"
eventID=\"1011\"]"
+ "[exampleSDID@32480 iut=\"4\" eventSource=\"Other Application\"
eventID=\"2022\"] \uFEFFMessage with some Umlauts äöü";
- final byte[] bytes = message.getBytes(CHARSET);
- final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
- ((Buffer)buffer).clear();
- buffer.put(bytes);
-
- final Syslog5424Event event = parser.parseEvent(buffer);
+ final Syslog5424Event event = parser.parseEvent(message);
assertNotNull(event);
assertTrue(event.isValid());
assertEquals("Message with some Umlauts äöü",
event.getFieldMap().get(SyslogAttributes.SYSLOG_BODY.key()));
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog5424.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog5424.java
index 4078402e12..0f22b8b994 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog5424.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog5424.java
@@ -37,7 +37,6 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.syslog.keyproviders.SyslogPrefixedKeyProvider;
@@ -47,8 +46,6 @@ import org.apache.nifi.syslog.parsers.StrictSyslog5424Parser;
import org.apache.nifi.syslog.events.Syslog5424Event;
import org.apache.nifi.syslog.attributes.SyslogAttributes;
-import java.io.IOException;
-import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
@@ -57,7 +54,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-
@EventDriven
@SideEffectFree
@SupportsBatching
@@ -130,6 +126,7 @@ public class ParseSyslog5424 extends AbstractProcessor {
private volatile StrictSyslog5424Parser parser;
+ private volatile Charset charset;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -150,11 +147,9 @@ public class ParseSyslog5424 extends AbstractProcessor {
@OnScheduled
public void onScheduled(final ProcessContext context) {
- final String charsetName = context.getProperty(CHARSET).getValue();
+ charset = Charset.forName(context.getProperty(CHARSET).getValue());
final String nilPolicyString =
context.getProperty(NIL_POLICY).getValue();
- parser = new StrictSyslog5424Parser(Charset.forName(charsetName),
- NilHandlingPolicy.valueOf(nilPolicyString),
- NifiStructuredDataPolicy.FLATTEN,new
SyslogPrefixedKeyProvider());
+ parser = new
StrictSyslog5424Parser(NilHandlingPolicy.valueOf(nilPolicyString),
NifiStructuredDataPolicy.FLATTEN, new SyslogPrefixedKeyProvider());
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
@@ -170,24 +165,20 @@ public class ParseSyslog5424 extends AbstractProcessor {
}
final byte[] buffer = new byte[(int) flowFile.getSize()];
- session.read(flowFile, new InputStreamCallback() {
- @Override
- public void process(final InputStream in) throws IOException {
- StreamUtils.fillBuffer(in, buffer);
- }
- });
+ session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer));
+ final String line = new String(buffer, charset).trim();
final Syslog5424Event syslogEvent;
try {
- syslogEvent = parser.parseEvent(buffer, null);
+ syslogEvent = parser.parseEvent(line);
} catch (final ProcessException pe) {
- getLogger().error("Failed to parse {} as a Syslog 5424 message
due to {}; routing to failure", new Object[] {flowFile, pe});
+ getLogger().error("Failed to parse {} as a Syslog 5424 message;
routing to failure", flowFile, pe);
session.transfer(flowFile, REL_FAILURE);
return;
}
if (syslogEvent == null || !syslogEvent.isValid()) {
- getLogger().error("Failed to parse {} as a Syslog message: it does
not conform to any of the RFC formats supported; routing to failure", new
Object[] {flowFile});
+ getLogger().error("Failed to parse {} as a Syslog message: it does
not conform to any of the RFC formats supported; routing to failure", flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
@@ -199,7 +190,6 @@ public class ParseSyslog5424 extends AbstractProcessor {
session.transfer(flowFile, REL_SUCCESS);
}
-
private static Map<String,String> convertMap(Map<String, Object> map) {
Map<String,String> returnMap = new HashMap<>();
map.forEach((key,value) -> returnMap.put(key,(String)value));
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424Reader.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424Reader.java
index 0f988e9510..eab295b9cc 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424Reader.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424Reader.java
@@ -82,8 +82,9 @@ public class Syslog5424Reader extends SchemaRegistryService
implements RecordRea
.allowableValues("true", "false")
.build();
+ private volatile Charset charset;
private volatile StrictSyslog5424Parser parser;
- private volatile static boolean includeRaw;
+ private volatile boolean includeRaw;
private volatile RecordSchema recordSchema;
@Override
@@ -97,9 +98,9 @@ public class Syslog5424Reader extends SchemaRegistryService
implements RecordRea
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
- final String charsetName = context.getProperty(CHARSET).getValue();
+ charset = Charset.forName(context.getProperty(CHARSET).getValue());
includeRaw = context.getProperty(ADD_RAW).asBoolean();
- parser = new StrictSyslog5424Parser(Charset.forName(charsetName),
NilHandlingPolicy.NULL, NifiStructuredDataPolicy.MAP_OF_MAPS, new
SimpleKeyProvider());
+ parser = new StrictSyslog5424Parser(NilHandlingPolicy.NULL,
NifiStructuredDataPolicy.MAP_OF_MAPS, new SimpleKeyProvider());
recordSchema = createRecordSchema();
}
@@ -120,7 +121,7 @@ public class Syslog5424Reader extends SchemaRegistryService
implements RecordRea
return createAccessStrategy();
}
- static RecordSchema createRecordSchema() {
+ RecordSchema createRecordSchema() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField(SyslogAttributes.PRIORITY.key(),
RecordFieldType.STRING.getDataType(), true));
fields.add(new RecordField(SyslogAttributes.SEVERITY.key(),
RecordFieldType.STRING.getDataType(), true));
@@ -135,13 +136,12 @@ public class Syslog5424Reader extends
SchemaRegistryService implements RecordRea
fields.add(new RecordField(Syslog5424Attributes.STRUCTURED_BASE.key(),
RecordFieldType.MAP.getMapDataType(RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()))));
- if(includeRaw) {
+ if (includeRaw) {
fields.add(new RecordField(RAW_MESSAGE_NAME,
RecordFieldType.STRING.getDataType(), true));
}
SchemaIdentifier schemaIdentifier = new
StandardSchemaIdentifier.Builder().name(RFC_5424_SCHEMA_NAME).build();
- final RecordSchema schema = new
SimpleRecordSchema(fields,schemaIdentifier);
- return schema;
+ return new SimpleRecordSchema(fields,schemaIdentifier);
}
private SchemaAccessStrategy createAccessStrategy() {
@@ -164,6 +164,6 @@ public class Syslog5424Reader extends SchemaRegistryService
implements RecordRea
@Override
public RecordReader createRecordReader(final Map<String, String>
variables, final InputStream in, final long inputLength, final ComponentLog
logger) throws IOException, SchemaNotFoundException {
final RecordSchema schema = getSchema(variables, in, null);
- return new Syslog5424RecordReader(parser, includeRaw, in, schema);
+ return new Syslog5424RecordReader(parser, includeRaw, charset, in,
schema);
}
}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424RecordReader.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424RecordReader.java
index be48f65904..28bc1d83ed 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424RecordReader.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424RecordReader.java
@@ -26,14 +26,12 @@ import org.apache.nifi.syslog.attributes.SyslogAttributes;
import org.apache.nifi.syslog.events.Syslog5424Event;
import org.apache.nifi.syslog.parsers.StrictSyslog5424Parser;
import org.apache.nifi.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
-import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
@@ -41,15 +39,13 @@ import java.util.HashMap;
import java.util.Map;
public class Syslog5424RecordReader implements RecordReader {
- private static final Logger logger =
LoggerFactory.getLogger(Syslog5424RecordReader.class);
-
private final BufferedReader reader;
- private RecordSchema schema;
+ private final RecordSchema schema;
private final StrictSyslog5424Parser parser;
private final boolean includeRaw;
- public Syslog5424RecordReader(StrictSyslog5424Parser parser, boolean
includeRaw, InputStream in, RecordSchema schema){
- this.reader = new BufferedReader(new InputStreamReader(in));
+ public Syslog5424RecordReader(final StrictSyslog5424Parser parser, final
boolean includeRaw, final Charset charset, final InputStream in, final
RecordSchema schema) {
+ this.reader = new BufferedReader(new InputStreamReader(in, charset));
this.schema = schema;
this.parser = parser;
this.includeRaw = includeRaw;
@@ -67,15 +63,13 @@ public class Syslog5424RecordReader implements RecordReader
{
}
if (StringUtils.isBlank(line)) {
- logger.debug("Encountered empty line, will skip");
continue;
}
break;
}
-
- Syslog5424Event event =
parser.parseEvent(ByteBuffer.wrap(line.getBytes(parser.getCharsetName())));
+ final Syslog5424Event event = parser.parseEvent(line);
if (!event.isValid()) {
if (event.getException() != null) {
@@ -92,7 +86,7 @@ public class Syslog5424RecordReader implements RecordReader {
final Map<String,Object> modifiedMap = new
HashMap<>(event.getFieldMap());
modifiedMap.put(SyslogAttributes.TIMESTAMP.key(),convertTimeStamp((String)event.getFieldMap().get(SyslogAttributes.TIMESTAMP.key())));
- if(includeRaw) {
+ if (includeRaw) {
modifiedMap.put(Syslog5424Reader.RAW_MESSAGE_NAME, line);
}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/syslog/TestSyslog5424RecordReader.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/syslog/TestSyslog5424RecordReader.java
index 2454f410a1..b9703c66a5 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/syslog/TestSyslog5424RecordReader.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/syslog/TestSyslog5424RecordReader.java
@@ -17,7 +17,6 @@
package org.apache.nifi.syslog;
-import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.syslog.attributes.Syslog5424Attributes;
@@ -28,11 +27,11 @@ import
org.apache.nifi.syslog.utils.NifiStructuredDataPolicy;
import org.apache.nifi.syslog.utils.NilHandlingPolicy;
import org.junit.jupiter.api.Test;
-import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
@@ -44,7 +43,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestSyslog5424RecordReader {
- private static final Charset CHARSET = Charset.forName("UTF-8");
+ private static final Charset CHARSET = StandardCharsets.UTF_8;
private static final String expectedVersion = "1";
private static final String expectedMessage = "Removing instance";
private static final String expectedAppName =
"d0602076-b14a-4c55-852a-981e7afeed38";
@@ -70,12 +69,14 @@ public class TestSyslog5424RecordReader {
@Test
@SuppressWarnings("unchecked")
public void testParseSingleLine() throws IOException,
MalformedRecordException {
- try (final InputStream fis = new FileInputStream(new
File("src/test/resources/syslog/syslog5424/log_all.txt"))) {
- StrictSyslog5424Parser parser = new StrictSyslog5424Parser(CHARSET,
+ final Syslog5424Reader reader = new Syslog5424Reader();
+
+ try (final InputStream fis = new
FileInputStream("src/test/resources/syslog/syslog5424/log_all.txt")) {
+ StrictSyslog5424Parser parser = new StrictSyslog5424Parser(
NilHandlingPolicy.NULL,
NifiStructuredDataPolicy.MAP_OF_MAPS,
new SimpleKeyProvider());
- final Syslog5424RecordReader deserializer = new
Syslog5424RecordReader(parser, true, fis,
Syslog5424Reader.createRecordSchema());
+ final Syslog5424RecordReader deserializer = new
Syslog5424RecordReader(parser, true, CHARSET, fis, reader.createRecordSchema());
final Record record = deserializer.nextRecord();
assertNotNull(record.getValues());
@@ -122,12 +123,14 @@ public class TestSyslog5424RecordReader {
@Test
@SuppressWarnings("unchecked")
public void testParseSingleLineSomeNulls() throws IOException,
MalformedRecordException {
- try (final InputStream fis = new FileInputStream(new
File("src/test/resources/syslog/syslog5424/log.txt"))) {
- StrictSyslog5424Parser parser = new StrictSyslog5424Parser(CHARSET,
+ final Syslog5424Reader reader = new Syslog5424Reader();
+
+ try (final InputStream fis = new
FileInputStream("src/test/resources/syslog/syslog5424/log.txt")) {
+ StrictSyslog5424Parser parser = new StrictSyslog5424Parser(
NilHandlingPolicy.NULL,
NifiStructuredDataPolicy.MAP_OF_MAPS,
new SimpleKeyProvider());
- final Syslog5424RecordReader deserializer = new
Syslog5424RecordReader(parser, false, fis,
Syslog5424Reader.createRecordSchema());
+ final Syslog5424RecordReader deserializer = new
Syslog5424RecordReader(parser, false, CHARSET, fis,
reader.createRecordSchema());
final Record record = deserializer.nextRecord();
assertNotNull(record.getValues());
@@ -173,12 +176,14 @@ public class TestSyslog5424RecordReader {
@Test
public void testParseMultipleLine() throws IOException,
MalformedRecordException {
- try (final InputStream fis = new FileInputStream(new
File("src/test/resources/syslog/syslog5424/log_mix.txt"))) {
- StrictSyslog5424Parser parser = new StrictSyslog5424Parser(CHARSET,
+ final Syslog5424Reader reader = new Syslog5424Reader();
+
+ try (final InputStream fis = new
FileInputStream("src/test/resources/syslog/syslog5424/log_mix.txt")) {
+ StrictSyslog5424Parser parser = new StrictSyslog5424Parser(
NilHandlingPolicy.NULL,
NifiStructuredDataPolicy.MAP_OF_MAPS,
new SimpleKeyProvider());
- final Syslog5424RecordReader deserializer = new
Syslog5424RecordReader(parser, false, fis,
Syslog5424Reader.createRecordSchema());
+ final Syslog5424RecordReader deserializer = new
Syslog5424RecordReader(parser, false, CHARSET, fis,
reader.createRecordSchema());
Record record = deserializer.nextRecord();
int count = 0;
@@ -194,12 +199,14 @@ public class TestSyslog5424RecordReader {
@Test
public void testParseMultipleLineWithError() throws IOException,
MalformedRecordException {
- try (final InputStream fis = new FileInputStream(new
File("src/test/resources/syslog/syslog5424/log_mix_in_error.txt"))) {
- StrictSyslog5424Parser parser = new StrictSyslog5424Parser(CHARSET,
+ final Syslog5424Reader reader = new Syslog5424Reader();
+
+ try (final InputStream fis = new
FileInputStream("src/test/resources/syslog/syslog5424/log_mix_in_error.txt")) {
+ StrictSyslog5424Parser parser = new StrictSyslog5424Parser(
NilHandlingPolicy.NULL,
NifiStructuredDataPolicy.MAP_OF_MAPS,
new SimpleKeyProvider());
- final Syslog5424RecordReader deserializer = new
Syslog5424RecordReader(parser, false, fis,
Syslog5424Reader.createRecordSchema());
+ final Syslog5424RecordReader deserializer = new
Syslog5424RecordReader(parser, false, CHARSET, fis,
reader.createRecordSchema());
Record record = deserializer.nextRecord();
int count = 0;
@@ -218,11 +225,4 @@ public class TestSyslog5424RecordReader {
deserializer.close();
}
}
-
- public void writeSchema() {
- String s = Syslog5424Reader.createRecordSchema().toString();
- System.out.println(s);
- System.out.println(AvroTypeUtil.extractAvroSchema(
Syslog5424Reader.createRecordSchema() ).toString(true));
- }
-
}