Repository: apex-malhar
Updated Branches:
  refs/heads/master ef42c52a1 -> e44caa5a5


APEXMALHAR-2148 #resolve #comment Skip some noisy loggings from kafka


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/13394415
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/13394415
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/13394415

Branch: refs/heads/master
Commit: 133944151cff8a62656e1667af214c180512f722
Parents: 67b84dd
Author: Siyuan Hua <[email protected]>
Authored: Fri Aug 19 15:51:19 2016 -0700
Committer: Siyuan Hua <[email protected]>
Committed: Fri Aug 19 15:51:19 2016 -0700

----------------------------------------------------------------------
 .../apex/malhar/kafka/AbstractKafkaInputOperator.java       | 9 +++++++++
 1 file changed, 9 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/13394415/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
index 3e709eb..9af5539 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
@@ -38,10 +38,13 @@ import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
@@ -79,6 +82,12 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
 
   private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaInputOperator.class);
 
+  static {
+    // We create new consumers periodically to pull metadata (Kafka consumer keeps metadata in cache)
+    // Skip log4j log for ConsumerConfig class to avoid too much noise in application
+    LogManager.getLogger(ConsumerConfig.class).setLevel(Level.WARN);
+  }
+
   public enum InitialOffset
   {
     EARLIEST, // consume from beginning of the partition every time when application restart

Reply via email to