gemmellr commented on a change in pull request #319: URL: https://github.com/apache/qpid-proton/pull/319#discussion_r685875843
########## File path: cpp/src/tracing.cpp ########## @@ -0,0 +1,240 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <opentelemetry/sdk/trace/simple_processor.h> +#include <opentelemetry/sdk/trace/tracer_provider.h> +#include <opentelemetry/trace/provider.h> +#include <opentelemetry/trace/span.h> + +#include <proton/messaging_handler.hpp> + +// Using an exporter that simply dumps span data to stdout. 
+#include <opentelemetry/exporters/ostream/span_exporter.h> +#include <proton/annotation_key.hpp> +#include <proton/delivery.hpp> +#include <proton/message.hpp> +#include <proton/receiver.hpp> +#include <proton/sender.hpp> +#include <proton/source.hpp> +#include <proton/target.hpp> +#include <proton/tracing.hpp> +#include <proton/tracker.hpp> +#include <proton/transfer.hpp> + +#include "opentelemetry/nostd/unique_ptr.h" + +#include "opentelemetry/baggage/propagation/baggage_propagator.h" +#include "opentelemetry/context/propagation/global_propagator.h" +#include "opentelemetry/context/propagation/text_map_propagator.h" +#include "opentelemetry/trace/propagation/http_trace_context.h" + +#include <iostream> +#include <sstream> +#include <memory> + +namespace proton +{ +namespace trace = opentelemetry::trace; +namespace nostd = opentelemetry::nostd; +namespace common = opentelemetry::common; +namespace context = opentelemetry::context; + +const std::string kContextKey = "CONTEXT"; +std::map<binary, nostd::shared_ptr<trace::Span>> tag_span; + +template <typename T> +class AMQPTextMapCarrier + : public opentelemetry::context::propagation::TextMapCarrier { + public: + AMQPTextMapCarrier<T>(T *message_annotations) + : message_annotations_(message_annotations) {} + virtual nostd::string_view + Get(nostd::string_view key) const noexcept override { + std::string key_to_compare = key.data(); + + if (message_annotations_->exists(annotation_key(key_to_compare))) { + value extracted_value = + message_annotations_->get(annotation_key(key_to_compare)); + std::string extracted_string = to_string(extracted_value); + extracted_strings.push_back(extracted_string); + nostd::string_view final_extracted_string = + nostd::string_view(extracted_strings.back()); + + return final_extracted_string; + } + else + { + std::cout << "Key does not exists" << std::endl; + } + return ""; + } + + virtual void Set(nostd::string_view key, + nostd::string_view val) noexcept override { + + 
message_annotations_->put(annotation_key(std::string(key)), + value(std::string(val))); + } + + T *message_annotations_; + mutable std::vector<std::string> extracted_strings; +}; + +void initTracer(std::unique_ptr<sdktrace::SpanExporter> exporter) +{ + // auto exporter = std::unique_ptr<sdktrace::SpanExporter>( + // new opentelemetry::exporter::trace::OStreamSpanExporter); + auto processor = std::unique_ptr<sdktrace::SpanProcessor>( + new sdktrace::SimpleSpanProcessor(std::move(exporter))); + auto provider = nostd::shared_ptr<opentelemetry::trace::TracerProvider>( + new sdktrace::TracerProvider(std::move(processor))); + + // Set the global trace provider + opentelemetry::trace::Provider::SetTracerProvider(provider); + + // set global propagator + opentelemetry::context::propagation::GlobalTextMapPropagator:: + SetGlobalPropagator( + nostd::shared_ptr< + opentelemetry::context::propagation::TextMapPropagator>( + new opentelemetry::trace::propagation::HttpTraceContext())); +} + +nostd::shared_ptr<trace::Tracer> get_tracer() +{ + auto provider = trace::Provider::GetTracerProvider(); + nostd::shared_ptr<opentelemetry::v0::trace::Tracer> tracer = + provider->GetTracer("qpid-tracer"); + return tracer; +} + +void on_message_span(message &message, delivery &d) { + + opentelemetry::trace::StartSpanOptions options; + options.kind = opentelemetry::trace::SpanKind::kConsumer; + + // extract context from AMQP message annotations + const AMQPTextMapCarrier<proton::message::annotation_map> carrier( + &message.message_annotations()); + + nostd::shared_ptr< + opentelemetry::v0::context::propagation::TextMapPropagator> + prop = opentelemetry::context::propagation::GlobalTextMapPropagator:: + GetGlobalPropagator(); + context::Context current_ctx = + opentelemetry::context::RuntimeContext::GetCurrent(); + + context::Context new_context = prop->Extract(carrier, current_ctx); + options.parent = + opentelemetry::trace::propagation::GetSpan(new_context)->GetContext(); + + binary 
tag_in_binary = d.tag(); + std::string tag_in_string = std::string(d.tag()); + std::stringstream ss; + for(int i=0; i<(int)tag_in_string.length(); ++i) + ss << std::hex << (int)tag_in_binary[i]; + std::string delivery_tag = ss.str(); + + receiver r = d.receiver(); + source s = r.source(); + std::string s_addr = s.address(); + + transfer tt(d); + std::string delivery_state = to_string(tt.state()); + + nostd::shared_ptr<trace::Span> span = + get_tracer()->StartSpan("amqp-message-received", + {{"Delivery_tag", delivery_tag}, + {"Source_address", s_addr}}, + options); + + trace::Scope scope = get_tracer()->WithActiveSpan(span); + + if (delivery_state == "unknown") + { + span->AddEvent("Delivery state is unknown"); + } + if (delivery_state == "Received") + { + span->AddEvent("Delivery is received"); + } Review comment: A delivery state is unlikely to ever be known at this point (creating the span, before firing on_message); the client or application will typically set it once they are given the message and finish processing it. That may be before on_message returns if they do it explicitly, or they may have 'auto-accept' enabled (it is by default), in which case it is done by the client just after on_message returns, or they may disable that and do it manually later after some long processing completes. For the JMS client the same selection of possibilities (and others) applies, and we can't easily access the state or span later anyway (and can't leave it open forever either, and need to create a new active span for the next message delivery). We also have cases there that the proton clients do not, where we specifically *don't* deliver the message to the application. 
So what I did was this: when onMessage returns (or the message wasn't given to onMessage at all) and I am completing the delivery span, I simply logged a summary event relating to the onMessage invocation rather than the literal delivery state, as it may well not be known yet, adding a description of what happened with e.g. 'event = DELIVERED' (or, for some of those special cases I mentioned, also EXPIRED, REDELIVERIES_EXCEEDED... or APPLICATION_ERROR if they throw an exception from onMessage). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
