Updated Branches: refs/heads/trunk c27edd0e8 -> babdb69a3
FLUME-946: Allow multiplexing channel selector to specify optional channels (Hari Shreedharan via Brock Noland Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/babdb69a Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/babdb69a Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/babdb69a Branch: refs/heads/trunk Commit: babdb69a3eebdd4fca7f5edd30e6de1d0310d5bd Parents: c27edd0 Author: Brock Noland <[email protected]> Authored: Mon Oct 29 16:28:38 2012 -0500 Committer: Brock Noland <[email protected]> Committed: Mon Oct 29 16:28:38 2012 -0500 ---------------------------------------------------------------------- .../flume/channel/MultiplexingChannelSelector.java | 40 ++++++++++++++- .../channel/TestMultiplexingChannelSelector.java | 25 +++++++++- flume-ng-doc/sphinx/FlumeUserGuide.rst | 40 ++++++++++++++- 3 files changed, 100 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/babdb69a/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java b/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java index 81dc3e8..866d9dc 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java +++ b/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java @@ -38,6 +38,8 @@ public class MultiplexingChannelSelector extends AbstractChannelSelector { "flume.selector.header"; public static final String CONFIG_PREFIX_MAPPING = "mapping."; public static final String CONFIG_DEFAULT_CHANNEL = "default"; + public static final String CONFIG_PREFIX_OPTIONAL = "optional"; + @SuppressWarnings("unused") private static final 
Logger LOG = LoggerFactory .getLogger(MultiplexingChannelSelector.class); @@ -48,7 +50,7 @@ public class MultiplexingChannelSelector extends AbstractChannelSelector { private String headerName; private Map<String, List<Channel>> channelMapping; - + private Map<String, List<Channel>> optionalChannels; private List<Channel> defaultChannels; @Override public List<Channel> getRequiredChannels(Event event) { @@ -70,7 +72,13 @@ public class MultiplexingChannelSelector extends AbstractChannelSelector { @Override public List<Channel> getOptionalChannels(Event event) { - return EMPTY_LIST; + String hdr = event.getHeaders().get(headerName); + List<Channel> channels = optionalChannels.get(hdr); + + if(channels == null) { + channels = EMPTY_LIST; + } + return channels; } @Override @@ -113,6 +121,34 @@ public class MultiplexingChannelSelector extends AbstractChannelSelector { } //If no mapping is configured, it is ok. //All events will go to the default channel(s). + Map<String, String> optionalChannelsMapping = + context.getSubProperties(CONFIG_PREFIX_OPTIONAL + "."); + + optionalChannels = new HashMap<String, List<Channel>>(); + for (String hdr : optionalChannelsMapping.keySet()) { + List<Channel> confChannels = getChannelListFromNames( + optionalChannelsMapping.get(hdr), channelNameMap); + if (confChannels.isEmpty()) { + confChannels = EMPTY_LIST; + } + //Remove channels from optional channels, which are already + //configured to be required channels. 
+ + List<Channel> reqdChannels = channelMapping.get(hdr); + //Check if there are required channels, else defaults to default channels + if(reqdChannels == null || reqdChannels.isEmpty()) { + reqdChannels = defaultChannels; + } + for (Channel c : reqdChannels) { + if (confChannels.contains(c)) { + confChannels.remove(c); + } + } + + if (optionalChannels.put(hdr, confChannels) != null) { + throw new FlumeException("Selector channel configured twice"); + } + } } http://git-wip-us.apache.org/repos/asf/flume/blob/babdb69a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMultiplexingChannelSelector.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMultiplexingChannelSelector.java b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMultiplexingChannelSelector.java index 2626b20..9dff5bb 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMultiplexingChannelSelector.java +++ b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMultiplexingChannelSelector.java @@ -51,6 +51,9 @@ public class TestMultiplexingChannelSelector { config.put("mapping.bar", "ch2 ch3"); config.put("mapping.xyz", "ch1 ch2 ch3"); config.put("default", "ch1 ch3"); + config.put("optional.foo", "ch2 ch3"); + config.put("optional.xyz", "ch1 ch3"); + config.put("optional.zebra", "ch1 ch2"); selector = ChannelSelectorFactory.create(channels, config); } @@ -69,7 +72,11 @@ public class TestMultiplexingChannelSelector { Assert.assertTrue(reqCh1.get(0).getName().equals("ch1")); Assert.assertTrue(reqCh1.get(1).getName().equals("ch2")); List<Channel> optCh1 = selector.getOptionalChannels(event1); - Assert.assertTrue(optCh1.size() == 0); + Assert.assertTrue(optCh1.size() == 1); + //ch2 should not be there -- since it is a required channel + Assert.assertTrue(optCh1.get(0).getName().equals("ch3")); + + Event event2 = new MockEvent(); Map<String, String> header2 = new HashMap<String, 
String>(); @@ -94,7 +101,9 @@ public class TestMultiplexingChannelSelector { Assert.assertTrue(reqCh3.get(1).getName().equals("ch2")); Assert.assertTrue(reqCh3.get(2).getName().equals("ch3")); List<Channel> optCh3 = selector.getOptionalChannels(event3); + //All of the optional channels should go away. Assert.assertTrue(optCh3.size() == 0); + } //If the header information cannot map the event to any of the channels @@ -136,6 +145,20 @@ public class TestMultiplexingChannelSelector { Assert.assertTrue(reqCh3.get(1).getName().equals("ch3")); Assert.assertTrue(optCh3.size() == 0); + Map<String, String> header4 = new HashMap<String, String>(); + header4.put("myheader", "zebra"); + Event zebraEvent = new MockEvent(); + zebraEvent.setHeaders(header4); + + List<Channel> reqCh4 = selector.getRequiredChannels(zebraEvent); + List<Channel> optCh4 = selector.getOptionalChannels(zebraEvent); + Assert.assertEquals(2, reqCh4.size()); + Assert.assertTrue(reqCh4.get(0).getName().equals("ch1")); + Assert.assertTrue(reqCh4.get(1).getName().equals("ch3")); + System.out.println(optCh4.size()); + //Since ch1 is also in default list, it is removed. + Assert.assertTrue(optCh4.size() == 1); + Assert.assertTrue(optCh4.get(0).getName().equals("ch2")); List<Channel> allChannels = selector.getAllChannels(); Assert.assertTrue(allChannels.size() == 3); http://git-wip-us.apache.org/repos/asf/flume/blob/babdb69a/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 0c0951b..c1303e0 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -567,6 +567,42 @@ sent to mem-channel-1, if its "AZ" then it goes to jdbc-channel-2 or if its "NY" then both. If the "State" header is not set or doesn't match any of the three, then it goes to mem-channel-1 which is designated as 'default'. 
+The selector also supports optional channels. To specify optional channels for +a header, the config parameter 'optional' is used in the following way: + +.. code-block:: properties + + # channel selector configuration + agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing + agent_foo.sources.avro-AppSrv-source1.selector.header = State + agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1 + agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = jdbc-channel-2 + agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 jdbc-channel-2 + agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 jdbc-channel-2 + agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = jdbc-channel-2 + agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1 + +The selector will attempt to write to the required channels first and will fail +the transaction if even one of these channels fails to consume the events. The +transaction is reattempted on **all** of the channels. Once all required +channels have consumed the events, then the selector will attempt to write to +the optional channels. A failure by any of the optional channels to consume the +event is simply ignored and not retried. + +If there is an overlap between the optional channels and required channels for a +specific header, the channel is considered to be required, and a failure in the +channel will cause the entire set of required channels to be retried. For +instance, in the above example, for the header "CA", mem-channel-1 is considered +to be a required channel even though it is marked both as required and optional, +and a failure to write to this channel will cause that +event to be retried on **all** channels configured for the selector.
+ +Note that if a header does not have any required channels, then the event will +be written to the default channels and the selector will also attempt to write +it to the optional channels for that header. In other words, specifying optional +channels does not bypass the default channels: if no required channels are +specified, the event is still written to the default channels. + Flume Sources ------------- @@ -1345,7 +1381,7 @@ ElasticSearchSink This sink writes data to ElasticSearch. A class implementing ElasticSearchEventSerializer which is specified by the configuration is used to convert the events into XContentBuilder which detail the fields and mappings which will be indexed. These are then then written -to ElasticSearch. The sink will generate an index per day allowing easier management instead of dealing with +to ElasticSearch. The sink will generate an index per day allowing easier management instead of dealing with a single large index The type is the FQCN: org.apache.flume.sink.elasticsearch.ElasticSearchSink Required properties are in **bold**. @@ -1360,7 +1396,7 @@ indexName flume indexType logs The type to index the document to, defaults to 'log' clusterName elasticsearch Name of the ElasticSearch cluster to connect to batchSize 100 Number of events to be written per txn. -ttl -- TTL in days, when set will cause the expired documents to be deleted automatically, +ttl -- TTL in days, when set will cause the expired documents to be deleted automatically, if not set documents will never be automatically deleted serializer org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer serializer.* -- Properties to be passed to the serializer.
