dubee closed pull request #3922: Improve robustness of KafkaConsumerConnector URL: https://github.com/apache/incubator-openwhisk/pull/3922
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala index 7111573ae1..7e28f15025 100644 --- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala +++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala @@ -31,6 +31,7 @@ import whisk.core.connector.MessageConsumer import scala.collection.JavaConverters._ import scala.concurrent.duration._ import scala.concurrent.{blocking, ExecutionContext, Future} +import scala.util.Failure case class KafkaConsumerConfig(sessionTimeoutMs: Long, metricFlushIntervalS: Int) @@ -66,10 +67,17 @@ class KafkaConsumerConnector( retry: Int = 3): Iterable[(String, Int, Long, Array[Byte])] = { // poll can be infinitely blocked in edge-cases, so we need to wakeup explicitly. - val wakeUpTask = actorSystem.scheduler.scheduleOnce(cfg.sessionTimeoutMs.milliseconds + 1.second)(consumer.wakeup()) + val wakeUpTask = actorSystem.scheduler.scheduleOnce(cfg.sessionTimeoutMs.milliseconds + 1.second) { + consumer.wakeup() + logging.info(this, s"woke up consumer for topic '$topic'") + } try { val response = consumer.poll(duration.toMillis).asScala + + // Cancel the scheduled wake-up task immediately. + wakeUpTask.cancel() + val now = System.currentTimeMillis response.lastOption.foreach(record => offset = record.offset + 1) @@ -79,21 +87,31 @@ class KafkaConsumerConnector( (r.topic, r.partition, r.offset, r.value) } } catch { - // Happens if the peek hangs. case _: WakeupException if retry > 0 => + // Happens if the 'poll()' takes too long. 
+ // This exception should occur iff 'poll()' has been woken up by the scheduled task. + // For this reason, it should not be necessary to cancel the task. We cancel the task + // to be on the safe side because an ineffective `wakeup()` applies to the next + // consumer call that can be woken up. + // The scheduler is expected to safely ignore the cancellation of a task that already + // has been cancelled or is already complete. + wakeUpTask.cancel() logging.error(this, s"poll timeout occurred. Retrying $retry more times.") Thread.sleep(gracefulWaitTime.toMillis) // using Thread.sleep is okay, since `poll` is blocking anyway peek(duration, retry - 1) case e: RetriableException if retry > 0 => - logging.error(this, s"$e: Retrying $retry more times") + // Happens if something goes wrong with 'poll()' and 'poll()' can be retried. wakeUpTask.cancel() + logging.error(this, s"poll returned with failure. Retrying $retry more times. Exception: $e") Thread.sleep(gracefulWaitTime.toMillis) // using Thread.sleep is okay, since `poll` is blocking anyway peek(duration, retry - 1) - // Every other error results in a restart of the consumer case e: Throwable => + // Every other error results in a restart of the consumer + wakeUpTask.cancel() + logging.error(this, s"poll returned with failure. Recreating the consumer. Exception: $e") recreateConsumer() throw e - } finally wakeUpTask.cancel() + } } /** @@ -114,8 +132,8 @@ class KafkaConsumerConnector( } override def close(): Unit = { + logging.info(this, s"closing consumer for '$topic'") consumer.close() - logging.info(this, s"closing '$topic' consumer") } /** Creates a new kafka consumer and subscribes to topic list if given. 
*/ @@ -135,9 +153,19 @@ class KafkaConsumerConnector( } private def recreateConsumer(): Unit = { - val oldConsumer = consumer - oldConsumer.close() - logging.info(this, s"old consumer closed") + logging.info(this, s"recreating consumer for '$topic'") + try { + consumer.close() + } catch { + // According to documentation, the consumer is force closed if it cannot be closed gracefully. + // See https://kafka.apache.org/11/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html + // + // For the moment, we have no special handling of 'InterruptException' - it may be possible or even + // needed to re-try the 'close()' when being interrupted. + case t: Throwable => + logging.error(this, s"failed to close old consumer while recreating: $t") + } + logging.info(this, s"old consumer closed for '$topic'") consumer = createConsumer(topic) } @@ -157,6 +185,10 @@ class KafkaConsumerConnector( } } } + }.andThen { + case Failure(e) => + // Only log level info because failed metric reporting is not critical + logging.info(this, s"lag metric reporting failed for topic '$topic': $e") } } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services