This is an automated email from the ASF dual-hosted git repository.

ppkarwasz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/logging-flume.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d4cd0f9e8 Fix RFC 5424 timestamp parsing edge cases (#441)
d4cd0f9e8 is described below

commit d4cd0f9e8c0929268136a47bb4497ca3514baaec
Author: Piotr P. Karwasz <[email protected]>
AuthorDate: Fri Jun 5 20:03:14 2026 +0200

    Fix RFC 5424 timestamp parsing edge cases (#441)
    
    `parseRfc5424Date` mishandled malformed and extreme inputs:
    
    - Fractional seconds were scaled with `Math.pow` (the lossy double/long
      conversion that CodeQL flagged), and `Long.parseLong` over the whole
      fraction overflowed on long inputs. At most 3 digits are now read and
      zero-padded with `StringUtils.rightPad` instead; finer-than-millisecond
      digits are truncated anyway.
    - A fraction with no trailing timezone scanned past the end of the string
      and threw `StringIndexOutOfBoundsException`; the loop is now bounded and
      reports an `IllegalArgumentException`.
    - A timezone not starting with `Z`, `+` or `-` was silently accepted; it
      is now rejected.
    - The offset is computed with `long` arithmetic to avoid int overflow.
    
    Assisted-By: Claude Opus 4.8 (1M context) <[email protected]>
---
 .../java/org/apache/flume/source/SyslogParser.java | 542 ++++++++++-----------
 .../org/apache/flume/source/TestSyslogParser.java  | 297 +++++------
 2 files changed, 419 insertions(+), 420 deletions(-)

diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java 
b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java
index 6019573c9..e1d680a9c 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java
@@ -1,13 +1,12 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,18 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-/**
- * To change this template, choose Tools | Templates
- * and open the template in the editor.
- */
-
 package org.apache.flume.source;
 
 import com.google.common.base.Preconditions;
-import com.google.common.cache.LoadingCache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.UncheckedExecutionException;
 import java.nio.charset.Charset;
 import java.time.LocalDateTime;
 import java.time.Year;
@@ -41,8 +36,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.regex.Pattern;
-
-import com.google.common.util.concurrent.UncheckedExecutionException;
+import org.apache.commons.lang.StringUtils;
 import org.apache.flume.Event;
 import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.annotations.InterfaceStability;
@@ -54,303 +48,287 @@ import org.slf4j.LoggerFactory;
 @InterfaceStability.Evolving
 public class SyslogParser {
 
-  private static final Logger logger =
-      LoggerFactory.getLogger(SyslogParser.class);
-
-  private static final int TS_CACHE_MAX = 1000;  // timestamp cache size limit
-  private static final Pattern TWO_SPACES = Pattern.compile("  ");
-  private static final DateTimeFormatter RFC3164_FORMAT =
-          new DateTimeFormatterBuilder()
-                  .appendPattern("MMM d HH:mm:ss")
-                  .parseDefaulting(ChronoField.YEAR, Year.now().getValue()) // 
Adds current year
-                  .toFormatter(Locale.ENGLISH); // Always use English month 
names
-  private static final DateTimeFormatter RFC5424_FORMAT =
-          new DateTimeFormatterBuilder()
-                  .append(DateTimeFormatter.ISO_LOCAL_DATE_TIME) // Parses 
YYYY-MM-DDTHH:mm:ss.SSSSSS
-                  .optionalStart()                               // Start of 
optional timezone section
-                  .appendOffset("+HH:MM", "Z") // Handles +00:00, -05:00, or Z
-                  .optionalEnd()                                 // End of 
optional timezone section
-                  .toFormatter();
-
-  private static final int RFC3164_LEN = 15;
-  private static final int RFC5424_PREFIX_LEN = 19;
-
-  private LoadingCache<String, Long> timestampCache;
-
-  public SyslogParser() {
-    timestampCache = CacheBuilder.newBuilder().maximumSize(TS_CACHE_MAX).build(
-        new CacheLoader<String, Long>() {
-
-          @Override
-          public Long load(String key) {
-              return ZonedDateTime.parse(key, 
RFC5424_FORMAT.withZone(ZoneOffset.UTC)).toInstant().toEpochMilli();
-          }
+    private static final Logger logger = 
LoggerFactory.getLogger(SyslogParser.class);
+
+    private static final int TS_CACHE_MAX = 1000; // timestamp cache size limit
+    private static final Pattern TWO_SPACES = Pattern.compile("  ");
+    private static final DateTimeFormatter RFC3164_FORMAT = new 
DateTimeFormatterBuilder()
+            .appendPattern("MMM d HH:mm:ss")
+            .parseDefaulting(ChronoField.YEAR, Year.now().getValue()) // Adds 
current year
+            .toFormatter(Locale.ENGLISH); // Always use English month names
+    private static final DateTimeFormatter RFC5424_FORMAT = new 
DateTimeFormatterBuilder()
+            .append(DateTimeFormatter.ISO_LOCAL_DATE_TIME) // Parses 
YYYY-MM-DDTHH:mm:ss.SSSSSS
+            .optionalStart() // Start of optional timezone section
+            .appendOffset("+HH:MM", "Z") // Handles +00:00, -05:00, or Z
+            .optionalEnd() // End of optional timezone section
+            .toFormatter();
+
+    private static final int RFC3164_LEN = 15;
+    private static final int RFC5424_PREFIX_LEN = 19;
+    // Fractional seconds are kept at millisecond precision (3 digits); any 
extra digits are discarded.
+    private static final int MILLISECOND_FRACTION_DIGITS = 3;
+
+    private LoadingCache<String, Long> timestampCache;
+
+    public SyslogParser() {
+        timestampCache = 
CacheBuilder.newBuilder().maximumSize(TS_CACHE_MAX).build(new 
CacheLoader<String, Long>() {
+
+            @Override
+            public Long load(String key) {
+                return ZonedDateTime.parse(key, 
RFC5424_FORMAT.withZone(ZoneOffset.UTC))
+                        .toInstant()
+                        .toEpochMilli();
+            }
         });
-  }
+    }
 
     /**
-   * Parses a Flume Event out of a syslog message string.
-   * @param msg Syslog message, not including the newline character
-   * @return Parsed Flume Event
-   * @throws IllegalArgumentException if unable to successfully parse message
-   */
-  public Event parseMessage(String msg, Charset charset, Set<String> 
keepFields) {
-    Map<String, String> headers = Maps.newHashMap();
-
-    int msgLen = msg.length();
-
-    int curPos = 0;
-
-    Preconditions.checkArgument(msg.charAt(curPos) == '<',
-        "Bad format: invalid priority: cannot find open bracket '<' (%s)", 
msg);
+     * Parses a Flume Event out of a syslog message string.
+     * @param msg Syslog message, not including the newline character
+     * @return Parsed Flume Event
+     * @throws IllegalArgumentException if unable to successfully parse message
+     */
+    public Event parseMessage(String msg, Charset charset, Set<String> 
keepFields) {
+        Map<String, String> headers = Maps.newHashMap();
 
-    int endBracketPos = msg.indexOf('>');
-    Preconditions.checkArgument(endBracketPos > 0 && endBracketPos <= 6,
-        "Bad format: invalid priority: cannot find end bracket '>' (%s)", msg);
+        int msgLen = msg.length();
 
-    String priority = msg.substring(1, endBracketPos);
-    int pri = Integer.parseInt(priority);
-    int facility = pri / 8;
-    int severity = pri % 8;
+        int curPos = 0;
 
-    // Remember priority
-    headers.put(SyslogUtils.SYSLOG_PRIORITY, priority);
+        Preconditions.checkArgument(
+                msg.charAt(curPos) == '<', "Bad format: invalid priority: 
cannot find open bracket '<' (%s)", msg);
 
-    // put fac / sev into header
-    headers.put(SyslogUtils.SYSLOG_FACILITY, String.valueOf(facility));
-    headers.put(SyslogUtils.SYSLOG_SEVERITY, String.valueOf(severity));
+        int endBracketPos = msg.indexOf('>');
+        Preconditions.checkArgument(
+                endBracketPos > 0 && endBracketPos <= 6,
+                "Bad format: invalid priority: cannot find end bracket '>' 
(%s)",
+                msg);
 
-    Preconditions.checkArgument(msgLen > endBracketPos + 1,
-        "Bad format: no data except priority (%s)", msg);
+        String priority = msg.substring(1, endBracketPos);
+        int pri = Integer.parseInt(priority);
+        int facility = pri / 8;
+        int severity = pri % 8;
 
-    // update parsing position
-    curPos = endBracketPos + 1;
+        // Remember priority
+        headers.put(SyslogUtils.SYSLOG_PRIORITY, priority);
 
-    // remember version string
-    String version = null;
-    if (msgLen > curPos + 2 && "1 ".equals(msg.substring(curPos, curPos + 2))) 
{
-      version = msg.substring(curPos, curPos + 1);
-      headers.put(SyslogUtils.SYSLOG_VERSION, version);
-      curPos += 2;
-    }
+        // put fac / sev into header
+        headers.put(SyslogUtils.SYSLOG_FACILITY, String.valueOf(facility));
+        headers.put(SyslogUtils.SYSLOG_SEVERITY, String.valueOf(severity));
 
-    // now parse timestamp (handle different varieties)
+        Preconditions.checkArgument(msgLen > endBracketPos + 1, "Bad format: 
no data except priority (%s)", msg);
 
-    long ts;
-    String tsString;
-    char dateStartChar = msg.charAt(curPos);
+        // update parsing position
+        curPos = endBracketPos + 1;
 
-    try {
-
-      // no timestamp specified; use relay current time
-      if (dateStartChar == '-') {
-        tsString = Character.toString(dateStartChar);
-        ts = System.currentTimeMillis();
-        if (msgLen <= curPos + 2) {
-          throw new IllegalArgumentException(
-              "bad syslog format (missing hostname)");
+        // remember version string
+        String version = null;
+        if (msgLen > curPos + 2 && "1 ".equals(msg.substring(curPos, curPos + 
2))) {
+            version = msg.substring(curPos, curPos + 1);
+            headers.put(SyslogUtils.SYSLOG_VERSION, version);
+            curPos += 2;
         }
-        curPos += 2; // assume we skip past a space to get to the hostname
 
-      // rfc3164 timestamp
-      } else if (dateStartChar >= 'A' && dateStartChar <= 'Z') {
-        if (msgLen <= curPos + RFC3164_LEN) {
-          throw new IllegalArgumentException("bad timestamp format");
+        // now parse timestamp (handle different varieties)
+
+        long ts;
+        String tsString;
+        char dateStartChar = msg.charAt(curPos);
+
+        try {
+
+            // no timestamp specified; use relay current time
+            if (dateStartChar == '-') {
+                tsString = Character.toString(dateStartChar);
+                ts = System.currentTimeMillis();
+                if (msgLen <= curPos + 2) {
+                    throw new IllegalArgumentException("bad syslog format 
(missing hostname)");
+                }
+                curPos += 2; // assume we skip past a space to get to the 
hostname
+
+                // rfc3164 timestamp
+            } else if (dateStartChar >= 'A' && dateStartChar <= 'Z') {
+                if (msgLen <= curPos + RFC3164_LEN) {
+                    throw new IllegalArgumentException("bad timestamp format");
+                }
+                tsString = msg.substring(curPos, curPos + RFC3164_LEN);
+                ts = parseRfc3164Time(tsString);
+                curPos += RFC3164_LEN + 1;
+
+                // rfc 5424 timestamp
+            } else {
+                int nextSpace = msg.indexOf(' ', curPos);
+                if (nextSpace == -1) {
+                    throw new IllegalArgumentException("bad timestamp format");
+                }
+                tsString = msg.substring(curPos, nextSpace);
+                ts = parseRfc5424Date(tsString);
+                curPos = nextSpace + 1;
+            }
+
+        } catch (IllegalArgumentException ex) {
+            throw new IllegalArgumentException("Unable to parse message: " + 
msg, ex);
         }
-        tsString = msg.substring(curPos, curPos + RFC3164_LEN);
-        ts = parseRfc3164Time(tsString);
-        curPos += RFC3164_LEN + 1;
 
-      // rfc 5424 timestamp
-      } else {
+        headers.put("timestamp", String.valueOf(ts));
+
+        // parse out hostname
         int nextSpace = msg.indexOf(' ', curPos);
         if (nextSpace == -1) {
-          throw new IllegalArgumentException("bad timestamp format");
+            throw new IllegalArgumentException("bad syslog format (missing 
hostname)");
+        }
+        // copy the host string to avoid holding the message string in memory
+        // if using a memory-based queue
+        String hostname = new String(msg.substring(curPos, nextSpace));
+        headers.put("host", hostname);
+
+        // EventBuilder will do a copy of its own, so no defensive copy of the 
body
+        String data = "";
+        if (msgLen > nextSpace + 1 && !SyslogUtils.keepAllFields(keepFields)) {
+            curPos = nextSpace + 1;
+            data = msg.substring(curPos);
+            data = SyslogUtils.addFieldsToBody(keepFields, data, priority, 
version, tsString, hostname);
+        } else {
+            data = msg;
         }
-        tsString = msg.substring(curPos, nextSpace);
-        ts = parseRfc5424Date(tsString);
-        curPos = nextSpace + 1;
-      }
-
-    } catch (IllegalArgumentException ex) {
-      throw new IllegalArgumentException("Unable to parse message: " + msg, 
ex);
-    }
-
-    headers.put("timestamp", String.valueOf(ts));
 
-    // parse out hostname
-    int nextSpace = msg.indexOf(' ', curPos);
-    if (nextSpace == -1) {
-      throw new IllegalArgumentException(
-          "bad syslog format (missing hostname)");
-    }
-    // copy the host string to avoid holding the message string in memory
-    // if using a memory-based queue
-    String hostname = new String(msg.substring(curPos, nextSpace));
-    headers.put("host", hostname);
-
-    // EventBuilder will do a copy of its own, so no defensive copy of the body
-    String data = "";
-    if (msgLen > nextSpace + 1 && !SyslogUtils.keepAllFields(keepFields)) {
-      curPos = nextSpace + 1;
-      data = msg.substring(curPos);
-      data = SyslogUtils.addFieldsToBody(keepFields, data, priority, version, 
tsString, hostname);
-    } else {
-      data = msg;
-    }
+        Event event = EventBuilder.withBody(data, charset, headers);
 
-    Event event = EventBuilder.withBody(data, charset, headers);
-
-    return event;
-  }
-
-  /**
-   * Parse date in RFC 5424 format. Uses an LRU cache to speed up parsing for
-   * multiple messages that occur in the same second.
-   * @param msg
-   * @return Typical (for Java) milliseconds since UNIX epoch
-   */
-  protected long parseRfc5424Date(String msg) {
-    Long ts;
-    int curPos = 0;
-
-    int msgLen = msg.length();
-    Preconditions.checkArgument(msgLen > RFC5424_PREFIX_LEN,
-        "Bad format: Not a valid RFC5424 timestamp: %s", msg);
-    String timestampPrefix = msg.substring(curPos, RFC5424_PREFIX_LEN);
-
-    try {
-      ts = timestampCache.get(timestampPrefix);
-    } catch (ExecutionException | UncheckedExecutionException ex) {
-      throw new IllegalArgumentException("bad timestamp format", 
ex.getCause());
+        return event;
     }
 
-    curPos += RFC5424_PREFIX_LEN;
-
-    // look for the optional fractional seconds
-    if (msg.charAt(curPos) == '.') {
-      // figure out how many numeric digits
-      boolean foundEnd = false;
-      int endMillisPos = curPos + 1;
-
-      if (msgLen <= endMillisPos) {
-        throw new IllegalArgumentException("bad timestamp format (no TZ)");
-      }
-
-      // FIXME: TODO: ensure we handle all bad formatting cases
-      while (!foundEnd) {
-        char curDigit = msg.charAt(endMillisPos);
-        if (curDigit >= '0' && curDigit <= '9') {
-          endMillisPos++;
-        } else {
-          foundEnd = true;
-        }
-      }
-
-      // if they had a valid fractional second, append it rounded to millis
-      final int fractionalPositions = endMillisPos - (curPos + 1);
-      if (fractionalPositions > 0) {
-        long milliseconds = Long.parseLong(msg.substring(curPos + 1, 
endMillisPos));
-        if (fractionalPositions > 3) {
-          milliseconds /= Math.pow(10, (fractionalPositions - 3));
-        } else if (fractionalPositions < 3) {
-          milliseconds *= Math.pow(10, (3 - fractionalPositions));
+    /**
+     * Parse date in RFC 5424 format. Uses an LRU cache to speed up parsing for
+     * multiple messages that occur in the same second.
+     * @param msg
+     * @return Typical (for Java) milliseconds since UNIX epoch
+     */
+    protected long parseRfc5424Date(String msg) {
+        Long ts;
+        int curPos = 0;
+
+        int msgLen = msg.length();
+        Preconditions.checkArgument(msgLen > RFC5424_PREFIX_LEN, "Bad format: 
Not a valid RFC5424 timestamp: %s", msg);
+        String timestampPrefix = msg.substring(curPos, RFC5424_PREFIX_LEN);
+
+        try {
+            ts = timestampCache.get(timestampPrefix);
+        } catch (ExecutionException | UncheckedExecutionException ex) {
+            throw new IllegalArgumentException("bad timestamp format", 
ex.getCause());
         }
-        ts += milliseconds;
-      } else {
-        throw new IllegalArgumentException(
-            "Bad format: Invalid timestamp (fractional portion): " + msg);
-      }
-
-      curPos = endMillisPos;
-    }
 
-    // look for timezone
-    char tzFirst = msg.charAt(curPos);
-
-    // UTC
-    if (tzFirst == 'Z') {
-      // no-op
-    } else if (tzFirst == '+' || tzFirst == '-') {
-
-      Preconditions.checkArgument(msgLen > curPos + 5,
-          "Bad format: Invalid timezone (%s)", msg);
-
-      int polarity;
-      if (tzFirst == '+') {
-        polarity = +1;
-      } else {
-        polarity = -1;
-      }
-
-      char[] h = new char[5];
-      for (int i = 0; i < 5; i++) {
-        h[i] = msg.charAt(curPos + 1 + i);
-      }
-
-      if (h[0] >= '0' && h[0] <= '9'
-          && h[1] >= '0' && h[1] <= '9'
-          && h[2] == ':'
-          && h[3] >= '0' && h[3] <= '9'
-          && h[4] >= '0' && h[4] <= '9') {
-        int hourOffset = Integer.parseInt(msg.substring(curPos + 1, curPos + 
3));
-        int minOffset = Integer.parseInt(msg.substring(curPos + 4, curPos + 
6));
-        ts -= polarity * ((hourOffset * 60) + minOffset) * 60000;
-      } else {
-        throw new IllegalArgumentException(
-            "Bad format: Invalid timezone: " + msg);
-      }
+        curPos += RFC5424_PREFIX_LEN;
+
+        // look for the optional fractional seconds
+        if (msg.charAt(curPos) == '.') {
+            curPos++;
+            // figure out how many numeric digits
+            int endMillisPos = curPos;
+            while (endMillisPos < msgLen && msg.charAt(endMillisPos) >= '0' && 
msg.charAt(endMillisPos) <= '9') {
+                endMillisPos++;
+            }
+
+            // the fractional seconds must be followed by a timezone
+            if (endMillisPos >= msgLen) {
+                throw new IllegalArgumentException("bad timestamp format (no 
TZ)");
+            }
+
+            // if they had a valid fractional second, truncate it to 
millisecond precision and append
+            int fractionalPositions = Math.min(endMillisPos - curPos, 
MILLISECOND_FRACTION_DIGITS);
+            if (fractionalPositions > 0) {
+                String fraction = msg.substring(curPos, curPos + 
fractionalPositions);
+                ts += Integer.parseInt(StringUtils.rightPad(fraction, 
MILLISECOND_FRACTION_DIGITS, '0'));
+            } else {
+                throw new IllegalArgumentException("Bad format: Invalid 
timestamp (fractional portion): " + msg);
+            }
+
+            curPos = endMillisPos;
+        }
 
-    }
+        // look for timezone
+        char tzFirst = msg.charAt(curPos);
+
+        // UTC
+        if (tzFirst == 'Z') {
+            return ts;
+        // numeric timezone
+        } else if (tzFirst == '+' || tzFirst == '-') {
+
+            Preconditions.checkArgument(msgLen > curPos + 5, "Bad format: 
Invalid timezone (%s)", msg);
+
+            int polarity;
+            if (tzFirst == '+') {
+                polarity = +1;
+            } else {
+                polarity = -1;
+            }
+
+            char[] h = new char[5];
+            for (int i = 0; i < 5; i++) {
+                h[i] = msg.charAt(curPos + 1 + i);
+            }
+
+            if (h[0] >= '0'
+                    && h[0] <= '9'
+                    && h[1] >= '0'
+                    && h[1] <= '9'
+                    && h[2] == ':'
+                    && h[3] >= '0'
+                    && h[3] <= '9'
+                    && h[4] >= '0'
+                    && h[4] <= '9') {
+                int hourOffset = Integer.parseInt(msg.substring(curPos + 1, 
curPos + 3));
+                int minOffset = Integer.parseInt(msg.substring(curPos + 4, 
curPos + 6));
+                return ts - polarity * ((hourOffset * 60L) + minOffset) * 
60000;
+            }
+        }
 
-    return ts;
-  }
-
-  /**
-   * Parse the RFC3164 date format. This is trickier than it sounds because 
this
-   * format does not specify a year so we get weird edge cases at year
-   * boundaries. This implementation tries to "do what I mean".
-   * @param ts RFC3164-compatible timestamp to be parsed
-   * @return Typical (for Java) milliseconds since the UNIX epoch
-   */
-  protected long parseRfc3164Time(String ts) {
-    LocalDateTime now = LocalDateTime.now();
-    int year = now.getYear();
-
-    ts = TWO_SPACES.matcher(ts).replaceFirst(" ");
-
-    LocalDateTime date;
-    try {
-      date = LocalDateTime.parse(ts, RFC3164_FORMAT);
-    } catch (DateTimeParseException e) {
-      logger.debug("rfc3164 date parse failed on (" + ts + "): invalid 
format", e);
-      return 0;
+        throw new IllegalArgumentException("Bad format: Invalid timezone: " + 
msg);
     }
 
-    // rfc3164 dates are really dumb.
-    /*
-     * Some code to try and add some smarts to the year insertion as without a 
year in the message
-     * we need to make some educated guessing.
-     * First set the "fixed" to be the timestamp with the current year.
-     * If the "fixed" time is more than one month in the future then roll it 
back a year.
-     * If the "fixed" time is more than eleven months in the past then roll it 
forward a year.
-     * This gives us a 12 month rolling window (11 months in the past, 1 month 
in the future) of
-     * timestamps.
+    /**
+     * Parse the RFC3164 date format. This is trickier than it sounds because 
this
+     * format does not specify a year so we get weird edge cases at year
+     * boundaries. This implementation tries to "do what I mean".
+     * @param ts RFC3164-compatible timestamp to be parsed
+     * @return Typical (for Java) milliseconds since the UNIX epoch
      */
+    protected long parseRfc3164Time(String ts) {
+        LocalDateTime now = LocalDateTime.now();
+        int year = now.getYear();
+
+        ts = TWO_SPACES.matcher(ts).replaceFirst(" ");
+
+        LocalDateTime date;
+        try {
+            date = LocalDateTime.parse(ts, RFC3164_FORMAT);
+        } catch (DateTimeParseException e) {
+            logger.debug("rfc3164 date parse failed on (" + ts + "): invalid 
format", e);
+            return 0;
+        }
 
-    LocalDateTime fixed = date.withYear(year);
+        // rfc3164 dates are really dumb.
+        /*
+         * Some code to try and add some smarts to the year insertion as 
without a year in the message
+         * we need to make some educated guessing.
+         * First set the "fixed" to be the timestamp with the current year.
+         * If the "fixed" time is more than one month in the future then roll 
it back a year.
+         * If the "fixed" time is more than eleven months in the past then 
roll it forward a year.
+         * This gives us a 12 month rolling window (11 months in the past, 1 
month in the future) of
+         * timestamps.
+         */
+
+        LocalDateTime fixed = date.withYear(year);
+
+        // flume clock is ahead or there is some latency, and the year rolled
+        if (fixed.isAfter(now) && fixed.minusMonths(1).isAfter(now)) {
+            fixed = date.minusYears(1);
+            // flume clock is behind and the year rolled
+        } else if (fixed.isBefore(now) && fixed.plusMonths(1).isBefore(now)) {
+            fixed = date.plusYears(1);
+        }
+        date = fixed;
 
-    // flume clock is ahead or there is some latency, and the year rolled
-    if (fixed.isAfter(now) && fixed.minusMonths(1).isAfter(now)) {
-      fixed = date.minusYears(1);
-    // flume clock is behind and the year rolled
-    } else if (fixed.isBefore(now) && fixed.plusMonths(1).isBefore(now)) {
-      fixed = date.plusYears(1);
+        return date.toInstant(ZoneOffset.UTC).toEpochMilli();
     }
-    date = fixed;
-
-    return date.toInstant(ZoneOffset.UTC).toEpochMilli();
-  }
-
-
 }
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java 
b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java
index 81f078490..07200383a 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java
@@ -1,30 +1,23 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flume.source;
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
-import org.apache.flume.Event;
-
-import org.junit.Assert;
-import org.junit.Test;
-
 import java.nio.charset.Charset;
 import java.time.Instant;
 import java.time.LocalDate;
@@ -33,6 +26,9 @@ import java.time.ZoneOffset;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import org.apache.flume.Event;
+import org.junit.Assert;
+import org.junit.Test;
 
 public class TestSyslogParser {
     private static class Entry {
@@ -45,162 +41,187 @@ public class TestSyslogParser {
             this.rfc5424 = rfc5424;
             this.rfc3164 = rfc3164;
             this.millis = millis;
-            this.localDateTime = LocalDateTime.ofInstant(
-                    Instant.ofEpochMilli(millis), ZoneOffset.UTC);
+            this.localDateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC);
         }
 
         public String getRfc5424() {
             return rfc5424;
         }
+
         public String getRfc3164() {
             return rfc3164;
         }
+
         public LocalDateTime getLocalDateTime() {
             return localDateTime;
         }
+
         public long getMillis() {
             return millis;
         }
     }
 
     private static final Entry[] VALID_TIMESTAMPS = {
-            new Entry("1985-04-12T23:20:50.52Z", "Apr 12 23:20:50", 482196050520L),
-            new Entry("1985-04-12T19:20:50.52-04:00", "Apr 12 23:20:50", 482196050520L),
-            new Entry("2003-10-11T22:14:15.003Z", "Oct 11 22:14:15", 1065910455003L),
-            new Entry("2003-08-24T05:14:15.000003-07:00", "Aug 24 12:14:15", 1061727255000L),
-            new Entry("2012-04-13T08:08:08.0001+00:00", "Apr 13 08:08:08", 1334304488000L),
-            new Entry("2012-04-13T08:08:08.251+00:00", "Apr 13 08:08:08", 1334304488251L),
-            // The same instant with 0, 3, 6 and 9 fractional-second digits:
-            // anything finer than a millisecond is truncated.
-            new Entry("2003-10-11T22:14:15Z", "Oct 11 22:14:15", 1065910455000L),
-            new Entry("2003-10-11T22:14:15.123Z", "Oct 11 22:14:15", 1065910455123L),
-            new Entry("2003-10-11T22:14:15.123456Z", "Oct 11 22:14:15", 1065910455123L),
-            new Entry("2003-10-11T22:14:15.123456789Z", "Oct 11 22:14:15", 1065910455123L)
+        new Entry("1985-04-12T23:20:50.52Z", "Apr 12 23:20:50", 482196050520L),
+        new Entry("1985-04-12T19:20:50.52-04:00", "Apr 12 23:20:50", 482196050520L),
+        new Entry("2003-10-11T22:14:15.003Z", "Oct 11 22:14:15", 1065910455003L),
+        new Entry("2003-08-24T05:14:15.000003-07:00", "Aug 24 12:14:15", 1061727255000L),
+        new Entry("2012-04-13T08:08:08.0001+00:00", "Apr 13 08:08:08", 1334304488000L),
+        new Entry("2012-04-13T08:08:08.251+00:00", "Apr 13 08:08:08", 1334304488251L),
+        new Entry("2012-04-13T09:08:08.251+01:00", "Apr 13 08:08:08", 1334304488251L),
+        new Entry("2012-04-13T07:08:08.251-01:00", "Apr 13 08:08:08", 1334304488251L),
+        // The same instant with 0, 3, 6 and 9 fractional-second digits:
+        // anything finer than a millisecond is truncated.
+        new Entry("2003-10-11T22:14:15Z", "Oct 11 22:14:15", 1065910455000L),
+        new Entry("2003-10-11T22:14:15.123Z", "Oct 11 22:14:15", 1065910455123L),
+        new Entry("2003-10-11T22:14:15.123456Z", "Oct 11 22:14:15", 1065910455123L),
+        new Entry("2003-10-11T22:14:15.123456789Z", "Oct 11 22:14:15", 1065910455123L),
+        // Fractional digits that would overflow a `long`
+        new Entry("2003-10-11T22:14:15.1234567890123456789012345678901234567890Z", "Oct 11 22:14:15", 1065910455123L)
     };
 
     private static final String[] INVALID_RFC3164_TIMESTAMPS = {
-            "",                 // empty
-            "not a date",       // not a timestamp at all
-            "Foo 12 23:20:50",  // invalid month name
-            "Apr 12 25:61:61",  // hour/minute/second out of range
+        "", // empty
+        "not a date", // not a timestamp at all
+        "Foo 12 23:20:50", // invalid month name
+        "Apr 12 25:61:61", // hour/minute/second out of range
     };
 
     private static final String[] INVALID_RFC5424_TIMESTAMPS = {
-            "",                       // empty
-            "2003-10-11T22:14",       // too short to hold a full timestamp
-            "2003-10-11T22:14:15.",   // fractional-second marker but no digits or TZ
-            "2003-10-11T22:14:15+0",  // truncated timezone offset
-            "xxxx-10-11T22:14:15Z",   // prefix is not a valid date-time
+        "", // empty
+        "2003-10-11T22:14", // too short to hold a full timestamp
+        "2003-10-11T22:14:15.", // fractional-second marker but no digits nor TZ
+        "2003-10-11T22:14:15.123456789", // fractional-second marker but no TZ
+        "2003-10-11T22:14:15.Z", // fractional-second marker but no digits
+        "2003-10-11T22:14:15+0", // truncated timezone offset
+        "xxxx-10-11T22:14:15Z", // prefix is not a valid date-time
+        "2003-10-11T22:14:15+/2:34", // timezone with invalid digits
+        "2003-10-11T22:14:15+@2:34",
+        "2003-10-11T22:14:15+1/:34",
+        "2003-10-11T22:14:15+1@:34",
+        "2003-10-11T22:14:15+12/34",
+        "2003-10-11T22:14:15+12@34",
+        "2003-10-11T22:14:15+12:/4",
+        "2003-10-11T22:14:15+12:@4",
+        "2003-10-11T22:14:15+12:3/",
+        "2003-10-11T22:14:15+12:3@",
+        "2003-10-11T22:14:15A", // invalid timezone letter
     };
 
-  @Test
-  public void testParseRfc3164Time() {
-    SyslogParser parser = new SyslogParser();
-    LocalDate now = LocalDate.now();
-
-    for (Entry entry : VALID_TIMESTAMPS) {
-      long time = parser.parseRfc3164Time(entry.getRfc3164());
-
-      // RFC 3164 timestamps have no year, therefore we only assert the equality
-      LocalDateTime expected = entry.getLocalDateTime();
-      LocalDateTime actual = LocalDateTime.ofInstant(
-          Instant.ofEpochMilli(time), ZoneOffset.UTC);
-
-      String msg = entry.getRfc3164() + " was not parsed correctly (got " + actual + ")";
-      Assert.assertEquals(msg, expected.getMonth(), actual.getMonth());
-      Assert.assertEquals(msg, expected.getDayOfMonth(), actual.getDayOfMonth());
-      Assert.assertEquals(msg, expected.getHour(), actual.getHour());
-      Assert.assertEquals(msg, expected.getMinute(), actual.getMinute());
-      Assert.assertEquals(msg, expected.getSecond(), actual.getSecond());
-      Assert.assertTrue(msg, Math.abs(now.getYear() - actual.getYear()) <= 1);
+    @Test
+    public void testParseRfc3164Time() {
+        SyslogParser parser = new SyslogParser();
+        LocalDate now = LocalDate.now();
+
+        for (Entry entry : VALID_TIMESTAMPS) {
+            long time = parser.parseRfc3164Time(entry.getRfc3164());
+
+            // RFC 3164 timestamps have no year, therefore we only assert the equality
+            LocalDateTime expected = entry.getLocalDateTime();
+            LocalDateTime actual = LocalDateTime.ofInstant(Instant.ofEpochMilli(time), ZoneOffset.UTC);
+
+            String msg = entry.getRfc3164() + " was not parsed correctly (got " + actual + ")";
+            Assert.assertEquals(msg, expected.getMonth(), actual.getMonth());
+            Assert.assertEquals(msg, expected.getDayOfMonth(), actual.getDayOfMonth());
+            Assert.assertEquals(msg, expected.getHour(), actual.getHour());
+            Assert.assertEquals(msg, expected.getMinute(), actual.getMinute());
+            Assert.assertEquals(msg, expected.getSecond(), actual.getSecond());
+            Assert.assertTrue(msg, Math.abs(now.getYear() - actual.getYear()) <= 1);
+        }
     }
-  }
 
-  @Test
-  public void testParseRfc3164TimeInvalid() {
-    SyslogParser parser = new SyslogParser();
+    @Test
+    public void testParseRfc3164TimeInvalid() {
+        SyslogParser parser = new SyslogParser();
 
-    // parseRfc3164Time() signals a parse failure by returning 0 rather than throwing.
-    for (String input : INVALID_RFC3164_TIMESTAMPS) {
-      Assert.assertEquals("Expected 0 for unparseable RFC 3164 timestamp '" + input + "'",
-          0L, parser.parseRfc3164Time(input));
+        // parseRfc3164Time() signals a parse failure by returning 0 rather than throwing.
+        for (String input : INVALID_RFC3164_TIMESTAMPS) {
+            Assert.assertEquals(
+                    "Expected 0 for unparseable RFC 3164 timestamp '" + input + "'",
+                    0L,
+                    parser.parseRfc3164Time(input));
+        }
     }
-  }
 
-  @Test
-  public void testParseRfc5424Time() {
-    SyslogParser parser = new SyslogParser();
+    @Test
+    public void testParseRfc5424Time() {
+        SyslogParser parser = new SyslogParser();
 
-    for (Entry entry : VALID_TIMESTAMPS) {
-        long time = parser.parseRfc5424Date(entry.getRfc5424());
-        Assert.assertEquals(entry.millis + " is not the same as timestamp " + time, entry.getMillis(), time);
+        for (Entry entry : VALID_TIMESTAMPS) {
+            long time = parser.parseRfc5424Date(entry.getRfc5424());
+            Assert.assertEquals(entry.millis + " is not the same as timestamp " + time, entry.getMillis(), time);
+        }
     }
-  }
 
-  @Test
-  public void testParseRfc5424TimeInvalid() {
-    SyslogParser parser = new SyslogParser();
+    @Test
+    public void testParseRfc5424TimeInvalid() {
+        SyslogParser parser = new SyslogParser();
 
-    for (String input : INVALID_RFC5424_TIMESTAMPS) {
-      Assert.assertThrows("Expected parsing to fail for RFC 5424 timestamp '" + input + "'",
-          IllegalArgumentException.class, () -> parser.parseRfc5424Date(input));
-    }
-  }
-
-  @Test
-  public void testMessageParsing() {
-    SyslogParser parser = new SyslogParser();
-    Charset charset = Charsets.UTF_8;
-    List<String> messages = Lists.newArrayList();
-
-    // supported examples from RFC 3164
-    messages.add("<34>Oct 11 22:14:15 mymachine su: 'su root' failed for " +
-        "lonvick on /dev/pts/8");
-    messages.add("<13>Feb  5 17:32:18 10.0.0.99 Use the BFG!");
-    messages.add("<165>Aug 24 05:34:00 CST 1987 mymachine myproc[10]: %% " +
-        "It's time to make the do-nuts.  %%  Ingredients: Mix=OK, Jelly=OK # " +
-         "Devices: Mixer=OK, Jelly_Injector=OK, Frier=OK # Transport: " +
-         "Conveyer1=OK, Conveyer2=OK # %%");
-    messages.add("<0>Oct 22 10:52:12 scapegoat 1990 Oct 22 10:52:01 TZ-6 " +
-         "scapegoat.dmz.example.org 10.1.2.3 sched[0]: That's All Folks!");
-
-    // supported examples from RFC 5424
-    messages.add("<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - " +
-        "ID47 - BOM'su root' failed for lonvick on /dev/pts/8");
-    messages.add("<165>1 2003-08-24T05:14:15.000003-07:00 192.0.2.1 myproc " +
-        "8710 - - %% It's time to make the do-nuts.");
-
-    // non-standard (but common) messages (RFC3339 dates, no version digit)
-    messages.add("<13>2003-08-24T05:14:15Z localhost snarf?");
-    messages.add("<13>2012-08-16T14:34:03-08:00 127.0.0.1 test shnap!");
-
-    // test with default keepFields = false
-    for (String msg : messages) {
-      Set<String> keepFields = new HashSet<String>();
-      Event event = parser.parseMessage(msg, charset, keepFields);
-      Assert.assertNull("Failure to parse known-good syslog message",
-                        event.getHeaders().get(SyslogUtils.EVENT_STATUS));
+        for (String input : INVALID_RFC5424_TIMESTAMPS) {
+            Assert.assertThrows(
+                    "Expected parsing to fail for RFC 5424 timestamp '" + input + "'",
+                    IllegalArgumentException.class,
+                    () -> parser.parseRfc5424Date(input));
+        }
     }
 
-    // test that priority, timestamp and hostname are preserved in event body
-    for (String msg : messages) {
-      Set<String> keepFields = new HashSet<String>();
-      keepFields.add(SyslogUtils.KEEP_FIELDS_ALL);
-      Event event = parser.parseMessage(msg, charset, keepFields);
-      Assert.assertArrayEquals(event.getBody(), msg.getBytes());
-      Assert.assertNull("Failure to parse known-good syslog message",
-          event.getHeaders().get(SyslogUtils.EVENT_STATUS));
-    }
+    @Test
+    public void testMessageParsing() {
+        SyslogParser parser = new SyslogParser();
+        Charset charset = Charsets.UTF_8;
+        List<String> messages = Lists.newArrayList();
+
+        // supported examples from RFC 3164
+        messages.add("<34>Oct 11 22:14:15 mymachine su: 'su root' failed for " + "lonvick on /dev/pts/8");
+        messages.add("<13>Feb  5 17:32:18 10.0.0.99 Use the BFG!");
+        messages.add("<165>Aug 24 05:34:00 CST 1987 mymachine myproc[10]: %% "
+                + "It's time to make the do-nuts.  %%  Ingredients: Mix=OK, Jelly=OK # "
+                + "Devices: Mixer=OK, Jelly_Injector=OK, Frier=OK # Transport: "
+                + "Conveyer1=OK, Conveyer2=OK # %%");
+        messages.add("<0>Oct 22 10:52:12 scapegoat 1990 Oct 22 10:52:01 TZ-6 "
+                + "scapegoat.dmz.example.org 10.1.2.3 sched[0]: That's All Folks!");
+
+        // supported examples from RFC 5424
+        messages.add("<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - "
+                + "ID47 - BOM'su root' failed for lonvick on /dev/pts/8");
+        messages.add("<165>1 2003-08-24T05:14:15.000003-07:00 192.0.2.1 myproc "
+                + "8710 - - %% It's time to make the do-nuts.");
+
+        // non-standard (but common) messages (RFC3339 dates, no version digit)
+        messages.add("<13>2003-08-24T05:14:15Z localhost snarf?");
+        messages.add("<13>2012-08-16T14:34:03-08:00 127.0.0.1 test shnap!");
+
+        // test with default keepFields = false
+        for (String msg : messages) {
+            Set<String> keepFields = new HashSet<String>();
+            Event event = parser.parseMessage(msg, charset, keepFields);
+            Assert.assertNull(
+                    "Failure to parse known-good syslog message",
+                    event.getHeaders().get(SyslogUtils.EVENT_STATUS));
+        }
 
-    // test that hostname is preserved in event body
-    for (String msg : messages) {
-      Set<String> keepFields = new HashSet<String>();
-      keepFields.add(SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS_HOSTNAME);
-      Event event = parser.parseMessage(msg, charset, keepFields);
-      Assert.assertTrue("Failure to persist hostname",
-          new String(event.getBody()).contains(event.getHeaders().get("host")));
-      Assert.assertNull("Failure to parse known-good syslog message",
-          event.getHeaders().get(SyslogUtils.EVENT_STATUS));
+        // test that priority, timestamp and hostname are preserved in event body
+        for (String msg : messages) {
+            Set<String> keepFields = new HashSet<String>();
+            keepFields.add(SyslogUtils.KEEP_FIELDS_ALL);
+            Event event = parser.parseMessage(msg, charset, keepFields);
+            Assert.assertArrayEquals(event.getBody(), msg.getBytes());
+            Assert.assertNull(
+                    "Failure to parse known-good syslog message",
+                    event.getHeaders().get(SyslogUtils.EVENT_STATUS));
+        }
+
+        // test that hostname is preserved in event body
+        for (String msg : messages) {
+            Set<String> keepFields = new HashSet<String>();
+            keepFields.add(SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS_HOSTNAME);
+            Event event = parser.parseMessage(msg, charset, keepFields);
+            Assert.assertTrue(
+                    "Failure to persist hostname",
+                    new String(event.getBody()).contains(event.getHeaders().get("host")));
+            Assert.assertNull(
+                    "Failure to parse known-good syslog message",
+                    event.getHeaders().get(SyslogUtils.EVENT_STATUS));
+        }
     }
-  }
-}
\ No newline at end of file
+}


Reply via email to