markusthoemmes commented on a change in pull request #2425: Throttle message bus consumption.
URL: https://github.com/apache/incubator-openwhisk/pull/2425#discussion_r124199644
##########
File path: common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
##########
@@ -38,15 +47,157 @@ trait MessageConsumer {
 * Commits offsets from last peek operation to ensure they are removed from the connector.
*/
- def commit()
-
- /**
- * Calls process for every message received. Process receives a tuple
- * (topic, partition, offset, and message as byte array).
- */
- def onMessage(process: (String, Int, Long, Array[Byte]) => Unit): Unit
+ def commit(): Unit
/** Closes consumer. */
def close(): Unit
}
+
+object MessageFeed {
+ protected sealed trait FeedState
+ protected[connector] case object Uninitialized extends FeedState
+ protected[connector] case object FillingPipeline extends FeedState
+ protected[connector] case object DrainingPipeline extends FeedState
+
+ protected sealed trait FeedData
+ private case object NoData extends FeedData
+
+ /** Indicates the consumer is ready to accept messages from the message bus for processing. */
+ object Ready
+
+ /** Steady state message, indicates capacity in downstream process to receive more messages. */
+ object Processed
+}
+
+/**
+ * This actor polls the message bus for new messages and dispatches them to the given
+ * handler. The actor tracks the number of messages dispatched and will not dispatch new
+ * messages until some number of them are acknowledged.
+ *
+ * This is used by the invoker to pull messages from the message bus and apply back pressure
+ * when the invoker does not have resources to complete processing messages (i.e., no containers
+ * are available to run new actions). It is also used in the load balancer to consume active
+ * ack messages.
+ * When the invoker releases resources (by reclaiming containers) it will send a message
+ * to this actor which will then attempt to fill the pipeline with new messages.
+ *
+ * The actor tries to fill the pipeline with additional messages while the number
+ * of outstanding requests is below the pipeline fill threshold.
+ */
+@throws[IllegalArgumentException]
+class MessageFeed(
+ description: String,
+ logging: Logging,
+ consumer: MessageConsumer,
+ maxPipelineDepth: Int,
+ longPollDuration: FiniteDuration,
+ handler: Array[Byte] => Future[Unit],
+ autoStart: Boolean = true)
+ extends FSM[MessageFeed.FeedState, MessageFeed.FeedData] {
+ import MessageFeed._
+
+ require(consumer.maxPeek <= maxPipelineDepth, "consumer may not yield more messages per peek than permitted by max depth")
+
+ private val pipelineFillThreshold = maxPipelineDepth - consumer.maxPeek
+ private var pipelineOccupancy = 0
+ private implicit val tid = TransactionId.dispatcher
+
+ startWith(Uninitialized, MessageFeed.NoData)
+
+ onTransition {
+ case _ -> FillingPipeline => fillPipeline()
+ }
+
+ when(Uninitialized) {
+ case Event(Ready, _) =>
+ goto(FillingPipeline)
+
+ case _ => stay
+ }
+
+ // fill pipeline either periodically due to a scheduled alarm or when
+ // previously queued messages are released
+ when(FillingPipeline, longPollDuration) {
+ case Event(Processed, _) =>
+ updateOccupancy()
+ if (haveCapacity) {
+ fillPipeline()
+ stay
+ } else {
+ goto(DrainingPipeline)
+ }
+
+ case Event(StateTimeout, _) =>
+ if (haveCapacity) {
+ fillPipeline()
+ stay
+ } else {
+ goto(DrainingPipeline)
+ }
+ }
+
+ when(DrainingPipeline) {
+ case Event(Processed, _) =>
+ // fill if there is room otherwise stay
+ updateOccupancy()
+ if (haveCapacity) {
+ goto(FillingPipeline)
+ } else stay
+
+ case _ => stay
+ }
+
+ if (autoStart) self ! Ready
+
+ private def fillPipeline(): Unit = {
+ if (pipelineOccupancy <= pipelineFillThreshold) {
+ Try {
+ // Grab next batch of messages and commit offsets immediately
+ // essentially marking the activation as having satisfied "at most once"
+ // semantics (this is the point at which the activation is considered started).
+ // If the commit fails, then messages peeked are peeked again on the next poll.
+ // While the commit is synchronous and will block until it completes, at steady
+ // state with enough buffering (i.e., maxPipelineDepth > maxPeek), the latency
+ // of the commit should be masked.
+ val records = consumer.peek(longPollDuration)
+ consumer.commit()
Review comment:
Are these blocking operations? They'll block the actor and prevent it from
receiving potentially state-changing messages. Is that expected?
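
For illustration only, here is a minimal sketch of one way to keep those blocking calls off the actor's message-processing thread: run peek/commit on a dedicated dispatcher and pipe the outcome back to the actor as a message. All names below are hypothetical (`NonBlockingFeed`, `SimpleConsumer`, `FillCompleted`, and the `dispatchers.feed-blocking` config entry stand in for whatever the patch actually uses), so treat it as a sketch of the pattern, not the implementation in this PR.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.FiniteDuration
import akka.actor.{Actor, Status}
import akka.pattern.pipe

// Stand-in for MessageConsumer with the two blocking operations in question.
trait SimpleConsumer {
  def peek(duration: FiniteDuration): Iterable[(String, Int, Long, Array[Byte])]
  def commit(): Unit
}

// Hypothetical message carrying the outcome of one poll/commit round trip.
case class FillCompleted(count: Int)

class NonBlockingFeed(consumer: SimpleConsumer,
                      longPollDuration: FiniteDuration,
                      handler: Array[Byte] => Future[Unit]) extends Actor {

  // Dedicated dispatcher so a long poll cannot starve the actor's own dispatcher.
  // "dispatchers.feed-blocking" must be defined in the actor system's configuration.
  private implicit val blockingEc: ExecutionContext =
    context.system.dispatchers.lookup("dispatchers.feed-blocking")

  override def preStart(): Unit = fillPipelineAsync()

  private def fillPipelineAsync(): Unit =
    Future {
      val records = consumer.peek(longPollDuration) // may block up to longPollDuration
      consumer.commit()                             // may block until offsets are committed
      records.foreach { case (_, _, _, bytes) => handler(bytes) } // completion ignored in this sketch
      FillCompleted(records.size)
    }.pipeTo(self)                                  // arrives later as an ordinary message

  def receive = {
    case FillCompleted(_)  => fillPipelineAsync()   // actor stayed responsive during the poll
    case Status.Failure(_) => fillPipelineAsync()   // a failed poll/commit lands here; retry
  }
}
```

With this shape, a slow poll or commit only occupies the blocking dispatcher, so state-changing messages (e.g. `Processed` or a shutdown) can still be handled while the poll is in flight; the trade-off is that those messages may race with an outstanding poll and have to be reconciled when `FillCompleted` arrives.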
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services