This is an automated email from the ASF dual-hosted git repository.
style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new e99911f Close the consumer when WakeupExcpetion happened (#4459)
e99911f is described below
commit e99911fdf0ea5908b2ed924141c167f72db203c1
Author: ningyougang <[email protected]>
AuthorDate: Thu Jul 25 16:08:05 2019 +0800
Close the consumer when WakeupExcpetion happened (#4459)
---
.../apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala | 5 +++++
1 file changed, 5 insertions(+)
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala
b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala
index b75c689..db4cfe3 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala
@@ -132,6 +132,9 @@ class KafkaConsumerConnector(
} else {
throw e
}
+ case e: WakeupException =>
+ logging.info(this, s"WakeupException happened when do commit action
for topic ${topic}")
+ recreateConsumer()
}
override def close(): Unit = synchronized {
@@ -196,6 +199,8 @@ class KafkaConsumerConnector(
}
}
}.andThen {
+ case Failure(_: WakeupException) =>
+ recreateConsumer()
case Failure(e) =>
// Only log level info because failed metric reporting is not critical
logging.info(this, s"lag metric reporting failed for topic '$topic':
$e")