This is an automated email from the ASF dual-hosted git repository.

ppkarwasz pushed a commit to branch fix/syslog-parser
in repository https://gitbox.apache.org/repos/asf/logging-flume.git

commit 75dba7c4b4808e40d61ac869f38ff07c38dae521
Author: Piotr P. Karwasz <[email protected]>
AuthorDate: Fri Jun 5 18:20:46 2026 +0200

    Format `SyslogParser`
---
 .../java/org/apache/flume/source/SyslogParser.java | 551 ++++++++++-----------
 1 file changed, 269 insertions(+), 282 deletions(-)

diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java
index 6019573c9..e0b261a2c 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java
@@ -1,13 +1,12 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,18 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-/**
- * To change this template, choose Tools | Templates
- * and open the template in the editor.
- */
-
 package org.apache.flume.source;
 
 import com.google.common.base.Preconditions;
-import com.google.common.cache.LoadingCache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.UncheckedExecutionException;
 import java.nio.charset.Charset;
 import java.time.LocalDateTime;
 import java.time.Year;
@@ -41,8 +36,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.regex.Pattern;
-
-import com.google.common.util.concurrent.UncheckedExecutionException;
 import org.apache.flume.Event;
 import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.annotations.InterfaceStability;
@@ -54,303 +47,297 @@ import org.slf4j.LoggerFactory;
 @InterfaceStability.Evolving
 public class SyslogParser {
 
-  private static final Logger logger =
-      LoggerFactory.getLogger(SyslogParser.class);
-
-  private static final int TS_CACHE_MAX = 1000;  // timestamp cache size limit
-  private static final Pattern TWO_SPACES = Pattern.compile("  ");
-  private static final DateTimeFormatter RFC3164_FORMAT =
-          new DateTimeFormatterBuilder()
-                  .appendPattern("MMM d HH:mm:ss")
-                  .parseDefaulting(ChronoField.YEAR, Year.now().getValue()) // Adds current year
-                  .toFormatter(Locale.ENGLISH); // Always use English month names
-  private static final DateTimeFormatter RFC5424_FORMAT =
-          new DateTimeFormatterBuilder()
-                  .append(DateTimeFormatter.ISO_LOCAL_DATE_TIME) // Parses YYYY-MM-DDTHH:mm:ss.SSSSSS
-                  .optionalStart()                               // Start of optional timezone section
-                  .appendOffset("+HH:MM", "Z") // Handles +00:00, -05:00, or Z
-                  .optionalEnd()                                 // End of optional timezone section
-                  .toFormatter();
-
-  private static final int RFC3164_LEN = 15;
-  private static final int RFC5424_PREFIX_LEN = 19;
-
-  private LoadingCache<String, Long> timestampCache;
-
-  public SyslogParser() {
-    timestampCache = CacheBuilder.newBuilder().maximumSize(TS_CACHE_MAX).build(
-        new CacheLoader<String, Long>() {
-
-          @Override
-          public Long load(String key) {
-              return ZonedDateTime.parse(key, RFC5424_FORMAT.withZone(ZoneOffset.UTC)).toInstant().toEpochMilli();
-          }
+    private static final Logger logger = LoggerFactory.getLogger(SyslogParser.class);
+
+    private static final int TS_CACHE_MAX = 1000; // timestamp cache size limit
+    private static final Pattern TWO_SPACES = Pattern.compile("  ");
+    private static final DateTimeFormatter RFC3164_FORMAT = new DateTimeFormatterBuilder()
+            .appendPattern("MMM d HH:mm:ss")
+            .parseDefaulting(ChronoField.YEAR, Year.now().getValue()) // Adds current year
+            .toFormatter(Locale.ENGLISH); // Always use English month names
+    private static final DateTimeFormatter RFC5424_FORMAT = new DateTimeFormatterBuilder()
+            .append(DateTimeFormatter.ISO_LOCAL_DATE_TIME) // Parses YYYY-MM-DDTHH:mm:ss.SSSSSS
+            .optionalStart() // Start of optional timezone section
+            .appendOffset("+HH:MM", "Z") // Handles +00:00, -05:00, or Z
+            .optionalEnd() // End of optional timezone section
+            .toFormatter();
+
+    private static final int RFC3164_LEN = 15;
+    private static final int RFC5424_PREFIX_LEN = 19;
+
+    private LoadingCache<String, Long> timestampCache;
+
+    public SyslogParser() {
+        timestampCache = CacheBuilder.newBuilder().maximumSize(TS_CACHE_MAX).build(new CacheLoader<String, Long>() {
+
+            @Override
+            public Long load(String key) {
+                return ZonedDateTime.parse(key, RFC5424_FORMAT.withZone(ZoneOffset.UTC))
+                        .toInstant()
+                        .toEpochMilli();
+            }
         });
-  }
+    }
 
     /**
-   * Parses a Flume Event out of a syslog message string.
-   * @param msg Syslog message, not including the newline character
-   * @return Parsed Flume Event
-   * @throws IllegalArgumentException if unable to successfully parse message
-   */
-  public Event parseMessage(String msg, Charset charset, Set<String> keepFields) {
-    Map<String, String> headers = Maps.newHashMap();
-
-    int msgLen = msg.length();
-
-    int curPos = 0;
-
-    Preconditions.checkArgument(msg.charAt(curPos) == '<',
-        "Bad format: invalid priority: cannot find open bracket '<' (%s)", msg);
+     * Parses a Flume Event out of a syslog message string.
+     * @param msg Syslog message, not including the newline character
+     * @return Parsed Flume Event
+     * @throws IllegalArgumentException if unable to successfully parse message
+     */
+    public Event parseMessage(String msg, Charset charset, Set<String> keepFields) {
+        Map<String, String> headers = Maps.newHashMap();
 
-    int endBracketPos = msg.indexOf('>');
-    Preconditions.checkArgument(endBracketPos > 0 && endBracketPos <= 6,
-        "Bad format: invalid priority: cannot find end bracket '>' (%s)", msg);
+        int msgLen = msg.length();
 
-    String priority = msg.substring(1, endBracketPos);
-    int pri = Integer.parseInt(priority);
-    int facility = pri / 8;
-    int severity = pri % 8;
+        int curPos = 0;
 
-    // Remember priority
-    headers.put(SyslogUtils.SYSLOG_PRIORITY, priority);
+        Preconditions.checkArgument(
+                msg.charAt(curPos) == '<', "Bad format: invalid priority: cannot find open bracket '<' (%s)", msg);
 
-    // put fac / sev into header
-    headers.put(SyslogUtils.SYSLOG_FACILITY, String.valueOf(facility));
-    headers.put(SyslogUtils.SYSLOG_SEVERITY, String.valueOf(severity));
+        int endBracketPos = msg.indexOf('>');
+        Preconditions.checkArgument(
+                endBracketPos > 0 && endBracketPos <= 6,
+                "Bad format: invalid priority: cannot find end bracket '>' (%s)",
+                msg);
 
-    Preconditions.checkArgument(msgLen > endBracketPos + 1,
-        "Bad format: no data except priority (%s)", msg);
+        String priority = msg.substring(1, endBracketPos);
+        int pri = Integer.parseInt(priority);
+        int facility = pri / 8;
+        int severity = pri % 8;
 
-    // update parsing position
-    curPos = endBracketPos + 1;
+        // Remember priority
+        headers.put(SyslogUtils.SYSLOG_PRIORITY, priority);
 
-    // remember version string
-    String version = null;
-    if (msgLen > curPos + 2 && "1 ".equals(msg.substring(curPos, curPos + 2))) {
-      version = msg.substring(curPos, curPos + 1);
-      headers.put(SyslogUtils.SYSLOG_VERSION, version);
-      curPos += 2;
-    }
+        // put fac / sev into header
+        headers.put(SyslogUtils.SYSLOG_FACILITY, String.valueOf(facility));
+        headers.put(SyslogUtils.SYSLOG_SEVERITY, String.valueOf(severity));
 
-    // now parse timestamp (handle different varieties)
+        Preconditions.checkArgument(msgLen > endBracketPos + 1, "Bad format: no data except priority (%s)", msg);
 
-    long ts;
-    String tsString;
-    char dateStartChar = msg.charAt(curPos);
+        // update parsing position
+        curPos = endBracketPos + 1;
 
-    try {
-
-      // no timestamp specified; use relay current time
-      if (dateStartChar == '-') {
-        tsString = Character.toString(dateStartChar);
-        ts = System.currentTimeMillis();
-        if (msgLen <= curPos + 2) {
-          throw new IllegalArgumentException(
-              "bad syslog format (missing hostname)");
+        // remember version string
+        String version = null;
+        if (msgLen > curPos + 2 && "1 ".equals(msg.substring(curPos, curPos + 2))) {
+            version = msg.substring(curPos, curPos + 1);
+            headers.put(SyslogUtils.SYSLOG_VERSION, version);
+            curPos += 2;
         }
-        curPos += 2; // assume we skip past a space to get to the hostname
 
-      // rfc3164 timestamp
-      } else if (dateStartChar >= 'A' && dateStartChar <= 'Z') {
-        if (msgLen <= curPos + RFC3164_LEN) {
-          throw new IllegalArgumentException("bad timestamp format");
+        // now parse timestamp (handle different varieties)
+
+        long ts;
+        String tsString;
+        char dateStartChar = msg.charAt(curPos);
+
+        try {
+
+            // no timestamp specified; use relay current time
+            if (dateStartChar == '-') {
+                tsString = Character.toString(dateStartChar);
+                ts = System.currentTimeMillis();
+                if (msgLen <= curPos + 2) {
+                    throw new IllegalArgumentException("bad syslog format (missing hostname)");
+                }
+                curPos += 2; // assume we skip past a space to get to the hostname
+
+                // rfc3164 timestamp
+            } else if (dateStartChar >= 'A' && dateStartChar <= 'Z') {
+                if (msgLen <= curPos + RFC3164_LEN) {
+                    throw new IllegalArgumentException("bad timestamp format");
+                }
+                tsString = msg.substring(curPos, curPos + RFC3164_LEN);
+                ts = parseRfc3164Time(tsString);
+                curPos += RFC3164_LEN + 1;
+
+                // rfc 5424 timestamp
+            } else {
+                int nextSpace = msg.indexOf(' ', curPos);
+                if (nextSpace == -1) {
+                    throw new IllegalArgumentException("bad timestamp format");
+                }
+                tsString = msg.substring(curPos, nextSpace);
+                ts = parseRfc5424Date(tsString);
+                curPos = nextSpace + 1;
+            }
+
+        } catch (IllegalArgumentException ex) {
+            throw new IllegalArgumentException("Unable to parse message: " + msg, ex);
         }
-        tsString = msg.substring(curPos, curPos + RFC3164_LEN);
-        ts = parseRfc3164Time(tsString);
-        curPos += RFC3164_LEN + 1;
 
-      // rfc 5424 timestamp
-      } else {
+        headers.put("timestamp", String.valueOf(ts));
+
+        // parse out hostname
         int nextSpace = msg.indexOf(' ', curPos);
         if (nextSpace == -1) {
-          throw new IllegalArgumentException("bad timestamp format");
+            throw new IllegalArgumentException("bad syslog format (missing hostname)");
+        }
+        // copy the host string to avoid holding the message string in memory
+        // if using a memory-based queue
+        String hostname = new String(msg.substring(curPos, nextSpace));
+        headers.put("host", hostname);
+
+        // EventBuilder will do a copy of its own, so no defensive copy of the body
+        String data = "";
+        if (msgLen > nextSpace + 1 && !SyslogUtils.keepAllFields(keepFields)) {
+            curPos = nextSpace + 1;
+            data = msg.substring(curPos);
+            data = SyslogUtils.addFieldsToBody(keepFields, data, priority, version, tsString, hostname);
+        } else {
+            data = msg;
         }
-        tsString = msg.substring(curPos, nextSpace);
-        ts = parseRfc5424Date(tsString);
-        curPos = nextSpace + 1;
-      }
-
-    } catch (IllegalArgumentException ex) {
-      throw new IllegalArgumentException("Unable to parse message: " + msg, ex);
-    }
-
-    headers.put("timestamp", String.valueOf(ts));
 
-    // parse out hostname
-    int nextSpace = msg.indexOf(' ', curPos);
-    if (nextSpace == -1) {
-      throw new IllegalArgumentException(
-          "bad syslog format (missing hostname)");
-    }
-    // copy the host string to avoid holding the message string in memory
-    // if using a memory-based queue
-    String hostname = new String(msg.substring(curPos, nextSpace));
-    headers.put("host", hostname);
-
-    // EventBuilder will do a copy of its own, so no defensive copy of the body
-    String data = "";
-    if (msgLen > nextSpace + 1 && !SyslogUtils.keepAllFields(keepFields)) {
-      curPos = nextSpace + 1;
-      data = msg.substring(curPos);
-      data = SyslogUtils.addFieldsToBody(keepFields, data, priority, version, tsString, hostname);
-    } else {
-      data = msg;
-    }
+        Event event = EventBuilder.withBody(data, charset, headers);
 
-    Event event = EventBuilder.withBody(data, charset, headers);
-
-    return event;
-  }
-
-  /**
-   * Parse date in RFC 5424 format. Uses an LRU cache to speed up parsing for
-   * multiple messages that occur in the same second.
-   * @param msg
-   * @return Typical (for Java) milliseconds since UNIX epoch
-   */
-  protected long parseRfc5424Date(String msg) {
-    Long ts;
-    int curPos = 0;
-
-    int msgLen = msg.length();
-    Preconditions.checkArgument(msgLen > RFC5424_PREFIX_LEN,
-        "Bad format: Not a valid RFC5424 timestamp: %s", msg);
-    String timestampPrefix = msg.substring(curPos, RFC5424_PREFIX_LEN);
-
-    try {
-      ts = timestampCache.get(timestampPrefix);
-    } catch (ExecutionException | UncheckedExecutionException ex) {
-      throw new IllegalArgumentException("bad timestamp format", ex.getCause());
+        return event;
     }
 
-    curPos += RFC5424_PREFIX_LEN;
-
-    // look for the optional fractional seconds
-    if (msg.charAt(curPos) == '.') {
-      // figure out how many numeric digits
-      boolean foundEnd = false;
-      int endMillisPos = curPos + 1;
-
-      if (msgLen <= endMillisPos) {
-        throw new IllegalArgumentException("bad timestamp format (no TZ)");
-      }
-
-      // FIXME: TODO: ensure we handle all bad formatting cases
-      while (!foundEnd) {
-        char curDigit = msg.charAt(endMillisPos);
-        if (curDigit >= '0' && curDigit <= '9') {
-          endMillisPos++;
-        } else {
-          foundEnd = true;
-        }
-      }
-
-      // if they had a valid fractional second, append it rounded to millis
-      final int fractionalPositions = endMillisPos - (curPos + 1);
-      if (fractionalPositions > 0) {
-        long milliseconds = Long.parseLong(msg.substring(curPos + 1, endMillisPos));
-        if (fractionalPositions > 3) {
-          milliseconds /= Math.pow(10, (fractionalPositions - 3));
-        } else if (fractionalPositions < 3) {
-          milliseconds *= Math.pow(10, (3 - fractionalPositions));
+    /**
+     * Parse date in RFC 5424 format. Uses an LRU cache to speed up parsing for
+     * multiple messages that occur in the same second.
+     * @param msg
+     * @return Typical (for Java) milliseconds since UNIX epoch
+     */
+    protected long parseRfc5424Date(String msg) {
+        Long ts;
+        int curPos = 0;
+
+        int msgLen = msg.length();
+        Preconditions.checkArgument(msgLen > RFC5424_PREFIX_LEN, "Bad format: Not a valid RFC5424 timestamp: %s", msg);
+        String timestampPrefix = msg.substring(curPos, RFC5424_PREFIX_LEN);
+
+        try {
+            ts = timestampCache.get(timestampPrefix);
+        } catch (ExecutionException | UncheckedExecutionException ex) {
+            throw new IllegalArgumentException("bad timestamp format", ex.getCause());
         }
-        ts += milliseconds;
-      } else {
-        throw new IllegalArgumentException(
-            "Bad format: Invalid timestamp (fractional portion): " + msg);
-      }
-
-      curPos = endMillisPos;
-    }
 
-    // look for timezone
-    char tzFirst = msg.charAt(curPos);
-
-    // UTC
-    if (tzFirst == 'Z') {
-      // no-op
-    } else if (tzFirst == '+' || tzFirst == '-') {
-
-      Preconditions.checkArgument(msgLen > curPos + 5,
-          "Bad format: Invalid timezone (%s)", msg);
-
-      int polarity;
-      if (tzFirst == '+') {
-        polarity = +1;
-      } else {
-        polarity = -1;
-      }
-
-      char[] h = new char[5];
-      for (int i = 0; i < 5; i++) {
-        h[i] = msg.charAt(curPos + 1 + i);
-      }
-
-      if (h[0] >= '0' && h[0] <= '9'
-          && h[1] >= '0' && h[1] <= '9'
-          && h[2] == ':'
-          && h[3] >= '0' && h[3] <= '9'
-          && h[4] >= '0' && h[4] <= '9') {
-        int hourOffset = Integer.parseInt(msg.substring(curPos + 1, curPos + 3));
-        int minOffset = Integer.parseInt(msg.substring(curPos + 4, curPos + 6));
-        ts -= polarity * ((hourOffset * 60) + minOffset) * 60000;
-      } else {
-        throw new IllegalArgumentException(
-            "Bad format: Invalid timezone: " + msg);
-      }
+        curPos += RFC5424_PREFIX_LEN;
+
+        // look for the optional fractional seconds
+        if (msg.charAt(curPos) == '.') {
+            // figure out how many numeric digits
+            boolean foundEnd = false;
+            int endMillisPos = curPos + 1;
+
+            if (msgLen <= endMillisPos) {
+                throw new IllegalArgumentException("bad timestamp format (no TZ)");
+            }
+
+            // FIXME: TODO: ensure we handle all bad formatting cases
+            while (!foundEnd) {
+                char curDigit = msg.charAt(endMillisPos);
+                if (curDigit >= '0' && curDigit <= '9') {
+                    endMillisPos++;
+                } else {
+                    foundEnd = true;
+                }
+            }
+
+            // if they had a valid fractional second, append it rounded to millis
+            final int fractionalPositions = endMillisPos - (curPos + 1);
+            if (fractionalPositions > 0) {
+                long milliseconds = Long.parseLong(msg.substring(curPos + 1, endMillisPos));
+                if (fractionalPositions > 3) {
+                    milliseconds /= Math.pow(10, (fractionalPositions - 3));
+                } else if (fractionalPositions < 3) {
+                    milliseconds *= Math.pow(10, (3 - fractionalPositions));
+                }
+                ts += milliseconds;
+            } else {
+                throw new IllegalArgumentException("Bad format: Invalid timestamp (fractional portion): " + msg);
+            }
+
+            curPos = endMillisPos;
+        }
 
-    }
+        // look for timezone
+        char tzFirst = msg.charAt(curPos);
+
+        // UTC
+        if (tzFirst == 'Z') {
+            // no-op
+        } else if (tzFirst == '+' || tzFirst == '-') {
+
+            Preconditions.checkArgument(msgLen > curPos + 5, "Bad format: Invalid timezone (%s)", msg);
+
+            int polarity;
+            if (tzFirst == '+') {
+                polarity = +1;
+            } else {
+                polarity = -1;
+            }
+
+            char[] h = new char[5];
+            for (int i = 0; i < 5; i++) {
+                h[i] = msg.charAt(curPos + 1 + i);
+            }
+
+            if (h[0] >= '0'
+                    && h[0] <= '9'
+                    && h[1] >= '0'
+                    && h[1] <= '9'
+                    && h[2] == ':'
+                    && h[3] >= '0'
+                    && h[3] <= '9'
+                    && h[4] >= '0'
+                    && h[4] <= '9') {
+                int hourOffset = Integer.parseInt(msg.substring(curPos + 1, curPos + 3));
+                int minOffset = Integer.parseInt(msg.substring(curPos + 4, curPos + 6));
+                ts -= polarity * ((hourOffset * 60) + minOffset) * 60000;
+            } else {
+                throw new IllegalArgumentException("Bad format: Invalid timezone: " + msg);
+            }
+        }
 
-    return ts;
-  }
-
-  /**
-   * Parse the RFC3164 date format. This is trickier than it sounds because this
-   * format does not specify a year so we get weird edge cases at year
-   * boundaries. This implementation tries to "do what I mean".
-   * @param ts RFC3164-compatible timestamp to be parsed
-   * @return Typical (for Java) milliseconds since the UNIX epoch
-   */
-  protected long parseRfc3164Time(String ts) {
-    LocalDateTime now = LocalDateTime.now();
-    int year = now.getYear();
-
-    ts = TWO_SPACES.matcher(ts).replaceFirst(" ");
-
-    LocalDateTime date;
-    try {
-      date = LocalDateTime.parse(ts, RFC3164_FORMAT);
-    } catch (DateTimeParseException e) {
-      logger.debug("rfc3164 date parse failed on (" + ts + "): invalid format", e);
-      return 0;
+        return ts;
     }
 
-    // rfc3164 dates are really dumb.
-    /*
-     * Some code to try and add some smarts to the year insertion as without a year in the message
-     * we need to make some educated guessing.
-     * First set the "fixed" to be the timestamp with the current year.
-     * If the "fixed" time is more than one month in the future then roll it back a year.
-     * If the "fixed" time is more than eleven months in the past then roll it forward a year.
-     * This gives us a 12 month rolling window (11 months in the past, 1 month in the future) of
-     * timestamps.
+    /**
+     * Parse the RFC3164 date format. This is trickier than it sounds because this
+     * format does not specify a year so we get weird edge cases at year
+     * boundaries. This implementation tries to "do what I mean".
+     * @param ts RFC3164-compatible timestamp to be parsed
+     * @return Typical (for Java) milliseconds since the UNIX epoch
      */
+    protected long parseRfc3164Time(String ts) {
+        LocalDateTime now = LocalDateTime.now();
+        int year = now.getYear();
+
+        ts = TWO_SPACES.matcher(ts).replaceFirst(" ");
+
+        LocalDateTime date;
+        try {
+            date = LocalDateTime.parse(ts, RFC3164_FORMAT);
+        } catch (DateTimeParseException e) {
+            logger.debug("rfc3164 date parse failed on (" + ts + "): invalid format", e);
+            return 0;
+        }
 
-    LocalDateTime fixed = date.withYear(year);
+        // rfc3164 dates are really dumb.
+        /*
+         * Some code to try and add some smarts to the year insertion as without a year in the message
+         * we need to make some educated guessing.
+         * First set the "fixed" to be the timestamp with the current year.
+         * If the "fixed" time is more than one month in the future then roll it back a year.
+         * If the "fixed" time is more than eleven months in the past then roll it forward a year.
+         * This gives us a 12 month rolling window (11 months in the past, 1 month in the future) of
+         * timestamps.
+         */
+
+        LocalDateTime fixed = date.withYear(year);
+
+        // flume clock is ahead or there is some latency, and the year rolled
+        if (fixed.isAfter(now) && fixed.minusMonths(1).isAfter(now)) {
+            fixed = date.minusYears(1);
+            // flume clock is behind and the year rolled
+        } else if (fixed.isBefore(now) && fixed.plusMonths(1).isBefore(now)) {
+            fixed = date.plusYears(1);
+        }
+        date = fixed;
 
-    // flume clock is ahead or there is some latency, and the year rolled
-    if (fixed.isAfter(now) && fixed.minusMonths(1).isAfter(now)) {
-      fixed = date.minusYears(1);
-    // flume clock is behind and the year rolled
-    } else if (fixed.isBefore(now) && fixed.plusMonths(1).isBefore(now)) {
-      fixed = date.plusYears(1);
+        return date.toInstant(ZoneOffset.UTC).toEpochMilli();
     }
-    date = fixed;
-
-    return date.toInstant(ZoneOffset.UTC).toEpochMilli();
-  }
-
-
 }

Reply via email to