http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java
index 8ae5bd9..e6c7875 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java
@@ -21,6 +21,9 @@ import org.apache.flume.conf.ComponentConfiguration.ComponentType;
 import org.apache.flume.conf.FlumeConfigurationError.ErrorOrWarning;
 import org.apache.flume.conf.channel.ChannelConfiguration;
 import org.apache.flume.conf.channel.ChannelType;
+import org.apache.flume.configfilter.ConfigFilter;
+import org.apache.flume.conf.configfilter.ConfigFilterConfiguration;
+import org.apache.flume.conf.configfilter.ConfigFilterType;
 import org.apache.flume.conf.sink.SinkConfiguration;
 import org.apache.flume.conf.sink.SinkGroupConfiguration;
 import org.apache.flume.conf.sink.SinkType;
@@ -39,9 +42,34 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 import java.util.StringTokenizer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_CHANNELS;
+import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_CHANNELS_PREFIX;
+import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_CONFIG;
+import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_CONFIGFILTERS;
+import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_CONFIGFILTERS_PREFIX;
+import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_SINKGROUPS;
+import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_SINKGROUPS_PREFIX;
+import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_SINKS;
+import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_SINKS_PREFIX;
+import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_SOURCES;
+import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_SOURCES_PREFIX;
+import static org.apache.flume.conf.FlumeConfigurationError.ErrorOrWarning.ERROR;
+import static org.apache.flume.conf.FlumeConfigurationError.ErrorOrWarning.WARNING;
+import static org.apache.flume.conf.FlumeConfigurationErrorType.AGENT_CONFIGURATION_INVALID;
+import static org.apache.flume.conf.FlumeConfigurationErrorType.AGENT_NAME_MISSING;
+import static org.apache.flume.conf.FlumeConfigurationErrorType.CONFIG_ERROR;
+import static org.apache.flume.conf.FlumeConfigurationErrorType.DUPLICATE_PROPERTY;
+import static org.apache.flume.conf.FlumeConfigurationErrorType.INVALID_PROPERTY;
+import static org.apache.flume.conf.FlumeConfigurationErrorType.PROPERTY_NAME_NULL;
+import static org.apache.flume.conf.FlumeConfigurationErrorType.PROPERTY_PART_OF_ANOTHER_GROUP;
+import static org.apache.flume.conf.FlumeConfigurationErrorType.PROPERTY_VALUE_NULL;
 
 /**
  * <p>
@@ -58,7 +86,7 @@ import java.util.StringTokenizer;
  */
 public class FlumeConfiguration {
 
-  private static final Logger logger = LoggerFactory.getLogger(FlumeConfiguration.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(FlumeConfiguration.class);
 
   private final Map<String, AgentConfiguration> agentConfigMap;
   private final LinkedList<FlumeConfigurationError> errors;
@@ -71,13 +99,12 @@ public class FlumeConfiguration {
    */
   @Deprecated
   public FlumeConfiguration(Properties properties) {
-    agentConfigMap = new HashMap<String, AgentConfiguration>();
-    errors = new LinkedList<FlumeConfigurationError>();
+    agentConfigMap = new HashMap<>();
+    errors = new LinkedList<>();
     // Construct the in-memory component hierarchy
-    for (Object name : properties.keySet()) {
-      Object value = properties.get(name);
-      if (!addRawProperty(name.toString(), value.toString())) {
-        logger.warn("Configuration property ignored: " + name + " = " + value);
+    for (Entry entry : properties.entrySet()) {
+      if (!addRawProperty(entry.getKey().toString(), entry.getValue().toString())) {
+        LOGGER.warn("Configuration property ignored: {} = {}", entry.getKey(), entry.getValue());
       }
     }
     // Now iterate thru the agentContext and create agent configs and add them
@@ -91,14 +118,12 @@ public class FlumeConfiguration {
    * Creates a populated Flume Configuration object.
    */
   public FlumeConfiguration(Map<String, String> properties) {
-    agentConfigMap = new HashMap<String, AgentConfiguration>();
-    errors = new LinkedList<FlumeConfigurationError>();
+    agentConfigMap = new HashMap<>();
+    errors = new LinkedList<>();
     // Construct the in-memory component hierarchy
-    for (String name : properties.keySet()) {
-      String value = properties.get(name);
-
-      if (!addRawProperty(name, value)) {
-        logger.warn("Configuration property ignored: " + name + " = " + value);
+    for (Entry<String, String> entry : properties.entrySet()) {
+      if (!addRawProperty(entry.getKey(), entry.getValue())) {
+        LOGGER.warn("Configuration property ignored: {} = {}", entry.getKey(), entry.getValue());
       }
     }
     // Now iterate thru the agentContext and create agent configs and add them
@@ -117,83 +142,68 @@ public class FlumeConfiguration {
   }
 
   private void validateConfiguration() {
-    Iterator<String> it = agentConfigMap.keySet().iterator();
+    Set<Entry<String, AgentConfiguration>> entries = agentConfigMap.entrySet();
+    Iterator<Entry<String, AgentConfiguration>> it = entries.iterator();
 
     while (it.hasNext()) {
-      String agentName = it.next();
-      AgentConfiguration aconf = agentConfigMap.get(agentName);
+      Entry<String, AgentConfiguration> next = it.next();
+      String agentName = next.getKey();
+      AgentConfiguration aconf = next.getValue();
 
       if (!aconf.isValid()) {
-        logger.warn("Agent configuration invalid for agent '" + agentName
-            + "'. It will be removed.");
-        errors.add(new FlumeConfigurationError(agentName, "",
-            FlumeConfigurationErrorType.AGENT_CONFIGURATION_INVALID,
-            ErrorOrWarning.ERROR));
-
+        LOGGER.warn("Agent configuration invalid for agent '{}'. It will be removed.", agentName);
+        addError(agentName, AGENT_CONFIGURATION_INVALID, ERROR);
         it.remove();
       }
-      logger.debug("Channels:" + aconf.channels + "\n");
-      logger.debug("Sinks " + aconf.sinks + "\n");
-      logger.debug("Sources " + aconf.sources + "\n");
+      LOGGER.debug("Channels:{}\n", aconf.channels);
+      LOGGER.debug("Sinks {}\n", aconf.sinks);
+      LOGGER.debug("Sources {}\n", aconf.sources);
     }
 
-    logger.info("Post-validation flume configuration contains configuration"
-        + " for agents: " + agentConfigMap.keySet());
+    LOGGER.info(
+        "Post-validation flume configuration contains configuration for agents: {}",
+        agentConfigMap.keySet()
+    );
   }
 
-  private boolean addRawProperty(String name, String value) {
+  private boolean addRawProperty(String rawName, String rawValue) {
     // Null names and values not supported
-    if (name == null || value == null) {
-      errors
-          .add(new FlumeConfigurationError("", "",
-              FlumeConfigurationErrorType.AGENT_NAME_MISSING,
-              ErrorOrWarning.ERROR));
+    if (rawName == null || rawValue == null) {
+      addError("", AGENT_NAME_MISSING, ERROR);
       return false;
     }
 
+    // Remove leading and trailing spaces
+    String name = rawName.trim();
+    String value = rawValue.trim();
+
     // Empty values are not supported
-    if (value.trim().length() == 0) {
-      errors
-          .add(new FlumeConfigurationError(name, "",
-              FlumeConfigurationErrorType.PROPERTY_VALUE_NULL,
-              ErrorOrWarning.ERROR));
+    if (value.isEmpty()) {
+      addError(name, PROPERTY_VALUE_NULL, ERROR);
       return false;
     }
 
-    // Remove leading and trailing spaces
-    name = name.trim();
-    value = value.trim();
-
     int index = name.indexOf('.');
 
     // All configuration keys must have a prefix defined as agent name
     if (index == -1) {
-      errors
-          .add(new FlumeConfigurationError(name, "",
-              FlumeConfigurationErrorType.AGENT_NAME_MISSING,
-              ErrorOrWarning.ERROR));
+      addError(name, AGENT_NAME_MISSING, ERROR);
       return false;
     }
 
     String agentName = name.substring(0, index);
 
     // Agent name must be specified for all properties
-    if (agentName.length() == 0) {
-      errors
-          .add(new FlumeConfigurationError(name, "",
-              FlumeConfigurationErrorType.AGENT_NAME_MISSING,
-              ErrorOrWarning.ERROR));
+    if (agentName.isEmpty()) {
+      addError(name, AGENT_NAME_MISSING, ERROR);
       return false;
     }
 
     String configKey = name.substring(index + 1);
 
     // Configuration key must be specified for every property
-    if (configKey.length() == 0) {
-      errors
-          .add(new FlumeConfigurationError(name, "",
-              FlumeConfigurationErrorType.PROPERTY_NAME_NULL,
-              ErrorOrWarning.ERROR));
+    if (configKey.isEmpty()) {
+      addError(name, PROPERTY_NAME_NULL, ERROR);
       return false;
     }
 
@@ -209,9 +219,16 @@ public class FlumeConfiguration {
     return aconf.addProperty(configKey, value);
   }
 
+  private void addError(
+      String component, FlumeConfigurationErrorType errorType, ErrorOrWarning level
+  ) {
+    errors.add(new FlumeConfigurationError(component, "", errorType, level));
+  }
+
   public static class AgentConfiguration {
 
     private final String agentName;
+    private String configFilters;
     private String sources;
     private String sinks;
     private String channels;
@@ -221,32 +238,40 @@ public class FlumeConfiguration {
     private final Map<String, ComponentConfiguration> sinkConfigMap;
     private final Map<String, ComponentConfiguration> channelConfigMap;
     private final Map<String, ComponentConfiguration> sinkgroupConfigMap;
+    private final Map<String, ComponentConfiguration> configFilterConfigMap;
 
+    private Map<String, Context> configFilterContextMap;
     private Map<String, Context> sourceContextMap;
     private Map<String, Context> sinkContextMap;
     private Map<String, Context> channelContextMap;
     private Map<String, Context> sinkGroupContextMap;
 
     private Set<String> sinkSet;
+    private Set<String> configFilterSet;
     private Set<String> sourceSet;
     private Set<String> channelSet;
     private Set<String> sinkgroupSet;
 
     private final List<FlumeConfigurationError> errorList;
+    private List<ConfigFilter> configFiltersInstances;
+    private Map<String, Pattern> configFilterPatternCache;
 
     private AgentConfiguration(String agentName,
                                List<FlumeConfigurationError> errorList) {
       this.agentName = agentName;
       this.errorList = errorList;
-      sourceConfigMap = new HashMap<String, ComponentConfiguration>();
-      sinkConfigMap = new HashMap<String, ComponentConfiguration>();
-      channelConfigMap = new HashMap<String, ComponentConfiguration>();
-      sinkgroupConfigMap = new HashMap<String, ComponentConfiguration>();
-      sourceContextMap = new HashMap<String, Context>();
-      sinkContextMap = new HashMap<String, Context>();
-      channelContextMap = new HashMap<String, Context>();
-      sinkGroupContextMap = new HashMap<String, Context>();
-
+      configFilterConfigMap = new HashMap<>();
+      sourceConfigMap = new HashMap<>();
+      sinkConfigMap = new HashMap<>();
+      channelConfigMap = new HashMap<>();
+      sinkgroupConfigMap = new HashMap<>();
+      configFilterContextMap = new HashMap<>();
+      sourceContextMap = new HashMap<>();
+      sinkContextMap = new HashMap<>();
+      channelContextMap = new HashMap<>();
+      sinkGroupContextMap = new HashMap<>();
+      configFiltersInstances = new ArrayList<>();
+      configFilterPatternCache = new HashMap<>();
     }
 
     public Map<String, ComponentConfiguration> getChannelConfigMap() {
@@ -257,6 +282,10 @@ public class FlumeConfiguration {
       return sourceConfigMap;
     }
 
+    public Map<String, ComponentConfiguration> getConfigFilterConfigMap() {
+      return configFilterConfigMap;
+    }
+
     public Map<String, ComponentConfiguration> getSinkConfigMap() {
       return sinkConfigMap;
     }
@@ -265,22 +294,30 @@ public class FlumeConfiguration {
       return sinkgroupConfigMap;
     }
 
+    public Map<String, Context> getConfigFilterContext() {
+      return configFilterContextMap;
+    }
+
     public Map<String, Context> getSourceContext() {
-      return this.sourceContextMap;
+      return sourceContextMap;
     }
 
     public Map<String, Context> getSinkContext() {
-      return this.sinkContextMap;
+      return sinkContextMap;
     }
 
     public Map<String, Context> getChannelContext() {
-      return this.channelContextMap;
+      return channelContextMap;
     }
 
     public Set<String> getSinkSet() {
       return sinkSet;
     }
 
+    public Set<String> getConfigFilterSet() {
+      return configFilterSet;
+    }
+
     public Set<String> getSourceSet() {
       return sourceSet;
     }
@@ -310,36 +347,35 @@ public class FlumeConfiguration {
      * @return true if the configuration is valid, false otherwise
      */
     private boolean isValid() {
-      logger.debug("Starting validation of configuration for agent: {}", agentName);
-      if (logger.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
-        logger.debug("Initial configuration: {}", this.getPrevalidationConfig());
+      LOGGER.debug("Starting validation of configuration for agent: {}", agentName);
+      if (LOGGER.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
+        LOGGER.debug("Initial configuration: {}", getPrevalidationConfig());
       }
 
+      configFilterSet = validateConfigFilterSet();
+      createConfigFilters();
+      runFiltersThroughConfigs();
+
       // Make sure that at least one channel is specified
-      if (channels == null || channels.trim().length() == 0) {
-        logger.warn("Agent configuration for '" + agentName
-            + "' does not contain any channels. Marking it as invalid.");
-        errorList.add(new FlumeConfigurationError(agentName,
-            BasicConfigurationConstants.CONFIG_CHANNELS,
-            FlumeConfigurationErrorType.PROPERTY_VALUE_NULL,
-            ErrorOrWarning.ERROR));
+      if (channels == null || channels.trim().isEmpty()) {
+        LOGGER.warn(
+            "Agent configuration for '{}' does not contain any channels. Marking it as invalid.",
+            agentName
+        );
+        addError(CONFIG_CHANNELS, PROPERTY_VALUE_NULL, ERROR);
         return false;
       }
 
-      channelSet =
-          new HashSet<String>(Arrays
-              .asList(channels.split("\\s+")));
-      // validateComponent(channelSet, channelConfigMap, CLASS_CHANNEL,
-      // ATTR_TYPE);
+      channelSet = new HashSet<>(Arrays.asList(channels.split("\\s+")));
 
       channelSet = validateChannels(channelSet);
-      if (channelSet.size() == 0) {
-        logger.warn("Agent configuration for '" + agentName
-            + "' does not contain any valid channels. Marking it as invalid.");
-        errorList.add(new FlumeConfigurationError(agentName,
-            BasicConfigurationConstants.CONFIG_CHANNELS,
-            FlumeConfigurationErrorType.PROPERTY_VALUE_NULL,
-            ErrorOrWarning.ERROR));
+      if (channelSet.isEmpty()) {
+        LOGGER.warn(
+            "Agent configuration for '{}' does not contain any valid channels. " +
+                "Marking it as invalid.",
+            agentName
+        );
+        addError(CONFIG_CHANNELS, PROPERTY_VALUE_NULL, ERROR);
         return false;
       }
 
@@ -348,75 +384,155 @@ public class FlumeConfiguration {
       sinkgroupSet = validateGroups(sinkSet);
 
       // If no sources or sinks are present, then this is invalid
-      if (sourceSet.size() == 0 && sinkSet.size() == 0) {
-        logger.warn("Agent configuration for '" + agentName
-            + "' has no sources or sinks. Will be marked invalid.");
-        errorList.add(new FlumeConfigurationError(agentName,
-            BasicConfigurationConstants.CONFIG_SOURCES,
-            FlumeConfigurationErrorType.PROPERTY_VALUE_NULL,
-            ErrorOrWarning.ERROR));
-        errorList.add(new FlumeConfigurationError(agentName,
-            BasicConfigurationConstants.CONFIG_SINKS,
-            FlumeConfigurationErrorType.PROPERTY_VALUE_NULL,
-            ErrorOrWarning.ERROR));
+      if (sourceSet.isEmpty() && sinkSet.isEmpty()) {
+        LOGGER.warn(
+            "Agent configuration for '{}' has no sources or sinks. Will be marked invalid.",
+            agentName
+        );
+        addError(CONFIG_SOURCES, PROPERTY_VALUE_NULL, ERROR);
+        addError(CONFIG_SINKS, PROPERTY_VALUE_NULL, ERROR);
         return false;
       }
 
       // Now rewrite the sources/sinks/channels
 
-      this.sources = getSpaceDelimitedList(sourceSet);
-      this.channels = getSpaceDelimitedList(channelSet);
-      this.sinks = getSpaceDelimitedList(sinkSet);
-      this.sinkgroups = getSpaceDelimitedList(sinkgroupSet);
+      this.configFilters = getSpaceDelimitedList(configFilterSet);
+      sources = getSpaceDelimitedList(sourceSet);
+      channels = getSpaceDelimitedList(channelSet);
+      sinks = getSpaceDelimitedList(sinkSet);
+      sinkgroups = getSpaceDelimitedList(sinkgroupSet);
 
-      if (logger.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
-        logger.debug("Post validation configuration for {}", agentName);
-        logger.debug(this.getPostvalidationConfig());
+      if (LOGGER.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
+        LOGGER.debug("Post validation configuration for {}", agentName);
+        LOGGER.debug(getPostvalidationConfig());
       }
 
       return true;
     }
 
-    private ChannelType getKnownChannel(String type) {
-      ChannelType[] values = ChannelType.values();
-      for (ChannelType value : values) {
-        if (value.toString().equalsIgnoreCase(type)) return value;
+    private void runFiltersThroughConfigs() {
+      runFiltersOnContextMaps(
+          sourceContextMap,
+          channelContextMap,
+          sinkContextMap,
+          sinkGroupContextMap
+      );
+    }
 
-        String channel = value.getChannelClassName();
+    private void runFiltersOnContextMaps(Map<String, Context>... maps) {
+      for (Map<String, Context> map: maps) {
+        for (Context context : map.values()) {
+          for (String key : context.getParameters().keySet()) {
+            filterValue(context, key);
+          }
+        }
+      }
+    }
 
-        if (channel != null && channel.equalsIgnoreCase(type)) return value;
+    private void createConfigFilters() {
+      for (String name: configFilterSet) {
+        Context context = configFilterContextMap.get(name);
+        ComponentConfiguration componentConfiguration = configFilterConfigMap.get(name);
+        try {
+          if (context != null) {
+            ConfigFilter configFilter = ConfigFilterFactory.create(
+                name, context.getString(BasicConfigurationConstants.CONFIG_TYPE)
+            );
+            configFilter.initializeWithConfiguration(context.getParameters());
+            configFiltersInstances.add(configFilter);
+            configFilterPatternCache.put(configFilter.getName(),
+                createConfigFilterPattern(configFilter));
+          } else if (componentConfiguration != null) {
+            ConfigFilter configFilter = ConfigFilterFactory.create(
+                componentConfiguration.getComponentName(), componentConfiguration.getType()
+            );
+            configFiltersInstances.add(configFilter);
+            configFilterPatternCache.put(configFilter.getName(), 
+                createConfigFilterPattern(configFilter));
+          }
+        } catch (Exception e) {
+          LOGGER.error("Error while creating config filter {}", name, e);
+        }
+      }
+    }
 
+    private Pattern createConfigFilterPattern(ConfigFilter configFilter) {
+      //JAVA EL expression style ${myFilterName['my_key']} or
+      //JAVA EL expression style ${myFilterName["my_key"]} or
+      //JAVA EL expression style ${myFilterName[my_key]}
+      return Pattern.compile(
+          "\\$\\{" +  // ${
+              Pattern.quote(configFilter.getName()) + //<filterComponentName>
+              "\\[(|'|\")" +  // delimiter :'," or nothing
+              "(?<key>[-_a-zA-Z0-9]+)" + // key
+              "\\1\\]" + // matching delimiter
+              "\\}" // }
+      );
+    }
+
+    private void filterValue(Context c, String contextKey) {
+      for (ConfigFilter configFilter : configFiltersInstances) {
+        try {
+          Pattern pattern = configFilterPatternCache.get(configFilter.getName());
+          String currentValue = c.getString(contextKey);
+          Matcher matcher = pattern.matcher(currentValue);
+          String filteredValue = currentValue;
+          while (matcher.find()) {
+            String key = matcher.group("key");
+            LOGGER.debug("Replacing {} from config filter {}", key, configFilter.getName());
+            String filtered = configFilter.filter(key);
+            if (filtered == null) {
+              continue;
+            }
+            String fullMatch = matcher.group();
+            filteredValue = filteredValue.replace(fullMatch, filtered);
+          }
+          c.put(contextKey, filteredValue);
+        } catch (Exception e) {
+          e.printStackTrace();
+          LOGGER.error("Error while matching and filtering configFilter: {} and key: {}",
+              new Object[]{configFilter.getName(), contextKey, e});
+        }
       }
-      return null;
+    }
+
+    private void addError(String key, FlumeConfigurationErrorType errorType, ErrorOrWarning level) {
+      errorList.add(new FlumeConfigurationError(agentName, key, errorType, level));
+    }
+
+    private ChannelType getKnownChannel(String type) {
+      return getKnownComponent(type, ChannelType.values());
     }
 
     private SinkType getKnownSink(String type) {
-      SinkType[] values = SinkType.values();
-      for (SinkType value : values) {
-        if (value.toString().equalsIgnoreCase(type)) return value;
-        String sink = value.getSinkClassName();
-        if (sink != null && sink.equalsIgnoreCase(type)) return value;
-      }
-      return null;
+      return getKnownComponent(type, SinkType.values());
     }
 
     private SourceType getKnownSource(String type) {
-      SourceType[] values = SourceType.values();
-      for (SourceType value : values) {
+      return getKnownComponent(type, SourceType.values());
+    }
+
+    private ConfigFilterType getKnownConfigFilter(String type) {
+      return getKnownComponent(type, ConfigFilterType.values());
+    }
+
+    private <T extends ComponentWithClassName> T getKnownComponent(String type, T[] values) {
+      for (T value : values) {
         if (value.toString().equalsIgnoreCase(type)) return value;
-        String src = value.getSourceClassName();
+        String src = value.getClassName();
         if (src != null && src.equalsIgnoreCase(type)) return value;
       }
       return null;
     }
 
+
     /**
      * If it is a known component it will do the full validation required for
      * that component, else it will do the validation required for that class.
      */
     private Set<String> validateChannels(Set<String> channelSet) {
       Iterator<String> iter = channelSet.iterator();
-      Map<String, Context> newContextMap = new HashMap<String, Context>();
+      Map<String, Context> newContextMap = new HashMap<>();
       ChannelConfiguration conf = null;
       /*
        * The logic for the following code:
@@ -449,7 +565,7 @@ public class FlumeConfiguration {
           String config = null;
           // Not a known channel - cannot do specific validation to this channel
           if (chType == null) {
-            config = channelContext.getString(BasicConfigurationConstants.CONFIG_CONFIG);
+            config = channelContext.getString(CONFIG_CONFIG);
             if (config == null || config.isEmpty()) {
               config = "OTHER";
             } else {
@@ -464,7 +580,7 @@ public class FlumeConfiguration {
             conf =
                 (ChannelConfiguration) ComponentConfigurationFactory.create(
                     channelName, config, ComponentType.CHANNEL);
-            logger.debug("Created channel " + channelName);
+            LOGGER.debug("Created channel {}", channelName);
             if (conf != null) {
               conf.configure(channelContext);
             }
@@ -483,14 +599,15 @@ public class FlumeConfiguration {
             // thrown
             if (conf != null) errorList.addAll(conf.getErrors());
             iter.remove();
-            logger.warn("Could not configure channel " + channelName
-                + " due to: " + e.getMessage(), e);
+            LOGGER.warn(
+                "Could not configure channel {} due to: {}",
+                new Object[]{channelName, e.getMessage(), e}
+            );
 
           }
         } else {
           iter.remove();
-          errorList.add(new FlumeConfigurationError(agentName, channelName,
-              FlumeConfigurationErrorType.CONFIG_ERROR, ErrorOrWarning.ERROR));
+          addError(channelName, CONFIG_ERROR, ERROR);
         }
       }
       channelContextMap = newContextMap;
@@ -501,15 +618,89 @@ public class FlumeConfiguration {
       return channelSet;
     }
 
+    private Set<String> validateConfigFilterSet() {
+      if (configFilters == null || configFilters.isEmpty()) {
+        LOGGER.warn("Agent configuration for '{}' has no configfilters.", agentName);
+        return new HashSet<>();
+      }
+      Set<String> configFilterSet = new HashSet<>(Arrays.asList(configFilters.split("\\s+")));
+      Map<String, Context> newContextMap = new HashMap<>();
+
+      Iterator<String> iter = configFilterSet.iterator();
+      ConfigFilterConfiguration conf = null;
+      while (iter.hasNext()) {
+        String configFilterName = iter.next();
+        Context configFilterContext = configFilterContextMap.get(configFilterName);
+        if (configFilterContext != null) {
+
+
+          // Get the configuration object for the channel:
+          ConfigFilterType chType = getKnownConfigFilter(configFilterContext.getString(
+              BasicConfigurationConstants.CONFIG_TYPE));
+          boolean configSpecified = false;
+          String config = null;
+          // Not a known channel - cannot do specific validation to this channel
+          if (chType == null) {
+            config = configFilterContext.getString(CONFIG_CONFIG);
+            if (config == null || config.isEmpty()) {
+              config = "OTHER";
+            } else {
+              configSpecified = true;
+            }
+          } else {
+            config = chType.toString().toUpperCase(Locale.ENGLISH);
+            configSpecified = true;
+          }
+
+          try {
+            conf =
+                (ConfigFilterConfiguration) ComponentConfigurationFactory.create(
+                    configFilterName, config, ComponentType.CONFIG_FILTER);
+            LOGGER.debug("Created configfilter {}", configFilterName);
+            if (conf != null) {
+              conf.configure(configFilterContext);
+            }
+            if ((configSpecified && conf.isNotFoundConfigClass()) ||
+                !configSpecified) {
+              newContextMap.put(configFilterName, configFilterContext);
+            } else if (configSpecified) {
+              configFilterConfigMap.put(configFilterName, conf);
+            }
+
+            if (conf != null) {
+              errorList.addAll(conf.getErrors());
+            }
+          } catch (ConfigurationException e) {
+            if (conf != null) errorList.addAll(conf.getErrors());
+            iter.remove();
+            LOGGER.warn(
+                "Could not configure configfilter {} due to: {}",
+                new Object[]{configFilterName, e.getMessage(), e}
+            );
+
+          }
+        } else {
+          iter.remove();
+          addError(configFilterName, CONFIG_ERROR, ERROR);
+          LOGGER.warn("Configuration empty for: {}. Removed.", configFilterName);
+        }
+      }
+
+      configFilterContextMap = newContextMap;
+      Set<String> tempchannelSet = new HashSet<String>();
+      tempchannelSet.addAll(configFilterConfigMap.keySet());
+      tempchannelSet.addAll(configFilterContextMap.keySet());
+      configFilterSet.retainAll(tempchannelSet);
+
+      return configFilterSet;
+    }
+
+
     private Set<String> validateSources(Set<String> channelSet) {
       //Arrays.split() call will throw NPE if the sources string is empty
       if (sources == null || sources.isEmpty()) {
-        logger.warn("Agent configuration for '" + agentName
-            + "' has no sources.");
-        errorList.add(new FlumeConfigurationError(agentName,
-            BasicConfigurationConstants.CONFIG_SOURCES,
-            FlumeConfigurationErrorType.PROPERTY_VALUE_NULL,
-            ErrorOrWarning.WARNING));
+        LOGGER.warn("Agent configuration for '{}' has no sources.", agentName);
+        addError(CONFIG_SOURCES, PROPERTY_VALUE_NULL, WARNING);
         return new HashSet<String>();
       }
       Set<String> sourceSet =
@@ -546,7 +737,7 @@ public class FlumeConfiguration {
               BasicConfigurationConstants.CONFIG_TYPE));
           if (srcType == null) {
             config = srcContext.getString(
-                BasicConfigurationConstants.CONFIG_CONFIG);
+                CONFIG_CONFIG);
             if (config == null || config.isEmpty()) {
               config = "OTHER";
             } else {
@@ -573,7 +764,7 @@ public class FlumeConfiguration {
                 throw new ConfigurationException(
                     "No Channels configured for " + sourceName);
               }
-              srcContext.put(BasicConfigurationConstants.CONFIG_CHANNELS,
+              srcContext.put(CONFIG_CHANNELS,
                   this.getSpaceDelimitedList(channels));
             }
             if ((configSpecified && srcConf.isNotFoundConfigClass()) ||
@@ -586,14 +777,15 @@ public class FlumeConfiguration {
           } catch (ConfigurationException e) {
             if (srcConf != null) errorList.addAll(srcConf.getErrors());
             iter.remove();
-            logger.warn("Could not configure source  " + sourceName
-                + " due to: " + e.getMessage(), e);
+            LOGGER.warn(
+                "Could not configure source  {} due to: {}",
+                new Object[]{sourceName, e.getMessage(), e}
+            );
           }
         } else {
           iter.remove();
-          errorList.add(new FlumeConfigurationError(agentName, sourceName,
-              FlumeConfigurationErrorType.CONFIG_ERROR, ErrorOrWarning.ERROR));
-          logger.warn("Configuration empty for: " + sourceName + ".Removed.");
+          addError(sourceName, CONFIG_ERROR, ERROR);
+          LOGGER.warn("Configuration empty for: {}.Removed.", sourceName);
         }
       }
 
@@ -614,12 +806,8 @@ public class FlumeConfiguration {
       Set<String> sinkSet;
       SinkConfiguration sinkConf = null;
       if (sinks == null || sinks.isEmpty()) {
-        logger.warn("Agent configuration for '" + agentName
-            + "' has no sinks.");
-        errorList.add(new FlumeConfigurationError(agentName,
-            BasicConfigurationConstants.CONFIG_SINKS,
-            FlumeConfigurationErrorType.PROPERTY_VALUE_NULL,
-            ErrorOrWarning.WARNING));
+        LOGGER.warn("Agent configuration for '{}' has no sinks.", agentName);
+        addError(CONFIG_SINKS, PROPERTY_VALUE_NULL, WARNING);
         return new HashSet<String>();
       } else {
         sinkSet =
@@ -650,9 +838,8 @@ public class FlumeConfiguration {
         Context sinkContext = sinkContextMap.get(sinkName.trim());
         if (sinkContext == null) {
           iter.remove();
-          logger.warn("no context for sink" + sinkName);
-          errorList.add(new FlumeConfigurationError(agentName, sinkName,
-              FlumeConfigurationErrorType.CONFIG_ERROR, ErrorOrWarning.ERROR));
+          LOGGER.warn("no context for sink{}", sinkName);
+          addError(sinkName, CONFIG_ERROR, ERROR);
         } else {
           String config = null;
           boolean configSpecified = false;
@@ -660,7 +847,7 @@ public class FlumeConfiguration {
               BasicConfigurationConstants.CONFIG_TYPE));
           if (sinkType == null) {
             config = sinkContext.getString(
-                BasicConfigurationConstants.CONFIG_CONFIG);
+                CONFIG_CONFIG);
             if (config == null || config.isEmpty()) {
               config = "OTHER";
             } else {
@@ -671,7 +858,7 @@ public class FlumeConfiguration {
             configSpecified = true;
           }
           try {
-            logger.debug("Creating sink: " + sinkName + " using " + config);
+            LOGGER.debug("Creating sink: {} using {}", sinkName, config);
 
             sinkConf =
                 (SinkConfiguration) ComponentConfigurationFactory.create(
@@ -694,8 +881,10 @@ public class FlumeConfiguration {
           } catch (ConfigurationException e) {
             iter.remove();
             if (sinkConf != null) errorList.addAll(sinkConf.getErrors());
-            logger.warn("Could not configure sink  " + sinkName
-                + " due to: " + e.getMessage(), e);
+            LOGGER.warn(
+                "Could not configure sink  {} due to: {}",
+                new Object[]{sinkName, e.getMessage(), e}
+            );
           }
         }
         // Filter out any sinks that have invalid channel
@@ -740,15 +929,11 @@ public class FlumeConfiguration {
             if (conf != null) errorList.addAll(conf.getErrors());
             if (groupSinks != null && !groupSinks.isEmpty()) {
               List<String> sinkArray = new ArrayList<String>();
-              for (String sink : groupSinks) {
-                sinkArray.add(sink);
-              }
+              sinkArray.addAll(groupSinks);
               conf.setSinks(sinkArray);
               sinkgroupConfigMap.put(sinkgroupName, conf);
             } else {
-              errorList.add(new FlumeConfigurationError(agentName, sinkgroupName,
-                  FlumeConfigurationErrorType.CONFIG_ERROR,
-                  ErrorOrWarning.ERROR));
+              addError(sinkgroupName, CONFIG_ERROR, ERROR);
               if (conf != null) errorList.addAll(conf.getErrors());
               throw new ConfigurationException(
                   "No available sinks for sinkgroup: " + sinkgroupName
@@ -757,19 +942,16 @@ public class FlumeConfiguration {
 
           } catch (ConfigurationException e) {
             iter.remove();
-            errorList
-                .add(new FlumeConfigurationError(agentName, sinkgroupName,
-                    FlumeConfigurationErrorType.CONFIG_ERROR,
-                    ErrorOrWarning.ERROR));
-            logger.warn("Could not configure sink group " + sinkgroupName
-                + " due to: " + e.getMessage(), e);
+            addError(sinkgroupName, CONFIG_ERROR, ERROR);
+            LOGGER.warn(
+                "Could not configure sink group {} due to: {}",
+                new Object[]{sinkgroupName, e.getMessage(), e}
+            );
           }
         } else {
           iter.remove();
-          errorList.add(new FlumeConfigurationError(agentName, sinkgroupName,
-              FlumeConfigurationErrorType.CONFIG_ERROR, ErrorOrWarning.ERROR));
-          logger.warn("Configuration error for: " + sinkgroupName
-              + ".Removed.");
+          addError(sinkgroupName, CONFIG_ERROR, ERROR);
+          LOGGER.warn("Configuration error for: {}.Removed.", sinkgroupName);
         }
 
       }
@@ -800,23 +982,20 @@ public class FlumeConfiguration {
       while (sinkIt.hasNext()) {
         String curSink = sinkIt.next();
         if (usedSinks.containsKey(curSink)) {
-          logger.warn("Agent configuration for '" + agentName + "' sinkgroup '"
-              + groupConf.getComponentName() + "' sink '" + curSink
-              + "' in use by " + "another group: '" + usedSinks.get(curSink)
-              + "', sink not added");
-          errorList.add(new FlumeConfigurationError(agentName, groupConf
-              .getComponentName(),
-              FlumeConfigurationErrorType.PROPERTY_PART_OF_ANOTHER_GROUP,
-              ErrorOrWarning.ERROR));
+          LOGGER.warn(
+              "Agent configuration for '{}' sinkgroup '{}' sink '{}' in use by another group: " +
+                  "'{}', sink not added",
+              new Object[]{agentName, groupConf.getComponentName(), curSink, usedSinks.get(curSink)}
+          );
+          addError(groupConf.getComponentName(), PROPERTY_PART_OF_ANOTHER_GROUP, ERROR);
           sinkIt.remove();
           continue;
         } else if (!sinkSet.contains(curSink)) {
-          logger.warn("Agent configuration for '" + agentName + "' sinkgroup '"
-              + groupConf.getComponentName() + "' sink not found: '" + curSink
-              + "',  sink not added");
-          errorList.add(new FlumeConfigurationError(agentName, curSink,
-              FlumeConfigurationErrorType.INVALID_PROPERTY,
-              ErrorOrWarning.ERROR));
+          LOGGER.warn("Agent configuration for '{}' sinkgroup '{}' sink not found: '{}', " +
+                  " sink not added",
+              new Object[]{agentName, groupConf.getComponentName(), curSink}
+          );
+          addError(curSink, INVALID_PROPERTY, ERROR);
           sinkIt.remove();
           continue;
         } else {
@@ -827,11 +1006,11 @@ public class FlumeConfiguration {
     }
 
     private String getSpaceDelimitedList(Set<String> entries) {
-      if (entries.size() == 0) {
+      if (entries.isEmpty()) {
         return null;
       }
 
-      StringBuilder sb = new StringBuilder("");
+      StringBuilder sb = new StringBuilder();
 
       for (String entry : entries) {
         sb.append(" ").append(entry);
@@ -854,10 +1033,11 @@ public class FlumeConfiguration {
 
     public String getPrevalidationConfig() {
       StringBuilder sb = new StringBuilder("AgentConfiguration[");
-      sb.append(agentName).append("]").append(NEWLINE).append("SOURCES: ");
-      sb.append(sourceContextMap).append(NEWLINE).append("CHANNELS: ");
-      sb.append(channelContextMap).append(NEWLINE).append("SINKS: ");
-      sb.append(sinkContextMap).append(NEWLINE);
+      sb.append(agentName).append("]").append(NEWLINE);
+      sb.append("CONFIG_FILTERS: ").append(configFilterContextMap).append(NEWLINE);
+      sb.append("SOURCES: ").append(sourceContextMap).append(NEWLINE);
+      sb.append("CHANNELS: ").append(channelContextMap).append(NEWLINE);
+      sb.append("SINKS: ").append(sinkContextMap).append(NEWLINE);
 
       return sb.toString();
     }
@@ -907,145 +1087,132 @@ public class FlumeConfiguration {
     }
 
     private boolean addProperty(String key, String value) {
+      // Check for configFilters
+      if (CONFIG_CONFIGFILTERS.equals(key)) {
+        if (configFilters == null) {
+          configFilters = value;
+          return true;
+        } else {
+          LOGGER.warn("Duplicate configfilter list specified for agent: {}", agentName);
+          addError(CONFIG_CONFIGFILTERS, DUPLICATE_PROPERTY, ERROR);
+          return false;
+        }
+      }
       // Check for sources
-      if (key.equals(BasicConfigurationConstants.CONFIG_SOURCES)) {
+      if (CONFIG_SOURCES.equals(key)) {
         if (sources == null) {
           sources = value;
           return true;
         } else {
-          logger
-              .warn("Duplicate source list specified for agent: " + agentName);
-          errorList.add(new FlumeConfigurationError(agentName,
-              BasicConfigurationConstants.CONFIG_SOURCES,
-              FlumeConfigurationErrorType.DUPLICATE_PROPERTY,
-              ErrorOrWarning.ERROR));
+          LOGGER.warn("Duplicate source list specified for agent: {}", agentName);
+          addError(CONFIG_SOURCES, DUPLICATE_PROPERTY, ERROR);
           return false;
         }
       }
 
       // Check for sinks
-      if (key.equals(BasicConfigurationConstants.CONFIG_SINKS)) {
+      if (CONFIG_SINKS.equals(key)) {
         if (sinks == null) {
           sinks = value;
-          logger.info("Added sinks: " + sinks + " Agent: " + this.agentName);
+          LOGGER.info("Added sinks: {} Agent: {}", sinks, agentName);
           return true;
         } else {
-          logger.warn("Duplicate sink list specfied for agent: " + agentName);
-          errorList.add(new FlumeConfigurationError(agentName,
-              BasicConfigurationConstants.CONFIG_SINKS,
-              FlumeConfigurationErrorType.DUPLICATE_PROPERTY,
-              ErrorOrWarning.ERROR));
+          LOGGER.warn("Duplicate sink list specfied for agent: {}", agentName);
+          addError(CONFIG_SINKS, DUPLICATE_PROPERTY, ERROR);
           return false;
         }
       }
 
       // Check for channels
-      if (key.equals(BasicConfigurationConstants.CONFIG_CHANNELS)) {
+      if (CONFIG_CHANNELS.equals(key)) {
         if (channels == null) {
           channels = value;
 
           return true;
         } else {
-          logger.warn("Duplicate channel list specified for agent: "
-              + agentName);
-          errorList.add(new FlumeConfigurationError(agentName,
-              BasicConfigurationConstants.CONFIG_CHANNELS,
-              FlumeConfigurationErrorType.DUPLICATE_PROPERTY,
-              ErrorOrWarning.ERROR));
+          LOGGER.warn("Duplicate channel list specified for agent: {}", agentName);
+          addError(CONFIG_CHANNELS, DUPLICATE_PROPERTY, ERROR);
           return false;
         }
       }
 
       // Check for sinkgroups
-      if (key.equals(BasicConfigurationConstants.CONFIG_SINKGROUPS)) {
+      if (CONFIG_SINKGROUPS.equals(key)) {
         if (sinkgroups == null) {
           sinkgroups = value;
 
           return true;
         } else {
-          logger
-              .warn("Duplicate sinkgroup list specfied for agent: " + agentName);
-          errorList.add(new FlumeConfigurationError(agentName,
-              BasicConfigurationConstants.CONFIG_SINKGROUPS,
-              FlumeConfigurationErrorType.DUPLICATE_PROPERTY,
-              ErrorOrWarning.ERROR));
+          LOGGER.warn("Duplicate sinkgroup list specfied for agent: {}", agentName);
+          addError(CONFIG_SINKGROUPS, DUPLICATE_PROPERTY, ERROR);
           return false;
         }
       }
 
-      ComponentNameAndConfigKey cnck = parseConfigKey(key,
-          BasicConfigurationConstants.CONFIG_SOURCES_PREFIX);
-
-      if (cnck != null) {
-        // it is a source
-        String name = cnck.getComponentName();
-        Context srcConf = sourceContextMap.get(name);
-
-        if (srcConf == null) {
-          srcConf = new Context();
-          sourceContextMap.put(name, srcConf);
-        }
-
-        srcConf.put(cnck.getConfigKey(), value);
+      if (addAsSourceConfig(key, value)
+          || addAsChannelValue(key, value)
+          || addAsSinkConfig(key, value)
+          || addAsSinkGroupConfig(key, value)
+          || addAsConfigFilterConfig(key, value)
+      ) {
         return true;
       }
 
-      cnck = parseConfigKey(key,
-          BasicConfigurationConstants.CONFIG_CHANNELS_PREFIX);
-
-      if (cnck != null) {
-        // it is a channel
-        String name = cnck.getComponentName();
-        Context channelConf = channelContextMap.get(name);
+      LOGGER.warn("Invalid property specified: {}", key);
+      addError(key, INVALID_PROPERTY, ERROR);
+      return false;
+    }
 
-        if (channelConf == null) {
-          channelConf = new Context();
-          channelContextMap.put(name, channelConf);
-        }
+    private boolean addAsConfigFilterConfig(String key, String value) {
+      return addComponentConfig(
+          key, value, CONFIG_CONFIGFILTERS_PREFIX, configFilterContextMap
+      );
+    }
 
-        channelConf.put(cnck.getConfigKey(), value);
-        return true;
-      }
+    private boolean addAsSinkGroupConfig(String key, String value) {
+      return addComponentConfig(
+          key, value, CONFIG_SINKGROUPS_PREFIX, sinkGroupContextMap
+      );
+    }
 
-      cnck = parseConfigKey(key,
-          BasicConfigurationConstants.CONFIG_SINKS_PREFIX);
+    private boolean addAsSinkConfig(String key, String value) {
+      return addComponentConfig(
+          key, value, CONFIG_SINKS_PREFIX, sinkContextMap
+      );
+    }
 
-      if (cnck != null) {
-        // it is a sink
-        String name = cnck.getComponentName().trim();
-        logger.info("Processing:" + name);
-        Context sinkConf = sinkContextMap.get(name);
+    private boolean addAsChannelValue(String key, String value) {
+      return addComponentConfig(
+          key, value, CONFIG_CHANNELS_PREFIX, channelContextMap
+      );
+    }
 
-        if (sinkConf == null) {
-          logger.debug("Created context for " + name + ": "
-              + cnck.getConfigKey());
-          sinkConf = new Context();
-          sinkContextMap.put(name, sinkConf);
-        }
+    private boolean addAsSourceConfig(String key, String value) {
+      return addComponentConfig(
+          key, value, CONFIG_SOURCES_PREFIX, sourceContextMap
+      );
+    }
 
-        sinkConf.put(cnck.getConfigKey(), value);
-        return true;
-      }
+    private boolean addComponentConfig(
+        String key, String value, String configPrefix, Map<String, Context> contextMap
 
-      cnck = parseConfigKey(key,
-          BasicConfigurationConstants.CONFIG_SINKGROUPS_PREFIX);
+    ) {
+      ComponentNameAndConfigKey parsed = parseConfigKey(key, configPrefix);
+      if (parsed != null) {
+        String name = parsed.getComponentName().trim();
+        LOGGER.info("Processing:{}", name);
+        Context context = contextMap.get(name);
 
-      if (cnck != null) {
-        String name = cnck.getComponentName();
-        Context groupConf = sinkGroupContextMap.get(name);
-        if (groupConf == null) {
-          groupConf = new Context();
-          sinkGroupContextMap.put(name, groupConf);
+        if (context == null) {
+          LOGGER.debug("Created context for {}: {}", name, parsed.getConfigKey());
+          context = new Context();
+          contextMap.put(name, context);
         }
 
-        groupConf.put(cnck.getConfigKey(), value);
-
+        context.put(parsed.getConfigKey(), value);
         return true;
       }
 
-      logger.warn("Invalid property specified: " + key);
-      errorList.add(new FlumeConfigurationError(agentName, key,
-          FlumeConfigurationErrorType.INVALID_PROPERTY, ErrorOrWarning.ERROR));
       return false;
     }
 
@@ -1067,7 +1234,7 @@ public class FlumeConfiguration {
       String configKey = key.substring(prefix.length() + name.length() + 1);
 
       // name and config key must be non-empty
-      if (name.length() == 0 || configKey.length() == 0) {
+      if (name.isEmpty() || configKey.isEmpty()) {
         return null;
       }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelConfiguration.java
index b3b477f..f7860c3 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelConfiguration.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelConfiguration.java
@@ -16,10 +16,8 @@
  */
 package org.apache.flume.conf.channel;
 
-import org.apache.flume.Context;
 import org.apache.flume.conf.ComponentConfiguration;
 import org.apache.flume.conf.ConfigurationException;
-import org.apache.flume.conf.source.SourceConfiguration;
 
 public class ChannelConfiguration extends ComponentConfiguration {
 
@@ -27,11 +25,6 @@ public class ChannelConfiguration extends ComponentConfiguration {
     super(componentName);
   }
 
-  @Override
-  public void configure(Context context) throws ConfigurationException {
-    super.configure(context);
-  }
-
   public enum ChannelConfigurationType {
     OTHER(null),
     MEMORY("org.apache.flume.conf.channel.MemoryChannelConfiguration"),
@@ -65,7 +58,7 @@ public class ChannelConfiguration extends ComponentConfiguration {
     @SuppressWarnings("unchecked")
     public ChannelConfiguration getConfiguration(String name)
         throws ConfigurationException {
-      if (this.equals(ChannelConfigurationType.OTHER)) {
+      if (this == OTHER) {
         return new ChannelConfiguration(name);
       }
       Class<? extends ChannelConfiguration> clazz;

http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelSelectorConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelSelectorConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelSelectorConfiguration.java
index e68e129..7e23284 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelSelectorConfiguration.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelSelectorConfiguration.java
@@ -18,11 +18,9 @@ package org.apache.flume.conf.channel;
 
 import java.util.Set;
 
-import org.apache.flume.Context;
 import org.apache.flume.conf.ComponentConfiguration;
 import org.apache.flume.conf.ConfigurationException;
 import org.apache.flume.conf.channel.ChannelConfiguration.ChannelConfigurationType;
-import org.apache.flume.conf.source.SourceConfiguration;
 
 public class ChannelSelectorConfiguration extends
     ComponentConfiguration {
@@ -44,10 +42,6 @@ public class ChannelSelectorConfiguration extends
     this.channelNames = channelNames;
   }
 
-  public void configure(Context context) throws ConfigurationException {
-    super.configure(context);
-  }
-
   public enum ChannelSelectorConfigurationType {
     OTHER(null),
     REPLICATING(null),
@@ -69,7 +63,7 @@ public class ChannelSelectorConfiguration extends
     public ChannelSelectorConfiguration getConfiguration(
         String name)
         throws ConfigurationException {
-      if (this.equals(ChannelConfigurationType.OTHER)) {
+      if (this == OTHER) {
         return new ChannelSelectorConfiguration(name);
       }
       Class<? extends ChannelSelectorConfiguration> clazz;

http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelSelectorType.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelSelectorType.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelSelectorType.java
index 38b08ad..d1a0dc2 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelSelectorType.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelSelectorType.java
@@ -16,10 +16,12 @@
  */
 package org.apache.flume.conf.channel;
 
+import org.apache.flume.conf.ComponentWithClassName;
+
 /**
  * Enumeration of built in channel selector types available in the system.
  */
-public enum ChannelSelectorType {
+public enum ChannelSelectorType implements ComponentWithClassName {
 
   /**
    * Place holder for custom channel selectors not part of this enumeration.
@@ -42,7 +44,13 @@ public enum ChannelSelectorType {
     this.channelSelectorClassName = channelSelectorClassName;
   }
 
+  @Deprecated
   public String getChannelSelectorClassName() {
     return channelSelectorClassName;
   }
+
+  @Override
+  public String getClassName() {
+    return channelSelectorClassName;
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelType.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelType.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelType.java
index 2e3cb34..6d610a6 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelType.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelType.java
@@ -18,10 +18,12 @@
  */
 package org.apache.flume.conf.channel;
 
+import org.apache.flume.conf.ComponentWithClassName;
+
 /**
  * Enumeration of built in channel types available in the system.
  */
-public enum ChannelType {
+public enum ChannelType implements ComponentWithClassName {
 
   /**
    * Place holder for custom channels not part of this enumeration.
@@ -59,7 +61,13 @@ public enum ChannelType {
     this.channelClassName = channelClassName;
   }
 
+  @Deprecated
   public String getChannelClassName() {
     return channelClassName;
   }
+
+  @Override
+  public String getClassName() {
+    return channelClassName;
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/main/java/org/apache/flume/conf/configfilter/ConfigFilterConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/configfilter/ConfigFilterConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/configfilter/ConfigFilterConfiguration.java
new file mode 100644
index 0000000..bdd718f
--- /dev/null
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/configfilter/ConfigFilterConfiguration.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.flume.conf.configfilter;
+
+import org.apache.flume.conf.ComponentConfiguration;
+import org.apache.flume.conf.ConfigurationException;
+
+public class ConfigFilterConfiguration extends ComponentConfiguration {
+
+  public enum ConfigFilterConfigurationType {
+    OTHER(null),
+    ENV("org.apache.flume.conf.configfilter.EnvironmentVariableConfigFilterConfiguration"),
+    HADOOP("org.apache.flume.conf.configfilter.HadoopCredentialStoreConfigFilterConfiguration"),
+    EXTERNAL("org.apache.flume.conf.configfilter.ExternalProcessConfigFilterConfiguration");
+
+    private final String configurationName;
+
+    ConfigFilterConfigurationType(String type) {
+      configurationName = type;
+    }
+
+    public String getConfigFilterConfigurationType() {
+      return configurationName;
+    }
+
+    @SuppressWarnings("unchecked")
+    public ConfigFilterConfiguration getConfiguration(String name)
+        throws ConfigurationException {
+      if (this == OTHER) {
+        return new ConfigFilterConfiguration(name);
+      }
+      Class<? extends ConfigFilterConfiguration> clazz;
+      ConfigFilterConfiguration instance = null;
+      try {
+        if (configurationName != null) {
+          clazz =
+              (Class<? extends ConfigFilterConfiguration>) Class
+                  .forName(configurationName);
+          instance = clazz.getConstructor(String.class).newInstance(name);
+        } else {
+          return new ConfigFilterConfiguration(name);
+        }
+      } catch (ClassNotFoundException e) {
+        // Could not find the configuration stub, do basic validation
+        instance = new ConfigFilterConfiguration(name);
+        // Let the caller know that this was created because of this exception.
+        instance.setNotFoundConfigClass();
+      } catch (Exception e) {
+        throw new ConfigurationException("Couldn't create configuration", e);
+      }
+      return instance;
+    }
+  }
+
+  protected ConfigFilterConfiguration(String componentName) {
+    super(componentName);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/main/java/org/apache/flume/conf/configfilter/ConfigFilterType.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/configfilter/ConfigFilterType.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/configfilter/ConfigFilterType.java
new file mode 100644
index 0000000..ed8e3c6
--- /dev/null
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/configfilter/ConfigFilterType.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.flume.conf.configfilter;
+
+import org.apache.flume.conf.ComponentWithClassName;
+
+public enum ConfigFilterType implements ComponentWithClassName {
+  OTHER(null),
+  ENV("org.apache.flume.configfilter.EnvironmentVariableConfigFilter"),
+  HADOOP("org.apache.flume.configfilter.HadoopCredentialStoreConfigFilter"),
+  EXTERNAL("org.apache.flume.configfilter.ExternalProcessConfigFilter");
+
+  private final String className;
+
+  ConfigFilterType(String className) {
+    this.className = className;
+  }
+
+  @Override
+  public String getClassName() {
+    return className;
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java
index d6f4cbf..4538792 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java
@@ -55,12 +55,6 @@ public class SinkConfiguration extends ComponentConfiguration {
 
   @Override
   public String toString(int indentCount) {
-    StringBuilder indentSb = new StringBuilder("");
-
-    for (int i = 0; i < indentCount; i++) {
-      indentSb.append(FlumeConfiguration.INDENTSTEP);
-    }
-
     String basicStr = super.toString(indentCount);
     StringBuilder sb = new StringBuilder();
     sb.append(basicStr).append(FlumeConfiguration.INDENTSTEP).append(
@@ -176,7 +170,7 @@ public class SinkConfiguration extends ComponentConfiguration {
     @SuppressWarnings("unchecked")
     public SinkConfiguration getConfiguration(String name)
         throws ConfigurationException {
-      if (this.equals(SinkConfigurationType.OTHER)) {
+      if (this == OTHER) {
         return new SinkConfiguration(name);
       }
       Class<? extends SinkConfiguration> clazz;

http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkGroupConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkGroupConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkGroupConfiguration.java
index c3fc2bb..a81b2cb 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkGroupConfiguration.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkGroupConfiguration.java
@@ -91,7 +91,7 @@ public class SinkGroupConfiguration extends ComponentConfiguration {
     SinkProcessorType[] values = SinkProcessorType.values();
     for (SinkProcessorType value : values) {
       if (value.toString().equalsIgnoreCase(type)) return value;
-      String sinkProcessClassName = value.getSinkProcessorClassName();
+      String sinkProcessClassName = value.getClassName();
       if (sinkProcessClassName != null && sinkProcessClassName.equalsIgnoreCase(type)) {
         return value;
       }

http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorConfiguration.java
index d0c9802..94383cc 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorConfiguration.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorConfiguration.java
@@ -21,8 +21,6 @@ import java.util.Set;
 import org.apache.flume.Context;
 import org.apache.flume.conf.ComponentConfiguration;
 import org.apache.flume.conf.ConfigurationException;
-import org.apache.flume.conf.sink.SinkConfiguration.SinkConfigurationType;
-import org.apache.flume.conf.source.SourceConfiguration;
 
 public class SinkProcessorConfiguration extends ComponentConfiguration {
   protected Set<String> sinks;

http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorType.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorType.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorType.java
index 19bd51a..d1b66c5 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorType.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorType.java
@@ -18,7 +18,9 @@
  */
 package org.apache.flume.conf.sink;
 
-public enum SinkProcessorType {
+import org.apache.flume.conf.ComponentWithClassName;
+
+public enum SinkProcessorType implements ComponentWithClassName {
   /**
    * Place holder for custom sinks not part of this enumeration.
    */
@@ -52,7 +54,13 @@ public enum SinkProcessorType {
     this.processorClassName = processorClassName;
   }
 
+  @Deprecated
   public String getSinkProcessorClassName() {
     return processorClassName;
   }
+
+  @Override
+  public String getClassName() {
+    return processorClassName;
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkType.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkType.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkType.java
index bfae570..c3f8cac 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkType.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkType.java
@@ -18,10 +18,12 @@
  */
 package org.apache.flume.conf.sink;
 
+import org.apache.flume.conf.ComponentWithClassName;
+
 /**
  * Enumeration of built in sink types available in the system.
  */
-public enum SinkType {
+public enum SinkType implements ComponentWithClassName {
 
   /**
    * Place holder for custom sinks not part of this enumeration.
@@ -120,8 +122,14 @@ public enum SinkType {
     this.sinkClassName = sinkClassName;
   }
 
+  @Deprecated
   public String getSinkClassName() {
     return sinkClassName;
   }
 
+
+  @Override
+  public String getClassName() {
+    return sinkClassName;
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java
index 237efa3..4edda3f 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java
@@ -128,7 +128,7 @@ public class SourceConfiguration extends ComponentConfiguration {
     ChannelSelectorType[] values = ChannelSelectorType.values();
     for (ChannelSelectorType value : values) {
       if (value.toString().equalsIgnoreCase(type)) return value;
-      String clName = value.getChannelSelectorClassName();
+      String clName = value.getClassName();
       if (clName != null && clName.equalsIgnoreCase(type)) return value;
     }
     return null;
@@ -237,7 +237,7 @@ public class SourceConfiguration extends ComponentConfiguration {
     @SuppressWarnings("unchecked")
     public SourceConfiguration getConfiguration(String name)
         throws ConfigurationException {
-      if (this.equals(SourceConfigurationType.OTHER)) {
+      if (this == OTHER) {
         return new SourceConfiguration(name);
       }
       Class<? extends SourceConfiguration> clazz = null;

http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java
index 3e7e7be..b556b6c 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java
@@ -18,10 +18,12 @@
  */
 package org.apache.flume.conf.source;
 
+import org.apache.flume.conf.ComponentWithClassName;
+
 /**
  * Enumeration of built in source types available in the system.
  */
-public enum SourceType {
+public enum SourceType implements ComponentWithClassName {
 
   /**
    * Place holder for custom sources not part of this enumeration.
@@ -126,7 +128,13 @@ public enum SourceType {
     this.sourceClassName = sourceClassName;
   }
 
+  @Deprecated
   public String getSourceClassName() {
     return sourceClassName;
   }
+
+  @Override
+  public String getClassName() {
+    return sourceClassName;
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/test/java/org/apache/flume/conf/TestAgentConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/test/java/org/apache/flume/conf/TestAgentConfiguration.java b/flume-ng-configuration/src/test/java/org/apache/flume/conf/TestAgentConfiguration.java
new file mode 100644
index 0000000..ba88c35
--- /dev/null
+++ b/flume-ng-configuration/src/test/java/org/apache/flume/conf/TestAgentConfiguration.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.flume.conf;
+
+import org.apache.flume.Context;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flume.conf.FlumeConfigurationError.ErrorOrWarning.ERROR;
+import static org.junit.Assert.*;
+
+public class TestAgentConfiguration {
+
+  public static final Map<String, String> PROPERTIES = new HashMap<>();
+  public static final String AGENT = "agent";
+  public static final String SINKS = AGENT + ".sinks";
+  public static final String SOURCES = AGENT + ".sources";
+  public static final String CHANNELS = AGENT + ".channels";
+
+  @BeforeClass
+  public static void setupClass() {
+    PROPERTIES.put(SOURCES, "s1 s2");
+    PROPERTIES.put(SOURCES + ".s1.type", "s1_type");
+    PROPERTIES.put(SOURCES + ".s1.channels", "c1");
+    PROPERTIES.put(SOURCES + ".s2.type", "jms");
+    PROPERTIES.put(SOURCES + ".s2.channels", "c2");
+    PROPERTIES.put(CHANNELS, "c1 c2");
+    PROPERTIES.put(CHANNELS + ".c1.type", "c1_type");
+    PROPERTIES.put(CHANNELS + ".c2.type", "memory");
+    PROPERTIES.put(SINKS, "k1 k2");
+    PROPERTIES.put(SINKS + ".k1.type", "k1_type");
+    PROPERTIES.put(SINKS + ".k2.type", "null");
+    PROPERTIES.put(SINKS + ".k1.channel", "c1");
+    PROPERTIES.put(SINKS + ".k2.channel", "c2");
+    PROPERTIES.put(AGENT + ".sinkgroups", "g1");
+    PROPERTIES.put(AGENT + ".sinkgroups.g1.sinks", "k1 k2");
+    PROPERTIES.put(AGENT + ".configfilters", "f1 f2");
+    PROPERTIES.put(AGENT + ".configfilters.f1.type", "f1_type");
+    PROPERTIES.put(AGENT + ".configfilters.f2.type", "env");
+  }
+
+  @Test
+  public void testConfigHasNoErrors() {
+    FlumeConfiguration configuration = new FlumeConfiguration(PROPERTIES);
+    assertTrue(configuration.getConfigurationErrors().isEmpty());
+  }
+
+  @Test
+  public void testSourcesAdded() {
+    FlumeConfiguration configuration = new FlumeConfiguration(PROPERTIES);
+    Set<String> sourceSet = configuration.getConfigurationFor(AGENT).getSourceSet();
+    assertEquals(new HashSet<>(Arrays.asList("s1", "s2")), sourceSet);
+  }
+
+  @Test
+  public void testFiltersAdded() {
+    FlumeConfiguration configuration = new FlumeConfiguration(PROPERTIES);
+    Set<String> configFilterSet = configuration.getConfigurationFor(AGENT).getConfigFilterSet();
+    assertEquals(new HashSet<>(Arrays.asList("f1", "f2")), configFilterSet);
+  }
+
+  @Test
+  public void testSinksAdded() {
+    FlumeConfiguration configuration = new FlumeConfiguration(PROPERTIES);
+    Set<String> sinkSet = configuration.getConfigurationFor(AGENT).getSinkSet();
+    assertEquals(new HashSet<>(Arrays.asList("k1", "k2")), sinkSet);
+  }
+
+  @Test
+  public void testChannelsAdded() {
+    FlumeConfiguration configuration = new FlumeConfiguration(PROPERTIES);
+    Set<String> channelSet = configuration.getConfigurationFor(AGENT).getChannelSet();
+    assertEquals(new HashSet<>(Arrays.asList("c1", "c2")), channelSet);
+  }
+
+  @Test
+  public void testSinkGroupsAdded() {
+    FlumeConfiguration configuration = new FlumeConfiguration(PROPERTIES);
+    Set<String> sinkSet = configuration.getConfigurationFor(AGENT).getSinkgroupSet();
+    assertEquals(new HashSet<>(Arrays.asList("g1")), sinkSet);
+  }
+
+  @Test
+  public void testConfigFiltersMappedCorrectly() {
+    FlumeConfiguration configuration = new FlumeConfiguration(PROPERTIES);
+    Map<String, Context> contextMap =
+        configuration.getConfigurationFor(AGENT).getConfigFilterContext();
+    assertEquals("f1_type", contextMap.get("f1").getString("type"));
+  }
+
+  @Test
+  public void testSourcesMappedCorrectly() {
+    FlumeConfiguration configuration = new FlumeConfiguration(PROPERTIES);
+    Map<String, Context> contextMap = configuration.getConfigurationFor(AGENT).getSourceContext();
+    assertEquals("s1_type", contextMap.get("s1").getString("type"));
+  }
+
+  @Test
+  public void testSinksMappedCorrectly() {
+    FlumeConfiguration configuration = new FlumeConfiguration(PROPERTIES);
+    Map<String, Context> contextMap = configuration.getConfigurationFor(AGENT).getSinkContext();
+    assertEquals("k1_type", contextMap.get("k1").getString("type"));
+  }
+
+  @Test
+  public void testChannelsMappedCorrectly() {
+    FlumeConfiguration configuration = new FlumeConfiguration(PROPERTIES);
+    Map<String, Context> contextMap = configuration.getConfigurationFor(AGENT).getChannelContext();
+    assertEquals("c1_type", contextMap.get("c1").getString("type"));
+  }
+
+  @Test
+  public void testChannelsConfigMappedCorrectly() {
+    FlumeConfiguration configuration = new FlumeConfiguration(PROPERTIES);
+    Map<String, ComponentConfiguration> configMap =
+        configuration.getConfigurationFor(AGENT).getChannelConfigMap();
+    assertEquals("memory", configMap.get("c2").getType());
+  }
+
+  @Test
+  public void testConfigFilterConfigMappedCorrectly() {
+    FlumeConfiguration configuration = new FlumeConfiguration(PROPERTIES);
+    Map<String, ComponentConfiguration> configMap =
+        configuration.getConfigurationFor(AGENT).getConfigFilterConfigMap();
+    assertEquals("env", configMap.get("f2").getType());
+  }
+
+  @Test
+  public void testSourceConfigMappedCorrectly() {
+    FlumeConfiguration configuration = new FlumeConfiguration(PROPERTIES);
+    Map<String, ComponentConfiguration> configMap =
+        configuration.getConfigurationFor(AGENT).getSourceConfigMap();
+    assertEquals("jms", configMap.get("s2").getType());
+  }
+
+  @Test
+  public void testSinkConfigMappedCorrectly() {
+    FlumeConfiguration configuration = new FlumeConfiguration(PROPERTIES);
+    Map<String, ComponentConfiguration> configMap =
+        configuration.getConfigurationFor(AGENT).getSinkConfigMap();
+    assertEquals("null", configMap.get("k2").getType());
+  }
+
+  @Test
+  public void testSinkgroupConfigMappedCorrectly() {
+    FlumeConfiguration configuration = new FlumeConfiguration(PROPERTIES);
+    Map<String, ComponentConfiguration> configMap =
+        configuration.getConfigurationFor(AGENT).getSinkGroupConfigMap();
+    assertEquals("Sinkgroup", configMap.get("g1").getType());
+  }
+
+  @Test
+  public void testNoChannelIsInvalid() {
+    Map<String, String> properties = new HashMap<>(PROPERTIES);
+    properties.put(CHANNELS, "");
+    FlumeConfiguration flumeConfiguration = new FlumeConfiguration(properties);
+
+    assertFalse(flumeConfiguration.getConfigurationErrors().isEmpty());
+    assertNull(flumeConfiguration.getConfigurationFor(AGENT));
+  }
+
+  @Test
+  public void testNoSourcesIsValid() {
+    Map<String, String> properties = new HashMap<>(PROPERTIES);
+    properties.remove(SOURCES);
+    properties.remove(SOURCES + ".s1.type");
+    properties.remove(SOURCES + ".s1.channels");
+    properties.remove(SOURCES + ".s2.type");
+    properties.remove(SOURCES + ".s2.channels");
+    FlumeConfiguration flumeConfiguration = new FlumeConfiguration(properties);
+
+    assertConfigHasNoError(flumeConfiguration);
+    assertNotNull(flumeConfiguration.getConfigurationFor(AGENT));
+  }
+
+  @Test
+  public void testNoSinksIsValid() {
+    Map<String, String> properties = new HashMap<>(PROPERTIES);
+    properties.remove(SINKS);
+    properties.remove(SINKS + ".k1.type", "k1_type");
+    properties.remove(SINKS + ".k2.type", "null");
+    properties.remove(SINKS + ".k1.channel", "c1");
+    properties.remove(SINKS + ".k2.channel", "c2");
+    properties.remove(AGENT + ".sinkgroups", "g1");
+    properties.remove(AGENT + ".sinkgroups.g1.sinks", "k1 k2");
+
+    FlumeConfiguration flumeConfiguration = new FlumeConfiguration(properties);
+
+    assertConfigHasNoError(flumeConfiguration);
+    assertNotNull(flumeConfiguration.getConfigurationFor(AGENT));
+  }
+
+  private void assertConfigHasNoError(FlumeConfiguration configuration) {
+    List<FlumeConfigurationError> configurationErrors = configuration.getConfigurationErrors();
+    long count = 0L;
+    for (FlumeConfigurationError e : configurationErrors) {
+      if (e.getErrorOrWarning() == ERROR) {
+        count++;
+      }
+    }
+    assertTrue(count == 0);
+  }
+
+  @Test
+  public void testNoSourcesAndNoSinksIsInvalid() {
+    Map<String, String> properties = new HashMap<>(PROPERTIES);
+    properties.put(SOURCES, "");
+    properties.put(SINKS, "");
+    FlumeConfiguration flumeConfiguration = new FlumeConfiguration(properties);
+
+    assertFalse(flumeConfiguration.getConfigurationErrors().isEmpty());
+    assertNull(flumeConfiguration.getConfigurationFor(AGENT));
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/test/java/org/apache/flume/conf/TestFlumeConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/test/java/org/apache/flume/conf/TestFlumeConfiguration.java b/flume-ng-configuration/src/test/java/org/apache/flume/conf/TestFlumeConfiguration.java
index e206055..a881d00 100644
--- a/flume-ng-configuration/src/test/java/org/apache/flume/conf/TestFlumeConfiguration.java
+++ b/flume-ng-configuration/src/test/java/org/apache/flume/conf/TestFlumeConfiguration.java
@@ -16,20 +16,27 @@
  */
 package org.apache.flume.conf;
 
+import java.util.HashMap;
+import java.util.List;
 import java.util.Properties;
 
 import junit.framework.Assert;
 
 import org.apache.flume.conf.FlumeConfiguration.AgentConfiguration;
+import org.apache.flume.conf.FlumeConfigurationError.ErrorOrWarning;
 import org.junit.Test;
 
+import static org.apache.flume.conf.FlumeConfigurationError.ErrorOrWarning.ERROR;
+import static org.apache.flume.conf.FlumeConfigurationErrorType.*;
+import static org.junit.Assert.assertEquals;
+
 public class TestFlumeConfiguration {
 
   /**
    * Test fails without FLUME-1743
    */
   @Test
-  public void testFLUME1743() throws Exception {
+  public void testFLUME1743() {
     Properties properties = new Properties();
     properties.put("agent1.channels", "ch0");
     properties.put("agent1.channels.ch0.type", "memory");
@@ -45,6 +52,9 @@ public class TestFlumeConfiguration {
     properties.put("agent1.sinks.sink0.type", "null");
     properties.put("agent1.sinks.sink0.channel", "ch0");
 
+    properties.put("agent1.configfilters", "f1");
+    properties.put("agent1.configfilters.f1.type", "env");
+
     FlumeConfiguration conf = new FlumeConfiguration(properties);
     AgentConfiguration agentConfiguration = conf.getConfigurationFor("agent1");
     Assert.assertEquals(String.valueOf(agentConfiguration.getSourceSet()), 1,
@@ -56,5 +66,102 @@ public class TestFlumeConfiguration {
     Assert.assertTrue(agentConfiguration.getSourceSet().contains("src0"));
     Assert.assertTrue(agentConfiguration.getChannelSet().contains("ch0"));
     Assert.assertTrue(agentConfiguration.getSinkSet().contains("sink0"));
+    Assert.assertTrue(agentConfiguration.getConfigFilterSet().contains("f1"));
+  }
+
+  @Test
+  public void testFlumeConfigAdsErrorOnNullName() {
+    HashMap<String, String> properties = new HashMap<>();
+    properties.put(null, "something");
+    FlumeConfiguration config = new FlumeConfiguration(properties);
+
+    List<FlumeConfigurationError> configurationErrors = config.getConfigurationErrors();
+    assertEquals(1, configurationErrors.size());
+    assertError(configurationErrors.get(0), AGENT_NAME_MISSING, "", "", ERROR);
+  }
+
+  @Test
+  public void testFlumeConfigAddsErrorOnNullValue() {
+    HashMap<String, String> properties = new HashMap<>();
+    properties.put("something", null);
+    FlumeConfiguration config = new FlumeConfiguration(properties);
+
+    List<FlumeConfigurationError> configurationErrors = config.getConfigurationErrors();
+    assertEquals(1, configurationErrors.size());
+    assertError(configurationErrors.get(0), AGENT_NAME_MISSING, "", "", ERROR);
+  }
+
+
+  @Test
+  public void testFlumeConfigAddsErrorOnEmptyValue() {
+    HashMap<String, String> properties = new HashMap<>();
+    properties.put("something", "");
+    FlumeConfiguration config = new FlumeConfiguration(properties);
+
+    List<FlumeConfigurationError> configurationErrors = config.getConfigurationErrors();
+    assertEquals(1, configurationErrors.size());
+    assertError(configurationErrors.get(0), PROPERTY_VALUE_NULL, "something", "", ERROR);
+  }
+
+
+  @Test
+  public void testFlumeConfigAddsErrorOnNoAgentNameValue() {
+    HashMap<String, String> properties = new HashMap<>();
+    properties.put("something", "value");
+    FlumeConfiguration config = new FlumeConfiguration(properties);
+
+    List<FlumeConfigurationError> configurationErrors = config.getConfigurationErrors();
+    assertEquals(1, configurationErrors.size());
+    assertError(configurationErrors.get(0), AGENT_NAME_MISSING, "something", "", ERROR);
+  }
+
+  @Test
+  public void testFlumeConfigAddsErrorOnEmptyAgentNameValue() {
+    Properties properties = new Properties();
+    properties.put(".something", "value");
+    FlumeConfiguration config = new FlumeConfiguration(properties);
+
+    List<FlumeConfigurationError> configurationErrors = config.getConfigurationErrors();
+    assertEquals(1, configurationErrors.size());
+    assertError(configurationErrors.get(0), AGENT_NAME_MISSING, ".something", "", ERROR);
+  }
+
+  @Test
+  public void testFlumeConfigAddsErrorOnEmptyPropertyName() {
+    HashMap<String, String> properties = new HashMap<>();
+    properties.put("agent.", "something");
+    FlumeConfiguration config = new FlumeConfiguration(properties);
+
+    List<FlumeConfigurationError> configurationErrors = config.getConfigurationErrors();
+    assertEquals(1, configurationErrors.size());
+    assertError(configurationErrors.get(0), PROPERTY_NAME_NULL, "agent.", "", ERROR);
   }
+
+  @Test
+  public void testFlumeConfigAddsErrorOnInvalidConfig() {
+    HashMap<String, String> properties = new HashMap<>();
+    properties.put("agent.channels", "c1");
+    properties.put("agent.channel.c1", "cc1");
+    FlumeConfiguration config = new FlumeConfiguration(properties);
+
+    List<FlumeConfigurationError> configurationErrors = config.getConfigurationErrors();
+    assertEquals(4, configurationErrors.size());
+    assertError(configurationErrors.get(0), INVALID_PROPERTY, "agent", "channel.c1", ERROR);
+    assertError(configurationErrors.get(1), CONFIG_ERROR, "agent", "c1", ERROR);
+    assertError(configurationErrors.get(2), PROPERTY_VALUE_NULL, "agent", "channels", ERROR);
+    assertError(configurationErrors.get(3), AGENT_CONFIGURATION_INVALID, "agent", "", ERROR);
+  }
+
+  private void assertError(
+          FlumeConfigurationError error,
+          FlumeConfigurationErrorType agentNameMissing,
+          String componentName, String key,
+          ErrorOrWarning eow
+  ) {
+    assertEquals(agentNameMissing, error.getErrorType());
+    assertEquals("ComponentName mismatch.", componentName, error.getComponentName());
+    assertEquals("Key mismatch.", key, error.getKey());
+    assertEquals(eow, error.getErrorOrWarning());
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/test/java/org/apache/flume/conf/TestFlumeConfigurationConfigFilter.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/test/java/org/apache/flume/conf/TestFlumeConfigurationConfigFilter.java b/flume-ng-configuration/src/test/java/org/apache/flume/conf/TestFlumeConfigurationConfigFilter.java
new file mode 100644
index 0000000..ada2d8f
--- /dev/null
+++ b/flume-ng-configuration/src/test/java/org/apache/flume/conf/TestFlumeConfigurationConfigFilter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.flume.conf;
+
+import org.apache.flume.Context;
+import org.apache.flume.conf.FlumeConfiguration.AgentConfiguration;
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestFlumeConfigurationConfigFilter {
+
+  @Test
+  public void testFlumeConfigFilterWorks() {
+    Properties properties = new Properties();
+    properties.put("agent1.channels", "ch0");
+    properties.put("agent1.channels.ch0.type", "file");
+    properties.put("agent1.channels.ch0.param1", "${f1['param']}");
+    properties.put("agent1.channels.ch0.param2", "${f1['param\"]}");
+    properties.put("agent1.channels.ch0.param3", "${f1['null']}");
+    properties.put("agent1.channels.ch0.param4", "${f1['throw']}");
+
+    properties.put("agent1.sources", "src0");
+    properties.put("agent1.sources.src0.type", "multiport_syslogtcp");
+    properties.put("agent1.sources.src0.channels", "ch0");
+    properties.put("agent1.sources.src0.host", "${f1[host]}");
+    properties.put("agent1.sources.src0.ports", "10001 10002 10003");
+    properties.put("agent1.sources.src0.portHeader", "${f2[\"port\"]}-${f1['header']}");
+
+    properties.put("agent1.sinks", "sink0");
+    properties.put("agent1.sinks.sink0.type", "thrift");
+    properties.put("agent1.sinks.sink0.param", "${f2['param']}");
+    properties.put("agent1.sinks.sink0.channel", "ch0");
+
+    properties.put("agent1.configfilters", "f1 f2");
+    properties.put("agent1.configfilters.f1.type",
+        "org.apache.flume.conf.configfilter.MockConfigFilter");
+    properties.put("agent1.configfilters.f2.type",
+        "org.apache.flume.conf.configfilter.MockConfigFilter");
+
+    FlumeConfiguration conf = new FlumeConfiguration(properties);
+    AgentConfiguration agentConfiguration = conf.getConfigurationFor("agent1");
+    Context src0 = agentConfiguration.getSourceContext().get("src0");
+    assertEquals("filtered_host", src0.getString("host"));
+    assertEquals("filtered_port-filtered_header", src0.getString("portHeader"));
+
+    Context sink0 = agentConfiguration.getSinkContext().get("sink0");
+    assertEquals("filtered_param", sink0.getString("param"));
+
+    Context ch0 = agentConfiguration.getChannelContext().get("ch0");
+    assertEquals("filtered_param", ch0.getString("param1"));
+    assertEquals("${f1['param\"]}", ch0.getString("param2"));
+    assertEquals("${f1['null']}", ch0.getString("param3"));
+    assertEquals("${f1['throw']}", ch0.getString("param4"));
+  }
+
+}

Reply via email to