Updated Branches:
  refs/heads/flume-1.4 9856df98e -> 1ba41f383

FLUME-1768: Multiplexing channel selector should allow optional-only channels

(Hari Shreedharan via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/1ba41f38
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/1ba41f38
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/1ba41f38

Branch: refs/heads/flume-1.4
Commit: 1ba41f383bd3b0eca3e5e7fc79de298707599cbb
Parents: 9856df9
Author: Brock Noland <[email protected]>
Authored: Fri Dec 7 13:43:55 2012 -0600
Committer: Brock Noland <[email protected]>
Committed: Fri Dec 7 13:44:09 2012 -0600

----------------------------------------------------------------------
 .../flume/channel/MultiplexingChannelSelector.java |    7 +-
 .../channel/TestMultiplexingChannelSelector.java   |  171 +++++++++++++-
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |   43 ++--
 3 files changed, 183 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/1ba41f38/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java b/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java
index 866d9dc..54e4b20 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java
@@ -94,10 +94,6 @@ public class MultiplexingChannelSelector extends AbstractChannelSelector {
     defaultChannels = getChannelListFromNames(
         context.getString(CONFIG_DEFAULT_CHANNEL), channelNameMap);
 
-    if(defaultChannels.isEmpty()){
-      throw new FlumeException("Default channel list empty");
-    }
-
     Map<String, String> mapConfig =
         context.getSubProperties(CONFIG_PREFIX_MAPPING);
 
@@ -157,6 +153,9 @@ public class MultiplexingChannelSelector extends AbstractChannelSelector {
   private List<Channel> getChannelListFromNames(String channels,
       Map<String, Channel> channelNameMap){
     List<Channel> configuredChannels = new ArrayList<Channel>();
+    if(channels == null || channels.isEmpty()) {
+      return configuredChannels;
+    }
     String[] chNames = channels.split(" ");
     for (String name : chNames) {
       Channel ch = channelNameMap.get(name);

http://git-wip-us.apache.org/repos/asf/flume/blob/1ba41f38/flume-ng-core/src/test/java/org/apache/flume/channel/TestMultiplexingChannelSelector.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMultiplexingChannelSelector.java b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMultiplexingChannelSelector.java
index 9dff5bb..be0bfbf 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMultiplexingChannelSelector.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMultiplexingChannelSelector.java
@@ -36,6 +36,7 @@ public class TestMultiplexingChannelSelector {
   private List<Channel> channels = new ArrayList<Channel>();
 
   private ChannelSelector selector;
+  private Map<String, String> config = new HashMap<String, String>();
 
   @Before
   public void setUp() throws Exception {
@@ -43,23 +44,24 @@ public class TestMultiplexingChannelSelector {
     channels.add(MockChannel.createMockChannel("ch1"));
     channels.add(MockChannel.createMockChannel("ch2"));
     channels.add(MockChannel.createMockChannel("ch3"));
-
-    Map<String, String> config = new HashMap<String, String>();
     config.put("type", "multiplexing");
     config.put("header", "myheader");
-    config.put("mapping.foo", "ch1 ch2");
-    config.put("mapping.bar", "ch2 ch3");
-    config.put("mapping.xyz", "ch1 ch2 ch3");
-    config.put("default", "ch1 ch3");
+
     config.put("optional.foo", "ch2 ch3");
     config.put("optional.xyz", "ch1 ch3");
     config.put("optional.zebra", "ch1 ch2");
 
-    selector = ChannelSelectorFactory.create(channels, config);
+
   }
 
   @Test
   public void testSelection() throws Exception {
+
+    config.put("mapping.foo", "ch1 ch2");
+    config.put("mapping.bar", "ch2 ch3");
+    config.put("mapping.xyz", "ch1 ch2 ch3");
+    config.put("default", "ch1 ch3");
+    selector = ChannelSelectorFactory.create(channels, config);
     Assert.assertTrue(selector instanceof MultiplexingChannelSelector);
 
     Event event1 = new MockEvent();
@@ -88,7 +90,7 @@ public class TestMultiplexingChannelSelector {
     Assert.assertTrue(reqCh2.get(0).getName().equals("ch2"));
     Assert.assertTrue(reqCh2.get(1).getName().equals("ch3"));
     List<Channel> optCh2 = selector.getOptionalChannels(event2);
-    Assert.assertTrue(optCh2.size() == 0);
+    Assert.assertTrue(optCh2.isEmpty());
 
     Event event3 = new MockEvent();
     Map<String, String> header3 = new HashMap<String, String>();
@@ -110,8 +112,13 @@ public class TestMultiplexingChannelSelector {
   //it should always be mapped to the default channel(s).
   @Test
   public void testNoSelection() throws Exception {
-    Assert.assertTrue(selector instanceof MultiplexingChannelSelector);
 
+    config.put("mapping.foo", "ch1 ch2");
+    config.put("mapping.bar", "ch2 ch3");
+    config.put("mapping.xyz", "ch1 ch2 ch3");
+    config.put("default", "ch1 ch3");
+    selector = ChannelSelectorFactory.create(channels, config);
+    Assert.assertTrue(selector instanceof MultiplexingChannelSelector);
     Event noHeaderEvent = new MockEvent();
 
     List<Channel> reqCh1 = selector.getRequiredChannels(noHeaderEvent);
@@ -119,7 +126,7 @@ public class TestMultiplexingChannelSelector {
     Assert.assertEquals(2, reqCh1.size());
     Assert.assertTrue(reqCh1.get(0).getName().equals("ch1"));
     Assert.assertTrue(reqCh1.get(1).getName().equals("ch3"));
-    Assert.assertTrue(optCh1.size() == 0);
+    Assert.assertTrue(optCh1.isEmpty());
 
     Map<String, String> header2 = new HashMap<String, String>();
     header2.put("someheader", "foo");
@@ -131,7 +138,7 @@ public class TestMultiplexingChannelSelector {
     Assert.assertEquals(2, reqCh2.size());
     Assert.assertTrue(reqCh2.get(0).getName().equals("ch1"));
     Assert.assertTrue(reqCh2.get(1).getName().equals("ch3"));
-    Assert.assertTrue(optCh2.size() == 0);
+    Assert.assertTrue(optCh2.isEmpty());
 
     Map<String, String> header3 = new HashMap<String, String>();
     header3.put("myheader", "bar1");
@@ -143,7 +150,7 @@ public class TestMultiplexingChannelSelector {
     Assert.assertEquals(2, reqCh3.size());
     Assert.assertTrue(reqCh3.get(0).getName().equals("ch1"));
     Assert.assertTrue(reqCh3.get(1).getName().equals("ch3"));
-    Assert.assertTrue(optCh3.size() == 0);
+    Assert.assertTrue(optCh3.isEmpty());
 
     Map<String, String> header4 = new HashMap<String, String>();
     header4.put("myheader", "zebra");
@@ -155,7 +162,6 @@ public class TestMultiplexingChannelSelector {
     Assert.assertEquals(2, reqCh4.size());
     Assert.assertTrue(reqCh4.get(0).getName().equals("ch1"));
     Assert.assertTrue(reqCh4.get(1).getName().equals("ch3"));
-    System.out.println(optCh4.size());
     //Since ch1 is also in default list, it is removed.
     Assert.assertTrue(optCh4.size() == 1);
     Assert.assertTrue(optCh4.get(0).getName().equals("ch2"));
@@ -166,4 +172,143 @@ public class TestMultiplexingChannelSelector {
     Assert.assertTrue(allChannels.get(1).getName().equals("ch2"));
     Assert.assertTrue(allChannels.get(2).getName().equals("ch3"));
   }
+
+  @Test
+  public void testNoDefault() {
+
+    config.put("mapping.foo", "ch1 ch2");
+    config.put("mapping.bar", "ch2 ch3");
+    config.put("mapping.xyz", "ch1 ch2 ch3");
+    config.put("mapping.zebra", "ch2");
+    config.put("optional.zebra", "ch1 ch3");
+    selector = ChannelSelectorFactory.create(channels, config);
+    Assert.assertTrue(selector instanceof MultiplexingChannelSelector);
+
+    Event event1 = new MockEvent();
+    Map<String, String> header1 = new HashMap<String, String>();
+    header1.put("myheader", "foo");// should match ch1 ch2
+    event1.setHeaders(header1);
+
+    List<Channel> reqCh1 = selector.getRequiredChannels(event1);
+    Assert.assertEquals(2, reqCh1.size());
+    Assert.assertEquals("ch1", reqCh1.get(0).getName());
+    Assert.assertEquals("ch2", reqCh1.get(1).getName());
+    List<Channel> optCh1 = selector.getOptionalChannels(event1);
+    Assert.assertTrue(optCh1.size() == 1);
+    //ch2 should not be there -- since it is a required channel
+    Assert.assertEquals("ch3", optCh1.get(0).getName());
+
+
+
+    Event event2 = new MockEvent();
+    Map<String, String> header2 = new HashMap<String, String>();
+    header2.put("myheader", "bar"); // should match ch2 ch3
+    event2.setHeaders(header2);
+
+    List<Channel> reqCh2 = selector.getRequiredChannels(event2);
+    Assert.assertEquals(2, reqCh2.size());
+    Assert.assertEquals("ch2", reqCh2.get(0).getName());
+    Assert.assertEquals("ch3", reqCh2.get(1).getName());
+    List<Channel> optCh2 = selector.getOptionalChannels(event2);
+    Assert.assertTrue(optCh2.isEmpty());
+
+    Event event3 = new MockEvent();
+    Map<String, String> header3 = new HashMap<String, String>();
+    header3.put("myheader", "xyz"); // should match ch1 ch2 ch3
+    event3.setHeaders(header3);
+
+    List<Channel> reqCh3 = selector.getRequiredChannels(event3);
+    Assert.assertEquals(3, reqCh3.size());
+    Assert.assertEquals("ch1", reqCh3.get(0).getName());
+    Assert.assertEquals("ch2", reqCh3.get(1).getName());
+    Assert.assertEquals("ch3", reqCh3.get(2).getName());
+    List<Channel> optCh3 = selector.getOptionalChannels(event3);
+    //All of the optional channels should go away.
+    Assert.assertTrue(optCh3.isEmpty());
+
+    Event event4 = new MockEvent();
+    Map<String, String> header4 = new HashMap<String, String>();
+    header4.put("myheader", "zebra");
+    event4.setHeaders(header4);
+
+    List<Channel> reqCh4 = selector.getRequiredChannels(event4);
+    Assert.assertEquals(1, reqCh4.size());
+    Assert.assertEquals("ch2", reqCh4.get(0).getName());
+    List<Channel> optCh4 = selector.getOptionalChannels(event4);
+    Assert.assertEquals(2, optCh4.size());
+    Assert.assertEquals("ch1", optCh4.get(0).getName());
+    Assert.assertEquals("ch3", optCh4.get(1).getName());
+  }
+
+  @Test
+  public void testNoMandatory() {
+
+    config.put("default", "ch3");
+    config.put("optional.foo", "ch1 ch2");
+    config.put("optional.zebra", "ch2 ch3");
+    selector = ChannelSelectorFactory.create(channels, config);
+    Assert.assertTrue(selector instanceof MultiplexingChannelSelector);
+
+    Event event1 = new MockEvent();
+    Map<String, String> header1 = new HashMap<String, String>();
+    header1.put("myheader", "foo");// should match ch1 ch2
+    event1.setHeaders(header1);
+
+    List<Channel> reqCh1 = selector.getRequiredChannels(event1);
+    Assert.assertEquals(1, reqCh1.size());
+    Assert.assertEquals("ch3", reqCh1.get(0).getName());
+    List<Channel> optCh1 = selector.getOptionalChannels(event1);
+    Assert.assertEquals(2, optCh1.size());
+    //foo has no required mapping, so only default ch3 is required; ch1 and ch2 are optional
+    Assert.assertEquals("ch1", optCh1.get(0).getName());
+    Assert.assertEquals("ch2", optCh1.get(1).getName());
+
+    Event event4 = new MockEvent();
+    Map<String, String> header4 = new HashMap<String, String>();
+    header4.put("myheader", "zebra");
+    event4.setHeaders(header4);
+
+    List<Channel> reqCh4 = selector.getRequiredChannels(event4);
+    Assert.assertEquals(1, reqCh4.size());
+    Assert.assertTrue(reqCh4.get(0).getName().equals("ch3"));
+    List<Channel> optCh4 = selector.getOptionalChannels(event4);
+    //ch3 was returned as a required channel, because it is default.
+    //So it is not returned in optional
+    Assert.assertEquals(1, optCh4.size());
+    Assert.assertEquals("ch2", optCh4.get(0).getName());
+
+  }
+
+  @Test
+  public void testOnlyOptional() {
+    config.put("optional.foo", "ch1 ch2");
+    config.put("optional.zebra", "ch2 ch3");
+    selector = ChannelSelectorFactory.create(channels, config);
+    Assert.assertTrue(selector instanceof MultiplexingChannelSelector);
+
+    Event event1 = new MockEvent();
+    Map<String, String> header1 = new HashMap<String, String>();
+    header1.put("myheader", "foo");// should match ch1 ch2
+    event1.setHeaders(header1);
+
+    List<Channel> reqCh1 = selector.getRequiredChannels(event1);
+    Assert.assertTrue(reqCh1.isEmpty());
+    List<Channel> optCh1 = selector.getOptionalChannels(event1);
+    Assert.assertEquals(2,optCh1.size());
+    //No default or required channels are configured, so ch1 and ch2 are both optional
+
+
+    Event event4 = new MockEvent();
+    Map<String, String> header4 = new HashMap<String, String>();
+    header4.put("myheader", "zebra");
+    event4.setHeaders(header4);
+
+    List<Channel> reqCh4 = selector.getRequiredChannels(event4);
+    Assert.assertTrue(reqCh4.isEmpty());
+    List<Channel> optCh4 = selector.getOptionalChannels(event4);
+    Assert.assertEquals(2, optCh4.size());
+    Assert.assertEquals("ch2", optCh4.get(0).getName());
+    Assert.assertEquals("ch3", optCh4.get(1).getName());
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/1ba41f38/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index f8528bb..70cb285 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -183,7 +183,7 @@ This configuration lets a user generate events and subsequently logs them to the
 
 This configuration defines a single agent named a1. a1 has a source that listens for data on port 44444, a channel
 that buffers event data in memory, and a sink that logs event data to the console. The configuration file names the
-various components, then describes their types and configuration parameters. A given configuration file might define 
+various components, then describes their types and configuration parameters. A given configuration file might define
 several named agents; when a given Flume process is launched a flag is passed telling it which named agent to manifest.
 
 Given this configuration file, we can start Flume as follows::
@@ -540,8 +540,7 @@ configured as default:
 
   <Agent>.sources.<Source1>.selector.default = <Channel2>
 
-The mapping allows overlapping the channels for each value. The default must be
-set for a multiplexing select which can also contain any number of channels.
+The channel lists mapped to different header values are allowed to overlap.
 
 The following example has a single flow that multiplexed to two paths. The
 agent named agent_foo has a single avro source and two channels linked to two sinks:
@@ -607,7 +606,9 @@ Note that if a header does not have any required channels, then the event will
 be written to the default channels and will be attempted to be written to the
 optional channels for that header. Specifying optional channels will still cause
 the event to be written to the default channels, if no required channels are
-specified.
+specified. If no channels are designated as default and there are no required
+channels, the selector will attempt to write the events to the optional
+channels. Any failures are simply ignored in that case.
 
 
 Flume Sources
@@ -1248,7 +1249,7 @@ Example for agent named a1:
 .. code-block:: properties
 
   a1.channels = c1
-  a1.sinks = k1 
+  a1.sinks = k1
   a1.sinks.k1.type = hdfs
   a1.sinks.k1.channel = c1
   a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
@@ -1279,7 +1280,7 @@ Example for agent named a1:
 .. code-block:: properties
 
   a1.channels = c1
-  a1.sinks = k1 
+  a1.sinks = k1
   a1.sinks.k1.type = logger
   a1.sinks.k1.channel = c1
 
@@ -1310,7 +1311,7 @@ Example for agent named a1:
 .. code-block:: properties
 
   a1.channels = c1
-  a1.sinks = k1 
+  a1.sinks = k1
   a1.sinks.k1.type = avro
   a1.sinks.k1.channel = c1
   a1.sinks.k1.hostname = 10.10.10.10
@@ -1346,7 +1347,7 @@ Example for agent named a1:
 .. code-block:: properties
 
   a1.channels = c1
-  a1.sinks = k1 
+  a1.sinks = k1
   a1.sinks.k1.type = irc
   a1.sinks.k1.channel = c1
   a1.sinks.k1.hostname = irc.yourdomain.com
@@ -1375,7 +1376,7 @@ Example for agent named a1:
 .. code-block:: properties
 
   a1.channels = c1
-  a1.sinks = k1 
+  a1.sinks = k1
   a1.sinks.k1.type = file_roll
   a1.sinks.k1.channel = c1
   a1.sinks.k1.sink.directory = /var/log/flume
@@ -1399,7 +1400,7 @@ Example for agent named a1:
 .. code-block:: properties
 
   a1.channels = c1
-  a1.sinks = k1 
+  a1.sinks = k1
   a1.sinks.k1.type = null
   a1.sinks.k1.channel = c1
 
@@ -1444,7 +1445,7 @@ Example for agent named a1:
 .. code-block:: properties
 
   a1.channels = c1
-  a1.sinks = k1 
+  a1.sinks = k1
   a1.sinks.k1.type = org.apache.flume.sink.hbase.HBaseSink
   a1.sinks.k1.table = foo_table
   a1.sinks.k1.columnFamily = bar_cf
@@ -1484,7 +1485,7 @@ Example for agent named a1:
 .. code-block:: properties
 
   a1.channels = c1
-  a1.sinks = k1 
+  a1.sinks = k1
   a1.sinks.k1.type = org.apache.flume.sink.hbase.AsyncHBaseSink
   a1.sinks.k1.table = foo_table
   a1.sinks.k1.columnFamily = bar_cf
@@ -1523,7 +1524,7 @@ Example for agent named a1:
 .. code-block:: properties
 
   a1.channels = c1
-  a1.sinks = k1 
+  a1.sinks = k1
   a1.sinks.k1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
   a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
   a1.sinks.k1.indexName = foo_index
@@ -1554,7 +1555,7 @@ Example for agent named a1:
 .. code-block:: properties
 
   a1.channels = c1
-  a1.sinks = k1 
+  a1.sinks = k1
   a1.sinks.k1.type = org.example.MySink
   a1.sinks.k1.channel = c1
 
@@ -1713,14 +1714,14 @@ Generating a key with a password seperate from the key store password:
    -keysize 128 -validity 9000 -keystore test.keystore \
    -storetype jceks -storepass keyStorePassword
 
-Generating a key with the password the same as the key store password:      
+Generating a key with the password the same as the key store password:
 
 .. code-block:: bash
 
   keytool -genseckey -alias key-1 -keyalg AES -keysize 128 -validity 9000 \
     -keystore src/test/resources/test.keystore -storetype jceks \
     -storepass keyStorePassword
-      
+
 
 .. code-block:: properties
 
@@ -2022,7 +2023,7 @@ Example for agent named a1:
 
 .. code-block:: properties
 
-  a1.sinks = k1 
+  a1.sinks = k1
   a1.sinks.k1.type = file_roll
   a1.sinks.k1.channel = c1
   a1.sinks.k1.sink.directory = /var/log/flume
@@ -2072,7 +2073,7 @@ are named components, here is an example of how they are created through configu
 .. code-block:: properties
 
   a1.sources = r1
-  a1.sinks = k1 
+  a1.sinks = k1
   a1.channels = c1
   a1.sources.r1.interceptors = i1 i2
   a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
@@ -2208,8 +2209,8 @@ serializers.*                    --         
Serializer-specific properties
 ================================ ========== 
=================================================================================================
 
 The serializers are used to map the matches to a header name and a formatted header value, by default you only need to specify
-the header name and the default ``org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer`` will be used. 
-This serializer simply maps the matches to the specified header name and passes the value through as it was extracted by the regex. 
+the header name and the default ``org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer`` will be used.
+This serializer simply maps the matches to the specified header name and passes the value through as it was extracted by the regex.
 You can plug custom serializer implementations into the extractor using the 
fully qualified class name (FQCN) to format the matches
 in anyway you like.
 
@@ -2645,7 +2646,7 @@ org.apache.flume.ChannelSelector                          
    --
 org.apache.flume.SinkProcessor                                default          
       org.apache.flume.sink.DefaultSinkProcessor
 org.apache.flume.SinkProcessor                                failover         
       org.apache.flume.sink.FailoverSinkProcessor
 org.apache.flume.SinkProcessor                                load_balance     
       org.apache.flume.sink.LoadBalancingSinkProcessor
-org.apache.flume.SinkProcessor                                --               
       
+org.apache.flume.SinkProcessor                                --
 
 org.apache.flume.interceptor.Interceptor                      timestamp        
       org.apache.flume.interceptor.TimestampInterceptor$Builder
 org.apache.flume.interceptor.Interceptor                      host             
       org.apache.flume.interceptor.HostInterceptor$Builder

Reply via email to