This is an automated email from the ASF dual-hosted git repository.

japetrsn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 48c4db3  Add restart sink to log rotator sink (#3834)
48c4db3 is described below

commit 48c4db35a122a87cd8f5b8aeebd5e26605d2e13d
Author: James Dubee <[email protected]>
AuthorDate: Tue Jul 3 13:28:13 2018 -0400

    Add restart sink to log rotator sink (#3834)
    
    * Add restart sink to log rotator sink
    
    * Review refactor
---
 .../logging/DockerToActivationFileLogStore.scala   | 37 ++++++++++++----------
 1 file changed, 20 insertions(+), 17 deletions(-)

diff --git 
a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
 
b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
index 23d6657..034b622 100644
--- 
a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
+++ 
b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
@@ -24,7 +24,7 @@ import akka.NotUsed
 import akka.actor.ActorSystem
 import akka.stream.alpakka.file.scaladsl.LogRotatorSink
 import akka.stream.{Graph, SinkShape, UniformFanOutShape}
-import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, MergeHub, Sink, 
Source}
+import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, MergeHub, 
RestartSink, Sink, Source}
 import akka.util.ByteString
 
 import whisk.common.{AkkaLogging, TransactionId}
@@ -37,6 +37,7 @@ import spray.json._
 import spray.json.DefaultJsonProtocol._
 
 import scala.concurrent.Future
+import scala.concurrent.duration._
 
 /**
  * Docker based implementation of a LogStore.
@@ -77,23 +78,25 @@ class DockerToActivationFileLogStore(system: ActorSystem, 
destinationDirectory:
   protected val writeToFile: Sink[ByteString, _] = MergeHub
     .source[ByteString]
     .batchWeighted(bufferSize.toBytes, _.length, identity)(_ ++ _)
-    .to(LogRotatorSink(() => {
-      val maxSize = bufferSize.toBytes
-      var bytesRead = maxSize
-      element =>
-        {
-          val size = element.size
-          if (bytesRead + size > maxSize) {
-            bytesRead = size
-            val newLogFile = 
destinationDirectory.resolve(s"userlogs-${Instant.now.toEpochMilli}.log")
-            logging.info(this, s"Rotating log file to '$newLogFile'")
-            Some(newLogFile)
-          } else {
-            bytesRead += size
-            None
+    .to(RestartSink.withBackoff(minBackoff = 1.seconds, maxBackoff = 
60.seconds, randomFactor = 0.2) { () =>
+      LogRotatorSink(() => {
+        val maxSize = bufferSize.toBytes
+        var bytesRead = maxSize
+        element =>
+          {
+            val size = element.size
+            if (bytesRead + size > maxSize) {
+              bytesRead = size
+              val newLogFile = 
destinationDirectory.resolve(s"userlogs-${Instant.now.toEpochMilli}.log")
+              logging.info(this, s"Rotating log file to '$newLogFile'")
+              Some(newLogFile)
+            } else {
+              bytesRead += size
+              None
+            }
           }
-        }
-    }))
+      })
+    })
     .run()
 
   override def collectLogs(transid: TransactionId,

Reply via email to