Repository: apex-malhar
Updated Branches:
  refs/heads/master ef42c52a1 -> e44caa5a5
APEXMALHAR-2148 #resolve #comment Skip some noisy loggings from kafka

Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/13394415
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/13394415
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/13394415

Branch: refs/heads/master
Commit: 133944151cff8a62656e1667af214c180512f722
Parents: 67b84dd
Author: Siyuan Hua <[email protected]>
Authored: Fri Aug 19 15:51:19 2016 -0700
Committer: Siyuan Hua <[email protected]>
Committed: Fri Aug 19 15:51:19 2016 -0700

----------------------------------------------------------------------
 .../apex/malhar/kafka/AbstractKafkaInputOperator.java | 9 +++++++++
 1 file changed, 9 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/13394415/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
index 3e709eb..9af5539 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
@@ -38,10 +38,13 @@ import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
@@ -79,6 +82,12 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
 
   private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaInputOperator.class);
 
+  static {
+    // We create new consumers periodically to pull metadata (Kafka consumer keeps metadata in cache)
+    // Skip log4j log for ConsumerConfig class to avoid too much noise in application
+    LogManager.getLogger(ConsumerConfig.class).setLevel(Level.WARN);
+  }
+
   public enum InitialOffset
   {
     EARLIEST, // consume from beginning of the partition every time when application restart
