Updated Branches: refs/heads/trunk 15af0ce12 -> 76ef54988
FLUME-1769: Replicating channel selector should support optional channels (Hari Shreedharan via Brock Noland) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/76ef5498 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/76ef5498 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/76ef5498 Branch: refs/heads/trunk Commit: 76ef54988701e405f37778b89092c1c2836f06fc Parents: 15af0ce Author: Brock Noland <[email protected]> Authored: Fri Dec 7 16:10:39 2012 -0600 Committer: Brock Noland <[email protected]> Committed: Fri Dec 7 16:10:39 2012 -0600 ---------------------------------------------------------------------- .../flume/channel/AbstractChannelSelector.java | 41 +++++++++ .../flume/channel/MultiplexingChannelSelector.java | 27 +------ .../flume/channel/ReplicatingChannelSelector.java | 34 +++++++- .../channel/TestReplicatingChannelSelector.java | 68 +++++++++++++-- flume-ng-doc/sphinx/FlumeUserGuide.rst | 17 +++- 5 files changed, 146 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/76ef5498/flume-ng-core/src/main/java/org/apache/flume/channel/AbstractChannelSelector.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/AbstractChannelSelector.java b/flume-ng-core/src/main/java/org/apache/flume/channel/AbstractChannelSelector.java index 939257d..d69087f 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/channel/AbstractChannelSelector.java +++ b/flume-ng-core/src/main/java/org/apache/flume/channel/AbstractChannelSelector.java @@ -18,10 +18,14 @@ */ package org.apache.flume.channel; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.flume.Channel; import org.apache.flume.ChannelSelector; +import org.apache.flume.FlumeException; 
public abstract class AbstractChannelSelector implements ChannelSelector { @@ -48,4 +52,41 @@ public abstract class AbstractChannelSelector implements ChannelSelector { return name; } + /** + * + * @return A map of name to channel instance. + */ + + protected Map<String, Channel> getChannelNameMap() { + Map<String, Channel> channelNameMap = new HashMap<String, Channel>(); + for (Channel ch : getAllChannels()) { + channelNameMap.put(ch.getName(), ch); + } + return channelNameMap; + } + + /** + * Given a list of channel names as space delimited string, + * returns list of channels. + * @return List of {@linkplain Channel}s represented by the names. + */ + protected List<Channel> getChannelListFromNames(String channels, + Map<String, Channel> channelNameMap) { + List<Channel> configuredChannels = new ArrayList<Channel>(); + if(channels == null || channels.isEmpty()) { + return configuredChannels; + } + String[] chNames = channels.split(" "); + for (String name : chNames) { + Channel ch = channelNameMap.get(name); + if (ch != null) { + configuredChannels.add(ch); + } else { + throw new FlumeException("Selector channel not found: " + + name); + } + } + return configuredChannels; + } + } http://git-wip-us.apache.org/repos/asf/flume/blob/76ef5498/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java b/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java index 54e4b20..3e32804 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java +++ b/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java @@ -18,7 +18,6 @@ */ package org.apache.flume.channel; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -86,10 +85,7 @@ 
public class MultiplexingChannelSelector extends AbstractChannelSelector { this.headerName = context.getString(CONFIG_MULTIPLEX_HEADER_NAME, DEFAULT_MULTIPLEX_HEADER); - Map<String, Channel> channelNameMap = new HashMap<String, Channel>(); - for (Channel ch : getAllChannels()) { - channelNameMap.put(ch.getName(), ch); - } + Map<String, Channel> channelNameMap = getChannelNameMap(); defaultChannels = getChannelListFromNames( context.getString(CONFIG_DEFAULT_CHANNEL), channelNameMap); @@ -148,25 +144,4 @@ public class MultiplexingChannelSelector extends AbstractChannelSelector { } - //Given a list of channel names as space delimited string, - //returns list of channels. - private List<Channel> getChannelListFromNames(String channels, - Map<String, Channel> channelNameMap){ - List<Channel> configuredChannels = new ArrayList<Channel>(); - if(channels == null || channels.isEmpty()) { - return configuredChannels; - } - String[] chNames = channels.split(" "); - for (String name : chNames) { - Channel ch = channelNameMap.get(name); - if (ch != null) { - configuredChannels.add(ch); - } else { - throw new FlumeException("Selector channel not found: " - + name); - } - } - return configuredChannels; - } - } http://git-wip-us.apache.org/repos/asf/flume/blob/76ef5498/flume-ng-core/src/main/java/org/apache/flume/channel/ReplicatingChannelSelector.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/ReplicatingChannelSelector.java b/flume-ng-core/src/main/java/org/apache/flume/channel/ReplicatingChannelSelector.java index 8f22746..8a0d2bd 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/channel/ReplicatingChannelSelector.java +++ b/flume-ng-core/src/main/java/org/apache/flume/channel/ReplicatingChannelSelector.java @@ -18,8 +18,10 @@ */ package org.apache.flume.channel; +import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import 
org.apache.flume.Channel; import org.apache.flume.Context; @@ -31,20 +33,44 @@ import org.apache.flume.Event; */ public class ReplicatingChannelSelector extends AbstractChannelSelector { - private final List<Channel> emptyList = Collections.emptyList(); + /** + * Configuration to set a subset of the channels as optional. + */ + public static final String CONFIG_OPTIONAL = "optional"; + List<Channel> requiredChannels = null; + List<Channel> optionalChannels = new ArrayList<Channel>(); @Override public List<Channel> getRequiredChannels(Event event) { - return getAllChannels(); + /* + * Seems like there are a lot of components within flume that do not call + * the configure method. It is conceivable that custom component tests do + * that too. So in that case, revert to old behavior. + */ + if(requiredChannels == null) { + return getAllChannels(); + } + return requiredChannels; } @Override public List<Channel> getOptionalChannels(Event event) { - return emptyList; + return optionalChannels; } @Override public void configure(Context context) { - // No configuration necessary + String optionalList = context.getString(CONFIG_OPTIONAL); + requiredChannels = new ArrayList<Channel>(getAllChannels()); + Map<String, Channel> channelNameMap = getChannelNameMap(); + if(optionalList != null && !optionalList.isEmpty()) { + for(String optional : optionalList.split("\\s+")) { + Channel optionalChannel = channelNameMap.get(optional); + requiredChannels.remove(optionalChannel); + if (!optionalChannels.contains(optionalChannel)) { + optionalChannels.add(optionalChannel); + } + } + } } } http://git-wip-us.apache.org/repos/asf/flume/blob/76ef5498/flume-ng-core/src/test/java/org/apache/flume/channel/TestReplicatingChannelSelector.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/TestReplicatingChannelSelector.java 
b/flume-ng-core/src/test/java/org/apache/flume/channel/TestReplicatingChannelSelector.java index e671364..deaf907 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/channel/TestReplicatingChannelSelector.java +++ b/flume-ng-core/src/test/java/org/apache/flume/channel/TestReplicatingChannelSelector.java @@ -26,6 +26,8 @@ import junit.framework.Assert; import org.apache.flume.Channel; import org.apache.flume.ChannelSelector; +import org.apache.flume.Context; +import org.apache.flume.conf.Configurables; import org.junit.Before; import org.junit.Test; @@ -41,22 +43,76 @@ public class TestReplicatingChannelSelector { channels.add(MockChannel.createMockChannel("ch1")); channels.add(MockChannel.createMockChannel("ch2")); channels.add(MockChannel.createMockChannel("ch3")); - + channels.add(MockChannel.createMockChannel("ch4")); selector = ChannelSelectorFactory.create( channels, new HashMap<String, String>()); } @Test public void testReplicatingSelector() throws Exception { + selector.configure(new Context()); + List<Channel> channels = selector.getRequiredChannels(new MockEvent()); + Assert.assertNotNull(channels); + Assert.assertEquals(4, channels.size()); + Assert.assertEquals("ch1", channels.get(0).getName()); + Assert.assertEquals("ch2", channels.get(1).getName()); + Assert.assertEquals("ch3", channels.get(2).getName()); + Assert.assertEquals("ch4", channels.get(3).getName()); + + List<Channel> optCh = selector.getOptionalChannels(new MockEvent()); + Assert.assertEquals(0, optCh.size()); + } + + @Test + public void testOptionalChannels() throws Exception { + Context context = new Context(); + context.put(ReplicatingChannelSelector.CONFIG_OPTIONAL, "ch1"); + Configurables.configure(selector, context); List<Channel> channels = selector.getRequiredChannels(new MockEvent()); Assert.assertNotNull(channels); - Assert.assertTrue(channels.size() == 3); - Assert.assertTrue(channels.get(0).getName().equals("ch1")); - 
Assert.assertTrue(channels.get(1).getName().equals("ch2")); - Assert.assertTrue(channels.get(2).getName().equals("ch3")); + Assert.assertEquals(3, channels.size()); + Assert.assertEquals("ch2", channels.get(0).getName()); + Assert.assertEquals("ch3", channels.get(1).getName()); + Assert.assertEquals("ch4", channels.get(2).getName()); List<Channel> optCh = selector.getOptionalChannels(new MockEvent()); - Assert.assertTrue(optCh.size() == 0); + Assert.assertEquals(1, optCh.size()); + Assert.assertEquals("ch1", optCh.get(0).getName()); + } + + @Test + public void testMultipleOptionalChannels() throws Exception { + Context context = new Context(); + context.put(ReplicatingChannelSelector.CONFIG_OPTIONAL, "ch1 ch4"); + Configurables.configure(selector, context); + List<Channel> channels = selector.getRequiredChannels(new MockEvent()); + Assert.assertNotNull(channels); + Assert.assertEquals(2, channels.size()); + Assert.assertEquals("ch2", channels.get(0).getName()); + Assert.assertEquals("ch3", channels.get(1).getName()); + + List<Channel> optCh = selector.getOptionalChannels(new MockEvent()); + Assert.assertEquals(2, optCh.size()); + Assert.assertEquals("ch1", optCh.get(0).getName()); + Assert.assertEquals("ch4", optCh.get(1).getName()); + } + + @Test + public void testMultipleOptionalChannelsSameChannelTwice() throws Exception { + Context context = new Context(); + context.put(ReplicatingChannelSelector.CONFIG_OPTIONAL, "ch1 ch4 ch1"); + Configurables.configure(selector, context); + List<Channel> channels = selector.getRequiredChannels(new MockEvent()); + Assert.assertNotNull(channels); + Assert.assertEquals(2, channels.size()); + Assert.assertEquals("ch2", channels.get(0).getName()); + Assert.assertEquals("ch3", channels.get(1).getName()); + + List<Channel> optCh = selector.getOptionalChannels(new MockEvent()); + Assert.assertEquals(2, optCh.size()); + Assert.assertEquals("ch1", optCh.get(0).getName()); + Assert.assertEquals("ch4", optCh.get(1).getName()); + } } 
http://git-wip-us.apache.org/repos/asf/flume/blob/76ef5498/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 70cb285..21ca5cc 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1806,11 +1806,12 @@ Replicating Channel Selector (default) Required properties are in **bold**. -============= =========== ================================================ -Property Name Default Description -============= =========== ================================================ -selector.type replicating The component type name, needs to be ``replicating`` -============= =========== ================================================ +================== =========== ==================================================== +Property Name Default Description +================== =========== ==================================================== +selector.type replicating The component type name, needs to be ``replicating`` +selector.optional -- Set of channels to be marked as ``optional`` +================== =========== ==================================================== Example for agent named a1 and its source called r1: @@ -1820,6 +1821,12 @@ Example for agent named a1 and its source called r1: a1.channels = c1 c2 c3 a1.source.r1.selector.type = replicating a1.source.r1.channels = c1 c2 c3 + a1.source.r1.selector.optional = c3 + +In the above configuration, c3 is an optional channel. Failure to write to c3 is +simply ignored. Since c1 and c2 are not marked optional, failure to write to +those channels will cause the transaction to fail. + Multiplexing Channel Selector ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
