http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java index 8ae5bd9..e6c7875 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java @@ -21,6 +21,9 @@ import org.apache.flume.conf.ComponentConfiguration.ComponentType; import org.apache.flume.conf.FlumeConfigurationError.ErrorOrWarning; import org.apache.flume.conf.channel.ChannelConfiguration; import org.apache.flume.conf.channel.ChannelType; +import org.apache.flume.configfilter.ConfigFilter; +import org.apache.flume.conf.configfilter.ConfigFilterConfiguration; +import org.apache.flume.conf.configfilter.ConfigFilterType; import org.apache.flume.conf.sink.SinkConfiguration; import org.apache.flume.conf.sink.SinkGroupConfiguration; import org.apache.flume.conf.sink.SinkType; @@ -39,9 +42,34 @@ import java.util.LinkedList; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Map.Entry; import java.util.Properties; import java.util.Set; import java.util.StringTokenizer; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_CHANNELS; +import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_CHANNELS_PREFIX; +import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_CONFIG; +import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_CONFIGFILTERS; +import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_CONFIGFILTERS_PREFIX; +import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_SINKGROUPS; +import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_SINKGROUPS_PREFIX; +import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_SINKS; +import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_SINKS_PREFIX; +import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_SOURCES; +import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_SOURCES_PREFIX; +import static org.apache.flume.conf.FlumeConfigurationError.ErrorOrWarning.ERROR; +import static org.apache.flume.conf.FlumeConfigurationError.ErrorOrWarning.WARNING; +import static org.apache.flume.conf.FlumeConfigurationErrorType.AGENT_CONFIGURATION_INVALID; +import static org.apache.flume.conf.FlumeConfigurationErrorType.AGENT_NAME_MISSING; +import static org.apache.flume.conf.FlumeConfigurationErrorType.CONFIG_ERROR; +import static org.apache.flume.conf.FlumeConfigurationErrorType.DUPLICATE_PROPERTY; +import static org.apache.flume.conf.FlumeConfigurationErrorType.INVALID_PROPERTY; +import static org.apache.flume.conf.FlumeConfigurationErrorType.PROPERTY_NAME_NULL; +import static org.apache.flume.conf.FlumeConfigurationErrorType.PROPERTY_PART_OF_ANOTHER_GROUP; +import static org.apache.flume.conf.FlumeConfigurationErrorType.PROPERTY_VALUE_NULL; /** * <p> @@ -58,7 +86,7 @@ import java.util.StringTokenizer; */ public class FlumeConfiguration { - private static final Logger logger = LoggerFactory.getLogger(FlumeConfiguration.class); + private static final Logger LOGGER = LoggerFactory.getLogger(FlumeConfiguration.class); private final Map<String, AgentConfiguration> agentConfigMap; private final LinkedList<FlumeConfigurationError> errors; @@ -71,13 +99,12 @@ public class FlumeConfiguration { */ @Deprecated public FlumeConfiguration(Properties properties) { - agentConfigMap = new HashMap<String, AgentConfiguration>(); - errors = new LinkedList<FlumeConfigurationError>(); + agentConfigMap = new HashMap<>(); + errors = new LinkedList<>(); // Construct the in-memory component hierarchy - for (Object name : properties.keySet()) { - Object value = properties.get(name); - if (!addRawProperty(name.toString(), value.toString())) { - logger.warn("Configuration property ignored: " + name + " = " + value); + for (Entry entry : properties.entrySet()) { + if (!addRawProperty(entry.getKey().toString(), entry.getValue().toString())) { + LOGGER.warn("Configuration property ignored: {} = {}", entry.getKey(), entry.getValue()); } } // Now iterate thru the agentContext and create agent configs and add them @@ -91,14 +118,12 @@ public class FlumeConfiguration { * Creates a populated Flume Configuration object. */ public FlumeConfiguration(Map<String, String> properties) { - agentConfigMap = new HashMap<String, AgentConfiguration>(); - errors = new LinkedList<FlumeConfigurationError>(); + agentConfigMap = new HashMap<>(); + errors = new LinkedList<>(); // Construct the in-memory component hierarchy - for (String name : properties.keySet()) { - String value = properties.get(name); - - if (!addRawProperty(name, value)) { - logger.warn("Configuration property ignored: " + name + " = " + value); + for (Entry<String, String> entry : properties.entrySet()) { + if (!addRawProperty(entry.getKey(), entry.getValue())) { + LOGGER.warn("Configuration property ignored: {} = {}", entry.getKey(), entry.getValue()); } } // Now iterate thru the agentContext and create agent configs and add them @@ -117,83 +142,68 @@ public class FlumeConfiguration { } private void validateConfiguration() { - Iterator<String> it = agentConfigMap.keySet().iterator(); + Set<Entry<String, AgentConfiguration>> entries = agentConfigMap.entrySet(); + Iterator<Entry<String, AgentConfiguration>> it = entries.iterator(); while (it.hasNext()) { - String agentName = it.next(); - AgentConfiguration aconf = agentConfigMap.get(agentName); + Entry<String, AgentConfiguration> next = it.next(); + String agentName = next.getKey(); + AgentConfiguration aconf = next.getValue(); if (!aconf.isValid()) { - logger.warn("Agent configuration invalid for agent '" + agentName - + "'. It will be removed."); - errors.add(new FlumeConfigurationError(agentName, "", - FlumeConfigurationErrorType.AGENT_CONFIGURATION_INVALID, - ErrorOrWarning.ERROR)); - + LOGGER.warn("Agent configuration invalid for agent '{}'. It will be removed.", agentName); + addError(agentName, AGENT_CONFIGURATION_INVALID, ERROR); it.remove(); } - logger.debug("Channels:" + aconf.channels + "\n"); - logger.debug("Sinks " + aconf.sinks + "\n"); - logger.debug("Sources " + aconf.sources + "\n"); + LOGGER.debug("Channels:{}\n", aconf.channels); + LOGGER.debug("Sinks {}\n", aconf.sinks); + LOGGER.debug("Sources {}\n", aconf.sources); } - logger.info("Post-validation flume configuration contains configuration" - + " for agents: " + agentConfigMap.keySet()); + LOGGER.info( + "Post-validation flume configuration contains configuration for agents: {}", + agentConfigMap.keySet() + ); } - private boolean addRawProperty(String name, String value) { + private boolean addRawProperty(String rawName, String rawValue) { // Null names and values not supported - if (name == null || value == null) { - errors - .add(new FlumeConfigurationError("", "", - FlumeConfigurationErrorType.AGENT_NAME_MISSING, - ErrorOrWarning.ERROR)); + if (rawName == null || rawValue == null) { + addError("", AGENT_NAME_MISSING, ERROR); return false; } + // Remove leading and trailing spaces + String name = rawName.trim(); + String value = rawValue.trim(); + // Empty values are not supported - if (value.trim().length() == 0) { - errors - .add(new FlumeConfigurationError(name, "", - FlumeConfigurationErrorType.PROPERTY_VALUE_NULL, - ErrorOrWarning.ERROR)); + if (value.isEmpty()) { + addError(name, PROPERTY_VALUE_NULL, ERROR); return false; } - // Remove leading and trailing spaces - name = name.trim(); - value = value.trim(); - int index = name.indexOf('.'); // All configuration keys must have a prefix defined as agent name if (index == -1) { - errors - .add(new FlumeConfigurationError(name, "", - FlumeConfigurationErrorType.AGENT_NAME_MISSING, - ErrorOrWarning.ERROR)); + addError(name, AGENT_NAME_MISSING, ERROR); return false; } String agentName = name.substring(0, index); // Agent name must be specified for all properties - if (agentName.length() == 0) { - errors - .add(new FlumeConfigurationError(name, "", - FlumeConfigurationErrorType.AGENT_NAME_MISSING, - ErrorOrWarning.ERROR)); + if (agentName.isEmpty()) { + addError(name, AGENT_NAME_MISSING, ERROR); return false; } String configKey = name.substring(index + 1); // Configuration key must be specified for every property - if (configKey.length() == 0) { - errors - .add(new FlumeConfigurationError(name, "", - FlumeConfigurationErrorType.PROPERTY_NAME_NULL, - ErrorOrWarning.ERROR)); + if (configKey.isEmpty()) { + addError(name, PROPERTY_NAME_NULL, ERROR); return false; } @@ -209,9 +219,16 @@ public class FlumeConfiguration { return aconf.addProperty(configKey, value); } + private void addError( + String component, FlumeConfigurationErrorType errorType, ErrorOrWarning level + ) { + errors.add(new FlumeConfigurationError(component, "", errorType, level)); + } + public static class AgentConfiguration { private final String agentName; + private String configFilters; private String sources; private String sinks; private String channels; @@ -221,32 +238,40 @@ public class FlumeConfiguration { private final Map<String, ComponentConfiguration> sinkConfigMap; private final Map<String, ComponentConfiguration> channelConfigMap; private final Map<String, ComponentConfiguration> sinkgroupConfigMap; + private final Map<String, ComponentConfiguration> configFilterConfigMap; + private Map<String, Context> configFilterContextMap; private Map<String, Context> sourceContextMap; private Map<String, Context> sinkContextMap; private Map<String, Context> channelContextMap; private Map<String, Context> sinkGroupContextMap; private Set<String> sinkSet; + private Set<String> configFilterSet; private Set<String> sourceSet; private Set<String> channelSet; private Set<String> sinkgroupSet; private final List<FlumeConfigurationError> errorList; + private List<ConfigFilter> configFiltersInstances; + private Map<String, Pattern> configFilterPatternCache; private AgentConfiguration(String agentName, List<FlumeConfigurationError> errorList) { this.agentName = agentName; this.errorList = errorList; - sourceConfigMap = new HashMap<String, ComponentConfiguration>(); - sinkConfigMap = new HashMap<String, ComponentConfiguration>(); - channelConfigMap = new HashMap<String, ComponentConfiguration>(); - sinkgroupConfigMap = new HashMap<String, ComponentConfiguration>(); - sourceContextMap = new HashMap<String, Context>(); - sinkContextMap = new HashMap<String, Context>(); - channelContextMap = new HashMap<String, Context>(); - sinkGroupContextMap = new HashMap<String, Context>(); - + configFilterConfigMap = new HashMap<>(); + sourceConfigMap = new HashMap<>(); + sinkConfigMap = new HashMap<>(); + channelConfigMap = new HashMap<>(); + sinkgroupConfigMap = new HashMap<>(); + configFilterContextMap = new HashMap<>(); + sourceContextMap = new HashMap<>(); + sinkContextMap = new HashMap<>(); + channelContextMap = new HashMap<>(); + sinkGroupContextMap = new HashMap<>(); + configFiltersInstances = new ArrayList<>(); + configFilterPatternCache = new HashMap<>(); } public Map<String, ComponentConfiguration> getChannelConfigMap() { @@ -257,6 +282,10 @@ public class FlumeConfiguration { return sourceConfigMap; } + public Map<String, ComponentConfiguration> getConfigFilterConfigMap() { + return configFilterConfigMap; + } + public Map<String, ComponentConfiguration> getSinkConfigMap() { return sinkConfigMap; } @@ -265,22 +294,30 @@ public class FlumeConfiguration { return sinkgroupConfigMap; } + public Map<String, Context> getConfigFilterContext() { + return configFilterContextMap; + } + public Map<String, Context> getSourceContext() { - return this.sourceContextMap; + return sourceContextMap; } public Map<String, Context> getSinkContext() { - return this.sinkContextMap; + return sinkContextMap; } public Map<String, Context> getChannelContext() { - return this.channelContextMap; + return channelContextMap; } public Set<String> getSinkSet() { return sinkSet; } + public Set<String> getConfigFilterSet() { + return configFilterSet; + } + public Set<String> getSourceSet() { return sourceSet; } @@ -310,36 +347,35 @@ public class FlumeConfiguration { * @return true if the configuration is valid, false otherwise */ private boolean isValid() { - logger.debug("Starting validation of configuration for agent: {}", agentName); - if (logger.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) { - logger.debug("Initial configuration: {}", this.getPrevalidationConfig()); + LOGGER.debug("Starting validation of configuration for agent: {}", agentName); + if (LOGGER.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) { + LOGGER.debug("Initial configuration: {}", getPrevalidationConfig()); } + configFilterSet = validateConfigFilterSet(); + createConfigFilters(); + runFiltersThroughConfigs(); + // Make sure that at least one channel is specified - if (channels == null || channels.trim().length() == 0) { - logger.warn("Agent configuration for '" + agentName - + "' does not contain any channels. Marking it as invalid."); - errorList.add(new FlumeConfigurationError(agentName, - BasicConfigurationConstants.CONFIG_CHANNELS, - FlumeConfigurationErrorType.PROPERTY_VALUE_NULL, - ErrorOrWarning.ERROR)); + if (channels == null || channels.trim().isEmpty()) { + LOGGER.warn( + "Agent configuration for '{}' does not contain any channels. Marking it as invalid.", + agentName + ); + addError(CONFIG_CHANNELS, PROPERTY_VALUE_NULL, ERROR); return false; } - channelSet = - new HashSet<String>(Arrays - .asList(channels.split("\\s+"))); - // validateComponent(channelSet, channelConfigMap, CLASS_CHANNEL, - // ATTR_TYPE); + channelSet = new HashSet<>(Arrays.asList(channels.split("\\s+"))); channelSet = validateChannels(channelSet); - if (channelSet.size() == 0) { - logger.warn("Agent configuration for '" + agentName - + "' does not contain any valid channels. Marking it as invalid."); - errorList.add(new FlumeConfigurationError(agentName, - BasicConfigurationConstants.CONFIG_CHANNELS, - FlumeConfigurationErrorType.PROPERTY_VALUE_NULL, - ErrorOrWarning.ERROR)); + if (channelSet.isEmpty()) { + LOGGER.warn( + "Agent configuration for '{}' does not contain any valid channels. " + + "Marking it as invalid.", + agentName + ); + addError(CONFIG_CHANNELS, PROPERTY_VALUE_NULL, ERROR); return false; } @@ -348,75 +384,155 @@ public class FlumeConfiguration { sinkgroupSet = validateGroups(sinkSet); // If no sources or sinks are present, then this is invalid - if (sourceSet.size() == 0 && sinkSet.size() == 0) { - logger.warn("Agent configuration for '" + agentName - + "' has no sources or sinks. Will be marked invalid."); - errorList.add(new FlumeConfigurationError(agentName, - BasicConfigurationConstants.CONFIG_SOURCES, - FlumeConfigurationErrorType.PROPERTY_VALUE_NULL, - ErrorOrWarning.ERROR)); - errorList.add(new FlumeConfigurationError(agentName, - BasicConfigurationConstants.CONFIG_SINKS, - FlumeConfigurationErrorType.PROPERTY_VALUE_NULL, - ErrorOrWarning.ERROR)); + if (sourceSet.isEmpty() && sinkSet.isEmpty()) { + LOGGER.warn( + "Agent configuration for '{}' has no sources or sinks. Will be marked invalid.", + agentName + ); + addError(CONFIG_SOURCES, PROPERTY_VALUE_NULL, ERROR); + addError(CONFIG_SINKS, PROPERTY_VALUE_NULL, ERROR); return false; } // Now rewrite the sources/sinks/channels - this.sources = getSpaceDelimitedList(sourceSet); - this.channels = getSpaceDelimitedList(channelSet); - this.sinks = getSpaceDelimitedList(sinkSet); - this.sinkgroups = getSpaceDelimitedList(sinkgroupSet); + this.configFilters = getSpaceDelimitedList(configFilterSet); + sources = getSpaceDelimitedList(sourceSet); + channels = getSpaceDelimitedList(channelSet); + sinks = getSpaceDelimitedList(sinkSet); + sinkgroups = getSpaceDelimitedList(sinkgroupSet); - if (logger.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) { - logger.debug("Post validation configuration for {}", agentName); - logger.debug(this.getPostvalidationConfig()); + if (LOGGER.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) { + LOGGER.debug("Post validation configuration for {}", agentName); + LOGGER.debug(getPostvalidationConfig()); } return true; } - private ChannelType getKnownChannel(String type) { - ChannelType[] values = ChannelType.values(); - for (ChannelType value : values) { - if (value.toString().equalsIgnoreCase(type)) return value; + private void runFiltersThroughConfigs() { + runFiltersOnContextMaps( + sourceContextMap, + channelContextMap, + sinkContextMap, + sinkGroupContextMap + ); + } - String channel = value.getChannelClassName(); + private void runFiltersOnContextMaps(Map<String, Context>... maps) { + for (Map<String, Context> map: maps) { + for (Context context : map.values()) { + for (String key : context.getParameters().keySet()) { + filterValue(context, key); + } + } + } + } - if (channel != null && channel.equalsIgnoreCase(type)) return value; + private void createConfigFilters() { + for (String name: configFilterSet) { + Context context = configFilterContextMap.get(name); + ComponentConfiguration componentConfiguration = configFilterConfigMap.get(name); + try { + if (context != null) { + ConfigFilter configFilter = ConfigFilterFactory.create( + name, context.getString(BasicConfigurationConstants.CONFIG_TYPE) + ); + configFilter.initializeWithConfiguration(context.getParameters()); + configFiltersInstances.add(configFilter); + configFilterPatternCache.put(configFilter.getName(), + createConfigFilterPattern(configFilter)); + } else if (componentConfiguration != null) { + ConfigFilter configFilter = ConfigFilterFactory.create( + componentConfiguration.getComponentName(), componentConfiguration.getType() + ); + configFiltersInstances.add(configFilter); + configFilterPatternCache.put(configFilter.getName(), + createConfigFilterPattern(configFilter)); + } + } catch (Exception e) { + LOGGER.error("Error while creating config filter {}", name, e); + } + } + } + private Pattern createConfigFilterPattern(ConfigFilter configFilter) { + //JAVA EL expression style ${myFilterName['my_key']} or + //JAVA EL expression style ${myFilterName["my_key"]} or + //JAVA EL expression style ${myFilterName[my_key]} + return Pattern.compile( + "\\$\\{" + // ${ + Pattern.quote(configFilter.getName()) + //<filterComponentName> + "\\[(|'|\")" + // delimiter :'," or nothing + "(?<key>[-_a-zA-Z0-9]+)" + // key + "\\1\\]" + // matching delimiter + "\\}" // } + ); + } + + private void filterValue(Context c, String contextKey) { + for (ConfigFilter configFilter : configFiltersInstances) { + try { + Pattern pattern = configFilterPatternCache.get(configFilter.getName()); + String currentValue = c.getString(contextKey); + Matcher matcher = pattern.matcher(currentValue); + String filteredValue = currentValue; + while (matcher.find()) { + String key = matcher.group("key"); + LOGGER.debug("Replacing {} from config filter {}", key, configFilter.getName()); + String filtered = configFilter.filter(key); + if (filtered == null) { + continue; + } + String fullMatch = matcher.group(); + filteredValue = filteredValue.replace(fullMatch, filtered); + } + c.put(contextKey, filteredValue); + } catch (Exception e) { + e.printStackTrace(); + LOGGER.error("Error while matching and filtering configFilter: {} and key: {}", + new Object[]{configFilter.getName(), contextKey, e}); + } } - return null; + } + + private void addError(String key, FlumeConfigurationErrorType errorType, ErrorOrWarning level) { + errorList.add(new FlumeConfigurationError(agentName, key, errorType, level)); + } + + private ChannelType getKnownChannel(String type) { + return getKnownComponent(type, ChannelType.values()); } private SinkType getKnownSink(String type) { - SinkType[] values = SinkType.values(); - for (SinkType value : values) { - if (value.toString().equalsIgnoreCase(type)) return value; - String sink = value.getSinkClassName(); - if (sink != null && sink.equalsIgnoreCase(type)) return value; - } - return null; + return getKnownComponent(type, SinkType.values()); } private SourceType getKnownSource(String type) { - SourceType[] values = SourceType.values(); - for (SourceType value : values) { + return getKnownComponent(type, SourceType.values()); + } + + private ConfigFilterType getKnownConfigFilter(String type) { + return getKnownComponent(type, ConfigFilterType.values()); + } + + private <T extends ComponentWithClassName> T getKnownComponent(String type, T[] values) { + for (T value : values) { if (value.toString().equalsIgnoreCase(type)) return value; - String src = value.getSourceClassName(); + String src = value.getClassName(); if (src != null && src.equalsIgnoreCase(type)) return value; } return null; } + /** * If it is a known component it will do the full validation required for * that component, else it will do the validation required for that class. */ private Set<String> validateChannels(Set<String> channelSet) { Iterator<String> iter = channelSet.iterator(); - Map<String, Context> newContextMap = new HashMap<String, Context>(); + Map<String, Context> newContextMap = new HashMap<>(); ChannelConfiguration conf = null; /* * The logic for the following code: @@ -449,7 +565,7 @@ public class FlumeConfiguration { String config = null; // Not a known channel - cannot do specific validation to this channel if (chType == null) { - config = channelContext.getString(BasicConfigurationConstants.CONFIG_CONFIG); + config = channelContext.getString(CONFIG_CONFIG); if (config == null || config.isEmpty()) { config = "OTHER"; } else { @@ -464,7 +580,7 @@ public class FlumeConfiguration { conf = (ChannelConfiguration) ComponentConfigurationFactory.create( channelName, config, ComponentType.CHANNEL); - logger.debug("Created channel " + channelName); + LOGGER.debug("Created channel {}", channelName); if (conf != null) { conf.configure(channelContext); } @@ -483,14 +599,15 @@ public class FlumeConfiguration { // thrown if (conf != null) errorList.addAll(conf.getErrors()); iter.remove(); - logger.warn("Could not configure channel " + channelName - + " due to: " + e.getMessage(), e); + LOGGER.warn( + "Could not configure channel {} due to: {}", + new Object[]{channelName, e.getMessage(), e} + ); } } else { iter.remove(); - errorList.add(new FlumeConfigurationError(agentName, channelName, - FlumeConfigurationErrorType.CONFIG_ERROR, ErrorOrWarning.ERROR)); + addError(channelName, CONFIG_ERROR, ERROR); } } channelContextMap = newContextMap; @@ -501,15 +618,89 @@ public class FlumeConfiguration { return channelSet; } + private Set<String> validateConfigFilterSet() { + if (configFilters == null || configFilters.isEmpty()) { + LOGGER.warn("Agent configuration for '{}' has no configfilters.", agentName); + return new HashSet<>(); + } + Set<String> configFilterSet = new HashSet<>(Arrays.asList(configFilters.split("\\s+"))); + Map<String, Context> newContextMap = new HashMap<>(); + + Iterator<String> iter = configFilterSet.iterator(); + ConfigFilterConfiguration conf = null; + while (iter.hasNext()) { + String configFilterName = iter.next(); + Context configFilterContext = configFilterContextMap.get(configFilterName); + if (configFilterContext != null) { + + + // Get the configuration object for the channel: + ConfigFilterType chType = getKnownConfigFilter(configFilterContext.getString( + BasicConfigurationConstants.CONFIG_TYPE)); + boolean configSpecified = false; + String config = null; + // Not a known channel - cannot do specific validation to this channel + if (chType == null) { + config = configFilterContext.getString(CONFIG_CONFIG); + if (config == null || config.isEmpty()) { + config = "OTHER"; + } else { + configSpecified = true; + } + } else { + config = chType.toString().toUpperCase(Locale.ENGLISH); + configSpecified = true; + } + + try { + conf = + (ConfigFilterConfiguration) ComponentConfigurationFactory.create( + configFilterName, config, ComponentType.CONFIG_FILTER); + LOGGER.debug("Created configfilter {}", configFilterName); + if (conf != null) { + conf.configure(configFilterContext); + } + if ((configSpecified && conf.isNotFoundConfigClass()) || + !configSpecified) { + newContextMap.put(configFilterName, configFilterContext); + } else if (configSpecified) { + configFilterConfigMap.put(configFilterName, conf); + } + + if (conf != null) { + errorList.addAll(conf.getErrors()); + } + } catch (ConfigurationException e) { + if (conf != null) errorList.addAll(conf.getErrors()); + iter.remove(); + LOGGER.warn( + "Could not configure configfilter {} due to: {}", + new Object[]{configFilterName, e.getMessage(), e} + ); + + } + } else { + iter.remove(); + addError(configFilterName, CONFIG_ERROR, ERROR); + LOGGER.warn("Configuration empty for: {}. Removed.", configFilterName); + } + } + + configFilterContextMap = newContextMap; + Set<String> tempchannelSet = new HashSet<String>(); + tempchannelSet.addAll(configFilterConfigMap.keySet()); + tempchannelSet.addAll(configFilterContextMap.keySet()); + configFilterSet.retainAll(tempchannelSet); + + return configFilterSet; + } + + private Set<String> validateSources(Set<String> channelSet) { //Arrays.split() call will throw NPE if the sources string is empty if (sources == null || sources.isEmpty()) { - logger.warn("Agent configuration for '" + agentName - + "' has no sources."); - errorList.add(new FlumeConfigurationError(agentName, - BasicConfigurationConstants.CONFIG_SOURCES, - FlumeConfigurationErrorType.PROPERTY_VALUE_NULL, - ErrorOrWarning.WARNING)); + LOGGER.warn("Agent configuration for '{}' has no sources.", agentName); + addError(CONFIG_SOURCES, PROPERTY_VALUE_NULL, WARNING); return new HashSet<String>(); } Set<String> sourceSet = @@ -546,7 +737,7 @@ public class FlumeConfiguration { BasicConfigurationConstants.CONFIG_TYPE)); if (srcType == null) { config = srcContext.getString( - BasicConfigurationConstants.CONFIG_CONFIG); + CONFIG_CONFIG); if (config == null || config.isEmpty()) { config = "OTHER"; } else { @@ -573,7 +764,7 @@ public class FlumeConfiguration { throw new ConfigurationException( "No Channels configured for " + sourceName); } - srcContext.put(BasicConfigurationConstants.CONFIG_CHANNELS, + srcContext.put(CONFIG_CHANNELS, this.getSpaceDelimitedList(channels)); } if ((configSpecified && srcConf.isNotFoundConfigClass()) || @@ -586,14 +777,15 @@ public class FlumeConfiguration { } catch (ConfigurationException e) { if (srcConf != null) errorList.addAll(srcConf.getErrors()); iter.remove(); - logger.warn("Could not configure source " + sourceName - + " due to: " + e.getMessage(), e); + LOGGER.warn( + "Could not configure source {} due to: {}", + new Object[]{sourceName, e.getMessage(), e} + ); } } else { iter.remove(); - errorList.add(new FlumeConfigurationError(agentName, sourceName, - FlumeConfigurationErrorType.CONFIG_ERROR, ErrorOrWarning.ERROR)); - logger.warn("Configuration empty for: " + sourceName + ".Removed."); + addError(sourceName, CONFIG_ERROR, ERROR); + LOGGER.warn("Configuration empty for: {}.Removed.", sourceName); } } @@ -614,12 +806,8 @@ public class FlumeConfiguration { Set<String> sinkSet; SinkConfiguration sinkConf = null; if (sinks == null || sinks.isEmpty()) { - logger.warn("Agent configuration for '" + agentName - + "' has no sinks."); - errorList.add(new FlumeConfigurationError(agentName, - BasicConfigurationConstants.CONFIG_SINKS, - FlumeConfigurationErrorType.PROPERTY_VALUE_NULL, - ErrorOrWarning.WARNING)); + LOGGER.warn("Agent configuration for '{}' has no sinks.", agentName); + addError(CONFIG_SINKS, PROPERTY_VALUE_NULL, WARNING); return new HashSet<String>(); } else { sinkSet = @@ -650,9 +838,8 @@ public class FlumeConfiguration { Context sinkContext = sinkContextMap.get(sinkName.trim()); if (sinkContext == null) { iter.remove(); - logger.warn("no context for sink" + sinkName); - errorList.add(new FlumeConfigurationError(agentName, sinkName, - FlumeConfigurationErrorType.CONFIG_ERROR, ErrorOrWarning.ERROR)); + LOGGER.warn("no context for sink{}", sinkName); + addError(sinkName, CONFIG_ERROR, ERROR); } else { String config = null; boolean configSpecified = false; @@ -660,7 +847,7 @@ public class FlumeConfiguration { BasicConfigurationConstants.CONFIG_TYPE)); if (sinkType == null) { config = sinkContext.getString( - BasicConfigurationConstants.CONFIG_CONFIG); + CONFIG_CONFIG); if (config == null || config.isEmpty()) { config = "OTHER"; } else { @@ -671,7 +858,7 @@ public class FlumeConfiguration { configSpecified = true; } try { - logger.debug("Creating sink: " + sinkName + " using " + config); + LOGGER.debug("Creating sink: {} using {}", sinkName, config); sinkConf = (SinkConfiguration) ComponentConfigurationFactory.create( @@ -694,8 +881,10 @@ public class FlumeConfiguration { } catch (ConfigurationException e) { iter.remove(); if (sinkConf != null) errorList.addAll(sinkConf.getErrors()); - logger.warn("Could not configure sink " + sinkName - + " due to: " + e.getMessage(), e); + LOGGER.warn( + "Could not configure sink {} due to: {}", + new Object[]{sinkName, e.getMessage(), e} + ); } } // Filter out any sinks that have invalid channel @@ -740,15 +929,11 @@ public class FlumeConfiguration { if (conf != null) errorList.addAll(conf.getErrors()); if (groupSinks != null && !groupSinks.isEmpty()) { List<String> sinkArray = new ArrayList<String>(); - for (String sink : groupSinks) { - sinkArray.add(sink); - } + sinkArray.addAll(groupSinks); conf.setSinks(sinkArray); sinkgroupConfigMap.put(sinkgroupName, conf); } else { - errorList.add(new FlumeConfigurationError(agentName, sinkgroupName, - FlumeConfigurationErrorType.CONFIG_ERROR, - ErrorOrWarning.ERROR)); + addError(sinkgroupName, CONFIG_ERROR, ERROR); if (conf != null) errorList.addAll(conf.getErrors()); throw new ConfigurationException( "No available sinks for sinkgroup: " + sinkgroupName @@ -757,19 +942,16 @@ public class FlumeConfiguration { } catch (ConfigurationException e) { iter.remove(); - errorList - .add(new FlumeConfigurationError(agentName, sinkgroupName, - FlumeConfigurationErrorType.CONFIG_ERROR, - ErrorOrWarning.ERROR)); - logger.warn("Could not configure sink group " + sinkgroupName - + " due to: " + e.getMessage(), e); + addError(sinkgroupName, CONFIG_ERROR, ERROR); + LOGGER.warn( + "Could not configure sink group {} due to: {}", + new Object[]{sinkgroupName, e.getMessage(), e} + ); } } else { iter.remove(); - errorList.add(new FlumeConfigurationError(agentName, sinkgroupName, - FlumeConfigurationErrorType.CONFIG_ERROR, ErrorOrWarning.ERROR)); - logger.warn("Configuration error for: " + sinkgroupName - + ".Removed."); + addError(sinkgroupName, CONFIG_ERROR, ERROR); + LOGGER.warn("Configuration error for: {}.Removed.", sinkgroupName); } } @@ -800,23 +982,20 @@ public class FlumeConfiguration { while (sinkIt.hasNext()) { String curSink = sinkIt.next(); if (usedSinks.containsKey(curSink)) { - logger.warn("Agent configuration for '" + agentName + "' sinkgroup '" - + groupConf.getComponentName() + "' sink '" + curSink - + "' in use by " + "another group: '" + usedSinks.get(curSink) - + "', sink not added"); - errorList.add(new FlumeConfigurationError(agentName, groupConf - .getComponentName(), - FlumeConfigurationErrorType.PROPERTY_PART_OF_ANOTHER_GROUP, - ErrorOrWarning.ERROR)); + LOGGER.warn( + "Agent configuration for '{}' sinkgroup '{}' sink '{}' in use by another group: " + + "'{}', sink not added", + new Object[]{agentName, groupConf.getComponentName(), curSink, usedSinks.get(curSink)} + ); + addError(groupConf.getComponentName(), PROPERTY_PART_OF_ANOTHER_GROUP, ERROR); sinkIt.remove(); continue; } else if (!sinkSet.contains(curSink)) { - logger.warn("Agent configuration for '" + agentName + "' sinkgroup '" - + groupConf.getComponentName() + "' sink not found: '" + curSink - + "', sink not added"); - errorList.add(new FlumeConfigurationError(agentName, curSink, - FlumeConfigurationErrorType.INVALID_PROPERTY, - ErrorOrWarning.ERROR)); + LOGGER.warn("Agent configuration for '{}' sinkgroup '{}' sink not found: '{}', " + + " sink not added", + new Object[]{agentName, groupConf.getComponentName(), curSink} + ); + addError(curSink, INVALID_PROPERTY, ERROR); sinkIt.remove(); continue; } else { @@ -827,11 +1006,11 @@ public class FlumeConfiguration { } private String getSpaceDelimitedList(Set<String> entries) { - if (entries.size() == 0) { + if (entries.isEmpty()) { return null; } - StringBuilder sb = new StringBuilder(""); + StringBuilder sb = new StringBuilder(); for (String entry : entries) { sb.append(" ").append(entry); @@ -854,10 +1033,11 @@ public class FlumeConfiguration { public String getPrevalidationConfig() { StringBuilder sb = new StringBuilder("AgentConfiguration["); - sb.append(agentName).append("]").append(NEWLINE).append("SOURCES: "); - sb.append(sourceContextMap).append(NEWLINE).append("CHANNELS: "); - sb.append(channelContextMap).append(NEWLINE).append("SINKS: "); - sb.append(sinkContextMap).append(NEWLINE); + sb.append(agentName).append("]").append(NEWLINE); + sb.append("CONFIG_FILTERS: ").append(configFilterContextMap).append(NEWLINE); + sb.append("SOURCES: ").append(sourceContextMap).append(NEWLINE); + sb.append("CHANNELS: ").append(channelContextMap).append(NEWLINE); + sb.append("SINKS: ").append(sinkContextMap).append(NEWLINE); return sb.toString(); } @@ -907,145 +1087,132 @@ public class FlumeConfiguration { } private boolean addProperty(String key, String value) { + // Check for configFilters + if (CONFIG_CONFIGFILTERS.equals(key)) { + if (configFilters == null) { + configFilters = value; + return true; + } else { + LOGGER.warn("Duplicate configfilter list specified for agent: {}", agentName); + addError(CONFIG_CONFIGFILTERS, DUPLICATE_PROPERTY, ERROR); + return false; + } + } // Check for sources - if (key.equals(BasicConfigurationConstants.CONFIG_SOURCES)) { + if (CONFIG_SOURCES.equals(key)) { if (sources == null) { sources = value; return true; } else { - logger - .warn("Duplicate source list specified for agent: " + agentName); - errorList.add(new FlumeConfigurationError(agentName, - BasicConfigurationConstants.CONFIG_SOURCES, - FlumeConfigurationErrorType.DUPLICATE_PROPERTY, - ErrorOrWarning.ERROR)); + LOGGER.warn("Duplicate source list specified for agent: {}", agentName); + addError(CONFIG_SOURCES, DUPLICATE_PROPERTY, ERROR); return false; } } // Check for sinks - if (key.equals(BasicConfigurationConstants.CONFIG_SINKS)) { + if (CONFIG_SINKS.equals(key)) { if (sinks == null) { sinks = value; - logger.info("Added sinks: " + sinks + " Agent: " + this.agentName); + LOGGER.info("Added sinks: {} Agent: {}", sinks, agentName); return true; } else { - logger.warn("Duplicate sink list specfied for agent: " + agentName); - errorList.add(new FlumeConfigurationError(agentName, - BasicConfigurationConstants.CONFIG_SINKS, - FlumeConfigurationErrorType.DUPLICATE_PROPERTY, - ErrorOrWarning.ERROR)); + LOGGER.warn("Duplicate sink list specfied for agent: {}", agentName); + addError(CONFIG_SINKS, DUPLICATE_PROPERTY, ERROR); return false; } } // Check for channels - if (key.equals(BasicConfigurationConstants.CONFIG_CHANNELS)) { + if (CONFIG_CHANNELS.equals(key)) { if (channels == null) { channels = value; return true; } else { - logger.warn("Duplicate channel list specified for agent: " - + agentName); - errorList.add(new FlumeConfigurationError(agentName, - BasicConfigurationConstants.CONFIG_CHANNELS, - FlumeConfigurationErrorType.DUPLICATE_PROPERTY, - ErrorOrWarning.ERROR)); + LOGGER.warn("Duplicate channel list specified for agent: {}", agentName); + addError(CONFIG_CHANNELS, DUPLICATE_PROPERTY, ERROR); return false; } } // Check for sinkgroups - if (key.equals(BasicConfigurationConstants.CONFIG_SINKGROUPS)) { + if (CONFIG_SINKGROUPS.equals(key)) { if (sinkgroups == null) { sinkgroups = value; return true; } else { - logger - .warn("Duplicate sinkgroup list specfied for agent: " + agentName); - errorList.add(new FlumeConfigurationError(agentName, - BasicConfigurationConstants.CONFIG_SINKGROUPS, - FlumeConfigurationErrorType.DUPLICATE_PROPERTY, - ErrorOrWarning.ERROR)); + LOGGER.warn("Duplicate sinkgroup list specfied for agent: {}", agentName); + addError(CONFIG_SINKGROUPS, DUPLICATE_PROPERTY, ERROR); return false; } } - ComponentNameAndConfigKey cnck = parseConfigKey(key, - BasicConfigurationConstants.CONFIG_SOURCES_PREFIX); - - if (cnck != null) { - // it is a source - String name = cnck.getComponentName(); - Context srcConf = sourceContextMap.get(name); - - if (srcConf == null) { - srcConf = new Context(); - sourceContextMap.put(name, srcConf); - } - - srcConf.put(cnck.getConfigKey(), value); + if (addAsSourceConfig(key, value) + || addAsChannelValue(key, value) + || addAsSinkConfig(key, value) + || addAsSinkGroupConfig(key, value) + || addAsConfigFilterConfig(key, value) + ) { return true; } - cnck = parseConfigKey(key, - BasicConfigurationConstants.CONFIG_CHANNELS_PREFIX); - - if (cnck != null) { - // it is a channel - String name = cnck.getComponentName(); - Context channelConf = channelContextMap.get(name); + LOGGER.warn("Invalid property specified: {}", key); + addError(key, INVALID_PROPERTY, ERROR); + return false; + } - if (channelConf == null) { - channelConf = new Context(); - channelContextMap.put(name, channelConf); - } + private boolean addAsConfigFilterConfig(String key, String value) { + return addComponentConfig( + key, value, CONFIG_CONFIGFILTERS_PREFIX, configFilterContextMap + ); + } - channelConf.put(cnck.getConfigKey(), value); - return true; - } + private boolean addAsSinkGroupConfig(String key, String value) { + return addComponentConfig( + key, value, CONFIG_SINKGROUPS_PREFIX, sinkGroupContextMap + ); + } - cnck = parseConfigKey(key, - BasicConfigurationConstants.CONFIG_SINKS_PREFIX); + private boolean addAsSinkConfig(String key, String value) { + return addComponentConfig( + key, value, CONFIG_SINKS_PREFIX, sinkContextMap + ); + } - if (cnck != null) { - // it is a sink - String name = cnck.getComponentName().trim(); - logger.info("Processing:" + name); - Context sinkConf = sinkContextMap.get(name); + private boolean addAsChannelValue(String key, String value) { + return addComponentConfig( + key, value, CONFIG_CHANNELS_PREFIX, channelContextMap + ); + } - if (sinkConf == null) { - logger.debug("Created context for " + name + ": " - + cnck.getConfigKey()); - sinkConf = new Context(); - sinkContextMap.put(name, sinkConf); - } + private boolean addAsSourceConfig(String key, String value) { + return addComponentConfig( + key, value, CONFIG_SOURCES_PREFIX, sourceContextMap + ); + } - sinkConf.put(cnck.getConfigKey(), value); - return true; - } + private boolean addComponentConfig( + String key, String value, String configPrefix, Map<String, Context> contextMap - cnck = parseConfigKey(key, - BasicConfigurationConstants.CONFIG_SINKGROUPS_PREFIX); + ) { + ComponentNameAndConfigKey parsed = parseConfigKey(key, configPrefix); + if (parsed != null) { + String name = parsed.getComponentName().trim(); + LOGGER.info("Processing:{}", name); + Context context = contextMap.get(name); - if (cnck != null) { - String name = cnck.getComponentName(); - Context groupConf = sinkGroupContextMap.get(name); - if (groupConf == null) { - groupConf = new Context(); - sinkGroupContextMap.put(name, groupConf); + if (context == null) { + LOGGER.debug("Created context for {}: {}", name, parsed.getConfigKey()); + context = new Context(); + contextMap.put(name, context); } - groupConf.put(cnck.getConfigKey(), value); - + context.put(parsed.getConfigKey(), value); return true; } - logger.warn("Invalid property specified: " + key); - errorList.add(new FlumeConfigurationError(agentName, key, - FlumeConfigurationErrorType.INVALID_PROPERTY, ErrorOrWarning.ERROR)); return false; } @@ -1067,7 +1234,7 @@ public class FlumeConfiguration { String configKey = key.substring(prefix.length() + name.length() + 1); // name and config key must be non-empty - if (name.length() == 0 || configKey.length() == 0) { + if (name.isEmpty() || configKey.isEmpty()) { return null; }
http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelConfiguration.java index b3b477f..f7860c3 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelConfiguration.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelConfiguration.java @@ -16,10 +16,8 @@ */ package org.apache.flume.conf.channel; -import org.apache.flume.Context; import org.apache.flume.conf.ComponentConfiguration; import org.apache.flume.conf.ConfigurationException; -import org.apache.flume.conf.source.SourceConfiguration; public class ChannelConfiguration extends ComponentConfiguration { @@ -27,11 +25,6 @@ public class ChannelConfiguration extends ComponentConfiguration { super(componentName); } - @Override - public void configure(Context context) throws ConfigurationException { - super.configure(context); - } - public enum ChannelConfigurationType { OTHER(null), MEMORY("org.apache.flume.conf.channel.MemoryChannelConfiguration"), @@ -65,7 +58,7 @@ public class ChannelConfiguration extends ComponentConfiguration { @SuppressWarnings("unchecked") public ChannelConfiguration getConfiguration(String name) throws ConfigurationException { - if (this.equals(ChannelConfigurationType.OTHER)) { + if (this == OTHER) { return new ChannelConfiguration(name); } Class<? extends ChannelConfiguration> clazz; http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelSelectorConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelSelectorConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelSelectorConfiguration.java index e68e129..7e23284 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelSelectorConfiguration.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelSelectorConfiguration.java @@ -18,11 +18,9 @@ package org.apache.flume.conf.channel; import java.util.Set; -import org.apache.flume.Context; import org.apache.flume.conf.ComponentConfiguration; import org.apache.flume.conf.ConfigurationException; import org.apache.flume.conf.channel.ChannelConfiguration.ChannelConfigurationType; -import org.apache.flume.conf.source.SourceConfiguration; public class ChannelSelectorConfiguration extends ComponentConfiguration { @@ -44,10 +42,6 @@ public class ChannelSelectorConfiguration extends this.channelNames = channelNames; } - public void configure(Context context) throws ConfigurationException { - super.configure(context); - } - public enum ChannelSelectorConfigurationType { OTHER(null), REPLICATING(null), @@ -69,7 +63,7 @@ public class ChannelSelectorConfiguration extends public ChannelSelectorConfiguration getConfiguration( String name) throws ConfigurationException { - if (this.equals(ChannelConfigurationType.OTHER)) { + if (this == OTHER) { return new ChannelSelectorConfiguration(name); } Class<? extends ChannelSelectorConfiguration> clazz; http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelSelectorType.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelSelectorType.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelSelectorType.java index 38b08ad..d1a0dc2 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelSelectorType.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelSelectorType.java @@ -16,10 +16,12 @@ */ package org.apache.flume.conf.channel; +import org.apache.flume.conf.ComponentWithClassName; + /** * Enumeration of built in channel selector types available in the system. */ -public enum ChannelSelectorType { +public enum ChannelSelectorType implements ComponentWithClassName { /** * Place holder for custom channel selectors not part of this enumeration. @@ -42,7 +44,13 @@ public enum ChannelSelectorType { this.channelSelectorClassName = channelSelectorClassName; } + @Deprecated public String getChannelSelectorClassName() { return channelSelectorClassName; } + + @Override + public String getClassName() { + return channelSelectorClassName; + } } http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelType.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelType.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelType.java index 2e3cb34..6d610a6 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelType.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelType.java @@ -18,10 +18,12 @@ */ package org.apache.flume.conf.channel; +import org.apache.flume.conf.ComponentWithClassName; + /** * Enumeration of built in channel types available in the system. */ -public enum ChannelType { +public enum ChannelType implements ComponentWithClassName { /** * Place holder for custom channels not part of this enumeration. @@ -59,7 +61,13 @@ public enum ChannelType { this.channelClassName = channelClassName; } + @Deprecated public String getChannelClassName() { return channelClassName; } + + @Override + public String getClassName() { + return channelClassName; + } } http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/main/java/org/apache/flume/conf/configfilter/ConfigFilterConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/configfilter/ConfigFilterConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/configfilter/ConfigFilterConfiguration.java new file mode 100644 index 0000000..bdd718f --- /dev/null +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/configfilter/ConfigFilterConfiguration.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.flume.conf.configfilter; + +import org.apache.flume.conf.ComponentConfiguration; +import org.apache.flume.conf.ConfigurationException; + +public class ConfigFilterConfiguration extends ComponentConfiguration { + + public enum ConfigFilterConfigurationType { + OTHER(null), + ENV("org.apache.flume.conf.configfilter.EnvironmentVariableConfigFilterConfiguration"), + HADOOP("org.apache.flume.conf.configfilter.HadoopCredentialStoreConfigFilterConfiguration"), + EXTERNAL("org.apache.flume.conf.configfilter.ExternalProcessConfigFilterConfiguration"); + + private final String configurationName; + + ConfigFilterConfigurationType(String type) { + configurationName = type; + } + + public String getConfigFilterConfigurationType() { + return configurationName; + } + + @SuppressWarnings("unchecked") + public ConfigFilterConfiguration getConfiguration(String name) + throws ConfigurationException { + if (this == OTHER) { + return new ConfigFilterConfiguration(name); + } + Class<? extends ConfigFilterConfiguration> clazz; + ConfigFilterConfiguration instance = null; + try { + if (configurationName != null) { + clazz = + (Class<? extends ConfigFilterConfiguration>) Class + .forName(configurationName); + instance = clazz.getConstructor(String.class).newInstance(name); + } else { + return new ConfigFilterConfiguration(name); + } + } catch (ClassNotFoundException e) { + // Could not find the configuration stub, do basic validation + instance = new ConfigFilterConfiguration(name); + // Let the caller know that this was created because of this exception. + instance.setNotFoundConfigClass(); + } catch (Exception e) { + throw new ConfigurationException("Couldn't create configuration", e); + } + return instance; + } + } + + protected ConfigFilterConfiguration(String componentName) { + super(componentName); + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/main/java/org/apache/flume/conf/configfilter/ConfigFilterType.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/configfilter/ConfigFilterType.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/configfilter/ConfigFilterType.java new file mode 100644 index 0000000..ed8e3c6 --- /dev/null +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/configfilter/ConfigFilterType.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.flume.conf.configfilter; + +import org.apache.flume.conf.ComponentWithClassName; + +public enum ConfigFilterType implements ComponentWithClassName { + OTHER(null), + ENV("org.apache.flume.configfilter.EnvironmentVariableConfigFilter"), + HADOOP("org.apache.flume.configfilter.HadoopCredentialStoreConfigFilter"), + EXTERNAL("org.apache.flume.configfilter.ExternalProcessConfigFilter"); + + private final String className; + + ConfigFilterType(String className) { + this.className = className; + } + + @Override + public String getClassName() { + return className; + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java index d6f4cbf..4538792 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java @@ -55,12 +55,6 @@ public class SinkConfiguration extends ComponentConfiguration { @Override public String toString(int indentCount) { - StringBuilder indentSb = new StringBuilder(""); - - for (int i = 0; i < indentCount; i++) { - indentSb.append(FlumeConfiguration.INDENTSTEP); - } - String basicStr = super.toString(indentCount); StringBuilder sb = new StringBuilder(); sb.append(basicStr).append(FlumeConfiguration.INDENTSTEP).append( @@ -176,7 +170,7 @@ public class SinkConfiguration extends ComponentConfiguration { @SuppressWarnings("unchecked") public SinkConfiguration getConfiguration(String name) throws ConfigurationException { - if (this.equals(SinkConfigurationType.OTHER)) { + if (this == OTHER) { return new SinkConfiguration(name); } Class<? extends SinkConfiguration> clazz; http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkGroupConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkGroupConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkGroupConfiguration.java index c3fc2bb..a81b2cb 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkGroupConfiguration.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkGroupConfiguration.java @@ -91,7 +91,7 @@ public class SinkGroupConfiguration extends ComponentConfiguration { SinkProcessorType[] values = SinkProcessorType.values(); for (SinkProcessorType value : values) { if (value.toString().equalsIgnoreCase(type)) return value; - String sinkProcessClassName = value.getSinkProcessorClassName(); + String sinkProcessClassName = value.getClassName(); if (sinkProcessClassName != null && sinkProcessClassName.equalsIgnoreCase(type)) { return value; } http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorConfiguration.java index d0c9802..94383cc 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorConfiguration.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorConfiguration.java @@ -21,8 +21,6 @@ import java.util.Set; import org.apache.flume.Context; import org.apache.flume.conf.ComponentConfiguration; import org.apache.flume.conf.ConfigurationException; -import org.apache.flume.conf.sink.SinkConfiguration.SinkConfigurationType; -import org.apache.flume.conf.source.SourceConfiguration; public class SinkProcessorConfiguration extends ComponentConfiguration { protected Set<String> sinks; http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorType.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorType.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorType.java index 19bd51a..d1b66c5 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorType.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorType.java @@ -18,7 +18,9 @@ */ package org.apache.flume.conf.sink; -public enum SinkProcessorType { +import org.apache.flume.conf.ComponentWithClassName; + +public enum SinkProcessorType implements ComponentWithClassName { /** * Place holder for custom sinks not part of this enumeration. */ @@ -52,7 +54,13 @@ public enum SinkProcessorType { this.processorClassName = processorClassName; } + @Deprecated public String getSinkProcessorClassName() { return processorClassName; } + + @Override + public String getClassName() { + return processorClassName; + } } http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkType.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkType.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkType.java index bfae570..c3f8cac 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkType.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkType.java @@ -18,10 +18,12 @@ */ package org.apache.flume.conf.sink; +import org.apache.flume.conf.ComponentWithClassName; + /** * Enumeration of built in sink types available in the system. */ -public enum SinkType { +public enum SinkType implements ComponentWithClassName { /** * Place holder for custom sinks not part of this enumeration. @@ -120,8 +122,14 @@ public enum SinkType { this.sinkClassName = sinkClassName; } + @Deprecated public String getSinkClassName() { return sinkClassName; } + + @Override + public String getClassName() { + return sinkClassName; + } } http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java index 237efa3..4edda3f 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java @@ -128,7 +128,7 @@ public class SourceConfiguration extends ComponentConfiguration { ChannelSelectorType[] values = ChannelSelectorType.values(); for (ChannelSelectorType value : values) { if (value.toString().equalsIgnoreCase(type)) return value; - String clName = value.getChannelSelectorClassName(); + String clName = value.getClassName(); if (clName != null && clName.equalsIgnoreCase(type)) return value; } return null; @@ -237,7 +237,7 @@ public class SourceConfiguration extends ComponentConfiguration { @SuppressWarnings("unchecked") public SourceConfiguration getConfiguration(String name) throws ConfigurationException { - if (this.equals(SourceConfigurationType.OTHER)) { + if (this == OTHER) { return new SourceConfiguration(name); } Class<? extends SourceConfiguration> clazz = null; http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java index 3e7e7be..b556b6c 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java @@ -18,10 +18,12 @@ */ package org.apache.flume.conf.source; +import org.apache.flume.conf.ComponentWithClassName; + /** * Enumeration of built in source types available in the system. */ -public enum SourceType { +public enum SourceType implements ComponentWithClassName { /** * Place holder for custom sources not part of this enumeration. @@ -126,7 +128,13 @@ public enum SourceType { this.sourceClassName = sourceClassName; } + @Deprecated public String getSourceClassName() { return sourceClassName; } + + @Override + public String getClassName() { + return sourceClassName; + } } http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/test/java/org/apache/flume/conf/TestAgentConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/test/java/org/apache/flume/conf/TestAgentConfiguration.java b/flume-ng-configuration/src/test/java/org/apache/flume/conf/TestAgentConfiguration.java new file mode 100644 index 0000000..ba88c35 --- /dev/null +++ b/flume-ng-configuration/src/test/java/org/apache/flume/conf/TestAgentConfiguration.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.flume.conf; + +import org.apache.flume.Context; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flume.conf.FlumeConfigurationError.ErrorOrWarning.ERROR; +import static org.junit.Assert.*; + +public class TestAgentConfiguration { + + public static final Map<String, String> PROPERTIES = new HashMap<>(); + public static final String AGENT = "agent"; + public static final String SINKS = AGENT + ".sinks"; + public static final String SOURCES = AGENT + ".sources"; + public static final String CHANNELS = AGENT + ".channels"; + + @BeforeClass + public static void setupClass() { + PROPERTIES.put(SOURCES, "s1 s2"); + PROPERTIES.put(SOURCES + ".s1.type", "s1_type"); + PROPERTIES.put(SOURCES + ".s1.channels", "c1"); + PROPERTIES.put(SOURCES + ".s2.type", "jms"); + PROPERTIES.put(SOURCES + ".s2.channels", "c2"); + PROPERTIES.put(CHANNELS, "c1 c2"); + PROPERTIES.put(CHANNELS + ".c1.type", "c1_type"); + PROPERTIES.put(CHANNELS + ".c2.type", "memory"); + PROPERTIES.put(SINKS, "k1 k2"); + PROPERTIES.put(SINKS + ".k1.type", "k1_type"); + PROPERTIES.put(SINKS + ".k2.type", "null"); + PROPERTIES.put(SINKS + ".k1.channel", "c1"); + PROPERTIES.put(SINKS + ".k2.channel", "c2"); + PROPERTIES.put(AGENT + ".sinkgroups", "g1"); + PROPERTIES.put(AGENT + ".sinkgroups.g1.sinks", "k1 k2"); + PROPERTIES.put(AGENT + ".configfilters", "f1 f2"); + PROPERTIES.put(AGENT + ".configfilters.f1.type", "f1_type"); + PROPERTIES.put(AGENT + ".configfilters.f2.type", "env"); + } + + @Test + public void testConfigHasNoErrors() { + FlumeConfiguration configuration = new FlumeConfiguration(PROPERTIES); + assertTrue(configuration.getConfigurationErrors().isEmpty()); + } + + @Test + public void testSourcesAdded() { + FlumeConfiguration configuration = new FlumeConfiguration(PROPERTIES); + Set<String> sourceSet = configuration.getConfigurationFor(AGENT).getSourceSet(); + assertEquals(new HashSet<>(Arrays.asList("s1", "s2")), sourceSet); + } + + @Test + public void testFiltersAdded() { + FlumeConfiguration configuration = new FlumeConfiguration(PROPERTIES); + Set<String> configFilterSet = configuration.getConfigurationFor(AGENT).getConfigFilterSet(); + assertEquals(new HashSet<>(Arrays.asList("f1", "f2")), configFilterSet); + } + + @Test + public void testSinksAdded() { + FlumeConfiguration configuration = new FlumeConfiguration(PROPERTIES); + Set<String> sinkSet = configuration.getConfigurationFor(AGENT).getSinkSet(); + assertEquals(new HashSet<>(Arrays.asList("k1", "k2")), sinkSet); + } + + @Test + public void testChannelsAdded() { + FlumeConfiguration configuration = new FlumeConfiguration(PROPERTIES); + Set<String> channelSet = configuration.getConfigurationFor(AGENT).getChannelSet(); + assertEquals(new HashSet<>(Arrays.asList("c1", "c2")), channelSet); + } + + @Test + public void testSinkGroupsAdded() { + FlumeConfiguration configuration = new FlumeConfiguration(PROPERTIES); + Set<String> sinkSet = configuration.getConfigurationFor(AGENT).getSinkgroupSet(); + assertEquals(new HashSet<>(Arrays.asList("g1")), sinkSet); + } + + @Test + public void testConfigFiltersMappedCorrectly() { + FlumeConfiguration configuration = new FlumeConfiguration(PROPERTIES); + Map<String, Context> contextMap = + configuration.getConfigurationFor(AGENT).getConfigFilterContext(); + assertEquals("f1_type", contextMap.get("f1").getString("type")); + } + + @Test + public void testSourcesMappedCorrectly() { + FlumeConfiguration configuration = new FlumeConfiguration(PROPERTIES); + Map<String, Context> contextMap = configuration.getConfigurationFor(AGENT).getSourceContext(); + assertEquals("s1_type", contextMap.get("s1").getString("type")); + } + + @Test + public void testSinksMappedCorrectly() { + FlumeConfiguration configuration = new FlumeConfiguration(PROPERTIES); + Map<String, Context> contextMap = configuration.getConfigurationFor(AGENT).getSinkContext(); + assertEquals("k1_type", contextMap.get("k1").getString("type")); + } + + @Test + public void testChannelsMappedCorrectly() { + FlumeConfiguration configuration = new FlumeConfiguration(PROPERTIES); + Map<String, Context> contextMap = configuration.getConfigurationFor(AGENT).getChannelContext(); + assertEquals("c1_type", contextMap.get("c1").getString("type")); + } + + @Test + public void testChannelsConfigMappedCorrectly() { + FlumeConfiguration configuration = new FlumeConfiguration(PROPERTIES); + Map<String, ComponentConfiguration> configMap = + configuration.getConfigurationFor(AGENT).getChannelConfigMap(); + assertEquals("memory", configMap.get("c2").getType()); + } + + @Test + public void testConfigFilterConfigMappedCorrectly() { + FlumeConfiguration configuration = new FlumeConfiguration(PROPERTIES); + Map<String, ComponentConfiguration> configMap = + configuration.getConfigurationFor(AGENT).getConfigFilterConfigMap(); + assertEquals("env", configMap.get("f2").getType()); + } + + @Test + public void testSourceConfigMappedCorrectly() { + FlumeConfiguration configuration = new FlumeConfiguration(PROPERTIES); + Map<String, ComponentConfiguration> configMap = + configuration.getConfigurationFor(AGENT).getSourceConfigMap(); + assertEquals("jms", configMap.get("s2").getType()); + } + + @Test + public void testSinkConfigMappedCorrectly() { + FlumeConfiguration configuration = new FlumeConfiguration(PROPERTIES); + Map<String, ComponentConfiguration> configMap = + configuration.getConfigurationFor(AGENT).getSinkConfigMap(); + assertEquals("null", configMap.get("k2").getType()); + } + + @Test + public void testSinkgroupConfigMappedCorrectly() { + FlumeConfiguration configuration = new FlumeConfiguration(PROPERTIES); + Map<String, ComponentConfiguration> configMap = + configuration.getConfigurationFor(AGENT).getSinkGroupConfigMap(); + assertEquals("Sinkgroup", configMap.get("g1").getType()); + } + + @Test + public void testNoChannelIsInvalid() { + Map<String, String> properties = new HashMap<>(PROPERTIES); + properties.put(CHANNELS, ""); + FlumeConfiguration flumeConfiguration = new FlumeConfiguration(properties); + + assertFalse(flumeConfiguration.getConfigurationErrors().isEmpty()); + assertNull(flumeConfiguration.getConfigurationFor(AGENT)); + } + + @Test + public void testNoSourcesIsValid() { + Map<String, String> properties = new HashMap<>(PROPERTIES); + properties.remove(SOURCES); + properties.remove(SOURCES + ".s1.type"); + properties.remove(SOURCES + ".s1.channels"); + properties.remove(SOURCES + ".s2.type"); + properties.remove(SOURCES + ".s2.channels"); + FlumeConfiguration flumeConfiguration = new FlumeConfiguration(properties); + + assertConfigHasNoError(flumeConfiguration); + assertNotNull(flumeConfiguration.getConfigurationFor(AGENT)); + } + + @Test + public void testNoSinksIsValid() { + Map<String, String> properties = new HashMap<>(PROPERTIES); + properties.remove(SINKS); + properties.remove(SINKS + ".k1.type", "k1_type"); + properties.remove(SINKS + ".k2.type", "null"); + properties.remove(SINKS + ".k1.channel", "c1"); + properties.remove(SINKS + ".k2.channel", "c2"); + properties.remove(AGENT + ".sinkgroups", "g1"); + properties.remove(AGENT + ".sinkgroups.g1.sinks", "k1 k2"); + + FlumeConfiguration flumeConfiguration = new FlumeConfiguration(properties); + + assertConfigHasNoError(flumeConfiguration); + assertNotNull(flumeConfiguration.getConfigurationFor(AGENT)); + } + + private void assertConfigHasNoError(FlumeConfiguration configuration) { + List<FlumeConfigurationError> configurationErrors = configuration.getConfigurationErrors(); + long count = 0L; + for (FlumeConfigurationError e : configurationErrors) { + if (e.getErrorOrWarning() == ERROR) { + count++; + } + } + assertTrue(count == 0); + } + + @Test + public void testNoSourcesAndNoSinksIsInvalid() { + Map<String, String> properties = new HashMap<>(PROPERTIES); + properties.put(SOURCES, ""); + properties.put(SINKS, ""); + FlumeConfiguration flumeConfiguration = new FlumeConfiguration(properties); + + assertFalse(flumeConfiguration.getConfigurationErrors().isEmpty()); + assertNull(flumeConfiguration.getConfigurationFor(AGENT)); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/test/java/org/apache/flume/conf/TestFlumeConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/test/java/org/apache/flume/conf/TestFlumeConfiguration.java b/flume-ng-configuration/src/test/java/org/apache/flume/conf/TestFlumeConfiguration.java index e206055..a881d00 100644 --- a/flume-ng-configuration/src/test/java/org/apache/flume/conf/TestFlumeConfiguration.java +++ b/flume-ng-configuration/src/test/java/org/apache/flume/conf/TestFlumeConfiguration.java @@ -16,20 +16,27 @@ */ package org.apache.flume.conf; +import java.util.HashMap; +import java.util.List; import java.util.Properties; import junit.framework.Assert; import org.apache.flume.conf.FlumeConfiguration.AgentConfiguration; +import org.apache.flume.conf.FlumeConfigurationError.ErrorOrWarning; import org.junit.Test; +import static org.apache.flume.conf.FlumeConfigurationError.ErrorOrWarning.ERROR; +import static org.apache.flume.conf.FlumeConfigurationErrorType.*; +import static org.junit.Assert.assertEquals; + public class TestFlumeConfiguration { /** * Test fails without FLUME-1743 */ @Test - public void testFLUME1743() throws Exception { + public void testFLUME1743() { Properties properties = new Properties(); properties.put("agent1.channels", "ch0"); properties.put("agent1.channels.ch0.type", "memory"); @@ -45,6 +52,9 @@ public class TestFlumeConfiguration { properties.put("agent1.sinks.sink0.type", "null"); properties.put("agent1.sinks.sink0.channel", "ch0"); + properties.put("agent1.configfilters", "f1"); + properties.put("agent1.configfilters.f1.type", "env"); + FlumeConfiguration conf = new FlumeConfiguration(properties); AgentConfiguration agentConfiguration = conf.getConfigurationFor("agent1"); Assert.assertEquals(String.valueOf(agentConfiguration.getSourceSet()), 1, @@ -56,5 +66,102 @@ public class TestFlumeConfiguration { Assert.assertTrue(agentConfiguration.getSourceSet().contains("src0")); Assert.assertTrue(agentConfiguration.getChannelSet().contains("ch0")); Assert.assertTrue(agentConfiguration.getSinkSet().contains("sink0")); + Assert.assertTrue(agentConfiguration.getConfigFilterSet().contains("f1")); + } + + @Test + public void testFlumeConfigAdsErrorOnNullName() { + HashMap<String, String> properties = new HashMap<>(); + properties.put(null, "something"); + FlumeConfiguration config = new FlumeConfiguration(properties); + + List<FlumeConfigurationError> configurationErrors = config.getConfigurationErrors(); + assertEquals(1, configurationErrors.size()); + assertError(configurationErrors.get(0), AGENT_NAME_MISSING, "", "", ERROR); + } + + @Test + public void testFlumeConfigAddsErrorOnNullValue() { + HashMap<String, String> properties = new HashMap<>(); + properties.put("something", null); + FlumeConfiguration config = new FlumeConfiguration(properties); + + List<FlumeConfigurationError> configurationErrors = config.getConfigurationErrors(); + assertEquals(1, configurationErrors.size()); + assertError(configurationErrors.get(0), AGENT_NAME_MISSING, "", "", ERROR); + } + + + @Test + public void testFlumeConfigAddsErrorOnEmptyValue() { + HashMap<String, String> properties = new HashMap<>(); + properties.put("something", ""); + FlumeConfiguration config = new FlumeConfiguration(properties); + + List<FlumeConfigurationError> configurationErrors = config.getConfigurationErrors(); + assertEquals(1, configurationErrors.size()); + assertError(configurationErrors.get(0), PROPERTY_VALUE_NULL, "something", "", ERROR); + } + + + @Test + public void testFlumeConfigAddsErrorOnNoAgentNameValue() { + HashMap<String, String> properties = new HashMap<>(); + properties.put("something", "value"); + FlumeConfiguration config = new FlumeConfiguration(properties); + + List<FlumeConfigurationError> configurationErrors = config.getConfigurationErrors(); + assertEquals(1, configurationErrors.size()); + assertError(configurationErrors.get(0), AGENT_NAME_MISSING, "something", "", ERROR); + } + + @Test + public void testFlumeConfigAddsErrorOnEmptyAgentNameValue() { + Properties properties = new Properties(); + properties.put(".something", "value"); + FlumeConfiguration config = new FlumeConfiguration(properties); + + List<FlumeConfigurationError> configurationErrors = config.getConfigurationErrors(); + assertEquals(1, configurationErrors.size()); + assertError(configurationErrors.get(0), AGENT_NAME_MISSING, ".something", "", ERROR); + } + + @Test + public void testFlumeConfigAddsErrorOnEmptyPropertyName() { + HashMap<String, String> properties = new HashMap<>(); + properties.put("agent.", "something"); + FlumeConfiguration config = new FlumeConfiguration(properties); + + List<FlumeConfigurationError> configurationErrors = config.getConfigurationErrors(); + assertEquals(1, configurationErrors.size()); + assertError(configurationErrors.get(0), PROPERTY_NAME_NULL, "agent.", "", ERROR); } + + @Test + public void testFlumeConfigAddsErrorOnInvalidConfig() { + HashMap<String, String> properties = new HashMap<>(); + properties.put("agent.channels", "c1"); + properties.put("agent.channel.c1", "cc1"); + FlumeConfiguration config = new FlumeConfiguration(properties); + + List<FlumeConfigurationError> configurationErrors = config.getConfigurationErrors(); + assertEquals(4, configurationErrors.size()); + assertError(configurationErrors.get(0), INVALID_PROPERTY, "agent", "channel.c1", ERROR); + assertError(configurationErrors.get(1), CONFIG_ERROR, "agent", "c1", ERROR); + assertError(configurationErrors.get(2), PROPERTY_VALUE_NULL, "agent", "channels", ERROR); + assertError(configurationErrors.get(3), AGENT_CONFIGURATION_INVALID, "agent", "", ERROR); + } + + private void assertError( + FlumeConfigurationError error, + FlumeConfigurationErrorType agentNameMissing, + String componentName, String key, + ErrorOrWarning eow + ) { + assertEquals(agentNameMissing, error.getErrorType()); + assertEquals("ComponentName mismatch.", componentName, error.getComponentName()); + assertEquals("Key mismatch.", key, error.getKey()); + assertEquals(eow, error.getErrorOrWarning()); + } + } http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/test/java/org/apache/flume/conf/TestFlumeConfigurationConfigFilter.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/test/java/org/apache/flume/conf/TestFlumeConfigurationConfigFilter.java b/flume-ng-configuration/src/test/java/org/apache/flume/conf/TestFlumeConfigurationConfigFilter.java new file mode 100644 index 0000000..ada2d8f --- /dev/null +++ b/flume-ng-configuration/src/test/java/org/apache/flume/conf/TestFlumeConfigurationConfigFilter.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.flume.conf; + +import org.apache.flume.Context; +import org.apache.flume.conf.FlumeConfiguration.AgentConfiguration; +import org.junit.Test; + +import java.util.Properties; + +import static org.junit.Assert.assertEquals; + +public class TestFlumeConfigurationConfigFilter { + + @Test + public void testFlumeConfigFilterWorks() { + Properties properties = new Properties(); + properties.put("agent1.channels", "ch0"); + properties.put("agent1.channels.ch0.type", "file"); + properties.put("agent1.channels.ch0.param1", "${f1['param']}"); + properties.put("agent1.channels.ch0.param2", "${f1['param\"]}"); + properties.put("agent1.channels.ch0.param3", "${f1['null']}"); + properties.put("agent1.channels.ch0.param4", "${f1['throw']}"); + + properties.put("agent1.sources", "src0"); + properties.put("agent1.sources.src0.type", "multiport_syslogtcp"); + properties.put("agent1.sources.src0.channels", "ch0"); + properties.put("agent1.sources.src0.host", "${f1[host]}"); + properties.put("agent1.sources.src0.ports", "10001 10002 10003"); + properties.put("agent1.sources.src0.portHeader", "${f2[\"port\"]}-${f1['header']}"); + + properties.put("agent1.sinks", "sink0"); + properties.put("agent1.sinks.sink0.type", "thrift"); + properties.put("agent1.sinks.sink0.param", "${f2['param']}"); + properties.put("agent1.sinks.sink0.channel", "ch0"); + + properties.put("agent1.configfilters", "f1 f2"); + properties.put("agent1.configfilters.f1.type", + "org.apache.flume.conf.configfilter.MockConfigFilter"); + properties.put("agent1.configfilters.f2.type", + "org.apache.flume.conf.configfilter.MockConfigFilter"); + + FlumeConfiguration conf = new FlumeConfiguration(properties); + AgentConfiguration agentConfiguration = conf.getConfigurationFor("agent1"); + Context src0 = agentConfiguration.getSourceContext().get("src0"); + assertEquals("filtered_host", src0.getString("host")); + assertEquals("filtered_port-filtered_header", src0.getString("portHeader")); + + Context sink0 = agentConfiguration.getSinkContext().get("sink0"); + assertEquals("filtered_param", sink0.getString("param")); + + Context ch0 = agentConfiguration.getChannelContext().get("ch0"); + assertEquals("filtered_param", ch0.getString("param1")); + assertEquals("${f1['param\"]}", ch0.getString("param2")); + assertEquals("${f1['null']}", ch0.getString("param3")); + assertEquals("${f1['throw']}", ch0.getString("param4")); + } + +}
