This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ae05ed  [GOBBLIN-1056] Refactor to allow customizing client pool population in KafkaSource
3ae05ed is described below

commit 3ae05ed117914122e7e88c7507c399ffd1f68cd2
Author: zhchen <[email protected]>
AuthorDate: Fri Feb 21 11:36:38 2020 -0800

    [GOBBLIN-1056] Refactor to allow customizing client pool population in KafkaSource
    
    Closes #2896 from zxcware/clientPool
---
 .../source/extractor/extract/kafka/KafkaSource.java      | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)

diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 611e55e..f2ecf9d 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -141,8 +141,8 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
   private final AtomicInteger offsetTooLateCount = new AtomicInteger(0);
 
   // sharing the kafka consumer may result in contention, so support thread 
local consumers
-  private final ConcurrentLinkedQueue<GobblinKafkaConsumerClient> 
kafkaConsumerClientPool = new ConcurrentLinkedQueue();
-  private static final ThreadLocal<GobblinKafkaConsumerClient> 
kafkaConsumerClient =
+  protected final ConcurrentLinkedQueue<GobblinKafkaConsumerClient> 
kafkaConsumerClientPool = new ConcurrentLinkedQueue();
+  protected static final ThreadLocal<GobblinKafkaConsumerClient> 
kafkaConsumerClient =
           new ThreadLocal<GobblinKafkaConsumerClient>();
   private GobblinKafkaConsumerClient sharedKafkaConsumerClient = null;
   private final ClassAliasResolver<GobblinKafkaConsumerClientFactory> 
kafkaConsumerClientResolver =
@@ -241,9 +241,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
         this.sharedKafkaConsumerClient = this.kafkaConsumerClient.get();
       } else {
         // preallocate one client per thread
-        for (int i = 0; i < numOfThreads; i++) {
-          
kafkaConsumerClientPool.offer(kafkaConsumerClientFactory.create(config));
-        }
+        populateClientPool(numOfThreads, kafkaConsumerClientFactory, config);
       }
 
       Stopwatch createWorkUnitStopwatch = Stopwatch.createStarted();
@@ -295,6 +293,14 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     }
   }
 
+  protected void populateClientPool(int count,
+      GobblinKafkaConsumerClientFactory kafkaConsumerClientFactory,
+      Config config) {
+    for (int i = 0; i < count; i++) {
+      kafkaConsumerClientPool.offer(kafkaConsumerClientFactory.create(config));
+    }
+  }
+
   private void addTopicSpecificPropsToWorkUnits(List<WorkUnit> workUnits, 
Map<String, State> topicSpecificStateMap) {
     for (WorkUnit workUnit : workUnits) {
       addTopicSpecificPropsToWorkUnit(workUnit, topicSpecificStateMap);

Reply via email to