Repository: flume Updated Branches: refs/heads/trunk 4e08bf7d3 -> 59f0b4df9
FLUME-2426. Support interceptors in the Embedded Agent (Johny Rufus via Hari) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/59f0b4df Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/59f0b4df Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/59f0b4df Branch: refs/heads/trunk Commit: 59f0b4df97231acdc0b9769dccb3211c502b36d3 Parents: 4e08bf7 Author: Hari Shreedharan <[email protected]> Authored: Fri Sep 12 11:50:57 2014 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Fri Sep 12 11:50:57 2014 -0700 ---------------------------------------------------------------------- flume-ng-doc/sphinx/FlumeDeveloperGuide.rst | 8 ++++++- .../flume/agent/embedded/EmbeddedAgent.java | 4 +++- .../flume/agent/embedded/TestEmbeddedAgent.java | 24 +++++++++++++++++++- .../TestEmbeddedAgentConfiguration.java | 4 ++++ 4 files changed, 37 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/59f0b4df/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst b/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst index ec6a735..e3b60e6 100644 --- a/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst +++ b/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst @@ -450,7 +450,7 @@ sources, sinks, and channels are allowed. Specifically the source used is a special embedded source and events should be send to the source via the put, putAll methods on the EmbeddedAgent object. Only File Channel and Memory Channel are allowed as channels while Avro Sink is the only -supported sink. +supported sink. Interceptors are also supported by the embedded agent. Note: The embedded agent has a dependency on hadoop-core.jar. @@ -470,6 +470,8 @@ channel.* -- Configuration options for the channel ty sink.* -- Configuration options for the sink. See AvroSink user guide for an exhaustive list, however note AvroSink requires at least hostname and port. **processor.type** -- Either ``failover`` or ``load_balance`` which correspond to FailoverSinksProcessor and LoadBalancingSinkProcessor respectively. processor.* -- Configuration options for the sink processor selected. See FailoverSinksProcessor and LoadBalancingSinkProcessor user guide for an exhaustive list. +source.interceptors -- Space-separated list of interceptors +source.interceptors.* -- Configuration options for individual interceptors specified in the source.interceptors property ==================== ================ ============================================== Below is an example of how to use the agent: @@ -487,6 +489,10 @@ Below is an example of how to use the agent: properties.put("sink2.hostname", "collector2.apache.org"); properties.put("sink2.port", "5565"); properties.put("processor.type", "load_balance"); + properties.put("source.interceptors", "i1"); + properties.put("source.interceptors.i1.type", "static"); + properties.put("source.interceptors.i1.key", "key1"); + properties.put("source.interceptors.i1.value", "value1"); EmbeddedAgent agent = new EmbeddedAgent("myagent"); http://git-wip-us.apache.org/repos/asf/flume/blob/59f0b4df/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java ---------------------------------------------------------------------- diff --git a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java index d02f440..32c9f18 100644 --- a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java +++ b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java @@ -111,7 +111,8 @@ public class EmbeddedAgent { throw new IllegalStateException("Cannot be started before being " + "configured"); } - doStart(); + // This check needs to be done before doStart(), + // as doStart() accesses sourceRunner.getSource() Source source = Preconditions.checkNotNull(sourceRunner.getSource(), "Source runner returned null source"); if(source instanceof EmbeddedSource) { @@ -120,6 +121,7 @@ public class EmbeddedAgent { throw new IllegalStateException("Unknown source type: " + source. getClass().getName()); } + doStart(); state = State.STARTED; } /** http://git-wip-us.apache.org/repos/asf/flume/blob/59f0b4df/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java ---------------------------------------------------------------------- diff --git a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java index 0d644c6..975ba8d 100644 --- a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java +++ b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java @@ -61,7 +61,6 @@ public class TestEmbeddedAgent { private Map<String, String> headers; private byte[] body; - @Before public void setUp() throws Exception { headers = Maps.newHashMap(); @@ -93,6 +92,7 @@ public class TestEmbeddedAgent { agent = new EmbeddedAgent("test-" + serialNumber.incrementAndGet()); } + @After public void tearDown() throws Exception { if(agent != null) { @@ -110,6 +110,7 @@ public class TestEmbeddedAgent { } } } + @Test(timeout = 30000L) public void testPut() throws Exception { agent.configure(properties); @@ -124,6 +125,7 @@ public class TestEmbeddedAgent { Assert.assertArrayEquals(body, event.getBody()); Assert.assertEquals(headers, event.getHeaders()); } + @Test(timeout = 30000L) public void testPutAll() throws Exception { List<Event> events = Lists.newArrayList(); @@ -141,7 +143,27 @@ public class TestEmbeddedAgent { Assert.assertEquals(headers, event.getHeaders()); } + @Test(timeout = 30000L) + public void testPutWithInterceptors() throws Exception { + properties.put("source.interceptors", "i1"); + properties.put("source.interceptors.i1.type", "static"); + properties.put("source.interceptors.i1.key", "key2"); + properties.put("source.interceptors.i1.value", "value2"); + + agent.configure(properties); + agent.start(); + agent.put(EventBuilder.withBody(body, headers)); + Event event; + while((event = eventCollector.poll()) == null) { + Thread.sleep(500L); + } + Assert.assertNotNull(event); + Assert.assertArrayEquals(body, event.getBody()); + Map<String, String> newHeaders = new HashMap<String, String>(headers); + newHeaders.put("key2", "value2"); + Assert.assertEquals(newHeaders, event.getHeaders()); + } static class EventCollector implements AvroSourceProtocol { private final Queue<AvroFlumeEvent> eventQueue = http://git-wip-us.apache.org/repos/asf/flume/blob/59f0b4df/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java index f70d0b1..f4a9a58 100644 --- a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java +++ b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java @@ -46,6 +46,8 @@ public class TestEmbeddedAgentConfiguration { properties.put("sink2.hostname", "sink2.host"); properties.put("sink2.port", "2"); properties.put("processor.type", "load_balance"); + properties.put("source.interceptors", "i1"); + properties.put("source.interceptors.i1.type", "timestamp"); } @@ -91,6 +93,8 @@ public class TestEmbeddedAgentConfiguration { expected.put("test1.sources.source-test1.channels", "channel-test1"); expected.put("test1.sources.source-test1.type", EmbeddedAgentConfiguration. SOURCE_TYPE_EMBEDDED); + expected.put("test1.sources.source-test1.interceptors", "i1"); + expected.put("test1.sources.source-test1.interceptors.i1.type", "timestamp"); Assert.assertEquals(expected, actual); }
