Updated Branches: refs/heads/trunk 5f1243251 -> eff07d212
FLUME-1745. FlumeConfiguration eats Exceptions. (Brock Noland via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/eff07d21 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/eff07d21 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/eff07d21 Branch: refs/heads/trunk Commit: eff07d2125b589d8a6915deb8d52ff53e9611337 Parents: 5f12432 Author: Hari Shreedharan <[email protected]> Authored: Mon Dec 10 14:15:26 2012 -0800 Committer: Hari Shreedharan <[email protected]> Committed: Mon Dec 10 14:15:26 2012 -0800 ---------------------------------------------------------------------- .../flume/conf/ComponentConfigurationFactory.java | 18 +++-- .../org/apache/flume/conf/FlumeConfiguration.java | 14 ++-- .../apache/flume/conf/TestFlumeConfiguration.java | 61 +++++++++++++++ 3 files changed, 79 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/eff07d21/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfigurationFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfigurationFactory.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfigurationFactory.java index 6c151dc..c867a03 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfigurationFactory.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfigurationFactory.java @@ -17,12 +17,12 @@ package org.apache.flume.conf; import org.apache.flume.conf.ComponentConfiguration.ComponentType; -import org.apache.flume.conf.source.SourceConfiguration.SourceConfigurationType; -import org.apache.flume.conf.sink.SinkGroupConfiguration; +import org.apache.flume.conf.channel.ChannelConfiguration.ChannelConfigurationType; +import org.apache.flume.conf.channel.ChannelSelectorConfiguration.ChannelSelectorConfigurationType; import org.apache.flume.conf.sink.SinkConfiguration.SinkConfigurationType; +import org.apache.flume.conf.sink.SinkGroupConfiguration; import org.apache.flume.conf.sink.SinkProcessorConfiguration.SinkProcessorConfigurationType; -import org.apache.flume.conf.channel.ChannelSelectorConfiguration.ChannelSelectorConfigurationType; -import org.apache.flume.conf.channel.ChannelConfiguration.ChannelConfigurationType; +import org.apache.flume.conf.source.SourceConfiguration.SourceConfigurationType; public class ComponentConfigurationFactory { @SuppressWarnings("unchecked") @@ -38,7 +38,7 @@ public class ComponentConfigurationFactory { try { confType = (Class<? extends ComponentConfiguration>) Class.forName(type); return confType.getConstructor(String.class).newInstance(type); - } catch (Exception e) { + } catch (Exception ignored) { try { type = type.toUpperCase(); switch(component){ @@ -64,8 +64,12 @@ public class ComponentConfigurationFactory { "Cannot create configuration. Unknown Type specified: " + type); } - } catch (Exception e2) { - throw new ConfigurationException("Could not create configuration!", e); + } catch (ConfigurationException e) { + throw e; + } catch (Exception e) { + throw new ConfigurationException("Could not create configuration! " + + " Due to " + e.getClass().getSimpleName() + ": " + e.getMessage(), + e); } } } http://git-wip-us.apache.org/repos/asf/flume/blob/eff07d21/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java index 5d56599..526ce59 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java @@ -484,7 +484,7 @@ public class FlumeConfiguration { if (conf != null) errorList.addAll(conf.getErrors()); iter.remove(); logger.warn("Could not configure channel " + channelName - + ". Skipping it"); + + " due to: " + e.getMessage(), e); } } else { @@ -586,7 +586,8 @@ public class FlumeConfiguration { } catch (ConfigurationException e) { if (srcConf != null) errorList.addAll(srcConf.getErrors()); iter.remove(); - logger.warn("Removed " + sourceName + " due to " + e.getMessage()); + logger.warn("Could not configure source " + sourceName + + " due to: " + e.getMessage(), e); } } else { iter.remove(); @@ -693,8 +694,8 @@ public class FlumeConfiguration { } catch (ConfigurationException e) { iter.remove(); if (sinkConf != null) errorList.addAll(sinkConf.getErrors()); - logger.warn("Configuration for : " + sinkName - + " has errors, and will be removed: ", e); + logger.warn("Could not configure sink " + sinkName + + " due to: " + e.getMessage(), e); } } // Filter out any sinks that have invalid channel @@ -760,9 +761,8 @@ public class FlumeConfiguration { .add(new FlumeConfigurationError(agentName, sinkgroupName, FlumeConfigurationErrorType.CONFIG_ERROR, ErrorOrWarning.ERROR)); - logger.warn("Configuration error for: " + sinkgroupName - + ".Removed."); - + logger.warn("Could not configure sink group " + sinkgroupName + + " due to: " + e.getMessage(), e); } } else { iter.remove(); http://git-wip-us.apache.org/repos/asf/flume/blob/eff07d21/flume-ng-configuration/src/test/java/com/apache/flume/conf/TestFlumeConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/test/java/com/apache/flume/conf/TestFlumeConfiguration.java b/flume-ng-configuration/src/test/java/com/apache/flume/conf/TestFlumeConfiguration.java new file mode 100644 index 0000000..2ae56fd --- /dev/null +++ b/flume-ng-configuration/src/test/java/com/apache/flume/conf/TestFlumeConfiguration.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.apache.flume.conf; + +import java.util.Properties; + +import junit.framework.Assert; + +import org.apache.flume.conf.FlumeConfiguration; +import org.apache.flume.conf.FlumeConfiguration.AgentConfiguration; +import org.junit.Test; + +public class TestFlumeConfiguration { + + /** + * Test fails without FLUME-1743 + */ + @Test + public void testFLUME1743() throws Exception { + Properties properties = new Properties(); + properties.put("agent1.channels", "ch0"); + properties.put("agent1.channels.ch0.type", "memory"); + + properties.put("agent1.sources", "src0"); + properties.put("agent1.sources.src0.type", "multiport_syslogtcp"); + properties.put("agent1.sources.src0.channels", "ch0"); + properties.put("agent1.sources.src0.host", "localhost"); + properties.put("agent1.sources.src0.ports", "10001 10002 10003"); + properties.put("agent1.sources.src0.portHeader", "port"); + + properties.put("agent1.sinks", "sink0"); + properties.put("agent1.sinks.sink0.type", "null"); + properties.put("agent1.sinks.sink0.channel", "ch0"); + + FlumeConfiguration conf = new FlumeConfiguration(properties); + AgentConfiguration agentConfiguration = conf.getConfigurationFor("agent1"); + Assert.assertEquals(String.valueOf(agentConfiguration.getSourceSet()), 1, + agentConfiguration.getSourceSet().size()); + Assert.assertEquals(String.valueOf(agentConfiguration.getChannelSet()), 1, + agentConfiguration.getChannelSet().size()); + Assert.assertEquals(String.valueOf(agentConfiguration.getSinkSet()), 1, + agentConfiguration.getSinkSet().size()); + Assert.assertTrue(agentConfiguration.getSourceSet().contains("src0")); + Assert.assertTrue(agentConfiguration.getChannelSet().contains("ch0")); + Assert.assertTrue(agentConfiguration.getSinkSet().contains("sink0")); + } +}
