Repository: flume Updated Branches: refs/heads/trunk 8324af363 -> 846676653
FLUME-3201. Fix SyslogUtil to handle RFC3164 format in December correctly This closes #188 Reviewers: Tristan Stevens, Miklos Csanady (Ferenc Szabo via Denes Arvay) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/84667665 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/84667665 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/84667665 Branch: refs/heads/trunk Commit: 8466766538ed7e8325abf81392008b1d9de8a1f6 Parents: 8324af3 Author: Ferenc Szabo <[email protected]> Authored: Wed Dec 6 09:39:51 2017 +0100 Committer: Denes Arvay <[email protected]> Committed: Wed Dec 6 09:39:51 2017 +0100 ---------------------------------------------------------------------- .../org/apache/flume/source/SyslogUtils.java | 36 ++++++-- .../apache/flume/source/TestSyslogUtils.java | 87 +++++++++++++------- 2 files changed, 86 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/84667665/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java index 43a10e1..e9d4d8f 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java @@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory; import java.io.ByteArrayOutputStream; import java.text.ParseException; import java.text.SimpleDateFormat; +import java.time.Clock; import java.util.ArrayList; import java.util.Arrays; import java.util.Calendar; @@ -93,6 +94,7 @@ public class SyslogUtils { public static final Integer MIN_SIZE = 10; public static final Integer DEFAULT_SIZE = 2500; private final boolean isUdp; + private Clock clock; private boolean isBadEvent; private boolean isIncompleteEvent; private Integer maxSize; @@ -190,12 +192,21 @@ public class SyslogUtils { public SyslogUtils(boolean isUdp) { this(DEFAULT_SIZE, - new HashSet<String>(Arrays.asList(SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS)), + new HashSet<>(Arrays.asList(SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS)), isUdp); } - public SyslogUtils(Integer eventSize, Set<String> keepFields, boolean isUdp) { + public SyslogUtils(Integer defaultSize, Set<String> keepFields, boolean isUdp) { + this(defaultSize, + keepFields, + isUdp, + Clock.system(Clock.systemDefaultZone().getZone()) + ); + } + + public SyslogUtils(Integer eventSize, Set<String> keepFields, boolean isUdp, Clock clock) { this.isUdp = isUdp; + this.clock = clock; isBadEvent = false; isIncompleteEvent = false; maxSize = (eventSize < MIN_SIZE) ? MIN_SIZE : eventSize; @@ -370,6 +381,7 @@ public class SyslogUtils { for (int dt = 0; dt < fmt.dateFormat.size(); dt++) { try { Date parsedDate = fmt.dateFormat.get(dt).parse(value); + /* * Some code to try and add some smarts to the year insertion. * Original code just added the current year which was okay-ish, but around @@ -384,8 +396,12 @@ public class SyslogUtils { * 1 month in the future) of timestamps. */ if (fmt.addYear) { - Calendar cal = Calendar.getInstance(); - cal.setTime(parsedDate); + // Parsing from dateformatter without year part would use system clock + // so we have to set the year part from the used clock instance + parsedDate.setYear(new Date(clock.millis()).getYear()); + + Calendar calParsed = Calendar.getInstance(); + calParsed.setTime(parsedDate); Calendar calMinusOneMonth = Calendar.getInstance(); calMinusOneMonth.setTime(parsedDate); calMinusOneMonth.add(Calendar.MONTH, -1); @@ -394,19 +410,21 @@ public class SyslogUtils { calPlusElevenMonths.setTime(parsedDate); calPlusElevenMonths.add(Calendar.MONTH, +11); - if (cal.getTimeInMillis() > System.currentTimeMillis() && - calMinusOneMonth.getTimeInMillis() > System.currentTimeMillis()) { + long currentTimeMillis = clock.millis(); + + if (calParsed.getTimeInMillis() > currentTimeMillis && + calMinusOneMonth.getTimeInMillis() > currentTimeMillis) { //Need to roll back a year Calendar c1 = Calendar.getInstance(); c1.setTime(parsedDate); c1.add(Calendar.YEAR, -1); parsedDate = c1.getTime(); - } else if (cal.getTimeInMillis() < System.currentTimeMillis() && - calPlusElevenMonths.getTimeInMillis() < System.currentTimeMillis()) { + } else if (calParsed.getTimeInMillis() < currentTimeMillis && + calPlusElevenMonths.getTimeInMillis() < currentTimeMillis) { //Need to roll forward a year Calendar c1 = Calendar.getInstance(); c1.setTime(parsedDate); - c1.add(Calendar.YEAR, -1); + c1.add(Calendar.YEAR, +1); parsedDate = c1.getTime(); } } http://git-wip-us.apache.org/repos/asf/flume/blob/84667665/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java index 80d8dac..2479413 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java @@ -26,6 +26,9 @@ import org.junit.Test; import java.text.ParseException; import java.text.SimpleDateFormat; +import java.time.Clock; +import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.util.Calendar; import java.util.Date; import java.util.HashSet; @@ -130,7 +133,7 @@ public class TestSyslogUtils { @Test public void TestHeader9() throws ParseException { - SimpleDateFormat sdf = new SimpleDateFormat("MMM d hh:MM:ss"); + SimpleDateFormat sdf = new SimpleDateFormat("MMM d hh:MM:ss", Locale.ENGLISH); Calendar cal = Calendar.getInstance(); String year = String.valueOf(cal.get(Calendar.YEAR)); @@ -145,7 +148,7 @@ public class TestSyslogUtils { @Test public void TestHeader10() throws ParseException { - SimpleDateFormat sdf = new SimpleDateFormat("MMM d hh:MM:ss"); + SimpleDateFormat sdf = new SimpleDateFormat("MMM d hh:MM:ss", Locale.ENGLISH); Calendar cal = Calendar.getInstance(); String year = String.valueOf(cal.get(Calendar.YEAR)); @@ -175,7 +178,7 @@ public class TestSyslogUtils { @Test public void TestRfc3164HeaderApacheLogWithNulls() throws ParseException { - SimpleDateFormat sdf = new SimpleDateFormat("MMM d hh:MM:ss"); + SimpleDateFormat sdf = new SimpleDateFormat("MMM d hh:MM:ss", Locale.ENGLISH); Calendar cal = Calendar.getInstance(); String year = String.valueOf(cal.get(Calendar.YEAR)); @@ -196,41 +199,53 @@ public class TestSyslogUtils { */ @Test public void TestRfc3164Dates() throws ParseException { - for (int i = -10; i <= 1; i++) { - SimpleDateFormat sdf = new SimpleDateFormat("MMM d hh:MM:ss"); - Date date = new Date(System.currentTimeMillis()); - Calendar cal = Calendar.getInstance(); - cal.setTime(date); - cal.add(Calendar.MONTH, i); - - //Small tweak to avoid the 1 month in the future ticking over by a few seconds between now - //and when the checkHeader actually runs - if (i == 1) { - cal.add(Calendar.DAY_OF_MONTH, -1); + //We're going to run this test using a mocked clock, once for the next 13 months + for (int monthOffset = 0; monthOffset <= 13; monthOffset++) { + Clock mockClock = Clock.fixed( + LocalDateTime.now().plusMonths(monthOffset).toInstant(ZoneOffset.UTC), + Clock.systemDefaultZone().getZone() + ); + + //We're then going to try input dates (without the year) for all 12 months, starting + //10 months ago, and finishing next month (all relative to our mocked clock) + for (int i = -10; i <= 1; i++) { + SimpleDateFormat sdf = new SimpleDateFormat("MMM d hh:MM:ss", Locale.ENGLISH); + Date date = new Date(mockClock.millis()); + Calendar cal = Calendar.getInstance(); + cal.setTime(date); + cal.add(Calendar.MONTH, i); + + //Small tweak to avoid the 1 month in the future ticking over by a few seconds between now + //and when the checkHeader actually runs + if (i == 1) { + cal.add(Calendar.DAY_OF_MONTH, -1); + } + + String stamp1 = sdf.format(cal.getTime()); + + String year = String.valueOf(cal.get(Calendar.YEAR)); + String format1 = "yyyyMMM d HH:mm:ss"; + String host1 = "ubuntu-11.cloudera.com"; + String data1 = "some msg"; + + // timestamp with 'Z' appended, translates to UTC + String msg1 = "<10>" + stamp1 + " " + host1 + " " + data1 + "\n"; + checkHeader(msg1, year + stamp1, format1, host1, data1, mockClock); } + } - String stamp1 = sdf.format(cal.getTime()); - - String year = String.valueOf(cal.get(Calendar.YEAR)); - String format1 = "yyyyMMM d HH:mm:ss"; - String host1 = "ubuntu-11.cloudera.com"; - String data1 = "some msg"; - // timestamp with 'Z' appended, translates to UTC - String msg1 = "<10>" + stamp1 + " " + host1 + " " + data1 + "\n"; - checkHeader(msg1, year + stamp1, format1, host1, data1); - } } public static void checkHeader(String keepFields, String msg1, String stamp1, String format1, - String host1, String data1) throws ParseException { + String host1, String data1, Clock clock) throws ParseException { SyslogUtils util; if (keepFields == null || keepFields.isEmpty()) { - util = new SyslogUtils(SyslogUtils.DEFAULT_SIZE, new HashSet<String>(), false); + util = new SyslogUtils(SyslogUtils.DEFAULT_SIZE, new HashSet<String>(), false, clock); } else { util = new SyslogUtils(SyslogUtils.DEFAULT_SIZE, SyslogUtils.chooseFieldsToKeep(keepFields), - false); + false, clock); } ChannelBuffer buff = ChannelBuffers.buffer(200); @@ -256,10 +271,26 @@ public class TestSyslogUtils { Assert.assertEquals(data1, new String(e.getBody())); } + public static void checkHeader(String keepFields, String msg1, String stamp1, String format1, + String host1, String data1) throws ParseException { + checkHeader( + keepFields, msg1, stamp1, format1, + host1, data1, Clock.system(Clock.systemDefaultZone().getZone()) + ); + } + + public static void checkHeader(String msg1, String stamp1, String format1, + String host1, String data1, Clock clock) throws ParseException { + checkHeader("none", msg1, stamp1, format1, host1, data1, clock); + } + // Check headers for when keepFields is "none". public static void checkHeader(String msg1, String stamp1, String format1, String host1, String data1) throws ParseException { - checkHeader("none", msg1, stamp1, format1, host1, data1); + checkHeader( + "none", msg1, stamp1, format1, + host1, data1, Clock.system(Clock.systemDefaultZone().getZone()) + ); } /**
