This is an automated email from the ASF dual-hosted git repository.

rgoers pushed a commit to branch FLUME-3335
in repository https://gitbox.apache.org/repos/asf/flume.git

commit 2b4d546e41f1f8256633dfa3f62810aa2ebffdf3
Author: Ralph Goers <[email protected]>
AuthorDate: Tue Oct 26 23:54:12 2021 -0700

    FLUME-3335 - Support configuration via HTTP(S)
---
 flume-ng-doc/sphinx/FlumeUserGuide.rst | 99 ++++++++-
 flume-ng-node/pom.xml | 24 ++-
 .../flume/node/AbstractConfigurationProvider.java | 3 +
 .../java/org/apache/flume/node/Application.java | 237 ++++++++++++++++-----
 .../flume/node/ClasspathConfigurationSource.java | 71 ++++++
 .../node/ClasspathConfigurationSourceFactory.java | 41 ++++
 .../org/apache/flume/node/ConfigurationSource.java | 56 +++++
 .../flume/node/ConfigurationSourceFactory.java | 50 +++++
 .../flume/node/EnvVarResolverProperties.java | 5 +
 .../apache/flume/node/FileConfigurationSource.java | 110 ++++++++++
 .../flume/node/FileConfigurationSourceFactory.java | 42 ++++
 .../apache/flume/node/HttpConfigurationSource.java | 150 +++++++++++++
 .../flume/node/HttpConfigurationSourceFactory.java | 42 ++++
 .../java/org/apache/flume/node/MapResolver.java | 70 ++++++
 .../flume/node/MaterializedConfiguration.java | 12 +-
 ...PollingPropertiesFileConfigurationProvider.java | 143 +------------
 .../node/PropertiesFileConfigurationProvider.java | 58 +----
 ...Provider.java => UriConfigurationProvider.java} | 237 ++++++++++++++++-----
 .../flume/node/net/AuthorizationProvider.java | 27 +++
 .../flume/node/net/BasicAuthorizationProvider.java | 45 ++++
 .../apache/flume/node/net/LaxHostnameVerifier.java | 38 ++++
 .../flume/node/net/UrlConnectionFactory.java | 104 +++++++++
 ...rg.apache.flume.node.ConfigurationSourceFactory | 17 ++
 .../node/TestClasspathConfigurationSource.java | 73 +++++++
 .../java/org/apache/flume/node/TestEnvLookup.java | 58 +++++
 .../flume/node/TestHttpConfigurationSource.java | 141 ++++++++++++
 .../org/apache/flume/node/TestOverrideFile.java | 62 ++++++
 ...PollingPropertiesFileConfigurationProvider.java | 8 +-
 .../TestPropertiesFileConfigurationProvider.java | 8 +-
 .../org/apache/flume/node/TestRecursiveLookup.java | 58 +++++
 .../test/resources/flume-conf-override.properties | 21 ++
 .../resources/flume-conf-with-envLookup.properties | 35 +++
 .../flume-conf-with-recursiveLookup.properties | 37 ++++
 pom.xml | 33 ++-
 34 files changed, 1898 insertions(+), 317 deletions(-)

diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 3fdfc67..315e5c4 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -118,12 +118,11 @@ Setup
 Setting up an agent
 -------------------
-Flume agent configuration is stored in a local configuration file. This is a
-text file that follows the Java properties file format.
-Configurations for one or more agents can be specified in the same
-configuration file. The configuration file includes properties of each source,
-sink and channel in an agent and how they are wired together to form data
-flows.
+Flume agent configuration is stored in one or more configuration files that
+follow the Java properties file format. Configurations for one or more agents
+can be specified in these configuration files. The configuration includes
+properties of each source, sink and channel in an agent and how they are wired
+together to form data flows.
 Configuring individual components
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -225,25 +224,105 @@ The original Flume terminal will output the event in a log message.
 Congratulations - you've successfully configured and deployed a Flume agent! Subsequent sections cover agent configuration in much more detail.
-Using environment variables in configuration files
+Configuration from URIs
+~~~~~~~~~~~~~~~~~~~~~~~
+As of version 1.10.0 Flume supports being configured using URIs instead of just from local files. Direct support
+for HTTP(S), file, and classpath URIs is included. The HTTP support includes support for authentication using
+basic authorization but other authorization mechanisms may be supported by specifying the fully qualified name
+of the class that implements the AuthorizationProvider interface using the --auth-provider option. HTTP also
+supports reloading of configuration files using polling if the target server properly responds to the If-Modified-Since
+header.
+
+To specify credentials for HTTP authentication add::
+
+  --conf-user userid --conf-password password
+
+to the startup command.
+
+Multiple Configuration Files
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+As of version 1.10.0 Flume supports being configured from multiple configuration files instead of just one.
+This more easily allows values to be overridden or added based on specific environments. Each file should
+be configured using its own --conf-file or --conf-uri option. However, all files should either be provided
+with --conf-file or with --conf-uri. If --conf-file and --conf-uri appear together as options all --conf-uri
+configurations will be processed before any of the --conf-file configurations are merged.
+
+For example, a configuration of::
+
+  $ bin/flume-ng agent --conf conf --conf-file example.conf --conf-uri http://localhost:80/flume.conf --conf-uri http://localhost:80/override.conf --name a1 -Dflume.root.logger=INFO,console
+
+will cause flume.conf to be read first, override.conf to be merged with it and finally example.conf would be
+merged last. If it is desirec to have example.conf be the base configuration it should be specified using the
+--conf-uri option either as::
+
+  --conf-uri classpath://example.conf
+  or
+  --conf-uri file:///example.conf
+
+depending on how it should be accessed.
+
+Using environment variables, system properies, or other properties configuration files
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 Flume has the ability to substitute environment variables in the configuration. For example::
   a1.sources = r1
   a1.sources.r1.type = netcat
   a1.sources.r1.bind = 0.0.0.0
-  a1.sources.r1.port = ${NC_PORT}
+  a1.sources.r1.port = ${env:NC_PORT}
   a1.sources.r1.channels = c1
 NB: it currently works for values only, not for keys. (Ie. only on the "right side" of the `=` mark of the config lines.)
-This can be enabled via Java system properties on agent invocation by setting `propertiesImplementation = org.apache.flume.node.EnvVarResolverProperties`.
+As of version 1.10.0 Flume resolves configuration values using Apache Commons Text's StringSubstitutor
+class using the default set of Lookups along with a lookup that uses the configuration files as a
+source for replacement values.
 For example::
-  $ NC_PORT=44444 bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console -DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties
+  $ NC_PORT=44444 bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
 Note the above is just an example, environment variables can be configured in other ways, including being set in `conf/flume-env.sh`.
+As noted, system properties are also supported, so the configuration::
+
+  a1.sources = r1
+  a1.sources.r1.type = netcat
+  a1.sources.r1.bind = 0.0.0.0
+  a1.sources.r1.port = ${sys:NC_PORT}
+  a1.sources.r1.channels = c1
+
+could be used and the startup command could be::
+
+  $ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console -DNC_PORT=44444
+
+Furthermore, because multiple configuration files are allowed the first file could contain::
+
+  a1.sources = r1
+  a1.sources.r1.type = netcat
+  a1.sources.r1.bind = 0.0.0.0
+  a1.sources.r1.port = ${NC_PORT}
+  a1.sources.r1.channels = c1
+
+and the override file could contain::
+
+  NC_PORT = 44444
+
+In this case the startup command could be::
+
+  $ bin/flume-ng agent --conf conf --conf-file example.conf --conf-file override.conf --name a1 -Dflume.root.logger=INFO,console
+
+Note that the method for specifying environment variables as was done in prior versions will stil work
+but has been deprecated in favor of using ${env:varName}.
+
+Using a command options file
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+Instead of specifying all the command options on the command line as of version 1.10.0 command
+options may be placed in either /etc/flume/flume.opts or flume.opts on the classpath. An example
+might be::
+
+  conf-file = example.conf
+  conf-file = override.conf
+  name = a1
+
 Logging raw data
 ~~~~~~~~~~~~~~~~
diff --git a/flume-ng-node/pom.xml b/flume-ng-node/pom.xml
index 65fcb50..1db864a 100644
--- a/flume-ng-node/pom.xml
+++ b/flume-ng-node/pom.xml
@@ -106,6 +106,11 @@
     </dependency>
     <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-text</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
@@ -134,15 +139,26 @@
       </exclusions>
     </dependency>
-    <dependency>
+    <dependency>
       <groupId>org.apache.curator</groupId>
       <artifactId>curator-recipes</artifactId>
-    </dependency>
+    </dependency>
-    <dependency>
+    <dependency>
       <groupId>org.apache.curator</groupId>
       <artifactId>curator-test</artifactId>
