markusthoemmes commented on a change in pull request #2425: Throttle message 
bus consumption.
URL: 
https://github.com/apache/incubator-openwhisk/pull/2425#discussion_r124199644
 
 

 ##########
 File path: 
common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
 ##########
 @@ -38,15 +47,157 @@ trait MessageConsumer {
      * Commits offsets from last peek operation to ensure they are removed
      * from the connector.
      */
-    def commit()
-
-    /**
-     * Calls process for every message received. Process receives a tuple
-     * (topic, partition, offset, and message as byte array).
-     */
-    def onMessage(process: (String, Int, Long, Array[Byte]) => Unit): Unit
+    def commit(): Unit
 
     /** Closes consumer. */
     def close(): Unit
 
 }
+
+object MessageFeed {
+    protected sealed trait FeedState
+    protected[connector] case object Uninitialized extends FeedState
+    protected[connector] case object FillingPipeline extends FeedState
+    protected[connector] case object DrainingPipeline extends FeedState
+
+    protected sealed trait FeedData
+    private case object NoData extends FeedData
+
+    /** Indicates the consumer is ready to accept messages from the message 
bus for processing. */
+    object Ready
+
+    /** Steady state message, indicates capacity in downstream process to 
receive more messages. */
+    object Processed
+}
+
+/**
+ * This actor polls the message bus for new messages and dispatches them to 
the given
+ * handler. The actor tracks the number of messages dispatched and will not 
dispatch new
+ * messages until some number of them are acknowledged.
+ *
+ * This is used by the invoker to pull messages from the message bus and apply 
back pressure
+ * when the invoker does not have resources to complete processing messages 
(i.e., no containers
+ * are available to run new actions). It is also used in the load balancer to 
consume active
+ * ack messages.
+ * When the invoker releases resources (by reclaiming containers) it will send 
a message
+ * to this actor which will then attempt to fill the pipeline with new 
messages.
+ *
+ * The actor tries to fill the pipeline with additional messages while the 
number
+ * of outstanding requests is below the pipeline fill threshold.
+ */
+@throws[IllegalArgumentException]
+class MessageFeed(
+    description: String,
+    logging: Logging,
+    consumer: MessageConsumer,
+    maxPipelineDepth: Int,
+    longPollDuration: FiniteDuration,
+    handler: Array[Byte] => Future[Unit],
+    autoStart: Boolean = true)
+    extends FSM[MessageFeed.FeedState, MessageFeed.FeedData] {
+    import MessageFeed._
+
+    require(consumer.maxPeek <= maxPipelineDepth, "consumer may not yield more 
messages per peek than permitted by max depth")
+
+    private val pipelineFillThreshold = maxPipelineDepth - consumer.maxPeek
+    private var pipelineOccupancy = 0
+    private implicit val tid = TransactionId.dispatcher
+
+    startWith(Uninitialized, MessageFeed.NoData)
+
+    onTransition {
+        case _ -> FillingPipeline => fillPipeline()
+    }
+
+    when(Uninitialized) {
+        case Event(Ready, _) =>
+            goto(FillingPipeline)
+
+        case _ => stay
+    }
+
+    // fill pipeline either periodically due to a scheduled alarm or when
+    // previously queued messages are released
+    when(FillingPipeline, longPollDuration) {
+        case Event(Processed, _) =>
+            updateOccupancy()
+            if (haveCapacity) {
+                fillPipeline()
+                stay
+            } else {
+                goto(DrainingPipeline)
+            }
+
+        case Event(StateTimeout, _) =>
+            if (haveCapacity) {
+                fillPipeline()
+                stay
+            } else {
+                goto(DrainingPipeline)
+            }
+    }
+
+    when(DrainingPipeline) {
+        case Event(Processed, _) =>
+            // fill if there is room otherwise stay
+            updateOccupancy()
+            if (haveCapacity) {
+                goto(FillingPipeline)
+            } else stay
+
+        case _ => stay
+    }
+
+    if (autoStart) self ! Ready
+
+    private def fillPipeline(): Unit = {
+        if (pipelineOccupancy <= pipelineFillThreshold) {
+            Try {
+                // Grab next batch of messages and commit offsets immediately
+                // essentially marking the activation as having satisfied "at 
most once"
+                // semantics (this is the point at which the activation is 
considered started).
+                // If the commit fails, then messages peeked are peeked again 
on the next poll.
+                // While the commit is synchronous and will block until it 
completes, at steady
+                // state with enough buffering (i.e., maxPipelineDepth > 
maxPeek), the latency
+                // of the commit should be masked.
+                val records = consumer.peek(longPollDuration)
+                consumer.commit()
 
 Review comment:
   Are these (`consumer.peek` and `consumer.commit`) blocking operations? If so, they'll block the actor and prevent it from 
receiving potentially state-changing messages. Is that expected?
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to