This is an automated email from the ASF dual-hosted git repository.
japetrsn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 48c4db3 Add restart sink to log rotator sink (#3834)
48c4db3 is described below
commit 48c4db35a122a87cd8f5b8aeebd5e26605d2e13d
Author: James Dubee <[email protected]>
AuthorDate: Tue Jul 3 13:28:13 2018 -0400
Add restart sink to log rotator sink (#3834)
* Add restart sink to log rotator sink
* Review refactor
---
.../logging/DockerToActivationFileLogStore.scala | 37 ++++++++++++----------
1 file changed, 20 insertions(+), 17 deletions(-)
diff --git
a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
index 23d6657..034b622 100644
---
a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
+++
b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
@@ -24,7 +24,7 @@ import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.alpakka.file.scaladsl.LogRotatorSink
import akka.stream.{Graph, SinkShape, UniformFanOutShape}
-import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, MergeHub, Sink,
Source}
+import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, MergeHub,
RestartSink, Sink, Source}
import akka.util.ByteString
import whisk.common.{AkkaLogging, TransactionId}
@@ -37,6 +37,7 @@ import spray.json._
import spray.json.DefaultJsonProtocol._
import scala.concurrent.Future
+import scala.concurrent.duration._
/**
* Docker based implementation of a LogStore.
@@ -77,23 +78,25 @@ class DockerToActivationFileLogStore(system: ActorSystem,
destinationDirectory:
protected val writeToFile: Sink[ByteString, _] = MergeHub
.source[ByteString]
.batchWeighted(bufferSize.toBytes, _.length, identity)(_ ++ _)
- .to(LogRotatorSink(() => {
- val maxSize = bufferSize.toBytes
- var bytesRead = maxSize
- element =>
- {
- val size = element.size
- if (bytesRead + size > maxSize) {
- bytesRead = size
- val newLogFile =
destinationDirectory.resolve(s"userlogs-${Instant.now.toEpochMilli}.log")
- logging.info(this, s"Rotating log file to '$newLogFile'")
- Some(newLogFile)
- } else {
- bytesRead += size
- None
+ .to(RestartSink.withBackoff(minBackoff = 1.seconds, maxBackoff =
60.seconds, randomFactor = 0.2) { () =>
+ LogRotatorSink(() => {
+ val maxSize = bufferSize.toBytes
+ var bytesRead = maxSize
+ element =>
+ {
+ val size = element.size
+ if (bytesRead + size > maxSize) {
+ bytesRead = size
+ val newLogFile =
destinationDirectory.resolve(s"userlogs-${Instant.now.toEpochMilli}.log")
+ logging.info(this, s"Rotating log file to '$newLogFile'")
+ Some(newLogFile)
+ } else {
+ bytesRead += size
+ None
+ }
}
- }
- }))
+ })
+ })
.run()
override def collectLogs(transid: TransactionId,