-    </dependency>
+    </dependency>
+
+    <dependency>
+      <groupId>net.jcip</groupId>
+      <artifactId>jcip-annotations</artifactId>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>com.github.spotbugs</groupId>
+      <artifactId>spotbugs-annotations</artifactId>
+      <optional>true</optional>
+    </dependency>
   </dependencies>
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
index caf4522..2f0b643 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
@@ -69,6 +69,9 @@ import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressFBWarnings("REC_CATCH_EXCEPTION")
 public abstract class AbstractConfigurationProvider implements ConfigurationProvider {
   private static final Logger LOGGER = LoggerFactory.getLogger(AbstractConfigurationProvider.class);
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
index 406bb7d..1f4df59 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
@@ -19,19 +19,29 @@
 package org.apache.flume.node;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Constructor;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang.StringUtils;
 import org.apache.flume.Channel;
-import org.apache.flume.Constants;
 import org.apache.flume.Context;
 import org.apache.flume.SinkRunner;
 import org.apache.flume.SourceRunner;
@@ -41,19 +51,16 @@ import org.apache.flume.lifecycle.LifecycleAware;
 import org.apache.flume.lifecycle.LifecycleState;
 import org.apache.flume.lifecycle.LifecycleSupervisor;
 import org.apache.flume.lifecycle.LifecycleSupervisor.SupervisorPolicy;
+import org.apache.flume.node.net.AuthorizationProvider;
+import org.apache.flume.node.net.BasicAuthorizationProvider;
 import org.apache.flume.util.SSLUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.locks.ReentrantLock;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
 public class Application {
@@ -63,6 +70,8 @@ public class Application {
   public static final String CONF_MONITOR_CLASS = "flume.monitoring.type";
   public static final String CONF_MONITOR_PREFIX = "flume.monitoring.";
+  private static final int DEFAULT_INTERVAL = 300;
+  private static final int DEFAULT_FILE_INTERVAL = 30;
   private final List<LifecycleAware> components;
   private final LifecycleSupervisor supervisor;
   private MaterializedConfiguration materializedConfiguration;
@@ -109,8 +118,8 @@ public class Application {
   public void stop() {
     lifecycleLock.lock();
-    stopAllComponents();
     try {
+      stopAllComponents();
       supervisor.stop();
       if (monitorServer != null) {
         monitorServer.stop();
@@ -231,7 +240,7 @@ public class Application {
           //Not a known type, use FQCN
           klass = (Class<? extends MonitorService>) Class.forName(monitorType);
         }
-        this.monitorServer = klass.newInstance();
+        this.monitorServer = klass.getConstructor().newInstance();
         Context context = new Context();
         for (String key : keys) {
           if (key.startsWith(CONF_MONITOR_PREFIX)) {
@@ -242,14 +251,14 @@ public class Application {
         monitorServer.configure(context);
         monitorServer.start();
       }
-    } catch (Exception e) {
+    } catch (ReflectiveOperationException e) {
       logger.warn("Error starting monitoring. " +
           "Monitoring might not be available.", e);
     }
-
   }
   public static void main(String[] args) {
+    Properties initProps = loadConfigOpts();
     try {
       SSLUtil.initGlobalSSLParameters();
@@ -261,7 +270,40 @@ public class Application {
       options.addOption(option);
       option = new Option("f", "conf-file", true,
-          "specify a config file (required if -z missing)");
+          "specify a config file (required if -c, -u, and -z are missing)");
+      option.setRequired(false);
+      options.addOption(option);
+
+      option = new Option("u", "conf-uri", true,
+          "specify a config uri (required if -c, -f and -z are missing)");
+      option.setRequired(false);
+      options.addOption(option);
+
+      option = new Option("a", "auth-provider", true,
+          "specify an authorization provider class");
+      option.setRequired(false);
+      options.addOption(option);
+
+      option = new Option("c", "conf-provider", true,
+          "specify a configuration provider class (required if -f, -u, and -z are missing)");
+      option.setRequired(false);
+      options.addOption(option);
+
+      option = new Option("n", "conf-user", true, "user name to access configuration uri");
+      option.setRequired(false);
+      options.addOption(option);
+
+      option = new Option("p", "conf-password", true, "password to access configuration uri");
+      option.setRequired(false);
+      options.addOption(option);
+
+      option = new Option("i", "poll-interval", true,
+          "number of seconds between checks for a configuration change");
+      option.setRequired(false);
+      options.addOption(option);
+
+      option = new Option("b", "backup-directory", true,
+          "directory in which to store the backup configuration file");
       option.setRequired(false);
       options.addOption(option);
@@ -271,7 +313,7 @@ public class Application {
       // Options for Zookeeper
       option = new Option("z", "zkConnString", true,
-          "specify the ZooKeeper connection to use (required if -f missing)");
+          "specify the ZooKeeper connection to use (required if -c, -f, and -u are missing)");
       option.setRequired(false);
       options.addOption(option);
@@ -283,8 +325,8 @@ public class Application {
       option = new Option("h", "help", false, "display help text");
       options.addOption(option);
-      CommandLineParser parser = new GnuParser();
-      CommandLine commandLine = parser.parse(options, args);
+      DefaultParser parser = new DefaultParser();
+      CommandLine commandLine = parser.parse(options, args, initProps);
       if (commandLine.hasOption('h')) {
         new HelpFormatter().printHelp("flume-ng agent", options, true);
@@ -299,8 +341,41 @@ public class Application {
         isZkConfigured = true;
       }
+      List<URI> confUri = null;
+      ConfigurationProvider provider = null;
+      int defaultInterval = DEFAULT_FILE_INTERVAL;
+      if (commandLine.hasOption('u') || commandLine.hasOption("conf-uri")) {
+        confUri = new ArrayList<>();
+        for (String uri : commandLine.getOptionValues("conf-uri")) {
+          if (uri.toLowerCase(Locale.ROOT).startsWith("http")) {
+            defaultInterval = DEFAULT_INTERVAL;
+          }
+          confUri.add(new URI(uri));
+        }
+      } else if (commandLine.hasOption("f") || commandLine.hasOption("conf-file")) {
+        confUri = new ArrayList<>();
+        for (String filePath : commandLine.getOptionValues("conf-file")) {
+          confUri.add(new File(filePath).toURI());
+        }
+      }
+
+      if (commandLine.hasOption("c") || commandLine.hasOption("conf-provider")) {
+        String className = commandLine.getOptionValue("conf-provider");
+        try {
+          Class<?> clazz = Application.class.getClassLoader().loadClass(className);
+          Constructor<?> constructor = clazz.getConstructor(String[].class);
+          provider = (ConfigurationProvider) constructor.newInstance((Object[]) args);
+        } catch (ReflectiveOperationException ex) {
+          logger.error("Error creating ConfigurationProvider {}", className, ex);
+        }
+      }
+
       Application application;
-      if (isZkConfigured) {
+      if (provider != null) {
+        List<LifecycleAware> components = Lists.newArrayList();
+        application = new Application(components);
+        application.handleConfigurationEvent(provider.getConfiguration());
+      } else if (isZkConfigured) {
         // get options
         String zkConnectionStr = commandLine.getOptionValue('z');
         String baseZkPath = commandLine.getOptionValue('p');
@@ -321,44 +396,65 @@ public class Application {
           application = new Application();
           application.handleConfigurationEvent(zookeeperConfigurationProvider.getConfiguration());
         }
-      } else {
-        File configurationFile = new File(commandLine.getOptionValue('f'));
-
-        /*
-         * The following is to ensure that by default the agent will fail on
-         * startup if the file does not exist.
-         */
-        if (!configurationFile.exists()) {
-          // If command line invocation, then need to fail fast
-          if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) ==
-              null) {
-            String path = configurationFile.getPath();
-            try {
-              path = configurationFile.getCanonicalPath();
-            } catch (IOException ex) {
-              logger.error("Failed to read canonical path for file: " + path,
-                  ex);
+      } else if (confUri != null) {
+        String confUser = commandLine.getOptionValue("conf-user");
+        String confPassword = commandLine.getOptionValue("conf-password");
+        String pollInterval = commandLine.getOptionValue("poll-interval");
+        String backupDirectory = commandLine.getOptionValue("backup-directory");
+        int interval = StringUtils.isNotEmpty(pollInterval) ? Integer.parseInt(pollInterval) : 0;
+        String verify = commandLine.getOptionValue("verify-host", "true");
+        boolean verifyHost = Boolean.parseBoolean(verify);
+        AuthorizationProvider authorizationProvider = null;
+        String authProviderClass = commandLine.getOptionValue("auth-provider");
+        if (authProviderClass != null) {
+          try {
+            Class<?> clazz = Class.forName(authProviderClass);
+            Object obj = clazz.getDeclaredConstructor(String[].class)
+                .newInstance((Object[]) args);
+            if (obj instanceof AuthorizationProvider) {
+              authorizationProvider = (AuthorizationProvider) obj;
+            } else {
+              logger.error(
+                  "The supplied authorization provider does not implement AuthorizationProvider");
+              return;
             }
-            throw new ParseException(
-                "The specified configuration file does not exist: " + path);
+          } catch (ReflectiveOperationException ex) {
+            logger.error("Unable to create authorization provider: {}", ex.getMessage());
+            return;
+          }
+        }
+        if (authorizationProvider == null && StringUtils.isNotEmpty(confUser)
+            && StringUtils.isNotEmpty(confPassword)) {
+          authorizationProvider = new BasicAuthorizationProvider(confUser, confPassword);
+        }
+        EventBus eventBus = null;
+        if (reload) {
+          eventBus = new EventBus(agentName + "-event-bus");
+          if (interval == 0) {
+            interval = defaultInterval;
+          }
+        }
+        List<ConfigurationSource> configurationSources = new ArrayList<>();
+        for (URI uri : confUri) {
+          ConfigurationSource configurationSource =
+              ConfigurationSourceFactory.getConfigurationSource(uri, authorizationProvider,
+                  verifyHost);
+          if (configurationSource != null) {
+            configurationSources.add(configurationSource);
           }
         }
         List<LifecycleAware> components = Lists.newArrayList();
+        UriConfigurationProvider configurationProvider = new UriConfigurationProvider(agentName,
+            configurationSources, backupDirectory, eventBus, interval);
+        components.add(configurationProvider);
-        if (reload) {
-          EventBus eventBus = new EventBus(agentName + "-event-bus");
-          PollingPropertiesFileConfigurationProvider configurationProvider =
-              new PollingPropertiesFileConfigurationProvider(
-                  agentName, configurationFile, eventBus, 30);
-          components.add(configurationProvider);
-          application = new Application(components);
+        application = new Application(components);
+        if (eventBus != null) {
           eventBus.register(application);
-        } else {
-          PropertiesFileConfigurationProvider configurationProvider =
-              new PropertiesFileConfigurationProvider(agentName, configurationFile);
-          application = new Application();
-          application.handleConfigurationEvent(configurationProvider.getConfiguration());
         }
+        application.handleConfigurationEvent(configurationProvider.getConfiguration());
+      } else {
+        throw new ParseException("No configuiration was provided");
       }
       application.start();
@@ -370,8 +466,35 @@ public class Application {
         }
       });
-    } catch (Exception e) {
+    } catch (ParseException | URISyntaxException | RuntimeException e) {
       logger.error("A fatal error occurred while running. Exception follows.", e);
     }
   }
+  @SuppressWarnings("PMD")
+  private static Properties loadConfigOpts() {
+    Properties initProps = new Properties();
+    InputStream is = null;
+    try {
+      is = new FileInputStream("/etc/flume/flume.opts");
+    } catch (IOException ex) {
+      // Ignore the exception.
+    }
+    if (is == null) {
+      is = Application.class.getClassLoader().getResourceAsStream("flume.opts");
+    }
+    if (is != null) {
+      try {
+        initProps.load(is);
+      } catch (Exception ex) {
+        logger.warn("Unable to load options file due to: {}", ex.getMessage());
+      } finally {
+        try {
+          is.close();
+        } catch (IOException ex) {
+          // Ignore this error.
+        }
+      }
+    }
+    return initProps;
+  }
 }
\ No newline at end of file
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/ClasspathConfigurationSource.java b/flume-ng-node/src/main/java/org/apache/flume/node/ClasspathConfigurationSource.java
new file mode 100644
index 0000000..1e456b5
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/ClasspathConfigurationSource.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.node;
+
+import java.io.InputStream;
+import java.net.URI;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.conf.ConfigurationException;
+
+public class ClasspathConfigurationSource implements ConfigurationSource {
+
+  private final String path;
+  private final URI uri;
+
+  public ClasspathConfigurationSource(URI uri) {
+    this.uri = uri;
+    if (StringUtils.isNotEmpty(uri.getPath())) {
+      // classpath:///filename && classpath:/filename
+      this.path = uri.getPath().substring(1);
+    } else if (StringUtils.isNotEmpty(uri.getAuthority())) {
+      // classpath://filename
+      this.path = uri.getAuthority();
+    } else if (StringUtils.isNotEmpty(uri.getSchemeSpecificPart())) {
+      // classpath:filename
+      this.path = uri.getSchemeSpecificPart();
+    } else {
+      throw new ConfigurationException("Invalid uri: " + uri);
+    }
+  }
+
+  @Override
+  public InputStream getInputStream() {
+    return this.getClass().getClassLoader().getResourceAsStream(path);
+  }
+
+  @Override
+  public String getUri() {
+    return this.uri.toString();
+  }
+
+  @Override
+  public String getExtension() {
+    int length = uri.getPath().indexOf(".");
+    if (length <= 1) {
+      return PROPERTIES;
+    }
+    return uri.getPath().substring(length + 1);
+  }
+
+  @Override
+  public String toString() {
+    return "{ classpath: " + path + "}";
+  }
+
+}
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/ClasspathConfigurationSourceFactory.java b/flume-ng-node/src/main/java/org/apache/flume/node/ClasspathConfigurationSourceFactory.java
new file mode 100644
index 0000000..2921f35
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/ClasspathConfigurationSourceFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.node;
+
+import java.net.URI;
+import java.util.List;
+
+import org.apache.flume.node.net.AuthorizationProvider;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Creates a ConfigurationSource from a file on the classpath..
+ */
+public class ClasspathConfigurationSourceFactory implements ConfigurationSourceFactory {
+
+  private static final List<String> SCHEMES = Lists.newArrayList("classpath");
+
+  public List<String> getSchemes() {
+    return SCHEMES;
+  }
+
+  public ConfigurationSource createConfigurationSource(URI uri,
+      AuthorizationProvider authorizationProvider, boolean verifyHost) {
+    return new ClasspathConfigurationSource(uri);
+  }
+}
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/ConfigurationSource.java b/flume-ng-node/src/main/java/org/apache/flume/node/ConfigurationSource.java
new file mode 100644
index 0000000..ccd849e
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/ConfigurationSource.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.node;
+
+import java.io.InputStream;
+
+/**
+ * Interface for retrieving configuration data.
+ */
+public interface ConfigurationSource {
+
+  static final String PROPERTIES = "properties";
+  static final String JSON = "json";
+  static final String YAML = "yaml";
+  static final String XML = "xml";
+
+  /**
+   * Returns the InputStream if it hasn't already been processed.
+   * @return The InputStream or null.
+   */
+  InputStream getInputStream();
+
+  /**
+   * Returns the URI string.
+   * @return The string URI.
+   */
+  String getUri();
+
+  /**
+   * Determine if the configuration data source has been modified since it was last checked.
+   * @return true if the data was modified.
+   */
+  default boolean isModified() {
+    return false;
+  }
+
+  /**
+   * Return the "file" extension for the specified uri.
+   * @return The file extension.
+   */
+  String getExtension();
+}
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/ConfigurationSourceFactory.java b/flume-ng-node/src/main/java/org/apache/flume/node/ConfigurationSourceFactory.java
new file mode 100644
index 0000000..1d57949
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/ConfigurationSourceFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.node;
+
+import java.net.URI;
+import java.util.List;
+import java.util.ServiceLoader;
+
+import org.apache.flume.node.net.AuthorizationProvider;
+
+/**
+ * Creates ConfigurationSources.
+ */
+public interface ConfigurationSourceFactory {
+
+  static ConfigurationSource getConfigurationSource(URI uri,
+      AuthorizationProvider authorizationProvider, boolean verifyHost) {
+
+    String protocol = uri.getScheme();
+    final ServiceLoader<ConfigurationSourceFactory> serviceLoader =
+        ServiceLoader.load(ConfigurationSourceFactory.class,
+            ConfigurationSourceFactory.class.getClassLoader());
+    for (final ConfigurationSourceFactory configurationSourceFactory : serviceLoader) {
+      if (configurationSourceFactory.getSchemes().contains(protocol)) {
+        return configurationSourceFactory.createConfigurationSource(uri, authorizationProvider,
+            verifyHost);
+      }
+    }
+    return null;
+  }
+
+  List<String> getSchemes();
+
+  ConfigurationSource createConfigurationSource(URI uri,
+      AuthorizationProvider authorizationProvider, boolean verifyHost);
+}
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/EnvVarResolverProperties.java b/flume-ng-node/src/main/java/org/apache/flume/node/EnvVarResolverProperties.java
index e0b0d22..ce64de8 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/EnvVarResolverProperties.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/EnvVarResolverProperties.java
@@ -28,8 +28,13 @@ import java.util.regex.Pattern;
  * A class that extends the Java built-in Properties overriding
  * {@link java.util.Properties#getProperty(String)} to allow ${ENV_VAR_NAME}-style environment
  * variable inclusions
+ * @deprecated Use ${env:key} instead.
  */
+@Deprecated
 public class EnvVarResolverProperties extends Properties {
+
+  private static final long serialVersionUID = -9134232469049352862L;
+
   /**
    * @param input The input string with ${ENV_VAR_NAME}-style environment variable names
    * @return The output string with ${ENV_VAR_NAME} replaced with their environment variable values
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/FileConfigurationSource.java b/flume-ng-node/src/main/java/org/apache/flume/node/FileConfigurationSource.java
new file mode 100644
index 0000000..4db66a0
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/FileConfigurationSource.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.flume.node; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +import org.apache.flume.CounterGroup; +import org.apache.flume.conf.ConfigurationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FileConfigurationSource implements ConfigurationSource { + + private static final Logger LOGGER = LoggerFactory.getLogger(FileConfigurationSource.class); + + private final Path path; + private final URI uri; + private final CounterGroup counterGroup; + private byte[] data; + private long lastChange; + + public FileConfigurationSource(URI uri) { + this.uri = uri; + this.path = Paths.get(uri); + counterGroup = new CounterGroup(); + try { + this.lastChange = path.toFile().lastModified(); + data = Files.readAllBytes(this.path); + } catch (IOException ioe) { + LOGGER.error("Unable to read {}: {}", path.toString(), ioe.getMessage()); + throw new ConfigurationException("Unable to read file " + path.toString(), ioe); + } + } + + @Override + public InputStream getInputStream() { + return new ByteArrayInputStream(data); + } + + @Override + public String getUri() { + return this.uri.toString(); + } + + @Override + public String getExtension() { + int length = uri.getPath().indexOf("."); + if (length <= 1) { + return PROPERTIES; + } + return uri.getPath().substring(length + 1); + } + + @Override + public boolean isModified() { + LOGGER.debug("Checking file:{} for changes", path.toString()); + + counterGroup.incrementAndGet("file.checks"); + + long lastModified = path.toFile().lastModified(); + + if (lastModified > lastChange) { + LOGGER.info("Reloading configuration file:{}", path.toString()); + + counterGroup.incrementAndGet("file.loads"); + + lastChange = lastModified; + + try { + data = Files.readAllBytes(path); + return true; + } catch (Exception e) { + 
LOGGER.error("Failed to load configuration data. Exception follows.", e); + } catch (NoClassDefFoundError e) { + LOGGER.error("Failed to start agent because dependencies were not found in classpath. " + + "Error follows.", e); + } catch (Throwable t) { + // caught because the caller does not handle or log Throwables + LOGGER.error("Unhandled error", t); + } + } + return false; + } + + @Override + public String toString() { + return "{ file:" + path.toString() + "}"; + } +} diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/FileConfigurationSourceFactory.java b/flume-ng-node/src/main/java/org/apache/flume/node/FileConfigurationSourceFactory.java new file mode 100644 index 0000000..6325fe8 --- /dev/null +++ b/flume-ng-node/src/main/java/org/apache/flume/node/FileConfigurationSourceFactory.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ +package org.apache.flume.node; + +import java.net.URI; +import java.util.List; + +import org.apache.flume.node.net.AuthorizationProvider; + +import com.google.common.collect.Lists; + +/** + * Creates a FileConfigurationSource.
+ */ +public class FileConfigurationSourceFactory implements ConfigurationSourceFactory { + + @SuppressWarnings(value = {"EI_EXPOSE_REP"}) + private static final List<String> SCHEMES = Lists.newArrayList("file"); + + public List<String> getSchemes() { + return SCHEMES; + } + + public ConfigurationSource createConfigurationSource(URI uri, + AuthorizationProvider authorizationProvider, boolean verifyHost) { + return new FileConfigurationSource(uri); + } +} diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/HttpConfigurationSource.java b/flume-ng-node/src/main/java/org/apache/flume/node/HttpConfigurationSource.java new file mode 100644 index 0000000..6beafe3 --- /dev/null +++ b/flume-ng-node/src/main/java/org/apache/flume/node/HttpConfigurationSource.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flume.node; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.HttpURLConnection; +import java.net.URI; + +import org.apache.commons.io.IOUtils; +import org.apache.flume.CounterGroup; +import org.apache.flume.conf.ConfigurationException; +import org.apache.flume.node.net.AuthorizationProvider; +import org.apache.flume.node.net.UrlConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HttpConfigurationSource implements ConfigurationSource { + + private static final Logger LOGGER = LoggerFactory.getLogger(HttpConfigurationSource.class); + private static final int NOT_MODIFIED = 304; + private static final int OK = 200; + private static final int BUF_SIZE = 1024; + + private final URI uri; + private final CounterGroup counterGroup; + private final AuthorizationProvider authorizationProvider; + private final boolean verifyHost; + private long lastModified = 0; + private byte[] data = null; + + public HttpConfigurationSource(URI uri, AuthorizationProvider authorizationProvider, + boolean verifyHost) { + this.authorizationProvider = authorizationProvider; + this.uri = uri; + this.verifyHost = verifyHost; + counterGroup = new CounterGroup(); + readInputStream(); + } + + @Override + public InputStream getInputStream() { + return new ByteArrayInputStream(data); + } + + @Override + public String getUri() { + return this.uri.toString(); + } + + @Override + public String getExtension() { + int length = uri.getPath().indexOf("."); + if (length <= 1) { + return PROPERTIES; + } + return uri.getPath().substring(length + 1); + } + + @Override + public boolean isModified() { + LOGGER.debug("Checking {} for changes", uri); + + counterGroup.incrementAndGet("uri.checks"); + try { + LOGGER.info("Reloading configuration from:{}", uri); + if (readInputStream()) { + counterGroup.incrementAndGet("uri.loads"); + return true; + 
} + } catch (ConfigurationException ex) { + LOGGER.error("Unable to access configuration: {}", ex.getMessage()); + } + return false; + } + + private boolean readInputStream() { + try { + HttpURLConnection connection = UrlConnectionFactory.createConnection(uri.toURL(), + authorizationProvider, lastModified, verifyHost); + connection.connect(); + + int code = connection.getResponseCode(); + switch (code) { + case NOT_MODIFIED: { + LOGGER.debug("Configuration Not Modified"); + return false; + } + case OK: { + try (InputStream is = connection.getInputStream()) { + lastModified = connection.getLastModified(); + LOGGER.debug("Content was modified for {}", uri.toString()); + data = IOUtils.toByteArray(is); + return true; + } catch (final IOException e) { + try (InputStream es = connection.getErrorStream()) { + LOGGER.info("Error accessing configuration at {}: {}", uri, readStream(es)); + } catch (final IOException ioe) { + LOGGER.error("Error accessing configuration at {}: {}", uri, e.getMessage()); + } + throw new ConfigurationException("Unable to access " + uri.toString(), e); + } + } + default: { + if (code < 0) { + LOGGER.info("Invalid response code returned"); + } else { + LOGGER.info("Unexpected response code returned {}", code); + } + return false; + } + } + } catch (IOException e) { + LOGGER.warn("Error accessing {}: {}", uri.toString(), e.getMessage()); + throw new ConfigurationException("Unable to access " + uri.toString(), e); + } + } + + private byte[] readStream(InputStream is) throws IOException { + ByteArrayOutputStream result = new ByteArrayOutputStream(); + byte[] buffer = new byte[BUF_SIZE]; + int length; + while ((length = is.read(buffer)) != -1) { + result.write(buffer, 0, length); + } + return result.toByteArray(); + } + + @Override + public String toString() { + return "{ uri:" + uri + "}"; + } +} diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/HttpConfigurationSourceFactory.java
b/flume-ng-node/src/main/java/org/apache/flume/node/HttpConfigurationSourceFactory.java new file mode 100644 index 0000000..10725f0 --- /dev/null +++ b/flume-ng-node/src/main/java/org/apache/flume/node/HttpConfigurationSourceFactory.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ +package org.apache.flume.node; + +import java.net.URI; +import java.util.List; + +import org.apache.flume.node.net.AuthorizationProvider; + +import com.google.common.collect.Lists; + +/** + * Creates an HttpConfigurationSource. 
+ */ +public class HttpConfigurationSourceFactory implements ConfigurationSourceFactory { + + @SuppressWarnings(value = {"EI_EXPOSE_REP"}) + private static final List<String> SCHEMES = Lists.newArrayList("http", "https"); + + public List<String> getSchemes() { + return SCHEMES; + } + + public ConfigurationSource createConfigurationSource(URI uri, + AuthorizationProvider authorizationProvider, boolean verifyHost) { + return new HttpConfigurationSource(uri, authorizationProvider, verifyHost); + } +} diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/MapResolver.java b/flume-ng-node/src/main/java/org/apache/flume/node/MapResolver.java new file mode 100644 index 0000000..f7571b9 --- /dev/null +++ b/flume-ng-node/src/main/java/org/apache/flume/node/MapResolver.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.node; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import org.apache.commons.text.StringSubstitutor; +import org.apache.commons.text.lookup.StringLookup; +import org.apache.commons.text.lookup.StringLookupFactory; + +/** + * Resolves replaceable tokens to create a Map. 
+ * <p> + * Needs org.apache.commons:commons-text on classpath + */ +final class MapResolver { + + private static final String PROPS_IMPL_KEY = "propertiesImplementation"; + private static final String ENV_VAR_PROPERTY = "org.apache.flume.node.EnvVarResolverProperties"; + + public static Map<String, String> resolveProperties(Properties properties) { + Map<String, String> map = new HashMap<>(); + boolean useEnvVars = ENV_VAR_PROPERTY.equals(System.getProperty(PROPS_IMPL_KEY)); + StringLookup defaultLookup = useEnvVars ? new DefaultLookup(map) : + StringLookupFactory.INSTANCE.mapStringLookup(map); + StringLookup lookup = StringLookupFactory.INSTANCE.interpolatorStringLookup(defaultLookup); + StringSubstitutor substitutor = new StringSubstitutor(lookup); + substitutor.setEnableSubstitutionInVariables(true); + properties.stringPropertyNames().forEach((k) -> map.put(k, + substitutor.replace(properties.getProperty(k)))); + return map; + } + + private static class DefaultLookup implements StringLookup { + private final Map<String, String> properties; + + DefaultLookup(Map<String, String> properties) { + this.properties = properties; + } + + /** + * Provide compatibility with EnvVarResolverProperties. + * + * @param key The key. + * @return The value associated with the key or null. + */ + @Override + public String lookup(String key) { + return properties.containsKey(key) ?
+ properties.get(key) : System.getenv(key); + } + } +} diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/MaterializedConfiguration.java b/flume-ng-node/src/main/java/org/apache/flume/node/MaterializedConfiguration.java index fa3ef55..80d5497 100644 --- a/flume-ng-node/src/main/java/org/apache/flume/node/MaterializedConfiguration.java +++ b/flume-ng-node/src/main/java/org/apache/flume/node/MaterializedConfiguration.java @@ -32,16 +32,16 @@ import java.util.Map; */ public interface MaterializedConfiguration { - public void addSourceRunner(String name, SourceRunner sourceRunner); + void addSourceRunner(String name, SourceRunner sourceRunner); - public void addSinkRunner(String name, SinkRunner sinkRunner); + void addSinkRunner(String name, SinkRunner sinkRunner); - public void addChannel(String name, Channel channel); + void addChannel(String name, Channel channel); - public Map<String, SourceRunner> getSourceRunners(); + Map<String, SourceRunner> getSourceRunners(); - public Map<String, SinkRunner> getSinkRunners(); + Map<String, SinkRunner> getSinkRunners(); - public Map<String, Channel> getChannels(); + Map<String, Channel> getChannels(); } diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java index 13cb38f..db040b5 100644 --- a/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java +++ b/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java @@ -18,143 +18,20 @@ package org.apache.flume.node; import java.io.File; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import org.apache.flume.CounterGroup; -import org.apache.flume.lifecycle.LifecycleAware; -import org.apache.flume.lifecycle.LifecycleState; -import org.slf4j.Logger; 
-import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.google.common.eventbus.EventBus; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -public class PollingPropertiesFileConfigurationProvider - extends PropertiesFileConfigurationProvider - implements LifecycleAware { - - private static final Logger LOGGER = - LoggerFactory.getLogger(PollingPropertiesFileConfigurationProvider.class); - - private final EventBus eventBus; - private final File file; - private final int interval; - private final CounterGroup counterGroup; - private LifecycleState lifecycleState; - - private ScheduledExecutorService executorService; - - public PollingPropertiesFileConfigurationProvider(String agentName, - File file, EventBus eventBus, int interval) { - super(agentName, file); - this.eventBus = eventBus; - this.file = file; - this.interval = interval; - counterGroup = new CounterGroup(); - lifecycleState = LifecycleState.IDLE; - } - - @Override - public void start() { - LOGGER.info("Configuration provider starting"); - - Preconditions.checkState(file != null, - "The parameter file must not be null"); - - executorService = Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d") - .build()); - - FileWatcherRunnable fileWatcherRunnable = - new FileWatcherRunnable(file, counterGroup); - - executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval, - TimeUnit.SECONDS); - - lifecycleState = LifecycleState.START; - - LOGGER.debug("Configuration provider started"); - } - - @Override - public void stop() { - LOGGER.info("Configuration provider stopping"); - - executorService.shutdown(); - try { - if (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) { - LOGGER.debug("File watcher has not terminated. 
Forcing shutdown of executor."); - executorService.shutdownNow(); - while (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) { - LOGGER.debug("Waiting for file watcher to terminate"); - } - } - } catch (InterruptedException e) { - LOGGER.debug("Interrupted while waiting for file watcher to terminate"); - Thread.currentThread().interrupt(); - } - lifecycleState = LifecycleState.STOP; - LOGGER.debug("Configuration provider stopped"); - } - - @Override - public synchronized LifecycleState getLifecycleState() { - return lifecycleState; - } - - - @Override - public String toString() { - return "{ file:" + file + " counterGroup:" + counterGroup + " provider:" - + getClass().getCanonicalName() + " agentName:" + getAgentName() + " }"; - } - - public class FileWatcherRunnable implements Runnable { - - private final File file; - private final CounterGroup counterGroup; - - private long lastChange; - public FileWatcherRunnable(File file, CounterGroup counterGroup) { - super(); - this.file = file; - this.counterGroup = counterGroup; - this.lastChange = 0L; - } - - @Override - public void run() { - LOGGER.debug("Checking file:{} for changes", file); - - counterGroup.incrementAndGet("file.checks"); - - long lastModified = file.lastModified(); - - if (lastModified > lastChange) { - LOGGER.info("Reloading configuration file:{}", file); - - counterGroup.incrementAndGet("file.loads"); - - lastChange = lastModified; +/** + * @deprecated Use UriConfigurationProvider instead. + */ +@Deprecated +public class PollingPropertiesFileConfigurationProvider extends UriConfigurationProvider { - try { - eventBus.post(getConfiguration()); - } catch (Exception e) { - LOGGER.error("Failed to load configuration data. Exception follows.", - e); - } catch (NoClassDefFoundError e) { - LOGGER.error("Failed to start agent because dependencies were not " + - "found in classpath. 
Error follows.", e); - } catch (Throwable t) { - // caught because the caller does not handle or log Throwables - LOGGER.error("Unhandled error", t); - } - } - } + public PollingPropertiesFileConfigurationProvider(String agentName, File file, EventBus eventBus, + int interval) { + super(agentName, Lists.newArrayList(new FileConfigurationSource(file.toURI())), null, + eventBus, interval); } } diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java index 75f45db..a2b0409 100644 --- a/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java +++ b/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java @@ -17,16 +17,9 @@ */ package org.apache.flume.node; -import java.io.BufferedReader; import java.io.File; -import java.io.FileReader; -import java.io.IOException; -import java.util.HashMap; -import java.util.Properties; -import org.apache.flume.conf.FlumeConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.google.common.collect.Lists; /** * <p> @@ -165,52 +158,13 @@ import org.slf4j.LoggerFactory; * </p> * * @see java.util.Properties#load(java.io.Reader) + * @deprecated Use UriConfigurationProvider. 
*/ -public class PropertiesFileConfigurationProvider extends - AbstractConfigurationProvider { - - private static final Logger LOGGER = LoggerFactory - .getLogger(PropertiesFileConfigurationProvider.class); - private static final String DEFAULT_PROPERTIES_IMPLEMENTATION = "java.util.Properties"; - - private final File file; +@Deprecated +public class PropertiesFileConfigurationProvider extends UriConfigurationProvider { public PropertiesFileConfigurationProvider(String agentName, File file) { - super(agentName); - this.file = file; - } - - @Override - public FlumeConfiguration getFlumeConfiguration() { - BufferedReader reader = null; - try { - reader = new BufferedReader(new FileReader(file)); - String resolverClassName = System.getProperty("propertiesImplementation", - DEFAULT_PROPERTIES_IMPLEMENTATION); - Class<? extends Properties> propsclass = Class.forName(resolverClassName) - .asSubclass(Properties.class); - Properties properties = propsclass.newInstance(); - properties.load(reader); - return new FlumeConfiguration(toMap(properties)); - } catch (IOException ex) { - LOGGER.error("Unable to load file:" + file - + " (I/O failure) - Exception follows.", ex); - } catch (ClassNotFoundException e) { - LOGGER.error("Configuration resolver class not found", e); - } catch (InstantiationException e) { - LOGGER.error("Instantiation exception", e); - } catch (IllegalAccessException e) { - LOGGER.error("Illegal access exception", e); - } finally { - if (reader != null) { - try { - reader.close(); - } catch (IOException ex) { - LOGGER.warn( - "Unable to close file reader for file: " + file, ex); - } - } - } - return new FlumeConfiguration(new HashMap<String, String>()); + super(agentName, Lists.newArrayList(new FileConfigurationSource(file.toURI())), null, null, 0); + super.start(); } } diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/UriConfigurationProvider.java 
similarity index 52% copy from flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java copy to flume-ng-node/src/main/java/org/apache/flume/node/UriConfigurationProvider.java index 75f45db..e4c74c6 100644 --- a/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java +++ b/flume-ng-node/src/main/java/org/apache/flume/node/UriConfigurationProvider.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,27 +17,41 @@ */ package org.apache.flume.node; -import java.io.BufferedReader; import java.io.File; -import java.io.FileReader; +import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; -import java.util.HashMap; +import java.io.InputStream; +import java.io.OutputStream; +import java.time.LocalDateTime; +import java.util.List; +import java.util.Map; import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.flume.CounterGroup; +import org.apache.flume.conf.ConfigurationException; import org.apache.flume.conf.FlumeConfiguration; +import org.apache.flume.lifecycle.LifecycleAware; +import org.apache.flume.lifecycle.LifecycleState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.eventbus.EventBus; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + /** * <p> - * A configuration provider that uses properties file for specifying - * 
configuration. The configuration files follow the Java properties file syntax - * rules specified at {@link java.util.Properties#load(java.io.Reader)}. Every - * configuration value specified in the properties file is prefixed by an + * A configuration provider that uses properties for specifying + * configuration. The configurations follow the Java properties file syntax + * rules specified at {@link Properties#load(java.io.Reader)}. Every + * configuration value specified in the properties is prefixed by an * <em>Agent Name</em> which helps isolate an individual agent's namespace. * </p> * <p> - * Valid configuration files must observe the following rules for every agent + * Valid configurations must observe the following rules for every agent * namespace. * <ul> * <li>For every <agent name> there must be three lists specified that @@ -101,7 +115,7 @@ import org.slf4j.LoggerFactory; * <li>Sinks not assigned to a group will be assigned to default single sink * groups.</li> * </ul> - * + * <p> * Apart from the above required configuration values, each source, sink or * channel can have its own set of arbitrary configuration as required by the * implementation. 
Each of these configuration values are expressed by fully @@ -164,53 +178,180 @@ import org.slf4j.LoggerFactory; * * </p> * - * @see java.util.Properties#load(java.io.Reader) + * @see Properties#load(java.io.Reader) */ -public class PropertiesFileConfigurationProvider extends - AbstractConfigurationProvider { +public class UriConfigurationProvider extends AbstractConfigurationProvider + implements LifecycleAware { - private static final Logger LOGGER = LoggerFactory - .getLogger(PropertiesFileConfigurationProvider.class); - private static final String DEFAULT_PROPERTIES_IMPLEMENTATION = "java.util.Properties"; + private static final Logger LOGGER = LoggerFactory.getLogger(UriConfigurationProvider.class); - private final File file; + private final List<ConfigurationSource> configurationSources; + private final File backupDirectory; + private final EventBus eventBus; + private final int interval; + private final CounterGroup counterGroup; + private LifecycleState lifecycleState = LifecycleState.IDLE; + private ScheduledExecutorService executorService; - public PropertiesFileConfigurationProvider(String agentName, File file) { + public UriConfigurationProvider(String agentName, List<ConfigurationSource> sourceList, + String backupDirectory, EventBus eventBus, int pollInterval) { super(agentName); - this.file = file; + this.configurationSources = sourceList; + this.backupDirectory = backupDirectory != null ? 
new File(backupDirectory) : null; + this.eventBus = eventBus; + this.interval = pollInterval; + counterGroup = new CounterGroup(); + } + + @Override + public void start() { + if (eventBus != null && interval > 0) { + executorService = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d") + .build()); + + WatcherRunnable watcherRunnable = new WatcherRunnable(configurationSources, counterGroup, + eventBus); + + executorService.scheduleWithFixedDelay(watcherRunnable, 0, interval, + TimeUnit.SECONDS); + } + lifecycleState = LifecycleState.START; + } + + @Override + public void stop() { + if (executorService != null) { + executorService.shutdown(); + try { + if (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) { + LOGGER.debug("File watcher has not terminated. Forcing shutdown of executor."); + executorService.shutdownNow(); + while (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) { + LOGGER.debug("Waiting for file watcher to terminate"); + } + } + } catch (InterruptedException e) { + LOGGER.debug("Interrupted while waiting for file watcher to terminate"); + Thread.currentThread().interrupt(); + } + } + lifecycleState = LifecycleState.STOP; + } + + @Override + public LifecycleState getLifecycleState() { + return lifecycleState; + } + + protected List<ConfigurationSource> getConfigurationSources() { + return configurationSources; } @Override public FlumeConfiguration getFlumeConfiguration() { - BufferedReader reader = null; - try { - reader = new BufferedReader(new FileReader(file)); - String resolverClassName = System.getProperty("propertiesImplementation", - DEFAULT_PROPERTIES_IMPLEMENTATION); - Class<? 
extends Properties> propsclass = Class.forName(resolverClassName) - .asSubclass(Properties.class); - Properties properties = propsclass.newInstance(); - properties.load(reader); - return new FlumeConfiguration(toMap(properties)); - } catch (IOException ex) { - LOGGER.error("Unable to load file:" + file - + " (I/O failure) - Exception follows.", ex); - } catch (ClassNotFoundException e) { - LOGGER.error("Configuration resolver class not found", e); - } catch (InstantiationException e) { - LOGGER.error("Instantiation exception", e); - } catch (IllegalAccessException e) { - LOGGER.error("Illegal access exception", e); - } finally { - if (reader != null) { - try { - reader.close(); - } catch (IOException ex) { - LOGGER.warn( - "Unable to close file reader for file: " + file, ex); + Map<String, String> configMap = null; + Properties properties = new Properties(); + for (ConfigurationSource configurationSource : configurationSources) { + try (InputStream is = configurationSource.getInputStream()) { + if (is != null) { + switch (configurationSource.getExtension()) { + case ConfigurationSource.JSON: case ConfigurationSource.YAML: + case ConfigurationSource.XML: { + LOGGER.warn("File extension type {} is unsupported", + configurationSource.getExtension()); + break; + } + default: { + properties.load(is); + break; + } + } + } + } catch (IOException ioe) { + LOGGER.warn("Unable to load properties from {}: {}", configurationSource.getUri(), + ioe.getMessage()); + } + if (properties.size() > 0) { + configMap = MapResolver.resolveProperties(properties); + } + } + if (configMap != null) { + Properties props = new Properties(); + props.putAll(configMap); + if (backupDirectory != null) { + if (backupDirectory.mkdirs()) { + // This is only being logged to keep Spotbugs happy. We can't ignore the result of mkdirs. 
+ LOGGER.debug("Created directories for {}", backupDirectory.toString()); + } + File backupFile = getBackupFile(backupDirectory, getAgentName()); + try (OutputStream os = new FileOutputStream(backupFile)) { + props.store(os, "Backup created at " + LocalDateTime.now().toString()); + } catch (IOException ioe) { + LOGGER.warn("Unable to create backup properties file: {}", ioe.getMessage()); + } + } + } else { + if (backupDirectory != null) { + File backup = getBackupFile(backupDirectory, getAgentName()); + if (backup.exists()) { + Properties props = new Properties(); + try (InputStream is = new FileInputStream(backup)) { + LOGGER.warn("Unable to access primary configuration. Trying backup"); + props.load(is); + configMap = MapResolver.resolveProperties(props); + } catch (IOException ex) { + LOGGER.warn("Error reading backup file: {}", ex.getMessage()); + } + } + } + } + if (configMap != null) { + return new FlumeConfiguration(configMap); + } else { + LOGGER.error("No configuration could be found"); + return null; + } + } + + private File getBackupFile(File backupDirectory, String agentName) { + if (backupDirectory != null) { + return new File(backupDirectory, "."
+ agentName + ".properties"); + } + return null; + } + + private class WatcherRunnable implements Runnable { + + private List<ConfigurationSource> configurationSources; + private final CounterGroup counterGroup; + private final EventBus eventBus; + + public WatcherRunnable(List<ConfigurationSource> sources, CounterGroup counterGroup, + EventBus eventBus) { + this.configurationSources = sources; + this.counterGroup = counterGroup; + this.eventBus = eventBus; + } + + @Override + public void run() { + LOGGER.debug("Checking for changes to sources"); + + counterGroup.incrementAndGet("uri.checks"); + try { + boolean isModified = false; + for (ConfigurationSource source : configurationSources) { + if (source.isModified()) { + isModified = true; + } + } + if (isModified) { + eventBus.post(getConfiguration()); } + } catch (ConfigurationException ex) { + LOGGER.warn("Unable to update configuration: {}", ex.getMessage()); } } - return new FlumeConfiguration(new HashMap<String, String>()); } } diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/net/AuthorizationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/net/AuthorizationProvider.java new file mode 100644 index 0000000..24542d4 --- /dev/null +++ b/flume-ng-node/src/main/java/org/apache/flume/node/net/AuthorizationProvider.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ +package org.apache.flume.node.net; + +import java.net.URLConnection; + +/** + * Interface to be implemented to add an Authorization header to an HTTP request. + */ +public interface AuthorizationProvider { + + void addAuthorization(URLConnection urlConnection); +} diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/net/BasicAuthorizationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/net/BasicAuthorizationProvider.java new file mode 100644 index 0000000..aa1fc9a --- /dev/null +++ b/flume-ng-node/src/main/java/org/apache/flume/node/net/BasicAuthorizationProvider.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. 
+ */ +package org.apache.flume.node.net; + +import java.net.URLConnection; +import java.nio.charset.StandardCharsets; +import java.util.Base64; + +/** + * Provides the Basic Authorization header to a request. + */ +public class BasicAuthorizationProvider implements AuthorizationProvider { + + private static final Base64.Encoder encoder = Base64.getEncoder(); + + private String authString = null; + + public BasicAuthorizationProvider(String userName, String password) { + if (userName != null && password != null) { + String toEncode = userName + ":" + password; + authString = "Basic " + encoder.encodeToString(toEncode.getBytes(StandardCharsets.UTF_8)); + } + } + + @Override + public void addAuthorization(URLConnection urlConnection) { + if (authString != null) { + urlConnection.setRequestProperty("Authorization", authString); + } + } +} diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/net/LaxHostnameVerifier.java b/flume-ng-node/src/main/java/org/apache/flume/node/net/LaxHostnameVerifier.java new file mode 100644 index 0000000..d1cbaaf --- /dev/null +++ b/flume-ng-node/src/main/java/org/apache/flume/node/net/LaxHostnameVerifier.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. 
+ */ +package org.apache.flume.node.net; + +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.SSLSession; + +/** + * A HostnameVerifier that accepts every hostname. + */ +public final class LaxHostnameVerifier implements HostnameVerifier { + /** + * Singleton instance. + */ + public static final HostnameVerifier INSTANCE = new LaxHostnameVerifier(); + + private LaxHostnameVerifier() { + } + + @Override + public boolean verify(final String s, final SSLSession sslSession) { + return true; + } +} diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/net/UrlConnectionFactory.java b/flume-ng-node/src/main/java/org/apache/flume/node/net/UrlConnectionFactory.java new file mode 100644 index 0000000..21e4f26 --- /dev/null +++ b/flume-ng-node/src/main/java/org/apache/flume/node/net/UrlConnectionFactory.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ +package org.apache.flume.node.net; + +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.net.URLConnection; +import javax.net.ssl.HttpsURLConnection; + +/** + * Constructs an HttpURLConnection. 
+ */ +public class UrlConnectionFactory { + + private static int DEFAULT_TIMEOUT = 60000; + private static int connectTimeoutMillis = DEFAULT_TIMEOUT; + private static int readTimeoutMillis = DEFAULT_TIMEOUT; + private static final String XML = "application/xml"; + private static final String YAML = "application/yaml"; + private static final String JSON = "application/json"; + private static final String PROPERTIES = "text/x-java-properties"; + private static final String TEXT = "text/plain"; + public static final String HTTP = "http"; + public static final String HTTPS = "https"; + + public static HttpURLConnection createConnection(URL url, + AuthorizationProvider authorizationProvider, long lastModifiedMillis, boolean verifyHost) + throws IOException { + final HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection(); + if (HTTPS.equals(url.getProtocol()) && !verifyHost) { + ((HttpsURLConnection) urlConnection).setHostnameVerifier(LaxHostnameVerifier.INSTANCE); + } + if (authorizationProvider != null) { + authorizationProvider.addAuthorization(urlConnection); + } + urlConnection.setAllowUserInteraction(false); + urlConnection.setDoOutput(true); + urlConnection.setDoInput(true); + urlConnection.setRequestMethod("GET"); + if (connectTimeoutMillis > 0) { + urlConnection.setConnectTimeout(connectTimeoutMillis); + } + if (readTimeoutMillis > 0) { + urlConnection.setReadTimeout(readTimeoutMillis); + } + urlConnection.setRequestProperty("Content-Type", getContentType(url)); + if (lastModifiedMillis > 0) { + urlConnection.setIfModifiedSince(lastModifiedMillis); + } + return urlConnection; + } + + public static URLConnection createConnection(URL url) throws IOException { + return createConnection(url, null, 0, true); + } + + public static URLConnection createConnection(URL url, AuthorizationProvider authorizationProvider) + throws IOException { + URLConnection urlConnection = null; + if (url.getProtocol().equals(HTTPS) || url.getProtocol().equals(HTTP)) { 
+ urlConnection = createConnection(url, authorizationProvider, 0, true); + } else { + urlConnection = url.openConnection(); + } + return urlConnection; + } + + private static String getContentType(URL url) { + String[] fileParts = url.getFile().split("\\."); + String type = fileParts[fileParts.length - 1].trim(); + switch (type) { + case "properties": { + return PROPERTIES; + } + case "json": { + return JSON; + } + case "yaml": case "yml": { + return YAML; + } + case "xml": { + return XML; + } + default: { + return TEXT; + } + } + } +} diff --git a/flume-ng-node/src/main/resources/META-INF/services/org.apache.flume.node.ConfigurationSourceFactory b/flume-ng-node/src/main/resources/META-INF/services/org.apache.flume.node.ConfigurationSourceFactory new file mode 100644 index 0000000..c49998c --- /dev/null +++ b/flume-ng-node/src/main/resources/META-INF/services/org.apache.flume.node.ConfigurationSourceFactory @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache license, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the license for the specific language governing permissions and +# limitations under the license. 
+org.apache.flume.node.ClasspathConfigurationSourceFactory +org.apache.flume.node.FileConfigurationSourceFactory +org.apache.flume.node.HttpConfigurationSourceFactory \ No newline at end of file diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestClasspathConfigurationSource.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestClasspathConfigurationSource.java new file mode 100644 index 0000000..5fb208f --- /dev/null +++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestClasspathConfigurationSource.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ +package org.apache.flume.node; + +import java.net.URI; +import java.util.Properties; + +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests that files can be loaded from the Classpath. 
+ */ +public class TestClasspathConfigurationSource { + + @Test + public void testClasspath() throws Exception { + URI confFile = new URI("classpath:///flume-conf.properties"); + ConfigurationSource source = new ClasspathConfigurationSource(confFile); + Assert.assertNotNull("No configuration returned", source); + Properties props = new Properties(); + props.load(source.getInputStream()); + String value = props.getProperty("host1.sources"); + Assert.assertNotNull("Missing key", value); + } + + @Test + public void testOddClasspath() throws Exception { + URI confFile = new URI("classpath:/flume-conf.properties"); + ConfigurationSource source = new ClasspathConfigurationSource(confFile); + Assert.assertNotNull("No configuration returned", source); + Properties props = new Properties(); + props.load(source.getInputStream()); + String value = props.getProperty("host1.sources"); + Assert.assertNotNull("Missing key", value); + } + + @Test + public void testImproperClasspath() throws Exception { + URI confFile = new URI("classpath://flume-conf.properties"); + ConfigurationSource source = new ClasspathConfigurationSource(confFile); + Assert.assertNotNull("No configuration returned", source); + Properties props = new Properties(); + props.load(source.getInputStream()); + String value = props.getProperty("host1.sources"); + Assert.assertNotNull("Missing key", value); + } + + @Test + public void testShorthandClasspath() throws Exception { + URI confFile = new URI("classpath:flume-conf.properties"); + ConfigurationSource source = new ClasspathConfigurationSource(confFile); + Assert.assertNotNull("No configuration returned", source); + Properties props = new Properties(); + props.load(source.getInputStream()); + String value = props.getProperty("host1.sources"); + Assert.assertNotNull("Missing key", value); + } +} diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestEnvLookup.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestEnvLookup.java new file mode 
100644 index 0000000..36f95b1 --- /dev/null +++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestEnvLookup.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.node; + +import java.io.File; +import java.util.List; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.contrib.java.lang.system.EnvironmentVariables; + +import com.google.common.collect.Lists; + +import junit.framework.Assert; + +public class TestEnvLookup { + private static final File TESTFILE = new File( + TestEnvLookup.class.getClassLoader() + .getResource("flume-conf-with-envLookup.properties").getFile()); + private static final String NC_PORT = "6667"; + + @Rule + public final EnvironmentVariables environmentVariables = new EnvironmentVariables(); + private UriConfigurationProvider provider; + + @Before + public void setUp() throws Exception { + environmentVariables.set("NC_PORT", NC_PORT); + List<ConfigurationSource> sourceList = + Lists.newArrayList(new FileConfigurationSource(TESTFILE.toURI())); + provider = new UriConfigurationProvider("a1", sourceList, null, + null, 0); + } + + @Test + public void getProperty() throws Exception { + + 
Assert.assertEquals(NC_PORT, provider.getFlumeConfiguration() + .getConfigurationFor("a1") + .getSourceContext().get("r1").getParameters().get("port")); + } +} diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestHttpConfigurationSource.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestHttpConfigurationSource.java new file mode 100644 index 0000000..b128116 --- /dev/null +++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestHttpConfigurationSource.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. 
+ */ +package org.apache.flume.node; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.nio.file.Files; +import java.util.Base64; +import java.util.Enumeration; +import java.util.Properties; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.flume.conf.ConfigurationException; +import org.apache.flume.node.net.AuthorizationProvider; +import org.apache.flume.node.net.BasicAuthorizationProvider; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.servlet.DefaultServlet; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Tests that files can be loaded via HTTP. + */ +public class TestHttpConfigurationSource { + + private static final String BASIC = "Basic "; + private static final String expectedCreds = "flume:flume"; + private static Server server; + private static Base64.Decoder decoder = Base64.getDecoder(); + + @BeforeClass + public static void startServer() throws Exception { + try { + server = new Server(1080); + ServletContextHandler context = new ServletContextHandler(); + ServletHolder defaultServ = new ServletHolder("default", TestServlet.class); + defaultServ.setInitParameter("resourceBase", System.getProperty("user.dir")); + defaultServ.setInitParameter("dirAllowed", "true"); + context.addServlet(defaultServ, "/"); + server.setHandler(context); + + // Start Server + server.start(); + } catch (Throwable ex) { + ex.printStackTrace(); + throw ex; + } + } + + @AfterClass + public static void stopServer() throws Exception { + server.stop(); + } + + + @Test(expected = ConfigurationException.class) + public void testBadCredentials() throws Exception { + URI confFile 
= new URI("http://localhost/flume-conf.properties"); + AuthorizationProvider authProvider = new BasicAuthorizationProvider("foo", "bar"); + ConfigurationSource source = new HttpConfigurationSource(confFile, authProvider, true); + } + + @Test + public void testGet() throws Exception { + URI confFile = new URI("http://localhost:1080/flume-conf.properties"); + AuthorizationProvider authProvider = new BasicAuthorizationProvider("flume", "flume"); + ConfigurationSource source = new HttpConfigurationSource(confFile, authProvider, true); + Assert.assertNotNull("No configuration returned", source); + InputStream is = source.getInputStream(); + Assert.assertNotNull("No data returned", is); + Properties props = new Properties(); + props.load(is); + String value = props.getProperty("host1.sources"); + Assert.assertNotNull("Missing key", value); + Assert.assertFalse(source.isModified()); + File file = new File("target/test-classes/flume-conf.properties"); + if (file.setLastModified(System.currentTimeMillis())) { + Assert.assertTrue(source.isModified()); + } + } + + public static class TestServlet extends DefaultServlet { + + private static final long serialVersionUID = -2885158530511450659L; + + @Override + protected void doGet(HttpServletRequest request, + HttpServletResponse response) throws ServletException, IOException { + Enumeration<String> headers = request.getHeaders(HttpHeader.AUTHORIZATION.toString()); + if (headers == null) { + response.sendError(401, "No Auth header"); + return; + } + while (headers.hasMoreElements()) { + String authData = headers.nextElement(); + Assert.assertTrue("Not a Basic auth header", authData.startsWith(BASIC)); + String credentials = new String(decoder.decode(authData.substring(BASIC.length()))); + Assert.assertEquals(expectedCreds, credentials); + } + if (request.getServletPath().equals("/flume-conf.properties")) { + File file = new File("target/test-classes/flume-conf.properties"); + long modifiedSince = 
request.getDateHeader(HttpHeader.IF_MODIFIED_SINCE.toString()); + long lastModified = file.lastModified(); + if (modifiedSince > 0 && lastModified <= modifiedSince) { + response.setStatus(304); + return; + } + response.setDateHeader(HttpHeader.LAST_MODIFIED.toString(), lastModified); + response.setContentLengthLong(file.length()); + Files.copy(file.toPath(), response.getOutputStream()); + response.getOutputStream().flush(); + response.setStatus(200); + } else { + response.sendError(400, "Unsupported request"); + } + } + } +} diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestOverrideFile.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestOverrideFile.java new file mode 100644 index 0000000..0b5e5e0 --- /dev/null +++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestOverrideFile.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flume.node; + +import java.io.File; +import java.util.List; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.contrib.java.lang.system.EnvironmentVariables; + +import com.google.common.collect.Lists; + +import junit.framework.Assert; + +public class TestOverrideFile { + private static final File TESTFILE = new File( + TestOverrideFile.class.getClassLoader() + .getResource("flume-conf-with-recursiveLookup.properties").getFile()); + private static final File OVERRIDEFILE = new File( + TestOverrideFile.class.getClassLoader() + .getResource("flume-conf-override.properties").getFile()); + private static final String BIND = "192.168.13.101"; + + @Rule + public final EnvironmentVariables environmentVariables = new EnvironmentVariables(); + private UriConfigurationProvider provider; + + @Before + public void setUp() throws Exception { + System.setProperty("env", "DEV"); + List<ConfigurationSource> sourceList = + Lists.newArrayList(new FileConfigurationSource(TESTFILE.toURI()), + new FileConfigurationSource(OVERRIDEFILE.toURI())); + provider = new UriConfigurationProvider("a1", sourceList, null, + null, 0); + } + + @Test + public void getProperty() throws Exception { + + Assert.assertEquals(BIND, provider.getFlumeConfiguration() + .getConfigurationFor("a1") + .getSourceContext().get("r1").getParameters().get("bind")); + } +} diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingPropertiesFileConfigurationProvider.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingPropertiesFileConfigurationProvider.java index 480f6a5..d20fa35 100644 --- a/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingPropertiesFileConfigurationProvider.java +++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingPropertiesFileConfigurationProvider.java @@ -40,7 +40,7 @@ public class TestPollingPropertiesFileConfigurationProvider { 
TestPollingPropertiesFileConfigurationProvider.class.getClassLoader() .getResource("flume-conf.properties").getFile()); - private PollingPropertiesFileConfigurationProvider provider; + private UriConfigurationProvider provider; private File baseDir; private File configFile; private EventBus eventBus; @@ -54,9 +54,9 @@ public class TestPollingPropertiesFileConfigurationProvider { Files.copy(TESTFILE, configFile); eventBus = new EventBus("test"); - provider = - new PollingPropertiesFileConfigurationProvider("host1", - configFile, eventBus, 1); + ConfigurationSource source = new FileConfigurationSource(configFile.toURI()); + provider = new UriConfigurationProvider("host1", Lists.newArrayList(source), null, + eventBus, 1); provider.start(); LifecycleController.waitForOneOf(provider, LifecycleState.START_OR_ERROR); } diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestPropertiesFileConfigurationProvider.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestPropertiesFileConfigurationProvider.java index 4875c56..95c7777 100644 --- a/flume-ng-node/src/test/java/org/apache/flume/node/TestPropertiesFileConfigurationProvider.java +++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestPropertiesFileConfigurationProvider.java @@ -43,11 +43,15 @@ public class TestPropertiesFileConfigurationProvider { TestPropertiesFileConfigurationProvider.class.getClassLoader() .getResource("flume-conf.properties").getFile()); - private PropertiesFileConfigurationProvider provider; + private UriConfigurationProvider provider; + private List<ConfigurationSource> sources; @Before public void setUp() throws Exception { - provider = new PropertiesFileConfigurationProvider("test", TESTFILE); + ConfigurationSource source = new FileConfigurationSource(TESTFILE.toURI()); + sources = Lists.newArrayList(source); + provider = new UriConfigurationProvider("test", sources, null, null, 0); + provider.start(); } @After diff --git 
a/flume-ng-node/src/test/java/org/apache/flume/node/TestRecursiveLookup.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestRecursiveLookup.java new file mode 100644 index 0000000..de78ea6 --- /dev/null +++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestRecursiveLookup.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flume.node; + +import java.io.File; +import java.util.List; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.contrib.java.lang.system.EnvironmentVariables; + +import com.google.common.collect.Lists; + +import junit.framework.Assert; + +public class TestRecursiveLookup { + private static final File TESTFILE = new File( + TestRecursiveLookup.class.getClassLoader() + .getResource("flume-conf-with-recursiveLookup.properties").getFile()); + private static final String BIND = "192.168.11.101"; + + @Rule + public final EnvironmentVariables environmentVariables = new EnvironmentVariables(); + private UriConfigurationProvider provider; + + @Before + public void setUp() throws Exception { + System.setProperty("env", "DEV"); + List<ConfigurationSource> sourceList = + Lists.newArrayList(new FileConfigurationSource(TESTFILE.toURI())); + provider = new UriConfigurationProvider("a1", sourceList, null, + null, 0); + } + + @Test + public void getProperty() throws Exception { + + Assert.assertEquals(BIND, provider.getFlumeConfiguration() + .getConfigurationFor("a1") + .getSourceContext().get("r1").getParameters().get("bind")); + } +} diff --git a/flume-ng-node/src/test/resources/flume-conf-override.properties b/flume-ng-node/src/test/resources/flume-conf-override.properties new file mode 100644 index 0000000..1d86d4b --- /dev/null +++ b/flume-ng-node/src/test/resources/flume-conf-override.properties @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +PROD_BIND=192.168.12.110 +DEV_BIND=192.168.13.101 + diff --git a/flume-ng-node/src/test/resources/flume-conf-with-envLookup.properties b/flume-ng-node/src/test/resources/flume-conf-with-envLookup.properties new file mode 100644 index 0000000..20848e9 --- /dev/null +++ b/flume-ng-node/src/test/resources/flume-conf-with-envLookup.properties @@ -0,0 +1,35 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+#
+
+a1.sources = r1
+a1.sources.r1.type = netcat
+a1.sources.r1.bind = 0.0.0.0
+a1.sources.r1.port = ${env:NC_PORT}
+a1.sources.r1.channels = c1
+
+a1.channels = c1
+a1.channels.c1.type = memory
+a1.channels.c1.capacity = 10000
+a1.channels.c1.transactionCapacity = 10000
+a1.channels.c1.byteCapacityBufferPercentage = 20
+a1.channels.c1.byteCapacity = 800000
+
+a1.channels = c1
+a1.sinks = k1
+a1.sinks.k1.type = logger
+a1.sinks.k1.channel = c1
diff --git a/flume-ng-node/src/test/resources/flume-conf-with-recursiveLookup.properties b/flume-ng-node/src/test/resources/flume-conf-with-recursiveLookup.properties
new file mode 100644
index 0000000..c679384
--- /dev/null
+++ b/flume-ng-node/src/test/resources/flume-conf-with-recursiveLookup.properties
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+PROD_BIND=192.168.10.110
+DEV_BIND=192.168.11.101
+a1.sources = r1
+a1.sources.r1.type = netcat
+a1.sources.r1.bind = ${${sys:env}_BIND}
+a1.sources.r1.port = 6667
+a1.sources.r1.channels = c1
+
+a1.channels = c1
+a1.channels.c1.type = memory
+a1.channels.c1.capacity = 10000
+a1.channels.c1.transactionCapacity = 10000
+a1.channels.c1.byteCapacityBufferPercentage = 20
+a1.channels.c1.byteCapacity = 800000
+
+a1.channels = c1
+a1.sinks = k1
+a1.sinks.k1.type = logger
+a1.sinks.k1.channel = c1
diff --git a/pom.xml b/pom.xml
index 2dc3dba..9a9e492 100644
--- a/pom.xml
+++ b/pom.xml
@@ -52,13 +52,14 @@ limitations under the License.
     <bundle-plugin.version>2.3.7</bundle-plugin.version>
     <checkstyle.tool.version>8.12</checkstyle.tool.version>
     <codehaus.jackson.version>1.9.13</codehaus.jackson.version>
-    <commons-cli.version>1.2</commons-cli.version>
+    <commons-cli.version>1.4</commons-cli.version>
     <commons-codec.version>1.8</commons-codec.version>
     <commons-collections.version>3.2.2</commons-collections.version>
     <commons-compress.version>1.4.1</commons-compress.version>
     <commons-dbcp.version>1.4</commons-dbcp.version>
     <commons-io.version>2.1</commons-io.version>
     <commons-lang.version>2.5</commons-lang.version>
+    <commons-text.version>1.9</commons-text.version>
     <curator.version>5.1.0</curator.version>
     <derby.version>10.14.1.0</derby.version>
     <dropwizard-metrics.version>4.1.18</dropwizard-metrics.version>
@@ -767,6 +768,12 @@ limitations under the License.
       </dependency>

       <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-text</artifactId>
+        <version>${commons-text.version}</version>
+      </dependency>
+
+      <dependency>
         <groupId>com.google.guava</groupId>
         <artifactId>guava</artifactId>
         <version>${guava-old.version}</version>
@@ -1765,6 +1772,30 @@ limitations under the License.
         <version>${dropwizard-metrics.version}</version>
       </dependency>

+      <dependency>
+        <groupId>net.jcip</groupId>
+        <artifactId>jcip-annotations</artifactId>
+        <version>1.0</version>
+        <optional>true</optional>
+      </dependency>
+      <dependency>
+        <groupId>com.github.spotbugs</groupId>
+        <artifactId>spotbugs-annotations</artifactId>
+        <version>${mvn-spotbugs-plugin.version}</version>
+        <optional>true</optional>
+      </dependency>
+      <dependency>
+        <groupId>org.mock-server</groupId>
+        <artifactId>mockserver-netty</artifactId>
+        <version>3.10.8</version>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.mock-server</groupId>
+        <artifactId>mockserver-client-java</artifactId>
+        <version>3.10.8</version>
+        <scope>test</scope>
+      </dependency>
     </dependencies>
   </dependencyManagement>
