Repository: flume Updated Branches: refs/heads/trunk 3ad7d2764 -> 25e4bc6d8 (forced update)
FLUME-2954. Make raw data appearing in log messages explicit Flume has built-in functionality to log data flowing through, mainly for debugging purposes. This functionality appears in several places in the code base. Such functionality can raise security concerns in production environments where sensitive information might be ingested so it is crucial that enabling such functionality be as explicit as possible. This patch adds two system properties, one to enable logging of Flume configuration properties and one to enable logging of raw data. If they are not set, these items are never logged at any log4j logging level. Reviewers: Balázs Donát Bessenyei, Denes Arvay, Mike Percy (Attila Simon via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/25e4bc6d Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/25e4bc6d Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/25e4bc6d Branch: refs/heads/trunk Commit: 25e4bc6d80cf475862a1686fb2c3c97fcea27278 Parents: 1e8f265 Author: Attila Simon <[email protected]> Authored: Mon Aug 29 12:15:56 2016 -0700 Committer: Mike Percy <[email protected]> Committed: Mon Aug 29 12:34:49 2016 -0700 ---------------------------------------------------------------------- conf/flume-env.ps1.template | 5 ++ conf/flume-env.sh.template | 5 ++ .../jdbc/impl/JdbcChannelProviderImpl.java | 9 +-- .../flume/channel/kafka/KafkaChannel.java | 8 +- .../apache/flume/conf/FlumeConfiguration.java | 13 +-- .../org/apache/flume/conf/LogPrivacyUtil.java | 83 ++++++++++++++++++++ .../org/apache/flume/source/AvroSource.java | 11 ++- .../flume/source/MultiportSyslogTCPSource.java | 9 ++- .../apache/flume/source/http/BLOBHandler.java | 5 +- .../SyslogAvroEventSerializer.java | 5 +- flume-ng-doc/sphinx/FlumeUserGuide.rst | 35 ++++++++- .../flume/agent/embedded/EmbeddedAgent.java | 5 +- .../org/apache/flume/sink/kafka/KafkaSink.java | 20 ++--- 
.../flume/sink/solr/morphline/BlobHandler.java | 20 ++--- .../sink/solr/morphline/MorphlineSink.java | 6 +- .../apache/flume/source/kafka/KafkaSource.java | 25 ++++-- .../flume/source/twitter/TwitterSource.java | 5 -- 17 files changed, 214 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/25e4bc6d/conf/flume-env.ps1.template ---------------------------------------------------------------------- diff --git a/conf/flume-env.ps1.template b/conf/flume-env.ps1.template index 8bf535a..7cbc119 100644 --- a/conf/flume-env.ps1.template +++ b/conf/flume-env.ps1.template @@ -18,6 +18,11 @@ # Give Flume more memory and pre-allocate, enable remote monitoring via JMX $JAVA_OPTS="-Xms100m -Xmx200m -Dcom.sun.management.jmxremote" +# Let Flume write raw event data and configuration information to its log files for debugging +# purposes. Enabling these flags is not recommended in production, +# as it may result in logging sensitive user information or encryption secrets. +# $JAVA_OPTS="$JAVA_OPTS -Dorg.apache.flume.log.rawdata=true -Dorg.apache.flume.log.printconfig=true " + # Foll. classpath will be included in Flume's classpath. # Note that the Flume conf directory is always included in the classpath. $FLUME_CLASSPATH="" # Example: "path1;path2;path3" http://git-wip-us.apache.org/repos/asf/flume/blob/25e4bc6d/conf/flume-env.sh.template ---------------------------------------------------------------------- diff --git a/conf/flume-env.sh.template b/conf/flume-env.sh.template index c8b660f..07182ca 100644 --- a/conf/flume-env.sh.template +++ b/conf/flume-env.sh.template @@ -24,6 +24,11 @@ # Give Flume more memory and pre-allocate, enable remote monitoring via JMX # export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote" +# Let Flume write raw event data and configuration information to its log files for debugging +# purposes. 
Enabling these flags is not recommended in production, +# as it may result in logging sensitive user information or encryption secrets. +# export JAVA_OPTS="$JAVA_OPTS -Dorg.apache.flume.log.rawdata=true -Dorg.apache.flume.log.printconfig=true " + # Note that the Flume conf directory is always included in the classpath. #FLUME_CLASSPATH="" http://git-wip-us.apache.org/repos/asf/flume/blob/25e4bc6d/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java index 845b794..01caef3 100644 --- a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java +++ b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java @@ -94,10 +94,7 @@ public class JdbcChannelProviderImpl implements JdbcChannelProvider { @Override public void initialize(Context context) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Initializing JDBC Channel provider with props: " - + context); - } + LOGGER.debug("Initializing JDBC Channel provider"); initializeSystemProperties(context); initializeDataSource(context); @@ -363,7 +360,7 @@ public class JdbcChannelProviderImpl implements JdbcChannelProvider { /** * Initializes the datasource and the underlying connection pool. 
- * @param properties + * @param context */ private void initializeDataSource(Context context) { driverClassName = getConfigurationString(context, @@ -592,7 +589,7 @@ public class JdbcChannelProviderImpl implements JdbcChannelProvider { * @param context * @param key the expected configuration key * @param oldKey the deprecated configuration key - * @param default value, null if no default + * @param defaultValue default value, null if no default * @return the value associated with the key */ private String getConfigurationString(Context context, String key, http://git-wip-us.apache.org/repos/asf/flume/blob/25e4bc6d/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java index 684120f..e7f1f2e 100644 --- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java +++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java @@ -35,6 +35,7 @@ import org.apache.flume.FlumeException; import org.apache.flume.channel.BasicChannelSemantics; import org.apache.flume.channel.BasicTransactionSemantics; import org.apache.flume.conf.ConfigurationException; +import org.apache.flume.conf.LogPrivacyUtil; import org.apache.flume.event.EventBuilder; import org.apache.flume.instrumentation.kafka.KafkaChannelCounter; import org.apache.flume.source.avro.AvroFlumeEvent; @@ -194,6 +195,10 @@ public class KafkaChannel extends BasicChannelSemantics { DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS); zookeeperConnect = ctx.getString(ZOOKEEPER_CONNECT); + if (logger.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) { + logger.debug("Kafka properties: {}", ctx); + } + if (counter == 
null) { counter = new KafkaChannelCounter(getName()); } @@ -257,7 +262,6 @@ public class KafkaChannel extends BasicChannelSemantics { //Defaults overridden based on config producerProps.putAll(ctx.getSubProperties(KAFKA_PRODUCER_PREFIX)); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers); - logger.info("Producer properties: " + producerProps.toString()); } protected Properties getProducerProps() { @@ -274,8 +278,6 @@ public class KafkaChannel extends BasicChannelSemantics { consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers); consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); - - logger.info(consumerProps.toString()); } protected Properties getConsumerProps() { http://git-wip-us.apache.org/repos/asf/flume/blob/25e4bc6d/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java index 9b3a434..8ae5bd9 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java @@ -310,9 +310,10 @@ public class FlumeConfiguration { * @return true if the configuration is valid, false otherwise */ private boolean isValid() { - logger.debug("Starting validation of configuration for agent: " - + agentName + ", initial-configuration: " + - this.getPrevalidationConfig()); + logger.debug("Starting validation of configuration for agent: {}", agentName); + if (logger.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) { + logger.debug("Initial configuration: {}", this.getPrevalidationConfig()); + } // Make sure that at least one channel is specified if (channels == 
null || channels.trim().length() == 0) { @@ -368,8 +369,10 @@ public class FlumeConfiguration { this.sinks = getSpaceDelimitedList(sinkSet); this.sinkgroups = getSpaceDelimitedList(sinkgroupSet); - logger.debug("Post validation configuration for " + agentName + NEWLINE - + this.getPostvalidationConfig()); + if (logger.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) { + logger.debug("Post validation configuration for {}", agentName); + logger.debug(this.getPostvalidationConfig()); + } return true; } http://git-wip-us.apache.org/repos/asf/flume/blob/25e4bc6d/flume-ng-configuration/src/main/java/org/apache/flume/conf/LogPrivacyUtil.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/LogPrivacyUtil.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/LogPrivacyUtil.java new file mode 100644 index 0000000..3573025 --- /dev/null +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/LogPrivacyUtil.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flume.conf; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class to help any Flume component determine whether logging potentially sensitive + * information is allowed or not. + * <p/> + * InterfaceAudience.Public<br/> + * InterfaceStability.Evolving + */ +public class LogPrivacyUtil { + private static final Logger logger = LoggerFactory.getLogger(LogPrivacyUtil.class); + /** + * system property name to enable logging of potentially sensitive user data + */ + public static final String LOG_RAWDATA_PROP = "org.apache.flume.log.rawdata"; + /** + * system property name to enable logging of information related to the validation of agent + * configuration at startup. + */ + public static final String LOG_PRINTCONFIG_PROP = "org.apache.flume.log.printconfig"; + + static { + if (allowLogPrintConfig()) { + logger.warn("Logging of configuration details of the agent has been turned on by " + + "setting {} to true. Please use this setting with extra caution as it may result " + + "in logging of private data. This setting is not recommended in " + + "production environments.", + LOG_PRINTCONFIG_PROP); + } else { + logger.info("Logging of configuration details is disabled. To see configuration details " + + "in the log run the agent with -D{}=true JVM " + + "argument. Please note that this is not recommended in production " + + "systems as it may leak private information to the logfile.", + LOG_PRINTCONFIG_PROP); + } + + if (allowLogRawData()) { + logger.warn("Logging raw data has been turned on by setting {} to true. Please use it with " + + "extra caution as it may result in logging of potentially sensitive user data. " + + "This setting is not recommended in production environments.", + LOG_RAWDATA_PROP); + } + } + + /** + * Tells whether logging of configuration details - including secrets - is allowed or not. 
This + * is driven by a system property defined by LOG_PRINTCONFIG_PROP + * @return true only if logging is allowed + */ + public static boolean allowLogPrintConfig() { + return Boolean.getBoolean(LOG_PRINTCONFIG_PROP); + } + + /** + * Tells whether logging of potentially sensitive user data is allowed or not. This + * is driven by a system property defined by LOG_RAWDATA_PROP + * @return true only if logging is allowed + */ + public static boolean allowLogRawData() { + return Boolean.getBoolean(LOG_RAWDATA_PROP); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/25e4bc6d/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java index 8b9b956..762f690 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java @@ -36,6 +36,7 @@ import org.apache.flume.FlumeException; import org.apache.flume.Source; import org.apache.flume.conf.Configurable; import org.apache.flume.conf.Configurables; +import org.apache.flume.conf.LogPrivacyUtil; import org.apache.flume.event.EventBuilder; import org.apache.flume.instrumentation.SourceCounter; import org.apache.flume.source.avro.AvroFlumeEvent; @@ -344,8 +345,14 @@ public class AvroSource extends AbstractSource implements EventDrivenSource, @Override public Status append(AvroFlumeEvent avroEvent) { - logger.debug("Avro source {}: Received avro event: {}", getName(), - avroEvent); + if (logger.isDebugEnabled()) { + if (LogPrivacyUtil.allowLogRawData()) { + logger.debug("Avro source {}: Received avro event: {}", getName(), avroEvent); + } else { + logger.debug("Avro source {}: Received avro event", getName()); + } + } + sourceCounter.incrementAppendReceivedCount();
sourceCounter.incrementEventReceivedCount(); http://git-wip-us.apache.org/repos/asf/flume/blob/25e4bc6d/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java index b9f2438..4436094 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java @@ -36,6 +36,7 @@ import org.apache.flume.Event; import org.apache.flume.EventDrivenSource; import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.conf.Configurable; +import org.apache.flume.conf.LogPrivacyUtil; import org.apache.flume.event.EventBuilder; import org.apache.flume.instrumentation.SourceCounter; import org.apache.mina.core.buffer.IoBuffer; @@ -357,7 +358,13 @@ public class MultiportSyslogTCPSource extends AbstractSource implements return event; } - logger.trace("Seen raw event: {}", msg); + if (logger.isTraceEnabled()) { + if (LogPrivacyUtil.allowLogRawData()) { + logger.trace("Seen raw event: {}", msg); + } else { + logger.trace("Seen raw event."); + } + } Event event; try { http://git-wip-us.apache.org/repos/asf/flume/blob/25e4bc6d/flume-ng-core/src/main/java/org/apache/flume/source/http/BLOBHandler.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/BLOBHandler.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/BLOBHandler.java index e24d4c6..e1891cb 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/http/BLOBHandler.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/http/BLOBHandler.java @@ -29,6 +29,7 @@ import org.apache.commons.io.IOUtils; import 
org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.flume.Context; import org.apache.flume.Event; +import org.apache.flume.conf.LogPrivacyUtil; import org.apache.flume.event.EventBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,7 +68,9 @@ public class BLOBHandler implements HTTPSourceHandler { Map<String, String[]> parameters = request.getParameterMap(); for (String parameter : parameters.keySet()) { String value = parameters.get(parameter)[0]; - LOG.debug("Setting Header [Key, Value] as [{},{}] ",parameter, value); + if (LOG.isDebugEnabled() && LogPrivacyUtil.allowLogRawData()) { + LOG.debug("Setting Header [Key, Value] as [{},{}] ", parameter, value); + } headers.put(parameter, value); } http://git-wip-us.apache.org/repos/asf/flume/blob/25e4bc6d/flume-ng-core/src/test/java/org/apache/flume/serialization/SyslogAvroEventSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/serialization/SyslogAvroEventSerializer.java b/flume-ng-core/src/test/java/org/apache/flume/serialization/SyslogAvroEventSerializer.java index 05af3b1..22e7e9e 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/serialization/SyslogAvroEventSerializer.java +++ b/flume-ng-core/src/test/java/org/apache/flume/serialization/SyslogAvroEventSerializer.java @@ -26,6 +26,7 @@ import java.util.Map; import org.apache.avro.Schema; import org.apache.flume.Context; import org.apache.flume.Event; +import org.apache.flume.conf.LogPrivacyUtil; import org.apache.flume.serialization.SyslogAvroEventSerializer.SyslogEvent; import org.apache.flume.source.SyslogUtils; import org.joda.time.DateTime; @@ -147,7 +148,9 @@ public class SyslogAvroEventSerializer String actualMessage = msg.substring(seek); sle.setMessage(actualMessage); - logger.debug("Serialized event as: {}", sle); + if (logger.isDebugEnabled() && LogPrivacyUtil.allowLogRawData()) { + logger.debug("Serialized event as: 
{}", sle); + } return sle; } http://git-wip-us.apache.org/repos/asf/flume/blob/25e4bc6d/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 7e207aa..0fd1ec9 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -234,6 +234,36 @@ The original Flume terminal will output the event in a log message. Congratulations - you've successfully configured and deployed a Flume agent! Subsequent sections cover agent configuration in much more detail. +Logging raw data +~~~~~~~~~~~~~~~~ + + +Logging the raw stream of data flowing through the ingest pipeline is not desired behaviour in +many production environments because this may result in leaking sensitive data or security related +configurations, such as secret keys, to Flume log files. +By default, Flume will not log such information. On the other hand, if the data pipeline is broken, +Flume will attempt to provide clues for debugging the problem. + +One way to debug problems with event pipelines is to set up an additional `Memory Channel`_ +connected to a `Logger Sink`_, which will output all event data to the Flume logs. +In some situations, however, this approach is insufficient. + +In order to enable logging of event- and configuration-related data, some Java system properties +must be set in addition to log4j properties. + +To enable configuration-related logging, set the Java system property +``-Dorg.apache.flume.log.printconfig=true``. This can either be passed on the command line or by +setting this in the ``JAVA_OPTS`` variable in *flume-env.sh*. + +To enable data logging, set the Java system property ``-Dorg.apache.flume.log.rawdata=true`` +in the same way described above. For most components, the log4j logging level must also be set to +DEBUG or TRACE to make event-specific logging appear in the Flume logs. 
+ +Here is an example of enabling both configuration logging and raw data logging while also +setting the Log4j loglevel to DEBUG for console output:: + + $ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true + Zookeeper based Configuration ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -2001,8 +2031,9 @@ accept tab separated input containing three fields and to skip the second field. Logger Sink ~~~~~~~~~~~ -Logs event at INFO level. Typically useful for testing/debugging purpose. -Required properties are in **bold**. +Logs event at INFO level. Typically useful for testing/debugging purpose. Required properties are +in **bold**. This sink is the only exception which doesn't require the extra configuration +explained in the `Logging raw data`_ section. ============== ======= =========================================== Property Name Default Description http://git-wip-us.apache.org/repos/asf/flume/blob/25e4bc6d/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java ---------------------------------------------------------------------- diff --git a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java index ad3e138..d33fa8b 100644 --- a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java +++ b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java @@ -32,6 +32,7 @@ import org.apache.flume.Source; import org.apache.flume.SourceRunner; import org.apache.flume.annotations.InterfaceAudience; import org.apache.flume.annotations.InterfaceStability; +import org.apache.flume.conf.LogPrivacyUtil; import org.apache.flume.lifecycle.LifecycleAware; import org.apache.flume.lifecycle.LifecycleState; import 
org.apache.flume.lifecycle.LifecycleSupervisor; @@ -149,7 +150,7 @@ public class EmbeddedAgent { properties = EmbeddedAgentConfiguration.configure(name, properties); - if (LOGGER.isDebugEnabled()) { + if (LOGGER.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) { LOGGER.debug("Agent configuration values"); for (String key : new TreeSet<String>(properties.keySet())) { LOGGER.debug(key + " = " + properties.get(key)); @@ -255,4 +256,4 @@ public class EmbeddedAgent { STOPPED(), STARTED(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flume/blob/25e4bc6d/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java index 9453546..89bdd84 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java @@ -20,7 +20,6 @@ package org.apache.flume.sink.kafka; import com.google.common.base.Optional; import com.google.common.base.Throwables; - import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.EncoderFactory; import org.apache.avro.specific.SpecificDatumReader; @@ -32,6 +31,7 @@ import org.apache.flume.EventDeliveryException; import org.apache.flume.Transaction; import org.apache.flume.conf.Configurable; import org.apache.flume.conf.ConfigurationException; +import org.apache.flume.conf.LogPrivacyUtil; import org.apache.flume.instrumentation.kafka.KafkaSinkCounter; import org.apache.flume.sink.AbstractSink; import org.apache.flume.source.avro.AvroFlumeEvent; @@ -174,12 +174,15 @@ public class KafkaSink extends AbstractSink implements Configurable { eventTopic = topic; } eventKey = 
headers.get(KEY_HEADER); - - if (logger.isDebugEnabled()) { - logger.debug("{Event} " + eventTopic + " : " + eventKey + " : " - + new String(eventBody, "UTF-8")); - logger.debug("event #{}", processedEvents); + if (logger.isTraceEnabled()) { + if (LogPrivacyUtil.allowLogRawData()) { + logger.trace("{Event} " + eventTopic + " : " + eventKey + " : " + + new String(eventBody, "UTF-8")); + } else { + logger.trace("{Event} " + eventTopic + " : " + eventKey); + } } + logger.debug("event #{}", processedEvents); // create a message and add to buffer long startTime = System.currentTimeMillis(); @@ -300,8 +303,8 @@ public class KafkaSink extends AbstractSink implements Configurable { setProducerProps(context, bootStrapServers); - if (logger.isDebugEnabled()) { - logger.debug("Kafka producer properties: {}" , kafkaProps); + if (logger.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) { + logger.debug("Kafka producer properties: {}", kafkaProps); } if (counter == null) { @@ -370,7 +373,6 @@ public class KafkaSink extends AbstractSink implements Configurable { kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_SERIAIZER); kafkaProps.putAll(context.getSubProperties(KAFKA_PRODUCER_PREFIX)); kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers); - logger.info("Producer properties: {}" , kafkaProps.toString()); } protected Properties getKafkaProps() { http://git-wip-us.apache.org/repos/asf/flume/blob/25e4bc6d/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobHandler.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobHandler.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobHandler.java index ca7614a..fe98746 100644 --- 
a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobHandler.java +++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobHandler.java @@ -29,6 +29,7 @@ import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.conf.ConfigurationException; +import org.apache.flume.conf.LogPrivacyUtil; import org.apache.flume.event.EventBuilder; import org.apache.flume.source.http.HTTPSourceHandler; import org.apache.tika.metadata.Metadata; @@ -69,11 +70,11 @@ public class BlobHandler implements HTTPSourceHandler { + " must be greater than zero: " + maxBlobLength); } } - + @SuppressWarnings("resource") @Override public List<Event> getEvents(HttpServletRequest request) throws Exception { - Map<String, String> headers = getHeaders(request); + Map<String, String> headers = getHeaders(request); InputStream in = request.getInputStream(); try { ByteArrayOutputStream blob = null; @@ -88,14 +89,16 @@ public class BlobHandler implements HTTPSourceHandler { blobLength += n; if (blobLength >= maxBlobLength) { LOGGER.warn("Request length exceeds maxBlobLength ({}), truncating BLOB event!", - maxBlobLength); + maxBlobLength); break; } } byte[] array = blob != null ? 
blob.toByteArray() : new byte[0]; Event event = EventBuilder.withBody(array, headers); - LOGGER.debug("blobEvent: {}", event); + if (LOGGER.isDebugEnabled() && LogPrivacyUtil.allowLogRawData()) { + LOGGER.debug("blobEvent: {}", event); + } return Collections.singletonList(event); } finally { in.close(); @@ -103,15 +106,15 @@ public class BlobHandler implements HTTPSourceHandler { } private Map<String, String> getHeaders(HttpServletRequest request) { - if (LOGGER.isDebugEnabled()) { + if (LOGGER.isDebugEnabled() && LogPrivacyUtil.allowLogRawData()) { Map requestHeaders = new HashMap(); Enumeration iter = request.getHeaderNames(); while (iter.hasMoreElements()) { String name = (String) iter.nextElement(); - requestHeaders.put(name, request.getHeader(name)); + requestHeaders.put(name, request.getHeader(name)); } LOGGER.debug("requestHeaders: {}", requestHeaders); - } + } Map<String, String> headers = new HashMap(); if (request.getContentType() != null) { headers.put(Metadata.CONTENT_TYPE, request.getContentType()); @@ -119,9 +122,8 @@ public class BlobHandler implements HTTPSourceHandler { Enumeration iter = request.getParameterNames(); while (iter.hasMoreElements()) { String name = (String) iter.nextElement(); - headers.put(name, request.getParameter(name)); + headers.put(name, request.getParameter(name)); } return headers; } - } http://git-wip-us.apache.org/repos/asf/flume/blob/25e4bc6d/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java index f7a73f3..0917d39 100644 --- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java +++ 
b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java @@ -24,6 +24,7 @@ import org.apache.flume.EventDeliveryException; import org.apache.flume.Transaction; import org.apache.flume.conf.Configurable; import org.apache.flume.conf.ConfigurationException; +import org.apache.flume.conf.LogPrivacyUtil; import org.apache.flume.instrumentation.SinkCounter; import org.apache.flume.sink.AbstractSink; import org.slf4j.Logger; @@ -136,7 +137,10 @@ public class MorphlineSink extends AbstractSink implements Configurable { } sinkCounter.incrementEventDrainAttemptCount(); numEventsTaken++; - LOGGER.debug("Flume event: {}", event); + if (LOGGER.isTraceEnabled() && LogPrivacyUtil.allowLogRawData()) { + LOGGER.trace("Flume event arrived {}", event); + } + //StreamEvent streamEvent = createStreamEvent(event); handler.process(event); if (System.currentTimeMillis() >= batchEndTime) { http://git-wip-us.apache.org/repos/asf/flume/blob/25e4bc6d/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java index 90e4715..86782c3 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java @@ -38,6 +38,7 @@ import org.apache.flume.EventDeliveryException; import org.apache.flume.FlumeException; import org.apache.flume.conf.Configurable; import org.apache.flume.conf.ConfigurationException; +import org.apache.flume.conf.LogPrivacyUtil; import org.apache.flume.event.EventBuilder; import org.apache.flume.instrumentation.kafka.KafkaSourceCounter; import 
org.apache.flume.source.AbstractPollableSource; @@ -240,12 +241,18 @@ public class KafkaSource extends AbstractPollableSource headers.put(KafkaSourceConstants.KEY_HEADER, kafkaKey); } - if (log.isDebugEnabled()) { - log.debug("Topic: {} Partition: {} Message: {}", new String[] { - message.topic(), - String.valueOf(message.partition()), - new String(eventBody) - }); + if (log.isTraceEnabled()) { + if (LogPrivacyUtil.allowLogRawData()) { + log.trace("Topic: {} Partition: {} Message: {}", new String[]{ + message.topic(), + String.valueOf(message.partition()), + new String(eventBody) + }); + } else { + log.trace("Topic: {} Partition: {} Message arrived.", + message.topic(), + String.valueOf(message.partition())); + } } event = EventBuilder.withBody(eventBody, headers); @@ -342,6 +349,10 @@ public class KafkaSource extends AbstractPollableSource setConsumerProps(context, bootstrapServers); + if (log.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) { + log.debug("Kafka consumer properties: {}", kafkaProps); + } + if (counter == null) { counter = new KafkaSourceCounter(getName()); } @@ -388,8 +399,6 @@ public class KafkaSource extends AbstractPollableSource } kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, KafkaSourceConstants.DEFAULT_AUTO_COMMIT); - - log.info(kafkaProps.toString()); } Properties getConsumerProps() { http://git-wip-us.apache.org/repos/asf/flume/blob/25e4bc6d/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java b/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java index f5c8328..d812023 100644 --- a/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java +++ 
b/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java @@ -107,11 +107,6 @@ public class TwitterSource String accessToken = context.getString("accessToken"); String accessTokenSecret = context.getString("accessTokenSecret"); - LOGGER.info("Consumer Key: '" + consumerKey + "'"); - LOGGER.info("Consumer Secret: '" + consumerSecret + "'"); - LOGGER.info("Access Token: '" + accessToken + "'"); - LOGGER.info("Access Token Secret: '" + accessTokenSecret + "'"); - twitterStream = new TwitterStreamFactory().getInstance(); twitterStream.setOAuthConsumer(consumerKey, consumerSecret); twitterStream.setOAuthAccessToken(new AccessToken(accessToken,
