tysonnorris commented on a change in pull request #2650: Apply standard scala 
formatting.
URL: 
https://github.com/apache/incubator-openwhisk/pull/2650#discussion_r134611006
 
 

 ##########
 File path: 
common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
 ##########
 @@ -92,140 +92,151 @@ object MessageFeed {
  * of outstanding requests is below the pipeline fill threshold.
  */
 @throws[IllegalArgumentException]
-class MessageFeed(
-    description: String,
-    logging: Logging,
-    consumer: MessageConsumer,
-    maximumHandlerCapacity: Int,
-    longPollDuration: FiniteDuration,
-    handler: Array[Byte] => Future[Unit],
-    autoStart: Boolean = true,
-    logHandoff: Boolean = true)
+class MessageFeed(description: String,
+                  logging: Logging,
+                  consumer: MessageConsumer,
+                  maximumHandlerCapacity: Int,
+                  longPollDuration: FiniteDuration,
+                  handler: Array[Byte] => Future[Unit],
+                  autoStart: Boolean = true,
+                  logHandoff: Boolean = true)
     extends FSM[MessageFeed.FeedState, MessageFeed.FeedData] {
-    import MessageFeed._
-
-    // double-buffer to make up for message bus read overhead
-    val maxPipelineDepth = maximumHandlerCapacity * 2
-    private val pipelineFillThreshold = maxPipelineDepth - consumer.maxPeek
-
-    require(consumer.maxPeek <= maxPipelineDepth, "consumer may not yield more 
messages per peek than permitted by max depth")
-
-    private val outstandingMessages = mutable.Queue[(String, Int, Long, 
Array[Byte])]()
-    private var handlerCapacity = maximumHandlerCapacity
-
-    private implicit val tid = TransactionId.dispatcher
-
-    logging.info(this, s"handler capacity = $maximumHandlerCapacity, pipeline 
fill at = $pipelineFillThreshold, pipeline depth = $maxPipelineDepth")
-
-    when(Idle) {
-        case Event(Ready, _) =>
-            fillPipeline()
-            goto(FillingPipeline)
-
-        case _ => stay
-    }
-
-    // wait for fill to complete, and keep filling if there is
-    // capacity otherwise wait to drain
-    when(FillingPipeline) {
-        case Event(Processed, _) =>
-            updateHandlerCapacity()
-            sendOutstandingMessages()
-            stay
-
-        case Event(FillCompleted(messages), _) =>
-            outstandingMessages.enqueue(messages: _*)
-            sendOutstandingMessages()
-
-            if (shouldFillQueue()) {
-                fillPipeline()
-                stay
-            } else {
-                goto(DrainingPipeline)
-            }
-
-        case _ => stay
-    }
-
-    when(DrainingPipeline) {
-        case Event(Processed, _) =>
-            updateHandlerCapacity()
-            sendOutstandingMessages()
-            if (shouldFillQueue()) {
-                fillPipeline()
-                goto(FillingPipeline)
-            } else stay
-
-        case _ => stay
-    }
-
-    onTransition { case _ -> Idle => if (autoStart) self ! Ready }
-    startWith(Idle, MessageFeed.NoData)
-    initialize()
-
-    private implicit val ec = context.system.dispatcher
-
-    private def fillPipeline(): Unit = {
-        if (outstandingMessages.size <= pipelineFillThreshold) {
-            Future {
-                blocking {
-                    // Grab next batch of messages and commit offsets 
immediately
-                    // essentially marking the activation as having satisfied 
"at most once"
-                    // semantics (this is the point at which the activation is 
considered started).
-                    // If the commit fails, then messages peeked are peeked 
again on the next poll.
-                    // While the commit is synchronous and will block until it 
completes, at steady
-                    // state with enough buffering (i.e., maxPipelineDepth > 
maxPeek), the latency
-                    // of the commit should be masked.
-                    val records = consumer.peek(longPollDuration)
-                    consumer.commit()
-                    FillCompleted(records.toSeq)
-                }
-            }.andThen {
-                case Failure(e: CommitFailedException) => logging.error(this, 
s"failed to commit $description consumer offset: $e")
-                case Failure(e: Throwable)             => logging.error(this, 
s"exception while pulling new $description records: $e")
-            }.recover {
-                case _ => FillCompleted(Seq.empty)
-            }.pipeTo(self)
-        } else {
-            logging.error(this, s"dropping fill request until $description 
feed is drained")
+  import MessageFeed._
+
+  // double-buffer to make up for message bus read overhead
+  val maxPipelineDepth = maximumHandlerCapacity * 2
+  private val pipelineFillThreshold = maxPipelineDepth - consumer.maxPeek
+
+  require(consumer.maxPeek <= maxPipelineDepth,
+          "consumer may not yield more messages per peek than permitted by max 
depth")
+
+  private val outstandingMessages =
+    mutable.Queue[(String, Int, Long, Array[Byte])]()
+  private var handlerCapacity = maximumHandlerCapacity
+
+  private implicit val tid = TransactionId.dispatcher
+
+  logging.info(
+    this,
+    s"handler capacity = $maximumHandlerCapacity, pipeline fill at = 
$pipelineFillThreshold, pipeline depth = $maxPipelineDepth")
+
+  when(Idle) {
+    case Event(Ready, _) =>
+      fillPipeline()
+      goto(FillingPipeline)
+
+    case _ => stay
+  }
+
+  // wait for fill to complete, and keep filling if there is
+  // capacity otherwise wait to drain
+  when(FillingPipeline) {
+    case Event(Processed, _) =>
+      updateHandlerCapacity()
+      sendOutstandingMessages()
+      stay
+
+    case Event(FillCompleted(messages), _) =>
+      outstandingMessages.enqueue(messages: _*)
+      sendOutstandingMessages()
+
+      if (shouldFillQueue()) {
+        fillPipeline()
+        stay
+      } else {
+        goto(DrainingPipeline)
+      }
+
+    case _ => stay
+  }
+
+  when(DrainingPipeline) {
+    case Event(Processed, _) =>
+      updateHandlerCapacity()
+      sendOutstandingMessages()
+      if (shouldFillQueue()) {
+        fillPipeline()
+        goto(FillingPipeline)
+      } else stay
+
+    case _ => stay
+  }
+
+  onTransition { case _ -> Idle => if (autoStart) self ! Ready }
+  startWith(Idle, MessageFeed.NoData)
+  initialize()
+
+  private implicit val ec = context.system.dispatcher
+
+  private def fillPipeline(): Unit = {
+    if (outstandingMessages.size <= pipelineFillThreshold) {
+      Future {
+        blocking {
+          // Grab next batch of messages and commit offsets immediately
+          // essentially marking the activation as having satisfied "at most 
once"
+          // semantics (this is the point at which the activation is 
considered started).
+          // If the commit fails, then messages peeked are peeked again on the 
next poll.
+          // While the commit is synchronous and will block until it 
completes, at steady
+          // state with enough buffering (i.e., maxPipelineDepth > maxPeek), 
the latency
+          // of the commit should be masked.
+          val records = consumer.peek(longPollDuration)
+          consumer.commit()
+          FillCompleted(records.toSeq)
         }
+      }.andThen {
+          case Failure(e: CommitFailedException) =>
+            logging.error(this, s"failed to commit $description consumer 
offset: $e")
+          case Failure(e: Throwable) =>
+            logging.error(this, s"exception while pulling new $description 
records: $e")
+        }
+        .recover {
+          case _ => FillCompleted(Seq.empty)
+        }
+        .pipeTo(self)
+    } else {
+      logging.error(this, s"dropping fill request until $description feed is 
drained")
     }
+  }
 
-    /** Send as many messages as possible to the handler. */
-    @tailrec
-    private def sendOutstandingMessages(): Unit = {
-        val occupancy = outstandingMessages.size
-        if (occupancy > 0 && handlerCapacity > 0) {
-            val (topic, partition, offset, bytes) = 
outstandingMessages.dequeue()
+  /** Send as many messages as possible to the handler. */
+  @tailrec
+  private def sendOutstandingMessages(): Unit = {
+    val occupancy = outstandingMessages.size
+    if (occupancy > 0 && handlerCapacity > 0) {
+      val (topic, partition, offset, bytes) = outstandingMessages.dequeue()
 
-            if (logHandoff) logging.info(this, s"processing 
$topic[$partition][$offset] ($occupancy/$handlerCapacity)")
-            handler(bytes)
-            handlerCapacity -= 1
+      if (logHandoff)
+        logging.info(this, s"processing $topic[$partition][$offset] 
($occupancy/$handlerCapacity)")
+      handler(bytes)
+      handlerCapacity -= 1
 
-            sendOutstandingMessages()
-        }
+      sendOutstandingMessages()
     }
-
-    private def shouldFillQueue(): Boolean = {
-        val occupancy = outstandingMessages.size
-        if (occupancy <= pipelineFillThreshold) {
-            logging.debug(this, s"$description pipeline has capacity: 
$occupancy <= $pipelineFillThreshold ($handlerCapacity)")
-            true
-        } else {
-            logging.debug(this, s"$description pipeline must drain: $occupancy 
> $pipelineFillThreshold")
-            false
-        }
+  }
+
+  private def shouldFillQueue(): Boolean = {
+    val occupancy = outstandingMessages.size
+    if (occupancy <= pipelineFillThreshold) {
+      logging.debug(
+        this,
+        s"$description pipeline has capacity: $occupancy <= 
$pipelineFillThreshold ($handlerCapacity)")
 
 Review comment:
  Is it possible to wrap only the last argument, instead of wrapping every 
argument onto its own line? For example:
   ```
   logging.debug(this, 
     s"$description pipeline has capacity: $occupancy <= $pipelineFillThreshold 
($handlerCapacity)")
   ```
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to