Repository: flume Updated Branches: refs/heads/trunk 7f85df9e4 -> beb11e598
http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/test/java/org/apache/flume/conf/channel/MemoryChannelConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/test/java/org/apache/flume/conf/channel/MemoryChannelConfiguration.java b/flume-ng-configuration/src/test/java/org/apache/flume/conf/channel/MemoryChannelConfiguration.java new file mode 100644 index 0000000..1a7abf5 --- /dev/null +++ b/flume-ng-configuration/src/test/java/org/apache/flume/conf/channel/MemoryChannelConfiguration.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.flume.conf.channel; + +/** + * This is a mock to avoid the circular dependency in tests + * TODO fix wrong dependency directions in the project config should not depend on an implementation + */ +public class MemoryChannelConfiguration extends ChannelConfiguration { + public MemoryChannelConfiguration(String componentName) { + super(componentName); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/test/java/org/apache/flume/conf/configfilter/EnvironmentVariableConfigFilterConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/test/java/org/apache/flume/conf/configfilter/EnvironmentVariableConfigFilterConfiguration.java b/flume-ng-configuration/src/test/java/org/apache/flume/conf/configfilter/EnvironmentVariableConfigFilterConfiguration.java new file mode 100644 index 0000000..d480f44 --- /dev/null +++ b/flume-ng-configuration/src/test/java/org/apache/flume/conf/configfilter/EnvironmentVariableConfigFilterConfiguration.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.flume.conf.configfilter; + +/** + * This is a mock to avoid the circular dependency in tests + * TODO fix wrong dependency directions in the project config should not depend on an implementation + */ +public class EnvironmentVariableConfigFilterConfiguration extends ConfigFilterConfiguration { + public EnvironmentVariableConfigFilterConfiguration(String componentName) { + super(componentName); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/test/java/org/apache/flume/conf/configfilter/MockConfigFilter.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/test/java/org/apache/flume/conf/configfilter/MockConfigFilter.java b/flume-ng-configuration/src/test/java/org/apache/flume/conf/configfilter/MockConfigFilter.java new file mode 100644 index 0000000..95742a2 --- /dev/null +++ b/flume-ng-configuration/src/test/java/org/apache/flume/conf/configfilter/MockConfigFilter.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.flume.conf.configfilter; + +import org.apache.flume.configfilter.AbstractConfigFilter; + +import java.util.Map; + +public class MockConfigFilter extends AbstractConfigFilter { + @Override + public String filter(String key) { + + if (key.equals("null")) { + return null; + } + + if (key.equals("throw")) { + throw new IllegalStateException("Test exception"); + } + + return "filtered_" + key; + } + + @Override + public void initializeWithConfiguration(Map<String, String> configuration) { + + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/test/java/org/apache/flume/conf/sink/NullSinkConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/test/java/org/apache/flume/conf/sink/NullSinkConfiguration.java b/flume-ng-configuration/src/test/java/org/apache/flume/conf/sink/NullSinkConfiguration.java new file mode 100644 index 0000000..36fc249 --- /dev/null +++ b/flume-ng-configuration/src/test/java/org/apache/flume/conf/sink/NullSinkConfiguration.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.flume.conf.sink; + +/** + * This is a mock to avoid the circular dependency in tests + * TODO fix wrong dependency directions in the project config should not depend on an implementation + */ +public class NullSinkConfiguration extends SinkConfiguration { + public NullSinkConfiguration(String componentName) { + super(componentName); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/test/java/org/apache/flume/conf/source/jms/JMSSourceConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/test/java/org/apache/flume/conf/source/jms/JMSSourceConfiguration.java b/flume-ng-configuration/src/test/java/org/apache/flume/conf/source/jms/JMSSourceConfiguration.java new file mode 100644 index 0000000..1a5f686 --- /dev/null +++ b/flume-ng-configuration/src/test/java/org/apache/flume/conf/source/jms/JMSSourceConfiguration.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.flume.conf.source.jms; + +import org.apache.flume.conf.source.SourceConfiguration; + +/** + * This is a mock to avoid the circular dependency in tests + * TODO fix wrong dependency directions in the project config should not depend on an implementation + */ +public class JMSSourceConfiguration extends SourceConfiguration { + public JMSSourceConfiguration(String componentName) { + super(componentName); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-dist/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-dist/pom.xml b/flume-ng-dist/pom.xml index b3b6faa..1140e1d 100644 --- a/flume-ng-dist/pom.xml +++ b/flume-ng-dist/pom.xml @@ -197,6 +197,22 @@ <artifactId>flume-taildir-source</artifactId> </dependency> <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-environment-variable-config-filter</artifactId> + </dependency> + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-hadoop-credential-store-config-filter</artifactId> + </dependency> + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-external-process-config-filter</artifactId> + </dependency> + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-config-filter-api</artifactId> + </dependency> + <dependency> <groupId>org.apache.flume.flume-ng-clients</groupId> <artifactId>flume-ng-log4jappender</artifactId> <exclusions> http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 909fe4a..4e70bcc 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -14,9 +14,9 @@ limitations under the License. -====================================== +=============================== Flume 1.9.0-SNAPSHOT User Guide -====================================== +=============================== Introduction ============ @@ -926,7 +926,7 @@ invoked directly. Common values for 'shell' : '/bin/sh -c', '/bin/ksh -c', a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done JMS Source -~~~~~~~~~~~ +~~~~~~~~~~ JMS Source reads messages from a JMS destination such as a queue or topic. Being a JMS application it should work with any JMS provider but has only been tested with ActiveMQ. @@ -964,7 +964,7 @@ durableSubscriptionName -- Name used to identify the durable subsc Converter -''''''''''' +''''''''' The JMS source allows pluggable converters, though it's likely the default converter will work for most purposes. The default converter is able to convert Bytes, Text, and Object messages to FlumeEvents. In all cases, the properties in the message are added as headers to the @@ -1152,7 +1152,7 @@ deserializer.maxBlobLength 100000000 The maximum number of bytes to r ========================== ================== ======================================================================= Taildir Source -~~~~~~~~~~~~~~~~~~~~~~~~~ +~~~~~~~~~~~~~~ .. note:: **This source is provided as a preview feature. It does not work on Windows.** Watch the specified files, and tail them in nearly real-time once detected new lines appended to the each files. @@ -1253,7 +1253,7 @@ Example for agent named a1: a1.sources.r1.maxBatchDurationMillis = 200 Kafka Source -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +~~~~~~~~~~~~ Kafka Source is an Apache Kafka consumer that reads messages from Kafka topics. If you have multiple Kafka sources running, you can configure them with the same Consumer Group @@ -4067,7 +4067,7 @@ Example for agent named a1: Remove Header Interceptor -~~~~~~~~~~~~~~~~~~~~~~~~~~~ +~~~~~~~~~~~~~~~~~~~~~~~~~ This interceptor manipulates Flume event headers, by removing one or many headers. It can remove a statically defined header, headers based on a regular expression or headers in a list. If none of these is defined, or if no header matches the criteria, the Flume events are not modified. @@ -4085,7 +4085,7 @@ matching -- All the headers which names match this regul UUID Interceptor -~~~~~~~~~~~~~~~~~~~~~~~~~~~ +~~~~~~~~~~~~~~~~ This interceptor sets a universally unique identifier on all events that are intercepted. An example UUID is ``b5755073-77a9-43c1-8fad-b7a586fc1b97``, which represents a 128-bit value. @@ -4101,7 +4101,7 @@ prefix "" The prefix string constant to prepend to each generat ================ ======= ======================================================================== Morphline Interceptor -~~~~~~~~~~~~~~~~~~~~~~~~~~~ +~~~~~~~~~~~~~~~~~~~~~ This interceptor filters the events through a `morphline configuration file <http://cloudera.github.io/cdk/docs/current/cdk-morphlines/index.html>`_ that defines a chain of transformation commands that pipe records from one command to another. For example the morphline can ignore certain events or alter or insert certain event headers via regular expression based pattern matching, or it can auto-detect and set a MIME type via Apache Tika on events that are intercepted. For example, this kind of packet sniffing can be used for content based dynamic routing in a Flume topology. @@ -4273,6 +4273,160 @@ config file and this is not the first time the file is polled, then the agent makes no config changes for this polling period. The agent continues polling rather than terminating. +Configuration Filters +===================== + +Flume provides a tool for injecting sensitive or generated data into the configuration +in the form of configuration filters. A configuration key can be set as the value of configuration properties +and it will be replaced by the configuration filter with the value it represents. + +Common usage of config filters +------------------------------ + +The format is similar to the Java Expression Language, however +it is currently not a fully working EL expression parser, just a format that looks like it. + +.. code-block:: properties + + <agent_name>.configfilters = <filter_name> + <agent_name>.configfilters.<filter_name>.type = <filter_type> + + <agent_name>.sources.<source_name>.parameter = ${<filter_name>['<key_for_sensitive_or_generated_data>']} + <agent_name>.sinks.<sink_name>.parameter = ${<filter_name>['<key_for_sensitive_or_generated_data>']} + <agent_name>.<component_type>.<component_name>.parameter = ${<filter_name>['<key_for_sensitive_or_generated_data>']} + #or + <agent_name>.<component_type>.<component_name>.parameter = ${<filter_name>["<key_for_sensitive_or_generated_data>"]} + #or + <agent_name>.<component_type>.<component_name>.parameter = ${<filter_name>[<key_for_sensitive_or_generated_data>]} + #or + <agent_name>.<component_type>.<component_name>.parameter = some_constant_data${<filter_name>[<key_for_sensitive_or_generated_data>]} + + +Environment Variable Config Filter +---------------------------------- + +================================ ========== ============================================== +Property Name Default Description +================================ ========== ============================================== +**type** -- The component type name has to be ``env`` +================================ ========== ============================================== + +Example +~~~~~~~ +To hide a password in the configuration set its value as in the following example. + +.. code-block:: properties + + a1.sources = r1 + a1.channels = c1 + a1.configfilters = f1 + + a1.configfilters.f1.type = env + + a1.sources.r1.channels = c1 + a1.sources.r1.type = http + a1.sources.r1.keystorePassword = ${f1['my_keystore_password']} #will get the value Secret123 + +Here the ``a1.sources.r1.keystorePassword`` configuration property will get the value of the ``my_keystore_password`` +environment variable. One way to set the environment variable is to run flume agent like this: + +``$ my_keystore_password=Secret123 bin/flume-ng agent --conf conf --conf-file example.conf ...`` + + + +External Process Config Filter +------------------------------ + +================================ ========== ============================================== +Property Name Default Description +================================ ========== ============================================== +**type** -- The component type name has to be ``external`` +**command** -- The command that will be executed to get the value for the given key. The command will be called like: ``<command> <key>`` And expected to return a single line value with exit code ``0``. +charset UTF-8 The characterset of the returned string. +================================ ========== ============================================== + +Example +~~~~~~~ +To hide a password in the configuration set its value as in the following example. + +.. code-block:: properties + + a1.sources = r1 + a1.channels = c1 + a1.configfilters = f1 + + a1.configfilters.f1.type = external + a1.configfilters.f1.command = /usr/bin/passwordResolver.sh + a1.configfilters.f1.charset = UTF-8 + + a1.sources.r1.channels = c1 + a1.sources.r1.type = http + a1.sources.r1.keystorePassword = ${f1['my_keystore_password']} #will get the value Secret123 + +In this example flume will run the following command to get the value + +``$ /usr/bin/passwordResolver.sh my_keystore_password`` + +The ``passwordResolver.sh`` will return ``Secret123`` with an exit code ``0``. + +Example 2 +~~~~~~~~~ +To generate a part of the directory for rolling file sink set its value as in the following example. + +.. code-block:: properties + + a1.sources = r1 + a1.channels = c1 + a1.configfilters = f1 + + a1.configfilters.f1.type = external + a1.configfilters.f1.command = /usr/bin/generateUniqId.sh + a1.configfilters.f1.charset = UTF-8 + + a1.sinks = k1 + a1.sinks.k1.type = file_roll + a1.sinks.k1.channel = c1 + a1.sinks.k1.sink.directory = /var/log/flume/agent_${f1['agent_name']} # will be /var/log/flume/agent_1234 + +In this example flume will run the following command to get the value + +``$ /usr/bin/generateUniqId.sh agent_name`` + +The ``generateUniqId.sh`` will return ``1234`` with an exit code ``0``. + + + +Hadoop Credential Store Config Filter +------------------------------------- + +=============================================== ========== ============================================== +Property Name Default Description +=============================================== ========== ============================================== +**type** -- The component type name has to be ``hadoop`` +**credential.provider.path** -- The provider path. See hadoop documentation _here: https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/CredentialProviderAPI.html#Configuring_the_Provider_Path +credstore.java-keystore-provider.password-file -- The name of the password file if a file is used to store the password. The file must e on the classpath. + Provider password can be set with the HADOOP_CREDSTORE_PASSWORD environment variable or left empty. +=============================================== ========== ============================================== + +Example +~~~~~~~ +To hide a password in the configuration set its value as in the following example. + +.. code-block:: properties + + a1.sources = r1 + a1.channels = c1 + a1.configfilters = f1 + + a1.configfilters.f1.type = hadoop + a1.configfilters.f1.credential.provider.path = jceks://file/<path_to_jceks file> + + a1.sources.r1.channels = c1 + a1.sources.r1.type = http + a1.sources.r1.keystorePassword = ${f1['my_keystore_password']} #will get the value from the credential store + + + Log4J Appender ============== http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-tests/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-tests/pom.xml b/flume-ng-tests/pom.xml index ee3f4f4..91653a0 100644 --- a/flume-ng-tests/pom.xml +++ b/flume-ng-tests/pom.xml @@ -84,6 +84,13 @@ <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> + + <dependency> + <groupId>com.github.stefanbirkner</groupId> + <artifactId>system-rules</artifactId> + <version>1.17.0</version> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestConfigFilters.java ---------------------------------------------------------------------- diff --git a/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestConfigFilters.java b/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestConfigFilters.java new file mode 100644 index 0000000..b82c4b6 --- /dev/null +++ b/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestConfigFilters.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.test.agent; + +import org.apache.flume.test.util.StagedInstall; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.alias.CredentialShell; +import org.apache.hadoop.util.ToolRunner; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.contrib.java.lang.system.EnvironmentVariables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Scanner; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestConfigFilters { + private static final Logger LOGGER = + LoggerFactory.getLogger(TestConfigFilters.class); + + @ClassRule + public static final EnvironmentVariables environmentVariables + = new EnvironmentVariables(); + + private Properties agentProps; + private Map<String, String> agentEnv; + private File sinkOutputDir1; + private File sinkOutputDir2; + private File sinkOutputDir3; + private File hadoopCredStore; + + @Before + public void setup() throws Exception { + + File agentDir = StagedInstall.getInstance().getStageDir(); + LOGGER.debug("Using agent stage dir: {}", agentDir); + + File testDir = new File(agentDir, TestConfigFilters.class.getName()); + assertTrue(testDir.mkdirs()); + + agentProps = new Properties(); + agentEnv = new HashMap<>(); + + // Create the rest of the properties file + agentProps.put("agent.sources.seq-01.type", "seq"); + agentProps.put("agent.sources.seq-01.totalEvents", "100"); + agentProps.put("agent.sources.seq-01.channels", "mem-01 mem-02 mem-03"); + agentProps.put("agent.channels.mem-01.type", "MEMORY"); + agentProps.put("agent.channels.mem-01.capacity", String.valueOf(100000)); + agentProps.put("agent.channels.mem-02.type", "MEMORY"); + agentProps.put("agent.channels.mem-02.capacity", String.valueOf(100000)); + agentProps.put("agent.channels.mem-03.type", "MEMORY"); + agentProps.put("agent.channels.mem-04.capacity", String.valueOf(100000)); + + sinkOutputDir1 = new File(testDir, "out1"); + assertTrue("Unable to create sink output dir: " + sinkOutputDir1.getPath(), + sinkOutputDir1.mkdir()); + sinkOutputDir2 = new File(testDir, "out2"); + assertTrue("Unable to create sink output dir: " + sinkOutputDir2.getPath(), + sinkOutputDir2.mkdir()); + sinkOutputDir3 = new File(testDir, "out3"); + assertTrue("Unable to create sink output dir: " + sinkOutputDir3.getPath(), + sinkOutputDir3.mkdir()); + + environmentVariables.set("HADOOP_CREDSTORE_PASSWORD", "envSecret"); + + agentEnv.put("dirname_env", sinkOutputDir1.getAbsolutePath()); + agentEnv.put("HADOOP_CREDSTORE_PASSWORD", "envSecret"); + + hadoopCredStore = new File(testDir, "credstore.jceks"); + String providerPath = "jceks://file/" + hadoopCredStore.getAbsolutePath(); + + ToolRunner.run( + new Configuration(), new CredentialShell(), + ("create dirname_hadoop -value " + sinkOutputDir3.getAbsolutePath() + + " -provider " + providerPath).split(" ")); + + + agentProps.put("agent.sinks.roll-01.channel", "mem-01"); + agentProps.put("agent.sinks.roll-01.type", "FILE_ROLL"); + agentProps.put("agent.sinks.roll-01.sink.directory", "${filter-01[\"dirname_env\"]}"); + agentProps.put("agent.sinks.roll-01.sink.rollInterval", "0"); + agentProps.put("agent.sinks.roll-02.channel", "mem-02"); + agentProps.put("agent.sinks.roll-02.type", "FILE_ROLL"); + agentProps.put("agent.sinks.roll-02.sink.directory", + sinkOutputDir2.getParentFile().getAbsolutePath() + "/${filter-02['out2']}"); + agentProps.put("agent.sinks.roll-02.sink.rollInterval", "0"); + agentProps.put("agent.sinks.roll-03.channel", "mem-03"); + agentProps.put("agent.sinks.roll-03.type", "FILE_ROLL"); + agentProps.put("agent.sinks.roll-03.sink.directory", "${filter-03[dirname_hadoop]}"); + agentProps.put("agent.sinks.roll-03.sink.rollInterval", "0"); + + agentProps.put("agent.configfilters.filter-01.type", "env"); + agentProps.put("agent.configfilters.filter-02.type", "external"); + agentProps.put("agent.configfilters.filter-02.command", "echo"); + agentProps.put("agent.configfilters.filter-03.type", "hadoop"); + agentProps.put("agent.configfilters.filter-03.credential.provider.path", providerPath); + + agentProps.put("agent.sources", "seq-01"); + agentProps.put("agent.channels", "mem-01 mem-02 mem-03"); + agentProps.put("agent.sinks", "roll-01 roll-02 roll-03"); + agentProps.put("agent.configfilters", "filter-01 filter-02 filter-03"); + } + + @After + public void teardown() throws Exception { + StagedInstall.getInstance().stopAgent(); + } + + private void validateSeenEvents(File outDir, int outFiles, int events) + throws IOException { + File[] sinkOutputDirChildren = outDir.listFiles(); + assertEquals("Unexpected number of files in output dir", + outFiles, sinkOutputDirChildren.length); + Set<String> seenEvents = new HashSet<>(); + for (File outFile : sinkOutputDirChildren) { + Scanner scanner = new Scanner(outFile); + while (scanner.hasNext()) { + seenEvents.add(scanner.nextLine()); + } + } + for (int event = 0; event < events; event++) { + assertTrue( + "Missing event: {" + event + "}", + seenEvents.contains(String.valueOf(event)) + ); + } + } + + @Test + public void testConfigReplacement() throws Exception { + LOGGER.debug("testConfigReplacement() started."); + + StagedInstall.getInstance().startAgent("agent", agentProps, agentEnv); + + TimeUnit.SECONDS.sleep(10); // Wait for sources and sink to process files + + // Ensure we received all events. + validateSeenEvents(sinkOutputDir1, 1, 100); + validateSeenEvents(sinkOutputDir2, 1, 100); + validateSeenEvents(sinkOutputDir3, 1, 100); + LOGGER.debug("Processed all the events!"); + + LOGGER.debug("testConfigReplacement() ended."); + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java ---------------------------------------------------------------------- diff --git a/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java b/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java index ce586b8..50b49cc 100644 --- a/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java +++ b/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java @@ -35,6 +35,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.ServerSocket; import java.net.Socket; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -120,7 +121,12 @@ public class StagedInstall { return port; } - public synchronized void startAgent(String name, Properties properties) + public synchronized void startAgent(String name, Properties properties) throws Exception { + startAgent(name, properties, new HashMap<>()); + } + + public synchronized void startAgent( + String name, Properties properties, Map<String, String> environmentVariables) throws Exception { Preconditions.checkArgument(!name.isEmpty(), "agent name must not be empty"); Preconditions.checkNotNull(properties, "properties object must not be null"); @@ -159,6 +165,7 @@ public class StagedInstall { ProcessBuilder pb = new ProcessBuilder(cmdArgs); Map<String, String> env = pb.environment(); + env.putAll(environmentVariables); LOGGER.debug("process environment: " + env); http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 0cd174c..0f656a5 100644 --- a/pom.xml +++ b/pom.xml @@ -69,7 +69,7 @@ limitations under the License. <gson.version>2.2.2</gson.version> <guava.version>18.0</guava.version> <guava-old.version>11.0.2</guava-old.version> - <hadoop2.version>2.4.0</hadoop2.version> + <hadoop2.version>2.9.0</hadoop2.version> <httpcore.version>4.4.6</httpcore.version> <httpclient.version>4.5.3</httpclient.version> <irclib.version>1.10</irclib.version> @@ -129,6 +129,7 @@ limitations under the License. <module>flume-tools</module> <module>flume-ng-auth</module> <module>flume-shared</module> + <module>flume-ng-configfilters</module> </modules> <profiles> @@ -1580,6 +1581,30 @@ limitations under the License. </dependency> <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-environment-variable-config-filter</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-hadoop-credential-store-config-filter</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-external-process-config-filter</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-config-filter-api</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> <groupId>com.sun.jersey</groupId> <artifactId>jersey-core</artifactId> <version>${jersey.version}</version>