Repository: cxf Updated Branches: refs/heads/master 5ce0b6eb1 -> 3fa435a29
CXF-7247: Support tracing using latest Zipkin Brave 4.x release. More test cases, fixing proper context propagation for sync/async calls. Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/3fa435a2 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/3fa435a2 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/3fa435a2 Branch: refs/heads/master Commit: 3fa435a2908d95b656933d0d34f9ccf8c0153ffe Parents: 5ce0b6e Author: reta <drr...@gmail.com> Authored: Sun May 28 21:03:23 2017 -0400 Committer: reta <drr...@gmail.com> Committed: Sun May 28 21:03:23 2017 -0400 ---------------------------------------------------------------------- .../brave/AbstractBraveClientProvider.java | 24 +++---- .../tracing/brave/AbstractBraveProvider.java | 15 ++-- .../brave/BraveClientStartInterceptor.java | 4 +- .../brave/BraveClientStopInterceptor.java | 5 +- .../brave/jaxrs/BraveClientProvider.java | 8 +-- .../jaxrs/tracing/brave/BraveTracingTest.java | 76 ++++++++++++++++++++ .../jaxrs/tracing/htrace/HTraceTracingTest.java | 76 ++++++++++++++++++++ 7 files changed, 177 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/3fa435a2/integration/tracing/tracing-brave/src/main/java/org/apache/cxf/tracing/brave/AbstractBraveClientProvider.java ---------------------------------------------------------------------- diff --git a/integration/tracing/tracing-brave/src/main/java/org/apache/cxf/tracing/brave/AbstractBraveClientProvider.java b/integration/tracing/tracing-brave/src/main/java/org/apache/cxf/tracing/brave/AbstractBraveClientProvider.java index f0f2ebb..f5c190f 100644 --- a/integration/tracing/tracing-brave/src/main/java/org/apache/cxf/tracing/brave/AbstractBraveClientProvider.java +++ b/integration/tracing/tracing-brave/src/main/java/org/apache/cxf/tracing/brave/AbstractBraveClientProvider.java @@ -33,6 +33,7 @@ import org.apache.cxf.tracing.brave.internal.HttpAdapterFactory.Response; import org.apache.cxf.tracing.brave.internal.HttpClientAdapterFactory; import brave.Span; +import brave.Tracer.SpanInScope; import brave.http.HttpClientAdapter; import brave.http.HttpClientHandler; import brave.http.HttpTracing; @@ -48,7 +49,7 @@ public abstract class AbstractBraveClientProvider extends AbstractTracingProvide this.brave = brave; } - protected TraceScopeHolder<Span> startTraceSpan(final Map<String, List<String>> requestHeaders, + protected TraceScopeHolder<TraceScope> startTraceSpan(final Map<String, List<String>> requestHeaders, URI uri, String method) { final Request request = HttpAdapterFactory.request(requestHeaders, uri, method); @@ -65,13 +66,12 @@ public abstract class AbstractBraveClientProvider extends AbstractTracingProvide // In case of asynchronous client invocation, the span should be detached as JAX-RS // client request / response filters are going to be executed in different threads. - boolean detached = false; - if (isAsyncInvocation() && span != null) { - // No need to detach implicitly, Brave is using inheritable thread local context - detached = true; + SpanInScope scope = null; + if (!isAsyncInvocation() && span != null && !span.isNoop()) { + scope = brave.tracing().tracer().withSpanInScope(span); } - return new TraceScopeHolder<Span>(span, detached); + return new TraceScopeHolder<TraceScope>(new TraceScope(span, scope), scope == null /* detached */); } private <C> Setter<C, String> inject(final Map<String, List<String>> requestHeaders) { @@ -86,18 +86,18 @@ public abstract class AbstractBraveClientProvider extends AbstractTracingProvide return !JAXRSUtils.getCurrentMessage().getExchange().isSynchronous(); } - protected void stopTraceSpan(final TraceScopeHolder<Span> holder, final int responseStatus) { + protected void stopTraceSpan(final TraceScopeHolder<TraceScope> holder, final int responseStatus) { if (holder == null) { return; } - final Span span = holder.getScope(); - if (span != null) { + final TraceScope scope = holder.getScope(); + if (scope != null) { try { // If the client invocation was asynchronous , the trace span has been created // in another thread and should be re-attached to the current one. if (holder.isDetached()) { - brave.tracing().tracer().joinSpan(span.context()); + brave.tracing().tracer().joinSpan(scope.getSpan().context()); } final Response response = HttpAdapterFactory.response(responseStatus); @@ -105,9 +105,9 @@ public abstract class AbstractBraveClientProvider extends AbstractTracingProvide @SuppressWarnings("unchecked") final HttpClientHandler<?, Response> handler = HttpClientHandler.create(brave, adapter); - handler.handleReceive(response, null, span); + handler.handleReceive(response, null, scope.getSpan()); } finally { - span.finish(); + scope.close(); } } } http://git-wip-us.apache.org/repos/asf/cxf/blob/3fa435a2/integration/tracing/tracing-brave/src/main/java/org/apache/cxf/tracing/brave/AbstractBraveProvider.java ---------------------------------------------------------------------- diff --git a/integration/tracing/tracing-brave/src/main/java/org/apache/cxf/tracing/brave/AbstractBraveProvider.java b/integration/tracing/tracing-brave/src/main/java/org/apache/cxf/tracing/brave/AbstractBraveProvider.java index e7ae27c..64eac55 100644 --- a/integration/tracing/tracing-brave/src/main/java/org/apache/cxf/tracing/brave/AbstractBraveProvider.java +++ b/integration/tracing/tracing-brave/src/main/java/org/apache/cxf/tracing/brave/AbstractBraveProvider.java @@ -64,20 +64,17 @@ public abstract class AbstractBraveProvider extends AbstractTracingProvider { .extractor(adapter::requestHeader), request); - SpanInScope scope = null; - if (span != null && !span.isNoop()) { - scope = brave.tracing().tracer().withSpanInScope(span); - } - // If the service resource is using asynchronous processing mode, the trace // scope will be closed in another thread and as such should be detached. - boolean detached = false; - if (isAsyncResponse()) { + SpanInScope scope = null; + if (isAsyncResponse() && span != null) { + // Do not modify the current context span propagateContinuationSpan(span); - detached = true; + } else if (span != null && !span.isNoop()) { + scope = brave.tracing().tracer().withSpanInScope(span); } - return new TraceScopeHolder<TraceScope>(new TraceScope(span, scope), detached); + return new TraceScopeHolder<TraceScope>(new TraceScope(span, scope), scope == null /* detached */); } protected void stopTraceSpan(final Map<String, List<String>> requestHeaders, http://git-wip-us.apache.org/repos/asf/cxf/blob/3fa435a2/integration/tracing/tracing-brave/src/main/java/org/apache/cxf/tracing/brave/BraveClientStartInterceptor.java ---------------------------------------------------------------------- diff --git a/integration/tracing/tracing-brave/src/main/java/org/apache/cxf/tracing/brave/BraveClientStartInterceptor.java b/integration/tracing/tracing-brave/src/main/java/org/apache/cxf/tracing/brave/BraveClientStartInterceptor.java index fdedcee..34e944d 100644 --- a/integration/tracing/tracing-brave/src/main/java/org/apache/cxf/tracing/brave/BraveClientStartInterceptor.java +++ b/integration/tracing/tracing-brave/src/main/java/org/apache/cxf/tracing/brave/BraveClientStartInterceptor.java @@ -23,7 +23,6 @@ import org.apache.cxf.message.Message; import org.apache.cxf.phase.Phase; import org.apache.cxf.tracing.brave.AbstractBraveInterceptor.ParsedMessage; -import brave.Span; import brave.http.HttpTracing; public class BraveClientStartInterceptor extends AbstractBraveClientInterceptor { @@ -39,12 +38,11 @@ public class BraveClientStartInterceptor extends AbstractBraveClientInterceptor public void handleMessage(Message message) throws Fault { final ParsedMessage parsed = new ParsedMessage(message); - final TraceScopeHolder<Span> holder = super.startTraceSpan(parsed.getHeaders(), + final TraceScopeHolder<TraceScope> holder = super.startTraceSpan(parsed.getHeaders(), parsed.getUri(), parsed.getHttpMethod()); if (holder != null) { message.getExchange().put(TRACE_SPAN, holder); } } - } http://git-wip-us.apache.org/repos/asf/cxf/blob/3fa435a2/integration/tracing/tracing-brave/src/main/java/org/apache/cxf/tracing/brave/BraveClientStopInterceptor.java ---------------------------------------------------------------------- diff --git a/integration/tracing/tracing-brave/src/main/java/org/apache/cxf/tracing/brave/BraveClientStopInterceptor.java b/integration/tracing/tracing-brave/src/main/java/org/apache/cxf/tracing/brave/BraveClientStopInterceptor.java index 8e4487b..ebca21a 100644 --- a/integration/tracing/tracing-brave/src/main/java/org/apache/cxf/tracing/brave/BraveClientStopInterceptor.java +++ b/integration/tracing/tracing-brave/src/main/java/org/apache/cxf/tracing/brave/BraveClientStopInterceptor.java @@ -22,7 +22,6 @@ import org.apache.cxf.interceptor.Fault; import org.apache.cxf.message.Message; import org.apache.cxf.phase.Phase; -import brave.Span; import brave.http.HttpTracing; public class BraveClientStopInterceptor extends AbstractBraveClientInterceptor { @@ -37,8 +36,8 @@ public class BraveClientStopInterceptor extends AbstractBraveClientInterceptor { @Override public void handleMessage(Message message) throws Fault { @SuppressWarnings("unchecked") - final TraceScopeHolder<Span> holder = - (TraceScopeHolder<Span>)message.getExchange().get(TRACE_SPAN); + final TraceScopeHolder<TraceScope> holder = + (TraceScopeHolder<TraceScope>)message.getExchange().get(TRACE_SPAN); Integer responseCode = (Integer)message.get(Message.RESPONSE_CODE); if (responseCode == null) { http://git-wip-us.apache.org/repos/asf/cxf/blob/3fa435a2/integration/tracing/tracing-brave/src/main/java/org/apache/cxf/tracing/brave/jaxrs/BraveClientProvider.java ---------------------------------------------------------------------- diff --git a/integration/tracing/tracing-brave/src/main/java/org/apache/cxf/tracing/brave/jaxrs/BraveClientProvider.java b/integration/tracing/tracing-brave/src/main/java/org/apache/cxf/tracing/brave/jaxrs/BraveClientProvider.java index 0de7c32..e08553a 100644 --- a/integration/tracing/tracing-brave/src/main/java/org/apache/cxf/tracing/brave/jaxrs/BraveClientProvider.java +++ b/integration/tracing/tracing-brave/src/main/java/org/apache/cxf/tracing/brave/jaxrs/BraveClientProvider.java @@ -28,8 +28,8 @@ import javax.ws.rs.ext.Provider; import org.apache.cxf.tracing.brave.AbstractBraveClientProvider; import org.apache.cxf.tracing.brave.HttpClientSpanParser; +import org.apache.cxf.tracing.brave.TraceScope; -import brave.Span; import brave.Tracing; import brave.http.HttpTracing; @@ -52,7 +52,7 @@ public class BraveClientProvider extends AbstractBraveClientProvider @Override public void filter(final ClientRequestContext requestContext) throws IOException { - final TraceScopeHolder<Span> holder = super.startTraceSpan(requestContext.getStringHeaders(), + final TraceScopeHolder<TraceScope> holder = super.startTraceSpan(requestContext.getStringHeaders(), requestContext.getUri(), requestContext.getMethod()); if (holder != null) { @@ -64,8 +64,8 @@ public class BraveClientProvider extends AbstractBraveClientProvider @Override public void filter(final ClientRequestContext requestContext, final ClientResponseContext responseContext) throws IOException { - final TraceScopeHolder<Span> holder = - (TraceScopeHolder<Span>)requestContext.getProperty(TRACE_SPAN); + final TraceScopeHolder<TraceScope> holder = + (TraceScopeHolder<TraceScope>)requestContext.getProperty(TRACE_SPAN); super.stopTraceSpan(holder, responseContext.getStatus()); } } http://git-wip-us.apache.org/repos/asf/cxf/blob/3fa435a2/systests/tracing/src/test/java/org/apache/cxf/systest/jaxrs/tracing/brave/BraveTracingTest.java ---------------------------------------------------------------------- diff --git a/systests/tracing/src/test/java/org/apache/cxf/systest/jaxrs/tracing/brave/BraveTracingTest.java b/systests/tracing/src/test/java/org/apache/cxf/systest/jaxrs/tracing/brave/BraveTracingTest.java index b6fdf89..315a365 100644 --- a/systests/tracing/src/test/java/org/apache/cxf/systest/jaxrs/tracing/brave/BraveTracingTest.java +++ b/systests/tracing/src/test/java/org/apache/cxf/systest/jaxrs/tracing/brave/BraveTracingTest.java @@ -20,9 +20,14 @@ package org.apache.cxf.systest.jaxrs.tracing.brave; import java.net.MalformedURLException; import java.util.Arrays; +import java.util.Collection; import java.util.Random; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; @@ -235,6 +240,69 @@ public class BraveTracingTest extends AbstractBusClientServerTestBase { } @Test + public void testThatNewSpansAreCreatedWhenNotProvidedUsingMultipleAsyncClients() throws Exception { + final WebClient client = createWebClient("/bookstore/books", braveClientProvider); + + // The intention is to make a calls one after another, not in parallel, to ensure the + // thread have trace contexts cleared out. + final Collection<Response> responses = IntStream + .range(0, 4) + .mapToObj(index -> client.async().get()) + .map(this::get) + .collect(Collectors.toList()); + + for (final Response r: responses) { + assertEquals(Status.OK.getStatusCode(), r.getStatus()); + assertThatTraceHeadersArePresent(r, false); + } + + assertThat(TestSpanReporter.getAllSpans().size(), equalTo(12)); + + IntStream + .range(0, 4) + .map(index -> index * 3) + .forEach(index -> { + assertThat(TestSpanReporter.getAllSpans().get(index).name, + equalTo("get books")); + assertThat(TestSpanReporter.getAllSpans().get(index + 1).name, + equalTo("get /bookstore/books")); + assertThat(TestSpanReporter.getAllSpans().get(index + 2).name, + equalTo("get " + client.getCurrentURI())); + }); + } + + @Test + public void testThatNewSpansAreCreatedWhenNotProvidedUsingMultipleClients() throws Exception { + final WebClient client = createWebClient("/bookstore/books", braveClientProvider); + + // The intention is to make a calls one after another, not in parallel, to ensure the + // thread have trace contexts cleared out. + final Collection<Response> responses = IntStream + .range(0, 4) + .mapToObj(index -> client.get()) + .collect(Collectors.toList()); + + for (final Response r: responses) { + assertEquals(Status.OK.getStatusCode(), r.getStatus()); + assertThatTraceHeadersArePresent(r, false); + } + + assertThat(TestSpanReporter.getAllSpans().size(), equalTo(12)); + + IntStream + .range(0, 4) + .map(index -> index * 3) + .forEach(index -> { + assertThat(TestSpanReporter.getAllSpans().get(index).name, + equalTo("get books")); + assertThat(TestSpanReporter.getAllSpans().get(index + 1).name, + equalTo("get /bookstore/books")); + assertThat(TestSpanReporter.getAllSpans().get(index + 2).name, + equalTo("get " + client.getCurrentURI())); + }); + } + + @Test public void testThatProvidedSpanIsNotClosedWhenActive() throws MalformedURLException { final WebClient client = createWebClient("/bookstore/books", braveClientProvider); final Span span = brave.tracer().nextSpan().name("test span").start(); @@ -338,6 +406,14 @@ public class BraveTracingTest extends AbstractBusClientServerTestBase { assertFalse(r.getHeaders().containsKey(PARENT_SPAN_ID_NAME)); } } + + private<T> T get(final Future<T> future) { + try { + return future.get(1, TimeUnit.HOURS); + } catch (InterruptedException | TimeoutException | ExecutionException ex) { + throw new RuntimeException(ex); + } + } private SpanId fromRandom() { return new SpanId() http://git-wip-us.apache.org/repos/asf/cxf/blob/3fa435a2/systests/tracing/src/test/java/org/apache/cxf/systest/jaxrs/tracing/htrace/HTraceTracingTest.java ---------------------------------------------------------------------- diff --git a/systests/tracing/src/test/java/org/apache/cxf/systest/jaxrs/tracing/htrace/HTraceTracingTest.java b/systests/tracing/src/test/java/org/apache/cxf/systest/jaxrs/tracing/htrace/HTraceTracingTest.java index f786f92..2bf22a6 100644 --- a/systests/tracing/src/test/java/org/apache/cxf/systest/jaxrs/tracing/htrace/HTraceTracingTest.java +++ b/systests/tracing/src/test/java/org/apache/cxf/systest/jaxrs/tracing/htrace/HTraceTracingTest.java @@ -20,10 +20,15 @@ package org.apache.cxf.systest.jaxrs.tracing.htrace; import java.net.MalformedURLException; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; @@ -288,6 +293,77 @@ public class HTraceTracingTest extends AbstractBusClientServerTestBase { assertThat((String)r.getHeaders().getFirst(TracerHeaders.DEFAULT_HEADER_SPAN_ID), equalTo(spanId.toString())); } + @Test + public void testThatNewSpansAreCreatedWhenNotProvidedUsingMultipleAsyncClients() throws Exception { + final WebClient client = createWebClient("/bookstore/books", htraceClientProvider); + + // The intention is to make a calls one after another, not in parallel, to ensure the + // thread have trace contexts cleared out. + final Collection<Response> responses = IntStream + .range(0, 4) + .mapToObj(index -> client.async().get()) + .map(this::get) + .collect(Collectors.toList()); + + for (final Response r: responses) { + assertEquals(Status.OK.getStatusCode(), r.getStatus()); + assertTrue(r.getHeaders().containsKey(TracerHeaders.DEFAULT_HEADER_SPAN_ID)); + } + + assertThat(TestSpanReceiver.getAllSpans().size(), equalTo(12)); + + IntStream + .range(0, 4) + .map(index -> index * 3) + .forEach(index -> { + assertThat(TestSpanReceiver.getAllSpans().get(index).getDescription(), + equalTo("Get Books")); + assertThat(TestSpanReceiver.getAllSpans().get(index + 1).getDescription(), + equalTo("GET bookstore/books")); + assertThat(TestSpanReceiver.getAllSpans().get(index + 2).getDescription(), + equalTo("GET " + client.getCurrentURI())); + }); + } + + @Test + public void testThatNewSpansAreCreatedWhenNotProvidedUsingMultipleClients() throws Exception { + final WebClient client = createWebClient("/bookstore/books", htraceClientProvider); + + // The intention is to make a calls one after another, not in parallel, to ensure the + // thread have trace contexts cleared out. + final Collection<Response> responses = IntStream + .range(0, 4) + .mapToObj(index -> client.get()) + .collect(Collectors.toList()); + + for (final Response r: responses) { + assertEquals(Status.OK.getStatusCode(), r.getStatus()); + assertTrue(r.getHeaders().containsKey(TracerHeaders.DEFAULT_HEADER_SPAN_ID)); + } + + assertThat(TestSpanReceiver.getAllSpans().size(), equalTo(12)); + + IntStream + .range(0, 4) + .map(index -> index * 3) + .forEach(index -> { + assertThat(TestSpanReceiver.getAllSpans().get(index).getDescription(), + equalTo("Get Books")); + assertThat(TestSpanReceiver.getAllSpans().get(index + 1).getDescription(), + equalTo("GET bookstore/books")); + assertThat(TestSpanReceiver.getAllSpans().get(index + 2).getDescription(), + equalTo("GET " + client.getCurrentURI())); + }); + } + + private<T> T get(final Future<T> future) { + try { + return future.get(1, TimeUnit.HOURS); + } catch (InterruptedException | TimeoutException | ExecutionException ex) { + throw new RuntimeException(ex); + } + } + protected WebClient createWebClient(final String url, final Object ... providers) { return WebClient .create("http://localhost:" + PORT + url, Arrays.asList(providers))