This is an automated email from the ASF dual-hosted git repository.

dubeejw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new d847d0f  Improve robustness of KafkaConsumerConnector (#3922)
d847d0f is described below

commit d847d0f75ec2192015e3dcda19f8e5d0bc0926d5
Author: Sven Lange-Last <sven.lange-l...@de.ibm.com>
AuthorDate: Tue Jul 31 22:28:03 2018 +0200

    Improve robustness of KafkaConsumerConnector (#3922)
    
    * Improve robustness of KafkaConsumerConnector
    
    * We schedule a wake-up thread once to wake up the long `poll()` call if it 
takes too long. Make sure that this wake-up thread is cancelled as soon as 
possible once the `poll()` has returned - no matter whether it was successful 
or not. Otherwise the wake-up will be remembered by the consumer and applied on 
the next `poll()`. See 
https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#endOffsets-java.util.Collection-
    * Catch failures when closing the old consumer during re-creation. In the 
past, re-creation was interrupted when closing failed and no new consumer was 
created.
    * Add more logging such that unexpected or error conditions no longer 
happen unnoticed.
    
    * Address review feedback
    
    * Simplify code
---
 .../connector/kafka/KafkaConsumerConnector.scala   | 50 ++++++++++++++++++----
 1 file changed, 41 insertions(+), 9 deletions(-)

diff --git 
a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
 
b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index 7111573..7e28f15 100644
--- 
a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ 
b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -31,6 +31,7 @@ import whisk.core.connector.MessageConsumer
 import scala.collection.JavaConverters._
 import scala.concurrent.duration._
 import scala.concurrent.{blocking, ExecutionContext, Future}
+import scala.util.Failure
 
 case class KafkaConsumerConfig(sessionTimeoutMs: Long, metricFlushIntervalS: 
Int)
 
@@ -66,10 +67,17 @@ class KafkaConsumerConnector(
                     retry: Int = 3): Iterable[(String, Int, Long, 
Array[Byte])] = {
 
     // poll can be infinitely blocked in edge-cases, so we need to wake up 
explicitly.
-    val wakeUpTask = 
actorSystem.scheduler.scheduleOnce(cfg.sessionTimeoutMs.milliseconds + 
1.second)(consumer.wakeup())
+    val wakeUpTask = 
actorSystem.scheduler.scheduleOnce(cfg.sessionTimeoutMs.milliseconds + 
1.second) {
+      consumer.wakeup()
+      logging.info(this, s"woke up consumer for topic '$topic'")
+    }
 
     try {
       val response = consumer.poll(duration.toMillis).asScala
+
+      // Cancel the scheduled wake-up task immediately.
+      wakeUpTask.cancel()
+
       val now = System.currentTimeMillis
 
       response.lastOption.foreach(record => offset = record.offset + 1)
@@ -79,21 +87,31 @@ class KafkaConsumerConnector(
         (r.topic, r.partition, r.offset, r.value)
       }
     } catch {
-      // Happens if the peek hangs.
       case _: WakeupException if retry > 0 =>
+        // Happens if the 'poll()' takes too long.
+        // This exception should occur iff 'poll()' has been woken up by the 
scheduled task.
+        // For this reason, it should not be necessary to cancel the task. We 
cancel the task
+        // to be on the safe side because an ineffective `wakeup()` applies to 
the next
+        // consumer call that can be woken up.
+        // The scheduler is expected to safely ignore the cancellation of a 
task that already
+        // has been cancelled or is already complete.
+        wakeUpTask.cancel()
         logging.error(this, s"poll timeout occurred. Retrying $retry more 
times.")
         Thread.sleep(gracefulWaitTime.toMillis) // using Thread.sleep is okay, 
since `poll` is blocking anyway
         peek(duration, retry - 1)
       case e: RetriableException if retry > 0 =>
-        logging.error(this, s"$e: Retrying $retry more times")
+        // Happens if something goes wrong with 'poll()' and 'poll()' can be 
retried.
         wakeUpTask.cancel()
+        logging.error(this, s"poll returned with failure. Retrying $retry more 
times. Exception: $e")
         Thread.sleep(gracefulWaitTime.toMillis) // using Thread.sleep is okay, 
since `poll` is blocking anyway
         peek(duration, retry - 1)
-      // Every other error results in a restart of the consumer
       case e: Throwable =>
+        // Every other error results in a restart of the consumer
+        wakeUpTask.cancel()
+        logging.error(this, s"poll returned with failure. Recreating the 
consumer. Exception: $e")
         recreateConsumer()
         throw e
-    } finally wakeUpTask.cancel()
+    }
   }
 
   /**
@@ -114,8 +132,8 @@ class KafkaConsumerConnector(
     }
 
   override def close(): Unit = {
+    logging.info(this, s"closing consumer for '$topic'")
     consumer.close()
-    logging.info(this, s"closing '$topic' consumer")
   }
 
   /** Creates a new kafka consumer and subscribes to topic list if given. */
@@ -135,9 +153,19 @@ class KafkaConsumerConnector(
   }
 
   private def recreateConsumer(): Unit = {
-    val oldConsumer = consumer
-    oldConsumer.close()
-    logging.info(this, s"old consumer closed")
+    logging.info(this, s"recreating consumer for '$topic'")
+    try {
+      consumer.close()
+    } catch {
+      // According to documentation, the consumer is force closed if it cannot 
be closed gracefully.
+      // See 
https://kafka.apache.org/11/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
+      //
+      // For the moment, we have no special handling of 'InterruptException' - 
it may be possible or even
+      // needed to re-try the 'close()' when being interrupted.
+      case t: Throwable =>
+        logging.error(this, s"failed to close old consumer while recreating: 
$t")
+    }
+    logging.info(this, s"old consumer closed for '$topic'")
     consumer = createConsumer(topic)
   }
 
@@ -157,6 +185,10 @@ class KafkaConsumerConnector(
           }
         }
       }
+    }.andThen {
+      case Failure(e) =>
+        // Only log level info because failed metric reporting is not critical
+        logging.info(this, s"lag metric reporting failed for topic '$topic': 
$e")
     }
   }
 }

Reply via email to