gemmellr commented on a change in pull request #319: URL: https://github.com/apache/qpid-proton/pull/319#discussion_r692928889
########## File path: cpp/src/tracing.cpp ########## @@ -0,0 +1,240 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <opentelemetry/sdk/trace/simple_processor.h> +#include <opentelemetry/sdk/trace/tracer_provider.h> +#include <opentelemetry/trace/provider.h> +#include <opentelemetry/trace/span.h> + +#include <proton/messaging_handler.hpp> + +// Using an exporter that simply dumps span data to stdout. 
+#include <opentelemetry/exporters/ostream/span_exporter.h> +#include <proton/annotation_key.hpp> +#include <proton/delivery.hpp> +#include <proton/message.hpp> +#include <proton/receiver.hpp> +#include <proton/sender.hpp> +#include <proton/source.hpp> +#include <proton/target.hpp> +#include <proton/tracing.hpp> +#include <proton/tracker.hpp> +#include <proton/transfer.hpp> + +#include "opentelemetry/nostd/unique_ptr.h" + +#include "opentelemetry/baggage/propagation/baggage_propagator.h" +#include "opentelemetry/context/propagation/global_propagator.h" +#include "opentelemetry/context/propagation/text_map_propagator.h" +#include "opentelemetry/trace/propagation/http_trace_context.h" + +#include <iostream> +#include <sstream> +#include <memory> + +namespace proton +{ +namespace trace = opentelemetry::trace; +namespace nostd = opentelemetry::nostd; +namespace common = opentelemetry::common; +namespace context = opentelemetry::context; + +const std::string kContextKey = "CONTEXT"; +std::map<binary, nostd::shared_ptr<trace::Span>> tag_span; + +template <typename T> +class AMQPTextMapCarrier + : public opentelemetry::context::propagation::TextMapCarrier { + public: + AMQPTextMapCarrier<T>(T *message_annotations) + : message_annotations_(message_annotations) {} + virtual nostd::string_view + Get(nostd::string_view key) const noexcept override { + std::string key_to_compare = key.data(); + + if (message_annotations_->exists(annotation_key(key_to_compare))) { + value extracted_value = + message_annotations_->get(annotation_key(key_to_compare)); + std::string extracted_string = to_string(extracted_value); + extracted_strings.push_back(extracted_string); + nostd::string_view final_extracted_string = + nostd::string_view(extracted_strings.back()); + + return final_extracted_string; + } + else + { + std::cout << "Key does not exists" << std::endl; + } + return ""; + } + + virtual void Set(nostd::string_view key, + nostd::string_view val) noexcept override { + + 
message_annotations_->put(annotation_key(std::string(key)), + value(std::string(val))); + } + + T *message_annotations_; + mutable std::vector<std::string> extracted_strings; +}; + +void initTracer(std::unique_ptr<sdktrace::SpanExporter> exporter) +{ + // auto exporter = std::unique_ptr<sdktrace::SpanExporter>( + // new opentelemetry::exporter::trace::OStreamSpanExporter); + auto processor = std::unique_ptr<sdktrace::SpanProcessor>( + new sdktrace::SimpleSpanProcessor(std::move(exporter))); + auto provider = nostd::shared_ptr<opentelemetry::trace::TracerProvider>( + new sdktrace::TracerProvider(std::move(processor))); + + // Set the global trace provider + opentelemetry::trace::Provider::SetTracerProvider(provider); + + // set global propagator + opentelemetry::context::propagation::GlobalTextMapPropagator:: + SetGlobalPropagator( + nostd::shared_ptr< + opentelemetry::context::propagation::TextMapPropagator>( + new opentelemetry::trace::propagation::HttpTraceContext())); +} + +nostd::shared_ptr<trace::Tracer> get_tracer() +{ + auto provider = trace::Provider::GetTracerProvider(); + nostd::shared_ptr<opentelemetry::v0::trace::Tracer> tracer = + provider->GetTracer("qpid-tracer"); + return tracer; +} + +void on_message_span(message &message, delivery &d) { + + opentelemetry::trace::StartSpanOptions options; + options.kind = opentelemetry::trace::SpanKind::kConsumer; + + // extract context from AMQP message annotations + const AMQPTextMapCarrier<proton::message::annotation_map> carrier( + &message.message_annotations()); + + nostd::shared_ptr< + opentelemetry::v0::context::propagation::TextMapPropagator> + prop = opentelemetry::context::propagation::GlobalTextMapPropagator:: + GetGlobalPropagator(); + context::Context current_ctx = + opentelemetry::context::RuntimeContext::GetCurrent(); + + context::Context new_context = prop->Extract(carrier, current_ctx); + options.parent = + opentelemetry::trace::propagation::GetSpan(new_context)->GetContext(); + + binary 
tag_in_binary = d.tag(); + std::string tag_in_string = std::string(d.tag()); + std::stringstream ss; + for(int i=0; i<(int)tag_in_string.length(); ++i) + ss << std::hex << (int)tag_in_binary[i]; + std::string delivery_tag = ss.str(); + + receiver r = d.receiver(); + source s = r.source(); + std::string s_addr = s.address(); + + transfer tt(d); + std::string delivery_state = to_string(tt.state()); + + nostd::shared_ptr<trace::Span> span = + get_tracer()->StartSpan("amqp-message-received", + {{"Delivery_tag", delivery_tag}, + {"Source_address", s_addr}}, + options); + + trace::Scope scope = get_tracer()->WithActiveSpan(span); + + if (delivery_state == "unknown") + { + span->AddEvent("Delivery state is unknown"); + } + if (delivery_state == "Received") + { + span->AddEvent("Delivery is received"); + } + + tag_span[tag_in_binary] = span; +} + +void on_message_end_span(delivery &d) { + binary tag = d.tag(); + nostd::shared_ptr<trace::Span> span = tag_span[tag]; + + span->End(); +} + +void on_settled_span(delivery &d) { + binary tag = d.tag(); + nostd::shared_ptr<trace::Span> span = tag_span[tag]; + + span->End(); +} + +void send_span(message &message, binary &tag, tracker &track) { + + opentelemetry::trace::StartSpanOptions options; + options.kind = opentelemetry::trace::SpanKind::kProducer; + + binary tag_in_binary = tag; + std::string tag_in_string = std::string(tag); + std::stringstream ss; + for (int i = 0; i < (int)tag_in_string.length(); ++i) + ss << std::hex << (int)tag_in_binary[i]; + std::string delivery_tag = ss.str(); + + sender s = track.sender(); + target t = s.target(); + std::string t_addr = t.address(); + + transfer tt(track); + std::string delivery_state = to_string(tt.state()); + + nostd::shared_ptr<trace::Span> span = get_tracer()->StartSpan( + "amqp-delivery-send", + {{"Delivery_tag", delivery_tag}, {"Destination_address", t_addr}}, + options); + + trace::Scope scope = proton::get_tracer()->WithActiveSpan(span); Review comment: As discussed on IRC, 
it seems that in the opentelemetry-cpp case, the returned 'scope' being destroyed will reinstate the previously active span as the active span, so, since that destruction seems to happen before send returns, it will essentially behave as I wanted in terms of not leaving the new span as the 'active'. Setting the span as active in this case seems largely redundant, since we have no other related spans for it to impact and we simply inject the context and send the message, but if that's required then so be it. It was the knock-on effect on the 'active' I was concerned with. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
