tysonnorris commented on a change in pull request #2650: Apply standard scala
formatting.
URL:
https://github.com/apache/incubator-openwhisk/pull/2650#discussion_r134611006
##########
File path:
common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
##########
@@ -92,140 +92,151 @@ object MessageFeed {
* of outstanding requests is below the pipeline fill threshold.
*/
@throws[IllegalArgumentException]
-class MessageFeed(
- description: String,
- logging: Logging,
- consumer: MessageConsumer,
- maximumHandlerCapacity: Int,
- longPollDuration: FiniteDuration,
- handler: Array[Byte] => Future[Unit],
- autoStart: Boolean = true,
- logHandoff: Boolean = true)
+class MessageFeed(description: String,
+ logging: Logging,
+ consumer: MessageConsumer,
+ maximumHandlerCapacity: Int,
+ longPollDuration: FiniteDuration,
+ handler: Array[Byte] => Future[Unit],
+ autoStart: Boolean = true,
+ logHandoff: Boolean = true)
extends FSM[MessageFeed.FeedState, MessageFeed.FeedData] {
- import MessageFeed._
-
- // double-buffer to make up for message bus read overhead
- val maxPipelineDepth = maximumHandlerCapacity * 2
- private val pipelineFillThreshold = maxPipelineDepth - consumer.maxPeek
-
- require(consumer.maxPeek <= maxPipelineDepth, "consumer may not yield more
messages per peek than permitted by max depth")
-
- private val outstandingMessages = mutable.Queue[(String, Int, Long,
Array[Byte])]()
- private var handlerCapacity = maximumHandlerCapacity
-
- private implicit val tid = TransactionId.dispatcher
-
- logging.info(this, s"handler capacity = $maximumHandlerCapacity, pipeline
fill at = $pipelineFillThreshold, pipeline depth = $maxPipelineDepth")
-
- when(Idle) {
- case Event(Ready, _) =>
- fillPipeline()
- goto(FillingPipeline)
-
- case _ => stay
- }
-
- // wait for fill to complete, and keep filling if there is
- // capacity otherwise wait to drain
- when(FillingPipeline) {
- case Event(Processed, _) =>
- updateHandlerCapacity()
- sendOutstandingMessages()
- stay
-
- case Event(FillCompleted(messages), _) =>
- outstandingMessages.enqueue(messages: _*)
- sendOutstandingMessages()
-
- if (shouldFillQueue()) {
- fillPipeline()
- stay
- } else {
- goto(DrainingPipeline)
- }
-
- case _ => stay
- }
-
- when(DrainingPipeline) {
- case Event(Processed, _) =>
- updateHandlerCapacity()
- sendOutstandingMessages()
- if (shouldFillQueue()) {
- fillPipeline()
- goto(FillingPipeline)
- } else stay
-
- case _ => stay
- }
-
- onTransition { case _ -> Idle => if (autoStart) self ! Ready }
- startWith(Idle, MessageFeed.NoData)
- initialize()
-
- private implicit val ec = context.system.dispatcher
-
- private def fillPipeline(): Unit = {
- if (outstandingMessages.size <= pipelineFillThreshold) {
- Future {
- blocking {
- // Grab next batch of messages and commit offsets
immediately
- // essentially marking the activation as having satisfied
"at most once"
- // semantics (this is the point at which the activation is
considered started).
- // If the commit fails, then messages peeked are peeked
again on the next poll.
- // While the commit is synchronous and will block until it
completes, at steady
- // state with enough buffering (i.e., maxPipelineDepth >
maxPeek), the latency
- // of the commit should be masked.
- val records = consumer.peek(longPollDuration)
- consumer.commit()
- FillCompleted(records.toSeq)
- }
- }.andThen {
- case Failure(e: CommitFailedException) => logging.error(this,
s"failed to commit $description consumer offset: $e")
- case Failure(e: Throwable) => logging.error(this,
s"exception while pulling new $description records: $e")
- }.recover {
- case _ => FillCompleted(Seq.empty)
- }.pipeTo(self)
- } else {
- logging.error(this, s"dropping fill request until $description
feed is drained")
+ import MessageFeed._
+
+ // double-buffer to make up for message bus read overhead
+ val maxPipelineDepth = maximumHandlerCapacity * 2
+ private val pipelineFillThreshold = maxPipelineDepth - consumer.maxPeek
+
+ require(consumer.maxPeek <= maxPipelineDepth,
+ "consumer may not yield more messages per peek than permitted by max
depth")
+
+ private val outstandingMessages =
+ mutable.Queue[(String, Int, Long, Array[Byte])]()
+ private var handlerCapacity = maximumHandlerCapacity
+
+ private implicit val tid = TransactionId.dispatcher
+
+ logging.info(
+ this,
+ s"handler capacity = $maximumHandlerCapacity, pipeline fill at =
$pipelineFillThreshold, pipeline depth = $maxPipelineDepth")
+
+ when(Idle) {
+ case Event(Ready, _) =>
+ fillPipeline()
+ goto(FillingPipeline)
+
+ case _ => stay
+ }
+
+ // wait for fill to complete, and keep filling if there is
+ // capacity otherwise wait to drain
+ when(FillingPipeline) {
+ case Event(Processed, _) =>
+ updateHandlerCapacity()
+ sendOutstandingMessages()
+ stay
+
+ case Event(FillCompleted(messages), _) =>
+ outstandingMessages.enqueue(messages: _*)
+ sendOutstandingMessages()
+
+ if (shouldFillQueue()) {
+ fillPipeline()
+ stay
+ } else {
+ goto(DrainingPipeline)
+ }
+
+ case _ => stay
+ }
+
+ when(DrainingPipeline) {
+ case Event(Processed, _) =>
+ updateHandlerCapacity()
+ sendOutstandingMessages()
+ if (shouldFillQueue()) {
+ fillPipeline()
+ goto(FillingPipeline)
+ } else stay
+
+ case _ => stay
+ }
+
+ onTransition { case _ -> Idle => if (autoStart) self ! Ready }
+ startWith(Idle, MessageFeed.NoData)
+ initialize()
+
+ private implicit val ec = context.system.dispatcher
+
+ private def fillPipeline(): Unit = {
+ if (outstandingMessages.size <= pipelineFillThreshold) {
+ Future {
+ blocking {
+ // Grab next batch of messages and commit offsets immediately
+ // essentially marking the activation as having satisfied "at most
once"
+ // semantics (this is the point at which the activation is
considered started).
+ // If the commit fails, then messages peeked are peeked again on the
next poll.
+ // While the commit is synchronous and will block until it
completes, at steady
+ // state with enough buffering (i.e., maxPipelineDepth > maxPeek),
the latency
+ // of the commit should be masked.
+ val records = consumer.peek(longPollDuration)
+ consumer.commit()
+ FillCompleted(records.toSeq)
}
+ }.andThen {
+ case Failure(e: CommitFailedException) =>
+ logging.error(this, s"failed to commit $description consumer
offset: $e")
+ case Failure(e: Throwable) =>
+ logging.error(this, s"exception while pulling new $description
records: $e")
+ }
+ .recover {
+ case _ => FillCompleted(Seq.empty)
+ }
+ .pipeTo(self)
+ } else {
+ logging.error(this, s"dropping fill request until $description feed is
drained")
}
+ }
- /** Send as many messages as possible to the handler. */
- @tailrec
- private def sendOutstandingMessages(): Unit = {
- val occupancy = outstandingMessages.size
- if (occupancy > 0 && handlerCapacity > 0) {
- val (topic, partition, offset, bytes) =
outstandingMessages.dequeue()
+ /** Send as many messages as possible to the handler. */
+ @tailrec
+ private def sendOutstandingMessages(): Unit = {
+ val occupancy = outstandingMessages.size
+ if (occupancy > 0 && handlerCapacity > 0) {
+ val (topic, partition, offset, bytes) = outstandingMessages.dequeue()
- if (logHandoff) logging.info(this, s"processing
$topic[$partition][$offset] ($occupancy/$handlerCapacity)")
- handler(bytes)
- handlerCapacity -= 1
+ if (logHandoff)
+ logging.info(this, s"processing $topic[$partition][$offset]
($occupancy/$handlerCapacity)")
+ handler(bytes)
+ handlerCapacity -= 1
- sendOutstandingMessages()
- }
+ sendOutstandingMessages()
}
-
- private def shouldFillQueue(): Boolean = {
- val occupancy = outstandingMessages.size
- if (occupancy <= pipelineFillThreshold) {
- logging.debug(this, s"$description pipeline has capacity:
$occupancy <= $pipelineFillThreshold ($handlerCapacity)")
- true
- } else {
- logging.debug(this, s"$description pipeline must drain: $occupancy
> $pipelineFillThreshold")
- false
- }
+ }
+
+ private def shouldFillQueue(): Boolean = {
+ val occupancy = outstandingMessages.size
+ if (occupancy <= pipelineFillThreshold) {
+ logging.debug(
+ this,
+ s"$description pipeline has capacity: $occupancy <=
$pipelineFillThreshold ($handlerCapacity)")
Review comment:
Is it possible to wrap only the last argument, instead of forcing every
argument onto its own line? For example:
```
logging.debug(this,
  s"$description pipeline has capacity: $occupancy <= $pipelineFillThreshold ($handlerCapacity)")
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services