Repository: flume
Updated Branches:
  refs/heads/trunk 7f85df9e4 -> beb11e598


http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/test/java/org/apache/flume/conf/channel/MemoryChannelConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-configuration/src/test/java/org/apache/flume/conf/channel/MemoryChannelConfiguration.java
 
b/flume-ng-configuration/src/test/java/org/apache/flume/conf/channel/MemoryChannelConfiguration.java
new file mode 100644
index 0000000..1a7abf5
--- /dev/null
+++ 
b/flume-ng-configuration/src/test/java/org/apache/flume/conf/channel/MemoryChannelConfiguration.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.flume.conf.channel;
+
+/**
+ * This is a mock to avoid the circular dependency in tests
+ * TODO fix wrong dependency directions in the project config should not 
depend on an implementation
+ */
+public class MemoryChannelConfiguration extends ChannelConfiguration {
+  public MemoryChannelConfiguration(String componentName) {
+    super(componentName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/test/java/org/apache/flume/conf/configfilter/EnvironmentVariableConfigFilterConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-configuration/src/test/java/org/apache/flume/conf/configfilter/EnvironmentVariableConfigFilterConfiguration.java
 
b/flume-ng-configuration/src/test/java/org/apache/flume/conf/configfilter/EnvironmentVariableConfigFilterConfiguration.java
new file mode 100644
index 0000000..d480f44
--- /dev/null
+++ 
b/flume-ng-configuration/src/test/java/org/apache/flume/conf/configfilter/EnvironmentVariableConfigFilterConfiguration.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.flume.conf.configfilter;
+
+/**
+ * This is a mock to avoid the circular dependency in tests
+ * TODO fix wrong dependency directions in the project config should not 
depend on an implementation
+ */
+public class EnvironmentVariableConfigFilterConfiguration extends 
ConfigFilterConfiguration {
+  public EnvironmentVariableConfigFilterConfiguration(String componentName) {
+    super(componentName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/test/java/org/apache/flume/conf/configfilter/MockConfigFilter.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-configuration/src/test/java/org/apache/flume/conf/configfilter/MockConfigFilter.java
 
b/flume-ng-configuration/src/test/java/org/apache/flume/conf/configfilter/MockConfigFilter.java
new file mode 100644
index 0000000..95742a2
--- /dev/null
+++ 
b/flume-ng-configuration/src/test/java/org/apache/flume/conf/configfilter/MockConfigFilter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.flume.conf.configfilter;
+
+import org.apache.flume.configfilter.AbstractConfigFilter;
+
+import java.util.Map;
+
+public class MockConfigFilter extends AbstractConfigFilter {
+  @Override
+  public String filter(String key) {
+
+    if (key.equals("null")) {
+      return null;
+    }
+
+    if (key.equals("throw")) {
+      throw new IllegalStateException("Test exception");
+    }
+
+    return "filtered_" + key;
+  }
+
+  @Override
+  public void initializeWithConfiguration(Map<String, String> configuration) {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/test/java/org/apache/flume/conf/sink/NullSinkConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-configuration/src/test/java/org/apache/flume/conf/sink/NullSinkConfiguration.java
 
b/flume-ng-configuration/src/test/java/org/apache/flume/conf/sink/NullSinkConfiguration.java
new file mode 100644
index 0000000..36fc249
--- /dev/null
+++ 
b/flume-ng-configuration/src/test/java/org/apache/flume/conf/sink/NullSinkConfiguration.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.flume.conf.sink;
+
+/**
+ * This is a mock to avoid the circular dependency in tests
+ * TODO fix wrong dependency directions in the project config should not 
depend on an implementation
+ */
+public class NullSinkConfiguration extends SinkConfiguration {
+  public NullSinkConfiguration(String componentName) {
+    super(componentName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-configuration/src/test/java/org/apache/flume/conf/source/jms/JMSSourceConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-configuration/src/test/java/org/apache/flume/conf/source/jms/JMSSourceConfiguration.java
 
b/flume-ng-configuration/src/test/java/org/apache/flume/conf/source/jms/JMSSourceConfiguration.java
new file mode 100644
index 0000000..1a5f686
--- /dev/null
+++ 
b/flume-ng-configuration/src/test/java/org/apache/flume/conf/source/jms/JMSSourceConfiguration.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.flume.conf.source.jms;
+
+import org.apache.flume.conf.source.SourceConfiguration;
+
+/**
+ * This is a mock to avoid the circular dependency in tests
+ * TODO fix wrong dependency directions in the project config should not 
depend on an implementation
+ */
+public class JMSSourceConfiguration extends SourceConfiguration {
+  public JMSSourceConfiguration(String componentName) {
+    super(componentName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-dist/pom.xml b/flume-ng-dist/pom.xml
index b3b6faa..1140e1d 100644
--- a/flume-ng-dist/pom.xml
+++ b/flume-ng-dist/pom.xml
@@ -197,6 +197,22 @@
       <artifactId>flume-taildir-source</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-environment-variable-config-filter</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-hadoop-credential-store-config-filter</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-external-process-config-filter</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-config-filter-api</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.flume.flume-ng-clients</groupId>
       <artifactId>flume-ng-log4jappender</artifactId>
       <exclusions>

http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst 
b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 909fe4a..4e70bcc 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -14,9 +14,9 @@
    limitations under the License.
 
 
-======================================
+===============================
 Flume 1.9.0-SNAPSHOT User Guide
-======================================
+===============================
 
 Introduction
 ============
@@ -926,7 +926,7 @@ invoked directly.  Common values for 'shell' :  '/bin/sh 
-c', '/bin/ksh -c',
   a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done
 
 JMS Source
-~~~~~~~~~~~
+~~~~~~~~~~
 
 JMS Source reads messages from a JMS destination such as a queue or topic. 
Being a JMS
 application it should work with any JMS provider but has only been tested with 
ActiveMQ.
@@ -964,7 +964,7 @@ durableSubscriptionName     --           Name used to 
identify the durable subsc
 
 
 Converter
-'''''''''''
+'''''''''
 The JMS source allows pluggable converters, though it's likely the default 
converter will work
 for most purposes. The default converter is able to convert Bytes, Text, and 
Object messages
 to FlumeEvents. In all cases, the properties in the message are added as 
headers to the
@@ -1152,7 +1152,7 @@ deserializer.maxBlobLength  100000000           The 
maximum number of bytes to r
 ==========================  ==================  
=======================================================================
 
 Taildir Source
-~~~~~~~~~~~~~~~~~~~~~~~~~
+~~~~~~~~~~~~~~
 .. note:: **This source is provided as a preview feature. It does not work on 
Windows.**
 
 Watch the specified files, and tail them in nearly real-time once detected new 
lines appended to the each files.
@@ -1253,7 +1253,7 @@ Example for agent named a1:
   a1.sources.r1.maxBatchDurationMillis = 200
 
 Kafka Source
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+~~~~~~~~~~~~
 
 Kafka Source is an Apache Kafka consumer that reads messages from Kafka topics.
 If you have multiple Kafka sources running, you can configure them with the 
same Consumer Group
@@ -4067,7 +4067,7 @@ Example for agent named a1:
 
 
 Remove Header Interceptor
-~~~~~~~~~~~~~~~~~~~~~~~~~~~
+~~~~~~~~~~~~~~~~~~~~~~~~~
 
 This interceptor manipulates Flume event headers, by removing one or many 
headers. It can remove a statically defined header, headers based on a regular 
expression or headers in a list. If none of these is defined, or if no header 
matches the criteria, the Flume events are not modified.
 
@@ -4085,7 +4085,7 @@ matching               --           All the headers which 
names match this regul
 
 
 UUID Interceptor
-~~~~~~~~~~~~~~~~~~~~~~~~~~~
+~~~~~~~~~~~~~~~~
 
 This interceptor sets a universally unique identifier on all events that are 
intercepted. An example UUID is ``b5755073-77a9-43c1-8fad-b7a586fc1b97``, which 
represents a 128-bit value.
 
@@ -4101,7 +4101,7 @@ prefix            ""       The prefix string constant to 
prepend to each generat
 ================  =======  
========================================================================
 
 Morphline Interceptor
-~~~~~~~~~~~~~~~~~~~~~~~~~~~
+~~~~~~~~~~~~~~~~~~~~~
 
 This interceptor filters the events through a `morphline configuration file 
<http://cloudera.github.io/cdk/docs/current/cdk-morphlines/index.html>`_ that 
defines a chain of transformation commands that pipe records from one command 
to another.
 For example the morphline can ignore certain events or alter or insert certain 
event headers via regular expression based pattern matching, or it can 
auto-detect and set a MIME type via Apache Tika on events that are intercepted. 
For example, this kind of packet sniffing can be used for content based dynamic 
routing in a Flume topology.
@@ -4273,6 +4273,160 @@ config file and this is not the first time the file is 
polled, then the
 agent makes no config changes for this polling period. The agent continues
 polling rather than terminating.
 
+Configuration Filters
+=====================
+
+Flume provides a tool for injecting sensitive or generated data into the 
configuration
+in the form of configuration filters. A configuration key can be set as the 
value of configuration properties
+and it will be replaced by the configuration filter with the value it 
represents.
+
+Common usage of config filters
+------------------------------
+
+The format is similar to the Java Expression Language, however
+it is currently not a fully working EL expression parser, just a format that 
looks like it.
+
+.. code-block:: properties
+
+  <agent_name>.configfilters = <filter_name>
+  <agent_name>.configfilters.<filter_name>.type = <filter_type>
+
+  <agent_name>.sources.<source_name>.parameter = 
${<filter_name>['<key_for_sensitive_or_generated_data>']}
+  <agent_name>.sinks.<sink_name>.parameter = 
${<filter_name>['<key_for_sensitive_or_generated_data>']}
+  <agent_name>.<component_type>.<component_name>.parameter = 
${<filter_name>['<key_for_sensitive_or_generated_data>']}
+  #or
+  <agent_name>.<component_type>.<component_name>.parameter = 
${<filter_name>["<key_for_sensitive_or_generated_data>"]}
+  #or
+  <agent_name>.<component_type>.<component_name>.parameter = 
${<filter_name>[<key_for_sensitive_or_generated_data>]}
+  #or
+  <agent_name>.<component_type>.<component_name>.parameter = 
some_constant_data${<filter_name>[<key_for_sensitive_or_generated_data>]}
+
+
+Environment Variable Config Filter
+----------------------------------
+
+================================ ========== 
==============================================
+Property Name                    Default                        Description
+================================ ========== 
==============================================
+**type**                         --         The component type name has to be 
``env``
+================================ ========== 
==============================================
+
+Example
+~~~~~~~
+To hide a password in the configuration set its value as in the following 
example.
+
+.. code-block:: properties
+
+  a1.sources = r1
+  a1.channels = c1
+  a1.configfilters = f1
+
+  a1.configfilters.f1.type = env
+
+  a1.sources.r1.channels =  c1
+  a1.sources.r1.type = http
+  a1.sources.r1.keystorePassword = ${f1['my_keystore_password']} #will get the 
value Secret123
+
+Here the ``a1.sources.r1.keystorePassword`` configuration property will get 
the value of the ``my_keystore_password``
+environment variable. One way to set the environment variable is to run flume 
agent like this:
+
+``$ my_keystore_password=Secret123 bin/flume-ng agent --conf conf --conf-file 
example.conf ...``
+
+
+
+External Process Config Filter
+------------------------------
+
+================================ ========== 
==============================================
+Property Name                    Default                        Description
+================================ ========== 
==============================================
+**type**                         --         The component type name has to be 
``external``
+**command**                      --         The command that will be executed 
to get the value for the given key. The command will be called like: 
``<command> <key>`` And expected to return a single line value with exit code 
``0``.
+charset                          UTF-8      The characterset of the returned 
string.
+================================ ========== 
==============================================
+
+Example
+~~~~~~~
+To hide a password in the configuration set its value as in the following 
example.
+
+.. code-block:: properties
+
+  a1.sources = r1
+  a1.channels = c1
+  a1.configfilters = f1
+
+  a1.configfilters.f1.type = external
+  a1.configfilters.f1.command = /usr/bin/passwordResolver.sh
+  a1.configfilters.f1.charset = UTF-8
+
+  a1.sources.r1.channels =  c1
+  a1.sources.r1.type = http
+  a1.sources.r1.keystorePassword = ${f1['my_keystore_password']} #will get the 
value Secret123
+
+In this example flume will run the following command to get the value
+
+``$ /usr/bin/passwordResolver.sh my_keystore_password``
+
+The ``passwordResolver.sh`` will return ``Secret123`` with an exit code ``0``.
+
+Example 2
+~~~~~~~~~
+To generate a part of the directory for rolling file sink set its value as in 
the following example.
+
+.. code-block:: properties
+
+  a1.sources = r1
+  a1.channels = c1
+  a1.configfilters = f1
+
+  a1.configfilters.f1.type = external
+  a1.configfilters.f1.command = /usr/bin/generateUniqId.sh
+  a1.configfilters.f1.charset = UTF-8
+
+  a1.sinks = k1
+  a1.sinks.k1.type = file_roll
+  a1.sinks.k1.channel = c1
+  a1.sinks.k1.sink.directory = /var/log/flume/agent_${f1['agent_name']} # will 
be /var/log/flume/agent_1234
+
+In this example flume will run the following command to get the value
+
+``$ /usr/bin/generateUniqId.sh agent_name``
+
+The ``generateUniqId.sh`` will return ``1234`` with an exit code ``0``.
+
+
+
+Hadoop Credential Store Config Filter
+-------------------------------------
+
+=============================================== ========== 
==============================================
+Property Name                                   Default                        
Description
+=============================================== ========== 
==============================================
+**type**                                        --         The component type 
name has to be ``hadoop``
+**credential.provider.path**                    --         The provider path. 
See hadoop documentation _here: 
https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/CredentialProviderAPI.html#Configuring_the_Provider_Path
+credstore.java-keystore-provider.password-file  --         The name of the 
password file if a file is used to store the password. The file must e on the 
classpath.
+                                                           Provider password 
can be set with the HADOOP_CREDSTORE_PASSWORD environment variable or left 
empty.
+=============================================== ========== 
==============================================
+
+Example
+~~~~~~~
+To hide a password in the configuration set its value as in the following 
example.
+
+.. code-block:: properties
+
+  a1.sources = r1
+  a1.channels = c1
+  a1.configfilters = f1
+
+  a1.configfilters.f1.type = hadoop
+  a1.configfilters.f1.credential.provider.path = jceks://file/<path_to_jceks 
file>
+
+  a1.sources.r1.channels =  c1
+  a1.sources.r1.type = http
+  a1.sources.r1.keystorePassword = ${f1['my_keystore_password']} #will get the 
value from the credential store
+
+
+
 Log4J Appender
 ==============
 

http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-tests/pom.xml b/flume-ng-tests/pom.xml
index ee3f4f4..91653a0 100644
--- a/flume-ng-tests/pom.xml
+++ b/flume-ng-tests/pom.xml
@@ -84,6 +84,13 @@
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
+
+    <dependency>
+      <groupId>com.github.stefanbirkner</groupId>
+      <artifactId>system-rules</artifactId>
+      <version>1.17.0</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestConfigFilters.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestConfigFilters.java
 
b/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestConfigFilters.java
new file mode 100644
index 0000000..b82c4b6
--- /dev/null
+++ 
b/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestConfigFilters.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.test.agent;
+
+import org.apache.flume.test.util.StagedInstall;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.alias.CredentialShell;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.EnvironmentVariables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Scanner;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestConfigFilters {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(TestConfigFilters.class);
+
+  @ClassRule
+  public static final EnvironmentVariables environmentVariables
+      = new EnvironmentVariables();
+
+  private Properties agentProps;
+  private Map<String, String> agentEnv;
+  private File sinkOutputDir1;
+  private File sinkOutputDir2;
+  private File sinkOutputDir3;
+  private File hadoopCredStore;
+
+  @Before
+  public void setup() throws Exception {
+
+    File agentDir = StagedInstall.getInstance().getStageDir();
+    LOGGER.debug("Using agent stage dir: {}", agentDir);
+
+    File testDir = new File(agentDir, TestConfigFilters.class.getName());
+    assertTrue(testDir.mkdirs());
+
+    agentProps = new Properties();
+    agentEnv = new HashMap<>();
+
+    // Create the rest of the properties file
+    agentProps.put("agent.sources.seq-01.type", "seq");
+    agentProps.put("agent.sources.seq-01.totalEvents", "100");
+    agentProps.put("agent.sources.seq-01.channels", "mem-01 mem-02 mem-03");
+    agentProps.put("agent.channels.mem-01.type", "MEMORY");
+    agentProps.put("agent.channels.mem-01.capacity", String.valueOf(100000));
+    agentProps.put("agent.channels.mem-02.type", "MEMORY");
+    agentProps.put("agent.channels.mem-02.capacity", String.valueOf(100000));
+    agentProps.put("agent.channels.mem-03.type", "MEMORY");
+    agentProps.put("agent.channels.mem-04.capacity", String.valueOf(100000));
+
+    sinkOutputDir1 = new File(testDir, "out1");
+    assertTrue("Unable to create sink output dir: " + sinkOutputDir1.getPath(),
+        sinkOutputDir1.mkdir());
+    sinkOutputDir2 = new File(testDir, "out2");
+    assertTrue("Unable to create sink output dir: " + sinkOutputDir2.getPath(),
+        sinkOutputDir2.mkdir());
+    sinkOutputDir3 = new File(testDir, "out3");
+    assertTrue("Unable to create sink output dir: " + sinkOutputDir3.getPath(),
+        sinkOutputDir3.mkdir());
+
+    environmentVariables.set("HADOOP_CREDSTORE_PASSWORD", "envSecret");
+
+    agentEnv.put("dirname_env", sinkOutputDir1.getAbsolutePath());
+    agentEnv.put("HADOOP_CREDSTORE_PASSWORD", "envSecret");
+
+    hadoopCredStore = new File(testDir, "credstore.jceks");
+    String providerPath = "jceks://file/" + hadoopCredStore.getAbsolutePath();
+
+    ToolRunner.run(
+        new Configuration(), new CredentialShell(),
+        ("create dirname_hadoop -value " + sinkOutputDir3.getAbsolutePath()
+            + " -provider " + providerPath).split(" "));
+
+
+    agentProps.put("agent.sinks.roll-01.channel", "mem-01");
+    agentProps.put("agent.sinks.roll-01.type", "FILE_ROLL");
+    agentProps.put("agent.sinks.roll-01.sink.directory", 
"${filter-01[\"dirname_env\"]}");
+    agentProps.put("agent.sinks.roll-01.sink.rollInterval", "0");
+    agentProps.put("agent.sinks.roll-02.channel", "mem-02");
+    agentProps.put("agent.sinks.roll-02.type", "FILE_ROLL");
+    agentProps.put("agent.sinks.roll-02.sink.directory",
+        sinkOutputDir2.getParentFile().getAbsolutePath() + 
"/${filter-02['out2']}");
+    agentProps.put("agent.sinks.roll-02.sink.rollInterval", "0");
+    agentProps.put("agent.sinks.roll-03.channel", "mem-03");
+    agentProps.put("agent.sinks.roll-03.type", "FILE_ROLL");
+    agentProps.put("agent.sinks.roll-03.sink.directory", 
"${filter-03[dirname_hadoop]}");
+    agentProps.put("agent.sinks.roll-03.sink.rollInterval", "0");
+
+    agentProps.put("agent.configfilters.filter-01.type", "env");
+    agentProps.put("agent.configfilters.filter-02.type", "external");
+    agentProps.put("agent.configfilters.filter-02.command", "echo");
+    agentProps.put("agent.configfilters.filter-03.type", "hadoop");
+    agentProps.put("agent.configfilters.filter-03.credential.provider.path", 
providerPath);
+
+    agentProps.put("agent.sources", "seq-01");
+    agentProps.put("agent.channels", "mem-01 mem-02 mem-03");
+    agentProps.put("agent.sinks", "roll-01 roll-02 roll-03");
+    agentProps.put("agent.configfilters", "filter-01 filter-02 filter-03");
+  }
+
+  @After
+  public void teardown() throws Exception {
+    StagedInstall.getInstance().stopAgent();
+  }
+
+  private void validateSeenEvents(File outDir, int outFiles, int events)
+      throws IOException {
+    File[] sinkOutputDirChildren = outDir.listFiles();
+    assertEquals("Unexpected number of files in output dir",
+        outFiles, sinkOutputDirChildren.length);
+    Set<String> seenEvents = new HashSet<>();
+    for (File outFile : sinkOutputDirChildren) {
+      Scanner scanner = new Scanner(outFile);
+      while (scanner.hasNext()) {
+        seenEvents.add(scanner.nextLine());
+      }
+    }
+    for (int event = 0; event < events; event++) {
+      assertTrue(
+          "Missing event: {" + event + "}",
+          seenEvents.contains(String.valueOf(event))
+      );
+    }
+  }
+
+  @Test
+  public void testConfigReplacement() throws Exception {
+    LOGGER.debug("testConfigReplacement() started.");
+
+    StagedInstall.getInstance().startAgent("agent", agentProps, agentEnv);
+
+    TimeUnit.SECONDS.sleep(10); // Wait for sources and sink to process files
+
+    // Ensure we received all events.
+    validateSeenEvents(sinkOutputDir1, 1, 100);
+    validateSeenEvents(sinkOutputDir2, 1, 100);
+    validateSeenEvents(sinkOutputDir3, 1, 100);
+    LOGGER.debug("Processed all the events!");
+
+    LOGGER.debug("testConfigReplacement() ended.");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java 
b/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java
index ce586b8..50b49cc 100644
--- a/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java
+++ b/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java
@@ -35,6 +35,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -120,7 +121,12 @@ public class StagedInstall {
     return port;
   }
 
-  public synchronized void startAgent(String name, Properties properties)
+  public synchronized void startAgent(String name, Properties properties) 
throws Exception {
+    startAgent(name, properties, new HashMap<>());
+  }
+
+  public synchronized void startAgent(
+      String name, Properties properties,  Map<String, String> 
environmentVariables)
       throws Exception {
     Preconditions.checkArgument(!name.isEmpty(), "agent name must not be 
empty");
     Preconditions.checkNotNull(properties, "properties object must not be 
null");
@@ -159,6 +165,7 @@ public class StagedInstall {
     ProcessBuilder pb = new ProcessBuilder(cmdArgs);
 
     Map<String, String> env = pb.environment();
+    env.putAll(environmentVariables);
 
     LOGGER.debug("process environment: " + env);
 

http://git-wip-us.apache.org/repos/asf/flume/blob/beb11e59/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0cd174c..0f656a5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -69,7 +69,7 @@ limitations under the License.
     <gson.version>2.2.2</gson.version>
     <guava.version>18.0</guava.version>
     <guava-old.version>11.0.2</guava-old.version>
-    <hadoop2.version>2.4.0</hadoop2.version>
+    <hadoop2.version>2.9.0</hadoop2.version>
     <httpcore.version>4.4.6</httpcore.version>
     <httpclient.version>4.5.3</httpclient.version>
     <irclib.version>1.10</irclib.version>
@@ -129,6 +129,7 @@ limitations under the License.
     <module>flume-tools</module>
     <module>flume-ng-auth</module>
     <module>flume-shared</module>
+      <module>flume-ng-configfilters</module>
   </modules>
 
   <profiles>
@@ -1580,6 +1581,30 @@ limitations under the License.
       </dependency>
 
       <dependency>
+        <groupId>org.apache.flume</groupId>
+        <artifactId>flume-ng-environment-variable-config-filter</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.flume</groupId>
+        <artifactId>flume-ng-hadoop-credential-store-config-filter</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.flume</groupId>
+        <artifactId>flume-ng-external-process-config-filter</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.flume</groupId>
+        <artifactId>flume-ng-config-filter-api</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
         <groupId>com.sun.jersey</groupId>
         <artifactId>jersey-core</artifactId>
         <version>${jersey.version}</version>

Reply via email to