[ 
https://issues.apache.org/jira/browse/GOBBLIN-1040?focusedWorklogId=395725&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-395725
 ]

ASF GitHub Bot logged work on GOBBLIN-1040:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Mar/20 02:42
            Start Date: 02/Mar/20 02:42
    Worklog Time Spent: 10m 
      Work Description: sv2000 commented on pull request #2900: [GOBBLIN-1040] 
HighLevelConsumer re-design by removing references to …
URL: https://github.com/apache/incubator-gobblin/pull/2900#discussion_r386168996
 
 

 ##########
 File path: 
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
 ##########
 @@ -18,63 +18,75 @@
 package org.apache.gobblin.runtime.kafka;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.lang3.reflect.ConstructorUtils;
 
 import com.codahale.metrics.Counter;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
-import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
+import org.apache.gobblin.kafka.client.KafkaConsumerRecord;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.ExecutorsUtils;
 
 
 /**
- * A high level consumer for Kafka topics. Subclasses should implement {@link 
HighLevelConsumer#processMessage(MessageAndMetadata)}
+ * A high level consumer for Kafka topics. Subclasses should implement {@link 
HighLevelConsumer#processMessage(DecodeableKafkaRecord)}
  *
- * Note each thread will block for each message until {@link 
#processMessage(MessageAndMetadata)} returns, so for high
- * volume topics with few partitions {@link 
#processMessage(MessageAndMetadata)} must be fast or itself spawn more
- * threads.
+ * Note: each thread (queue) will block for each message until {@link 
#processMessage(DecodeableKafkaRecord)} returns
  *
- * If threads > partitions in topic, extra threads will be idle.
+ * If threads(queues) > partitions in topic, extra threads(queues) will be 
idle.
  *
- * @param <K> type of the key.
- * @param <V> type of the value.
  */
 @Slf4j
-public abstract class HighLevelConsumer<K, V> extends AbstractIdleService {
+public abstract class HighLevelConsumer<K,V> extends AbstractIdleService {
+
+  public static final String CONSUMER_CLIENT_FACTORY_CLASS_KEY = 
"kafka.consumerClientClassFactory";
+  private static final String DEFAULT_CONSUMER_CLIENT_FACTORY_CLASS =
+      "org.apache.gobblin.kafka.client.Kafka09ConsumerClient$Factory";
+  public static final String ENABLE_AUTO_COMMIT_KEY = "enable.auto.commit";
+  public static final boolean DEFAULT_AUTO_COMMIT_VALUE = true;
 
 Review comment:
   Should the default be false?
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 395725)
    Time Spent: 1h 20m  (was: 1h 10m)

> Fix High level consumer 
> ------------------------
>
>                 Key: GOBBLIN-1040
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1040
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Vikram Bohra
>            Priority: Major
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to