This is an automated email from the ASF dual-hosted git repository. ppkarwasz pushed a commit to branch fix/syslog-parser in repository https://gitbox.apache.org/repos/asf/logging-flume.git
commit 75dba7c4b4808e40d61ac869f38ff07c38dae521 Author: Piotr P. Karwasz <[email protected]> AuthorDate: Fri Jun 5 18:20:46 2026 +0200 Format `SyslogParser` --- .../java/org/apache/flume/source/SyslogParser.java | 551 ++++++++++----------- 1 file changed, 269 insertions(+), 282 deletions(-) diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java index 6019573c9..e0b261a2c 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java @@ -1,13 +1,12 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,18 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -/** - * To change this template, choose Tools | Templates - * and open the template in the editor. - */ - package org.apache.flume.source; import com.google.common.base.Preconditions; -import com.google.common.cache.LoadingCache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.UncheckedExecutionException; import java.nio.charset.Charset; import java.time.LocalDateTime; import java.time.Year; @@ -41,8 +36,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.regex.Pattern; - -import com.google.common.util.concurrent.UncheckedExecutionException; import org.apache.flume.Event; import org.apache.flume.annotations.InterfaceAudience; import org.apache.flume.annotations.InterfaceStability; @@ -54,303 +47,297 @@ import org.slf4j.LoggerFactory; @InterfaceStability.Evolving public class SyslogParser { - private static final Logger logger = - LoggerFactory.getLogger(SyslogParser.class); - - private static final int TS_CACHE_MAX = 1000; // timestamp cache size limit - private static final Pattern TWO_SPACES = Pattern.compile(" "); - private static final DateTimeFormatter RFC3164_FORMAT = - new DateTimeFormatterBuilder() - .appendPattern("MMM d HH:mm:ss") - .parseDefaulting(ChronoField.YEAR, Year.now().getValue()) // Adds current year - .toFormatter(Locale.ENGLISH); // Always use English month names - private static final DateTimeFormatter RFC5424_FORMAT = - new DateTimeFormatterBuilder() - .append(DateTimeFormatter.ISO_LOCAL_DATE_TIME) // Parses YYYY-MM-DDTHH:mm:ss.SSSSSS - .optionalStart() // Start of optional timezone section - .appendOffset("+HH:MM", "Z") // Handles +00:00, -05:00, or Z - .optionalEnd() // End of optional timezone section - .toFormatter(); - - private static final int RFC3164_LEN = 15; - private static final int RFC5424_PREFIX_LEN = 19; - - private LoadingCache<String, Long> timestampCache; - - public SyslogParser() { - timestampCache = CacheBuilder.newBuilder().maximumSize(TS_CACHE_MAX).build( - new CacheLoader<String, Long>() { - - @Override - public Long load(String key) { - return ZonedDateTime.parse(key, RFC5424_FORMAT.withZone(ZoneOffset.UTC)).toInstant().toEpochMilli(); - } + private static final Logger logger = LoggerFactory.getLogger(SyslogParser.class); + + private static final int TS_CACHE_MAX = 1000; // timestamp cache size limit + private static final Pattern TWO_SPACES = Pattern.compile(" "); + private static final DateTimeFormatter RFC3164_FORMAT = new DateTimeFormatterBuilder() + .appendPattern("MMM d HH:mm:ss") + .parseDefaulting(ChronoField.YEAR, Year.now().getValue()) // Adds current year + .toFormatter(Locale.ENGLISH); // Always use English month names + private static final DateTimeFormatter RFC5424_FORMAT = new DateTimeFormatterBuilder() + .append(DateTimeFormatter.ISO_LOCAL_DATE_TIME) // Parses YYYY-MM-DDTHH:mm:ss.SSSSSS + .optionalStart() // Start of optional timezone section + .appendOffset("+HH:MM", "Z") // Handles +00:00, -05:00, or Z + .optionalEnd() // End of optional timezone section + .toFormatter(); + + private static final int RFC3164_LEN = 15; + private static final int RFC5424_PREFIX_LEN = 19; + + private LoadingCache<String, Long> timestampCache; + + public SyslogParser() { + timestampCache = CacheBuilder.newBuilder().maximumSize(TS_CACHE_MAX).build(new CacheLoader<String, Long>() { + + @Override + public Long load(String key) { + return ZonedDateTime.parse(key, RFC5424_FORMAT.withZone(ZoneOffset.UTC)) + .toInstant() + .toEpochMilli(); + } }); - } + } /** - * Parses a Flume Event out of a syslog message string. - * @param msg Syslog message, not including the newline character - * @return Parsed Flume Event - * @throws IllegalArgumentException if unable to successfully parse message - */ - public Event parseMessage(String msg, Charset charset, Set<String> keepFields) { - Map<String, String> headers = Maps.newHashMap(); - - int msgLen = msg.length(); - - int curPos = 0; - - Preconditions.checkArgument(msg.charAt(curPos) == '<', - "Bad format: invalid priority: cannot find open bracket '<' (%s)", msg); + * Parses a Flume Event out of a syslog message string. + * @param msg Syslog message, not including the newline character + * @return Parsed Flume Event + * @throws IllegalArgumentException if unable to successfully parse message + */ + public Event parseMessage(String msg, Charset charset, Set<String> keepFields) { + Map<String, String> headers = Maps.newHashMap(); - int endBracketPos = msg.indexOf('>'); - Preconditions.checkArgument(endBracketPos > 0 && endBracketPos <= 6, - "Bad format: invalid priority: cannot find end bracket '>' (%s)", msg); + int msgLen = msg.length(); - String priority = msg.substring(1, endBracketPos); - int pri = Integer.parseInt(priority); - int facility = pri / 8; - int severity = pri % 8; + int curPos = 0; - // Remember priority - headers.put(SyslogUtils.SYSLOG_PRIORITY, priority); + Preconditions.checkArgument( + msg.charAt(curPos) == '<', "Bad format: invalid priority: cannot find open bracket '<' (%s)", msg); - // put fac / sev into header - headers.put(SyslogUtils.SYSLOG_FACILITY, String.valueOf(facility)); - headers.put(SyslogUtils.SYSLOG_SEVERITY, String.valueOf(severity)); + int endBracketPos = msg.indexOf('>'); + Preconditions.checkArgument( + endBracketPos > 0 && endBracketPos <= 6, + "Bad format: invalid priority: cannot find end bracket '>' (%s)", + msg); - Preconditions.checkArgument(msgLen > endBracketPos + 1, - "Bad format: no data except priority (%s)", msg); + String priority = msg.substring(1, endBracketPos); + int pri = Integer.parseInt(priority); + int facility = pri / 8; + int severity = pri % 8; - // update parsing position - curPos = endBracketPos + 1; + // Remember priority + headers.put(SyslogUtils.SYSLOG_PRIORITY, priority); - // remember version string - String version = null; - if (msgLen > curPos + 2 && "1 ".equals(msg.substring(curPos, curPos + 2))) { - version = msg.substring(curPos, curPos + 1); - headers.put(SyslogUtils.SYSLOG_VERSION, version); - curPos += 2; - } + // put fac / sev into header + headers.put(SyslogUtils.SYSLOG_FACILITY, String.valueOf(facility)); + headers.put(SyslogUtils.SYSLOG_SEVERITY, String.valueOf(severity)); - // now parse timestamp (handle different varieties) + Preconditions.checkArgument(msgLen > endBracketPos + 1, "Bad format: no data except priority (%s)", msg); - long ts; - String tsString; - char dateStartChar = msg.charAt(curPos); + // update parsing position + curPos = endBracketPos + 1; - try { - - // no timestamp specified; use relay current time - if (dateStartChar == '-') { - tsString = Character.toString(dateStartChar); - ts = System.currentTimeMillis(); - if (msgLen <= curPos + 2) { - throw new IllegalArgumentException( - "bad syslog format (missing hostname)"); + // remember version string + String version = null; + if (msgLen > curPos + 2 && "1 ".equals(msg.substring(curPos, curPos + 2))) { + version = msg.substring(curPos, curPos + 1); + headers.put(SyslogUtils.SYSLOG_VERSION, version); + curPos += 2; } - curPos += 2; // assume we skip past a space to get to the hostname - // rfc3164 timestamp - } else if (dateStartChar >= 'A' && dateStartChar <= 'Z') { - if (msgLen <= curPos + RFC3164_LEN) { - throw new IllegalArgumentException("bad timestamp format"); + // now parse timestamp (handle different varieties) + + long ts; + String tsString; + char dateStartChar = msg.charAt(curPos); + + try { + + // no timestamp specified; use relay current time + if (dateStartChar == '-') { + tsString = Character.toString(dateStartChar); + ts = System.currentTimeMillis(); + if (msgLen <= curPos + 2) { + throw new IllegalArgumentException("bad syslog format (missing hostname)"); + } + curPos += 2; // assume we skip past a space to get to the hostname + + // rfc3164 timestamp + } else if (dateStartChar >= 'A' && dateStartChar <= 'Z') { + if (msgLen <= curPos + RFC3164_LEN) { + throw new IllegalArgumentException("bad timestamp format"); + } + tsString = msg.substring(curPos, curPos + RFC3164_LEN); + ts = parseRfc3164Time(tsString); + curPos += RFC3164_LEN + 1; + + // rfc 5424 timestamp + } else { + int nextSpace = msg.indexOf(' ', curPos); + if (nextSpace == -1) { + throw new IllegalArgumentException("bad timestamp format"); + } + tsString = msg.substring(curPos, nextSpace); + ts = parseRfc5424Date(tsString); + curPos = nextSpace + 1; + } + + } catch (IllegalArgumentException ex) { + throw new IllegalArgumentException("Unable to parse message: " + msg, ex); } - tsString = msg.substring(curPos, curPos + RFC3164_LEN); - ts = parseRfc3164Time(tsString); - curPos += RFC3164_LEN + 1; - // rfc 5424 timestamp - } else { + headers.put("timestamp", String.valueOf(ts)); + + // parse out hostname int nextSpace = msg.indexOf(' ', curPos); if (nextSpace == -1) { - throw new IllegalArgumentException("bad timestamp format"); + throw new IllegalArgumentException("bad syslog format (missing hostname)"); + } + // copy the host string to avoid holding the message string in memory + // if using a memory-based queue + String hostname = new String(msg.substring(curPos, nextSpace)); + headers.put("host", hostname); + + // EventBuilder will do a copy of its own, so no defensive copy of the body + String data = ""; + if (msgLen > nextSpace + 1 && !SyslogUtils.keepAllFields(keepFields)) { + curPos = nextSpace + 1; + data = msg.substring(curPos); + data = SyslogUtils.addFieldsToBody(keepFields, data, priority, version, tsString, hostname); + } else { + data = msg; } - tsString = msg.substring(curPos, nextSpace); - ts = parseRfc5424Date(tsString); - curPos = nextSpace + 1; - } - - } catch (IllegalArgumentException ex) { - throw new IllegalArgumentException("Unable to parse message: " + msg, ex); - } - - headers.put("timestamp", String.valueOf(ts)); - // parse out hostname - int nextSpace = msg.indexOf(' ', curPos); - if (nextSpace == -1) { - throw new IllegalArgumentException( - "bad syslog format (missing hostname)"); - } - // copy the host string to avoid holding the message string in memory - // if using a memory-based queue - String hostname = new String(msg.substring(curPos, nextSpace)); - headers.put("host", hostname); - - // EventBuilder will do a copy of its own, so no defensive copy of the body - String data = ""; - if (msgLen > nextSpace + 1 && !SyslogUtils.keepAllFields(keepFields)) { - curPos = nextSpace + 1; - data = msg.substring(curPos); - data = SyslogUtils.addFieldsToBody(keepFields, data, priority, version, tsString, hostname); - } else { - data = msg; - } + Event event = EventBuilder.withBody(data, charset, headers); - Event event = EventBuilder.withBody(data, charset, headers); - - return event; - } - - /** - * Parse date in RFC 5424 format. Uses an LRU cache to speed up parsing for - * multiple messages that occur in the same second. - * @param msg - * @return Typical (for Java) milliseconds since UNIX epoch - */ - protected long parseRfc5424Date(String msg) { - Long ts; - int curPos = 0; - - int msgLen = msg.length(); - Preconditions.checkArgument(msgLen > RFC5424_PREFIX_LEN, - "Bad format: Not a valid RFC5424 timestamp: %s", msg); - String timestampPrefix = msg.substring(curPos, RFC5424_PREFIX_LEN); - - try { - ts = timestampCache.get(timestampPrefix); - } catch (ExecutionException | UncheckedExecutionException ex) { - throw new IllegalArgumentException("bad timestamp format", ex.getCause()); + return event; } - curPos += RFC5424_PREFIX_LEN; - - // look for the optional fractional seconds - if (msg.charAt(curPos) == '.') { - // figure out how many numeric digits - boolean foundEnd = false; - int endMillisPos = curPos + 1; - - if (msgLen <= endMillisPos) { - throw new IllegalArgumentException("bad timestamp format (no TZ)"); - } - - // FIXME: TODO: ensure we handle all bad formatting cases - while (!foundEnd) { - char curDigit = msg.charAt(endMillisPos); - if (curDigit >= '0' && curDigit <= '9') { - endMillisPos++; - } else { - foundEnd = true; - } - } - - // if they had a valid fractional second, append it rounded to millis - final int fractionalPositions = endMillisPos - (curPos + 1); - if (fractionalPositions > 0) { - long milliseconds = Long.parseLong(msg.substring(curPos + 1, endMillisPos)); - if (fractionalPositions > 3) { - milliseconds /= Math.pow(10, (fractionalPositions - 3)); - } else if (fractionalPositions < 3) { - milliseconds *= Math.pow(10, (3 - fractionalPositions)); + /** + * Parse date in RFC 5424 format. Uses an LRU cache to speed up parsing for + * multiple messages that occur in the same second. + * @param msg + * @return Typical (for Java) milliseconds since UNIX epoch + */ + protected long parseRfc5424Date(String msg) { + Long ts; + int curPos = 0; + + int msgLen = msg.length(); + Preconditions.checkArgument(msgLen > RFC5424_PREFIX_LEN, "Bad format: Not a valid RFC5424 timestamp: %s", msg); + String timestampPrefix = msg.substring(curPos, RFC5424_PREFIX_LEN); + + try { + ts = timestampCache.get(timestampPrefix); + } catch (ExecutionException | UncheckedExecutionException ex) { + throw new IllegalArgumentException("bad timestamp format", ex.getCause()); } - ts += milliseconds; - } else { - throw new IllegalArgumentException( - "Bad format: Invalid timestamp (fractional portion): " + msg); - } - - curPos = endMillisPos; - } - // look for timezone - char tzFirst = msg.charAt(curPos); - - // UTC - if (tzFirst == 'Z') { - // no-op - } else if (tzFirst == '+' || tzFirst == '-') { - - Preconditions.checkArgument(msgLen > curPos + 5, - "Bad format: Invalid timezone (%s)", msg); - - int polarity; - if (tzFirst == '+') { - polarity = +1; - } else { - polarity = -1; - } - - char[] h = new char[5]; - for (int i = 0; i < 5; i++) { - h[i] = msg.charAt(curPos + 1 + i); - } - - if (h[0] >= '0' && h[0] <= '9' - && h[1] >= '0' && h[1] <= '9' - && h[2] == ':' - && h[3] >= '0' && h[3] <= '9' - && h[4] >= '0' && h[4] <= '9') { - int hourOffset = Integer.parseInt(msg.substring(curPos + 1, curPos + 3)); - int minOffset = Integer.parseInt(msg.substring(curPos + 4, curPos + 6)); - ts -= polarity * ((hourOffset * 60) + minOffset) * 60000; - } else { - throw new IllegalArgumentException( - "Bad format: Invalid timezone: " + msg); - } + curPos += RFC5424_PREFIX_LEN; + + // look for the optional fractional seconds + if (msg.charAt(curPos) == '.') { + // figure out how many numeric digits + boolean foundEnd = false; + int endMillisPos = curPos + 1; + + if (msgLen <= endMillisPos) { + throw new IllegalArgumentException("bad timestamp format (no TZ)"); + } + + // FIXME: TODO: ensure we handle all bad formatting cases + while (!foundEnd) { + char curDigit = msg.charAt(endMillisPos); + if (curDigit >= '0' && curDigit <= '9') { + endMillisPos++; + } else { + foundEnd = true; + } + } + + // if they had a valid fractional second, append it rounded to millis + final int fractionalPositions = endMillisPos - (curPos + 1); + if (fractionalPositions > 0) { + long milliseconds = Long.parseLong(msg.substring(curPos + 1, endMillisPos)); + if (fractionalPositions > 3) { + milliseconds /= Math.pow(10, (fractionalPositions - 3)); + } else if (fractionalPositions < 3) { + milliseconds *= Math.pow(10, (3 - fractionalPositions)); + } + ts += milliseconds; + } else { + throw new IllegalArgumentException("Bad format: Invalid timestamp (fractional portion): " + msg); + } + + curPos = endMillisPos; + } - } + // look for timezone + char tzFirst = msg.charAt(curPos); + + // UTC + if (tzFirst == 'Z') { + // no-op + } else if (tzFirst == '+' || tzFirst == '-') { + + Preconditions.checkArgument(msgLen > curPos + 5, "Bad format: Invalid timezone (%s)", msg); + + int polarity; + if (tzFirst == '+') { + polarity = +1; + } else { + polarity = -1; + } + + char[] h = new char[5]; + for (int i = 0; i < 5; i++) { + h[i] = msg.charAt(curPos + 1 + i); + } + + if (h[0] >= '0' + && h[0] <= '9' + && h[1] >= '0' + && h[1] <= '9' + && h[2] == ':' + && h[3] >= '0' + && h[3] <= '9' + && h[4] >= '0' + && h[4] <= '9') { + int hourOffset = Integer.parseInt(msg.substring(curPos + 1, curPos + 3)); + int minOffset = Integer.parseInt(msg.substring(curPos + 4, curPos + 6)); + ts -= polarity * ((hourOffset * 60) + minOffset) * 60000; + } else { + throw new IllegalArgumentException("Bad format: Invalid timezone: " + msg); + } + } - return ts; - } - - /** - * Parse the RFC3164 date format. This is trickier than it sounds because this - * format does not specify a year so we get weird edge cases at year - * boundaries. This implementation tries to "do what I mean". - * @param ts RFC3164-compatible timestamp to be parsed - * @return Typical (for Java) milliseconds since the UNIX epoch - */ - protected long parseRfc3164Time(String ts) { - LocalDateTime now = LocalDateTime.now(); - int year = now.getYear(); - - ts = TWO_SPACES.matcher(ts).replaceFirst(" "); - - LocalDateTime date; - try { - date = LocalDateTime.parse(ts, RFC3164_FORMAT); - } catch (DateTimeParseException e) { - logger.debug("rfc3164 date parse failed on (" + ts + "): invalid format", e); - return 0; + return ts; } - // rfc3164 dates are really dumb. - /* - * Some code to try and add some smarts to the year insertion as without a year in the message - * we need to make some educated guessing. - * First set the "fixed" to be the timestamp with the current year. - * If the "fixed" time is more than one month in the future then roll it back a year. - * If the "fixed" time is more than eleven months in the past then roll it forward a year. - * This gives us a 12 month rolling window (11 months in the past, 1 month in the future) of - * timestamps. + /** + * Parse the RFC3164 date format. This is trickier than it sounds because this + * format does not specify a year so we get weird edge cases at year + * boundaries. This implementation tries to "do what I mean". + * @param ts RFC3164-compatible timestamp to be parsed + * @return Typical (for Java) milliseconds since the UNIX epoch */ + protected long parseRfc3164Time(String ts) { + LocalDateTime now = LocalDateTime.now(); + int year = now.getYear(); + + ts = TWO_SPACES.matcher(ts).replaceFirst(" "); + + LocalDateTime date; + try { + date = LocalDateTime.parse(ts, RFC3164_FORMAT); + } catch (DateTimeParseException e) { + logger.debug("rfc3164 date parse failed on (" + ts + "): invalid format", e); + return 0; + } - LocalDateTime fixed = date.withYear(year); + // rfc3164 dates are really dumb. + /* + * Some code to try and add some smarts to the year insertion as without a year in the message + * we need to make some educated guessing. + * First set the "fixed" to be the timestamp with the current year. + * If the "fixed" time is more than one month in the future then roll it back a year. + * If the "fixed" time is more than eleven months in the past then roll it forward a year. + * This gives us a 12 month rolling window (11 months in the past, 1 month in the future) of + * timestamps. + */ + + LocalDateTime fixed = date.withYear(year); + + // flume clock is ahead or there is some latency, and the year rolled + if (fixed.isAfter(now) && fixed.minusMonths(1).isAfter(now)) { + fixed = date.minusYears(1); + // flume clock is behind and the year rolled + } else if (fixed.isBefore(now) && fixed.plusMonths(1).isBefore(now)) { + fixed = date.plusYears(1); + } + date = fixed; - // flume clock is ahead or there is some latency, and the year rolled - if (fixed.isAfter(now) && fixed.minusMonths(1).isAfter(now)) { - fixed = date.minusYears(1); - // flume clock is behind and the year rolled - } else if (fixed.isBefore(now) && fixed.plusMonths(1).isBefore(now)) { - fixed = date.plusYears(1); + return date.toInstant(ZoneOffset.UTC).toEpochMilli(); } - date = fixed; - - return date.toInstant(ZoneOffset.UTC).toEpochMilli(); - } - - }
