This is an automated email from the ASF dual-hosted git repository. dubeejw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new d847d0f Improve robustness of KafkaConsumerConnector (#3922) d847d0f is described below commit d847d0f75ec2192015e3dcda19f8e5d0bc0926d5 Author: Sven Lange-Last <sven.lange-l...@de.ibm.com> AuthorDate: Tue Jul 31 22:28:03 2018 +0200 Improve robustness of KafkaConsumerConnector (#3922) * Improve robustness of KafkaConsumerConnector * We schedule a wake-up thread once to wake up the long `poll()` call if it takes too long. Make sure that this wake-up thread is cancelled as soon as possible once the `poll()` has returned - no matter whether it was successful or not. Otherwise the wake-up will be remembered by the consumer and applied on the next `poll()`. See https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#endOffsets-java.util.Collection- * Catch failures when closing the old consumer during re-creation. In the past, re-creation was interrupted when closing failed and no new consumer was created. * Add more logging so that unexpected or error conditions no longer go unnoticed. 
* Address review feedback * Simplify code --- .../connector/kafka/KafkaConsumerConnector.scala | 50 ++++++++++++++++++---- 1 file changed, 41 insertions(+), 9 deletions(-) diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala index 7111573..7e28f15 100644 --- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala +++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala @@ -31,6 +31,7 @@ import whisk.core.connector.MessageConsumer import scala.collection.JavaConverters._ import scala.concurrent.duration._ import scala.concurrent.{blocking, ExecutionContext, Future} +import scala.util.Failure case class KafkaConsumerConfig(sessionTimeoutMs: Long, metricFlushIntervalS: Int) @@ -66,10 +67,17 @@ class KafkaConsumerConnector( retry: Int = 3): Iterable[(String, Int, Long, Array[Byte])] = { // poll can be infinitely blocked in edge-cases, so we need to wakeup explicitly. - val wakeUpTask = actorSystem.scheduler.scheduleOnce(cfg.sessionTimeoutMs.milliseconds + 1.second)(consumer.wakeup()) + val wakeUpTask = actorSystem.scheduler.scheduleOnce(cfg.sessionTimeoutMs.milliseconds + 1.second) { + consumer.wakeup() + logging.info(this, s"woke up consumer for topic '$topic'") + } try { val response = consumer.poll(duration.toMillis).asScala + + // Cancel the scheduled wake-up task immediately. + wakeUpTask.cancel() + val now = System.currentTimeMillis response.lastOption.foreach(record => offset = record.offset + 1) @@ -79,21 +87,31 @@ class KafkaConsumerConnector( (r.topic, r.partition, r.offset, r.value) } } catch { - // Happens if the peek hangs. case _: WakeupException if retry > 0 => + // Happens if the 'poll()' takes too long. + // This exception should occur iff 'poll()' has been woken up by the scheduled task. + // For this reason, it should not be necessary to cancel the task. 
We cancel the task + // to be on the safe side because an ineffective `wakeup()` applies to the next + // consumer call that can be woken up. + // The scheduler is expected to safely ignore the cancellation of a task that already + // has been cancelled or is already complete. + wakeUpTask.cancel() logging.error(this, s"poll timeout occurred. Retrying $retry more times.") Thread.sleep(gracefulWaitTime.toMillis) // using Thread.sleep is okay, since `poll` is blocking anyway peek(duration, retry - 1) case e: RetriableException if retry > 0 => - logging.error(this, s"$e: Retrying $retry more times") + // Happens if something goes wrong with 'poll()' and 'poll()' can be retried. wakeUpTask.cancel() + logging.error(this, s"poll returned with failure. Retrying $retry more times. Exception: $e") Thread.sleep(gracefulWaitTime.toMillis) // using Thread.sleep is okay, since `poll` is blocking anyway peek(duration, retry - 1) - // Every other error results in a restart of the consumer case e: Throwable => + // Every other error results in a restart of the consumer + wakeUpTask.cancel() + logging.error(this, s"poll returned with failure. Recreating the consumer. Exception: $e") recreateConsumer() throw e - } finally wakeUpTask.cancel() + } } /** @@ -114,8 +132,8 @@ class KafkaConsumerConnector( } override def close(): Unit = { + logging.info(this, s"closing consumer for '$topic'") consumer.close() - logging.info(this, s"closing '$topic' consumer") } /** Creates a new kafka consumer and subscribes to topic list if given. */ @@ -135,9 +153,19 @@ class KafkaConsumerConnector( } private def recreateConsumer(): Unit = { - val oldConsumer = consumer - oldConsumer.close() - logging.info(this, s"old consumer closed") + logging.info(this, s"recreating consumer for '$topic'") + try { + consumer.close() + } catch { + // According to documentation, the consumer is force closed if it cannot be closed gracefully. 
+ // See https://kafka.apache.org/11/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html + // + // For the moment, we have no special handling of 'InterruptException' - it may be possible or even + // needed to re-try the 'close()' when being interrupted. + case t: Throwable => + logging.error(this, s"failed to close old consumer while recreating: $t") + } + logging.info(this, s"old consumer closed for '$topic'") consumer = createConsumer(topic) } @@ -157,6 +185,10 @@ class KafkaConsumerConnector( } } } + }.andThen { + case Failure(e) => + // Only log level info because failed metric reporting is not critical + logging.info(this, s"lag metric reporting failed for topic '$topic': $e") } } }