This is an automated email from the ASF dual-hosted git repository.

chetanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 63496d5  Distributed tracing support #2192 (#2282)
63496d5 is described below

commit 63496d5ee3a67a24f1e811569dae30ce0b738377
Author: sandeep-paliwal <[email protected]>
AuthorDate: Fri Jun 29 20:37:57 2018 +0530

    Distributed tracing support #2192 (#2282)
    
    Enables Tracing support via Zipkin and OpenTracer.
    
    It can be enabled via config
    
       tracing {
            zipkin {
                 url = "http://localhost:9411"; //url to connect to zipkin 
server
                 //sample-rate to decide a request is sampled or not.
                 sample-rate = "0.01" // sample 1% of requests by default
            }
        }
    
    Tracing enables tracking of request from Controller to Invoker
---
 common/scala/build.gradle                          |   8 +
 common/scala/src/main/resources/application.conf   |  14 ++
 .../main/scala/whisk/common/TransactionId.scala    |  11 ++
 .../whisk/common/tracing/OpenTracingProvider.scala | 201 +++++++++++++++++++++
 .../src/main/scala/whisk/core/WhiskConfig.scala    |   3 +
 .../main/scala/whisk/core/connector/Message.scala  |   5 +-
 .../controller/src/main/resources/application.conf |  10 +
 .../core/controller/actions/PrimitiveActions.scala |  25 ++-
 core/invoker/src/main/resources/application.conf   |   5 +
 .../scala/whisk/core/invoker/InvokerReactive.scala |   4 +
 tests/build.gradle                                 |   3 +
 tests/src/test/scala/common/WskTracingTests.scala  | 126 +++++++++++++
 12 files changed, 403 insertions(+), 12 deletions(-)

diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 488c94f..02e9453 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -64,6 +64,14 @@ dependencies {
     compile 'io.kamon:kamon-statsd_2.11:0.6.7'
     //for mesos
     compile 'com.adobe.api.platform.runtime:mesos-actor:0.0.7'
+
+    //tracing support
+    compile 'io.opentracing:opentracing-api:0.31.0'
+    compile 'io.opentracing:opentracing-util:0.31.0'
+    compile 'io.opentracing.brave:brave-opentracing:0.31.0'
+    compile 'io.zipkin.reporter2:zipkin-sender-okhttp3:2.6.1'
+    compile 'io.zipkin.reporter2:zipkin-reporter:2.6.1'
+
     scoverage gradle.scoverage.deps
 }
 
diff --git a/common/scala/src/main/resources/application.conf 
b/common/scala/src/main/resources/application.conf
index 3ceaab8..bbac5f4 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -188,4 +188,18 @@ whisk {
         constraint-delimiter = " "//used to parse constraint strings
         teardown-on-exit = true //set to true to disable the mesos framework 
on system exit; set for false for HA deployments
     }
+
+    # tracing configuration
+    tracing {
+        cache-expiry = 30 seconds #how long to keep spans in cache. Set to 
appropriate value to trace long running requests
+        #Zipkin configuration. Uncomment following to enable zipkin based 
tracing
+        #zipkin {
+        #   url = "http://localhost:9411"; //url to connect to zipkin server
+             //sample-rate to decide a request is sampled or not.
+             //sample-rate 0.5 equals to sampling 50% of the requests
+             //sample-rate of 1 means 100% sampling.
+             //sample-rate of 0 means no sampling
+        #   sample-rate = "0.01" // sample 1% of requests by default
+        #}
+    }
 }
diff --git a/common/scala/src/main/scala/whisk/common/TransactionId.scala 
b/common/scala/src/main/scala/whisk/common/TransactionId.scala
index cb16a42..6441629 100644
--- a/common/scala/src/main/scala/whisk/common/TransactionId.scala
+++ b/common/scala/src/main/scala/whisk/common/TransactionId.scala
@@ -24,6 +24,8 @@ import akka.http.scaladsl.model.headers.RawHeader
 import pureconfig.loadConfigOrThrow
 import spray.json._
 import whisk.core.ConfigKeys
+import pureconfig._
+import whisk.common.tracing.WhiskTracerProvider
 
 import scala.util.Try
 
@@ -82,6 +84,9 @@ case class TransactionId private (meta: TransactionMetadata) 
extends AnyVal {
     }
 
     MetricEmitter.emitCounterMetric(marker)
+
+    //tracing support
+    WhiskTracerProvider.tracer.startSpan(marker, this)
     StartMarker(Instant.now, marker)
   }
 
