This is an automated email from the ASF dual-hosted git repository.
chetanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 63496d5 Distributed tracing support #2192 (#2282)
63496d5 is described below
commit 63496d5ee3a67a24f1e811569dae30ce0b738377
Author: sandeep-paliwal <[email protected]>
AuthorDate: Fri Jun 29 20:37:57 2018 +0530
Distributed tracing support #2192 (#2282)
Enables Tracing support via Zipkin and OpenTracer.
It can be enabled via config
tracing {
zipkin {
url = "http://localhost:9411" //url to connect to zipkin server
//sample-rate to decide a request is sampled or not.
sample-rate = "0.01" // sample 1% of requests by default
}
}
Tracing enables tracking of request from Controller to Invoker
---
common/scala/build.gradle | 8 +
common/scala/src/main/resources/application.conf | 14 ++
.../main/scala/whisk/common/TransactionId.scala | 11 ++
.../whisk/common/tracing/OpenTracingProvider.scala | 201 +++++++++++++++++++++
.../src/main/scala/whisk/core/WhiskConfig.scala | 3 +
.../main/scala/whisk/core/connector/Message.scala | 5 +-
.../controller/src/main/resources/application.conf | 10 +
.../core/controller/actions/PrimitiveActions.scala | 25 ++-
core/invoker/src/main/resources/application.conf | 5 +
.../scala/whisk/core/invoker/InvokerReactive.scala | 4 +
tests/build.gradle | 3 +
tests/src/test/scala/common/WskTracingTests.scala | 126 +++++++++++++
12 files changed, 403 insertions(+), 12 deletions(-)
diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 488c94f..02e9453 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -64,6 +64,14 @@ dependencies {
compile 'io.kamon:kamon-statsd_2.11:0.6.7'
//for mesos
compile 'com.adobe.api.platform.runtime:mesos-actor:0.0.7'
+
+ //tracing support
+ compile 'io.opentracing:opentracing-api:0.31.0'
+ compile 'io.opentracing:opentracing-util:0.31.0'
+ compile 'io.opentracing.brave:brave-opentracing:0.31.0'
+ compile 'io.zipkin.reporter2:zipkin-sender-okhttp3:2.6.1'
+ compile 'io.zipkin.reporter2:zipkin-reporter:2.6.1'
+
scoverage gradle.scoverage.deps
}
diff --git a/common/scala/src/main/resources/application.conf
b/common/scala/src/main/resources/application.conf
index 3ceaab8..bbac5f4 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -188,4 +188,18 @@ whisk {
constraint-delimiter = " "//used to parse constraint strings
teardown-on-exit = true //set to true to disable the mesos framework
on system exit; set for false for HA deployments
}
+
+ # tracing configuration
+ tracing {
+ cache-expiry = 30 seconds #how long to keep spans in cache. Set to
appropriate value to trace long running requests
+ #Zipkin configuration. Uncomment following to enable zipkin based
tracing
+ #zipkin {
+ # url = "http://localhost:9411" //url to connect to zipkin server
+ //sample-rate to decide a request is sampled or not.
+ //sample-rate 0.5 equals sampling 50% of the requests
+ //sample-rate of 1 means 100% sampling.
+ //sample-rate of 0 means no sampling
+ # sample-rate = "0.01" // sample 1% of requests by default
+ #}
+ }
}
diff --git a/common/scala/src/main/scala/whisk/common/TransactionId.scala
b/common/scala/src/main/scala/whisk/common/TransactionId.scala
index cb16a42..6441629 100644
--- a/common/scala/src/main/scala/whisk/common/TransactionId.scala
+++ b/common/scala/src/main/scala/whisk/common/TransactionId.scala
@@ -24,6 +24,8 @@ import akka.http.scaladsl.model.headers.RawHeader
import pureconfig.loadConfigOrThrow
import spray.json._
import whisk.core.ConfigKeys
+import pureconfig._
+import whisk.common.tracing.WhiskTracerProvider
import scala.util.Try
@@ -82,6 +84,9 @@ case class TransactionId private (meta: TransactionMetadata)
extends AnyVal {
}
MetricEmitter.emitCounterMetric(marker)
+
+ //tracing support
+ WhiskTracerProvider.tracer.startSpan(marker, this)
StartMarker(Instant.now, marker)
}
@@ -116,6 +121,9 @@ case class TransactionId private (meta:
TransactionMetadata) extends AnyVal {
}
MetricEmitter.emitHistogramMetric(endMarker, deltaToEnd)
+
+ //tracing support
+ WhiskTracerProvider.tracer.finishSpan(this)
}
/**
@@ -144,6 +152,9 @@ case class TransactionId private (meta:
TransactionMetadata) extends AnyVal {
MetricEmitter.emitHistogramMetric(endMarker, deltaToEnd)
MetricEmitter.emitCounterMetric(endMarker)
+
+ //tracing support
+ WhiskTracerProvider.tracer.error(this)
}
/**
diff --git
a/common/scala/src/main/scala/whisk/common/tracing/OpenTracingProvider.scala
b/common/scala/src/main/scala/whisk/common/tracing/OpenTracingProvider.scala
new file mode 100644
index 0000000..09c105a
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/common/tracing/OpenTracingProvider.scala
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.common.tracing
+
+import java.util.concurrent.TimeUnit
+
+import brave.Tracing
+import brave.opentracing.BraveTracer
+import brave.sampler.Sampler
+import com.github.benmanes.caffeine.cache.{Caffeine, Ticker}
+import io.opentracing.propagation.{Format, TextMapExtractAdapter,
TextMapInjectAdapter}
+import io.opentracing.util.GlobalTracer
+import io.opentracing.{Span, SpanContext, Tracer}
+import pureconfig._
+import whisk.common.{LogMarkerToken, TransactionId}
+import whisk.core.ConfigKeys
+import zipkin2.reporter.okhttp3.OkHttpSender
+import zipkin2.reporter.{AsyncReporter, Sender}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.duration.Duration
+
+/**
+ * OpenTracing based WhiskTracer that caches open spans and contexts per transaction id
+ */
+class OpenTracer(val tracer: Tracer, tracingConfig: TracingConfig, ticker:
Ticker = SystemTicker) extends WhiskTracer {
+ val spanMap = configureCache[String, List[Span]]() //open spans per transaction id, newest first
+ val contextMap = configureCache[String, SpanContext]() //context set via setTraceContext, per transaction id
+
+ /**
+ * Start a span named after logMarker.action and tagged with the transaction id.
+ * Parent is the newest open span of the transaction, else a context stored by
+ * setTraceContext; with neither, the span ignores any currently active span.
+ * @param logMarker marker whose action is used as the span's operation name.
+ * @param transactionId transactionId to which this span belongs.
+ */
+ override def startSpan(logMarker: LogMarkerToken, transactionId:
TransactionId): Unit = {
+ //spans already open for this transactionId (Nil when none is cached)
+ val spanList = spanMap.getOrElse(transactionId.meta.id, Nil)
+
+ val spanBuilder = tracer
+ .buildSpan(logMarker.action)
+ .withTag("transactionId", transactionId.meta.id)
+
+ val active = spanList match {
+ case Nil =>
+ //Check if any active context then resume from that else create a
fresh span
+ contextMap
+ .get(transactionId.meta.id)
+ .map(spanBuilder.asChildOf)
+ .getOrElse(spanBuilder.ignoreActiveSpan())
+ .startActive(true)
+ .span()
+ case head :: _ =>
+ //Create a child span of current head
+ spanBuilder.asChildOf(head).startActive(true).span()
+ }
+ //prepend the new span so it becomes the head of the list
+ spanMap.put(transactionId.meta.id, active :: spanList)
+ }
+
+ /**
+ * Finish the most recently started span of the given transaction.
+ *
+ * @param transactionId transaction whose newest open span is finished.
+ */
+ override def finishSpan(transactionId: TransactionId): Unit = {
+ clear(transactionId)
+ }
+
+ /**
+ * Register error; currently does exactly what finishSpan does.
+ *
+ * @param transactionId transaction whose newest open span is finished.
+ */
+ override def error(transactionId: TransactionId): Unit = {
+ clear(transactionId)
+ }
+
+ /**
+ * Get the current TraceContext which can be used for downstream services
+ *
+ * @param transactionId transaction whose newest open span is serialized.
+ * @return TEXT_MAP form of that span's context, None if no span is cached.
+ */
+ override def getTraceContext(transactionId: TransactionId):
Option[Map[String, String]] = {
+ spanMap
+ .get(transactionId.meta.id)
+ .flatMap(_.headOption)
+ .map { span =>
+ val map = mutable.Map.empty[String, String]
+ tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new
TextMapInjectAdapter(map.asJava))
+ map.toMap
+ }
+ }
+
+ /**
+ * Store a TraceContext received from an upstream service for this transaction
+ *
+ * @param transactionId transaction id the extracted context is stored under.
+ * @param context TEXT_MAP entries to extract; None leaves contextMap untouched.
+ */
+ override def setTraceContext(transactionId: TransactionId, context:
Option[Map[String, String]]) = {
+ context.foreach { scalaMap =>
+ val ctx: SpanContext = tracer.extract(Format.Builtin.TEXT_MAP, new
TextMapExtractAdapter(scalaMap.asJava))
+ contextMap.put(transactionId.meta.id, ctx)
+ }
+ }
+
+ private def clear(transactionId: TransactionId): Unit = { //finishes the head span of the transaction, if any
+ spanMap.get(transactionId.meta.id).foreach {
+ case head :: Nil => //last open span: finish it and drop both cache entries
+ head.finish()
+ spanMap.remove(transactionId.meta.id)
+ contextMap.remove(transactionId.meta.id)
+ case head :: tail => //nested span: finish it and keep the remaining spans
+ head.finish()
+ spanMap.put(transactionId.meta.id, tail)
+ case Nil => //nothing open, nothing to do
+ }
+ }
+
+ private def configureCache[T, R](): collection.concurrent.Map[T, R] = //Caffeine cache exposed as a Scala concurrent map
+ Caffeine
+ .newBuilder()
+ .ticker(ticker)
+ .expireAfterAccess(tracingConfig.cacheExpiry.toSeconds, TimeUnit.SECONDS) //expiry is counted from last access
+ .build()
+ .asMap()
+ .asScala
+ .asInstanceOf[collection.concurrent.Map[T, R]]
+}
+
+trait WhiskTracer { //tracing hooks; every default implementation below does nothing
+ def startSpan(logMarker: LogMarkerToken, transactionId: TransactionId): Unit
= {} //default: no span is started
+ def finishSpan(transactionId: TransactionId): Unit = {} //default: no-op
+ def error(transactionId: TransactionId): Unit = {} //default: no-op
+ def getTraceContext(transactionId: TransactionId): Option[Map[String,
String]] = None //default: no context available
+ def setTraceContext(transactionId: TransactionId, context:
Option[Map[String, String]]): Unit = {} //default: context is ignored
+}
+
+object WhiskTracerProvider { //builds the WhiskTracer from the ConfigKeys.tracing configuration
+ val tracingConfig = loadConfigOrThrow[TracingConfig](ConfigKeys.tracing) //throws if the config cannot be loaded
+
+ val tracer: WhiskTracer = createTracer(tracingConfig) //created once when this object is initialized
+
+ private def createTracer(tracingConfig: TracingConfig): WhiskTracer = { //OpenTracer if a GlobalTracer is registered, else NoopTracer
+
+ tracingConfig.zipkin match {
+ case Some(zipkinConfig) => { //zipkin configured: register a Brave-backed tracer unless one is already registered
+ if (!GlobalTracer.isRegistered) {
+ val sender: Sender = OkHttpSender.create(zipkinConfig.generateUrl)
+ val spanReporter = AsyncReporter.create(sender)
+ val braveTracing = Tracing
+ .newBuilder()
+ .localServiceName(tracingConfig.component)
+ .spanReporter(spanReporter)
+ .sampler(Sampler.create(zipkinConfig.sampleRate.toFloat)) //sampleRate is a String in config; toFloat throws if it is not numeric
+ .build()
+
+ //register with OpenTracing
+ GlobalTracer.register(BraveTracer.create(braveTracing))
+
+ sys.addShutdownHook({ spanReporter.close() }) //close the reporter on JVM shutdown
+ }
+ }
+ case None => //no zipkin section: nothing is registered here
+ }
+
+ if (GlobalTracer.isRegistered) //also true when a tracer was registered outside this method
+ new OpenTracer(GlobalTracer.get(), tracingConfig)
+ else
+ NoopTracer
+ }
+}
+
+private object NoopTracer extends WhiskTracer //inherits the do-nothing defaults of WhiskTracer
+case class TracingConfig(component: String, cacheExpiry: Duration, zipkin:
Option[ZipkinConfig] = None) //component: local service name; cacheExpiry: span cache expiry; zipkin: optional reporter settings
+case class ZipkinConfig(url: String, sampleRate: String) { //sampleRate is converted with toFloat in WhiskTracerProvider
+ def generateUrl = s"$url/api/v2/spans" //url with the span endpoint path appended
+}
+object SystemTicker extends Ticker { //default Ticker of OpenTracer, backed by System.nanoTime
+ override def read() = {
+ System.nanoTime()
+ }
+}
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index fb20fda..be9f7b0 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -222,6 +222,9 @@ object ConfigKeys {
val dockerContainerFactory = s"${docker}.container-factory"
val runc = "whisk.runc"
val runcTimeouts = s"$runc.timeouts"
+
+ val tracing = "whisk.tracing"
+
val containerFactory = "whisk.container-factory"
val containerArgs = s"$containerFactory.container-args"
val containerPool = "whisk.container-pool"
diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala
b/common/scala/src/main/scala/whisk/core/connector/Message.scala
index eb960b3..d540a54 100644
--- a/common/scala/src/main/scala/whisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala
@@ -49,7 +49,8 @@ case class ActivationMessage(override val transid:
TransactionId,
rootControllerIndex: ControllerInstanceId,
blocking: Boolean,
content: Option[JsObject],
- cause: Option[ActivationId] = None)
+ cause: Option[ActivationId] = None,
+ traceContext: Option[Map[String, String]] = None)
extends Message {
override def serialize = ActivationMessage.serdes.write(this).compactPrint
@@ -67,7 +68,7 @@ object ActivationMessage extends DefaultJsonProtocol {
def parse(msg: String) = Try(serdes.read(msg.parseJson))
private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
- implicit val serdes = jsonFormat9(ActivationMessage.apply)
+ implicit val serdes = jsonFormat10(ActivationMessage.apply)
}
/**
diff --git a/core/controller/src/main/resources/application.conf
b/core/controller/src/main/resources/application.conf
index a288636..77ce527 100644
--- a/core/controller/src/main/resources/application.conf
+++ b/core/controller/src/main/resources/application.conf
@@ -78,3 +78,13 @@ ssl-config.enabledCipherSuites = [
"TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256",
"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
]
+
+whisk{
+ # tracing configuration
+ tracing {
+ component = "Controller"
+ }
+}
+
+
+
diff --git
a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
index 79da2d5..49d84c2 100644
---
a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
+++
b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
@@ -23,6 +23,7 @@ import akka.actor.ActorSystem
import akka.event.Logging.InfoLevel
import spray.json._
import whisk.common.{Logging, LoggingMarkers, TransactionId}
+import whisk.common.tracing.WhiskTracerProvider
import whisk.core.connector.ActivationMessage
import whisk.core.controller.WhiskServices
import whisk.core.database.NoDocumentException
@@ -146,25 +147,29 @@ protected[actions] trait PrimitiveActions {
// merge package parameters with action (action parameters supersede),
then merge in payload
val args = action.parameters merge payload
+ val activationId = activationIdFactory.make()
+
+ val startActivation = transid.started(
+ this,
+ waitForResponse
+ .map(_ => LoggingMarkers.CONTROLLER_ACTIVATION_BLOCKING)
+ .getOrElse(LoggingMarkers.CONTROLLER_ACTIVATION),
+ logLevel = InfoLevel)
+ val startLoadbalancer =
+ transid.started(this, LoggingMarkers.CONTROLLER_LOADBALANCER, s"action
activation id: ${activationId}")
+
val message = ActivationMessage(
transid,
FullyQualifiedEntityName(action.namespace, action.name,
Some(action.version)),
action.rev,
user,
- activationIdFactory.make(), // activation id created here
+ activationId, // activation id created here
activeAckTopicIndex,
waitForResponse.isDefined,
args,
- cause = cause)
+ cause = cause,
+ WhiskTracerProvider.tracer.getTraceContext(transid))
- val startActivation = transid.started(
- this,
- waitForResponse
- .map(_ => LoggingMarkers.CONTROLLER_ACTIVATION_BLOCKING)
- .getOrElse(LoggingMarkers.CONTROLLER_ACTIVATION),
- logLevel = InfoLevel)
- val startLoadbalancer =
- transid.started(this, LoggingMarkers.CONTROLLER_LOADBALANCER, s"action
activation id: ${message.activationId}")
val postedFuture = loadBalancer.publish(action, message)
postedFuture.flatMap { activeAckResponse =>
diff --git a/core/invoker/src/main/resources/application.conf
b/core/invoker/src/main/resources/application.conf
index ebd45e5..00c3339 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -66,4 +66,9 @@ whisk {
pause-grace = 50 milliseconds
}
}
+
+ # tracing configuration
+ tracing {
+ component = "Invoker"
+ }
}
diff --git
a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index ae4a1ae..87ca1e8 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -26,6 +26,7 @@ import akka.stream.ActorMaterializer
import org.apache.kafka.common.errors.RecordTooLargeException
import pureconfig._
import spray.json._
+import whisk.common.tracing.WhiskTracerProvider
import whisk.common._
import whisk.core.{ConfigKeys, WhiskConfig}
import whisk.core.connector._
@@ -192,6 +193,9 @@ class InvokerReactive(
implicit val transid: TransactionId = msg.transid
+ //set trace context to continue tracing
+ WhiskTracerProvider.tracer.setTraceContext(transid, msg.traceContext)
+
if (!namespaceBlacklist.isBlacklisted(msg.user)) {
val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION,
logLevel = InfoLevel)
val namespace = msg.action.path
diff --git a/tests/build.gradle b/tests/build.gradle
index cf737ed..919d27e 100644
--- a/tests/build.gradle
+++ b/tests/build.gradle
@@ -145,12 +145,15 @@ dependencies {
compile 'com.typesafe.akka:akka-http-testkit_2.11:10.1.1'
compile 'com.github.java-json-tools:json-schema-validator:2.2.8'
compile "org.mockito:mockito-core:2.15.0"
+ compile 'io.opentracing:opentracing-mock:0.31.0'
compile project(':common:scala')
compile project(':core:controller')
compile project(':core:invoker')
compile project(':tools:admin')
+
+
scoverage gradle.scoverage.deps
}
diff --git a/tests/src/test/scala/common/WskTracingTests.scala
b/tests/src/test/scala/common/WskTracingTests.scala
new file mode 100644
index 0000000..516f890
--- /dev/null
+++ b/tests/src/test/scala/common/WskTracingTests.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package common
+
+import io.opentracing.Span
+import io.opentracing.mock.{MockSpan, MockTracer}
+import com.github.benmanes.caffeine.cache.Ticker
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import pureconfig.loadConfigOrThrow
+import whisk.common.{LoggingMarkers, TransactionId}
+import whisk.common.tracing.{OpenTracer, TracingConfig}
+import whisk.core.ConfigKeys
+
+import scala.ref.WeakReference
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+
+@RunWith(classOf[JUnitRunner])
+class WskTracingTests extends FlatSpec with TestHelpers with Matchers { //exercises OpenTracer against a MockTracer and a settable ticker
+
+ val tracer: MockTracer = new MockTracer()
+ val tracingConfig = loadConfigOrThrow[TracingConfig](ConfigKeys.tracing)
+ val ticker = new FakeTicker(System.nanoTime()) //time only moves when a test assigns ticker.time
+ val openTracer = new OpenTracer(tracer, tracingConfig, ticker)
+
+ it should "create span and context and invalidate cache after expiry" in {
+ tracer.reset
+
+ val transactionId: TransactionId = TransactionId.testing
+ var list: List[WeakReference[Span]] = List() //NOTE(review): never read in this test - looks unused, confirm and remove
+
+ openTracer.startSpan(LoggingMarkers.CONTROLLER_ACTIVATION, transactionId)
+ var ctx = openTracer.getTraceContext(transactionId)
+ openTracer.setTraceContext(transactionId, ctx)
+ ctx should be(defined)
+
+ //advance ticker beyond the configured cache expiry
+ ticker.time = System.nanoTime() + (tracingConfig.cacheExpiry.toNanos + 100)
+ ctx = openTracer.getTraceContext(transactionId)
+ ctx should not be (defined)
+ openTracer.startSpan(LoggingMarkers.CONTROLLER_KAFKA, transactionId)
+ openTracer.finishSpan(transactionId)
+ val finishedSpans = tracer.finishedSpans()
+ finishedSpans should have size 1
+ //no parent for new span as cache expiry cleared spanMap and contextMap
+ finishedSpans.get(0).parentId() should be(0)
+ }
+
+ it should "create a finished span" in {
+ tracer.reset
+ val transactionId: TransactionId = TransactionId.testing
+ openTracer.startSpan(LoggingMarkers.CONTROLLER_ACTIVATION, transactionId)
+ openTracer.finishSpan(transactionId)
+ val finishedSpans = tracer.finishedSpans()
+ finishedSpans should have size 1
+
+ }
+
+ it should "create a child span" in {
+ tracer.reset
+ val transactionId: TransactionId = TransactionId.testing
+ openTracer.startSpan(LoggingMarkers.CONTROLLER_ACTIVATION, transactionId)
+ openTracer.startSpan(LoggingMarkers.CONTROLLER_KAFKA, transactionId)
+ openTracer.finishSpan(transactionId) //finishes the KAFKA span, the newest one
+ openTracer.finishSpan(transactionId) //finishes the ACTIVATION span
+ val finishedSpans = tracer.finishedSpans()
+ finishedSpans should have size 2
+ val parent: MockSpan = finishedSpans.get(1) //finished second
+ val child: MockSpan = finishedSpans.get(0) //finished first
+ child.parentId should be(parent.context().spanId)
+
+ }
+
+ it should "create a span with tag" in {
+ tracer.reset
+ val transactionId: TransactionId = TransactionId.testing
+ openTracer.startSpan(LoggingMarkers.CONTROLLER_ACTIVATION, transactionId)
+ openTracer.finishSpan(transactionId)
+ val finishedSpans = tracer.finishedSpans()
+ finishedSpans should have size 1
+ val mockSpan: MockSpan = finishedSpans.get(0)
+ mockSpan.tags should not be null
+ mockSpan.tags should have size 1 //presumably just the "transactionId" tag set by startSpan
+
+ }
+
+ it should "create a valid trace context and use it" in {
+ tracer.reset
+ val transactionId: TransactionId = TransactionId.testing
+ openTracer.startSpan(LoggingMarkers.CONTROLLER_ACTIVATION, transactionId)
+ val context = openTracer.getTraceContext(transactionId) //captured while the span is still open
+ openTracer.finishSpan(transactionId)
+ tracer.reset
+ //use context for new span
+ openTracer.setTraceContext(transactionId, context)
+ openTracer.startSpan(LoggingMarkers.CONTROLLER_KAFKA, transactionId)
+ openTracer.finishSpan(transactionId)
+ val finishedSpans = tracer.finishedSpans()
+ finishedSpans should have size 1
+ val child: MockSpan = finishedSpans.get(0)
+ //This child span should have a parent as we have set trace context
+ child.parentId should be > 0L
+ }
+}
+
+class FakeTicker(var time: Long) extends Ticker { //test Ticker whose reading is the mutable `time` field
+ override def read() = {
+ time
+ }
+}