Updated Branches:
  refs/heads/trunk 123c7bf6d -> 5fdf673d1

FLUME-1772. AbstractConfigurationProvider should remove any component that
throws an exception from its configure method.

(Brock Noland via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/5fdf673d
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/5fdf673d
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/5fdf673d

Branch: refs/heads/trunk
Commit: 5fdf673d1a9cbab9144286a3afc335f8fcf7cd6c
Parents: 123c7bf
Author: Hari Shreedharan <[email protected]>
Authored: Wed Dec 19 20:53:29 2012 -0800
Committer: Hari Shreedharan <[email protected]>
Committed: Wed Dec 19 20:54:17 2012 -0800

----------------------------------------------------------------------
 .../flume/node/AbstractConfigurationProvider.java  |  312 ++++++++++-----
 .../node/TestAbstractConfigurationProvider.java    |  113 +++++-
 2 files changed, 330 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/5fdf673d/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
index daef76b..e63c601 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
@@ -19,6 +19,7 @@ package org.apache.flume.node;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -56,9 +57,11 @@ import org.apache.flume.source.DefaultSourceFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 public abstract class AbstractConfigurationProvider implements
     ConfigurationProvider {
@@ -71,7 +74,8 @@ public abstract class AbstractConfigurationProvider implements
   private final SinkFactory sinkFactory;
   private final ChannelFactory channelFactory;
 
-  private final Map<Class<? extends Channel>, Map<String, Channel>> channels;
+
+  private final Map<Class<? extends Channel>, Map<String, Channel>> channelCache;
 
   public AbstractConfigurationProvider(String agentName) {
     super();
@@ -80,7 +84,7 @@ public abstract class AbstractConfigurationProvider implements
     this.sinkFactory = new DefaultSinkFactory();
     this.channelFactory = new DefaultChannelFactory();
 
-    channels = new HashMap<Class<? extends Channel>, Map<String, Channel>>();
+    channelCache = new HashMap<Class<? extends Channel>, Map<String, Channel>>();
   }
 
   protected abstract FlumeConfiguration getFlumeConfiguration();
@@ -90,12 +94,45 @@ public abstract class AbstractConfigurationProvider 
implements
     FlumeConfiguration fconfig = getFlumeConfiguration();
     AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
     if (agentConf != null) {
+      Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
+      Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
+      Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
       try {
-        loadChannels(agentConf, conf);
-        loadSources(agentConf, conf);
-        loadSinks(agentConf, conf);
+        loadChannels(agentConf, channelComponentMap);
+        loadSources(agentConf, channelComponentMap, sourceRunnerMap);
+        loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
+        Set<String> channelNames =
+            new HashSet<String>(channelComponentMap.keySet());
+        for(String channelName : channelNames) {
+          ChannelComponent channelComponent = channelComponentMap.
+              get(channelName);
+          if(channelComponent.components.isEmpty()) {
+            LOGGER.warn(String.format("Channel %s has no components connected" +
+                " and has been removed.", channelName));
+            channelComponentMap.remove(channelName);
+            Map<String, Channel> nameChannelMap = channelCache.
+                get(channelComponent.channel.getClass());
+            if(nameChannelMap != null) {
+              nameChannelMap.remove(channelName);
+            }
+          } else {
+            LOGGER.info(String.format("Channel %s connected to %s",
+                channelName, channelComponent.components.toString()));
+            conf.addChannel(channelName, channelComponent.channel);
+          }
+        }
+        for(Map.Entry<String, SourceRunner> entry : 
sourceRunnerMap.entrySet()) {
+          conf.addSourceRunner(entry.getKey(), entry.getValue());
+        }
+        for(Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
+          conf.addSinkRunner(entry.getKey(), entry.getValue());
+        }
       } catch (InstantiationException ex) {
         LOGGER.error("Failed to instantiate component", ex);
+      } finally {
+        channelComponentMap.clear();
+        sourceRunnerMap.clear();
+        sinkRunnerMap.clear();
       }
     } else {
       LOGGER.warn("No configuration found for this host:{}", getAgentName());
@@ -107,9 +144,9 @@ public abstract class AbstractConfigurationProvider 
implements
     return agentName;
   }
 
-
   private void loadChannels(AgentConfiguration agentConf,
-      MaterializedConfiguration conf) throws InstantiationException {
+      Map<String, ChannelComponent> channelComponentMap)
+          throws InstantiationException {
     LOGGER.info("Creating channels");
 
     /*
@@ -123,7 +160,7 @@ public abstract class AbstractConfigurationProvider 
implements
     ListMultimap<Class<? extends Channel>, String> channelsNotReused =
         ArrayListMultimap.create();
     // assume all channels will not be re-used
-    for(Map.Entry<Class<? extends Channel>, Map<String, Channel>> entry : 
channels.entrySet()) {
+    for(Map.Entry<Class<? extends Channel>, Map<String, Channel>> entry : 
channelCache.entrySet()) {
       Class<? extends Channel> channelKlass = entry.getKey();
       Set<String> channelNames = entry.getValue().keySet();
       channelsNotReused.get(channelKlass).addAll(channelNames);
@@ -140,9 +177,16 @@ public abstract class AbstractConfigurationProvider 
implements
       if(comp != null) {
         Channel channel = getOrCreateChannel(channelsNotReused,
             comp.getComponentName(), comp.getType());
-        Configurables.configure(channel, comp);
-        conf.addChannel(comp.getComponentName(), channel);
-        LOGGER.info("Created channel " + chName);
+        try {
+          Configurables.configure(channel, comp);
+          channelComponentMap.put(comp.getComponentName(),
+              new ChannelComponent(channel));
+          LOGGER.info("Created channel " + chName);
+        } catch (Exception e) {
+          String msg = String.format("Channel %s has been removed due to an " +
+              "error during configuration", chName);
+          LOGGER.error(msg, e);
+        }
       }
     }
     /*
@@ -155,16 +199,22 @@ public abstract class AbstractConfigurationProvider 
implements
         Channel channel =
             getOrCreateChannel(channelsNotReused, chName, context.getString(
                 BasicConfigurationConstants.CONFIG_TYPE));
-        Configurables.configure(channel, context);
-        conf.addChannel(chName, channel);
-        LOGGER.info("Created channel " + chName);
+        try {
+          Configurables.configure(channel, context);
+          channelComponentMap.put(chName, new ChannelComponent(channel));
+          LOGGER.info("Created channel " + chName);
+        } catch (Exception e) {
+          String msg = String.format("Channel %s has been removed due to an " +
+                "error during configuration", chName);
+          LOGGER.error(msg, e);
+        }
       }
     }
     /*
      * Any channel which was not re-used, will have it's reference removed
      */
     for (Class<? extends Channel> channelKlass : channelsNotReused.keySet()) {
-      Map<String, Channel> channelMap = channels.get(channelKlass);
+      Map<String, Channel> channelMap = channelCache.get(channelKlass);
       if (channelMap != null) {
         for (String channelName : channelsNotReused.get(channelKlass)) {
           if(channelMap.remove(channelName) != null) {
@@ -172,7 +222,7 @@ public abstract class AbstractConfigurationProvider 
implements
           }
         }
         if (channelMap.isEmpty()) {
-          channels.remove(channelKlass);
+          channelCache.remove(channelKlass);
         }
       }
     }
@@ -193,10 +243,10 @@ public abstract class AbstractConfigurationProvider 
implements
       channel.setName(name);
       return channel;
     }
-    Map<String, Channel> channelMap = channels.get(channelClass);
+    Map<String, Channel> channelMap = channelCache.get(channelClass);
     if (channelMap == null) {
       channelMap = new HashMap<String, Channel>();
-      channels.put(channelClass, channelMap);
+      channelCache.put(channelClass, channelMap);
     }
     Channel channel = channelMap.get(name);
     if(channel == null) {
@@ -208,42 +258,62 @@ public abstract class AbstractConfigurationProvider 
implements
     return channel;
   }
 
-  private void loadSources(AgentConfiguration agentConf, 
MaterializedConfiguration conf)
+  private void loadSources(AgentConfiguration agentConf,
+      Map<String, ChannelComponent> channelComponentMap,
+      Map<String, SourceRunner> sourceRunnerMap)
       throws InstantiationException {
 
-    Set<String> sources = agentConf.getSourceSet();
+    Set<String> sourceNames = agentConf.getSourceSet();
     Map<String, ComponentConfiguration> compMap =
         agentConf.getSourceConfigMap();
     /*
      * Components which have a ComponentConfiguration object
      */
-    for (String sourceName : sources) {
+    for (String sourceName : sourceNames) {
       ComponentConfiguration comp = compMap.get(sourceName);
       if(comp != null) {
         SourceConfiguration config = (SourceConfiguration) comp;
 
         Source source = sourceFactory.create(comp.getComponentName(),
             comp.getType());
-
-        Configurables.configure(source, config);
-        Set<String> channelNames = config.getChannels();
-        List<Channel> channels = new ArrayList<Channel>();
-        for (String chName : channelNames) {
-          channels.add(conf.getChannels().get(chName));
+        try {
+          Configurables.configure(source, config);
+          Set<String> channelNames = config.getChannels();
+          List<Channel> sourceChannels = new ArrayList<Channel>();
+          for (String chName : channelNames) {
+            ChannelComponent channelComponent = 
channelComponentMap.get(chName);
+            if(channelComponent != null) {
+              sourceChannels.add(channelComponent.channel);
+            }
+          }
+          if(sourceChannels.isEmpty()) {
+            String msg = String.format("Source %s is not connected to a " +
+                "channel",  sourceName);
+            throw new IllegalStateException(msg);
+          }
+          ChannelSelectorConfiguration selectorConfig =
+              config.getSelectorConfiguration();
+
+          ChannelSelector selector = ChannelSelectorFactory.create(
+              sourceChannels, selectorConfig);
+
+          ChannelProcessor channelProcessor = new ChannelProcessor(selector);
+          Configurables.configure(channelProcessor, config);
+
+          source.setChannelProcessor(channelProcessor);
+          sourceRunnerMap.put(comp.getComponentName(),
+              SourceRunner.forSource(source));
+          for(Channel channel : sourceChannels) {
+            ChannelComponent channelComponent = Preconditions.
+                checkNotNull(channelComponentMap.get(channel.getName()),
+                    String.format("Channel %s", channel.getName()));
+            channelComponent.components.add(sourceName);
+          }
+        } catch (Exception e) {
+          String msg = String.format("Source %s has been removed due to an " +
+              "error during configuration", sourceName);
+          LOGGER.error(msg, e);
         }
-
-        ChannelSelectorConfiguration selectorConfig =
-            config.getSelectorConfiguration();
-
-        ChannelSelector selector = ChannelSelectorFactory.create(
-            channels, selectorConfig);
-
-        ChannelProcessor channelProcessor = new ChannelProcessor(selector);
-        Configurables.configure(channelProcessor, config);
-
-        source.setChannelProcessor(channelProcessor);
-        conf.addSourceRunner(comp.getComponentName(),
-            SourceRunner.forSource(source));
       }
     }
     /*
@@ -251,41 +321,58 @@ public abstract class AbstractConfigurationProvider 
implements
      * and use only Context
      */
     Map<String, Context> sourceContexts = agentConf.getSourceContext();
-    for (String sourceName : sources) {
+    for (String sourceName : sourceNames) {
       Context context = sourceContexts.get(sourceName);
       if(context != null){
         Source source =
             sourceFactory.create(sourceName,
                 context.getString(BasicConfigurationConstants.CONFIG_TYPE));
-        List<Channel> channels = new ArrayList<Channel>();
-        Configurables.configure(source, context);
-        String[] channelNames = context.getString(
-            BasicConfigurationConstants.CONFIG_CHANNELS).split("\\s+");
-        for (String chName : channelNames) {
-          channels.add(conf.getChannels().get(chName));
+        try {
+          Configurables.configure(source, context);
+          List<Channel> sourceChannels = new ArrayList<Channel>();
+          String[] channelNames = context.getString(
+              BasicConfigurationConstants.CONFIG_CHANNELS).split("\\s+");
+          for (String chName : channelNames) {
+            ChannelComponent channelComponent = 
channelComponentMap.get(chName);
+            if(channelComponent != null) {
+              sourceChannels.add(channelComponent.channel);
+            }
+          }
+          if(sourceChannels.isEmpty()) {
+            String msg = String.format("Source %s is not connected to a " +
+                "channel",  sourceName);
+            throw new IllegalStateException(msg);
+          }
+          Map<String, String> selectorConfig = context.getSubProperties(
+              BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX);
+
+          ChannelSelector selector = ChannelSelectorFactory.create(
+              sourceChannels, selectorConfig);
+
+          ChannelProcessor channelProcessor = new ChannelProcessor(selector);
+          Configurables.configure(channelProcessor, context);
+          source.setChannelProcessor(channelProcessor);
+          sourceRunnerMap.put(sourceName,
+              SourceRunner.forSource(source));
+          for(Channel channel : sourceChannels) {
+            ChannelComponent channelComponent = Preconditions.
+                checkNotNull(channelComponentMap.get(channel.getName()),
+                    String.format("Channel %s", channel.getName()));
+            channelComponent.components.add(sourceName);
+          }
+        } catch (Exception e) {
+          String msg = String.format("Source %s has been removed due to an " +
+              "error during configuration", sourceName);
+          LOGGER.error(msg, e);
         }
-
-        Map<String, String> selectorConfig = context.getSubProperties(
-            BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX);
-
-        ChannelSelector selector = ChannelSelectorFactory.create(
-            channels, selectorConfig);
-
-        ChannelProcessor channelProcessor = new ChannelProcessor(selector);
-        Configurables.configure(channelProcessor, context);
-
-        source.setChannelProcessor(channelProcessor);
-        conf.addSourceRunner(sourceName,
-            SourceRunner.forSource(source));
-
       }
     }
   }
 
-  private void loadSinks(AgentConfiguration agentConf, 
MaterializedConfiguration conf)
+  private void loadSinks(AgentConfiguration agentConf,
+      Map<String, ChannelComponent> channelComponentMap, Map<String, 
SinkRunner> sinkRunnerMap)
       throws InstantiationException {
     Set<String> sinkNames = agentConf.getSinkSet();
-    ImmutableMap<String,Channel> channels = conf.getChannels();
     Map<String, ComponentConfiguration> compMap =
         agentConf.getSinkConfigMap();
     Map<String, Sink> sinks = new HashMap<String, Sink>();
@@ -298,11 +385,23 @@ public abstract class AbstractConfigurationProvider 
implements
         SinkConfiguration config = (SinkConfiguration) comp;
         Sink sink = sinkFactory.create(comp.getComponentName(),
             comp.getType());
-
-        Configurables.configure(sink, config);
-
-        sink.setChannel(channels.get(config.getChannel()));
-        sinks.put(comp.getComponentName(), sink);
+        try {
+          Configurables.configure(sink, config);
+          ChannelComponent channelComponent = channelComponentMap.
+              get(config.getChannel());
+          if(channelComponent == null) {
+            String msg = String.format("Sink %s is not connected to a " +
+                "channel",  sinkName);
+            throw new IllegalStateException(msg);
+          }
+          sink.setChannel(channelComponent.channel);
+          sinks.put(comp.getComponentName(), sink);
+          channelComponent.components.add(sinkName);
+        } catch (Exception e) {
+          String msg = String.format("Sink %s has been removed due to an " +
+              "error during configuration", sinkName);
+          LOGGER.error(msg, e);
+        }
       }
     }
     /*
@@ -315,31 +414,42 @@ public abstract class AbstractConfigurationProvider 
implements
       if(context != null) {
         Sink sink = sinkFactory.create(sinkName, context.getString(
             BasicConfigurationConstants.CONFIG_TYPE));
-        Configurables.configure(sink, context);
-
-        sink.setChannel(channels.get(context.getString(
-            BasicConfigurationConstants.CONFIG_CHANNEL)));
-        sinks.put(sinkName, sink);
+        try {
+          Configurables.configure(sink, context);
+          ChannelComponent channelComponent = channelComponentMap.
+              get(context.getString(BasicConfigurationConstants.CONFIG_CHANNEL));
+          if(channelComponent == null) {
+            String msg = String.format("Sink %s is not connected to a " +
+                "channel",  sinkName);
+            throw new IllegalStateException(msg);
+          }
+          sink.setChannel(channelComponent.channel);
+          sinks.put(sinkName, sink);
+          channelComponent.components.add(sinkName);
+        } catch (Exception e) {
+          String msg = String.format("Sink %s has been removed due to an " +
+              "error during configuration", sinkName);
+          LOGGER.error(msg, e);
+        }
       }
     }
 
-    loadSinkGroups(agentConf, sinks, conf);
+    loadSinkGroups(agentConf, sinks, sinkRunnerMap);
   }
 
   private void loadSinkGroups(AgentConfiguration agentConf,
-      Map<String, Sink> sinks, MaterializedConfiguration conf)
+      Map<String, Sink> sinks, Map<String, SinkRunner> sinkRunnerMap)
           throws InstantiationException {
-    Set<String> sinkgroupNames = agentConf.getSinkgroupSet();
+    Set<String> sinkGroupNames = agentConf.getSinkgroupSet();
     Map<String, ComponentConfiguration> compMap =
         agentConf.getSinkGroupConfigMap();
     Map<String, String> usedSinks = new HashMap<String, String>();
-    for (String groupName: sinkgroupNames) {
+    for (String groupName: sinkGroupNames) {
       ComponentConfiguration comp = compMap.get(groupName);
       if(comp != null) {
         SinkGroupConfiguration groupConf = (SinkGroupConfiguration) comp;
-        List<String> groupSinkList = groupConf.getSinks();
         List<Sink> groupSinks = new ArrayList<Sink>();
-        for (String sink : groupSinkList) {
+        for (String sink : groupConf.getSinks()) {
           Sink s = sinks.remove(sink);
           if (s == null) {
             String sinkUser = usedSinks.get(sink);
@@ -357,23 +467,43 @@ public abstract class AbstractConfigurationProvider 
implements
           groupSinks.add(s);
           usedSinks.put(sink, groupName);
         }
-        SinkGroup group = new SinkGroup(groupSinks);
-        Configurables.configure(group, groupConf);
-        conf.addSinkRunner(comp.getComponentName(),
-            new SinkRunner(group.getProcessor()));
+        try {
+          SinkGroup group = new SinkGroup(groupSinks);
+          Configurables.configure(group, groupConf);
+          sinkRunnerMap.put(comp.getComponentName(),
+              new SinkRunner(group.getProcessor()));
+        } catch (Exception e) {
+          String msg = String.format("SinkGroup %s has been removed due to " +
+              "an error during configuration", groupName);
+          LOGGER.error(msg, e);
+        }
       }
     }
     // add any unassigned sinks to solo collectors
     for(Entry<String, Sink> entry : sinks.entrySet()) {
       if (!usedSinks.containsValue(entry.getKey())) {
-        SinkProcessor pr = new DefaultSinkProcessor();
-        List<Sink> sinkMap = new ArrayList<Sink>();
-        sinkMap.add(entry.getValue());
-        pr.setSinks(sinkMap);
-        Configurables.configure(pr, new Context());
-        conf.addSinkRunner(entry.getKey(),
-            new SinkRunner(pr));
+        try {
+          SinkProcessor pr = new DefaultSinkProcessor();
+          List<Sink> sinkMap = new ArrayList<Sink>();
+          sinkMap.add(entry.getValue());
+          pr.setSinks(sinkMap);
+          Configurables.configure(pr, new Context());
+          sinkRunnerMap.put(entry.getKey(),
+              new SinkRunner(pr));
+        } catch(Exception e) {
+          String msg = String.format("SinkGroup %s has been removed due to " +
+              "an error during configuration", entry.getKey());
+          LOGGER.error(msg, e);
+        }
       }
     }
   }
+  private static class ChannelComponent {
+    final Channel channel;
+    final List<String> components;
+    ChannelComponent(Channel channel) {
+      this.channel = channel;
+      components = Lists.newArrayList();
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/5fdf673d/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java
index 25001b1..15a478d 100644
--- a/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java
+++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java
@@ -23,12 +23,17 @@ import junit.framework.Assert;
 
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
 import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Transaction;
 import org.apache.flume.annotations.Disposable;
 import org.apache.flume.annotations.Recyclable;
 import org.apache.flume.channel.AbstractChannel;
+import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.FlumeConfiguration;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.flume.source.AbstractSource;
 import org.junit.Test;
 
 import com.google.common.collect.Maps;
@@ -115,20 +120,84 @@ public class TestAbstractConfigurationProvider {
     Assert.assertNotSame(channel1, channel3);
   }
 
-
-  private Map<String, String> getPropertiesForChannel(String agentName, String 
channelType) {
+  @Test
+  public void testSourceThrowsExceptionDuringConfiguration() throws Exception {
+    String agentName = "agent1";
+    String sourceType = UnconfigurableSource.class.getName();
+    String channelType = "memory";
+    String sinkType = "null";
+    Map<String, String> properties = getProperties(agentName, sourceType,
+        channelType, sinkType);
+    MemoryConfigurationProvider provider =
+        new MemoryConfigurationProvider(agentName, properties);
+    MaterializedConfiguration config = provider.getConfiguration();
+    Assert.assertTrue(config.getSourceRunners().size() == 0);
+    Assert.assertTrue(config.getChannels().size() == 1);
+    Assert.assertTrue(config.getSinkRunners().size() == 1);
+  }
+  @Test
+  public void testChannelThrowsExceptionDuringConfiguration() throws Exception 
{
+    String agentName = "agent1";
+    String sourceType = "seq";
+    String channelType = UnconfigurableChannel.class.getName();
+    String sinkType = "null";
+    Map<String, String> properties = getProperties(agentName, sourceType,
+        channelType, sinkType);
+    MemoryConfigurationProvider provider =
+        new MemoryConfigurationProvider(agentName, properties);
+    MaterializedConfiguration config = provider.getConfiguration();
+    Assert.assertTrue(config.getSourceRunners().size() == 0);
+    Assert.assertTrue(config.getChannels().size() == 0);
+    Assert.assertTrue(config.getSinkRunners().size() == 0);
+  }
+  @Test
+  public void testSinkThrowsExceptionDuringConfiguration() throws Exception {
+    String agentName = "agent1";
+    String sourceType = "seq";
+    String channelType = "memory";
+    String sinkType = UnconfigurableSink.class.getName();
+    Map<String, String> properties = getProperties(agentName, sourceType,
+        channelType, sinkType);
+    MemoryConfigurationProvider provider =
+        new MemoryConfigurationProvider(agentName, properties);
+    MaterializedConfiguration config = provider.getConfiguration();
+    Assert.assertTrue(config.getSourceRunners().size() == 1);
+    Assert.assertTrue(config.getChannels().size() == 1);
+    Assert.assertTrue(config.getSinkRunners().size() == 0);
+  }
+  @Test
+  public void testSourceAndSinkThrowExceptionDuringConfiguration()
+      throws Exception {
+    String agentName = "agent1";
+    String sourceType = UnconfigurableSource.class.getName();
+    String channelType = "memory";
+    String sinkType = UnconfigurableSink.class.getName();
+    Map<String, String> properties = getProperties(agentName, sourceType,
+        channelType, sinkType);
+    MemoryConfigurationProvider provider =
+        new MemoryConfigurationProvider(agentName, properties);
+    MaterializedConfiguration config = provider.getConfiguration();
+    Assert.assertTrue(config.getSourceRunners().size() == 0);
+    Assert.assertTrue(config.getChannels().size() == 0);
+    Assert.assertTrue(config.getSinkRunners().size() == 0);
+  }
+  private Map<String, String> getProperties(String agentName,
+      String sourceType, String channelType, String sinkType) {
     Map<String, String> properties = Maps.newHashMap();
     properties.put(agentName + ".sources", "source1");
     properties.put(agentName + ".channels", "channel1");
     properties.put(agentName + ".sinks", "sink1");
-    properties.put(agentName + ".sources.source1.type", "seq");
+    properties.put(agentName + ".sources.source1.type", sourceType);
     properties.put(agentName + ".sources.source1.channels", "channel1");
     properties.put(agentName + ".channels.channel1.type", channelType);
     properties.put(agentName + ".channels.channel1.capacity", "100");
-    properties.put(agentName + ".sinks.sink1.type", "null");
+    properties.put(agentName + ".sinks.sink1.type", sinkType);
     properties.put(agentName + ".sinks.sink1.channel", "channel1");
     return properties;
   }
+  private Map<String, String> getPropertiesForChannel(String agentName, String 
channelType) {
+    return getProperties(agentName, "seq", channelType, "null");
+  }
 
   public static class MemoryConfigurationProvider extends 
AbstractConfigurationProvider {
     private Map<String, String> properties;
@@ -190,4 +259,40 @@ public class TestAbstractConfigurationProvider {
       throw new UnsupportedOperationException();
     }
   }
+  public static class UnconfigurableChannel extends AbstractChannel {
+    @Override
+    public void configure(Context context) {
+      throw new RuntimeException("expected");
+    }
+    @Override
+    public void put(Event event) throws ChannelException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public Event take() throws ChannelException {
+      throw new UnsupportedOperationException();
+     }
+    @Override
+    public Transaction getTransaction() {
+      throw new UnsupportedOperationException();
+    }
+  }
+  public static class UnconfigurableSource extends AbstractSource
+  implements Configurable {
+    @Override
+    public void configure(Context context) {
+      throw new RuntimeException("expected");
+    }
+  }
+  public static class UnconfigurableSink extends AbstractSink
+  implements Configurable {
+    @Override
+    public void configure(Context context) {
+      throw new RuntimeException("expected");
+    }
+    @Override
+    public Status process() throws EventDeliveryException {
+      throw new UnsupportedOperationException();
+    }
+  }
 }

Reply via email to