@@ -116,6 +121,9 @@ case class TransactionId private (meta: 
TransactionMetadata) extends AnyVal {
     }
 
     MetricEmitter.emitHistogramMetric(endMarker, deltaToEnd)
+
+    //tracing support
+    WhiskTracerProvider.tracer.finishSpan(this)
   }
 
   /**
@@ -144,6 +152,9 @@ case class TransactionId private (meta: 
TransactionMetadata) extends AnyVal {
 
     MetricEmitter.emitHistogramMetric(endMarker, deltaToEnd)
     MetricEmitter.emitCounterMetric(endMarker)
+
+    //tracing support
+    WhiskTracerProvider.tracer.error(this)
   }
 
   /**
diff --git 
a/common/scala/src/main/scala/whisk/common/tracing/OpenTracingProvider.scala 
b/common/scala/src/main/scala/whisk/common/tracing/OpenTracingProvider.scala
new file mode 100644
index 0000000..09c105a
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/common/tracing/OpenTracingProvider.scala
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.common.tracing
+
+import java.util.concurrent.TimeUnit
+
+import brave.Tracing
+import brave.opentracing.BraveTracer
+import brave.sampler.Sampler
+import com.github.benmanes.caffeine.cache.{Caffeine, Ticker}
+import io.opentracing.propagation.{Format, TextMapExtractAdapter, 
TextMapInjectAdapter}
+import io.opentracing.util.GlobalTracer
+import io.opentracing.{Span, SpanContext, Tracer}
+import pureconfig._
+import whisk.common.{LogMarkerToken, TransactionId}
+import whisk.core.ConfigKeys
+import zipkin2.reporter.okhttp3.OkHttpSender
+import zipkin2.reporter.{AsyncReporter, Sender}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.duration.Duration
+
+/**
+ * OpenTracing based implementation for tracing
+ */
+class OpenTracer(val tracer: Tracer, tracingConfig: TracingConfig, ticker: 
Ticker = SystemTicker) extends WhiskTracer {
+  val spanMap = configureCache[String, List[Span]]()
+  val contextMap = configureCache[String, SpanContext]()
+
+  /**
+   * Start a Trace for given service.
+   *
+   * @param transactionId transactionId to which this Trace belongs.
+   * @param logMarker marker whose action is used as the name of the span; 
nothing is returned.
+   */
+  override def startSpan(logMarker: LogMarkerToken, transactionId: 
TransactionId): Unit = {
+    //initialize list for this transactionId
+    val spanList = spanMap.getOrElse(transactionId.meta.id, Nil)
+
+    val spanBuilder = tracer
+      .buildSpan(logMarker.action)
+      .withTag("transactionId", transactionId.meta.id)
+
+    val active = spanList match {
+      case Nil =>
+        //Check if any active context then resume from that else create a 
fresh span
+        contextMap
+          .get(transactionId.meta.id)
+          .map(spanBuilder.asChildOf)
+          .getOrElse(spanBuilder.ignoreActiveSpan())
+          .startActive(true)
+          .span()
+      case head :: _ =>
+        //Create a child span of current head
+        spanBuilder.asChildOf(head).startActive(true).span()
+    }
+    //add active span to list
+    spanMap.put(transactionId.meta.id, active :: spanList)
+  }
+
+  /**
+   * Finish a Trace associated with given transactionId.
+   *
+   * @param transactionId
+   */
+  override def finishSpan(transactionId: TransactionId): Unit = {
+    clear(transactionId)
+  }
+
+  /**
+   * Register error
+   *
+   * @param transactionId
+   */
+  override def error(transactionId: TransactionId): Unit = {
+    clear(transactionId)
+  }
+
+  /**
+   * Get the current TraceContext which can be used for downstream services
+   *
+   * @param transactionId
+   * @return
+   */
+  override def getTraceContext(transactionId: TransactionId): 
Option[Map[String, String]] = {
+    spanMap
+      .get(transactionId.meta.id)
+      .flatMap(_.headOption)
+      .map { span =>
+        val map = mutable.Map.empty[String, String]
+        tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new 
TextMapInjectAdapter(map.asJava))
+        map.toMap
+      }
+  }
+
+  /**
+   * Store the TraceContext received from an upstream service so tracing can continue
+   *
+   * @param transactionId
+   * @return
+   */
+  override def setTraceContext(transactionId: TransactionId, context: 
Option[Map[String, String]]) = {
+    context.foreach { scalaMap =>
+      val ctx: SpanContext = tracer.extract(Format.Builtin.TEXT_MAP, new 
TextMapExtractAdapter(scalaMap.asJava))
+      contextMap.put(transactionId.meta.id, ctx)
+    }
+  }
+
+  private def clear(transactionId: TransactionId): Unit = {
+    spanMap.get(transactionId.meta.id).foreach {
+      case head :: Nil =>
+        head.finish()
+        spanMap.remove(transactionId.meta.id)
+        contextMap.remove(transactionId.meta.id)
+      case head :: tail =>
+        head.finish()
+        spanMap.put(transactionId.meta.id, tail)
+      case Nil =>
+    }
+  }
+
+  private def configureCache[T, R](): collection.concurrent.Map[T, R] =
+    Caffeine
+      .newBuilder()
+      .ticker(ticker)
+      .expireAfterAccess(tracingConfig.cacheExpiry.toSeconds, TimeUnit.SECONDS)
+      .build()
+      .asMap()
+      .asScala
+      .asInstanceOf[collection.concurrent.Map[T, R]]
+}
+
+trait WhiskTracer {
+  def startSpan(logMarker: LogMarkerToken, transactionId: TransactionId): Unit 
= {}
+  def finishSpan(transactionId: TransactionId): Unit = {}
+  def error(transactionId: TransactionId): Unit = {}
+  def getTraceContext(transactionId: TransactionId): Option[Map[String, 
String]] = None
+  def setTraceContext(transactionId: TransactionId, context: 
Option[Map[String, String]]): Unit = {}
+}
+
+object WhiskTracerProvider {
+  val tracingConfig = loadConfigOrThrow[TracingConfig](ConfigKeys.tracing)
+
+  val tracer: WhiskTracer = createTracer(tracingConfig)
+
+  private def createTracer(tracingConfig: TracingConfig): WhiskTracer = {
+
+    tracingConfig.zipkin match {
+      case Some(zipkinConfig) => {
+        if (!GlobalTracer.isRegistered) {
+          val sender: Sender = OkHttpSender.create(zipkinConfig.generateUrl)
+          val spanReporter = AsyncReporter.create(sender)
+          val braveTracing = Tracing
+            .newBuilder()
+            .localServiceName(tracingConfig.component)
+            .spanReporter(spanReporter)
+            .sampler(Sampler.create(zipkinConfig.sampleRate.toFloat))
+            .build()
+
+          //register with OpenTracing
+          GlobalTracer.register(BraveTracer.create(braveTracing))
+
+          sys.addShutdownHook({ spanReporter.close() })
+        }
+      }
+      case None =>
+    }
+
+    if (GlobalTracer.isRegistered)
+      new OpenTracer(GlobalTracer.get(), tracingConfig)
+    else
+      NoopTracer
+  }
+}
+
+private object NoopTracer extends WhiskTracer
+case class TracingConfig(component: String, cacheExpiry: Duration, zipkin: 
Option[ZipkinConfig] = None)
+case class ZipkinConfig(url: String, sampleRate: String) {
+  def generateUrl = s"$url/api/v2/spans"
+}
+object SystemTicker extends Ticker {
+  override def read() = {
+    System.nanoTime()
+  }
+}
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala 
b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index fb20fda..be9f7b0 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -222,6 +222,9 @@ object ConfigKeys {
   val dockerContainerFactory = s"${docker}.container-factory"
   val runc = "whisk.runc"
   val runcTimeouts = s"$runc.timeouts"
+
+  val tracing = "whisk.tracing"
+
   val containerFactory = "whisk.container-factory"
   val containerArgs = s"$containerFactory.container-args"
   val containerPool = "whisk.container-pool"
diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala 
b/common/scala/src/main/scala/whisk/core/connector/Message.scala
index eb960b3..d540a54 100644
--- a/common/scala/src/main/scala/whisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala
@@ -49,7 +49,8 @@ case class ActivationMessage(override val transid: 
TransactionId,
                              rootControllerIndex: ControllerInstanceId,
                              blocking: Boolean,
                              content: Option[JsObject],
-                             cause: Option[ActivationId] = None)
+                             cause: Option[ActivationId] = None,
+                             traceContext: Option[Map[String, String]] = None)
     extends Message {
 
   override def serialize = ActivationMessage.serdes.write(this).compactPrint
@@ -67,7 +68,7 @@ object ActivationMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(serdes.read(msg.parseJson))
 
   private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
-  implicit val serdes = jsonFormat9(ActivationMessage.apply)
+  implicit val serdes = jsonFormat10(ActivationMessage.apply)
 }
 
 /**
diff --git a/core/controller/src/main/resources/application.conf 
b/core/controller/src/main/resources/application.conf
index a288636..77ce527 100644
--- a/core/controller/src/main/resources/application.conf
+++ b/core/controller/src/main/resources/application.conf
@@ -78,3 +78,13 @@ ssl-config.enabledCipherSuites = [
   "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256",
   "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
 ]
+
+whisk{
+  # tracing configuration
+  tracing {
+    component = "Controller"
+  }
+}
+
+
+
diff --git 
a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
 
b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
index 79da2d5..49d84c2 100644
--- 
a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
+++ 
b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
@@ -23,6 +23,7 @@ import akka.actor.ActorSystem
 import akka.event.Logging.InfoLevel
 import spray.json._
 import whisk.common.{Logging, LoggingMarkers, TransactionId}
+import whisk.common.tracing.WhiskTracerProvider
 import whisk.core.connector.ActivationMessage
 import whisk.core.controller.WhiskServices
 import whisk.core.database.NoDocumentException
@@ -146,25 +147,29 @@ protected[actions] trait PrimitiveActions {
 
     // merge package parameters with action (action parameters supersede), 
then merge in payload
     val args = action.parameters merge payload
+    val activationId = activationIdFactory.make()
+
+    val startActivation = transid.started(
+      this,
+      waitForResponse
+        .map(_ => LoggingMarkers.CONTROLLER_ACTIVATION_BLOCKING)
+        .getOrElse(LoggingMarkers.CONTROLLER_ACTIVATION),
+      logLevel = InfoLevel)
+    val startLoadbalancer =
+      transid.started(this, LoggingMarkers.CONTROLLER_LOADBALANCER, s"action 
activation id: ${activationId}")
+
     val message = ActivationMessage(
       transid,
       FullyQualifiedEntityName(action.namespace, action.name, 
Some(action.version)),
       action.rev,
       user,
-      activationIdFactory.make(), // activation id created here
+      activationId, // activation id created here
       activeAckTopicIndex,
       waitForResponse.isDefined,
       args,
-      cause = cause)
+      cause = cause,
+      WhiskTracerProvider.tracer.getTraceContext(transid))
 
-    val startActivation = transid.started(
-      this,
-      waitForResponse
-        .map(_ => LoggingMarkers.CONTROLLER_ACTIVATION_BLOCKING)
-        .getOrElse(LoggingMarkers.CONTROLLER_ACTIVATION),
-      logLevel = InfoLevel)
-    val startLoadbalancer =
-      transid.started(this, LoggingMarkers.CONTROLLER_LOADBALANCER, s"action 
activation id: ${message.activationId}")
     val postedFuture = loadBalancer.publish(action, message)
 
     postedFuture.flatMap { activeAckResponse =>
diff --git a/core/invoker/src/main/resources/application.conf 
b/core/invoker/src/main/resources/application.conf
index ebd45e5..00c3339 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -66,4 +66,9 @@ whisk {
       pause-grace = 50 milliseconds
     }
   }
+
+  # tracing configuration
+  tracing {
+    component = "Invoker"
+  }
 }
diff --git 
a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala 
b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index ae4a1ae..87ca1e8 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -26,6 +26,7 @@ import akka.stream.ActorMaterializer
 import org.apache.kafka.common.errors.RecordTooLargeException
 import pureconfig._
 import spray.json._
+import whisk.common.tracing.WhiskTracerProvider
 import whisk.common._
 import whisk.core.{ConfigKeys, WhiskConfig}
 import whisk.core.connector._
@@ -192,6 +193,9 @@ class InvokerReactive(
 
         implicit val transid: TransactionId = msg.transid
 
+        //set trace context to continue tracing
+        WhiskTracerProvider.tracer.setTraceContext(transid, msg.traceContext)
+
         if (!namespaceBlacklist.isBlacklisted(msg.user)) {
           val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION, 
logLevel = InfoLevel)
           val namespace = msg.action.path
diff --git a/tests/build.gradle b/tests/build.gradle
index cf737ed..919d27e 100644
--- a/tests/build.gradle
+++ b/tests/build.gradle
@@ -145,12 +145,15 @@ dependencies {
     compile 'com.typesafe.akka:akka-http-testkit_2.11:10.1.1'
     compile 'com.github.java-json-tools:json-schema-validator:2.2.8'
     compile "org.mockito:mockito-core:2.15.0"
+    compile 'io.opentracing:opentracing-mock:0.31.0'
 
     compile project(':common:scala')
     compile project(':core:controller')
     compile project(':core:invoker')
     compile project(':tools:admin')
 
+
+
     scoverage gradle.scoverage.deps
 }
 
diff --git a/tests/src/test/scala/common/WskTracingTests.scala 
b/tests/src/test/scala/common/WskTracingTests.scala
new file mode 100644
index 0000000..516f890
--- /dev/null
+++ b/tests/src/test/scala/common/WskTracingTests.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package common
+
+import io.opentracing.Span
+import io.opentracing.mock.{MockSpan, MockTracer}
+import com.github.benmanes.caffeine.cache.Ticker
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import pureconfig.loadConfigOrThrow
+import whisk.common.{LoggingMarkers, TransactionId}
+import whisk.common.tracing.{OpenTracer, TracingConfig}
+import whisk.core.ConfigKeys
+
+import scala.ref.WeakReference
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+
+@RunWith(classOf[JUnitRunner])
+class WskTracingTests extends FlatSpec with TestHelpers with Matchers {
+
+  val tracer: MockTracer = new MockTracer()
+  val tracingConfig = loadConfigOrThrow[TracingConfig](ConfigKeys.tracing)
+  val ticker = new FakeTicker(System.nanoTime())
+  val openTracer = new OpenTracer(tracer, tracingConfig, ticker)
+
+  it should "create span and context and invalidate cache after expiry" in {
+    tracer.reset
+
+    val transactionId: TransactionId = TransactionId.testing
+    var list: List[WeakReference[Span]] = List()
+
+    openTracer.startSpan(LoggingMarkers.CONTROLLER_ACTIVATION, transactionId)
+    var ctx = openTracer.getTraceContext(transactionId)
+    openTracer.setTraceContext(transactionId, ctx)
+    ctx should be(defined)
+
+    //advance ticker
+    ticker.time = System.nanoTime() + (tracingConfig.cacheExpiry.toNanos + 100)
+    ctx = openTracer.getTraceContext(transactionId)
+    ctx should not be (defined)
+    openTracer.startSpan(LoggingMarkers.CONTROLLER_KAFKA, transactionId)
+    openTracer.finishSpan(transactionId)
+    val finishedSpans = tracer.finishedSpans()
+    finishedSpans should have size 1
+    //no parent for new span as cache expiry cleared spanMap and contextMap
+    finishedSpans.get(0).parentId() should be(0)
+  }
+
+  it should "create a finished span" in {
+    tracer.reset
+    val transactionId: TransactionId = TransactionId.testing
+    openTracer.startSpan(LoggingMarkers.CONTROLLER_ACTIVATION, transactionId)
+    openTracer.finishSpan(transactionId)
+    val finishedSpans = tracer.finishedSpans()
+    finishedSpans should have size 1
+
+  }
+
+  it should "create a child span" in {
+    tracer.reset
+    val transactionId: TransactionId = TransactionId.testing
+    openTracer.startSpan(LoggingMarkers.CONTROLLER_ACTIVATION, transactionId)
+    openTracer.startSpan(LoggingMarkers.CONTROLLER_KAFKA, transactionId)
+    openTracer.finishSpan(transactionId)
+    openTracer.finishSpan(transactionId)
+    val finishedSpans = tracer.finishedSpans()
+    finishedSpans should have size 2
+    val parent: MockSpan = finishedSpans.get(1)
+    val child: MockSpan = finishedSpans.get(0)
+    child.parentId should be(parent.context().spanId)
+
+  }
+
+  it should "create a span with tag" in {
+    tracer.reset
+    val transactionId: TransactionId = TransactionId.testing
+    openTracer.startSpan(LoggingMarkers.CONTROLLER_ACTIVATION, transactionId)
+    openTracer.finishSpan(transactionId)
+    val finishedSpans = tracer.finishedSpans()
+    finishedSpans should have size 1
+    val mockSpan: MockSpan = finishedSpans.get(0)
+    mockSpan.tags should not be null
+    mockSpan.tags should have size 1
+
+  }
+
+  it should "create a valid trace context and use it" in {
+    tracer.reset
+    val transactionId: TransactionId = TransactionId.testing
+    openTracer.startSpan(LoggingMarkers.CONTROLLER_ACTIVATION, transactionId)
+    val context = openTracer.getTraceContext(transactionId)
+    openTracer.finishSpan(transactionId)
+    tracer.reset
+    //use context for new span
+    openTracer.setTraceContext(transactionId, context)
+    openTracer.startSpan(LoggingMarkers.CONTROLLER_KAFKA, transactionId)
+    openTracer.finishSpan(transactionId)
+    val finishedSpans = tracer.finishedSpans()
+    finishedSpans should have size 1
+    val child: MockSpan = finishedSpans.get(0)
+    //This child span should have a parent as we have set trace context
+    child.parentId should be > 0L
+  }
+}
+
+class FakeTicker(var time: Long) extends Ticker {
+  override def read() = {
+    time
+  }
+}

Reply via email to