This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 3ae05ed [GOBBLIN-1056] Refactor to allow customizing client pool
population in KafkaSource
3ae05ed is described below
commit 3ae05ed117914122e7e88c7507c399ffd1f68cd2
Author: zhchen <[email protected]>
AuthorDate: Fri Feb 21 11:36:38 2020 -0800
[GOBBLIN-1056] Refactor to allow customizing client pool population in
KafkaSource
Closes #2896 from zxcware/clientPool
---
.../source/extractor/extract/kafka/KafkaSource.java | 16 +++++++++++-----
1 file changed, 11 insertions(+), 5 deletions(-)
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 611e55e..f2ecf9d 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -141,8 +141,8 @@ public abstract class KafkaSource<S, D> extends
EventBasedSource<S, D> {
private final AtomicInteger offsetTooLateCount = new AtomicInteger(0);
// sharing the kafka consumer may result in contention, so support thread
local consumers
- private final ConcurrentLinkedQueue<GobblinKafkaConsumerClient>
kafkaConsumerClientPool = new ConcurrentLinkedQueue();
- private static final ThreadLocal<GobblinKafkaConsumerClient>
kafkaConsumerClient =
+ protected final ConcurrentLinkedQueue<GobblinKafkaConsumerClient>
kafkaConsumerClientPool = new ConcurrentLinkedQueue();
+ protected static final ThreadLocal<GobblinKafkaConsumerClient>
kafkaConsumerClient =
new ThreadLocal<GobblinKafkaConsumerClient>();
private GobblinKafkaConsumerClient sharedKafkaConsumerClient = null;
private final ClassAliasResolver<GobblinKafkaConsumerClientFactory>
kafkaConsumerClientResolver =
@@ -241,9 +241,7 @@ public abstract class KafkaSource<S, D> extends
EventBasedSource<S, D> {
this.sharedKafkaConsumerClient = this.kafkaConsumerClient.get();
} else {
// preallocate one client per thread
- for (int i = 0; i < numOfThreads; i++) {
-
kafkaConsumerClientPool.offer(kafkaConsumerClientFactory.create(config));
- }
+ populateClientPool(numOfThreads, kafkaConsumerClientFactory, config);
}
Stopwatch createWorkUnitStopwatch = Stopwatch.createStarted();
@@ -295,6 +293,14 @@ public abstract class KafkaSource<S, D> extends
EventBasedSource<S, D> {
}
}
+ protected void populateClientPool(int count,
+ GobblinKafkaConsumerClientFactory kafkaConsumerClientFactory,
+ Config config) {
+ for (int i = 0; i < count; i++) {
+ kafkaConsumerClientPool.offer(kafkaConsumerClientFactory.create(config));
+ }
+ }
+
private void addTopicSpecificPropsToWorkUnits(List<WorkUnit> workUnits,
Map<String, State> topicSpecificStateMap) {
for (WorkUnit workUnit : workUnits) {
addTopicSpecificPropsToWorkUnit(workUnit, topicSpecificStateMap);