Repository: flume Updated Branches: refs/heads/flume-1.6 c458a838a -> e296f0d72
FLUME-1491. Support fetching configuration from Zookeeper. (Ashish Paliwal via Hari) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/e296f0d7 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/e296f0d7 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/e296f0d7 Branch: refs/heads/flume-1.6 Commit: e296f0d7243f766442f8c57cb99bb12537def254 Parents: c458a83 Author: Hari Shreedharan <[email protected]> Authored: Fri Sep 12 13:16:13 2014 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Fri Sep 12 13:16:13 2014 -0700 ---------------------------------------------------------------------- bin/flume-ng | 33 +++-- flume-ng-node/pom.xml | 15 ++ .../node/AbstractConfigurationProvider.java | 18 ++- .../AbstractZooKeeperConfigurationProvider.java | 104 ++++++++++++++ .../java/org/apache/flume/node/Application.java | 131 ++++++++++++------ .../PollingZooKeeperConfigurationProvider.java | 135 ++++++++++++++++++ .../PropertiesFileConfigurationProvider.java | 15 -- .../StaticZooKeeperConfigurationProvider.java | 55 ++++++++ ...tAbstractZooKeeperConfigurationProvider.java | 136 +++++++++++++++++++ ...stPollingZooKeeperConfigurationProvider.java | 95 +++++++++++++ ...estStaticZooKeeperConfigurationProvider.java | 44 ++++++ pom.xml | 20 +++ 12 files changed, 722 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/e296f0d7/bin/flume-ng ---------------------------------------------------------------------- diff --git a/bin/flume-ng b/bin/flume-ng index e09e26b..4b323a6 100755 --- a/bin/flume-ng +++ b/bin/flume-ng @@ -174,25 +174,28 @@ display_help() { Usage: $0 <command> [options]... commands: - help display this help text - agent run a Flume agent - avro-client run an avro Flume client - version show Flume version info + help display this help text + agent run a Flume agent + avro-client run an avro Flume client + version show Flume version info global options: - --conf,-c <conf> use configs in <conf> directory - --classpath,-C <cp> append to the classpath - --dryrun,-d do not actually start Flume, just print the command - --plugins-path <dirs> colon-separated list of plugins.d directories. See the - plugins.d section in the user guide for more details. - Default: \$FLUME_HOME/plugins.d - -Dproperty=value sets a Java system property value - -Xproperty=value sets a Java -X option + --conf,-c <conf> use configs in <conf> directory + --classpath,-C <cp> append to the classpath + --dryrun,-d do not actually start Flume, just print the command + --plugins-path <dirs> colon-separated list of plugins.d directories. See the + plugins.d section in the user guide for more details. + Default: \$FLUME_HOME/plugins.d + -Dproperty=value sets a Java system property value + -Xproperty=value sets a Java -X option agent options: - --conf-file,-f <file> specify a config file (required) - --name,-n <name> the name of this agent (required) - --help,-h display help text + --name,-n <name> the name of this agent (required) + --conf-file,-f <file> specify a config file (required if -z missing) + --zkConnString,-z <str> specify the ZooKeeper connection to use (required if -f missing) + --zkBasePath,-p <path> specify the base path in ZooKeeper for agent configs + --no-reload-conf do not reload config file if changed + --help,-h display help text avro-client options: --rpcProps,-P <file> RPC client properties file with server connection params http://git-wip-us.apache.org/repos/asf/flume/blob/e296f0d7/flume-ng-node/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-node/pom.xml b/flume-ng-node/pom.xml index dce2527..caf2c11 100644 --- a/flume-ng-node/pom.xml +++ b/flume-ng-node/pom.xml @@ -154,6 +154,21 @@ <artifactId>jackson-mapper-asl</artifactId> </dependency> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-framework</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-recipes</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-test</artifactId> + </dependency> + </dependencies> </project> http://git-wip-us.apache.org/repos/asf/flume/blob/e296f0d7/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java index e63c601..40abba2 100644 --- a/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java +++ b/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java @@ -17,13 +17,8 @@ */ package org.apache.flume.node; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.Map.Entry; -import java.util.Set; import org.apache.flume.Channel; import org.apache.flume.ChannelFactory; @@ -506,4 +501,15 @@ public abstract class AbstractConfigurationProvider implements components = Lists.newArrayList(); } } + + protected Map<String, String> toMap(Properties properties) { + Map<String, String> result = Maps.newHashMap(); + Enumeration<?> propertyNames = properties.propertyNames(); + while (propertyNames.hasMoreElements()) { + String name = (String) propertyNames.nextElement(); + String value = properties.getProperty(name); + result.put(name, value); + } + return result; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/e296f0d7/flume-ng-node/src/main/java/org/apache/flume/node/AbstractZooKeeperConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/AbstractZooKeeperConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/AbstractZooKeeperConfigurationProvider.java new file mode 100644 index 0000000..f193f9f --- /dev/null +++ b/flume-ng-node/src/main/java/org/apache/flume/node/AbstractZooKeeperConfigurationProvider.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flume.node; + +import java.io.IOException; +import java.io.StringReader; +import java.util.Collections; +import java.util.Map; +import java.util.Properties; + +import com.google.common.base.Charsets; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.flume.conf.FlumeConfiguration; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; + +/** + * ZooKeeper based configuration implementation provider. + * + * The Agent configuration can be uploaded in ZooKeeper under a base name, which + * defaults to /flume + * + * Currently the agent configuration is stored under the agent name node in + * ZooKeeper + * + * <PRE> + * /flume + * /a1 [agent config file] + * /a2 [agent config file] + * /a3 [agent config file] + * </PRE> + * + * Configuration format is same as PropertiesFileConfigurationProvider + * + * Configuration properties + * + * agentName - Name of Agent for which configuration needs to be pulled + * + * zkConnString - Connection string to ZooKeeper Ensemble + * (host:port,host1:port1) + * + * basePath - Base Path where agent configuration needs to be stored. Defaults + * to /flume + */ +public abstract class AbstractZooKeeperConfigurationProvider extends + AbstractConfigurationProvider { + + static final String DEFAULT_ZK_BASE_PATH = "/flume"; + + protected final String basePath; + + protected final String zkConnString; + + protected AbstractZooKeeperConfigurationProvider(String agentName, + String zkConnString, String basePath) { + super(agentName); + Preconditions.checkArgument(!Strings.isNullOrEmpty(zkConnString), + "Invalid Zookeeper Connection String %s", zkConnString); + this.zkConnString = zkConnString; + if (basePath == null || basePath.isEmpty()) { + this.basePath = DEFAULT_ZK_BASE_PATH; + } else { + this.basePath = basePath; + } + } + + protected CuratorFramework createClient() { + return CuratorFrameworkFactory.newClient(zkConnString, + new ExponentialBackoffRetry(1000, 1)); + } + + protected FlumeConfiguration configFromBytes(byte[] configData) + throws IOException { + Map<String, String> configMap; + if (configData == null || configData.length == 0) { + configMap = Collections.emptyMap(); + } else { + String fileContent = new String(configData, Charsets.UTF_8); + Properties properties = new Properties(); + properties.load(new StringReader(fileContent)); + configMap = toMap(properties); + } + return new FlumeConfiguration(configMap); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/e296f0d7/flume-ng-node/src/main/java/org/apache/flume/node/Application.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java index 5250139..832285a 100644 --- a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java +++ b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java @@ -53,7 +53,7 @@ import com.google.common.collect.Lists; import com.google.common.eventbus.EventBus; import com.google.common.eventbus.Subscribe; -public class Application { +public class Application { private static final Logger logger = LoggerFactory .getLogger(Application.class); @@ -69,6 +69,7 @@ public class Application { public Application() { this(new ArrayList<LifecycleAware>(0)); } + public Application(List<LifecycleAware> components) { this.components = components; supervisor = new LifecycleSupervisor(); @@ -81,7 +82,6 @@ public class Application { } } - @Subscribe public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) { stopAllComponents(); @@ -95,7 +95,6 @@ public class Application { } } - private void stopAllComponents() { if (this.materializedConfiguration != null) { logger.info("Shutting down configuration: {}", this.materializedConfiguration); @@ -192,7 +191,6 @@ public class Application { this.loadMonitoring(); } - @SuppressWarnings("unchecked") private void loadMonitoring() { Properties systemProps = System.getProperties(); @@ -231,18 +229,32 @@ public class Application { try { + boolean isZkConfigured = false; + Options options = new Options(); Option option = new Option("n", "name", true, "the name of this agent"); option.setRequired(true); options.addOption(option); - option = new Option("f", "conf-file", true, "specify a conf file"); - option.setRequired(true); + option = new Option("f", "conf-file", true, + "specify a config file (required if -z missing)"); + option.setRequired(false); options.addOption(option); - option = new Option(null, "no-reload-conf", false, "do not reload " + - "conf file if changed"); + option = new Option(null, "no-reload-conf", false, + "do not reload config file if changed"); + options.addOption(option); + + // Options for Zookeeper + option = new Option("z", "zkConnString", true, + "specify the ZooKeeper connection to use (required if -f missing)"); + option.setRequired(false); + options.addOption(option); + + option = new Option("p", "zkBasePath", true, + "specify the base path in ZooKeeper for agent configs"); + option.setRequired(false); options.addOption(option); option = new Option("h", "help", false, "display help text"); @@ -251,47 +263,80 @@ public class Application { CommandLineParser parser = new GnuParser(); CommandLine commandLine = parser.parse(options, args); - File configurationFile = new File(commandLine.getOptionValue('f')); - String agentName = commandLine.getOptionValue('n'); - boolean reload = !commandLine.hasOption("no-reload-conf"); - if (commandLine.hasOption('h')) { new HelpFormatter().printHelp("flume-ng agent", options, true); return; } - /* - * The following is to ensure that by default the agent - * will fail on startup if the file does not exist. - */ - if (!configurationFile.exists()) { - // If command line invocation, then need to fail fast - if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) == null) { - String path = configurationFile.getPath(); - try { - path = configurationFile.getCanonicalPath(); - } catch (IOException ex) { - logger.error("Failed to read canonical path for file: " + path, ex); - } - throw new ParseException( - "The specified configuration file does not exist: " + path); - } + + String agentName = commandLine.getOptionValue('n'); + boolean reload = !commandLine.hasOption("no-reload-conf"); + + if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) { + isZkConfigured = true; } - List<LifecycleAware> components = Lists.newArrayList(); - Application application; - if(reload) { - EventBus eventBus = new EventBus(agentName + "-event-bus"); - PollingPropertiesFileConfigurationProvider configurationProvider = - new PollingPropertiesFileConfigurationProvider(agentName, - configurationFile, eventBus, 30); - components.add(configurationProvider); - application = new Application(components); - eventBus.register(application); + Application application = null; + if (isZkConfigured) { + // get options + String zkConnectionStr = commandLine.getOptionValue('z'); + String baseZkPath = commandLine.getOptionValue('p'); + + if (reload) { + EventBus eventBus = new EventBus(agentName + "-event-bus"); + List<LifecycleAware> components = Lists.newArrayList(); + PollingZooKeeperConfigurationProvider zookeeperConfigurationProvider = + new PollingZooKeeperConfigurationProvider( + agentName, zkConnectionStr, baseZkPath, eventBus); + components.add(zookeeperConfigurationProvider); + application = new Application(components); + eventBus.register(application); + } else { + StaticZooKeeperConfigurationProvider zookeeperConfigurationProvider = + new StaticZooKeeperConfigurationProvider( + agentName, zkConnectionStr, baseZkPath); + application = new Application(); + application.handleConfigurationEvent(zookeeperConfigurationProvider + .getConfiguration()); + } } else { - PropertiesFileConfigurationProvider configurationProvider = - new PropertiesFileConfigurationProvider(agentName, - configurationFile); - application = new Application(); - application.handleConfigurationEvent(configurationProvider.getConfiguration()); + File configurationFile = new File(commandLine.getOptionValue('f')); + + /* + * The following is to ensure that by default the agent will fail on + * startup if the file does not exist. + */ + if (!configurationFile.exists()) { + // If command line invocation, then need to fail fast + if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) == + null) { + String path = configurationFile.getPath(); + try { + path = configurationFile.getCanonicalPath(); + } catch (IOException ex) { + logger.error("Failed to read canonical path for file: " + path, + ex); + } + throw new ParseException( + "The specified configuration file does not exist: " + path); + } + } + List<LifecycleAware> components = Lists.newArrayList(); + + if (reload) { + EventBus eventBus = new EventBus(agentName + "-event-bus"); + PollingPropertiesFileConfigurationProvider configurationProvider = + new PollingPropertiesFileConfigurationProvider( + agentName, configurationFile, eventBus, 30); + components.add(configurationProvider); + application = new Application(components); + eventBus.register(application); + } else { + PropertiesFileConfigurationProvider configurationProvider = + new PropertiesFileConfigurationProvider( + agentName, configurationFile); + application = new Application(); + application.handleConfigurationEvent(configurationProvider + .getConfiguration()); + } } application.start(); http://git-wip-us.apache.org/repos/asf/flume/blob/e296f0d7/flume-ng-node/src/main/java/org/apache/flume/node/PollingZooKeeperConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/PollingZooKeeperConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/PollingZooKeeperConfigurationProvider.java new file mode 100644 index 0000000..b950b3d --- /dev/null +++ b/flume-ng-node/src/main/java/org/apache/flume/node/PollingZooKeeperConfigurationProvider.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flume.node; + +import java.io.IOException; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.NodeCache; +import org.apache.curator.framework.recipes.cache.NodeCacheListener; +import org.apache.flume.FlumeException; +import org.apache.flume.conf.FlumeConfiguration; +import org.apache.flume.lifecycle.LifecycleAware; +import org.apache.flume.lifecycle.LifecycleState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.eventbus.EventBus; + +public class PollingZooKeeperConfigurationProvider extends + AbstractZooKeeperConfigurationProvider implements LifecycleAware { + + private static final Logger LOGGER = LoggerFactory + .getLogger(PollingZooKeeperConfigurationProvider.class); + + private final EventBus eventBus; + + private final CuratorFramework client; + + private NodeCache agentNodeCache; + + private FlumeConfiguration flumeConfiguration; + + private LifecycleState lifecycleState; + + public PollingZooKeeperConfigurationProvider(String agentName, + String zkConnString, String basePath, EventBus eventBus) { + super(agentName, zkConnString, basePath); + this.eventBus = eventBus; + client = createClient(); + agentNodeCache = null; + flumeConfiguration = null; + lifecycleState = LifecycleState.IDLE; + } + + @Override + protected FlumeConfiguration getFlumeConfiguration() { + return flumeConfiguration; + } + + @Override + public void start() { + LOGGER.debug("Starting..."); + try { + client.start(); + try { + agentNodeCache = new NodeCache(client, basePath + "/" + getAgentName()); + agentNodeCache.start(); + agentNodeCache.getListenable().addListener(new NodeCacheListener() { + @Override + public void nodeChanged() throws Exception { + refreshConfiguration(); + } + }); + } catch (Exception e) { + client.close(); + throw e; + } + } catch (Exception e) { + lifecycleState = LifecycleState.ERROR; + if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } else { + throw new FlumeException(e); + } + } + lifecycleState = LifecycleState.START; + } + + private void refreshConfiguration() throws IOException { + LOGGER.info("Refreshing configuration from ZooKeeper"); + byte[] data = null; + ChildData childData = agentNodeCache.getCurrentData(); + if (childData != null) { + data = childData.getData(); + } + flumeConfiguration = configFromBytes(data); + eventBus.post(getConfiguration()); + } + + @Override + public void stop() { + LOGGER.debug("Stopping..."); + if (agentNodeCache != null) { + try { + agentNodeCache.close(); + } catch (IOException e) { + LOGGER.warn("Encountered exception while stopping", e); + lifecycleState = LifecycleState.ERROR; + } + } + + try { + client.close(); + } catch (Exception e) { + LOGGER.warn("Error stopping Curator client", e); + lifecycleState = LifecycleState.ERROR; + } + + if (lifecycleState != LifecycleState.ERROR) { + lifecycleState = LifecycleState.STOP; + } + } + + @Override + public LifecycleState getLifecycleState() { + return lifecycleState; + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/e296f0d7/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java index d7438d9..bc5438a 100644 --- a/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java +++ b/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java @@ -21,17 +21,13 @@ import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.io.IOException; -import java.util.Enumeration; import java.util.HashMap; -import java.util.Map; import java.util.Properties; import org.apache.flume.conf.FlumeConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Maps; - /** * <p> * A configuration provider that uses properties file for specifying @@ -206,15 +202,4 @@ public class PropertiesFileConfigurationProvider extends } return new FlumeConfiguration(new HashMap<String, String>()); } - - private Map<String, String> toMap(Properties properties) { - Map<String, String> result = Maps.newHashMap(); - Enumeration<?> propertyNames = properties.propertyNames(); - while (propertyNames.hasMoreElements()) { - String name = (String) propertyNames.nextElement(); - String value = properties.getProperty(name); - result.put(name, value); - } - return result; - } } http://git-wip-us.apache.org/repos/asf/flume/blob/e296f0d7/flume-ng-node/src/main/java/org/apache/flume/node/StaticZooKeeperConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/StaticZooKeeperConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/StaticZooKeeperConfigurationProvider.java new file mode 100644 index 0000000..551e9dd --- /dev/null +++ b/flume-ng-node/src/main/java/org/apache/flume/node/StaticZooKeeperConfigurationProvider.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flume.node; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flume.FlumeException; +import org.apache.flume.conf.FlumeConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StaticZooKeeperConfigurationProvider extends + AbstractZooKeeperConfigurationProvider { + + private static final Logger LOGGER = LoggerFactory + .getLogger(StaticZooKeeperConfigurationProvider.class); + + public StaticZooKeeperConfigurationProvider(String agentName, + String zkConnString, String basePath) { + super(agentName, zkConnString, basePath); + } + + @Override + protected FlumeConfiguration getFlumeConfiguration() { + try { + CuratorFramework cf = createClient(); + cf.start(); + try { + byte[] data = cf.getData().forPath(basePath + "/" + getAgentName()); + return configFromBytes(data); + } finally { + cf.close(); + } + } catch (Exception e) { + LOGGER.error("Error getting configuration info from Zookeeper", e); + throw new FlumeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/e296f0d7/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractZooKeeperConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractZooKeeperConfigurationProvider.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractZooKeeperConfigurationProvider.java new file mode 100644 index 0000000..1ab4127 --- /dev/null +++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractZooKeeperConfigurationProvider.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flume.node; + +import java.io.InputStreamReader; +import java.io.Reader; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import com.google.common.base.Charsets; +import junit.framework.Assert; + +import org.apache.commons.io.IOUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.test.TestingServer; +import org.apache.curator.utils.EnsurePath; +import org.apache.flume.conf.FlumeConfiguration; +import org.apache.flume.conf.FlumeConfigurationError; +import org.junit.After; +import org.junit.Before; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +public abstract class TestAbstractZooKeeperConfigurationProvider { + + private static final String FLUME_CONF_FILE = "flume-conf.properties"; + + protected static final String AGENT_NAME = "a1"; + + protected static final String AGENT_PATH = + AbstractZooKeeperConfigurationProvider.DEFAULT_ZK_BASE_PATH + + "/" + AGENT_NAME; + + protected TestingServer zkServer; + protected CuratorFramework client; + + @Before + public void setUp() throws Exception { + zkServer = new TestingServer(); + client = CuratorFrameworkFactory + .newClient("localhost:" + zkServer.getPort(), + new ExponentialBackoffRetry(1000, 3)); + client.start(); + + EnsurePath ensurePath = new EnsurePath(AGENT_PATH); + ensurePath.ensure(client.getZookeeperClient()); + doSetUp(); + } + + protected abstract void doSetUp() throws Exception; + + @After + public void tearDown() throws Exception { + doTearDown(); + zkServer.close(); + client.close(); + } + + protected abstract void doTearDown() throws Exception; + + protected void addData() throws Exception { + Reader in = new InputStreamReader(getClass().getClassLoader() + .getResourceAsStream(FLUME_CONF_FILE), Charsets.UTF_8); + try { + String config = IOUtils.toString(in); + client.setData().forPath(AGENT_PATH, config.getBytes()); + } finally { + in.close(); + } + } + + protected void verifyProperties(AbstractConfigurationProvider cp) { + FlumeConfiguration configuration = cp.getFlumeConfiguration(); + Assert.assertNotNull(configuration); + + /* + * Test the known errors in the file + */ + List<String> expected = Lists.newArrayList(); + expected.add("host5 CONFIG_ERROR"); + expected.add("host5 INVALID_PROPERTY"); + expected.add("host4 CONFIG_ERROR"); + expected.add("host4 CONFIG_ERROR"); + expected.add("host4 PROPERTY_VALUE_NULL"); + expected.add("host4 PROPERTY_VALUE_NULL"); + expected.add("host4 PROPERTY_VALUE_NULL"); + expected.add("host4 AGENT_CONFIGURATION_INVALID"); + expected.add("ch2 ATTRS_MISSING"); + expected.add("host3 CONFIG_ERROR"); + expected.add("host3 PROPERTY_VALUE_NULL"); + expected.add("host3 AGENT_CONFIGURATION_INVALID"); + expected.add("host2 PROPERTY_VALUE_NULL"); + expected.add("host2 AGENT_CONFIGURATION_INVALID"); + List<String> actual = Lists.newArrayList(); + for (FlumeConfigurationError error : configuration + .getConfigurationErrors()) { + actual.add(error.getComponentName() + " " + + error.getErrorType().toString()); + } + Collections.sort(expected); + Collections.sort(actual); + Assert.assertEquals(expected, actual); + + FlumeConfiguration.AgentConfiguration agentConfiguration = configuration + .getConfigurationFor("host1"); + Assert.assertNotNull(agentConfiguration); + + Set<String> sources = Sets.newHashSet("source1"); + Set<String> sinks = Sets.newHashSet("sink1"); + Set<String> channels = Sets.newHashSet("channel1"); + + Assert.assertEquals(sources, agentConfiguration.getSourceSet()); + Assert.assertEquals(sinks, agentConfiguration.getSinkSet()); + Assert.assertEquals(channels, agentConfiguration.getChannelSet()); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/e296f0d7/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingZooKeeperConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingZooKeeperConfigurationProvider.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingZooKeeperConfigurationProvider.java new file mode 100644 index 0000000..e59a438 --- /dev/null +++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingZooKeeperConfigurationProvider.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flume.node; + +import junit.framework.Assert; + +import org.apache.flume.conf.FlumeConfiguration; +import org.apache.flume.conf.FlumeConfiguration.AgentConfiguration; +import org.apache.flume.lifecycle.LifecycleController; +import org.apache.flume.lifecycle.LifecycleState; +import org.junit.Test; + +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; + +public class TestPollingZooKeeperConfigurationProvider extends + TestAbstractZooKeeperConfigurationProvider { + + private EventBus eb; + + private EventSync es; + + private PollingZooKeeperConfigurationProvider cp; + + private class EventSync { + + private boolean notified; + + @Subscribe + public synchronized void notifyEvent(MaterializedConfiguration mConfig) { + notified = true; + notifyAll(); + } + + public synchronized void awaitEvent() throws InterruptedException { + while (!notified) { + wait(); + } + } + + public synchronized void reset() { + notified = false; + } + } + + @Override + protected void doSetUp() throws Exception { + eb = new EventBus("test"); + es = new EventSync(); + es.reset(); + eb.register(es); + cp = new PollingZooKeeperConfigurationProvider(AGENT_NAME, "localhost:" + + zkServer.getPort(), null, eb); + cp.start(); + LifecycleController.waitForOneOf(cp, LifecycleState.START_OR_ERROR); + } + + @Override + protected void doTearDown() throws Exception { + // do nothing + } + + @Test + public void testPolling() throws Exception { + es.awaitEvent(); + es.reset(); + + FlumeConfiguration fc = cp.getFlumeConfiguration(); + Assert.assertTrue(fc.getConfigurationErrors().isEmpty()); + AgentConfiguration ac = fc.getConfigurationFor(AGENT_NAME); + Assert.assertNull(ac); + + addData(); + es.awaitEvent(); + es.reset(); + + verifyProperties(cp); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/e296f0d7/flume-ng-node/src/test/java/org/apache/flume/node/TestStaticZooKeeperConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestStaticZooKeeperConfigurationProvider.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestStaticZooKeeperConfigurationProvider.java new file mode 100644 index 0000000..dddcffe --- /dev/null +++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestStaticZooKeeperConfigurationProvider.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flume.node; + +import org.junit.Test; + +public class TestStaticZooKeeperConfigurationProvider extends + TestAbstractZooKeeperConfigurationProvider { + + private StaticZooKeeperConfigurationProvider configurationProvider; + + @Override + protected void doSetUp() throws Exception { + addData(); + configurationProvider = new StaticZooKeeperConfigurationProvider( + AGENT_NAME, "localhost:" + zkServer.getPort(), null); + } + + @Override + protected void doTearDown() throws Exception { + // do nothing + } + + @Test + public void testPropertyRead() throws Exception { + verifyProperties(configurationProvider); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/e296f0d7/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 4bdfcac..150db2e 100644 --- a/pom.xml +++ b/pom.xml @@ -1276,6 +1276,26 @@ limitations under the License. <version>1.1.0</version> </dependency> + <!-- Dependency for Zk provider --> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-framework</artifactId> + <version>2.3.0</version> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-recipes</artifactId> + <version>2.3.0</version> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-test</artifactId> + <version>2.3.0</version> + <scope>test</scope> + </dependency> + </dependencies> </dependencyManagement>
