Repository: flume Updated Branches: refs/heads/flume-1.6 49f8eb2a3 -> c458a838a
FLUME-2426. Support interceptors in the Embedded Agent (Johny Rufus via Hari) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c458a838 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c458a838 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c458a838 Branch: refs/heads/flume-1.6 Commit: c458a838ad3dc68946662050ee96353d36ee4e74 Parents: 49f8eb2 Author: Hari Shreedharan <[email protected]> Authored: Fri Sep 12 11:50:57 2014 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Fri Sep 12 11:51:31 2014 -0700 ---------------------------------------------------------------------- flume-ng-doc/sphinx/FlumeDeveloperGuide.rst | 8 ++++++- .../flume/agent/embedded/EmbeddedAgent.java | 4 +++- .../flume/agent/embedded/TestEmbeddedAgent.java | 24 +++++++++++++++++++- .../TestEmbeddedAgentConfiguration.java | 4 ++++ 4 files changed, 37 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/c458a838/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst b/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst index ec6a735..e3b60e6 100644 --- a/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst +++ b/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst @@ -450,7 +450,7 @@ sources, sinks, and channels are allowed. Specifically the source used is a special embedded source and events should be send to the source via the put, putAll methods on the EmbeddedAgent object. Only File Channel and Memory Channel are allowed as channels while Avro Sink is the only -supported sink. +supported sink. Interceptors are also supported by the embedded agent. Note: The embedded agent has a dependency on hadoop-core.jar. @@ -470,6 +470,8 @@ channel.* -- Configuration options for the channel ty sink.* -- Configuration options for the sink. See AvroSink user guide for an exhaustive list, however note AvroSink requires at least hostname and port. **processor.type** -- Either ``failover`` or ``load_balance`` which correspond to FailoverSinksProcessor and LoadBalancingSinkProcessor respectively. processor.* -- Configuration options for the sink processor selected. See FailoverSinksProcessor and LoadBalancingSinkProcessor user guide for an exhaustive list. +source.interceptors -- Space-separated list of interceptors +source.interceptors.* -- Configuration options for individual interceptors specified in the source.interceptors property ==================== ================ ============================================== Below is an example of how to use the agent: @@ -487,6 +489,10 @@ Below is an example of how to use the agent: properties.put("sink2.hostname", "collector2.apache.org"); properties.put("sink2.port", "5565"); properties.put("processor.type", "load_balance"); + properties.put("source.interceptors", "i1"); + properties.put("source.interceptors.i1.type", "static"); + properties.put("source.interceptors.i1.key", "key1"); + properties.put("source.interceptors.i1.value", "value1"); EmbeddedAgent agent = new EmbeddedAgent("myagent"); http://git-wip-us.apache.org/repos/asf/flume/blob/c458a838/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java ---------------------------------------------------------------------- diff --git a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java index d02f440..32c9f18 100644 --- a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java +++ b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java @@ -111,7 +111,8 @@ public class EmbeddedAgent { throw new IllegalStateException("Cannot be started before being " + "configured"); } - doStart(); + // This check needs to be done before doStart(), + // as doStart() accesses sourceRunner.getSource() Source source = Preconditions.checkNotNull(sourceRunner.getSource(), "Source runner returned null source"); if(source instanceof EmbeddedSource) { @@ -120,6 +121,7 @@ public class EmbeddedAgent { throw new IllegalStateException("Unknown source type: " + source. getClass().getName()); } + doStart(); state = State.STARTED; } /** http://git-wip-us.apache.org/repos/asf/flume/blob/c458a838/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java ---------------------------------------------------------------------- diff --git a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java index 0d644c6..975ba8d 100644 --- a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java +++ b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java @@ -61,7 +61,6 @@ public class TestEmbeddedAgent { private Map<String, String> headers; private byte[] body; - @Before public void setUp() throws Exception { headers = Maps.newHashMap(); @@ -93,6 +92,7 @@ public class TestEmbeddedAgent { agent = new EmbeddedAgent("test-" + serialNumber.incrementAndGet()); } + @After public void tearDown() throws Exception { if(agent != null) { @@ -110,6 +110,7 @@ public class TestEmbeddedAgent { } } } + @Test(timeout = 30000L) public void testPut() throws Exception { agent.configure(properties); @@ -124,6 +125,7 @@ public class TestEmbeddedAgent { Assert.assertArrayEquals(body, event.getBody()); Assert.assertEquals(headers, event.getHeaders()); } + @Test(timeout = 30000L) public void testPutAll() throws Exception { List<Event> events = Lists.newArrayList(); @@ -141,7 +143,27 @@ public class TestEmbeddedAgent { Assert.assertEquals(headers, event.getHeaders()); } + @Test(timeout = 30000L) + public void testPutWithInterceptors() throws Exception { + properties.put("source.interceptors", "i1"); + properties.put("source.interceptors.i1.type", "static"); + properties.put("source.interceptors.i1.key", "key2"); + properties.put("source.interceptors.i1.value", "value2"); + + agent.configure(properties); + agent.start(); + agent.put(EventBuilder.withBody(body, headers)); + Event event; + while((event = eventCollector.poll()) == null) { + Thread.sleep(500L); + } + Assert.assertNotNull(event); + Assert.assertArrayEquals(body, event.getBody()); + Map<String, String> newHeaders = new HashMap<String, String>(headers); + newHeaders.put("key2", "value2"); + Assert.assertEquals(newHeaders, event.getHeaders()); + } static class EventCollector implements AvroSourceProtocol { private final Queue<AvroFlumeEvent> eventQueue = http://git-wip-us.apache.org/repos/asf/flume/blob/c458a838/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java index f70d0b1..f4a9a58 100644 --- a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java +++ b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java @@ -46,6 +46,8 @@ public class TestEmbeddedAgentConfiguration { properties.put("sink2.hostname", "sink2.host"); properties.put("sink2.port", "2"); properties.put("processor.type", "load_balance"); + properties.put("source.interceptors", "i1"); + properties.put("source.interceptors.i1.type", "timestamp"); } @@ -91,6 +93,8 @@ public class TestEmbeddedAgentConfiguration { expected.put("test1.sources.source-test1.channels", "channel-test1"); expected.put("test1.sources.source-test1.type", EmbeddedAgentConfiguration. SOURCE_TYPE_EMBEDDED); + expected.put("test1.sources.source-test1.interceptors", "i1"); + expected.put("test1.sources.source-test1.interceptors.i1.type", "timestamp"); Assert.assertEquals(expected, actual); }
