This is an automated email from the ASF dual-hosted git repository.
style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 97c9f7a  Add RestartSource to restart failing Kafka consumer for user-events service (#4887)
97c9f7a is described below
commit 97c9f7aff7ac58f2230d0fc76849621f1604c233
Author: Seonghyun Oh <[email protected]>
AuthorDate: Thu May 14 21:28:06 2020 +0900
Add RestartSource to restart failing Kafka consumer for user-events service (#4887)
* Add RestartSource for user-events service
* Add missing configuration
---
.../src/main/resources/application.conf | 6 +++
.../user-events/src/main/resources/reference.conf | 16 ++++++++
.../core/monitoring/metrics/EventConsumer.scala | 45 +++++++++++++---------
.../core/monitoring/metrics/OpenWhiskEvents.scala | 6 ++-
.../src/test/resources/application.conf | 16 ++++++++
.../metrics/PrometheusRecorderTests.scala | 6 +++
6 files changed, 76 insertions(+), 19 deletions(-)
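
The change replaces the plain DrainingControl pipeline with a RestartSource that rebuilds the Kafka consumer with exponential back-off whenever it fails, while an AtomicReference keeps hold of the latest materialized Consumer.Control. The following is a minimal sketch of that pattern, assuming a local broker and an illustrative topic/group name; only the diff below is the actual implementation:

import java.util.concurrent.atomic.AtomicReference

import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{RestartSource, Sink}
import org.apache.kafka.common.serialization.StringDeserializer

import scala.concurrent.duration._

object RestartingConsumerSketch extends App {
  implicit val system: ActorSystem = ActorSystem("restart-sketch")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val consumerSettings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092") // illustrative broker address
      .withGroupId("restart-sketch-group")    // illustrative group id

  // Remembers the control of whichever consumer incarnation is currently running,
  // so a later shutdown can drain the latest one even after several restarts.
  val control = new AtomicReference[Consumer.Control](Consumer.NoopControl)

  RestartSource
    .onFailuresWithBackoff(minBackoff = 3.seconds, maxBackoff = 30.seconds, randomFactor = 0.2, maxRestarts = 10) {
      () =>
        Consumer
          .committableSource(consumerSettings, Subscriptions.topics("events")) // illustrative topic
          .mapMaterializedValue(c => control.set(c)) // capture the newest control
          .map { msg =>
            // process msg.record.value() here, then hand the offset to the committer
            msg.committableOffset
          }
          .via(Committer.flow(CommitterSettings(system)))
    }
    .runWith(Sink.ignore)
}

Because Committer.flow now sits inside the restarted section, offset committing is rebuilt together with the consumer on every restart.
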
diff --git a/core/monitoring/user-events/src/main/resources/application.conf b/core/monitoring/user-events/src/main/resources/application.conf
index 8c8cd3e..9e6f0d6 100644
--- a/core/monitoring/user-events/src/main/resources/application.conf
+++ b/core/monitoring/user-events/src/main/resources/application.conf
@@ -15,6 +15,12 @@
# limitations under the License.
#
+akka.kafka.committer {
+
+ max-batch = 20
+
+}
+
akka.kafka.consumer {
# Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
# can be defined in this configuration section.
diff --git a/core/monitoring/user-events/src/main/resources/reference.conf b/core/monitoring/user-events/src/main/resources/reference.conf
index 1491e4a..bceaae9 100644
--- a/core/monitoring/user-events/src/main/resources/reference.conf
+++ b/core/monitoring/user-events/src/main/resources/reference.conf
@@ -31,5 +31,21 @@ whisk {
# rename/relabel prometheus metrics tags
# "namespace" = "ow_namespae"
}
+
+ retry {
+    # minimum (initial) duration until the Kafka consumer is started again if it is terminated
+ min-backoff = 3 secs
+
+ # the exponential back-off is capped to this duration
+ max-backoff = 30 secs
+
+ # after calculation of the exponential back-off an additional
+    # random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay
+ random-factor = 0.2
+
+    # the amount of restarts is capped to this amount within a time frame of minBackoff
+ max-restarts = 10
+ }
+
}
}
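
Taken together, these settings make the restart delay roughly double from min-backoff up to max-backoff, add up to 20% random jitter on each attempt, and stop restarting once max-restarts is exceeded. A small sketch of the resulting schedule (jitter omitted; the numbers are just the values configured above):

// Back-off schedule implied by min-backoff = 3s, max-backoff = 30s, max-restarts = 10 (jitter omitted)
val delaysInSeconds = (0 until 10).map(n => math.min(3.0 * math.pow(2, n), 30.0))
// Vector(3.0, 6.0, 12.0, 24.0, 30.0, 30.0, 30.0, 30.0, 30.0, 30.0)
// random-factor = 0.2 adds up to 20% extra delay to each of these at runtime
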
diff --git a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala
index ab5023c..d890130 100644
--- a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala
+++ b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala
@@ -18,14 +18,14 @@
package org.apache.openwhisk.core.monitoring.metrics
import java.lang.management.ManagementFactory
+import java.util.concurrent.atomic.AtomicReference
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Committer, Consumer}
-import akka.kafka.scaladsl.Consumer.DrainingControl
import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
-import akka.stream.scaladsl.Keep
+import akka.stream.scaladsl.{RestartSource, Sink}
import javax.management.ObjectName
import org.apache.kafka.clients.consumer.ConsumerConfig
import kamon.Kamon
@@ -77,25 +77,34 @@ case class EventConsumer(settings: ConsumerSettings[String, String],
def shutdown(): Future[Done] = {
lagRecorder.cancel()
- control.drainAndShutdown()(system.dispatcher)
+ control.get().drainAndShutdown(result)(system.dispatcher)
}
- def isRunning: Boolean = !control.isShutdown.isCompleted
-
-  override def metrics(): Future[Map[MetricName, common.Metric]] = control.metrics
-
- private val committerSettings = CommitterSettings(system).withMaxBatch(20)
-
- //TODO Use RestartSource
- private val control: DrainingControl[Done] = Consumer
- .committableSource(updatedSettings, Subscriptions.topics(userEventTopic))
- .map { msg =>
- processEvent(msg.record.value())
- msg.committableOffset
+ def isRunning: Boolean = !control.get().isShutdown.isCompleted
+
+  override def metrics(): Future[Map[MetricName, common.Metric]] = control.get().metrics
+
+ private val committerSettings = CommitterSettings(system)
+  private val control = new AtomicReference[Consumer.Control](Consumer.NoopControl)
+
+ private val result = RestartSource
+ .onFailuresWithBackoff(
+ minBackoff = metricConfig.retry.minBackoff,
+ maxBackoff = metricConfig.retry.maxBackoff,
+ randomFactor = metricConfig.retry.randomFactor,
+ maxRestarts = metricConfig.retry.maxRestarts) { () =>
+ Consumer
+        .committableSource(updatedSettings, Subscriptions.topics(userEventTopic))
+        // this is to get access to the Consumer.Control
+        // instance of the latest Kafka consumer source
+ .mapMaterializedValue(c => control.set(c))
+ .map { msg =>
+ processEvent(msg.record.value())
+ msg.committableOffset
+ }
+ .via(Committer.flow(committerSettings))
}
- .toMat(Committer.sink(committerSettings))(Keep.both)
- .mapMaterializedValue(DrainingControl.apply)
- .run()
+ .runWith(Sink.ignore)
private val lagRecorder =
    system.scheduler.schedule(10.seconds, 10.seconds)(lagGauge.update(consumerLag))
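
With the stream run via Sink.ignore, the only handle left for a clean stop is the pair of the latest Consumer.Control and the stream's completion future, which is what the reworked shutdown() above uses. A hedged sketch of the same draining-shutdown idea in isolation (the names are illustrative, not part of the diff):

import java.util.concurrent.atomic.AtomicReference

import akka.Done
import akka.kafka.scaladsl.Consumer

import scala.concurrent.{ExecutionContext, Future}

// `latestControl` is kept up to date by mapMaterializedValue on every (re)start;
// `streamCompletion` is the Future returned by runWith(Sink.ignore).
def drainingShutdown(latestControl: AtomicReference[Consumer.Control], streamCompletion: Future[Done])(
  implicit ec: ExecutionContext): Future[Done] =
  // stop polling Kafka, let in-flight commits finish, then complete with the stream's own result
  latestControl.get().drainAndShutdown(streamCompletion)
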
diff --git a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala
index 03f1f3e..034e4e7 100644
--- a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala
+++ b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala
@@ -29,6 +29,7 @@ import org.apache.kafka.common.serialization.StringDeserializer
import pureconfig._
import pureconfig.generic.auto._
+import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}
object OpenWhiskEvents extends SLF4JLogging {
@@ -36,7 +37,10 @@ object OpenWhiskEvents extends SLF4JLogging {
case class MetricConfig(port: Int,
enableKamon: Boolean,
ignoredNamespaces: Set[String],
- renameTags: Map[String, String])
+ renameTags: Map[String, String],
+ retry: RetryConfig)
+
+  case class RetryConfig(minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)
  def start(config: Config)(implicit system: ActorSystem,
                            materializer: ActorMaterializer): Future[Http.ServerBinding] = {
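
The new retry block from reference.conf is picked up through the same pureconfig-based loading as the rest of MetricConfig; kebab-case keys such as min-backoff map onto the camelCase fields of RetryConfig by pureconfig's default naming convention. A standalone sketch of that mapping, assuming the loadConfigOrThrow helper available in the pureconfig version this module builds against (the case class is repeated here only for illustration):

import com.typesafe.config.ConfigFactory
import pureconfig._
import pureconfig.generic.auto._

import scala.concurrent.duration.FiniteDuration

case class RetryConfig(minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)

// Reads whisk.user-events.retry { min-backoff = 3 secs, max-backoff = 30 secs, ... }
// into RetryConfig(3 seconds, 30 seconds, 0.2, 10); throws if a key is missing or malformed.
val retry: RetryConfig = loadConfigOrThrow[RetryConfig](ConfigFactory.load(), "whisk.user-events.retry")
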
diff --git a/core/monitoring/user-events/src/test/resources/application.conf b/core/monitoring/user-events/src/test/resources/application.conf
index 2c7f65f..957b953 100644
--- a/core/monitoring/user-events/src/test/resources/application.conf
+++ b/core/monitoring/user-events/src/test/resources/application.conf
@@ -29,4 +29,20 @@ user-events {
rename-tags {
#namespace = "ow_namespace"
}
+
+ retry {
+    # minimum (initial) duration until the Kafka consumer is started again if it is terminated
+ min-backoff = 3 secs
+
+ # the exponential back-off is capped to this duration
+ max-backoff = 30 secs
+
+ # after calculation of the exponential back-off an additional
+    # random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay
+ random-factor = 0.2
+
+    # the amount of restarts is capped to this amount within a time frame of minBackoff
+ max-restarts = 10
+ }
+
}
diff --git a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala
index 3e1e641..a5bec19 100644
--- a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala
+++ b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala
@@ -99,6 +99,12 @@ class PrometheusRecorderTests extends KafkaSpecBase with BeforeAndAfterEach with
| rename-tags {
| namespace = "ow_namespace"
| }
+ | retry {
+ | min-backoff = 3 secs
+ | max-backoff = 30 secs
+ | random-factor = 0.2
+ | max-restarts = 10
+ | }
| }
| }
""".stripMargin)