Updated Branches: refs/heads/flume-1.4 9856df98e -> 1ba41f383
FLUME-1768: Multiplexing channel selector should allow optional-only channels (Hari Shreedharan via Brock Noland) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/1ba41f38 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/1ba41f38 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/1ba41f38 Branch: refs/heads/flume-1.4 Commit: 1ba41f383bd3b0eca3e5e7fc79de298707599cbb Parents: 9856df9 Author: Brock Noland <[email protected]> Authored: Fri Dec 7 13:43:55 2012 -0600 Committer: Brock Noland <[email protected]> Committed: Fri Dec 7 13:44:09 2012 -0600 ---------------------------------------------------------------------- .../flume/channel/MultiplexingChannelSelector.java | 7 +- .../channel/TestMultiplexingChannelSelector.java | 171 +++++++++++++- flume-ng-doc/sphinx/FlumeUserGuide.rst | 43 ++-- 3 files changed, 183 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/1ba41f38/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java b/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java index 866d9dc..54e4b20 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java +++ b/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java @@ -94,10 +94,6 @@ public class MultiplexingChannelSelector extends AbstractChannelSelector { defaultChannels = getChannelListFromNames( context.getString(CONFIG_DEFAULT_CHANNEL), channelNameMap); - if(defaultChannels.isEmpty()){ - throw new FlumeException("Default channel list empty"); - } - Map<String, String> mapConfig = context.getSubProperties(CONFIG_PREFIX_MAPPING); @@ 
-157,6 +153,9 @@ public class MultiplexingChannelSelector extends AbstractChannelSelector { private List<Channel> getChannelListFromNames(String channels, Map<String, Channel> channelNameMap){ List<Channel> configuredChannels = new ArrayList<Channel>(); + if(channels == null || channels.isEmpty()) { + return configuredChannels; + } String[] chNames = channels.split(" "); for (String name : chNames) { Channel ch = channelNameMap.get(name); http://git-wip-us.apache.org/repos/asf/flume/blob/1ba41f38/flume-ng-core/src/test/java/org/apache/flume/channel/TestMultiplexingChannelSelector.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMultiplexingChannelSelector.java b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMultiplexingChannelSelector.java index 9dff5bb..be0bfbf 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMultiplexingChannelSelector.java +++ b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMultiplexingChannelSelector.java @@ -36,6 +36,7 @@ public class TestMultiplexingChannelSelector { private List<Channel> channels = new ArrayList<Channel>(); private ChannelSelector selector; + private Map<String, String> config = new HashMap<String, String>(); @Before public void setUp() throws Exception { @@ -43,23 +44,24 @@ public class TestMultiplexingChannelSelector { channels.add(MockChannel.createMockChannel("ch1")); channels.add(MockChannel.createMockChannel("ch2")); channels.add(MockChannel.createMockChannel("ch3")); - - Map<String, String> config = new HashMap<String, String>(); config.put("type", "multiplexing"); config.put("header", "myheader"); - config.put("mapping.foo", "ch1 ch2"); - config.put("mapping.bar", "ch2 ch3"); - config.put("mapping.xyz", "ch1 ch2 ch3"); - config.put("default", "ch1 ch3"); + config.put("optional.foo", "ch2 ch3"); config.put("optional.xyz", "ch1 ch3"); config.put("optional.zebra", "ch1 ch2"); - 
selector = ChannelSelectorFactory.create(channels, config); + } @Test public void testSelection() throws Exception { + + config.put("mapping.foo", "ch1 ch2"); + config.put("mapping.bar", "ch2 ch3"); + config.put("mapping.xyz", "ch1 ch2 ch3"); + config.put("default", "ch1 ch3"); + selector = ChannelSelectorFactory.create(channels, config); Assert.assertTrue(selector instanceof MultiplexingChannelSelector); Event event1 = new MockEvent(); @@ -88,7 +90,7 @@ public class TestMultiplexingChannelSelector { Assert.assertTrue(reqCh2.get(0).getName().equals("ch2")); Assert.assertTrue(reqCh2.get(1).getName().equals("ch3")); List<Channel> optCh2 = selector.getOptionalChannels(event2); - Assert.assertTrue(optCh2.size() == 0); + Assert.assertTrue(optCh2.isEmpty()); Event event3 = new MockEvent(); Map<String, String> header3 = new HashMap<String, String>(); @@ -110,8 +112,13 @@ public class TestMultiplexingChannelSelector { //it should always be mapped to the default channel(s). @Test public void testNoSelection() throws Exception { - Assert.assertTrue(selector instanceof MultiplexingChannelSelector); + config.put("mapping.foo", "ch1 ch2"); + config.put("mapping.bar", "ch2 ch3"); + config.put("mapping.xyz", "ch1 ch2 ch3"); + config.put("default", "ch1 ch3"); + selector = ChannelSelectorFactory.create(channels, config); + Assert.assertTrue(selector instanceof MultiplexingChannelSelector); Event noHeaderEvent = new MockEvent(); List<Channel> reqCh1 = selector.getRequiredChannels(noHeaderEvent); @@ -119,7 +126,7 @@ public class TestMultiplexingChannelSelector { Assert.assertEquals(2, reqCh1.size()); Assert.assertTrue(reqCh1.get(0).getName().equals("ch1")); Assert.assertTrue(reqCh1.get(1).getName().equals("ch3")); - Assert.assertTrue(optCh1.size() == 0); + Assert.assertTrue(optCh1.isEmpty()); Map<String, String> header2 = new HashMap<String, String>(); header2.put("someheader", "foo"); @@ -131,7 +138,7 @@ public class TestMultiplexingChannelSelector { Assert.assertEquals(2, 
reqCh2.size()); Assert.assertTrue(reqCh2.get(0).getName().equals("ch1")); Assert.assertTrue(reqCh2.get(1).getName().equals("ch3")); - Assert.assertTrue(optCh2.size() == 0); + Assert.assertTrue(optCh2.isEmpty()); Map<String, String> header3 = new HashMap<String, String>(); header3.put("myheader", "bar1"); @@ -143,7 +150,7 @@ public class TestMultiplexingChannelSelector { Assert.assertEquals(2, reqCh3.size()); Assert.assertTrue(reqCh3.get(0).getName().equals("ch1")); Assert.assertTrue(reqCh3.get(1).getName().equals("ch3")); - Assert.assertTrue(optCh3.size() == 0); + Assert.assertTrue(optCh3.isEmpty()); Map<String, String> header4 = new HashMap<String, String>(); header4.put("myheader", "zebra"); @@ -155,7 +162,6 @@ public class TestMultiplexingChannelSelector { Assert.assertEquals(2, reqCh4.size()); Assert.assertTrue(reqCh4.get(0).getName().equals("ch1")); Assert.assertTrue(reqCh4.get(1).getName().equals("ch3")); - System.out.println(optCh4.size()); //Since ch1 is also in default list, it is removed. 
Assert.assertTrue(optCh4.size() == 1); Assert.assertTrue(optCh4.get(0).getName().equals("ch2")); @@ -166,4 +172,143 @@ public class TestMultiplexingChannelSelector { Assert.assertTrue(allChannels.get(1).getName().equals("ch2")); Assert.assertTrue(allChannels.get(2).getName().equals("ch3")); } + + @Test + public void testNoDefault() { + + config.put("mapping.foo", "ch1 ch2"); + config.put("mapping.bar", "ch2 ch3"); + config.put("mapping.xyz", "ch1 ch2 ch3"); + config.put("mapping.zebra", "ch2"); + config.put("optional.zebra", "ch1 ch3"); + selector = ChannelSelectorFactory.create(channels, config); + Assert.assertTrue(selector instanceof MultiplexingChannelSelector); + + Event event1 = new MockEvent(); + Map<String, String> header1 = new HashMap<String, String>(); + header1.put("myheader", "foo");// should match ch1 ch2 + event1.setHeaders(header1); + + List<Channel> reqCh1 = selector.getRequiredChannels(event1); + Assert.assertEquals(2, reqCh1.size()); + Assert.assertEquals("ch1", reqCh1.get(0).getName()); + Assert.assertEquals("ch2", reqCh1.get(1).getName()); + List<Channel> optCh1 = selector.getOptionalChannels(event1); + Assert.assertTrue(optCh1.size() == 1); + //ch2 should not be there -- since it is a required channel + Assert.assertEquals("ch3", optCh1.get(0).getName()); + + + + Event event2 = new MockEvent(); + Map<String, String> header2 = new HashMap<String, String>(); + header2.put("myheader", "bar"); // should match ch2 ch3 + event2.setHeaders(header2); + + List<Channel> reqCh2 = selector.getRequiredChannels(event2); + Assert.assertEquals(2, reqCh2.size()); + Assert.assertEquals("ch2", reqCh2.get(0).getName()); + Assert.assertEquals("ch3", reqCh2.get(1).getName()); + List<Channel> optCh2 = selector.getOptionalChannels(event2); + Assert.assertTrue(optCh2.isEmpty()); + + Event event3 = new MockEvent(); + Map<String, String> header3 = new HashMap<String, String>(); + header3.put("myheader", "xyz"); // should match ch1 ch2 ch3 + event3.setHeaders(header3); + 
+ + List<Channel> reqCh3 = selector.getRequiredChannels(event3); + Assert.assertEquals(3, reqCh3.size()); + Assert.assertEquals("ch1", reqCh3.get(0).getName()); + Assert.assertEquals("ch2", reqCh3.get(1).getName()); + Assert.assertEquals("ch3", reqCh3.get(2).getName()); + List<Channel> optCh3 = selector.getOptionalChannels(event3); + //All of the optional channels should go away. + Assert.assertTrue(optCh3.isEmpty()); + + Event event4 = new MockEvent(); + Map<String, String> header4 = new HashMap<String, String>(); + header4.put("myheader", "zebra"); + event4.setHeaders(header4); + + List<Channel> reqCh4 = selector.getRequiredChannels(event4); + Assert.assertEquals(1, reqCh4.size()); + Assert.assertEquals("ch2", reqCh4.get(0).getName()); + List<Channel> optCh4 = selector.getOptionalChannels(event4); + Assert.assertEquals(2, optCh4.size()); + Assert.assertEquals("ch1", optCh4.get(0).getName()); + Assert.assertEquals("ch3", optCh4.get(1).getName()); + } + + @Test + public void testNoMandatory() { + + config.put("default", "ch3"); + config.put("optional.foo", "ch1 ch2"); + config.put("optional.zebra", "ch2 ch3"); + selector = ChannelSelectorFactory.create(channels, config); + Assert.assertTrue(selector instanceof MultiplexingChannelSelector); + + Event event1 = new MockEvent(); + Map<String, String> header1 = new HashMap<String, String>(); + header1.put("myheader", "foo");// should match ch1 ch2 + event1.setHeaders(header1); + + List<Channel> reqCh1 = selector.getRequiredChannels(event1); + Assert.assertEquals(1, reqCh1.size()); + Assert.assertEquals("ch3", reqCh1.get(0).getName()); + List<Channel> optCh1 = selector.getOptionalChannels(event1); + Assert.assertEquals(2, optCh1.size()); + //foo has no required mapping, so only the default ch3 is required; ch1 and ch2 both stay optional + Assert.assertEquals("ch1", optCh1.get(0).getName()); + Assert.assertEquals("ch2", optCh1.get(1).getName()); + + Event event4 = new MockEvent(); + Map<String, String> header4 = new HashMap<String, String>(); +
header4.put("myheader", "zebra"); + event4.setHeaders(header4); + + List<Channel> reqCh4 = selector.getRequiredChannels(event4); + Assert.assertEquals(1, reqCh4.size()); + Assert.assertTrue(reqCh4.get(0).getName().equals("ch3")); + List<Channel> optCh4 = selector.getOptionalChannels(event4); + //ch3 was returned as a required channel, because it is default. + //So it is not returned in optional + Assert.assertEquals(1, optCh4.size()); + Assert.assertEquals("ch2", optCh4.get(0).getName()); + + } + + @Test + public void testOnlyOptional() { + config.put("optional.foo", "ch1 ch2"); + config.put("optional.zebra", "ch2 ch3"); + selector = ChannelSelectorFactory.create(channels, config); + Assert.assertTrue(selector instanceof MultiplexingChannelSelector); + + Event event1 = new MockEvent(); + Map<String, String> header1 = new HashMap<String, String>(); + header1.put("myheader", "foo");// should match ch1 ch2 + event1.setHeaders(header1); + + List<Channel> reqCh1 = selector.getRequiredChannels(event1); + Assert.assertTrue(reqCh1.isEmpty()); + List<Channel> optCh1 = selector.getOptionalChannels(event1); + Assert.assertEquals(2,optCh1.size()); + //No default and no required mapping, so both ch1 and ch2 are returned as optional + + + Event event4 = new MockEvent(); + Map<String, String> header4 = new HashMap<String, String>(); + header4.put("myheader", "zebra"); + event4.setHeaders(header4); + + List<Channel> reqCh4 = selector.getRequiredChannels(event4); + Assert.assertTrue(reqCh4.isEmpty()); + List<Channel> optCh4 = selector.getOptionalChannels(event4); + Assert.assertEquals(2, optCh4.size()); + Assert.assertEquals("ch2", optCh4.get(0).getName()); + Assert.assertEquals("ch3", optCh4.get(1).getName()); + + } } http://git-wip-us.apache.org/repos/asf/flume/blob/1ba41f38/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index f8528bb..70cb285 
100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -183,7 +183,7 @@ This configuration lets a user generate events and subsequently logs them to the This configuration defines a single agent named a1. a1 has a source that listens for data on port 44444, a channel that buffers event data in memory, and a sink that logs event data to the console. The configuration file names the -various components, then describes their types and configuration parameters. A given configuration file might define +various components, then describes their types and configuration parameters. A given configuration file might define several named agents; when a given Flume process is launched a flag is passed telling it which named agent to manifest. Given this configuration file, we can start Flume as follows:: @@ -540,8 +540,7 @@ configured as default: <Agent>.sources.<Source1>.selector.default = <Channel2> -The mapping allows overlapping the channels for each value. The default must be -set for a multiplexing select which can also contain any number of channels. +The mapping allows overlapping the channels for each value. The following example has a single flow that multiplexed to two paths. The agent named agent_foo has a single avro source and two channels linked to two sinks: @@ -607,7 +606,9 @@ Note that if a header does not have any required channels, then the event will be written to the default channels and will be attempted to be written to the optional channels for that header. Specifying optional channels will still cause the event to be written to the default channels, if no required channels are -specified. +specified. If no channels are designated as default and there are no required channels, +the selector will attempt to write the events to the optional channels. Any +failures are simply ignored in that case. Flume Sources @@ -1248,7 +1249,7 @@ Example for agent named a1: .. 
code-block:: properties a1.channels = c1 - a1.sinks = k1 + a1.sinks = k1 a1.sinks.k1.type = hdfs a1.sinks.k1.channel = c1 a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S @@ -1279,7 +1280,7 @@ Example for agent named a1: .. code-block:: properties a1.channels = c1 - a1.sinks = k1 + a1.sinks = k1 a1.sinks.k1.type = logger a1.sinks.k1.channel = c1 @@ -1310,7 +1311,7 @@ Example for agent named a1: .. code-block:: properties a1.channels = c1 - a1.sinks = k1 + a1.sinks = k1 a1.sinks.k1.type = avro a1.sinks.k1.channel = c1 a1.sinks.k1.hostname = 10.10.10.10 @@ -1346,7 +1347,7 @@ Example for agent named a1: .. code-block:: properties a1.channels = c1 - a1.sinks = k1 + a1.sinks = k1 a1.sinks.k1.type = irc a1.sinks.k1.channel = c1 a1.sinks.k1.hostname = irc.yourdomain.com @@ -1375,7 +1376,7 @@ Example for agent named a1: .. code-block:: properties a1.channels = c1 - a1.sinks = k1 + a1.sinks = k1 a1.sinks.k1.type = file_roll a1.sinks.k1.channel = c1 a1.sinks.k1.sink.directory = /var/log/flume @@ -1399,7 +1400,7 @@ Example for agent named a1: .. code-block:: properties a1.channels = c1 - a1.sinks = k1 + a1.sinks = k1 a1.sinks.k1.type = null a1.sinks.k1.channel = c1 @@ -1444,7 +1445,7 @@ Example for agent named a1: .. code-block:: properties a1.channels = c1 - a1.sinks = k1 + a1.sinks = k1 a1.sinks.k1.type = org.apache.flume.sink.hbase.HBaseSink a1.sinks.k1.table = foo_table a1.sinks.k1.columnFamily = bar_cf @@ -1484,7 +1485,7 @@ Example for agent named a1: .. code-block:: properties a1.channels = c1 - a1.sinks = k1 + a1.sinks = k1 a1.sinks.k1.type = org.apache.flume.sink.hbase.AsyncHBaseSink a1.sinks.k1.table = foo_table a1.sinks.k1.columnFamily = bar_cf @@ -1523,7 +1524,7 @@ Example for agent named a1: .. 
code-block:: properties a1.channels = c1 - a1.sinks = k1 + a1.sinks = k1 a1.sinks.k1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300 a1.sinks.k1.indexName = foo_index @@ -1554,7 +1555,7 @@ Example for agent named a1: .. code-block:: properties a1.channels = c1 - a1.sinks = k1 + a1.sinks = k1 a1.sinks.k1.type = org.example.MySink a1.sinks.k1.channel = c1 @@ -1713,14 +1714,14 @@ Generating a key with a password seperate from the key store password: -keysize 128 -validity 9000 -keystore test.keystore \ -storetype jceks -storepass keyStorePassword -Generating a key with the password the same as the key store password: +Generating a key with the password the same as the key store password: .. code-block:: bash keytool -genseckey -alias key-1 -keyalg AES -keysize 128 -validity 9000 \ -keystore src/test/resources/test.keystore -storetype jceks \ -storepass keyStorePassword - + .. code-block:: properties @@ -2022,7 +2023,7 @@ Example for agent named a1: .. code-block:: properties - a1.sinks = k1 + a1.sinks = k1 a1.sinks.k1.type = file_roll a1.sinks.k1.channel = c1 a1.sinks.k1.sink.directory = /var/log/flume @@ -2072,7 +2073,7 @@ are named components, here is an example of how they are created through configu .. code-block:: properties a1.sources = r1 - a1.sinks = k1 + a1.sinks = k1 a1.channels = c1 a1.sources.r1.interceptors = i1 i2 a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder @@ -2208,8 +2209,8 @@ serializers.* -- Serializer-specific properties ================================ ========== ================================================================================================= The serializers are used to map the matches to a header name and a formatted header value, by default you only need to specify -the header name and the default ``org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer`` will be used. 
-This serializer simply maps the matches to the specified header name and passes the value through as it was extracted by the regex. +the header name and the default ``org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer`` will be used. +This serializer simply maps the matches to the specified header name and passes the value through as it was extracted by the regex. You can plug custom serializer implementations into the extractor using the fully qualified class name (FQCN) to format the matches in anyway you like. @@ -2645,7 +2646,7 @@ org.apache.flume.ChannelSelector -- org.apache.flume.SinkProcessor default org.apache.flume.sink.DefaultSinkProcessor org.apache.flume.SinkProcessor failover org.apache.flume.sink.FailoverSinkProcessor org.apache.flume.SinkProcessor load_balance org.apache.flume.sink.LoadBalancingSinkProcessor -org.apache.flume.SinkProcessor -- +org.apache.flume.SinkProcessor -- org.apache.flume.interceptor.Interceptor timestamp org.apache.flume.interceptor.TimestampInterceptor$Builder org.apache.flume.interceptor.Interceptor host org.apache.flume.interceptor.HostInterceptor$Builder
