This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 07eb8e7  [GOBBLIN-1373] make HighLevelConsumer poll for records without waiting
07eb8e7 is described below

commit 07eb8e70ca0a02503bf4965d4156198ad12505d5
Author: Arjun <[email protected]>
AuthorDate: Tue Apr 13 13:13:18 2021 -0700

    [GOBBLIN-1373] make HighLevelConsumer poll for records without waiting
    
    Closes #3215 from arjun4084346/consumerImprovement
---
 .../org/apache/gobblin/runtime/kafka/HighLevelConsumer.java | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
index 4eb146b..c9838e0 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
@@ -107,6 +107,7 @@ public abstract class HighLevelConsumer<K,V> extends AbstractIdleService {
   private final int offsetsCommitNumRecordsThreshold;
   private final int offsetsCommitTimeThresholdSecs;
   private long lastCommitTime = System.currentTimeMillis();
+  protected volatile boolean shutdownRequested = false;
 
   private static final Config FALLBACK =
       ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
@@ -138,7 +139,7 @@ public abstract class HighLevelConsumer<K,V> extends AbstractIdleService {
     this.consumerExecutor = 
Executors.newSingleThreadScheduledExecutor(ExecutorsUtils.newThreadFactory(Optional.of(log),
 Optional.of("HighLevelConsumerThread")));
     this.queueExecutor = Executors.newFixedThreadPool(this.numThreads, 
ExecutorsUtils.newThreadFactory(Optional.of(log), 
Optional.of("QueueProcessor-%d")));
     this.queues = new LinkedBlockingQueue[numThreads];
-    for(int i=0;i<queues.length;i++) {
+    for(int i=0; i<queues.length; i++) {
       this.queues[i] = new LinkedBlockingQueue();
     }
     this.recordsProcessed = new AtomicInteger(0);
@@ -211,12 +212,11 @@ public abstract class HighLevelConsumer<K,V> extends AbstractIdleService {
     // Method that starts threads that processes queues
     processQueues();
     // Main thread that constantly polls messages from kafka
-    consumerExecutor.scheduleAtFixedRate(new Runnable() {
-      @Override
-      public void run() {
+    consumerExecutor.execute(() -> {
+      while (!shutdownRequested) {
         consume();
       }
-    }, 0, 50, TimeUnit.MILLISECONDS);
+    });
   }
 
   /**
@@ -277,6 +277,7 @@ public abstract class HighLevelConsumer<K,V> extends AbstractIdleService {
 
   @Override
   public void shutDown() {
+    shutdownRequested = true;
     ExecutorsUtils.shutdownExecutorService(this.consumerExecutor, 
Optional.of(log), 5000, TimeUnit.MILLISECONDS);
     ExecutorsUtils.shutdownExecutorService(this.queueExecutor, 
Optional.of(log), 5000, TimeUnit.MILLISECONDS);
     try {
@@ -304,7 +305,7 @@ public abstract class HighLevelConsumer<K,V> extends AbstractIdleService {
     public void run() {
       log.info("Starting queue processing.. " + 
Thread.currentThread().getName());
       try {
-        while(true) {
+        while (true) {
           KafkaConsumerRecord record = queue.take();
           messagesRead.inc();
           HighLevelConsumer.this.processMessage((DecodeableKafkaRecord)record);

Reply via email to