Repository: flume Updated Branches: refs/heads/trunk 773555c5c -> 13771c905
FLUME-1520. Timestamp interceptor should support custom headers This change adds a configuration parameter to the TimestampInterceptor for the user to be able to define the name of the timestamp header. Reviewers: Tristan Stevens, Attila Simon (Hari Shreedharan, Tristan Stevens, Attila Simon via Denes Arvay) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/13771c90 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/13771c90 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/13771c90 Branch: refs/heads/trunk Commit: 13771c905316052d3e94aeb3b4a0d49a27c0f852 Parents: 773555c Author: Denes Arvay <[email protected]> Authored: Mon Sep 11 15:16:45 2017 +0200 Committer: Denes Arvay <[email protected]> Committed: Mon Sep 11 21:48:19 2017 +0200 ---------------------------------------------------------------------- .../flume/interceptor/TimestampInterceptor.java | 27 ++++++++----- .../interceptor/TestTimestampInterceptor.java | 41 ++++++++++++++------ flume-ng-doc/sphinx/FlumeUserGuide.rst | 19 ++++----- 3 files changed, 56 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/13771c90/flume-ng-core/src/main/java/org/apache/flume/interceptor/TimestampInterceptor.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/TimestampInterceptor.java b/flume-ng-core/src/main/java/org/apache/flume/interceptor/TimestampInterceptor.java index 50c3695..4ed6387 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/interceptor/TimestampInterceptor.java +++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/TimestampInterceptor.java @@ -28,18 +28,22 @@ import static org.apache.flume.interceptor.TimestampInterceptor.Constants.*; /** * Simple Interceptor class that sets the current system timestamp on all events * that are intercepted. - * By convention, this timestamp header is named "timestamp" and its format + * By convention, this timestamp header is named "timestamp" by default and its format * is a "stringified" long timestamp in milliseconds since the UNIX epoch. + * The name of the header can be changed through the configuration using the + * config key "header". */ public class TimestampInterceptor implements Interceptor { private final boolean preserveExisting; + private final String header; /** * Only {@link TimestampInterceptor.Builder} can build me */ - private TimestampInterceptor(boolean preserveExisting) { + private TimestampInterceptor(boolean preserveExisting, String header) { this.preserveExisting = preserveExisting; + this.header = header; } @Override @@ -53,11 +57,11 @@ public class TimestampInterceptor implements Interceptor { @Override public Event intercept(Event event) { Map<String, String> headers = event.getHeaders(); - if (preserveExisting && headers.containsKey(TIMESTAMP)) { + if (preserveExisting && headers.containsKey(header)) { // we must preserve the existing timestamp } else { long now = System.currentTimeMillis(); - headers.put(TIMESTAMP, Long.toString(now)); + headers.put(header, Long.toString(now)); } return event; } @@ -85,24 +89,27 @@ public class TimestampInterceptor implements Interceptor { */ public static class Builder implements Interceptor.Builder { - private boolean preserveExisting = PRESERVE_DFLT; + private boolean preserveExisting = DEFAULT_PRESERVE; + private String header = DEFAULT_HEADER_NAME; @Override public Interceptor build() { - return new TimestampInterceptor(preserveExisting); + return new TimestampInterceptor(preserveExisting, header); } @Override public void configure(Context context) { - preserveExisting = context.getBoolean(PRESERVE, PRESERVE_DFLT); + preserveExisting = context.getBoolean(CONFIG_PRESERVE, DEFAULT_PRESERVE); + header = context.getString(CONFIG_HEADER_NAME, DEFAULT_HEADER_NAME); } } public static class Constants { - public static String TIMESTAMP = "timestamp"; - public static String PRESERVE = "preserveExisting"; - public static boolean PRESERVE_DFLT = false; + public static final String CONFIG_PRESERVE = "preserveExisting"; + public static final boolean DEFAULT_PRESERVE = false; + public static final String CONFIG_HEADER_NAME = "headerName"; + public static final String DEFAULT_HEADER_NAME = "timestamp"; } } http://git-wip-us.apache.org/repos/asf/flume/blob/13771c90/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestTimestampInterceptor.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestTimestampInterceptor.java b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestTimestampInterceptor.java index 3d3eeee..06648c6 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestTimestampInterceptor.java +++ b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestTimestampInterceptor.java @@ -35,17 +35,16 @@ public class TestTimestampInterceptor { public void testBasic() throws ClassNotFoundException, InstantiationException, IllegalAccessException { - InterceptorBuilderFactory factory = new InterceptorBuilderFactory(); Interceptor.Builder builder = InterceptorBuilderFactory.newInstance( InterceptorType.TIMESTAMP.toString()); Interceptor interceptor = builder.build(); Event event = EventBuilder.withBody("test event", Charsets.UTF_8); - Assert.assertNull(event.getHeaders().get(Constants.TIMESTAMP)); + Assert.assertNull(event.getHeaders().get(Constants.DEFAULT_HEADER_NAME)); Long now = System.currentTimeMillis(); event = interceptor.intercept(event); - String timestampStr = event.getHeaders().get(Constants.TIMESTAMP); + String timestampStr = event.getHeaders().get(Constants.DEFAULT_HEADER_NAME); Assert.assertNotNull(timestampStr); Assert.assertTrue(Long.parseLong(timestampStr) >= now); } @@ -60,7 +59,6 @@ public class TestTimestampInterceptor { Context ctx = new Context(); ctx.put("preserveExisting", "true"); - InterceptorBuilderFactory factory = new InterceptorBuilderFactory(); Interceptor.Builder builder = InterceptorBuilderFactory.newInstance( InterceptorType.TIMESTAMP.toString()); builder.configure(ctx); @@ -68,13 +66,12 @@ public class TestTimestampInterceptor { long originalTs = 1L; Event event = EventBuilder.withBody("test event", Charsets.UTF_8); - event.getHeaders().put(Constants.TIMESTAMP, Long.toString(originalTs)); + event.getHeaders().put(Constants.DEFAULT_HEADER_NAME, Long.toString(originalTs)); Assert.assertEquals(Long.toString(originalTs), - event.getHeaders().get(Constants.TIMESTAMP)); + event.getHeaders().get(Constants.DEFAULT_HEADER_NAME)); - Long now = System.currentTimeMillis(); event = interceptor.intercept(event); - String timestampStr = event.getHeaders().get(Constants.TIMESTAMP); + String timestampStr = event.getHeaders().get(Constants.DEFAULT_HEADER_NAME); Assert.assertNotNull(timestampStr); Assert.assertTrue(Long.parseLong(timestampStr) == originalTs); } @@ -89,7 +86,6 @@ public class TestTimestampInterceptor { Context ctx = new Context(); ctx.put("preserveExisting", "false"); // DEFAULT BEHAVIOR - InterceptorBuilderFactory factory = new InterceptorBuilderFactory(); Interceptor.Builder builder = InterceptorBuilderFactory.newInstance( InterceptorType.TIMESTAMP.toString()); builder.configure(ctx); @@ -97,15 +93,36 @@ public class TestTimestampInterceptor { long originalTs = 1L; Event event = EventBuilder.withBody("test event", Charsets.UTF_8); - event.getHeaders().put(Constants.TIMESTAMP, Long.toString(originalTs)); + event.getHeaders().put(Constants.DEFAULT_HEADER_NAME, Long.toString(originalTs)); Assert.assertEquals(Long.toString(originalTs), - event.getHeaders().get(Constants.TIMESTAMP)); + event.getHeaders().get(Constants.DEFAULT_HEADER_NAME)); Long now = System.currentTimeMillis(); event = interceptor.intercept(event); - String timestampStr = event.getHeaders().get(Constants.TIMESTAMP); + String timestampStr = event.getHeaders().get(Constants.DEFAULT_HEADER_NAME); Assert.assertNotNull(timestampStr); Assert.assertTrue(Long.parseLong(timestampStr) >= now); } + @Test + public void testCustomHeader() throws Exception { + Context ctx = new Context(); + ctx.put(TimestampInterceptor.Constants.CONFIG_HEADER_NAME, "timestampHeader"); + Interceptor.Builder builder = InterceptorBuilderFactory.newInstance( + InterceptorType.TIMESTAMP.toString()); + builder.configure(ctx); + Interceptor interceptor = builder.build(); + + long originalTs = 1L; + Event event = EventBuilder.withBody("test event", Charsets.UTF_8); + event.getHeaders().put(Constants.DEFAULT_HEADER_NAME, Long.toString(originalTs)); + + Long now = System.currentTimeMillis(); + event = interceptor.intercept(event); + Assert.assertEquals(Long.toString(originalTs), + event.getHeaders().get(Constants.DEFAULT_HEADER_NAME)); + String timestampStr = event.getHeaders().get("timestampHeader"); + Assert.assertNotNull(timestampStr); + Assert.assertTrue(Long.parseLong(timestampStr) >= now); + } } http://git-wip-us.apache.org/repos/asf/flume/blob/13771c90/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index bbe9330..6183d9a 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -3956,15 +3956,16 @@ Timestamp Interceptor ~~~~~~~~~~~~~~~~~~~~~ This interceptor inserts into the event headers, the time in millis at which it processes the event. This interceptor -inserts a header with key ``timestamp`` whose value is the relevant timestamp. This interceptor -can preserve an existing timestamp if it is already present in the configuration. - -================ ======= ======================================================================== -Property Name Default Description -================ ======= ======================================================================== -**type** -- The component type name, has to be ``timestamp`` or the FQCN -preserveExisting false If the timestamp already exists, should it be preserved - true or false -================ ======= ======================================================================== +inserts a header with key ``timestamp`` (or as specified by the ``header`` property) whose value is the relevant timestamp. +This interceptor can preserve an existing timestamp if it is already present in the configuration. + +================ ========= ======================================================================== +Property Name Default Description +================ ========= ======================================================================== +**type** -- The component type name, has to be ``timestamp`` or the FQCN +header timestamp The name of the header in which to place the generated timestamp. +preserveExisting false If the timestamp already exists, should it be preserved - true or false +================ ========= ======================================================================== Example for agent named a1:
