This is an automated email from the ASF dual-hosted git repository.
ppkarwasz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/logging-flume.git
The following commit(s) were added to refs/heads/trunk by this push:
new d4cd0f9e8 Fix RFC 5424 timestamp parsing edge cases (#441)
d4cd0f9e8 is described below
commit d4cd0f9e8c0929268136a47bb4497ca3514baaec
Author: Piotr P. Karwasz <[email protected]>
AuthorDate: Fri Jun 5 20:03:14 2026 +0200
Fix RFC 5424 timestamp parsing edge cases (#441)
`parseRfc5424Date` mishandled malformed and extreme inputs:
- Fractional seconds were scaled with `Math.pow` through a compound
assignment on a `long` (an implicit, lossy double-to-long conversion that
CodeQL flagged), and `Long.parseLong` over the whole fraction overflowed
on long inputs. Now at most 3 digits are read and right-padded with zeros
via `StringUtils.rightPad`; finer-than-millisecond digits are discarded
anyway.
- A fraction with no trailing timezone made the digit scan run past the
end of the string and throw `StringIndexOutOfBoundsException`; the scan is
now bounded by the string length, and a missing timezone is reported as an
`IllegalArgumentException`.
- A timezone not starting with `Z`, `+` or `-` was silently accepted (the
timestamp was returned as if it were UTC); it is now rejected.
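- The offset is now computed with `long` arithmetic (`hourOffset * 60L`).
With two-digit hour and minute fields the `int` product (at most
362,340,000 ms) cannot actually overflow, so this is a defensive change.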
Assisted-By: Claude Opus 4.8 (1M context) <[email protected]>
---
.../java/org/apache/flume/source/SyslogParser.java | 542 ++++++++++-----------
.../org/apache/flume/source/TestSyslogParser.java | 297 +++++------
2 files changed, 419 insertions(+), 420 deletions(-)
diff --git
a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java
b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java
index 6019573c9..e1d680a9c 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java
@@ -1,13 +1,12 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,18 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-/**
- * To change this template, choose Tools | Templates
- * and open the template in the editor.
- */
-
package org.apache.flume.source;
import com.google.common.base.Preconditions;
-import com.google.common.cache.LoadingCache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.UncheckedExecutionException;
import java.nio.charset.Charset;
import java.time.LocalDateTime;
import java.time.Year;
@@ -41,8 +36,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
-
-import com.google.common.util.concurrent.UncheckedExecutionException;
+import org.apache.commons.lang.StringUtils;
import org.apache.flume.Event;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
@@ -54,303 +48,287 @@ import org.slf4j.LoggerFactory;
@InterfaceStability.Evolving
public class SyslogParser {
- private static final Logger logger =
- LoggerFactory.getLogger(SyslogParser.class);
-
- private static final int TS_CACHE_MAX = 1000; // timestamp cache size limit
- private static final Pattern TWO_SPACES = Pattern.compile(" ");
- private static final DateTimeFormatter RFC3164_FORMAT =
- new DateTimeFormatterBuilder()
- .appendPattern("MMM d HH:mm:ss")
- .parseDefaulting(ChronoField.YEAR, Year.now().getValue()) //
Adds current year
- .toFormatter(Locale.ENGLISH); // Always use English month
names
- private static final DateTimeFormatter RFC5424_FORMAT =
- new DateTimeFormatterBuilder()
- .append(DateTimeFormatter.ISO_LOCAL_DATE_TIME) // Parses
YYYY-MM-DDTHH:mm:ss.SSSSSS
- .optionalStart() // Start of
optional timezone section
- .appendOffset("+HH:MM", "Z") // Handles +00:00, -05:00, or Z
- .optionalEnd() // End of
optional timezone section
- .toFormatter();
-
- private static final int RFC3164_LEN = 15;
- private static final int RFC5424_PREFIX_LEN = 19;
-
- private LoadingCache<String, Long> timestampCache;
-
- public SyslogParser() {
- timestampCache = CacheBuilder.newBuilder().maximumSize(TS_CACHE_MAX).build(
- new CacheLoader<String, Long>() {
-
- @Override
- public Long load(String key) {
- return ZonedDateTime.parse(key,
RFC5424_FORMAT.withZone(ZoneOffset.UTC)).toInstant().toEpochMilli();
- }
+ private static final Logger logger =
LoggerFactory.getLogger(SyslogParser.class);
+
+ private static final int TS_CACHE_MAX = 1000; // timestamp cache size limit
+ private static final Pattern TWO_SPACES = Pattern.compile(" ");
+ private static final DateTimeFormatter RFC3164_FORMAT = new
DateTimeFormatterBuilder()
+ .appendPattern("MMM d HH:mm:ss")
+ .parseDefaulting(ChronoField.YEAR, Year.now().getValue()) // Adds
current year
+ .toFormatter(Locale.ENGLISH); // Always use English month names
+ private static final DateTimeFormatter RFC5424_FORMAT = new
DateTimeFormatterBuilder()
+ .append(DateTimeFormatter.ISO_LOCAL_DATE_TIME) // Parses
YYYY-MM-DDTHH:mm:ss.SSSSSS
+ .optionalStart() // Start of optional timezone section
+ .appendOffset("+HH:MM", "Z") // Handles +00:00, -05:00, or Z
+ .optionalEnd() // End of optional timezone section
+ .toFormatter();
+
+ private static final int RFC3164_LEN = 15;
+ private static final int RFC5424_PREFIX_LEN = 19;
+ // Fractional seconds are kept at millisecond precision (3 digits); any
extra digits are discarded.
+ private static final int MILLISECOND_FRACTION_DIGITS = 3;
+
+ private LoadingCache<String, Long> timestampCache;
+
+ public SyslogParser() {
+ timestampCache =
CacheBuilder.newBuilder().maximumSize(TS_CACHE_MAX).build(new
CacheLoader<String, Long>() {
+
+ @Override
+ public Long load(String key) {
+ return ZonedDateTime.parse(key,
RFC5424_FORMAT.withZone(ZoneOffset.UTC))
+ .toInstant()
+ .toEpochMilli();
+ }
});
- }
+ }
/**
- * Parses a Flume Event out of a syslog message string.
- * @param msg Syslog message, not including the newline character
- * @return Parsed Flume Event
- * @throws IllegalArgumentException if unable to successfully parse message
- */
- public Event parseMessage(String msg, Charset charset, Set<String>
keepFields) {
- Map<String, String> headers = Maps.newHashMap();
-
- int msgLen = msg.length();
-
- int curPos = 0;
-
- Preconditions.checkArgument(msg.charAt(curPos) == '<',
- "Bad format: invalid priority: cannot find open bracket '<' (%s)",
msg);
+ * Parses a Flume Event out of a syslog message string.
+ * @param msg Syslog message, not including the newline character
+ * @return Parsed Flume Event
+ * @throws IllegalArgumentException if unable to successfully parse message
+ */
+ public Event parseMessage(String msg, Charset charset, Set<String>
keepFields) {
+ Map<String, String> headers = Maps.newHashMap();
- int endBracketPos = msg.indexOf('>');
- Preconditions.checkArgument(endBracketPos > 0 && endBracketPos <= 6,
- "Bad format: invalid priority: cannot find end bracket '>' (%s)", msg);
+ int msgLen = msg.length();
- String priority = msg.substring(1, endBracketPos);
- int pri = Integer.parseInt(priority);
- int facility = pri / 8;
- int severity = pri % 8;
+ int curPos = 0;
- // Remember priority
- headers.put(SyslogUtils.SYSLOG_PRIORITY, priority);
+ Preconditions.checkArgument(
+ msg.charAt(curPos) == '<', "Bad format: invalid priority:
cannot find open bracket '<' (%s)", msg);
- // put fac / sev into header
- headers.put(SyslogUtils.SYSLOG_FACILITY, String.valueOf(facility));
- headers.put(SyslogUtils.SYSLOG_SEVERITY, String.valueOf(severity));
+ int endBracketPos = msg.indexOf('>');
+ Preconditions.checkArgument(
+ endBracketPos > 0 && endBracketPos <= 6,
+ "Bad format: invalid priority: cannot find end bracket '>'
(%s)",
+ msg);
- Preconditions.checkArgument(msgLen > endBracketPos + 1,
- "Bad format: no data except priority (%s)", msg);
+ String priority = msg.substring(1, endBracketPos);
+ int pri = Integer.parseInt(priority);
+ int facility = pri / 8;
+ int severity = pri % 8;
- // update parsing position
- curPos = endBracketPos + 1;
+ // Remember priority
+ headers.put(SyslogUtils.SYSLOG_PRIORITY, priority);
- // remember version string
- String version = null;
- if (msgLen > curPos + 2 && "1 ".equals(msg.substring(curPos, curPos + 2)))
{
- version = msg.substring(curPos, curPos + 1);
- headers.put(SyslogUtils.SYSLOG_VERSION, version);
- curPos += 2;
- }
+ // put fac / sev into header
+ headers.put(SyslogUtils.SYSLOG_FACILITY, String.valueOf(facility));
+ headers.put(SyslogUtils.SYSLOG_SEVERITY, String.valueOf(severity));
- // now parse timestamp (handle different varieties)
+ Preconditions.checkArgument(msgLen > endBracketPos + 1, "Bad format:
no data except priority (%s)", msg);
- long ts;
- String tsString;
- char dateStartChar = msg.charAt(curPos);
+ // update parsing position
+ curPos = endBracketPos + 1;
- try {
-
- // no timestamp specified; use relay current time
- if (dateStartChar == '-') {
- tsString = Character.toString(dateStartChar);
- ts = System.currentTimeMillis();
- if (msgLen <= curPos + 2) {
- throw new IllegalArgumentException(
- "bad syslog format (missing hostname)");
+ // remember version string
+ String version = null;
+ if (msgLen > curPos + 2 && "1 ".equals(msg.substring(curPos, curPos +
2))) {
+ version = msg.substring(curPos, curPos + 1);
+ headers.put(SyslogUtils.SYSLOG_VERSION, version);
+ curPos += 2;
}
- curPos += 2; // assume we skip past a space to get to the hostname
- // rfc3164 timestamp
- } else if (dateStartChar >= 'A' && dateStartChar <= 'Z') {
- if (msgLen <= curPos + RFC3164_LEN) {
- throw new IllegalArgumentException("bad timestamp format");
+ // now parse timestamp (handle different varieties)
+
+ long ts;
+ String tsString;
+ char dateStartChar = msg.charAt(curPos);
+
+ try {
+
+ // no timestamp specified; use relay current time
+ if (dateStartChar == '-') {
+ tsString = Character.toString(dateStartChar);
+ ts = System.currentTimeMillis();
+ if (msgLen <= curPos + 2) {
+ throw new IllegalArgumentException("bad syslog format
(missing hostname)");
+ }
+ curPos += 2; // assume we skip past a space to get to the
hostname
+
+ // rfc3164 timestamp
+ } else if (dateStartChar >= 'A' && dateStartChar <= 'Z') {
+ if (msgLen <= curPos + RFC3164_LEN) {
+ throw new IllegalArgumentException("bad timestamp format");
+ }
+ tsString = msg.substring(curPos, curPos + RFC3164_LEN);
+ ts = parseRfc3164Time(tsString);
+ curPos += RFC3164_LEN + 1;
+
+ // rfc 5424 timestamp
+ } else {
+ int nextSpace = msg.indexOf(' ', curPos);
+ if (nextSpace == -1) {
+ throw new IllegalArgumentException("bad timestamp format");
+ }
+ tsString = msg.substring(curPos, nextSpace);
+ ts = parseRfc5424Date(tsString);
+ curPos = nextSpace + 1;
+ }
+
+ } catch (IllegalArgumentException ex) {
+ throw new IllegalArgumentException("Unable to parse message: " +
msg, ex);
}
- tsString = msg.substring(curPos, curPos + RFC3164_LEN);
- ts = parseRfc3164Time(tsString);
- curPos += RFC3164_LEN + 1;
- // rfc 5424 timestamp
- } else {
+ headers.put("timestamp", String.valueOf(ts));
+
+ // parse out hostname
int nextSpace = msg.indexOf(' ', curPos);
if (nextSpace == -1) {
- throw new IllegalArgumentException("bad timestamp format");
+ throw new IllegalArgumentException("bad syslog format (missing
hostname)");
+ }
+ // copy the host string to avoid holding the message string in memory
+ // if using a memory-based queue
+ String hostname = new String(msg.substring(curPos, nextSpace));
+ headers.put("host", hostname);
+
+ // EventBuilder will do a copy of its own, so no defensive copy of the
body
+ String data = "";
+ if (msgLen > nextSpace + 1 && !SyslogUtils.keepAllFields(keepFields)) {
+ curPos = nextSpace + 1;
+ data = msg.substring(curPos);
+ data = SyslogUtils.addFieldsToBody(keepFields, data, priority,
version, tsString, hostname);
+ } else {
+ data = msg;
}
- tsString = msg.substring(curPos, nextSpace);
- ts = parseRfc5424Date(tsString);
- curPos = nextSpace + 1;
- }
-
- } catch (IllegalArgumentException ex) {
- throw new IllegalArgumentException("Unable to parse message: " + msg,
ex);
- }
-
- headers.put("timestamp", String.valueOf(ts));
- // parse out hostname
- int nextSpace = msg.indexOf(' ', curPos);
- if (nextSpace == -1) {
- throw new IllegalArgumentException(
- "bad syslog format (missing hostname)");
- }
- // copy the host string to avoid holding the message string in memory
- // if using a memory-based queue
- String hostname = new String(msg.substring(curPos, nextSpace));
- headers.put("host", hostname);
-
- // EventBuilder will do a copy of its own, so no defensive copy of the body
- String data = "";
- if (msgLen > nextSpace + 1 && !SyslogUtils.keepAllFields(keepFields)) {
- curPos = nextSpace + 1;
- data = msg.substring(curPos);
- data = SyslogUtils.addFieldsToBody(keepFields, data, priority, version,
tsString, hostname);
- } else {
- data = msg;
- }
+ Event event = EventBuilder.withBody(data, charset, headers);
- Event event = EventBuilder.withBody(data, charset, headers);
-
- return event;
- }
-
- /**
- * Parse date in RFC 5424 format. Uses an LRU cache to speed up parsing for
- * multiple messages that occur in the same second.
- * @param msg
- * @return Typical (for Java) milliseconds since UNIX epoch
- */
- protected long parseRfc5424Date(String msg) {
- Long ts;
- int curPos = 0;
-
- int msgLen = msg.length();
- Preconditions.checkArgument(msgLen > RFC5424_PREFIX_LEN,
- "Bad format: Not a valid RFC5424 timestamp: %s", msg);
- String timestampPrefix = msg.substring(curPos, RFC5424_PREFIX_LEN);
-
- try {
- ts = timestampCache.get(timestampPrefix);
- } catch (ExecutionException | UncheckedExecutionException ex) {
- throw new IllegalArgumentException("bad timestamp format",
ex.getCause());
+ return event;
}
- curPos += RFC5424_PREFIX_LEN;
-
- // look for the optional fractional seconds
- if (msg.charAt(curPos) == '.') {
- // figure out how many numeric digits
- boolean foundEnd = false;
- int endMillisPos = curPos + 1;
-
- if (msgLen <= endMillisPos) {
- throw new IllegalArgumentException("bad timestamp format (no TZ)");
- }
-
- // FIXME: TODO: ensure we handle all bad formatting cases
- while (!foundEnd) {
- char curDigit = msg.charAt(endMillisPos);
- if (curDigit >= '0' && curDigit <= '9') {
- endMillisPos++;
- } else {
- foundEnd = true;
- }
- }
-
- // if they had a valid fractional second, append it rounded to millis
- final int fractionalPositions = endMillisPos - (curPos + 1);
- if (fractionalPositions > 0) {
- long milliseconds = Long.parseLong(msg.substring(curPos + 1,
endMillisPos));
- if (fractionalPositions > 3) {
- milliseconds /= Math.pow(10, (fractionalPositions - 3));
- } else if (fractionalPositions < 3) {
- milliseconds *= Math.pow(10, (3 - fractionalPositions));
+ /**
+ * Parse date in RFC 5424 format. Uses an LRU cache to speed up parsing for
+ * multiple messages that occur in the same second.
+ * @param msg
+ * @return Typical (for Java) milliseconds since UNIX epoch
+ */
+ protected long parseRfc5424Date(String msg) {
+ Long ts;
+ int curPos = 0;
+
+ int msgLen = msg.length();
+ Preconditions.checkArgument(msgLen > RFC5424_PREFIX_LEN, "Bad format:
Not a valid RFC5424 timestamp: %s", msg);
+ String timestampPrefix = msg.substring(curPos, RFC5424_PREFIX_LEN);
+
+ try {
+ ts = timestampCache.get(timestampPrefix);
+ } catch (ExecutionException | UncheckedExecutionException ex) {
+ throw new IllegalArgumentException("bad timestamp format",
ex.getCause());
}
- ts += milliseconds;
- } else {
- throw new IllegalArgumentException(
- "Bad format: Invalid timestamp (fractional portion): " + msg);
- }
-
- curPos = endMillisPos;
- }
- // look for timezone
- char tzFirst = msg.charAt(curPos);
-
- // UTC
- if (tzFirst == 'Z') {
- // no-op
- } else if (tzFirst == '+' || tzFirst == '-') {
-
- Preconditions.checkArgument(msgLen > curPos + 5,
- "Bad format: Invalid timezone (%s)", msg);
-
- int polarity;
- if (tzFirst == '+') {
- polarity = +1;
- } else {
- polarity = -1;
- }
-
- char[] h = new char[5];
- for (int i = 0; i < 5; i++) {
- h[i] = msg.charAt(curPos + 1 + i);
- }
-
- if (h[0] >= '0' && h[0] <= '9'
- && h[1] >= '0' && h[1] <= '9'
- && h[2] == ':'
- && h[3] >= '0' && h[3] <= '9'
- && h[4] >= '0' && h[4] <= '9') {
- int hourOffset = Integer.parseInt(msg.substring(curPos + 1, curPos +
3));
- int minOffset = Integer.parseInt(msg.substring(curPos + 4, curPos +
6));
- ts -= polarity * ((hourOffset * 60) + minOffset) * 60000;
- } else {
- throw new IllegalArgumentException(
- "Bad format: Invalid timezone: " + msg);
- }
+ curPos += RFC5424_PREFIX_LEN;
+
+ // look for the optional fractional seconds
+ if (msg.charAt(curPos) == '.') {
+ curPos++;
+ // figure out how many numeric digits
+ int endMillisPos = curPos;
+ while (endMillisPos < msgLen && msg.charAt(endMillisPos) >= '0' &&
msg.charAt(endMillisPos) <= '9') {
+ endMillisPos++;
+ }
+
+ // the fractional seconds must be followed by a timezone
+ if (endMillisPos >= msgLen) {
+ throw new IllegalArgumentException("bad timestamp format (no
TZ)");
+ }
+
+ // if they had a valid fractional second, truncate it to
millisecond precision and append
+ int fractionalPositions = Math.min(endMillisPos - curPos,
MILLISECOND_FRACTION_DIGITS);
+ if (fractionalPositions > 0) {
+ String fraction = msg.substring(curPos, curPos +
fractionalPositions);
+ ts += Integer.parseInt(StringUtils.rightPad(fraction,
MILLISECOND_FRACTION_DIGITS, '0'));
+ } else {
+ throw new IllegalArgumentException("Bad format: Invalid
timestamp (fractional portion): " + msg);
+ }
+
+ curPos = endMillisPos;
+ }
- }
+ // look for timezone
+ char tzFirst = msg.charAt(curPos);
+
+ // UTC
+ if (tzFirst == 'Z') {
+ return ts;
+ // numeric timezone
+ } else if (tzFirst == '+' || tzFirst == '-') {
+
+ Preconditions.checkArgument(msgLen > curPos + 5, "Bad format:
Invalid timezone (%s)", msg);
+
+ int polarity;
+ if (tzFirst == '+') {
+ polarity = +1;
+ } else {
+ polarity = -1;
+ }
+
+ char[] h = new char[5];
+ for (int i = 0; i < 5; i++) {
+ h[i] = msg.charAt(curPos + 1 + i);
+ }
+
+ if (h[0] >= '0'
+ && h[0] <= '9'
+ && h[1] >= '0'
+ && h[1] <= '9'
+ && h[2] == ':'
+ && h[3] >= '0'
+ && h[3] <= '9'
+ && h[4] >= '0'
+ && h[4] <= '9') {
+ int hourOffset = Integer.parseInt(msg.substring(curPos + 1,
curPos + 3));
+ int minOffset = Integer.parseInt(msg.substring(curPos + 4,
curPos + 6));
+ return ts - polarity * ((hourOffset * 60L) + minOffset) *
60000;
+ }
+ }
- return ts;
- }
-
- /**
- * Parse the RFC3164 date format. This is trickier than it sounds because
this
- * format does not specify a year so we get weird edge cases at year
- * boundaries. This implementation tries to "do what I mean".
- * @param ts RFC3164-compatible timestamp to be parsed
- * @return Typical (for Java) milliseconds since the UNIX epoch
- */
- protected long parseRfc3164Time(String ts) {
- LocalDateTime now = LocalDateTime.now();
- int year = now.getYear();
-
- ts = TWO_SPACES.matcher(ts).replaceFirst(" ");
-
- LocalDateTime date;
- try {
- date = LocalDateTime.parse(ts, RFC3164_FORMAT);
- } catch (DateTimeParseException e) {
- logger.debug("rfc3164 date parse failed on (" + ts + "): invalid
format", e);
- return 0;
+ throw new IllegalArgumentException("Bad format: Invalid timezone: " +
msg);
}
- // rfc3164 dates are really dumb.
- /*
- * Some code to try and add some smarts to the year insertion as without a
year in the message
- * we need to make some educated guessing.
- * First set the "fixed" to be the timestamp with the current year.
- * If the "fixed" time is more than one month in the future then roll it
back a year.
- * If the "fixed" time is more than eleven months in the past then roll it
forward a year.
- * This gives us a 12 month rolling window (11 months in the past, 1 month
in the future) of
- * timestamps.
+ /**
+ * Parse the RFC3164 date format. This is trickier than it sounds because
this
+ * format does not specify a year so we get weird edge cases at year
+ * boundaries. This implementation tries to "do what I mean".
+ * @param ts RFC3164-compatible timestamp to be parsed
+ * @return Typical (for Java) milliseconds since the UNIX epoch
*/
+ protected long parseRfc3164Time(String ts) {
+ LocalDateTime now = LocalDateTime.now();
+ int year = now.getYear();
+
+ ts = TWO_SPACES.matcher(ts).replaceFirst(" ");
+
+ LocalDateTime date;
+ try {
+ date = LocalDateTime.parse(ts, RFC3164_FORMAT);
+ } catch (DateTimeParseException e) {
+ logger.debug("rfc3164 date parse failed on (" + ts + "): invalid
format", e);
+ return 0;
+ }
- LocalDateTime fixed = date.withYear(year);
+ // rfc3164 dates are really dumb.
+ /*
+ * Some code to try and add some smarts to the year insertion as
without a year in the message
+ * we need to make some educated guessing.
+ * First set the "fixed" to be the timestamp with the current year.
+ * If the "fixed" time is more than one month in the future then roll
it back a year.
+ * If the "fixed" time is more than eleven months in the past then
roll it forward a year.
+ * This gives us a 12 month rolling window (11 months in the past, 1
month in the future) of
+ * timestamps.
+ */
+
+ LocalDateTime fixed = date.withYear(year);
+
+ // flume clock is ahead or there is some latency, and the year rolled
+ if (fixed.isAfter(now) && fixed.minusMonths(1).isAfter(now)) {
+ fixed = date.minusYears(1);
+ // flume clock is behind and the year rolled
+ } else if (fixed.isBefore(now) && fixed.plusMonths(1).isBefore(now)) {
+ fixed = date.plusYears(1);
+ }
+ date = fixed;
- // flume clock is ahead or there is some latency, and the year rolled
- if (fixed.isAfter(now) && fixed.minusMonths(1).isAfter(now)) {
- fixed = date.minusYears(1);
- // flume clock is behind and the year rolled
- } else if (fixed.isBefore(now) && fixed.plusMonths(1).isBefore(now)) {
- fixed = date.plusYears(1);
+ return date.toInstant(ZoneOffset.UTC).toEpochMilli();
}
- date = fixed;
-
- return date.toInstant(ZoneOffset.UTC).toEpochMilli();
- }
-
-
}
diff --git
a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java
b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java
index 81f078490..07200383a 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java
@@ -1,30 +1,23 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.flume.source;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
-import org.apache.flume.Event;
-
-import org.junit.Assert;
-import org.junit.Test;
-
import java.nio.charset.Charset;
import java.time.Instant;
import java.time.LocalDate;
@@ -33,6 +26,9 @@ import java.time.ZoneOffset;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import org.apache.flume.Event;
+import org.junit.Assert;
+import org.junit.Test;
public class TestSyslogParser {
private static class Entry {
@@ -45,162 +41,187 @@ public class TestSyslogParser {
this.rfc5424 = rfc5424;
this.rfc3164 = rfc3164;
this.millis = millis;
- this.localDateTime = LocalDateTime.ofInstant(
- Instant.ofEpochMilli(millis), ZoneOffset.UTC);
+ this.localDateTime =
LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC);
}
public String getRfc5424() {
return rfc5424;
}
+
public String getRfc3164() {
return rfc3164;
}
+
public LocalDateTime getLocalDateTime() {
return localDateTime;
}
+
public long getMillis() {
return millis;
}
}
private static final Entry[] VALID_TIMESTAMPS = {
- new Entry("1985-04-12T23:20:50.52Z", "Apr 12 23:20:50",
482196050520L),
- new Entry("1985-04-12T19:20:50.52-04:00", "Apr 12 23:20:50",
482196050520L),
- new Entry("2003-10-11T22:14:15.003Z", "Oct 11 22:14:15",
1065910455003L),
- new Entry("2003-08-24T05:14:15.000003-07:00", "Aug 24 12:14:15",
1061727255000L),
- new Entry("2012-04-13T08:08:08.0001+00:00", "Apr 13 08:08:08",
1334304488000L),
- new Entry("2012-04-13T08:08:08.251+00:00", "Apr 13 08:08:08",
1334304488251L),
- // The same instant with 0, 3, 6 and 9 fractional-second digits:
- // anything finer than a millisecond is truncated.
- new Entry("2003-10-11T22:14:15Z", "Oct 11 22:14:15",
1065910455000L),
- new Entry("2003-10-11T22:14:15.123Z", "Oct 11 22:14:15",
1065910455123L),
- new Entry("2003-10-11T22:14:15.123456Z", "Oct 11 22:14:15",
1065910455123L),
- new Entry("2003-10-11T22:14:15.123456789Z", "Oct 11 22:14:15",
1065910455123L)
+ new Entry("1985-04-12T23:20:50.52Z", "Apr 12 23:20:50", 482196050520L),
+ new Entry("1985-04-12T19:20:50.52-04:00", "Apr 12 23:20:50",
482196050520L),
+ new Entry("2003-10-11T22:14:15.003Z", "Oct 11 22:14:15",
1065910455003L),
+ new Entry("2003-08-24T05:14:15.000003-07:00", "Aug 24 12:14:15",
1061727255000L),
+ new Entry("2012-04-13T08:08:08.0001+00:00", "Apr 13 08:08:08",
1334304488000L),
+ new Entry("2012-04-13T08:08:08.251+00:00", "Apr 13 08:08:08",
1334304488251L),
+ new Entry("2012-04-13T09:08:08.251+01:00", "Apr 13 08:08:08",
1334304488251L),
+ new Entry("2012-04-13T07:08:08.251-01:00", "Apr 13 08:08:08",
1334304488251L),
+ // The same instant with 0, 3, 6 and 9 fractional-second digits:
+ // anything finer than a millisecond is truncated.
+ new Entry("2003-10-11T22:14:15Z", "Oct 11 22:14:15", 1065910455000L),
+ new Entry("2003-10-11T22:14:15.123Z", "Oct 11 22:14:15",
1065910455123L),
+ new Entry("2003-10-11T22:14:15.123456Z", "Oct 11 22:14:15",
1065910455123L),
+ new Entry("2003-10-11T22:14:15.123456789Z", "Oct 11 22:14:15",
1065910455123L),
+ // Fractional digits that would overflow a `long`
+ new
Entry("2003-10-11T22:14:15.1234567890123456789012345678901234567890Z", "Oct 11
22:14:15", 1065910455123L)
};
private static final String[] INVALID_RFC3164_TIMESTAMPS = {
- "", // empty
- "not a date", // not a timestamp at all
- "Foo 12 23:20:50", // invalid month name
- "Apr 12 25:61:61", // hour/minute/second out of range
+ "", // empty
+ "not a date", // not a timestamp at all
+ "Foo 12 23:20:50", // invalid month name
+ "Apr 12 25:61:61", // hour/minute/second out of range
};
private static final String[] INVALID_RFC5424_TIMESTAMPS = {
- "", // empty
- "2003-10-11T22:14", // too short to hold a full timestamp
- "2003-10-11T22:14:15.", // fractional-second marker but no
digits or TZ
- "2003-10-11T22:14:15+0", // truncated timezone offset
- "xxxx-10-11T22:14:15Z", // prefix is not a valid date-time
+ "", // empty
+ "2003-10-11T22:14", // too short to hold a full timestamp
+ "2003-10-11T22:14:15.", // fractional-second marker but no digits nor
TZ
+ "2003-10-11T22:14:15.123456789", // fractional-second marker but no TZ
+ "2003-10-11T22:14:15.Z", // fractional-second marker but no digits
+ "2003-10-11T22:14:15+0", // truncated timezone offset
+ "xxxx-10-11T22:14:15Z", // prefix is not a valid date-time
+ "2003-10-11T22:14:15+/2:34", // timezone with invalid digits
+ "2003-10-11T22:14:15+@2:34",
+ "2003-10-11T22:14:15+1/:34",
+ "2003-10-11T22:14:15+1@:34",
+ "2003-10-11T22:14:15+12/34",
+ "2003-10-11T22:14:15+12@34",
+ "2003-10-11T22:14:15+12:/4",
+ "2003-10-11T22:14:15+12:@4",
+ "2003-10-11T22:14:15+12:3/",
+ "2003-10-11T22:14:15+12:3@",
+ "2003-10-11T22:14:15A", // invalid timezone letter
};
- @Test
- public void testParseRfc3164Time() {
- SyslogParser parser = new SyslogParser();
- LocalDate now = LocalDate.now();
-
- for (Entry entry : VALID_TIMESTAMPS) {
- long time = parser.parseRfc3164Time(entry.getRfc3164());
-
- // RFC 3164 timestamps have no year, therefore we only assert the
equality
- LocalDateTime expected = entry.getLocalDateTime();
- LocalDateTime actual = LocalDateTime.ofInstant(
- Instant.ofEpochMilli(time), ZoneOffset.UTC);
-
- String msg = entry.getRfc3164() + " was not parsed correctly (got " +
actual + ")";
- Assert.assertEquals(msg, expected.getMonth(), actual.getMonth());
- Assert.assertEquals(msg, expected.getDayOfMonth(),
actual.getDayOfMonth());
- Assert.assertEquals(msg, expected.getHour(), actual.getHour());
- Assert.assertEquals(msg, expected.getMinute(), actual.getMinute());
- Assert.assertEquals(msg, expected.getSecond(), actual.getSecond());
- Assert.assertTrue(msg, Math.abs(now.getYear() - actual.getYear()) <= 1);
+ @Test
+ public void testParseRfc3164Time() {
+ SyslogParser parser = new SyslogParser();
+ LocalDate now = LocalDate.now();
+
+ for (Entry entry : VALID_TIMESTAMPS) {
+ long time = parser.parseRfc3164Time(entry.getRfc3164());
+
+ // RFC 3164 timestamps have no year, therefore we only assert the
equality
+ LocalDateTime expected = entry.getLocalDateTime();
+ LocalDateTime actual =
LocalDateTime.ofInstant(Instant.ofEpochMilli(time), ZoneOffset.UTC);
+
+ String msg = entry.getRfc3164() + " was not parsed correctly (got
" + actual + ")";
+ Assert.assertEquals(msg, expected.getMonth(), actual.getMonth());
+ Assert.assertEquals(msg, expected.getDayOfMonth(),
actual.getDayOfMonth());
+ Assert.assertEquals(msg, expected.getHour(), actual.getHour());
+ Assert.assertEquals(msg, expected.getMinute(), actual.getMinute());
+ Assert.assertEquals(msg, expected.getSecond(), actual.getSecond());
+ Assert.assertTrue(msg, Math.abs(now.getYear() - actual.getYear())
<= 1);
+ }
}
- }
- @Test
- public void testParseRfc3164TimeInvalid() {
- SyslogParser parser = new SyslogParser();
+ @Test
+ public void testParseRfc3164TimeInvalid() {
+ SyslogParser parser = new SyslogParser();
- // parseRfc3164Time() signals a parse failure by returning 0 rather than
throwing.
- for (String input : INVALID_RFC3164_TIMESTAMPS) {
- Assert.assertEquals("Expected 0 for unparseable RFC 3164 timestamp '" +
input + "'",
- 0L, parser.parseRfc3164Time(input));
+ // parseRfc3164Time() signals a parse failure by returning 0 rather
than throwing.
+ for (String input : INVALID_RFC3164_TIMESTAMPS) {
+ Assert.assertEquals(
+ "Expected 0 for unparseable RFC 3164 timestamp '" + input
+ "'",
+ 0L,
+ parser.parseRfc3164Time(input));
+ }
}
- }
- @Test
- public void testParseRfc5424Time() {
- SyslogParser parser = new SyslogParser();
+ @Test
+ public void testParseRfc5424Time() {
+ SyslogParser parser = new SyslogParser();
- for (Entry entry : VALID_TIMESTAMPS) {
- long time = parser.parseRfc5424Date(entry.getRfc5424());
- Assert.assertEquals(entry.millis + " is not the same as timestamp " +
time, entry.getMillis(), time);
+ for (Entry entry : VALID_TIMESTAMPS) {
+ long time = parser.parseRfc5424Date(entry.getRfc5424());
+ Assert.assertEquals(entry.millis + " is not the same as timestamp
" + time, entry.getMillis(), time);
+ }
}
- }
- @Test
- public void testParseRfc5424TimeInvalid() {
- SyslogParser parser = new SyslogParser();
+ @Test
+ public void testParseRfc5424TimeInvalid() {
+ SyslogParser parser = new SyslogParser();
- for (String input : INVALID_RFC5424_TIMESTAMPS) {
- Assert.assertThrows("Expected parsing to fail for RFC 5424 timestamp '"
+ input + "'",
- IllegalArgumentException.class, () ->
parser.parseRfc5424Date(input));
- }
- }
-
- @Test
- public void testMessageParsing() {
- SyslogParser parser = new SyslogParser();
- Charset charset = Charsets.UTF_8;
- List<String> messages = Lists.newArrayList();
-
- // supported examples from RFC 3164
- messages.add("<34>Oct 11 22:14:15 mymachine su: 'su root' failed for " +
- "lonvick on /dev/pts/8");
- messages.add("<13>Feb 5 17:32:18 10.0.0.99 Use the BFG!");
- messages.add("<165>Aug 24 05:34:00 CST 1987 mymachine myproc[10]: %% " +
- "It's time to make the do-nuts. %% Ingredients: Mix=OK, Jelly=OK # "
+
- "Devices: Mixer=OK, Jelly_Injector=OK, Frier=OK # Transport: " +
- "Conveyer1=OK, Conveyer2=OK # %%");
- messages.add("<0>Oct 22 10:52:12 scapegoat 1990 Oct 22 10:52:01 TZ-6 " +
- "scapegoat.dmz.example.org 10.1.2.3 sched[0]: That's All Folks!");
-
- // supported examples from RFC 5424
- messages.add("<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - " +
- "ID47 - BOM'su root' failed for lonvick on /dev/pts/8");
- messages.add("<165>1 2003-08-24T05:14:15.000003-07:00 192.0.2.1 myproc " +
- "8710 - - %% It's time to make the do-nuts.");
-
- // non-standard (but common) messages (RFC3339 dates, no version digit)
- messages.add("<13>2003-08-24T05:14:15Z localhost snarf?");
- messages.add("<13>2012-08-16T14:34:03-08:00 127.0.0.1 test shnap!");
-
- // test with default keepFields = false
- for (String msg : messages) {
- Set<String> keepFields = new HashSet<String>();
- Event event = parser.parseMessage(msg, charset, keepFields);
- Assert.assertNull("Failure to parse known-good syslog message",
- event.getHeaders().get(SyslogUtils.EVENT_STATUS));
+ for (String input : INVALID_RFC5424_TIMESTAMPS) {
+ Assert.assertThrows(
+ "Expected parsing to fail for RFC 5424 timestamp '" +
input + "'",
+ IllegalArgumentException.class,
+ () -> parser.parseRfc5424Date(input));
+ }
}
- // test that priority, timestamp and hostname are preserved in event body
- for (String msg : messages) {
- Set<String> keepFields = new HashSet<String>();
- keepFields.add(SyslogUtils.KEEP_FIELDS_ALL);
- Event event = parser.parseMessage(msg, charset, keepFields);
- Assert.assertArrayEquals(event.getBody(), msg.getBytes());
- Assert.assertNull("Failure to parse known-good syslog message",
- event.getHeaders().get(SyslogUtils.EVENT_STATUS));
- }
+ @Test
+ public void testMessageParsing() {
+ SyslogParser parser = new SyslogParser();
+ Charset charset = Charsets.UTF_8;
+ List<String> messages = Lists.newArrayList();
+
+ // supported examples from RFC 3164
+ messages.add("<34>Oct 11 22:14:15 mymachine su: 'su root' failed for "
+ "lonvick on /dev/pts/8");
+ messages.add("<13>Feb 5 17:32:18 10.0.0.99 Use the BFG!");
+ messages.add("<165>Aug 24 05:34:00 CST 1987 mymachine myproc[10]: %% "
+ + "It's time to make the do-nuts. %% Ingredients: Mix=OK,
Jelly=OK # "
+ + "Devices: Mixer=OK, Jelly_Injector=OK, Frier=OK # Transport:
"
+ + "Conveyer1=OK, Conveyer2=OK # %%");
+ messages.add("<0>Oct 22 10:52:12 scapegoat 1990 Oct 22 10:52:01 TZ-6 "
+ + "scapegoat.dmz.example.org 10.1.2.3 sched[0]: That's All
Folks!");
+
+ // supported examples from RFC 5424
+ messages.add("<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su
- "
+ + "ID47 - BOM'su root' failed for lonvick on /dev/pts/8");
+ messages.add("<165>1 2003-08-24T05:14:15.000003-07:00 192.0.2.1 myproc
"
+ + "8710 - - %% It's time to make the do-nuts.");
+
+ // non-standard (but common) messages (RFC3339 dates, no version digit)
+ messages.add("<13>2003-08-24T05:14:15Z localhost snarf?");
+ messages.add("<13>2012-08-16T14:34:03-08:00 127.0.0.1 test shnap!");
+
+ // test with default keepFields = false
+ for (String msg : messages) {
+ Set<String> keepFields = new HashSet<String>();
+ Event event = parser.parseMessage(msg, charset, keepFields);
+ Assert.assertNull(
+ "Failure to parse known-good syslog message",
+ event.getHeaders().get(SyslogUtils.EVENT_STATUS));
+ }
- // test that hostname is preserved in event body
- for (String msg : messages) {
- Set<String> keepFields = new HashSet<String>();
-
keepFields.add(SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS_HOSTNAME);
- Event event = parser.parseMessage(msg, charset, keepFields);
- Assert.assertTrue("Failure to persist hostname",
- new
String(event.getBody()).contains(event.getHeaders().get("host")));
- Assert.assertNull("Failure to parse known-good syslog message",
- event.getHeaders().get(SyslogUtils.EVENT_STATUS));
+ // test that priority, timestamp and hostname are preserved in event
body
+ for (String msg : messages) {
+ Set<String> keepFields = new HashSet<String>();
+ keepFields.add(SyslogUtils.KEEP_FIELDS_ALL);
+ Event event = parser.parseMessage(msg, charset, keepFields);
+ Assert.assertArrayEquals(event.getBody(), msg.getBytes());
+ Assert.assertNull(
+ "Failure to parse known-good syslog message",
+ event.getHeaders().get(SyslogUtils.EVENT_STATUS));
+ }
+
+ // test that hostname is preserved in event body
+ for (String msg : messages) {
+ Set<String> keepFields = new HashSet<String>();
+
keepFields.add(SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS_HOSTNAME);
+ Event event = parser.parseMessage(msg, charset, keepFields);
+ Assert.assertTrue(
+ "Failure to persist hostname",
+ new
String(event.getBody()).contains(event.getHeaders().get("host")));
+ Assert.assertNull(
+ "Failure to parse known-good syslog message",
+ event.getHeaders().get(SyslogUtils.EVENT_STATUS));
+ }
}
- }
-}
\ No newline at end of file
+}