This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-4.0.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.0.x by this push: new 842673a7893 CAMEL-19776 for camel-4.0.x (#11230) 842673a7893 is described below commit 842673a7893052a22aec9d8bfb0c96f6ccd0d605 Author: RuslanHryn <gry...@gmail.com> AuthorDate: Tue Aug 29 22:08:28 2023 +0300 CAMEL-19776 for camel-4.0.x (#11230) * CAMEL-19776: Added tracing strategy for OpenTelemetry to trace processors * CAMEL-19776: Removed unnecessary sleep in the test --------- Co-authored-by: Ruslan Hryn <hryn.exter...@crxmarkets.com> --- .../camel/opentelemetry/NoopTracingStrategy.java | 34 +++++++ .../OpenTelemetryTracingStrategy.java | 107 +++++++++++++++++++++ .../CamelOpenTelemetryTestSupport.java | 7 ++ .../OpenTelemetryTracingStrategyTest.java | 94 ++++++++++++++++++ 4 files changed, 242 insertions(+) diff --git a/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/NoopTracingStrategy.java b/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/NoopTracingStrategy.java new file mode 100644 index 00000000000..ad8d54ba761 --- /dev/null +++ b/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/NoopTracingStrategy.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.opentelemetry;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.NamedNode;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.InterceptStrategy;
+import org.apache.camel.support.processor.DelegateAsyncProcessor;
+
+/**
+ * Intercept strategy that adds no tracing of its own: every processor is handed back
+ * wrapped in a plain {@link DelegateAsyncProcessor}.
+ */
+public class NoopTracingStrategy implements InterceptStrategy {
+
+    /**
+     * Wraps {@code target} in a delegating processor; the other arguments are not used.
+     *
+     * @param  camelContext        not used
+     * @param  processorDefinition not used
+     * @param  target              the processor to delegate to
+     * @param  nextTarget          not used
+     * @return                     a new {@link DelegateAsyncProcessor} around {@code target}
+     */
+    @Override
+    public Processor wrapProcessorInInterceptors(
+            CamelContext camelContext, NamedNode processorDefinition,
+            Processor target, Processor nextTarget)
+            throws Exception {
+        final Processor passThrough = new DelegateAsyncProcessor(target);
+        return passThrough;
+    }
+}
diff --git a/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategy.java b/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategy.java
new file mode 100644
index 00000000000..7e2266bf3ae
--- /dev/null
+++ b/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategy.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.opentelemetry;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.NamedNode;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.InterceptStrategy;
+import org.apache.camel.support.PatternHelper;
+import org.apache.camel.support.processor.DelegateAsyncProcessor;
+import org.apache.camel.tracing.ActiveSpanManager;
+import org.apache.camel.tracing.SpanDecorator;
+
+/**
+ * Intercept strategy that runs each processor inside its own OpenTelemetry span.
+ * <p>
+ * The span is named after the processor id (or {@code "unnamed"} when the id is {@code null}),
+ * is parented to the span currently active on the exchange, and carries a {@code component}
+ * attribute built from {@link SpanDecorator#CAMEL_COMPONENT} and the processor's short name.
+ * Processors whose id matches one of the tracer's exclude patterns are not traced.
+ */
+public class OpenTelemetryTracingStrategy implements InterceptStrategy {
+    private static final String UNNAMED = "unnamed";
+    private final OpenTelemetryTracer tracer;
+
+    /**
+     * @param tracer supplies the OpenTelemetry tracer and the exclude patterns used by this strategy
+     */
+    public OpenTelemetryTracingStrategy(OpenTelemetryTracer tracer) {
+        this.tracer = tracer;
+    }
+
+    /**
+     * Wraps {@code target} so that its execution is recorded as a child span.
+     *
+     * @param  camelContext        not used
+     * @param  processorDefinition the node whose id and short name label the span
+     * @param  target              the processor to run inside the span
+     * @param  nextTarget          not used
+     * @return                     a {@link DelegateAsyncProcessor} around {@code target}; it adds tracing
+     *                             unless the processor id matches an exclude pattern
+     */
+    @Override
+    public Processor wrapProcessorInInterceptors(
+            CamelContext camelContext,
+            NamedNode processorDefinition, Processor target, Processor nextTarget)
+            throws Exception {
+        if (!shouldTrace(processorDefinition)) {
+            return new DelegateAsyncProcessor(target);
+        }
+
+        return new DelegateAsyncProcessor((Exchange exchange) -> {
+            // Guard the adapter itself: with no span active on the exchange the lookup is
+            // expected to yield null (TODO confirm against ActiveSpanManager), and dereferencing
+            // it would fail before the "no parent span" fallback below could run.
+            OpenTelemetrySpanAdapter spanWrapper = (OpenTelemetrySpanAdapter) ActiveSpanManager.getSpan(exchange);
+            Span span = spanWrapper == null ? null : spanWrapper.getOpenTelemetrySpan();
+            if (span == null) {
+                // Nothing to attach a child span to: run the processor untraced.
+                target.process(exchange);
+                return;
+            }
+
+            final Span processorSpan = tracer.getTracer().spanBuilder(getOperationName(processorDefinition))
+                    .setParent(Context.current().with(span))
+                    .setAttribute("component", getComponentName(processorDefinition))
+                    .startSpan();
+
+            // The correlation-context processors are run without the processor span being
+            // activated on the exchange; presumably they must see the enclosing span -- TODO confirm.
+            boolean activateExchange = !(target instanceof GetCorrelationContextProcessor
+                    || target instanceof SetCorrelationContextProcessor);
+
+            if (activateExchange) {
+                ActiveSpanManager.activate(exchange, new OpenTelemetrySpanAdapter(processorSpan));
+            }
+
+            try (Scope ignored = processorSpan.makeCurrent()) {
+                target.process(exchange);
+            } catch (Exception ex) {
+                // Record the failure on the span that represents this processor, not on its parent.
+                processorSpan.setStatus(StatusCode.ERROR);
+                processorSpan.recordException(ex);
+                throw ex;
+            } finally {
+                if (activateExchange) {
+                    ActiveSpanManager.deactivate(exchange);
+                }
+
+                processorSpan.end();
+            }
+        });
+    }
+
+    /** Builds the value of the {@code component} span attribute for the given node. */
+    private static String getComponentName(NamedNode processorDefinition) {
+        return SpanDecorator.CAMEL_COMPONENT + processorDefinition.getShortName();
+    }
+
+    /** Returns the node id as the span name, or {@code "unnamed"} when the id is {@code null}. */
+    private static String getOperationName(NamedNode processorDefinition) {
+        final String name = processorDefinition.getId();
+        return name == null ? UNNAMED : name;
+    }
+
+    // Adapted from org.apache.camel.impl.engine.DefaultTracer.shouldTrace
+    // org.apache.camel.impl.engine.DefaultTracer.shouldTracePattern
+    /** Returns {@code false} when the node id matches any of the tracer's exclude patterns. */
+    private boolean shouldTrace(NamedNode definition) {
+        for (String pattern : tracer.getExcludePatterns()) {
+            // PatternHelper.matchPattern is the matcher used for the exclude patterns here
+            if (PatternHelper.matchPattern(definition.getId(), pattern)) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+}
diff --git a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java
index c6a4a71d7de..c4d49c9a60d 100644
--- a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java
+++ b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import io.opentelemetry.api.common.AttributeKey;
@@ -38,6 +39,7 @@ import io.opentelemetry.sdk.trace.SpanProcessor;
 import io.opentelemetry.sdk.trace.data.SpanData;
 import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
 import
org.apache.camel.CamelContext; +import org.apache.camel.spi.InterceptStrategy; import org.apache.camel.test.junit5.CamelTestSupport; import org.apache.camel.tracing.SpanDecorator; import org.awaitility.Awaitility; @@ -86,6 +88,7 @@ class CamelOpenTelemetryTestSupport extends CamelTestSupport { ottracer.setTracer(tracer); ottracer.setExcludePatterns(getExcludePatterns()); ottracer.addDecorator(new TestSEDASpanDecorator()); + ottracer.setTracingStrategy(getTracingStrategy().apply(ottracer)); ottracer.init(context); return context; } @@ -209,6 +212,10 @@ class CamelOpenTelemetryTestSupport extends CamelTestSupport { assertEquals(1, inMemorySpanExporter.getFinishedSpanItems().stream().map(s -> s.getTraceId()).distinct().count()); } + protected Function<OpenTelemetryTracer, InterceptStrategy> getTracingStrategy() { + return ottracer -> new NoopTracingStrategy(); + } + private static class LoggingSpanProcessor implements SpanProcessor { private static final Logger LOG = LoggerFactory.getLogger(LoggingSpanProcessor.class); diff --git a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategyTest.java b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategyTest.java new file mode 100644 index 00000000000..4701e65eef8 --- /dev/null +++ b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategyTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.opentelemetry;
+
+import java.util.function.Function;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.context.Scope;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.InterceptStrategy;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Runs a two-route exchange with {@link OpenTelemetryTracingStrategy} installed and checks the
+ * recorded spans against {@code testdata}, including a span started by code inside a processor.
+ */
+class OpenTelemetryTracingStrategyTest extends CamelOpenTelemetryTestSupport {
+
+    // Expected spans, passed to the support class; the parent ids presumably refer to
+    // positions in this array -- confirm against SpanTestData / verify().
+    private static SpanTestData[] testdata = {
+            new SpanTestData().setLabel("camel-process").setOperation("third-party-span").setParentId(1),
+            new SpanTestData().setLabel("camel-process").setOperation("third-party-processor").setParentId(6),
+            new SpanTestData().setLabel("camel-process").setOperation("direct-processor").setParentId(3),
+            new SpanTestData().setLabel("direct:serviceB").setOperation("serviceB").setParentId(4),
+            new SpanTestData().setLabel("direct:serviceB").setOperation("serviceB")
+                    .setKind(SpanKind.CLIENT).setParentId(5),
+            new SpanTestData().setLabel("to:serviceB").setOperation("to-serviceB").setParentId(6),
+            new SpanTestData().setLabel("direct:serviceA").setUri("direct://start").setOperation("serviceA")
+                    .setParentId(7),
+            new SpanTestData().setLabel("direct:serviceA").setUri("direct://start").setOperation("serviceA")
+                    .setKind(SpanKind.CLIENT)
+    };
+
+    OpenTelemetryTracingStrategyTest() {
+        super(testdata);
+    }
+
+    /** Sends one request through serviceA and verifies the collected spans. */
+    @Test
+    void testTracingOfProcessors() {
+        template.requestBody("direct:serviceA", "Hello");
+
+        verify();
+    }
+
+    /** Defines serviceA (custom processor, then a call to serviceB) and serviceB (no-op processor). */
+    @Override
+    protected RoutesBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:serviceA").routeId("serviceA")
+                        .process(exchange -> callThirdPartyInstrumentation()).id("third-party-processor")
+                        .to("direct:serviceB").id("to-serviceB");
+
+                from("direct:serviceB").routeId("serviceB")
+                        .process(exchange -> {
+                            // noop
+                        }).id("direct-processor");
+            }
+
+            // Stands in for instrumented third-party code: opens its own span while the processor runs.
+            private void callThirdPartyInstrumentation() throws InterruptedException {
+                final Span thirdPartySpan = getTracer().spanBuilder("third-party-span").startSpan();
+                try (Scope ignored = thirdPartySpan.makeCurrent()) {
+                    thirdPartySpan.setAttribute(COMPONENT_KEY, "third-party-component");
+                } finally {
+                    thirdPartySpan.end();
+                }
+            }
+        };
+    }
+
+    /** Installs the strategy under test in place of the support class's default. */
+    @Override
+    protected Function<OpenTelemetryTracer, InterceptStrategy> getTracingStrategy() {
+        return ottracer -> new OpenTelemetryTracingStrategy(ottracer);
+    }
+}