dubee closed pull request #3922: Improve robustness of KafkaConsumerConnector
URL: https://github.com/apache/incubator-openwhisk/pull/3922
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not keep its diff
visible after the merge, so the diff is reproduced below:

diff --git 
a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
 
b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index 7111573ae1..7e28f15025 100644
--- 
a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ 
b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -31,6 +31,7 @@ import whisk.core.connector.MessageConsumer
 import scala.collection.JavaConverters._
 import scala.concurrent.duration._
 import scala.concurrent.{blocking, ExecutionContext, Future}
+import scala.util.Failure
 
 case class KafkaConsumerConfig(sessionTimeoutMs: Long, metricFlushIntervalS: 
Int)
 
@@ -66,10 +67,17 @@ class KafkaConsumerConnector(
                     retry: Int = 3): Iterable[(String, Int, Long, 
Array[Byte])] = {
 
     // poll can be infinitely blocked in edge-cases, so we need to wakeup 
explicitly.
-    val wakeUpTask = 
actorSystem.scheduler.scheduleOnce(cfg.sessionTimeoutMs.milliseconds + 
1.second)(consumer.wakeup())
+    val wakeUpTask = 
actorSystem.scheduler.scheduleOnce(cfg.sessionTimeoutMs.milliseconds + 
1.second) {
+      consumer.wakeup()
+      logging.info(this, s"woke up consumer for topic '$topic'")
+    }
 
     try {
       val response = consumer.poll(duration.toMillis).asScala
+
+      // Cancel the scheduled wake-up task immediately.
+      wakeUpTask.cancel()
+
       val now = System.currentTimeMillis
 
       response.lastOption.foreach(record => offset = record.offset + 1)
@@ -79,21 +87,31 @@ class KafkaConsumerConnector(
         (r.topic, r.partition, r.offset, r.value)
       }
     } catch {
-      // Happens if the peek hangs.
       case _: WakeupException if retry > 0 =>
+        // Happens if the 'poll()' takes too long.
+        // This exception should occur iff 'poll()' has been woken up by the 
scheduled task.
+        // For this reason, it should not be necessary to cancel the task. We 
cancel the task
+        // to be on the safe side because an ineffective `wakeup()` applies to 
the next
+        // consumer call that can be woken up.
+        // The scheduler is expected to safely ignore the cancellation of a 
task that already
+        // has been cancelled or is already complete.
+        wakeUpTask.cancel()
         logging.error(this, s"poll timeout occurred. Retrying $retry more 
times.")
         Thread.sleep(gracefulWaitTime.toMillis) // using Thread.sleep is okay, 
since `poll` is blocking anyway
         peek(duration, retry - 1)
       case e: RetriableException if retry > 0 =>
-        logging.error(this, s"$e: Retrying $retry more times")
+        // Happens if something goes wrong with 'poll()' and 'poll()' can be 
retried.
         wakeUpTask.cancel()
+        logging.error(this, s"poll returned with failure. Retrying $retry more 
times. Exception: $e")
         Thread.sleep(gracefulWaitTime.toMillis) // using Thread.sleep is okay, 
since `poll` is blocking anyway
         peek(duration, retry - 1)
-      // Every other error results in a restart of the consumer
       case e: Throwable =>
+        // Every other error results in a restart of the consumer
+        wakeUpTask.cancel()
+        logging.error(this, s"poll returned with failure. Recreating the 
consumer. Exception: $e")
         recreateConsumer()
         throw e
-    } finally wakeUpTask.cancel()
+    }
   }
 
   /**
@@ -114,8 +132,8 @@ class KafkaConsumerConnector(
     }
 
   override def close(): Unit = {
+    logging.info(this, s"closing consumer for '$topic'")
     consumer.close()
-    logging.info(this, s"closing '$topic' consumer")
   }
 
   /** Creates a new kafka consumer and subscribes to topic list if given. */
@@ -135,9 +153,19 @@ class KafkaConsumerConnector(
   }
 
   private def recreateConsumer(): Unit = {
-    val oldConsumer = consumer
-    oldConsumer.close()
-    logging.info(this, s"old consumer closed")
+    logging.info(this, s"recreating consumer for '$topic'")
+    try {
+      consumer.close()
+    } catch {
+      // According to documentation, the consumer is force closed if it cannot 
be closed gracefully.
+      // See 
https://kafka.apache.org/11/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
+      //
+      // For the moment, we have no special handling of 'InterruptException' - 
it may be possible or even
+      // needed to re-try the 'close()' when being interrupted.
+      case t: Throwable =>
+        logging.error(this, s"failed to close old consumer while recreating: 
$t")
+    }
+    logging.info(this, s"old consumer closed for '$topic'")
     consumer = createConsumer(topic)
   }
 
@@ -157,6 +185,10 @@ class KafkaConsumerConnector(
           }
         }
       }
+    }.andThen {
+      case Failure(e) =>
+        // Only log level info because failed metric reporting is not critical
+        logging.info(this, s"lag metric reporting failed for topic '$topic': 
$e")
     }
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to