Updated Branches: refs/heads/trunk 123c7bf6d -> 5fdf673d1
FLUME-1772. AbstractConfigurationProvider should remove component which throws exception from configure method. (Brock Noland via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/5fdf673d Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/5fdf673d Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/5fdf673d Branch: refs/heads/trunk Commit: 5fdf673d1a9cbab9144286a3afc335f8fcf7cd6c Parents: 123c7bf Author: Hari Shreedharan <[email protected]> Authored: Wed Dec 19 20:53:29 2012 -0800 Committer: Hari Shreedharan <[email protected]> Committed: Wed Dec 19 20:54:17 2012 -0800 ---------------------------------------------------------------------- .../flume/node/AbstractConfigurationProvider.java | 312 ++++++++++----- .../node/TestAbstractConfigurationProvider.java | 113 +++++- 2 files changed, 330 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/5fdf673d/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java index daef76b..e63c601 100644 --- a/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java +++ b/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java @@ -19,6 +19,7 @@ package org.apache.flume.node; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -56,9 +57,11 @@ import org.apache.flume.source.DefaultSourceFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ListMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; public abstract class AbstractConfigurationProvider implements ConfigurationProvider { @@ -71,7 +74,8 @@ public abstract class AbstractConfigurationProvider implements private final SinkFactory sinkFactory; private final ChannelFactory channelFactory; - private final Map<Class<? extends Channel>, Map<String, Channel>> channels; + + private final Map<Class<? extends Channel>, Map<String, Channel>> channelCache; public AbstractConfigurationProvider(String agentName) { super(); @@ -80,7 +84,7 @@ public abstract class AbstractConfigurationProvider implements this.sinkFactory = new DefaultSinkFactory(); this.channelFactory = new DefaultChannelFactory(); - channels = new HashMap<Class<? extends Channel>, Map<String, Channel>>(); + channelCache = new HashMap<Class<? extends Channel>, Map<String, Channel>>(); } protected abstract FlumeConfiguration getFlumeConfiguration(); @@ -90,12 +94,45 @@ public abstract class AbstractConfigurationProvider implements FlumeConfiguration fconfig = getFlumeConfiguration(); AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName()); if (agentConf != null) { + Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap(); + Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap(); + Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap(); try { - loadChannels(agentConf, conf); - loadSources(agentConf, conf); - loadSinks(agentConf, conf); + loadChannels(agentConf, channelComponentMap); + loadSources(agentConf, channelComponentMap, sourceRunnerMap); + loadSinks(agentConf, channelComponentMap, sinkRunnerMap); + Set<String> channelNames = + new HashSet<String>(channelComponentMap.keySet()); + for(String channelName : channelNames) { + ChannelComponent channelComponent = channelComponentMap. + get(channelName); + if(channelComponent.components.isEmpty()) { + LOGGER.warn(String.format("Channel %s has no components connected" + + " and has been removed.", channelName)); + channelComponentMap.remove(channelName); + Map<String, Channel> nameChannelMap = channelCache. + get(channelComponent.channel.getClass()); + if(nameChannelMap != null) { + nameChannelMap.remove(channelName); + } + } else { + LOGGER.info(String.format("Channel %s connected to %s", + channelName, channelComponent.components.toString())); + conf.addChannel(channelName, channelComponent.channel); + } + } + for(Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) { + conf.addSourceRunner(entry.getKey(), entry.getValue()); + } + for(Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) { + conf.addSinkRunner(entry.getKey(), entry.getValue()); + } } catch (InstantiationException ex) { LOGGER.error("Failed to instantiate component", ex); + } finally { + channelComponentMap.clear(); + sourceRunnerMap.clear(); + sinkRunnerMap.clear(); } } else { LOGGER.warn("No configuration found for this host:{}", getAgentName()); @@ -107,9 +144,9 @@ public abstract class AbstractConfigurationProvider implements return agentName; } - private void loadChannels(AgentConfiguration agentConf, - MaterializedConfiguration conf) throws InstantiationException { + Map<String, ChannelComponent> channelComponentMap) + throws InstantiationException { LOGGER.info("Creating channels"); /* @@ -123,7 +160,7 @@ public abstract class AbstractConfigurationProvider implements ListMultimap<Class<? extends Channel>, String> channelsNotReused = ArrayListMultimap.create(); // assume all channels will not be re-used - for(Map.Entry<Class<? extends Channel>, Map<String, Channel>> entry : channels.entrySet()) { + for(Map.Entry<Class<? extends Channel>, Map<String, Channel>> entry : channelCache.entrySet()) { Class<? extends Channel> channelKlass = entry.getKey(); Set<String> channelNames = entry.getValue().keySet(); channelsNotReused.get(channelKlass).addAll(channelNames); @@ -140,9 +177,16 @@ public abstract class AbstractConfigurationProvider implements if(comp != null) { Channel channel = getOrCreateChannel(channelsNotReused, comp.getComponentName(), comp.getType()); - Configurables.configure(channel, comp); - conf.addChannel(comp.getComponentName(), channel); - LOGGER.info("Created channel " + chName); + try { + Configurables.configure(channel, comp); + channelComponentMap.put(comp.getComponentName(), + new ChannelComponent(channel)); + LOGGER.info("Created channel " + chName); + } catch (Exception e) { + String msg = String.format("Channel %s has been removed due to an " + + "error during configuration", chName); + LOGGER.error(msg, e); + } } } /* @@ -155,16 +199,22 @@ public abstract class AbstractConfigurationProvider implements Channel channel = getOrCreateChannel(channelsNotReused, chName, context.getString( BasicConfigurationConstants.CONFIG_TYPE)); - Configurables.configure(channel, context); - conf.addChannel(chName, channel); - LOGGER.info("Created channel " + chName); + try { + Configurables.configure(channel, context); + channelComponentMap.put(chName, new ChannelComponent(channel)); + LOGGER.info("Created channel " + chName); + } catch (Exception e) { + String msg = String.format("Channel %s has been removed due to an " + + "error during configuration", chName); + LOGGER.error(msg, e); + } } } /* * Any channel which was not re-used, will have it's reference removed */ for (Class<? extends Channel> channelKlass : channelsNotReused.keySet()) { - Map<String, Channel> channelMap = channels.get(channelKlass); + Map<String, Channel> channelMap = channelCache.get(channelKlass); if (channelMap != null) { for (String channelName : channelsNotReused.get(channelKlass)) { if(channelMap.remove(channelName) != null) { @@ -172,7 +222,7 @@ public abstract class AbstractConfigurationProvider implements } } if (channelMap.isEmpty()) { - channels.remove(channelKlass); + channelCache.remove(channelKlass); } } } @@ -193,10 +243,10 @@ public abstract class AbstractConfigurationProvider implements channel.setName(name); return channel; } - Map<String, Channel> channelMap = channels.get(channelClass); + Map<String, Channel> channelMap = channelCache.get(channelClass); if (channelMap == null) { channelMap = new HashMap<String, Channel>(); - channels.put(channelClass, channelMap); + channelCache.put(channelClass, channelMap); } Channel channel = channelMap.get(name); if(channel == null) { @@ -208,42 +258,62 @@ public abstract class AbstractConfigurationProvider implements return channel; } - private void loadSources(AgentConfiguration agentConf, MaterializedConfiguration conf) + private void loadSources(AgentConfiguration agentConf, + Map<String, ChannelComponent> channelComponentMap, + Map<String, SourceRunner> sourceRunnerMap) throws InstantiationException { - Set<String> sources = agentConf.getSourceSet(); + Set<String> sourceNames = agentConf.getSourceSet(); Map<String, ComponentConfiguration> compMap = agentConf.getSourceConfigMap(); /* * Components which have a ComponentConfiguration object */ - for (String sourceName : sources) { + for (String sourceName : sourceNames) { ComponentConfiguration comp = compMap.get(sourceName); if(comp != null) { SourceConfiguration config = (SourceConfiguration) comp; Source source = sourceFactory.create(comp.getComponentName(), comp.getType()); - - Configurables.configure(source, config); - Set<String> channelNames = config.getChannels(); - List<Channel> channels = new ArrayList<Channel>(); - for (String chName : channelNames) { - channels.add(conf.getChannels().get(chName)); + try { + Configurables.configure(source, config); + Set<String> channelNames = config.getChannels(); + List<Channel> sourceChannels = new ArrayList<Channel>(); + for (String chName : channelNames) { + ChannelComponent channelComponent = channelComponentMap.get(chName); + if(channelComponent != null) { + sourceChannels.add(channelComponent.channel); + } + } + if(sourceChannels.isEmpty()) { + String msg = String.format("Source %s is not connected to a " + + "channel", sourceName); + throw new IllegalStateException(msg); + } + ChannelSelectorConfiguration selectorConfig = + config.getSelectorConfiguration(); + + ChannelSelector selector = ChannelSelectorFactory.create( + sourceChannels, selectorConfig); + + ChannelProcessor channelProcessor = new ChannelProcessor(selector); + Configurables.configure(channelProcessor, config); + + source.setChannelProcessor(channelProcessor); + sourceRunnerMap.put(comp.getComponentName(), + SourceRunner.forSource(source)); + for(Channel channel : sourceChannels) { + ChannelComponent channelComponent = Preconditions. + checkNotNull(channelComponentMap.get(channel.getName()), + String.format("Channel %s", channel.getName())); + channelComponent.components.add(sourceName); + } + } catch (Exception e) { + String msg = String.format("Source %s has been removed due to an " + + "error during configuration", sourceName); + LOGGER.error(msg, e); } - - ChannelSelectorConfiguration selectorConfig = - config.getSelectorConfiguration(); - - ChannelSelector selector = ChannelSelectorFactory.create( - channels, selectorConfig); - - ChannelProcessor channelProcessor = new ChannelProcessor(selector); - Configurables.configure(channelProcessor, config); - - source.setChannelProcessor(channelProcessor); - conf.addSourceRunner(comp.getComponentName(), - SourceRunner.forSource(source)); } } /* @@ -251,41 +321,58 @@ public abstract class AbstractConfigurationProvider implements * and use only Context */ Map<String, Context> sourceContexts = agentConf.getSourceContext(); - for (String sourceName : sources) { + for (String sourceName : sourceNames) { Context context = sourceContexts.get(sourceName); if(context != null){ Source source = sourceFactory.create(sourceName, context.getString(BasicConfigurationConstants.CONFIG_TYPE)); - List<Channel> channels = new ArrayList<Channel>(); - Configurables.configure(source, context); - String[] channelNames = context.getString( - BasicConfigurationConstants.CONFIG_CHANNELS).split("\\s+"); - for (String chName : channelNames) { - channels.add(conf.getChannels().get(chName)); + try { + Configurables.configure(source, context); + List<Channel> sourceChannels = new ArrayList<Channel>(); + String[] channelNames = context.getString( + BasicConfigurationConstants.CONFIG_CHANNELS).split("\\s+"); + for (String chName : channelNames) { + ChannelComponent channelComponent = channelComponentMap.get(chName); + if(channelComponent != null) { + sourceChannels.add(channelComponent.channel); + } + } + if(sourceChannels.isEmpty()) { + String msg = String.format("Source %s is not connected to a " + + "channel", sourceName); + throw new IllegalStateException(msg); + } + Map<String, String> selectorConfig = context.getSubProperties( + BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX); + + ChannelSelector selector = ChannelSelectorFactory.create( + sourceChannels, selectorConfig); + + ChannelProcessor channelProcessor = new ChannelProcessor(selector); + Configurables.configure(channelProcessor, context); + source.setChannelProcessor(channelProcessor); + sourceRunnerMap.put(sourceName, + SourceRunner.forSource(source)); + for(Channel channel : sourceChannels) { + ChannelComponent channelComponent = Preconditions. + checkNotNull(channelComponentMap.get(channel.getName()), + String.format("Channel %s", channel.getName())); + channelComponent.components.add(sourceName); + } + } catch (Exception e) { + String msg = String.format("Source %s has been removed due to an " + + "error during configuration", sourceName); + LOGGER.error(msg, e); } - - Map<String, String> selectorConfig = context.getSubProperties( - BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX); - - ChannelSelector selector = ChannelSelectorFactory.create( - channels, selectorConfig); - - ChannelProcessor channelProcessor = new ChannelProcessor(selector); - Configurables.configure(channelProcessor, context); - - source.setChannelProcessor(channelProcessor); - conf.addSourceRunner(sourceName, - SourceRunner.forSource(source)); - } } } - private void loadSinks(AgentConfiguration agentConf, MaterializedConfiguration conf) + private void loadSinks(AgentConfiguration agentConf, + Map<String, ChannelComponent> channelComponentMap, Map<String, SinkRunner> sinkRunnerMap) throws InstantiationException { Set<String> sinkNames = agentConf.getSinkSet(); - ImmutableMap<String,Channel> channels = conf.getChannels(); Map<String, ComponentConfiguration> compMap = agentConf.getSinkConfigMap(); Map<String, Sink> sinks = new HashMap<String, Sink>(); @@ -298,11 +385,23 @@ public abstract class AbstractConfigurationProvider implements SinkConfiguration config = (SinkConfiguration) comp; Sink sink = sinkFactory.create(comp.getComponentName(), comp.getType()); - - Configurables.configure(sink, config); - - sink.setChannel(channels.get(config.getChannel())); - sinks.put(comp.getComponentName(), sink); + try { + Configurables.configure(sink, config); + ChannelComponent channelComponent = channelComponentMap. + get(config.getChannel()); + if(channelComponent == null) { + String msg = String.format("Sink %s is not connected to a " + + "channel", sinkName); + throw new IllegalStateException(msg); + } + sink.setChannel(channelComponent.channel); + sinks.put(comp.getComponentName(), sink); + channelComponent.components.add(sinkName); + } catch (Exception e) { + String msg = String.format("Sink %s has been removed due to an " + + "error during configuration", sinkName); + LOGGER.error(msg, e); + } } } /* @@ -315,31 +414,42 @@ public abstract class AbstractConfigurationProvider implements if(context != null) { Sink sink = sinkFactory.create(sinkName, context.getString( BasicConfigurationConstants.CONFIG_TYPE)); - Configurables.configure(sink, context); - - sink.setChannel(channels.get(context.getString( - BasicConfigurationConstants.CONFIG_CHANNEL))); - sinks.put(sinkName, sink); + try { + Configurables.configure(sink, context); + ChannelComponent channelComponent = channelComponentMap. + get(context.getString(BasicConfigurationConstants.CONFIG_CHANNEL)); + if(channelComponent == null) { + String msg = String.format("Sink %s is not connected to a " + + "channel", sinkName); + throw new IllegalStateException(msg); + } + sink.setChannel(channelComponent.channel); + sinks.put(sinkName, sink); + channelComponent.components.add(sinkName); + } catch (Exception e) { + String msg = String.format("Sink %s has been removed due to an " + + "error during configuration", sinkName); + LOGGER.error(msg, e); + } } } - loadSinkGroups(agentConf, sinks, conf); + loadSinkGroups(agentConf, sinks, sinkRunnerMap); } private void loadSinkGroups(AgentConfiguration agentConf, - Map<String, Sink> sinks, MaterializedConfiguration conf) + Map<String, Sink> sinks, Map<String, SinkRunner> sinkRunnerMap) throws InstantiationException { - Set<String> sinkgroupNames = agentConf.getSinkgroupSet(); + Set<String> sinkGroupNames = agentConf.getSinkgroupSet(); Map<String, ComponentConfiguration> compMap = agentConf.getSinkGroupConfigMap(); Map<String, String> usedSinks = new HashMap<String, String>(); - for (String groupName: sinkgroupNames) { + for (String groupName: sinkGroupNames) { ComponentConfiguration comp = compMap.get(groupName); if(comp != null) { SinkGroupConfiguration groupConf = (SinkGroupConfiguration) comp; - List<String> groupSinkList = groupConf.getSinks(); List<Sink> groupSinks = new ArrayList<Sink>(); - for (String sink : groupSinkList) { + for (String sink : groupConf.getSinks()) { Sink s = sinks.remove(sink); if (s == null) { String sinkUser = usedSinks.get(sink); @@ -357,23 +467,43 @@ public abstract class AbstractConfigurationProvider implements groupSinks.add(s); usedSinks.put(sink, groupName); } - SinkGroup group = new SinkGroup(groupSinks); - Configurables.configure(group, groupConf); - conf.addSinkRunner(comp.getComponentName(), - new SinkRunner(group.getProcessor())); + try { + SinkGroup group = new SinkGroup(groupSinks); + Configurables.configure(group, groupConf); + sinkRunnerMap.put(comp.getComponentName(), + new SinkRunner(group.getProcessor())); + } catch (Exception e) { + String msg = String.format("SinkGroup %s has been removed due to " + + "an error during configuration", groupName); + LOGGER.error(msg, e); + } } } // add any unassigned sinks to solo collectors for(Entry<String, Sink> entry : sinks.entrySet()) { if (!usedSinks.containsValue(entry.getKey())) { - SinkProcessor pr = new DefaultSinkProcessor(); - List<Sink> sinkMap = new ArrayList<Sink>(); - sinkMap.add(entry.getValue()); - pr.setSinks(sinkMap); - Configurables.configure(pr, new Context()); - conf.addSinkRunner(entry.getKey(), - new SinkRunner(pr)); + try { + SinkProcessor pr = new DefaultSinkProcessor(); + List<Sink> sinkMap = new ArrayList<Sink>(); + sinkMap.add(entry.getValue()); + pr.setSinks(sinkMap); + Configurables.configure(pr, new Context()); + sinkRunnerMap.put(entry.getKey(), + new SinkRunner(pr)); + } catch(Exception e) { + String msg = String.format("SinkGroup %s has been removed due to " + + "an error during configuration", entry.getKey()); + LOGGER.error(msg, e); + } } } } + private static class ChannelComponent { + final Channel channel; + final List<String> components; + ChannelComponent(Channel channel) { + this.channel = channel; + components = Lists.newArrayList(); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/5fdf673d/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java index 25001b1..15a478d 100644 --- a/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java +++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java @@ -23,12 +23,17 @@ import junit.framework.Assert; import org.apache.flume.Channel; import org.apache.flume.ChannelException; +import org.apache.flume.Context; import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; import org.apache.flume.Transaction; import org.apache.flume.annotations.Disposable; import org.apache.flume.annotations.Recyclable; import org.apache.flume.channel.AbstractChannel; +import org.apache.flume.conf.Configurable; import org.apache.flume.conf.FlumeConfiguration; +import org.apache.flume.sink.AbstractSink; +import org.apache.flume.source.AbstractSource; import org.junit.Test; import com.google.common.collect.Maps; @@ -115,20 +120,84 @@ public class TestAbstractConfigurationProvider { Assert.assertNotSame(channel1, channel3); } - - private Map<String, String> getPropertiesForChannel(String agentName, String channelType) { + @Test + public void testSourceThrowsExceptionDuringConfiguration() throws Exception { + String agentName = "agent1"; + String sourceType = UnconfigurableSource.class.getName(); + String channelType = "memory"; + String sinkType = "null"; + Map<String, String> properties = getProperties(agentName, sourceType, + channelType, sinkType); + MemoryConfigurationProvider provider = + new MemoryConfigurationProvider(agentName, properties); + MaterializedConfiguration config = provider.getConfiguration(); + Assert.assertTrue(config.getSourceRunners().size() == 0); + Assert.assertTrue(config.getChannels().size() == 1); + Assert.assertTrue(config.getSinkRunners().size() == 1); + } + @Test + public void testChannelThrowsExceptionDuringConfiguration() throws Exception { + String agentName = "agent1"; + String sourceType = "seq"; + String channelType = UnconfigurableChannel.class.getName(); + String sinkType = "null"; + Map<String, String> properties = getProperties(agentName, sourceType, + channelType, sinkType); + MemoryConfigurationProvider provider = + new MemoryConfigurationProvider(agentName, properties); + MaterializedConfiguration config = provider.getConfiguration(); + Assert.assertTrue(config.getSourceRunners().size() == 0); + Assert.assertTrue(config.getChannels().size() == 0); + Assert.assertTrue(config.getSinkRunners().size() == 0); + } + @Test + public void testSinkThrowsExceptionDuringConfiguration() throws Exception { + String agentName = "agent1"; + String sourceType = "seq"; + String channelType = "memory"; + String sinkType = UnconfigurableSink.class.getName(); + Map<String, String> properties = getProperties(agentName, sourceType, + channelType, sinkType); + MemoryConfigurationProvider provider = + new MemoryConfigurationProvider(agentName, properties); + MaterializedConfiguration config = provider.getConfiguration(); + Assert.assertTrue(config.getSourceRunners().size() == 1); + Assert.assertTrue(config.getChannels().size() == 1); + Assert.assertTrue(config.getSinkRunners().size() == 0); + } + @Test + public void testSourceAndSinkThrowExceptionDuringConfiguration() + throws Exception { + String agentName = "agent1"; + String sourceType = UnconfigurableSource.class.getName(); + String channelType = "memory"; + String sinkType = UnconfigurableSink.class.getName(); + Map<String, String> properties = getProperties(agentName, sourceType, + channelType, sinkType); + MemoryConfigurationProvider provider = + new MemoryConfigurationProvider(agentName, properties); + MaterializedConfiguration config = provider.getConfiguration(); + Assert.assertTrue(config.getSourceRunners().size() == 0); + Assert.assertTrue(config.getChannels().size() == 0); + Assert.assertTrue(config.getSinkRunners().size() == 0); + } + private Map<String, String> getProperties(String agentName, + String sourceType, String channelType, String sinkType) { Map<String, String> properties = Maps.newHashMap(); properties.put(agentName + ".sources", "source1"); properties.put(agentName + ".channels", "channel1"); properties.put(agentName + ".sinks", "sink1"); - properties.put(agentName + ".sources.source1.type", "seq"); + properties.put(agentName + ".sources.source1.type", sourceType); properties.put(agentName + ".sources.source1.channels", "channel1"); properties.put(agentName + ".channels.channel1.type", channelType); properties.put(agentName + ".channels.channel1.capacity", "100"); - properties.put(agentName + ".sinks.sink1.type", "null"); + properties.put(agentName + ".sinks.sink1.type", sinkType); properties.put(agentName + ".sinks.sink1.channel", "channel1"); return properties; } + private Map<String, String> getPropertiesForChannel(String agentName, String channelType) { + return getProperties(agentName, "seq", channelType, "null"); + } public static class MemoryConfigurationProvider extends AbstractConfigurationProvider { private Map<String, String> properties; @@ -190,4 +259,40 @@ public class TestAbstractConfigurationProvider { throw new UnsupportedOperationException(); } } + public static class UnconfigurableChannel extends AbstractChannel { + @Override + public void configure(Context context) { + throw new RuntimeException("expected"); + } + @Override + public void put(Event event) throws ChannelException { + throw new UnsupportedOperationException(); + } + @Override + public Event take() throws ChannelException { + throw new UnsupportedOperationException(); + } + @Override + public Transaction getTransaction() { + throw new UnsupportedOperationException(); + } + } + public static class UnconfigurableSource extends AbstractSource + implements Configurable { + @Override + public void configure(Context context) { + throw new RuntimeException("expected"); + } + } + public static class UnconfigurableSink extends AbstractSink + implements Configurable { + @Override + public void configure(Context context) { + throw new RuntimeException("expected"); + } + @Override + public Status process() throws EventDeliveryException { + throw new UnsupportedOperationException(); + } + } }
