Updated Branches:
  refs/heads/trunk 15af0ce12 -> 76ef54988

FLUME-1769: Replicating channel selector should support optional channels

(Hari Shreedharan via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/76ef5498
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/76ef5498
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/76ef5498

Branch: refs/heads/trunk
Commit: 76ef54988701e405f37778b89092c1c2836f06fc
Parents: 15af0ce
Author: Brock Noland <[email protected]>
Authored: Fri Dec 7 16:10:39 2012 -0600
Committer: Brock Noland <[email protected]>
Committed: Fri Dec 7 16:10:39 2012 -0600

----------------------------------------------------------------------
 .../flume/channel/AbstractChannelSelector.java     |   41 +++++++++
 .../flume/channel/MultiplexingChannelSelector.java |   27 +------
 .../flume/channel/ReplicatingChannelSelector.java  |   34 +++++++-
 .../channel/TestReplicatingChannelSelector.java    |   68 +++++++++++++--
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |   17 +++-
 5 files changed, 146 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/76ef5498/flume-ng-core/src/main/java/org/apache/flume/channel/AbstractChannelSelector.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/AbstractChannelSelector.java b/flume-ng-core/src/main/java/org/apache/flume/channel/AbstractChannelSelector.java
index 939257d..d69087f 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/channel/AbstractChannelSelector.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/channel/AbstractChannelSelector.java
@@ -18,10 +18,14 @@
  */
 package org.apache.flume.channel;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelSelector;
+import org.apache.flume.FlumeException;
 
 public abstract class AbstractChannelSelector implements ChannelSelector {
 
@@ -48,4 +52,41 @@ public abstract class AbstractChannelSelector implements ChannelSelector {
     return name;
   }
 
+  /**
+   *
+   * @return A map of name to channel instance.
+   */
+
+  protected Map<String, Channel> getChannelNameMap() {
+    Map<String, Channel> channelNameMap = new HashMap<String, Channel>();
+    for (Channel ch : getAllChannels()) {
+      channelNameMap.put(ch.getName(), ch);
+    }
+    return channelNameMap;
+  }
+
+  /**
+   * Given a list of channel names as space delimited string,
+   * returns list of channels.
+   * @return List of {@linkplain Channel}s represented by the names.
+   */
+  protected List<Channel> getChannelListFromNames(String channels,
+          Map<String, Channel> channelNameMap) {
+    List<Channel> configuredChannels = new ArrayList<Channel>();
+    if(channels == null || channels.isEmpty()) {
+      return configuredChannels;
+    }
+    String[] chNames = channels.split(" ");
+    for (String name : chNames) {
+      Channel ch = channelNameMap.get(name);
+      if (ch != null) {
+        configuredChannels.add(ch);
+      } else {
+        throw new FlumeException("Selector channel not found: "
+                + name);
+      }
+    }
+    return configuredChannels;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/76ef5498/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java b/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java
index 54e4b20..3e32804 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java
@@ -18,7 +18,6 @@
  */
 package org.apache.flume.channel;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -86,10 +85,7 @@ public class MultiplexingChannelSelector extends AbstractChannelSelector {
     this.headerName = context.getString(CONFIG_MULTIPLEX_HEADER_NAME,
         DEFAULT_MULTIPLEX_HEADER);
 
-    Map<String, Channel> channelNameMap = new HashMap<String, Channel>();
-    for (Channel ch : getAllChannels()) {
-      channelNameMap.put(ch.getName(), ch);
-    }
+    Map<String, Channel> channelNameMap = getChannelNameMap();
 
     defaultChannels = getChannelListFromNames(
         context.getString(CONFIG_DEFAULT_CHANNEL), channelNameMap);
@@ -148,25 +144,4 @@ public class MultiplexingChannelSelector extends AbstractChannelSelector {
 
   }
 
-  //Given a list of channel names as space delimited string,
-  //returns list of channels.
-  private List<Channel> getChannelListFromNames(String channels,
-      Map<String, Channel> channelNameMap){
-    List<Channel> configuredChannels = new ArrayList<Channel>();
-    if(channels == null || channels.isEmpty()) {
-      return configuredChannels;
-    }
-    String[] chNames = channels.split(" ");
-    for (String name : chNames) {
-      Channel ch = channelNameMap.get(name);
-      if (ch != null) {
-        configuredChannels.add(ch);
-      } else {
-        throw new FlumeException("Selector channel not found: "
-            + name);
-      }
-    }
-    return configuredChannels;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/76ef5498/flume-ng-core/src/main/java/org/apache/flume/channel/ReplicatingChannelSelector.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/ReplicatingChannelSelector.java b/flume-ng-core/src/main/java/org/apache/flume/channel/ReplicatingChannelSelector.java
index 8f22746..8a0d2bd 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/channel/ReplicatingChannelSelector.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/channel/ReplicatingChannelSelector.java
@@ -18,8 +18,10 @@
  */
 package org.apache.flume.channel;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
@@ -31,20 +33,44 @@ import org.apache.flume.Event;
  */
 public class ReplicatingChannelSelector extends AbstractChannelSelector {
 
-  private final List<Channel> emptyList = Collections.emptyList();
+  /**
+   * Configuration to set a subset of the channels as optional.
+   */
+  public static final String CONFIG_OPTIONAL = "optional";
+  List<Channel> requiredChannels = null;
+  List<Channel> optionalChannels = new ArrayList<Channel>();
 
   @Override
   public List<Channel> getRequiredChannels(Event event) {
-    return getAllChannels();
+    /*
+     * Seems like a lot of components within flume do not call the
+     * configure method. It is conceivable that custom component tests
+     * do that too. So in that case, revert to the old behavior.
+     */
+    if(requiredChannels == null) {
+      return getAllChannels();
+    }
+    return requiredChannels;
   }
 
   @Override
   public List<Channel> getOptionalChannels(Event event) {
-    return emptyList;
+    return optionalChannels;
   }
 
   @Override
   public void configure(Context context) {
-    // No configuration necessary
+    String optionalList = context.getString(CONFIG_OPTIONAL);
+    requiredChannels = new ArrayList<Channel>(getAllChannels());
+    Map<String, Channel> channelNameMap = getChannelNameMap();
+    if(optionalList != null && !optionalList.isEmpty()) {
+      for(String optional : optionalList.split("\\s+")) {
+        Channel optionalChannel = channelNameMap.get(optional);
+        requiredChannels.remove(optionalChannel);
+        if (!optionalChannels.contains(optionalChannel)) {
+          optionalChannels.add(optionalChannel);
+        }
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/76ef5498/flume-ng-core/src/test/java/org/apache/flume/channel/TestReplicatingChannelSelector.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/TestReplicatingChannelSelector.java b/flume-ng-core/src/test/java/org/apache/flume/channel/TestReplicatingChannelSelector.java
index e671364..deaf907 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/channel/TestReplicatingChannelSelector.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/channel/TestReplicatingChannelSelector.java
@@ -26,6 +26,8 @@ import junit.framework.Assert;
 
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.conf.Configurables;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -41,22 +43,76 @@ public class TestReplicatingChannelSelector {
     channels.add(MockChannel.createMockChannel("ch1"));
     channels.add(MockChannel.createMockChannel("ch2"));
     channels.add(MockChannel.createMockChannel("ch3"));
-
+    channels.add(MockChannel.createMockChannel("ch4"));
     selector = ChannelSelectorFactory.create(
         channels, new HashMap<String, String>());
   }
 
   @Test
   public void testReplicatingSelector() throws Exception {
+    selector.configure(new Context());
+    List<Channel> channels = selector.getRequiredChannels(new MockEvent());
+    Assert.assertNotNull(channels);
+    Assert.assertEquals(4, channels.size());
+    Assert.assertEquals("ch1", channels.get(0).getName());
+    Assert.assertEquals("ch2", channels.get(1).getName());
+    Assert.assertEquals("ch3", channels.get(2).getName());
+    Assert.assertEquals("ch4", channels.get(3).getName());
+
+    List<Channel> optCh = selector.getOptionalChannels(new MockEvent());
+    Assert.assertEquals(0, optCh.size());
+  }
+
+  @Test
+  public void testOptionalChannels() throws Exception {
+    Context context = new Context();
+    context.put(ReplicatingChannelSelector.CONFIG_OPTIONAL, "ch1");
+    Configurables.configure(selector, context);
     List<Channel> channels = selector.getRequiredChannels(new MockEvent());
     Assert.assertNotNull(channels);
-    Assert.assertTrue(channels.size() == 3);
-    Assert.assertTrue(channels.get(0).getName().equals("ch1"));
-    Assert.assertTrue(channels.get(1).getName().equals("ch2"));
-    Assert.assertTrue(channels.get(2).getName().equals("ch3"));
+    Assert.assertEquals(3, channels.size());
+    Assert.assertEquals("ch2", channels.get(0).getName());
+    Assert.assertEquals("ch3", channels.get(1).getName());
+    Assert.assertEquals("ch4", channels.get(2).getName());
 
     List<Channel> optCh = selector.getOptionalChannels(new MockEvent());
-    Assert.assertTrue(optCh.size() == 0);
+    Assert.assertEquals(1, optCh.size());
+    Assert.assertEquals("ch1", optCh.get(0).getName());
+
   }
 
+
+  @Test
+  public void testMultipleOptionalChannels() throws Exception {
+    Context context = new Context();
+    context.put(ReplicatingChannelSelector.CONFIG_OPTIONAL, "ch1 ch4");
+    Configurables.configure(selector, context);
+    List<Channel> channels = selector.getRequiredChannels(new MockEvent());
+    Assert.assertNotNull(channels);
+    Assert.assertEquals(2, channels.size());
+    Assert.assertEquals("ch2", channels.get(0).getName());
+    Assert.assertEquals("ch3", channels.get(1).getName());
+
+    List<Channel> optCh = selector.getOptionalChannels(new MockEvent());
+    Assert.assertEquals(2, optCh.size());
+    Assert.assertEquals("ch1", optCh.get(0).getName());
+    Assert.assertEquals("ch4", optCh.get(1).getName());
+  }
+
+  @Test
+  public void testMultipleOptionalChannelsSameChannelTwice() throws Exception {
+    Context context = new Context();
+    context.put(ReplicatingChannelSelector.CONFIG_OPTIONAL, "ch1 ch4 ch1");
+    Configurables.configure(selector, context);
+    List<Channel> channels = selector.getRequiredChannels(new MockEvent());
+    Assert.assertNotNull(channels);
+    Assert.assertEquals(2, channels.size());
+    Assert.assertEquals("ch2", channels.get(0).getName());
+    Assert.assertEquals("ch3", channels.get(1).getName());
+
+    List<Channel> optCh = selector.getOptionalChannels(new MockEvent());
+    Assert.assertEquals(2, optCh.size());
+    Assert.assertEquals("ch1", optCh.get(0).getName());
+    Assert.assertEquals("ch4", optCh.get(1).getName());
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/76ef5498/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 70cb285..21ca5cc 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1806,11 +1806,12 @@ Replicating Channel Selector (default)
 
 Required properties are in **bold**.
 
-=============  ===========  ================================================
-Property Name  Default      Description
-=============  ===========  ================================================
-selector.type  replicating  The component type name, needs to be ``replicating``
-=============  ===========  ================================================
+==================  ===========  ====================================================
+Property Name       Default      Description
+==================  ===========  ====================================================
+selector.type       replicating  The component type name, needs to be ``replicating``
+selector.optional   --           Set of channels to be marked as ``optional``
+==================  ===========  ====================================================
 
 Example for agent named a1 and it's source called r1:
 
@@ -1820,6 +1821,12 @@ Example for agent named a1 and it's source called r1:
   a1.channels = c1 c2 c3
   a1.source.r1.selector.type = replicating
   a1.source.r1.channels = c1 c2 c3
+  a1.source.r1.selector.optional = c3
+
+In the above configuration, c3 is an optional channel. Failure to write to c3 is
+simply ignored. Since c1 and c2 are not marked optional, failure to write to
+those channels will cause the transaction to fail.
+
 
 Multiplexing Channel Selector
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Reply via email to