This is an automated email from the ASF dual-hosted git repository.

style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 97c9f7a  Add RestartSource to restart failing Kafka consumer for user-events service (#4887)
97c9f7a is described below

commit 97c9f7aff7ac58f2230d0fc76849621f1604c233
Author: Seonghyun Oh <[email protected]>
AuthorDate: Thu May 14 21:28:06 2020 +0900

    Add RestartSource to restart failing Kafka consumer for user-events service (#4887)
    
    * Add RestartSource for user-events service
    
    * Add missing configuration
---
 .../src/main/resources/application.conf            |  6 +++
 .../user-events/src/main/resources/reference.conf  | 16 ++++++++
 .../core/monitoring/metrics/EventConsumer.scala    | 45 +++++++++++++---------
 .../core/monitoring/metrics/OpenWhiskEvents.scala  |  6 ++-
 .../src/test/resources/application.conf            | 16 ++++++++
 .../metrics/PrometheusRecorderTests.scala          |  6 +++
 6 files changed, 76 insertions(+), 19 deletions(-)

diff --git a/core/monitoring/user-events/src/main/resources/application.conf b/core/monitoring/user-events/src/main/resources/application.conf
index 8c8cd3e..9e6f0d6 100644
--- a/core/monitoring/user-events/src/main/resources/application.conf
+++ b/core/monitoring/user-events/src/main/resources/application.conf
@@ -15,6 +15,12 @@
 # limitations under the License.
 #
 
+akka.kafka.committer {
+
+  max-batch = 20
+
+}
+
 akka.kafka.consumer {
   # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
   # can be defined in this configuration section.
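Note: the new akka.kafka.committer block takes over the batch size that EventConsumer previously hard-coded via .withMaxBatch(20). A minimal sketch of where the value now comes from, assuming the standard Alpakka Kafka factory already used in the EventConsumer diff below (CommitterSettings(system) reads the akka.kafka.committer section of the ActorSystem's config); the object name is illustrative only:

    import akka.actor.ActorSystem
    import akka.kafka.CommitterSettings

    object CommitterSettingsSketch extends App {
      val system = ActorSystem("user-events-sketch")
      // Reads akka.kafka.committer from application.conf, so no .withMaxBatch(20) in code.
      val committerSettings = CommitterSettings(system)
      println(committerSettings.maxBatch) // expected: 20 with the config above
      system.terminate()
    }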
diff --git a/core/monitoring/user-events/src/main/resources/reference.conf b/core/monitoring/user-events/src/main/resources/reference.conf
index 1491e4a..bceaae9 100644
--- a/core/monitoring/user-events/src/main/resources/reference.conf
+++ b/core/monitoring/user-events/src/main/resources/reference.conf
@@ -31,5 +31,21 @@ whisk {
       # rename/relabel prometheus metrics tags
       # "namespace" = "ow_namespae"
     }
+
+    retry {
+      # minimum (initial) duration until the Kafka consumer is started again if it is terminated
+      min-backoff = 3 secs
+
+      # the exponential back-off is capped to this duration
+      max-backoff = 30 secs
+
+      # after calculation of the exponential back-off an additional
+      # random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay
+      random-factor = 0.2
+
+      # the amount of restarts is capped to this amount within a time frame of minBackoff
+      max-restarts = 10
+    }
+
   }
 }
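With these defaults the restart delays roughly double from the initial value until the cap is reached, for example (jitter ignored; actual values vary by up to the 20% random factor):

    restart 1: ~3s   (min-backoff)
    restart 2: ~6s
    restart 3: ~12s
    restart 4: ~24s
    restart 5+: ~30s (capped at max-backoff)

and the number of restarts is bounded by max-restarts = 10 as described in the comments above.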
diff --git a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala
index ab5023c..d890130 100644
--- a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala
+++ b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala
@@ -18,14 +18,14 @@
 package org.apache.openwhisk.core.monitoring.metrics
 
 import java.lang.management.ManagementFactory
+import java.util.concurrent.atomic.AtomicReference
 
 import akka.Done
 import akka.actor.ActorSystem
 import akka.kafka.scaladsl.{Committer, Consumer}
-import akka.kafka.scaladsl.Consumer.DrainingControl
 import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
 import akka.stream.ActorMaterializer
-import akka.stream.scaladsl.Keep
+import akka.stream.scaladsl.{RestartSource, Sink}
 import javax.management.ObjectName
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import kamon.Kamon
@@ -77,25 +77,34 @@ case class EventConsumer(settings: ConsumerSettings[String, String],
 
   def shutdown(): Future[Done] = {
     lagRecorder.cancel()
-    control.drainAndShutdown()(system.dispatcher)
+    control.get().drainAndShutdown(result)(system.dispatcher)
   }
 
-  def isRunning: Boolean = !control.isShutdown.isCompleted
-
-  override def metrics(): Future[Map[MetricName, common.Metric]] = control.metrics
-
-  private val committerSettings = CommitterSettings(system).withMaxBatch(20)
-
-  //TODO Use RestartSource
-  private val control: DrainingControl[Done] = Consumer
-    .committableSource(updatedSettings, Subscriptions.topics(userEventTopic))
-    .map { msg =>
-      processEvent(msg.record.value())
-      msg.committableOffset
+  def isRunning: Boolean = !control.get().isShutdown.isCompleted
+
+  override def metrics(): Future[Map[MetricName, common.Metric]] = control.get().metrics
+
+  private val committerSettings = CommitterSettings(system)
+  private val control = new AtomicReference[Consumer.Control](Consumer.NoopControl)
+
+  private val result = RestartSource
+    .onFailuresWithBackoff(
+      minBackoff = metricConfig.retry.minBackoff,
+      maxBackoff = metricConfig.retry.maxBackoff,
+      randomFactor = metricConfig.retry.randomFactor,
+      maxRestarts = metricConfig.retry.maxRestarts) { () =>
+      Consumer
+        .committableSource(updatedSettings, Subscriptions.topics(userEventTopic))
+        // this is to get access to the Consumer.Control
+        // instance of the latest Kafka consumer source
+        .mapMaterializedValue(c => control.set(c))
+        .map { msg =>
+          processEvent(msg.record.value())
+          msg.committableOffset
+        }
+        .via(Committer.flow(committerSettings))
     }
-    .toMat(Committer.sink(committerSettings))(Keep.both)
-    .mapMaterializedValue(DrainingControl.apply)
-    .run()
+    .runWith(Sink.ignore)
 
   private val lagRecorder =
    system.scheduler.schedule(10.seconds, 10.seconds)(lagGauge.update(consumerLag))
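The essential pattern above is RestartSource.onFailuresWithBackoff wrapping the Kafka consumer source so that a failed source is rebuilt after an exponential back-off. A stripped-down, self-contained sketch of the same wiring (the object name and the deliberately failing source are illustrative, not part of the commit):

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{RestartSource, Sink, Source}

    import scala.concurrent.duration._

    object RestartSourceSketch extends App {
      implicit val system: ActorSystem = ActorSystem("sketch")
      implicit val materializer: ActorMaterializer = ActorMaterializer()

      RestartSource
        .onFailuresWithBackoff(
          minBackoff = 3.seconds,
          maxBackoff = 30.seconds,
          randomFactor = 0.2,
          maxRestarts = 10) { () =>
          // The factory is invoked again after every failure, producing a fresh source;
          // in EventConsumer this is where Consumer.committableSource is created.
          Source(1 to 3).map { n =>
            if (n == 3) throw new RuntimeException("simulated failure") else n
          }
        }
        .runWith(Sink.ignore)
    }

In the actual consumer the factory also calls .mapMaterializedValue(control.set) so that shutdown() and metrics() always see the Consumer.Control of the inner source that is currently running.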
diff --git a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala
index 03f1f3e..034e4e7 100644
--- a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala
+++ b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala
@@ -29,6 +29,7 @@ import org.apache.kafka.common.serialization.StringDeserializer
 import pureconfig._
 import pureconfig.generic.auto._
 
+import scala.concurrent.duration.FiniteDuration
 import scala.concurrent.{ExecutionContext, Future}
 
 object OpenWhiskEvents extends SLF4JLogging {
@@ -36,7 +37,10 @@ object OpenWhiskEvents extends SLF4JLogging {
   case class MetricConfig(port: Int,
                           enableKamon: Boolean,
                           ignoredNamespaces: Set[String],
-                          renameTags: Map[String, String])
+                          renameTags: Map[String, String],
+                          retry: RetryConfig)
+
+  case class RetryConfig(minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)
 
   def start(config: Config)(implicit system: ActorSystem,
                            materializer: ActorMaterializer): Future[Http.ServerBinding] = {
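The min-backoff / max-backoff keys from reference.conf land in the camelCase fields of RetryConfig through pureconfig's automatic derivation (pureconfig.generic.auto._ imported above), which maps kebab-case keys onto camelCase field names by default. A sketch of that load, assuming pureconfig's loadConfigOrThrow helper as used elsewhere in OpenWhisk (the actual load site is not part of this diff, and the object name is illustrative):

    import com.typesafe.config.ConfigFactory
    import pureconfig._
    import pureconfig.generic.auto._

    import scala.concurrent.duration.FiniteDuration

    case class RetryConfig(minBackoff: FiniteDuration,
                           maxBackoff: FiniteDuration,
                           randomFactor: Double,
                           maxRestarts: Int)

    object RetryConfigSketch extends App {
      // "whisk.user-events.retry" matches the block added to reference.conf above.
      val retry = loadConfigOrThrow[RetryConfig](ConfigFactory.load(), "whisk.user-events.retry")
      println(retry) // e.g. RetryConfig(3 seconds, 30 seconds, 0.2, 10)
    }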
diff --git a/core/monitoring/user-events/src/test/resources/application.conf b/core/monitoring/user-events/src/test/resources/application.conf
index 2c7f65f..957b953 100644
--- a/core/monitoring/user-events/src/test/resources/application.conf
+++ b/core/monitoring/user-events/src/test/resources/application.conf
@@ -29,4 +29,20 @@ user-events {
   rename-tags {
     #namespace = "ow_namespace"
   }
+
+  retry {
+    # minimum (initial) duration until the Kafka consumer is started again if it is terminated
+    min-backoff = 3 secs
+
+    # the exponential back-off is capped to this duration
+    max-backoff = 30 secs
+
+    # after calculation of the exponential back-off an additional
+    # random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay
+    random-factor = 0.2
+
+    # the amount of restarts is capped to this amount within a time frame of minBackoff
+    max-restarts = 10
+  }
+
 }
diff --git a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala
index 3e1e641..a5bec19 100644
--- a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala
+++ b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala
@@ -99,6 +99,12 @@ class PrometheusRecorderTests extends KafkaSpecBase with BeforeAndAfterEach with
             |    rename-tags {
             |      namespace = "ow_namespace"
             |    }
+            |    retry {
+            |      min-backoff = 3 secs
+            |      max-backoff = 30 secs
+            |      random-factor = 0.2
+            |      max-restarts = 10
+            |    }
             |  }
             | }
          """.stripMargin)
