This is an automated email from the ASF dual-hosted git repository.

style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new a44db4b  Save waitTime value for sequence and conductor action (allow 
parent/child transaction ids) (#4819)
a44db4b is described below

commit a44db4bdb44288413946c21ab2007cbe22ce2950
Author: Seonghyun Oh <[email protected]>
AuthorDate: Wed Jun 3 10:57:07 2020 +0900

    Save waitTime value for sequence and conductor action (allow parent/child 
transaction ids) (#4819)
    
    * Allow parent/child transaction ids
    
    * Create child transaction for sequence action
    
    * Fix test case
    
    * Fix test case of SchedulerTests
    
    * Refactor transactionId
    
    * Check waitTime is defined in test case
    
    * Update annotations.md
    
    * Use StringBuilder to generate tid
    
    * Remove unsed import
    
    * Avoid `.get` and use pattern matching
---
 .../org/apache/openwhisk/common/Logging.scala      |   8 +-
 .../apache/openwhisk/common/TransactionId.scala    |  65 ++++++++++--
 .../apache/openwhisk/http/BasicHttpService.scala   |   9 +-
 .../core/controller/actions/PrimitiveActions.scala |  51 +++++++---
 .../core/controller/actions/SequenceActions.scala  |  26 +++--
 .../core/containerpool/ContainerProxy.scala        |  24 ++---
 docs/annotations.md                                |   2 +-
 .../apache/openwhisk/common/SchedulerTests.scala   |   2 +-
 .../openwhisk/common/TransactionIdTests.scala      | 111 +++++++++++++++++++++
 .../scala/system/basic/WskConductorTests.scala     |   3 +
 .../test/scala/system/basic/WskSequenceTests.scala |   3 +
 11 files changed, 242 insertions(+), 62 deletions(-)

diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index 7f27b26..15c1bb3 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -100,7 +100,10 @@ class AkkaLogging(loggingAdapter: LoggingAdapter) extends 
Logging {
     }
   }
 
-  protected def format(id: TransactionId, name: String, logmsg: String) = 
s"[$id] [$name] $logmsg"
+  protected def format(id: TransactionId, name: String, logmsg: String) = {
+    val currentId = if (id.hasParent) id else ""
+    s"[${id.root}] [$currentId] [$name] $logmsg"
+  }
 }
 
 /**
@@ -124,8 +127,9 @@ class PrintStreamLogging(outputStream: PrintStream = 
Console.out) extends Loggin
       case msg if msg.nonEmpty =>
         msg.split('\n').map(_.trim).mkString(" ")
     }
+    val currentId = if (id.hasParent) id else ""
 
-    val parts = Seq(s"[$time]", s"[$level]", s"[$id]") ++ Seq(s"[$name]") ++ 
logMessage
+    val parts = Seq(s"[$time]", s"[$level]", s"[${id.root}]", s"[$currentId]") 
++ Seq(s"[$name]") ++ logMessage
     outputStream.println(parts.mkString(" "))
   }
 }
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
index 6fcc566..257dd07 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
@@ -36,8 +36,9 @@ import scala.util.Try
  * metadata is stored indirectly in the referenced meta object.
  */
 case class TransactionId private (meta: TransactionMetadata) extends AnyVal {
+  def root = findRoot(meta)
   def id = meta.id
-  override def toString = s"#tid_${meta.id}"
+  override def toString = meta.toString
 
   def toHeader = RawHeader(TransactionId.generatorConfig.header, meta.id)
 
@@ -172,6 +173,8 @@ case class TransactionId private (meta: 
TransactionMetadata) extends AnyVal {
   def deltaToMarker(startMarker: StartMarker, endTime: Instant = 
Instant.now(Clock.systemUTC)) =
     Duration.between(startMarker.start, endTime).toMillis
 
+  def hasParent = meta.parent.isDefined
+
   /**
    * Formats log message to include marker.
    *
@@ -179,6 +182,15 @@ case class TransactionId private (meta: 
TransactionMetadata) extends AnyVal {
    * @param marker: The marker to add to the message
    */
   private def createMessageWithMarker(message: String, marker: LogMarker): 
String = s"$message $marker"
+
+  /**
+   * Find root transaction metadata
+   */
+  private def findRoot(meta: TransactionMetadata): TransactionMetadata =
+    meta.parent match {
+      case Some(parent) => findRoot(parent)
+      case _            => meta
+    }
 }
 
 /**
@@ -197,7 +209,12 @@ case class StartMarker(start: Instant, startMarker: 
LogMarkerToken)
  * @param start the timestamp when the request processing commenced
  * @param extraLogging enables logging, if set to true
  */
-protected case class TransactionMetadata(id: String, start: Instant, 
extraLogging: Boolean = false)
+protected case class TransactionMetadata(id: String,
+                                         start: Instant,
+                                         extraLogging: Boolean = false,
+                                         parent: Option[TransactionMetadata] = 
None) {
+  override def toString = s"#tid_$id"
+}
 
 case class MetricConfig(prometheusEnabled: Boolean,
                         kamonEnabled: Boolean,
@@ -227,28 +244,54 @@ object TransactionId {
   val dbBatcher = TransactionId(systemPrefix + "dbBatcher") // Database batcher
   val actionHealthPing = TransactionId(systemPrefix + "actionHealth")
 
+  private val dict = ('A' to 'Z') ++ ('a' to 'z') ++ ('0' to '9')
+
   def apply(tid: String, extraLogging: Boolean = false): TransactionId = {
     val now = Instant.now(Clock.systemUTC()).inMills
     TransactionId(TransactionMetadata(tid, now, extraLogging))
   }
 
+  def childOf(parentTid: TransactionId): TransactionId = {
+    val now = Instant.now(Clock.systemUTC()).inMills
+    val tid = generateTid()
+    TransactionId(TransactionMetadata(tid, now, parentTid.meta.extraLogging, 
Some(parentTid.meta)))
+  }
+
+  def generateTid(): String = {
+    val sb = new StringBuilder
+    for (_ <- 1 to 32) sb.append(dict(util.Random.nextInt(dict.length)))
+    sb.toString
+  }
+
   implicit val serdes = new RootJsonFormat[TransactionId] {
-    def write(t: TransactionId) = {
-      if (t.meta.extraLogging)
-        JsArray(JsString(t.meta.id), JsNumber(t.meta.start.toEpochMilli), 
JsBoolean(t.meta.extraLogging))
-      else
-        JsArray(JsString(t.meta.id), JsNumber(t.meta.start.toEpochMilli))
+
+    private def writeMetadata(meta: TransactionMetadata): JsArray = {
+      val base = Vector(JsString(meta.id), JsNumber(meta.start.toEpochMilli))
+      val extraLogging = if (meta.extraLogging) 
Vector(JsBoolean(meta.extraLogging)) else Vector.empty
+      val parent = meta.parent match {
+        case Some(p) => Vector(writeMetadata(p))
+        case _       => Vector.empty
+      }
+      JsArray(base ++ extraLogging ++ parent)
     }
 
-    def read(value: JsValue) =
+    private def readMetadata(value: JsValue): Option[TransactionMetadata] = {
       Try {
         value match {
           case JsArray(Vector(JsString(id), JsNumber(start))) =>
-            TransactionId(TransactionMetadata(id, 
Instant.ofEpochMilli(start.longValue), false))
+            Some(TransactionMetadata(id, 
Instant.ofEpochMilli(start.longValue), false))
           case JsArray(Vector(JsString(id), JsNumber(start), 
JsBoolean(extraLogging))) =>
-            TransactionId(TransactionMetadata(id, 
Instant.ofEpochMilli(start.longValue), extraLogging))
+            Some(TransactionMetadata(id, 
Instant.ofEpochMilli(start.longValue), extraLogging))
+          case JsArray(Vector(JsString(id), JsNumber(start), 
JsBoolean(extraLogging), parent)) =>
+            Some(TransactionMetadata(id, 
Instant.ofEpochMilli(start.longValue), extraLogging, readMetadata(parent)))
+          case JsArray(Vector(JsString(id), JsNumber(start), parent)) =>
+            Some(TransactionMetadata(id, 
Instant.ofEpochMilli(start.longValue), false, readMetadata(parent)))
         }
-      } getOrElse unknown
+      } getOrElse Option.empty
+    }
+
+    def write(t: TransactionId): JsArray = writeMetadata(t.meta)
+    def read(value: JsValue): TransactionId = readMetadata(value).map(meta => 
TransactionId(meta)).getOrElse(unknown)
   }
 }
 
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/http/BasicHttpService.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/http/BasicHttpService.scala
index 9e9d965..27f1be9 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/http/BasicHttpService.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/http/BasicHttpService.scala
@@ -17,8 +17,6 @@
 
 package org.apache.openwhisk.http
 
-import java.util.concurrent.ThreadLocalRandom
-
 import akka.actor.ActorSystem
 import akka.event.Logging
 import akka.http.scaladsl.{Http, HttpConnectionContext}
@@ -94,10 +92,6 @@ trait BasicHttpService extends Directives {
     }
   }
 
-  // Scala random should be enough here, as the generation for the tid is only 
a fallback. In addition the tid only has
-  // to be unique within a few minutes.
-  private val dict = ('A' to 'Z') ++ ('a' to 'z') ++ ('0' to '9')
-
   /** Assigns transaction id to every request. */
   protected def assignId = 
HeaderDirectives.optionalHeaderValueByName(OW_EXTRA_LOGGING_HEADER) flatMap { 
headerValue =>
     val extraLogging = headerValue match {
@@ -115,8 +109,7 @@ trait BasicHttpService extends Directives {
           .filterNot(_.startsWith(TransactionId.systemPrefix))
           .getOrElse {
             // As this is only a fallback, because the tid should be generated 
by nginx, this shouldn't be used.
-            // Therefore we didn't take a deep look into performance here.
-            (0 until 32).map(_ => 
dict(ThreadLocalRandom.current().nextInt(dict.size))).mkString("")
+            TransactionId.generateTid()
           }
 
       TransactionId(tid, extraLogging)
diff --git 
a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala
 
b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala
index f376d77..1ac1151 100644
--- 
a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala
+++ 
b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala
@@ -21,8 +21,8 @@ import java.time.{Clock, Instant}
 
 import akka.actor.ActorSystem
 import akka.event.Logging.InfoLevel
+import spray.json.DefaultJsonProtocol._
 import spray.json._
-
 import org.apache.openwhisk.common.tracing.WhiskTracerProvider
 import org.apache.openwhisk.common.{Logging, LoggingMarkers, TransactionId, 
UserEvents}
 import org.apache.openwhisk.core.connector.{ActivationMessage, EventMessage, 
MessagingProvider}
@@ -37,13 +37,13 @@ import org.apache.openwhisk.http.Messages._
 import org.apache.openwhisk.spi.SpiLoader
 import org.apache.openwhisk.utils.ExecutionContextFactory.FutureExtensions
 import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.containerpool.Interval
 
 import scala.collection.mutable.Buffer
 import scala.concurrent.duration._
 import scala.concurrent.{ExecutionContext, Future, Promise}
 import scala.language.postfixOps
 import scala.util.{Failure, Success}
-
 import pureconfig._
 import pureconfig.generic.auto._
 
@@ -290,7 +290,7 @@ protected[actions] trait PrimitiveActions {
     logging.info(this, s"invoking composition $action topmost ${cause.isEmpty} 
activationid '${session.activationId}'")
 
     val response: Future[Either[ActivationId, WhiskActivation]] =
-      invokeConductor(user, payload, session).map(response =>
+      invokeConductor(user, payload, session, transid).map(response =>
         Right(completeActivation(user, session, response, 
waitForResponse.isDefined)))
 
     // is caller waiting for the result of the activation?
@@ -315,10 +315,14 @@ protected[actions] trait PrimitiveActions {
    * @param user the identity invoking the action
    * @param payload the dynamic arguments for the activation
    * @param session the session object for this composition
-   * @param transid a transaction id for logging
+   * @param parentTid a parent transaction id
    */
-  private def invokeConductor(user: Identity, payload: Option[JsObject], 
session: Session)(
-    implicit transid: TransactionId): Future[ActivationResponse] = {
+  private def invokeConductor(user: Identity,
+                              payload: Option[JsObject],
+                              session: Session,
+                              parentTid: TransactionId): 
Future[ActivationResponse] = {
+
+    implicit val transid: TransactionId = TransactionId.childOf(parentTid)
 
     if (session.accounting.conductors > 2 * actionSequenceLimit) {
       // composition is too long
@@ -365,19 +369,21 @@ protected[actions] trait PrimitiveActions {
             case Some(next) =>
               FullyQualifiedEntityName.resolveName(next, user.namespace.name) 
match {
                 case Some(fqn) if session.accounting.components < 
actionSequenceLimit =>
-                  tryInvokeNext(user, fqn, params, session)
+                  tryInvokeNext(user, fqn, params, session, transid)
 
                 case Some(_) => // composition is too long
                   invokeConductor(
                     user,
                     payload = Some(JsObject(ERROR_FIELD -> 
JsString(compositionIsTooLong))),
-                    session = session)
+                    session = session,
+                    transid)
 
                 case None => // parsing failure
                   invokeConductor(
                     user,
                     payload = Some(JsObject(ERROR_FIELD -> 
JsString(compositionComponentInvalid(next)))),
-                    session = session)
+                    session = session,
+                    transid)
 
               }
           }
@@ -395,8 +401,14 @@ protected[actions] trait PrimitiveActions {
    * @param session the session for the current activation
    * @return promise for the eventual activation
    */
-  private def tryInvokeNext(user: Identity, fqn: FullyQualifiedEntityName, 
params: Option[JsObject], session: Session)(
-    implicit transid: TransactionId): Future[ActivationResponse] = {
+  private def tryInvokeNext(user: Identity,
+                            fqn: FullyQualifiedEntityName,
+                            params: Option[JsObject],
+                            session: Session,
+                            parentTid: TransactionId): 
Future[ActivationResponse] = {
+
+    implicit val transid: TransactionId = TransactionId.childOf(parentTid)
+
     val resource = Resource(fqn.path, Collection(Collection.ACTIONS), 
Some(fqn.name.asString))
     entitlementProvider
       .check(user, Privilege.ACTIVATE, Set(resource), noThrottle = true)
@@ -415,7 +427,8 @@ protected[actions] trait PrimitiveActions {
               invokeConductor(
                 user,
                 payload = Some(JsObject(ERROR_FIELD -> 
JsString(compositionComponentNotFound(fqn.asString)))),
-                session = session)
+                session = session,
+                transid)
           }
       }
       .recoverWith {
@@ -424,7 +437,8 @@ protected[actions] trait PrimitiveActions {
           invokeConductor(
             user,
             payload = Some(JsObject(ERROR_FIELD -> 
JsString(compositionComponentNotAccessible(fqn.asString)))),
-            session = session)
+            session = session,
+            transid)
       }
   }
 
@@ -480,7 +494,7 @@ protected[actions] trait PrimitiveActions {
       case Left(response) => // unsuccessful invocation, return error response
         Future.successful(response)
       case Right(activation) => // reinvoke conductor on component result
-        invokeConductor(user, payload = Some(activation.resultAsJson), session 
= session)
+        invokeConductor(user, payload = Some(activation.resultAsJson), session 
= session, transid)
     }
   }
 
@@ -544,6 +558,13 @@ protected[actions] trait PrimitiveActions {
       Parameters(WhiskActivation.causedByAnnotation, JsString(Exec.SEQUENCE))
     }
 
+    // set waitTime for conductor action
+    val waitTime = {
+      Parameters(
+        WhiskActivation.waitTimeAnnotation,
+        Interval(transid.meta.start, session.start).duration.toMillis.toJson)
+    }
+
     // set binding if invoked action is in a package binding
     val binding =
       session.action.binding.map(f => 
Parameters(WhiskActivation.bindingAnnotation, JsString(f.asString)))
@@ -567,7 +588,7 @@ protected[actions] trait PrimitiveActions {
         Parameters(WhiskActivation.pathAnnotation, 
JsString(session.action.fullyQualifiedName(false).asString)) ++
         Parameters(WhiskActivation.kindAnnotation, JsString(Exec.SEQUENCE)) ++
         Parameters(WhiskActivation.conductorAnnotation, JsTrue) ++
-        causedBy ++ binding ++
+        causedBy ++ waitTime ++ binding ++
         sequenceLimits,
       duration = Some(session.duration))
 
diff --git 
a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/SequenceActions.scala
 
b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/SequenceActions.scala
index 54272dd..619ccdb 100644
--- 
a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/SequenceActions.scala
+++ 
b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/SequenceActions.scala
@@ -21,8 +21,11 @@ import java.time.{Clock, Instant}
 import java.util.concurrent.atomic.AtomicReference
 
 import akka.actor.ActorSystem
+import spray.json._
+import spray.json.DefaultJsonProtocol._
 import org.apache.openwhisk.common.{Logging, TransactionId, UserEvents}
 import org.apache.openwhisk.core.connector.{EventMessage, MessagingProvider}
+import org.apache.openwhisk.core.containerpool.Interval
 import org.apache.openwhisk.core.controller.WhiskServices
 import org.apache.openwhisk.core.database.{ActivationStore, 
NoDocumentException, UserContext}
 import org.apache.openwhisk.core.entity._
@@ -199,7 +202,7 @@ protected[actions] trait SequenceActions {
                                      topmost: Boolean,
                                      cause: Option[ActivationId],
                                      start: Instant,
-                                     end: Instant): WhiskActivation = {
+                                     end: Instant)(implicit transid: 
TransactionId): WhiskActivation = {
 
     // compute max memory
     val sequenceLimits = accounting.maxMemory map { 
maxMemoryAcrossActionsInSequence =>
@@ -213,6 +216,11 @@ protected[actions] trait SequenceActions {
       Some(Parameters(WhiskActivation.causedByAnnotation, 
JsString(Exec.SEQUENCE)))
     } else None
 
+    // set waitTime for sequence action
+    val waitTime = {
+      Parameters(WhiskActivation.waitTimeAnnotation, 
Interval(transid.meta.start, start).duration.toMillis.toJson)
+    }
+
     // set binding if an invoked action is in a package binding
     val binding = action.binding map { path =>
       Parameters(WhiskActivation.bindingAnnotation, JsString(path.asString))
@@ -234,7 +242,7 @@ protected[actions] trait SequenceActions {
       annotations = Parameters(WhiskActivation.topmostAnnotation, 
JsBoolean(topmost)) ++
         Parameters(WhiskActivation.pathAnnotation, 
JsString(action.fullyQualifiedName(false).asString)) ++
         Parameters(WhiskActivation.kindAnnotation, JsString(Exec.SEQUENCE)) ++
-        causedBy ++ binding ++
+        causedBy ++ waitTime ++ binding ++
         sequenceLimits,
       duration = Some(accounting.duration))
   }
@@ -285,7 +293,7 @@ protected[actions] trait SequenceActions {
       .foldLeft(initialAccounting) { (accountingFuture, futureAction) =>
         accountingFuture.flatMap { accounting =>
           if (accounting.atomicActionCnt < actionSequenceLimit) {
-            invokeNextAction(user, futureAction, accounting, cause)
+            invokeNextAction(user, futureAction, accounting, cause, transid)
               .flatMap { accounting =>
                 if (!accounting.shortcircuit) {
                   Future.successful(accounting)
@@ -327,12 +335,14 @@ protected[actions] trait SequenceActions {
    * @param cause the activation id of the first sequence containing this 
activations
    * @return a future which resolves with updated accounting for a sequence, 
including the last result, duration, and activation ids
    */
-  private def invokeNextAction(
-    user: Identity,
-    futureAction: Future[WhiskActionMetaData],
-    accounting: SequenceAccounting,
-    cause: Option[ActivationId])(implicit transid: TransactionId): 
Future[SequenceAccounting] = {
+  private def invokeNextAction(user: Identity,
+                               futureAction: Future[WhiskActionMetaData],
+                               accounting: SequenceAccounting,
+                               cause: Option[ActivationId],
+                               parentTid: TransactionId): 
Future[SequenceAccounting] = {
     futureAction.flatMap { action =>
+      implicit val transid: TransactionId = TransactionId.childOf(parentTid)
+
       // the previous response becomes input for the next action in the 
sequence;
       // the accounting no longer needs to hold a reference to it once the 
action is
       // invoked, so previousResponse.getAndSet(null) drops the reference at 
this point
diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index d23c0e5..76dba7e 100644
--- 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -919,21 +919,13 @@ object ContainerProxy {
                                totalInterval: Interval,
                                isTimeout: Boolean,
                                response: ActivationResponse) = {
-    val causedBy = Some {
-      if (job.msg.causedBySequence) {
-        Parameters(WhiskActivation.causedByAnnotation, JsString(Exec.SEQUENCE))
-      } else {
-        // emit the internal system hold time as the 'wait' time, but only for 
non-sequence
-        // actions, since the transid start time for a sequence does not 
correspond
-        // with a specific component of the activation but the entire sequence;
-        // it will require some work to generate a new transaction id for a 
sequence
-        // component - however, because the trace of activations is recorded 
in the parent
-        // sequence, a client can determine the queue time for sequences that 
way
-        val end = initInterval.map(_.start).getOrElse(totalInterval.start)
-        Parameters(
-          WhiskActivation.waitTimeAnnotation,
-          Interval(job.msg.transid.meta.start, end).duration.toMillis.toJson)
-      }
+    val causedBy = if (job.msg.causedBySequence) {
+      Some(Parameters(WhiskActivation.causedByAnnotation, 
JsString(Exec.SEQUENCE)))
+    } else None
+
+    val waitTime = {
+      val end = initInterval.map(_.start).getOrElse(totalInterval.start)
+      Parameters(WhiskActivation.waitTimeAnnotation, 
Interval(job.msg.transid.meta.start, end).duration.toMillis.toJson)
     }
 
     val initTime = {
@@ -959,7 +951,7 @@ object ContainerProxy {
           Parameters(WhiskActivation.pathAnnotation, 
JsString(job.action.fullyQualifiedName(false).asString)) ++
           Parameters(WhiskActivation.kindAnnotation, 
JsString(job.action.exec.kind)) ++
           Parameters(WhiskActivation.timeoutAnnotation, JsBoolean(isTimeout)) 
++
-          causedBy ++ initTime ++ binding
+          causedBy ++ initTime ++ waitTime ++ binding
       })
   }
 
diff --git a/docs/annotations.md b/docs/annotations.md
index 4923894..78f4af2 100644
--- a/docs/annotations.md
+++ b/docs/annotations.md
@@ -90,7 +90,7 @@ Additionally for sequence related activations, the system 
will generate the foll
 
 Lastly, and in order to provide you with some performance transparency, 
activations also record:
 
-* `waitTime`: the time spent waiting in the internal OpenWhisk system. This is 
roughly the time spent between the controller receiving the activation request 
and when the invoker provisioned a container for the action. This value is 
currently only present for non-sequence related activations. For sequences, 
this information can be derived from the `topmost` sequence activation record.
+* `waitTime`: the time spent waiting in the internal OpenWhisk system. This is 
roughly the time spent between the controller receiving the activation request 
and when the invoker provisioned a container for the action.
 * `initTime`: the time spent initializing the function. If this value is 
present, the action required initialization and represents a cold start. A warm 
activation will skip initialization, and in this case, the annotation is not 
generated.
 
 An example of these annotations as they would appear in an activation record 
is shown below.
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/common/SchedulerTests.scala 
b/tests/src/test/scala/org/apache/openwhisk/common/SchedulerTests.scala
index e86a9e8..08518fd 100644
--- a/tests/src/test/scala/org/apache/openwhisk/common/SchedulerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/common/SchedulerTests.scala
@@ -127,7 +127,7 @@ class SchedulerTests extends FlatSpec with Matchers with 
WskActorSystem with Str
 
     waitForCalls()
     stream.toString.split(" ").drop(1).mkString(" ") shouldBe {
-      s"[ERROR] [$transid] [Scheduler] halted because $msg\n"
+      s"[ERROR] [${transid.root}] [] [Scheduler] halted because $msg\n"
     }
   }
 
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/common/TransactionIdTests.scala 
b/tests/src/test/scala/org/apache/openwhisk/common/TransactionIdTests.scala
new file mode 100644
index 0000000..a81b7c7
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/common/TransactionIdTests.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.common
+
+import java.time.{Instant}
+import spray.json._
+import common.StreamLogging
+import org.junit.runner.RunWith
+import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest.junit.JUnitRunner
+
+@RunWith(classOf[JUnitRunner])
+class TransactionIdTests extends FlatSpec with Matchers with StreamLogging {
+
+  behavior of "TransactionId deserialization"
+
+  it should "deserialize with parent tid" in {
+
+    val json = 
"""["ctid1",1600000000000,["ptid1",1500000000000]]""".stripMargin.parseJson
+
+    val pnow = Instant.ofEpochMilli(1500000000000L)
+    val cnow = Instant.ofEpochMilli(1600000000000L)
+
+    val ptid = TransactionId(TransactionMetadata("ptid1", pnow))
+    val ctid = TransactionId(TransactionMetadata("ctid1", cnow, parent = 
Some(ptid.meta)))
+
+    val transactionId = TransactionId.serdes.read(json)
+    transactionId shouldBe ctid
+  }
+
+  it should "deserialize with parent tid and extraLogging parameter" in {
+
+    val json = 
"""["ctid1",1600000000000,true,["ptid1",1500000000000,true]]""".stripMargin.parseJson
+
+    val pnow = Instant.ofEpochMilli(1500000000000L)
+    val cnow = Instant.ofEpochMilli(1600000000000L)
+
+    val ptid = TransactionId(TransactionMetadata("ptid1", pnow, extraLogging = 
true))
+    val ctid = TransactionId(TransactionMetadata("ctid1", cnow, extraLogging = 
true, parent = Some(ptid.meta)))
+
+    val transactionId = TransactionId.serdes.read(json)
+    transactionId shouldBe ctid
+  }
+
+  it should "deserialize without parent tid" in {
+
+    val json = """["ctid1",1600000000000]""".stripMargin.parseJson
+
+    val cnow = Instant.ofEpochMilli(1600000000000L)
+    val ctid = TransactionId(TransactionMetadata("ctid1", cnow))
+
+    val transactionId = TransactionId.serdes.read(json)
+    transactionId shouldBe ctid
+  }
+
+  behavior of "TransactionId serialization"
+
+  it should "serialize with parent tid" in {
+
+    val pnow = Instant.ofEpochMilli(1500000000000L)
+    val cnow = Instant.ofEpochMilli(1600000000000L)
+
+    val ptid = TransactionId(TransactionMetadata("ptid1", pnow))
+    val ctid = TransactionId(TransactionMetadata("ctid1", cnow, parent = 
Some(ptid.meta)))
+
+    val js = TransactionId.serdes.write(ctid)
+
+    val js2 = 
"""["ctid1",1600000000000,["ptid1",1500000000000]]""".stripMargin.parseJson
+    js shouldBe js2
+  }
+
+  it should "serialize with parent tid and extraLogging parameter" in {
+
+    val pnow = Instant.ofEpochMilli(1500000000000L)
+    val cnow = Instant.ofEpochMilli(1600000000000L)
+
+    val ptid = TransactionId(TransactionMetadata("ptid1", pnow, extraLogging = 
true))
+    val ctid = TransactionId(TransactionMetadata("ctid1", cnow, extraLogging = 
true, parent = Some(ptid.meta)))
+
+    val js = TransactionId.serdes.write(ctid)
+
+    val js2 = 
"""["ctid1",1600000000000,true,["ptid1",1500000000000,true]]""".stripMargin.parseJson
+    js shouldBe js2
+  }
+
+  it should "serialize without parent tid" in {
+
+    val cnow = Instant.ofEpochMilli(1600000000000L)
+    val ctid = TransactionId(TransactionMetadata("ctid1", cnow))
+
+    val js = TransactionId.serdes.write(ctid)
+    val js2 = """["ctid1",1600000000000]""".stripMargin.parseJson
+    js shouldBe js2
+  }
+
+}
diff --git a/tests/src/test/scala/system/basic/WskConductorTests.scala 
b/tests/src/test/scala/system/basic/WskConductorTests.scala
index 01d51ca..c45f053 100644
--- a/tests/src/test/scala/system/basic/WskConductorTests.scala
+++ b/tests/src/test/scala/system/basic/WskConductorTests.scala
@@ -364,6 +364,9 @@ class WskConductorTests extends TestHelpers with 
WskTestHelpers with JsHelpers w
         totalWait = allowedActionDuration) { componentActivation =>
         componentActivation.cause shouldBe defined
         componentActivation.cause.get shouldBe (activation.activationId)
+        // check waitTime
+        val waitTime = componentActivation.getAnnotationValue("waitTime")
+        waitTime shouldBe defined
         // check causedBy
         val causedBy = componentActivation.getAnnotationValue("causedBy")
         causedBy shouldBe defined
diff --git a/tests/src/test/scala/system/basic/WskSequenceTests.scala 
b/tests/src/test/scala/system/basic/WskSequenceTests.scala
index f151d9d..11ddd8f 100644
--- a/tests/src/test/scala/system/basic/WskSequenceTests.scala
+++ b/tests/src/test/scala/system/basic/WskSequenceTests.scala
@@ -539,6 +539,9 @@ class WskSequenceTests extends TestHelpers with 
WskTestHelpers with StreamLoggin
         totalWait = allowedActionDuration) { componentActivation =>
         componentActivation.cause shouldBe defined
         componentActivation.cause.get shouldBe (activation.activationId)
+        // check waitTime
+        val waitTime = componentActivation.getAnnotationValue("waitTime")
+        waitTime shouldBe defined
         // check causedBy
         val causedBy = componentActivation.getAnnotationValue("causedBy")
         causedBy shouldBe defined

Reply via email to