Repository: flume
Updated Branches:
  refs/heads/trunk 3ad7d2764 -> 25e4bc6d8 (forced update)


FLUME-2954. Make raw data appearing in log messages explicit

Flume has built-in functionality to log data flowing through, mainly for
debugging purposes. This functionality appears in several places in the
code base. Such functionality can raise security concerns in production
environments where sensitive information might be ingested, so it is
crucial that enabling such functionality be as explicit as possible.

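This patch adds two system properties, one to enable logging of Flume
configuration properties (org.apache.flume.log.printconfig) and one to
enable logging of raw data (org.apache.flume.log.rawdata). If they are
not set, these items are never logged at any log4j logging level; the
Logger Sink, whose purpose is to log events, is the only exception.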

Reviewers: Balázs Donát Bessenyei, Denes Arvay, Mike Percy

(Attila Simon via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/25e4bc6d
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/25e4bc6d
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/25e4bc6d

Branch: refs/heads/trunk
Commit: 25e4bc6d80cf475862a1686fb2c3c97fcea27278
Parents: 1e8f265
Author: Attila Simon <[email protected]>
Authored: Mon Aug 29 12:15:56 2016 -0700
Committer: Mike Percy <[email protected]>
Committed: Mon Aug 29 12:34:49 2016 -0700

----------------------------------------------------------------------
 conf/flume-env.ps1.template                     |  5 ++
 conf/flume-env.sh.template                      |  5 ++
 .../jdbc/impl/JdbcChannelProviderImpl.java      |  9 +--
 .../flume/channel/kafka/KafkaChannel.java       |  8 +-
 .../apache/flume/conf/FlumeConfiguration.java   | 13 +--
 .../org/apache/flume/conf/LogPrivacyUtil.java   | 83 ++++++++++++++++++++
 .../org/apache/flume/source/AvroSource.java     | 11 ++-
 .../flume/source/MultiportSyslogTCPSource.java  |  9 ++-
 .../apache/flume/source/http/BLOBHandler.java   |  5 +-
 .../SyslogAvroEventSerializer.java              |  5 +-
 flume-ng-doc/sphinx/FlumeUserGuide.rst          | 35 ++++++++-
 .../flume/agent/embedded/EmbeddedAgent.java     |  5 +-
 .../org/apache/flume/sink/kafka/KafkaSink.java  | 20 ++---
 .../flume/sink/solr/morphline/BlobHandler.java  | 20 ++---
 .../sink/solr/morphline/MorphlineSink.java      |  6 +-
 .../apache/flume/source/kafka/KafkaSource.java  | 25 ++++--
 .../flume/source/twitter/TwitterSource.java     |  5 --
 17 files changed, 214 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/25e4bc6d/conf/flume-env.ps1.template
----------------------------------------------------------------------
diff --git a/conf/flume-env.ps1.template b/conf/flume-env.ps1.template
index 8bf535a..7cbc119 100644
--- a/conf/flume-env.ps1.template
+++ b/conf/flume-env.ps1.template
@@ -18,6 +18,11 @@
 # Give Flume more memory and pre-allocate, enable remote monitoring via JMX
 $JAVA_OPTS="-Xms100m -Xmx200m -Dcom.sun.management.jmxremote"
 
+# Let Flume write raw event data and configuration information to its log files for debugging
+# purposes. Enabling these flags is not recommended in production,
+# as it may result in logging sensitive user information or encryption secrets.
+# $JAVA_OPTS="$JAVA_OPTS -Dorg.apache.flume.log.rawdata=true -Dorg.apache.flume.log.printconfig=true "
+
 # Foll. classpath will be included in Flume's classpath.
 # Note that the Flume conf directory is always included in the classpath.
 $FLUME_CLASSPATH=""   # Example:  "path1;path2;path3"

http://git-wip-us.apache.org/repos/asf/flume/blob/25e4bc6d/conf/flume-env.sh.template
----------------------------------------------------------------------
diff --git a/conf/flume-env.sh.template b/conf/flume-env.sh.template
index c8b660f..07182ca 100644
--- a/conf/flume-env.sh.template
+++ b/conf/flume-env.sh.template
@@ -24,6 +24,11 @@
 # Give Flume more memory and pre-allocate, enable remote monitoring via JMX
 # export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
 
+# Let Flume write raw event data and configuration information to its log files for debugging
+# purposes. Enabling these flags is not recommended in production,
+# as it may result in logging sensitive user information or encryption secrets.
+# export JAVA_OPTS="$JAVA_OPTS -Dorg.apache.flume.log.rawdata=true -Dorg.apache.flume.log.printconfig=true "
+
 # Note that the Flume conf directory is always included in the classpath.
 #FLUME_CLASSPATH=""
 

http://git-wip-us.apache.org/repos/asf/flume/blob/25e4bc6d/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
 
b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
index 845b794..01caef3 100644
--- 
a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
+++ 
b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
@@ -94,10 +94,7 @@ public class JdbcChannelProviderImpl implements 
JdbcChannelProvider {
 
   @Override
   public void initialize(Context context) {
-    if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug("Initializing JDBC Channel provider with props: "
-          + context);
-    }
+    LOGGER.debug("Initializing JDBC Channel provider");
 
     initializeSystemProperties(context);
     initializeDataSource(context);
@@ -363,7 +360,7 @@ public class JdbcChannelProviderImpl implements 
JdbcChannelProvider {
   /**
 
    * Initializes the datasource and the underlying connection pool.
-   * @param properties
+   * @param context
    */
   private void initializeDataSource(Context context) {
     driverClassName = getConfigurationString(context,
@@ -592,7 +589,7 @@ public class JdbcChannelProviderImpl implements 
JdbcChannelProvider {
    * @param context
    * @param key the expected configuration key
    * @param oldKey the deprecated configuration key
-   * @param  default value, null if no default
+   * @param defaultValue default value, null if no default
    * @return the value associated with the key
    */
   private String getConfigurationString(Context context, String key,

http://git-wip-us.apache.org/repos/asf/flume/blob/25e4bc6d/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
 
b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index 684120f..e7f1f2e 100644
--- 
a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ 
b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -35,6 +35,7 @@ import org.apache.flume.FlumeException;
 import org.apache.flume.channel.BasicChannelSemantics;
 import org.apache.flume.channel.BasicTransactionSemantics;
 import org.apache.flume.conf.ConfigurationException;
+import org.apache.flume.conf.LogPrivacyUtil;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.instrumentation.kafka.KafkaChannelCounter;
 import org.apache.flume.source.avro.AvroFlumeEvent;
@@ -194,6 +195,10 @@ public class KafkaChannel extends BasicChannelSemantics {
       DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS);
     zookeeperConnect = ctx.getString(ZOOKEEPER_CONNECT);
 
+    if (logger.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
+      logger.debug("Kafka properties: {}", ctx);
+    }
+
     if (counter == null) {
       counter = new KafkaChannelCounter(getName());
     }
@@ -257,7 +262,6 @@ public class KafkaChannel extends BasicChannelSemantics {
     //Defaults overridden based on config
     producerProps.putAll(ctx.getSubProperties(KAFKA_PRODUCER_PREFIX));
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootStrapServers);
-    logger.info("Producer properties: " + producerProps.toString());
   }
 
   protected Properties getProducerProps() {
@@ -274,8 +278,6 @@ public class KafkaChannel extends BasicChannelSemantics {
     consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootStrapServers);
     consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
     consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-
-    logger.info(consumerProps.toString());
   }
 
   protected Properties getConsumerProps() {

http://git-wip-us.apache.org/repos/asf/flume/blob/25e4bc6d/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java
 
b/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java
index 9b3a434..8ae5bd9 100644
--- 
a/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java
+++ 
b/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java
@@ -310,9 +310,10 @@ public class FlumeConfiguration {
      * @return true if the configuration is valid, false otherwise
      */
     private boolean isValid() {
-      logger.debug("Starting validation of configuration for agent: "
-          + agentName + ", initial-configuration: " +
-          this.getPrevalidationConfig());
+      logger.debug("Starting validation of configuration for agent: {}", 
agentName);
+      if (logger.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
+        logger.debug("Initial configuration: {}", 
this.getPrevalidationConfig());
+      }
 
       // Make sure that at least one channel is specified
       if (channels == null || channels.trim().length() == 0) {
@@ -368,8 +369,10 @@ public class FlumeConfiguration {
       this.sinks = getSpaceDelimitedList(sinkSet);
       this.sinkgroups = getSpaceDelimitedList(sinkgroupSet);
 
-      logger.debug("Post validation configuration for " + agentName + NEWLINE
-          + this.getPostvalidationConfig());
+      if (logger.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
+        logger.debug("Post validation configuration for {}", agentName);
+        logger.debug(this.getPostvalidationConfig());
+      }
 
       return true;
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/25e4bc6d/flume-ng-configuration/src/main/java/org/apache/flume/conf/LogPrivacyUtil.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-configuration/src/main/java/org/apache/flume/conf/LogPrivacyUtil.java
 
b/flume-ng-configuration/src/main/java/org/apache/flume/conf/LogPrivacyUtil.java
new file mode 100644
index 0000000..3573025
--- /dev/null
+++ 
b/flume-ng-configuration/src/main/java/org/apache/flume/conf/LogPrivacyUtil.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.conf;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to help any Flume component determine whether logging potentially sensitive
+ * information is allowed or not.
+ * <p/>
+ * InterfaceAudience.Public<br/>
+ * InterfaceStability.Evolving
+ */
+public class LogPrivacyUtil {
+  private static final Logger logger = 
LoggerFactory.getLogger(LogPrivacyUtil.class);
+  /**
+   * System property name to enable logging of raw, potentially sensitive, user data.
+   */
+  public static final String LOG_RAWDATA_PROP = "org.apache.flume.log.rawdata";
+  /**
+   * System property name to enable logging of configuration details (which may include
+   * secrets), such as the agent configuration printed during validation at startup.
+   */
+  public static final String LOG_PRINTCONFIG_PROP = 
"org.apache.flume.log.printconfig";
+
+  static {
+    if (allowLogPrintConfig()) {
+      logger.warn("Logging of configuration details of the agent has been 
turned on by " +
+          "setting {} to true. Please use this setting with extra caution as 
it may result " +
+          "in logging of private data. This setting is not recommended in " +
+          "production environments.",
+          LOG_PRINTCONFIG_PROP);
+    } else {
+      logger.info("Logging of configuration details is disabled. To see 
configuration details " +
+          "in the log run the agent with -D{}=true JVM " +
+          "argument. Please note that this is not recommended in production " +
+          "systems as it may leak private information to the logfile.",
+          LOG_PRINTCONFIG_PROP);
+    }
+
+    if (allowLogRawData()) {
+      logger.warn("Logging raw data has been turned on by setting {} to true. 
Please use it with " +
+          "extra caution as it may result in logging of potentially sensitive 
user data. " +
+          "This setting is not recommended in production environments.",
+          LOG_RAWDATA_PROP);
+    }
+  }
+
+  /**
+   * Tells whether logging of raw event data (potentially sensitive user data) is allowed.
+   * This is driven by the system property named by LOG_RAWDATA_PROP.
+   * @return true only if that system property is set to "true" (case-insensitive)
+   */
+  public static boolean allowLogRawData() {
+    return Boolean.getBoolean(LOG_RAWDATA_PROP);
+  }
+
+  /**
+   * Tells whether logging of configuration details - including secrets - is allowed.
+   * This is driven by the system property named by LOG_PRINTCONFIG_PROP.
+   * @return true only if that system property is set to "true" (case-insensitive)
+   */
+  public static boolean allowLogPrintConfig() {
+    return Boolean.getBoolean(LOG_PRINTCONFIG_PROP);
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/25e4bc6d/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java 
b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
index 8b9b956..762f690 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
@@ -36,6 +36,7 @@ import org.apache.flume.FlumeException;
 import org.apache.flume.Source;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.Configurables;
+import org.apache.flume.conf.LogPrivacyUtil;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.source.avro.AvroFlumeEvent;
@@ -344,8 +345,14 @@ public class AvroSource extends AbstractSource implements 
EventDrivenSource,
 
   @Override
   public Status append(AvroFlumeEvent avroEvent) {
-    logger.debug("Avro source {}: Received avro event: {}", getName(),
-        avroEvent);
+    if (logger.isDebugEnabled()) {
+      if (LogPrivacyUtil.allowLogRawData()) {
+        logger.debug("Avro source {}: Received avro event: {}", getName(), 
avroEvent);
+      } else {
+        logger.debug("Avro source {}: Received avro event", getName());
+      }
+    }
+
     sourceCounter.incrementAppendReceivedCount();
     sourceCounter.incrementEventReceivedCount();
 

http://git-wip-us.apache.org/repos/asf/flume/blob/25e4bc6d/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
 
b/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
index b9f2438..4436094 100644
--- 
a/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
@@ -36,6 +36,7 @@ import org.apache.flume.Event;
 import org.apache.flume.EventDrivenSource;
 import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.LogPrivacyUtil;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.mina.core.buffer.IoBuffer;
@@ -357,7 +358,13 @@ public class MultiportSyslogTCPSource extends 
AbstractSource implements
         return event;
       }
 
-      logger.trace("Seen raw event: {}", msg);
+      if (logger.isTraceEnabled()) {
+        if (LogPrivacyUtil.allowLogRawData()) {
+          logger.trace("Seen raw event: {}", msg);
+        } else {
+          logger.trace("Seen raw event.");
+        }
+      }
 
       Event event;
       try {

http://git-wip-us.apache.org/repos/asf/flume/blob/25e4bc6d/flume-ng-core/src/main/java/org/apache/flume/source/http/BLOBHandler.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/source/http/BLOBHandler.java 
b/flume-ng-core/src/main/java/org/apache/flume/source/http/BLOBHandler.java
index e24d4c6..e1891cb 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/http/BLOBHandler.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/http/BLOBHandler.java
@@ -29,6 +29,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
+import org.apache.flume.conf.LogPrivacyUtil;
 import org.apache.flume.event.EventBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,7 +68,9 @@ public class BLOBHandler implements HTTPSourceHandler {
     Map<String, String[]> parameters = request.getParameterMap();
     for (String parameter : parameters.keySet()) {
       String value = parameters.get(parameter)[0];
-      LOG.debug("Setting Header [Key, Value] as [{},{}] ",parameter, value);
+      if (LOG.isDebugEnabled() && LogPrivacyUtil.allowLogRawData()) {
+        LOG.debug("Setting Header [Key, Value] as [{},{}] ", parameter, value);
+      }
       headers.put(parameter, value);
     }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/25e4bc6d/flume-ng-core/src/test/java/org/apache/flume/serialization/SyslogAvroEventSerializer.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/serialization/SyslogAvroEventSerializer.java
 
b/flume-ng-core/src/test/java/org/apache/flume/serialization/SyslogAvroEventSerializer.java
index 05af3b1..22e7e9e 100644
--- 
a/flume-ng-core/src/test/java/org/apache/flume/serialization/SyslogAvroEventSerializer.java
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/serialization/SyslogAvroEventSerializer.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import org.apache.avro.Schema;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
+import org.apache.flume.conf.LogPrivacyUtil;
 import org.apache.flume.serialization.SyslogAvroEventSerializer.SyslogEvent;
 import org.apache.flume.source.SyslogUtils;
 import org.joda.time.DateTime;
@@ -147,7 +148,9 @@ public class SyslogAvroEventSerializer
     String actualMessage = msg.substring(seek);
     sle.setMessage(actualMessage);
 
-    logger.debug("Serialized event as: {}", sle);
+    if (logger.isDebugEnabled() && LogPrivacyUtil.allowLogRawData()) {
+      logger.debug("Serialized event as: {}", sle);
+    }
 
     return sle;
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/25e4bc6d/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst 
b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 7e207aa..0fd1ec9 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -234,6 +234,36 @@ The original Flume terminal will output the event in a log 
message.
 
 Congratulations - you've successfully configured and deployed a Flume agent! 
Subsequent sections cover agent configuration in much more detail.
 
+Logging raw data
+~~~~~~~~~~~~~~~~
+
+
+Logging the raw stream of data flowing through the ingest pipeline is not desired behaviour in
+many production environments because this may result in leaking sensitive data or security-related
+configurations, such as secret keys, to Flume log files.
+By default, Flume will not log such information. On the other hand, if the data pipeline is broken,
+Flume will attempt to provide clues for debugging the problem.
+
+One way to debug problems with event pipelines is to set up an additional `Memory Channel`_
+connected to a `Logger Sink`_, which will output all event data to the Flume logs.
+In some situations, however, this approach is insufficient.
+
+In order to enable logging of event- and configuration-related data, some Java system properties
+must be set in addition to log4j properties.
+
+To enable configuration-related logging, set the Java system property
+``-Dorg.apache.flume.log.printconfig=true``. This can be passed either on the command line or by
+setting it in the ``JAVA_OPTS`` variable in *flume-env.sh*.
+
+To enable data logging, set the Java system property ``-Dorg.apache.flume.log.rawdata=true``
+in the same way described above. For most components, the log4j logging level must also be set to
+DEBUG or TRACE to make event-specific logging appear in the Flume logs.
+
+Here is an example of enabling both configuration logging and raw data logging while also
+setting the log4j log level to DEBUG for console output::
+
+  $ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true
+
 
 Zookeeper based Configuration
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -2001,8 +2031,9 @@ accept tab separated input containing three fields and to 
skip the second field.
 Logger Sink
 ~~~~~~~~~~~
 
-Logs event at INFO level. Typically useful for testing/debugging purpose.
-Required properties are in **bold**.
+Logs events at INFO level. Typically useful for testing/debugging purposes. Required properties are
+in **bold**. This sink is the only exception that doesn't require the extra configuration
+explained in the `Logging raw data`_ section.
 
 ==============  =======  ===========================================
 Property Name   Default  Description

http://git-wip-us.apache.org/repos/asf/flume/blob/25e4bc6d/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java
 
b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java
index ad3e138..d33fa8b 100644
--- 
a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java
+++ 
b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java
@@ -32,6 +32,7 @@ import org.apache.flume.Source;
 import org.apache.flume.SourceRunner;
 import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.annotations.InterfaceStability;
+import org.apache.flume.conf.LogPrivacyUtil;
 import org.apache.flume.lifecycle.LifecycleAware;
 import org.apache.flume.lifecycle.LifecycleState;
 import org.apache.flume.lifecycle.LifecycleSupervisor;
@@ -149,7 +150,7 @@ public class EmbeddedAgent {
 
     properties = EmbeddedAgentConfiguration.configure(name, properties);
 
-    if (LOGGER.isDebugEnabled()) {
+    if (LOGGER.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
       LOGGER.debug("Agent configuration values");
       for (String key : new TreeSet<String>(properties.keySet())) {
         LOGGER.debug(key + " = " + properties.get(key));
@@ -255,4 +256,4 @@ public class EmbeddedAgent {
     STOPPED(),
     STARTED();
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/25e4bc6d/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
 
b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
index 9453546..89bdd84 100644
--- 
a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
+++ 
b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -20,7 +20,6 @@ package org.apache.flume.sink.kafka;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Throwables;
-
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificDatumReader;
@@ -32,6 +31,7 @@ import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.ConfigurationException;
+import org.apache.flume.conf.LogPrivacyUtil;
 import org.apache.flume.instrumentation.kafka.KafkaSinkCounter;
 import org.apache.flume.sink.AbstractSink;
 import org.apache.flume.source.avro.AvroFlumeEvent;
@@ -174,12 +174,15 @@ public class KafkaSink extends AbstractSink implements 
Configurable {
           eventTopic = topic;
         }
         eventKey = headers.get(KEY_HEADER);
-
-        if (logger.isDebugEnabled()) {
-          logger.debug("{Event} " + eventTopic + " : " + eventKey + " : "
-              + new String(eventBody, "UTF-8"));
-          logger.debug("event #{}", processedEvents);
+        if (logger.isTraceEnabled()) {
+          if (LogPrivacyUtil.allowLogRawData()) {
+            logger.trace("{Event} " + eventTopic + " : " + eventKey + " : "
+                + new String(eventBody, "UTF-8"));
+          } else {
+            logger.trace("{Event} " + eventTopic + " : " + eventKey);
+          }
         }
+        logger.debug("event #{}", processedEvents);
 
         // create a message and add to buffer
         long startTime = System.currentTimeMillis();
@@ -300,8 +303,8 @@ public class KafkaSink extends AbstractSink implements 
Configurable {
 
     setProducerProps(context, bootStrapServers);
 
-    if (logger.isDebugEnabled()) {
-      logger.debug("Kafka producer properties: {}" , kafkaProps);
+    if (logger.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
+      logger.debug("Kafka producer properties: {}", kafkaProps);
     }
 
     if (counter == null) {
@@ -370,7 +373,6 @@ public class KafkaSink extends AbstractSink implements 
Configurable {
     kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
DEFAULT_VALUE_SERIAIZER);
     kafkaProps.putAll(context.getSubProperties(KAFKA_PRODUCER_PREFIX));
     kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
-    logger.info("Producer properties: {}" , kafkaProps.toString());
   }
 
   protected Properties getKafkaProps() {

http://git-wip-us.apache.org/repos/asf/flume/blob/25e4bc6d/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobHandler.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobHandler.java
 
b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobHandler.java
index ca7614a..fe98746 100644
--- 
a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobHandler.java
+++ 
b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobHandler.java
@@ -29,6 +29,7 @@ import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.conf.ConfigurationException;
+import org.apache.flume.conf.LogPrivacyUtil;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.source.http.HTTPSourceHandler;
 import org.apache.tika.metadata.Metadata;
@@ -69,11 +70,11 @@ public class BlobHandler implements HTTPSourceHandler {
           + " must be greater than zero: " + maxBlobLength);
     }
   }
-  
+
   @SuppressWarnings("resource")
   @Override
   public List<Event> getEvents(HttpServletRequest request) throws Exception {
-    Map<String, String> headers = getHeaders(request);    
+    Map<String, String> headers = getHeaders(request);
     InputStream in = request.getInputStream();
     try {
       ByteArrayOutputStream blob = null;
@@ -88,14 +89,16 @@ public class BlobHandler implements HTTPSourceHandler {
         blobLength += n;
         if (blobLength >= maxBlobLength) {
           LOGGER.warn("Request length exceeds maxBlobLength ({}), truncating 
BLOB event!",
-                      maxBlobLength);
+              maxBlobLength);
           break;
         }
       }
 
       byte[] array = blob != null ? blob.toByteArray() : new byte[0];
       Event event = EventBuilder.withBody(array, headers);
-      LOGGER.debug("blobEvent: {}", event);
+      if (LOGGER.isDebugEnabled() && LogPrivacyUtil.allowLogRawData()) {
+        LOGGER.debug("blobEvent: {}", event);
+      }
       return Collections.singletonList(event);
     } finally {
       in.close();
@@ -103,15 +106,15 @@ public class BlobHandler implements HTTPSourceHandler {
   }
 
   private Map<String, String> getHeaders(HttpServletRequest request) {
-    if (LOGGER.isDebugEnabled()) {
+    if (LOGGER.isDebugEnabled() && LogPrivacyUtil.allowLogRawData()) {
       Map requestHeaders = new HashMap();
       Enumeration iter = request.getHeaderNames();
       while (iter.hasMoreElements()) {
         String name = (String) iter.nextElement();
-        requestHeaders.put(name, request.getHeader(name));        
+        requestHeaders.put(name, request.getHeader(name));
       }
       LOGGER.debug("requestHeaders: {}", requestHeaders);
-    }    
+    }
     Map<String, String> headers = new HashMap();
     if (request.getContentType() != null) {
       headers.put(Metadata.CONTENT_TYPE, request.getContentType());
@@ -119,9 +122,8 @@ public class BlobHandler implements HTTPSourceHandler {
     Enumeration iter = request.getParameterNames();
     while (iter.hasMoreElements()) {
       String name = (String) iter.nextElement();
-      headers.put(name, request.getParameter(name));        
+      headers.put(name, request.getParameter(name));
     }
     return headers;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/25e4bc6d/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
 
b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
index f7a73f3..0917d39 100644
--- 
a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
+++ 
b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
@@ -24,6 +24,7 @@ import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.ConfigurationException;
+import org.apache.flume.conf.LogPrivacyUtil;
 import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.AbstractSink;
 import org.slf4j.Logger;
@@ -136,7 +137,10 @@ public class MorphlineSink extends AbstractSink implements Configurable {
         }
         sinkCounter.incrementEventDrainAttemptCount();
         numEventsTaken++;
-        LOGGER.debug("Flume event: {}", event);
+        if (LOGGER.isTraceEnabled() && LogPrivacyUtil.allowLogRawData()) {
+          LOGGER.trace("Flume event arrived {}", event);
+        }
+
         //StreamEvent streamEvent = createStreamEvent(event);
         handler.process(event);
         if (System.currentTimeMillis() >= batchEndTime) {

http://git-wip-us.apache.org/repos/asf/flume/blob/25e4bc6d/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index 90e4715..86782c3 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -38,6 +38,7 @@ import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.ConfigurationException;
+import org.apache.flume.conf.LogPrivacyUtil;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.instrumentation.kafka.KafkaSourceCounter;
 import org.apache.flume.source.AbstractPollableSource;
@@ -240,12 +241,18 @@ public class KafkaSource extends AbstractPollableSource
           headers.put(KafkaSourceConstants.KEY_HEADER, kafkaKey);
         }
 
-        if (log.isDebugEnabled()) {
-          log.debug("Topic: {} Partition: {} Message: {}", new String[] {
-              message.topic(),
-              String.valueOf(message.partition()),
-              new String(eventBody)
-          });
+        if (log.isTraceEnabled()) {
+          if (LogPrivacyUtil.allowLogRawData()) {
+            log.trace("Topic: {} Partition: {} Message: {}", new String[]{
+                message.topic(),
+                String.valueOf(message.partition()),
+                new String(eventBody)
+            });
+          } else {
+            log.trace("Topic: {} Partition: {} Message arrived.",
+                message.topic(),
+                String.valueOf(message.partition()));
+          }
         }
 
         event = EventBuilder.withBody(eventBody, headers);
@@ -342,6 +349,10 @@ public class KafkaSource extends AbstractPollableSource
 
     setConsumerProps(context, bootstrapServers);
 
+    if (log.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
+      log.debug("Kafka consumer properties: {}", kafkaProps);
+    }
+
     if (counter == null) {
       counter = new KafkaSourceCounter(getName());
     }
@@ -388,8 +399,6 @@ public class KafkaSource extends AbstractPollableSource
     }
     kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
                    KafkaSourceConstants.DEFAULT_AUTO_COMMIT);
-
-    log.info(kafkaProps.toString());
   }
 
   Properties getConsumerProps() {

http://git-wip-us.apache.org/repos/asf/flume/blob/25e4bc6d/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java b/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java
index f5c8328..d812023 100644
--- a/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java
+++ b/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java
@@ -107,11 +107,6 @@ public class TwitterSource
     String accessToken = context.getString("accessToken");
     String accessTokenSecret = context.getString("accessTokenSecret");
 
-    LOGGER.info("Consumer Key:        '" + consumerKey + "'");
-    LOGGER.info("Consumer Secret:     '" + consumerSecret + "'");
-    LOGGER.info("Access Token:        '" + accessToken + "'");
-    LOGGER.info("Access Token Secret: '" + accessTokenSecret + "'");
-
     twitterStream = new TwitterStreamFactory().getInstance();
     twitterStream.setOAuthConsumer(consumerKey, consumerSecret);
     twitterStream.setOAuthAccessToken(new AccessToken(accessToken,

Reply via email to