markusthoemmes commented on a change in pull request #2878: Streamingly read
user-logs.
URL:
https://github.com/apache/incubator-openwhisk/pull/2878#discussion_r152798237
##########
File path:
core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
##########
@@ -215,40 +222,99 @@ class DockerContainer(protected val id: ContainerId,
* the result returned from this method.
*
* Only parses and returns as much logs as fit in the passed log limit.
- * Even if the log limit is exceeded, advance the starting position for the
next invocation
- * behind the bytes most recently read - but don't actively read any more
until sentinel
- * markers have been found.
*
* @param limit the limit to apply to the log size
* @param waitForSentinel determines if the processor should wait for a
sentinel to appear
*
* @return a vector of Strings with log lines in our own JSON format
*/
- def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid:
TransactionId): Future[Vector[String]] = {
-
- def readLogs(retries: Int): Future[Vector[String]] = {
- docker
- .rawContainerLogs(id, logFileOffset)
- .flatMap { rawLogBytes =>
- val rawLog =
- new String(rawLogBytes.array, rawLogBytes.arrayOffset,
rawLogBytes.position, StandardCharsets.UTF_8)
- val (isComplete, isTruncated, formattedLogs) =
processJsonDriverLogContents(rawLog, waitForSentinel, limit)
-
- if (retries > 0 && !isComplete && !isTruncated) {
- logging.info(this, s"log cursor advanced but missing sentinel,
trying $retries more times")
- akka.pattern.after(filePollInterval,
as.scheduler)(readLogs(retries - 1))
+ def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid:
TransactionId): Source[ByteString, Any] = {
+ val source = docker
+ .rawContainerLogs(id, logFileOffset.get(), if (waitForSentinel)
Some(filePollInterval) else None)
+ .via(Framing.delimiter(delimiter, Int.MaxValue))
+ .limitWeighted(limit.toBytes) { obj =>
+ // Adding + 1 since we know there's a newline byte being read
+ val size = obj.size + 1
+ logFileOffset.addAndGet(size)
+ size
+ }
+
+ // Only apply sentinel counting if it is needed
+ val specializedSource = if (waitForSentinel) {
+ source.via(new
CompleteAfterOccurrences(_.containsSlice(DockerContainer.ActivationSentinel),
2))
+ } else {
+ source
+ }
+
+ specializedSource
+ .recover {
+ case _: StreamLimitReachedException =>
+ // While the stream has already ended by failing the limitWeighted
stage above, we inject a truncation
+ // notice downstream, which will be processed as usual. This will be
the last element of the stream.
+ ByteString(LogLine(Instant.now.toString, "stderr",
Messages.truncateLogs(limit)).toJson.compactPrint)
+ case _: OccurrencesNotFoundException | _: FramingException =>
+ // Stream has already ended, since the log-source closed the stream,
which caused a framing exception.
+ // This will be the last element of the stream.
+ ByteString(LogLine(Instant.now.toString, "stderr",
Messages.logFailure).toJson.compactPrint)
+ }
+ .takeWithin(waitForLogs)
+ }
+
+ /* Delimiter used to split logs */
+ private val delimiter = ByteString("\n")
Review comment:
correct.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services