This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 07eb8e7 [GOBBLIN-1373] make HighLevelConsumer poll for records
without waiting
07eb8e7 is described below
commit 07eb8e70ca0a02503bf4965d4156198ad12505d5
Author: Arjun <[email protected]>
AuthorDate: Tue Apr 13 13:13:18 2021 -0700
[GOBBLIN-1373] make HighLevelConsumer poll for records without waiting
Closes #3215 from arjun4084346/consumerImprovement
---
.../org/apache/gobblin/runtime/kafka/HighLevelConsumer.java | 13 +++++++------
1 file changed, 7 insertions(+), 6 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
index 4eb146b..c9838e0 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
@@ -107,6 +107,7 @@ public abstract class HighLevelConsumer<K,V> extends
AbstractIdleService {
private final int offsetsCommitNumRecordsThreshold;
private final int offsetsCommitTimeThresholdSecs;
private long lastCommitTime = System.currentTimeMillis();
+ protected volatile boolean shutdownRequested = false;
private static final Config FALLBACK =
ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
@@ -138,7 +139,7 @@ public abstract class HighLevelConsumer<K,V> extends
AbstractIdleService {
this.consumerExecutor =
Executors.newSingleThreadScheduledExecutor(ExecutorsUtils.newThreadFactory(Optional.of(log),
Optional.of("HighLevelConsumerThread")));
this.queueExecutor = Executors.newFixedThreadPool(this.numThreads,
ExecutorsUtils.newThreadFactory(Optional.of(log),
Optional.of("QueueProcessor-%d")));
this.queues = new LinkedBlockingQueue[numThreads];
- for(int i=0;i<queues.length;i++) {
+ for(int i=0; i<queues.length; i++) {
this.queues[i] = new LinkedBlockingQueue();
}
this.recordsProcessed = new AtomicInteger(0);
@@ -211,12 +212,11 @@ public abstract class HighLevelConsumer<K,V> extends
AbstractIdleService {
// Method that starts threads that processes queues
processQueues();
// Main thread that constantly polls messages from kafka
- consumerExecutor.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
+ consumerExecutor.execute(() -> {
+ while (!shutdownRequested) {
consume();
}
- }, 0, 50, TimeUnit.MILLISECONDS);
+ });
}
/**
@@ -277,6 +277,7 @@ public abstract class HighLevelConsumer<K,V> extends
AbstractIdleService {
@Override
public void shutDown() {
+ shutdownRequested = true;
ExecutorsUtils.shutdownExecutorService(this.consumerExecutor,
Optional.of(log), 5000, TimeUnit.MILLISECONDS);
ExecutorsUtils.shutdownExecutorService(this.queueExecutor,
Optional.of(log), 5000, TimeUnit.MILLISECONDS);
try {
@@ -304,7 +305,7 @@ public abstract class HighLevelConsumer<K,V> extends
AbstractIdleService {
public void run() {
log.info("Starting queue processing.. " +
Thread.currentThread().getName());
try {
- while(true) {
+ while (true) {
KafkaConsumerRecord record = queue.take();
messagesRead.inc();
HighLevelConsumer.this.processMessage((DecodeableKafkaRecord)record);