Repository: flume Updated Branches: refs/heads/trunk 330e57287 -> a0a50849d
FLUME-2982. Add localhost escape sequence to HDFS sink It would be useful to be able to just use %[localhost] escape sequence instead of having to pass in a header or use the host interceptor. Part of the problem of using an interceptor is the case where Flume only consists of a channel and a sink (think KafkaChannel). There is no support in Flume for a sink-side interceptor at the time of this writing. Reviewers: Grant Henke, Mike Percy (Jeff Holoman via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/a0a50849 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/a0a50849 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/a0a50849 Branch: refs/heads/trunk Commit: a0a50849d5ad33e6e6903316a3bd3dbba8547843 Parents: 330e572 Author: Jeff Holoman <[email protected]> Authored: Wed Aug 31 20:12:23 2016 -0700 Committer: Mike Percy <[email protected]> Committed: Wed Aug 31 20:21:35 2016 -0700 ---------------------------------------------------------------------- .../flume/formatter/output/BucketPath.java | 38 +++++++++++- .../flume/formatter/output/TestBucketPath.java | 47 +++++++++++++++ flume-ng-doc/sphinx/FlumeUserGuide.rst | 61 +++++++++++--------- 3 files changed, 117 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/a0a50849/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java index f640ec9..cf0fbb0 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java +++ b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java @@ -24,6 +24,8 @@ import org.apache.flume.Clock;
import org.apache.flume.SystemClock; import org.apache.flume.tools.TimestampRoundDownUtil; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; @@ -39,7 +41,7 @@ public class BucketPath { * These are useful to other classes which might want to search for tags in * strings. */ - public static final String TAG_REGEX = "\\%(\\w|\\%)|\\%\\{([\\w\\.-]+)\\}"; + public static final String TAG_REGEX = "%(\\w|%)|%\\{([\\w\\.-]+)\\}|%\\[(\\w+)\\]"; public static final Pattern tagPattern = Pattern.compile(TAG_REGEX); private static Clock clock = new SystemClock(); @@ -214,6 +216,35 @@ public class BucketPath { * Not intended as a public API */ @VisibleForTesting + protected static String replaceStaticString(String key) { + String replacementString = ""; + try { + InetAddress addr = InetAddress.getLocalHost(); + switch (key.toLowerCase(java.util.Locale.ROOT)) { + case "localhost": + replacementString = addr.getHostName(); + break; + case "ip": + replacementString = addr.getHostAddress(); + break; + case "fqdn": + replacementString = addr.getCanonicalHostName(); + break; + default: + throw new RuntimeException("The static escape string '" + key + "'" + + " was provided but does not match any of (localhost,IP,FQDN)"); + } + } catch (UnknownHostException e) { + throw new RuntimeException("Flume wasn't able to parse the static escape" + + " sequence '" + key + "' due to UnknownHostException.", e); + } + return replacementString; + } + + /** + * Not intended as a public API + */ + @VisibleForTesting protected static String replaceShorthand(char c, Map<String, String> headers, TimeZone timeZone, boolean needRounding, int unit, int roundDown, boolean useLocalTimestamp, long ts) { @@ -418,6 +449,11 @@ public class BucketPath { replacement = ""; // LOG.warn("Tag " + matcher.group(2) + " not found"); } + + // Group 3 is the %[...] pattern.
+ } else if (matcher.group(3) != null) { + replacement = replaceStaticString(matcher.group(3)); + } else { // The %x pattern. // Since we know the match is a single character, we can http://git-wip-us.apache.org/repos/asf/flume/blob/a0a50849/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java b/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java index ccc7460..989c2a4 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java +++ b/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java @@ -26,6 +26,8 @@ import org.junit.Before; import org.junit.Test; import javax.annotation.Nullable; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; @@ -278,4 +280,49 @@ public class TestBucketPath { cal.set(Calendar.MILLISECOND, ms); return cal; } + + @Test + public void testStaticEscapeStrings() { + Map<String, String> staticStrings; + staticStrings = new HashMap<>(); + + try { + InetAddress addr = InetAddress.getLocalHost(); + staticStrings.put("localhost", addr.getHostName()); + staticStrings.put("IP", addr.getHostAddress()); + staticStrings.put("FQDN", addr.getCanonicalHostName()); + } catch (UnknownHostException e) { + Assert.fail("Test failed due to UnknownHostException"); + } + + TimeZone utcTimeZone = TimeZone.getTimeZone("UTC"); + String filePath = "%[localhost]/%[IP]/%[FQDN]"; + String realPath = BucketPath.escapeString(filePath, headers, + utcTimeZone, false, Calendar.HOUR_OF_DAY, 12, false); + String[] args = realPath.split("\\/"); + + Assert.assertEquals(staticStrings.get("localhost"), args[0]); + Assert.assertEquals(staticStrings.get("IP"), args[1]);
+ Assert.assertEquals(staticStrings.get("FQDN"), args[2]); + + StringBuilder s = new StringBuilder(); + s.append("Expected String: ").append(staticStrings.get("localhost")); + s.append("/").append(staticStrings.get("IP")).append("/"); + s.append(staticStrings.get("FQDN")); + + System.out.println(s); + System.out.println("Escaped String: " + realPath ); + } + + @Test (expected = RuntimeException.class) + public void testStaticEscapeStringsNoKey() { + // An unrecognized %[...] key must fail fast instead of being + // silently replaced; escapeString is expected to throw here. + + TimeZone utcTimeZone = TimeZone.getTimeZone("UTC"); + String filePath = "%[abcdefg]/%[IP]/%[FQDN]"; + BucketPath.escapeString(filePath, headers, + utcTimeZone, false, Calendar.HOUR_OF_DAY, 12, false); + } + } http://git-wip-us.apache.org/repos/asf/flume/blob/a0a50849/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 53844e3..0fecee6 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1785,34 +1785,39 @@ required. The following are the escape sequences supported: -========= ================================================= -Alias Description -========= ================================================= -%{host} Substitute value of event header named "host". Arbitrary header names are supported. -%t Unix time in milliseconds -%a locale's short weekday name (Mon, Tue, ...) -%A locale's full weekday name (Monday, Tuesday, ...) -%b locale's short month name (Jan, Feb, ...) -%B locale's long month name (January, February, ...)
-%c locale's date and time (Thu Mar 3 23:05:25 2005) -%d day of month (01) -%e day of month without padding (1) -%D date; same as %m/%d/%y -%H hour (00..23) -%I hour (01..12) -%j day of year (001..366) -%k hour ( 0..23) -%m month (01..12) -%n month without padding (1..12) -%M minute (00..59) -%p locale's equivalent of am or pm -%s seconds since 1970-01-01 00:00:00 UTC -%S second (00..59) -%y last two digits of year (00..99) -%Y year (2010) -%z +hhmm numeric timezone (for example, -0400) -========= ================================================= - +=============== ================================================= +Alias Description +=============== ================================================= +%{host} Substitute value of event header named "host". Arbitrary header names are supported. +%t Unix time in milliseconds +%a locale's short weekday name (Mon, Tue, ...) +%A locale's full weekday name (Monday, Tuesday, ...) +%b locale's short month name (Jan, Feb, ...) +%B locale's long month name (January, February, ...) +%c locale's date and time (Thu Mar 3 23:05:25 2005) +%d day of month (01) +%e day of month without padding (1) +%D date; same as %m/%d/%y +%H hour (00..23) +%I hour (01..12) +%j day of year (001..366) +%k hour ( 0..23) +%m month (01..12) +%n month without padding (1..12) +%M minute (00..59) +%p locale's equivalent of am or pm +%s seconds since 1970-01-01 00:00:00 UTC +%S second (00..59) +%y last two digits of year (00..99) +%Y year (2010) +%z +hhmm numeric timezone (for example, -0400) +%[localhost] Substitute the hostname of the host where the agent is running +%[IP] Substitute the IP address of the host where the agent is running +%[FQDN] Substitute the canonical hostname of the host where the agent is running +=============== ================================================= + +Note: The escape strings %[localhost], %[IP] and %[FQDN] all rely on Java's ability to obtain the +hostname, which may fail in some networking environments. 
The file in use will have the name mangled to include ".tmp" at the end. Once the file is closed, this extension is removed. This allows excluding partially
