gemmellr commented on a change in pull request #319:
URL: https://github.com/apache/qpid-proton/pull/319#discussion_r685824036



##########
File path: .github/workflows/build.yml
##########
@@ -4,6 +4,7 @@ on: [push, pull_request]
 
 jobs:
   build:
+    if: github.repository == 'DreamPearl/qpid-proton'

Review comment:
       Probably worth starting to try and remove this and get the GitHub 
Actions build running on the PR now. This initial suggestion was when you were 
pushing commits often and doing remote debug sessions to get the build going.
   
   Now that you dont need the remote debug and arent pushing to the PR as often 
it would be good to get it running so we can see it is working. (EDIT: the 
TravisCI build is running, and failing, see later comment)

##########
File path: cpp/src/tracing.cpp
##########
@@ -0,0 +1,240 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <opentelemetry/sdk/trace/simple_processor.h>
+#include <opentelemetry/sdk/trace/tracer_provider.h>
+#include <opentelemetry/trace/provider.h>
+#include <opentelemetry/trace/span.h>
+
+#include <proton/messaging_handler.hpp>
+
+// Using an exporter that simply dumps span data to stdout.
+#include <opentelemetry/exporters/ostream/span_exporter.h>
+#include <proton/annotation_key.hpp>
+#include <proton/delivery.hpp>
+#include <proton/message.hpp>
+#include <proton/receiver.hpp>
+#include <proton/sender.hpp>
+#include <proton/source.hpp>
+#include <proton/target.hpp>
+#include <proton/tracing.hpp>
+#include <proton/tracker.hpp>
+#include <proton/transfer.hpp>
+
+#include "opentelemetry/nostd/unique_ptr.h"
+
+#include "opentelemetry/baggage/propagation/baggage_propagator.h"
+#include "opentelemetry/context/propagation/global_propagator.h"
+#include "opentelemetry/context/propagation/text_map_propagator.h"
+#include "opentelemetry/trace/propagation/http_trace_context.h"
+
+#include <iostream>
+#include <sstream>
+#include <memory>
+
+namespace proton
+{
+namespace trace = opentelemetry::trace;
+namespace nostd = opentelemetry::nostd;
+namespace common = opentelemetry::common;
+namespace context = opentelemetry::context;
+
+const std::string kContextKey = "CONTEXT";
+std::map<binary, nostd::shared_ptr<trace::Span>> tag_span;
+
+template <typename T>
+class AMQPTextMapCarrier
+    : public opentelemetry::context::propagation::TextMapCarrier {
+  public:
+    AMQPTextMapCarrier<T>(T *message_annotations)
+        : message_annotations_(message_annotations) {}
+    virtual nostd::string_view
+    Get(nostd::string_view key) const noexcept override {
+        std::string key_to_compare = key.data();
+
+        if (message_annotations_->exists(annotation_key(key_to_compare))) {
+            value extracted_value =
+                message_annotations_->get(annotation_key(key_to_compare));
+            std::string extracted_string = to_string(extracted_value);
+            extracted_strings.push_back(extracted_string);
+            nostd::string_view final_extracted_string =
+                nostd::string_view(extracted_strings.back());
+
+            return final_extracted_string;
+        }
+        else
+        {
+            std::cout << "Key does not exists" << std::endl;
+        }
+        return "";
+    }
+
+    virtual void Set(nostd::string_view key,
+                     nostd::string_view val) noexcept override {
+
+        message_annotations_->put(annotation_key(std::string(key)),
+                                  value(std::string(val)));
+    }
+
+    T *message_annotations_;
+    mutable std::vector<std::string> extracted_strings;
+};
+
+void initTracer(std::unique_ptr<sdktrace::SpanExporter> exporter)
+{
+  // auto exporter = std::unique_ptr<sdktrace::SpanExporter>(
+  //     new opentelemetry::exporter::trace::OStreamSpanExporter);
+  auto processor = std::unique_ptr<sdktrace::SpanProcessor>(
+      new sdktrace::SimpleSpanProcessor(std::move(exporter)));
+  auto provider = nostd::shared_ptr<opentelemetry::trace::TracerProvider>(
+      new sdktrace::TracerProvider(std::move(processor)));
+
+  // Set the global trace provider
+  opentelemetry::trace::Provider::SetTracerProvider(provider);
+
+  // set global propagator
+  opentelemetry::context::propagation::GlobalTextMapPropagator::
+      SetGlobalPropagator(
+          nostd::shared_ptr<
+              opentelemetry::context::propagation::TextMapPropagator>(
+              new opentelemetry::trace::propagation::HttpTraceContext()));
+}
+
+nostd::shared_ptr<trace::Tracer> get_tracer()
+{
+  auto provider = trace::Provider::GetTracerProvider();
+  nostd::shared_ptr<opentelemetry::v0::trace::Tracer> tracer =
+      provider->GetTracer("qpid-tracer");
+  return tracer;
+}
+
+void on_message_span(message &message, delivery &d) {
+
+    opentelemetry::trace::StartSpanOptions options;
+    options.kind          = opentelemetry::trace::SpanKind::kConsumer;
+
+    // extract context from AMQP message annotations
+    const AMQPTextMapCarrier<proton::message::annotation_map> carrier(
+        &message.message_annotations());
+
+    nostd::shared_ptr<
+        opentelemetry::v0::context::propagation::TextMapPropagator>
+        prop = opentelemetry::context::propagation::GlobalTextMapPropagator::
+            GetGlobalPropagator();
+    context::Context current_ctx =
+        opentelemetry::context::RuntimeContext::GetCurrent();
+
+    context::Context new_context = prop->Extract(carrier, current_ctx);
+    options.parent =
+        opentelemetry::trace::propagation::GetSpan(new_context)->GetContext();
+
+    binary tag_in_binary = d.tag();
+    std::string tag_in_string = std::string(d.tag());
+    std::stringstream ss;
+    for(int i=0; i<(int)tag_in_string.length(); ++i)
+        ss << std::hex << (int)tag_in_binary[i];
+    std::string delivery_tag = ss.str();
+
+    receiver r = d.receiver();
+    source s = r.source();
+    std::string s_addr = s.address();
+
+    transfer tt(d);
+    std::string delivery_state = to_string(tt.state());
+
+    nostd::shared_ptr<trace::Span> span =
+        get_tracer()->StartSpan("amqp-message-received",
+                                {{"Delivery_tag", delivery_tag},
+                                 {"Source_address", s_addr}},
+                                options);
+
+    trace::Scope scope = get_tracer()->WithActiveSpan(span);
+
+    if (delivery_state == "unknown")
+    {
+        span->AddEvent("Delivery state is unknown");
+    }
+    if (delivery_state == "Received")
+    {
+        span->AddEvent("Delivery is received");
+    }
+
+    tag_span[tag_in_binary] = span;
+}
+
+void on_message_end_span(delivery &d) {
+    binary tag = d.tag();
+    nostd::shared_ptr<trace::Span> span = tag_span[tag];
+
+    span->End();
+}
+
+void on_settled_span(delivery &d) {
+    binary tag = d.tag();
+    nostd::shared_ptr<trace::Span> span = tag_span[tag];
+
+    span->End();
+}
+
+void send_span(message &message, binary &tag, tracker &track) {
+
+    opentelemetry::trace::StartSpanOptions options;
+    options.kind = opentelemetry::trace::SpanKind::kProducer;
+
+    binary tag_in_binary = tag;
+    std::string tag_in_string = std::string(tag);
+    std::stringstream ss;
+    for (int i = 0; i < (int)tag_in_string.length(); ++i)
+        ss << std::hex << (int)tag_in_binary[i];
+    std::string delivery_tag = ss.str();
+
+    sender s = track.sender();
+    target t = s.target();
+    std::string t_addr = t.address();
+
+    transfer tt(track);
+    std::string delivery_state = to_string(tt.state());
+
+    nostd::shared_ptr<trace::Span> span = get_tracer()->StartSpan(
+        "amqp-delivery-send",
+        {{"Delivery_tag", delivery_tag}, {"Destination_address", t_addr}},
+        options);
+
+    trace::Scope scope = proton::get_tracer()->WithActiveSpan(span);

Review comment:
       I dont think the send span should be set as the 'active' span, as we 
dont want it to affect other things (e.g replacing an on_message active span), 
and/or the life of the scope then seems like it would be in play. We just want 
to create and retain a span and close it once the message is accepted/other 
later.

##########
File path: cpp/src/messaging_adapter.cpp
##########
@@ -48,6 +48,8 @@
 
 #include <assert.h>
 
+#include <proton/tracing.hpp>

Review comment:
       The TravisCI build did run, and it fails compilation at this point [1], 
as it isnt installing the opentracing dependency (which is fine; many people 
wont use it, so its good that we test that situation) but this code relies on 
it being present even though its optional in the build. It would be good to get 
that aspect working again. Since Travis CI is very slow to start due to limited 
resources, you should just use GitHub Actions on your fork for testing things 
out....you could create the same situation in the GHA build temporarily for 
testing, by just disabling the opentelemetry compile+install steps, e.g make 
them skip or remove them entirely.
   
   Some way of indirecting the actual usage or selective inclusion and usage, 
so that it isnt attempted when opentelemtry isnt present, will be required in 
order to handle this. In other instances I know there are sometimes separate 
'dummy' or 'stub' impls that are substituted in by the build when the 
dependencies needed to use a main impl arent found. For example see the 
src/connect_config_dummy.cpp file and related mechanisms in the build [2], used 
when the JsonCPP library isnt found.
   
   @astitcher should perhaps chip in here with his thoughts on this.
   
   EDIT: Ah, I later got as far as seeing there is already dummy file, you just 
havent got to wiring it up yet. Ok :)
   
   [1] 
https://app.travis-ci.com/github/apache/qpid-proton/jobs/529561541#L1062-L1066
   [2] 
https://github.com/apache/qpid-proton/blob/0.35.0/cpp/CMakeLists.txt#L28-L37

##########
File path: cpp/src/messaging_adapter.cpp
##########
@@ -128,7 +130,9 @@ void on_delivery(messaging_handler& handler, pn_event_t* 
event) {
                 if (lctx.auto_accept)
                     d.release();
             } else {
+                on_message_span(msg, d);
                 handler.on_message(d, msg);
+                on_message_end_span(d);

Review comment:
       Perhaps 'on_message_start_span' or such like, for better alignment with 
the 'on_message_end_span'

##########
File path: cpp/src/tracing.cpp
##########
@@ -0,0 +1,240 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <opentelemetry/sdk/trace/simple_processor.h>
+#include <opentelemetry/sdk/trace/tracer_provider.h>
+#include <opentelemetry/trace/provider.h>
+#include <opentelemetry/trace/span.h>
+
+#include <proton/messaging_handler.hpp>
+
+// Using an exporter that simply dumps span data to stdout.
+#include <opentelemetry/exporters/ostream/span_exporter.h>
+#include <proton/annotation_key.hpp>
+#include <proton/delivery.hpp>
+#include <proton/message.hpp>
+#include <proton/receiver.hpp>
+#include <proton/sender.hpp>
+#include <proton/source.hpp>
+#include <proton/target.hpp>
+#include <proton/tracing.hpp>
+#include <proton/tracker.hpp>
+#include <proton/transfer.hpp>
+
+#include "opentelemetry/nostd/unique_ptr.h"
+
+#include "opentelemetry/baggage/propagation/baggage_propagator.h"
+#include "opentelemetry/context/propagation/global_propagator.h"
+#include "opentelemetry/context/propagation/text_map_propagator.h"
+#include "opentelemetry/trace/propagation/http_trace_context.h"
+
+#include <iostream>
+#include <sstream>
+#include <memory>
+
+namespace proton
+{
+namespace trace = opentelemetry::trace;
+namespace nostd = opentelemetry::nostd;
+namespace common = opentelemetry::common;
+namespace context = opentelemetry::context;
+
+const std::string kContextKey = "CONTEXT";
+std::map<binary, nostd::shared_ptr<trace::Span>> tag_span;
+
+template <typename T>
+class AMQPTextMapCarrier
+    : public opentelemetry::context::propagation::TextMapCarrier {
+  public:
+    AMQPTextMapCarrier<T>(T *message_annotations)
+        : message_annotations_(message_annotations) {}
+    virtual nostd::string_view
+    Get(nostd::string_view key) const noexcept override {
+        std::string key_to_compare = key.data();
+
+        if (message_annotations_->exists(annotation_key(key_to_compare))) {
+            value extracted_value =
+                message_annotations_->get(annotation_key(key_to_compare));
+            std::string extracted_string = to_string(extracted_value);
+            extracted_strings.push_back(extracted_string);
+            nostd::string_view final_extracted_string =
+                nostd::string_view(extracted_strings.back());
+
+            return final_extracted_string;
+        }
+        else
+        {
+            std::cout << "Key does not exists" << std::endl;
+        }
+        return "";
+    }
+
+    virtual void Set(nostd::string_view key,
+                     nostd::string_view val) noexcept override {
+
+        message_annotations_->put(annotation_key(std::string(key)),
+                                  value(std::string(val)));
+    }
+
+    T *message_annotations_;
+    mutable std::vector<std::string> extracted_strings;
+};
+
+void initTracer(std::unique_ptr<sdktrace::SpanExporter> exporter)
+{
+  // auto exporter = std::unique_ptr<sdktrace::SpanExporter>(
+  //     new opentelemetry::exporter::trace::OStreamSpanExporter);
+  auto processor = std::unique_ptr<sdktrace::SpanProcessor>(
+      new sdktrace::SimpleSpanProcessor(std::move(exporter)));
+  auto provider = nostd::shared_ptr<opentelemetry::trace::TracerProvider>(
+      new sdktrace::TracerProvider(std::move(processor)));
+
+  // Set the global trace provider
+  opentelemetry::trace::Provider::SetTracerProvider(provider);
+
+  // set global propagator
+  opentelemetry::context::propagation::GlobalTextMapPropagator::
+      SetGlobalPropagator(
+          nostd::shared_ptr<
+              opentelemetry::context::propagation::TextMapPropagator>(
+              new opentelemetry::trace::propagation::HttpTraceContext()));
+}
+
+nostd::shared_ptr<trace::Tracer> get_tracer()
+{
+  auto provider = trace::Provider::GetTracerProvider();
+  nostd::shared_ptr<opentelemetry::v0::trace::Tracer> tracer =
+      provider->GetTracer("qpid-tracer");
+  return tracer;
+}
+
+void on_message_span(message &message, delivery &d) {
+
+    opentelemetry::trace::StartSpanOptions options;
+    options.kind          = opentelemetry::trace::SpanKind::kConsumer;
+
+    // extract context from AMQP message annotations
+    const AMQPTextMapCarrier<proton::message::annotation_map> carrier(
+        &message.message_annotations());
+
+    nostd::shared_ptr<
+        opentelemetry::v0::context::propagation::TextMapPropagator>
+        prop = opentelemetry::context::propagation::GlobalTextMapPropagator::
+            GetGlobalPropagator();
+    context::Context current_ctx =
+        opentelemetry::context::RuntimeContext::GetCurrent();
+
+    context::Context new_context = prop->Extract(carrier, current_ctx);
+    options.parent =
+        opentelemetry::trace::propagation::GetSpan(new_context)->GetContext();
+
+    binary tag_in_binary = d.tag();
+    std::string tag_in_string = std::string(d.tag());
+    std::stringstream ss;
+    for(int i=0; i<(int)tag_in_string.length(); ++i)
+        ss << std::hex << (int)tag_in_binary[i];
+    std::string delivery_tag = ss.str();
+
+    receiver r = d.receiver();
+    source s = r.source();
+    std::string s_addr = s.address();
+
+    transfer tt(d);
+    std::string delivery_state = to_string(tt.state());
+
+    nostd::shared_ptr<trace::Span> span =
+        get_tracer()->StartSpan("amqp-message-received",
+                                {{"Delivery_tag", delivery_tag},
+                                 {"Source_address", s_addr}},
+                                options);
+
+    trace::Scope scope = get_tracer()->WithActiveSpan(span);
+
+    if (delivery_state == "unknown")
+    {
+        span->AddEvent("Delivery state is unknown");
+    }
+    if (delivery_state == "Received")
+    {
+        span->AddEvent("Delivery is received");
+    }
+
+    tag_span[tag_in_binary] = span;
+}
+
+void on_message_end_span(delivery &d) {
+    binary tag = d.tag();
+    nostd::shared_ptr<trace::Span> span = tag_span[tag];
+
+    span->End();
+}
+
+void on_settled_span(delivery &d) {
+    binary tag = d.tag();
+    nostd::shared_ptr<trace::Span> span = tag_span[tag];
+
+    span->End();
+}
+
+void send_span(message &message, binary &tag, tracker &track) {
+
+    opentelemetry::trace::StartSpanOptions options;
+    options.kind = opentelemetry::trace::SpanKind::kProducer;
+
+    binary tag_in_binary = tag;
+    std::string tag_in_string = std::string(tag);
+    std::stringstream ss;
+    for (int i = 0; i < (int)tag_in_string.length(); ++i)
+        ss << std::hex << (int)tag_in_binary[i];
+    std::string delivery_tag = ss.str();
+
+    sender s = track.sender();
+    target t = s.target();
+    std::string t_addr = t.address();
+
+    transfer tt(track);
+    std::string delivery_state = to_string(tt.state());
+
+    nostd::shared_ptr<trace::Span> span = get_tracer()->StartSpan(
+        "amqp-delivery-send",
+        {{"Delivery_tag", delivery_tag}, {"Destination_address", t_addr}},
+        options);
+
+    trace::Scope scope = proton::get_tracer()->WithActiveSpan(span);
+
+    if (delivery_state == "unknown") {
+        span->AddEvent("Delivery state is unknown");
+    }

Review comment:
       The send state will essentially always be unknown at this point. It 
becomes known when the disposition arrives from peer (e.g broker) setting the 
applicable state (for the messages that are not sent pre-settled).
   
   Wherever the message 'tracker' is being updated (perhaps [1]) upon 
disposition is the point the delivery state will become known for send, and 
where the send span should be ended.
   
   For pre-settled sends that may be different, as they will never get updated 
state due to being settled in advance. Depending on handling of the events, 
their spans may even need completed within the send call itself.
   
   [1] 
https://github.com/apache/qpid-proton/blob/0.35.0/cpp/src/messaging_adapter.cpp#L157-L177

##########
File path: cpp/src/tracing.cpp
##########
@@ -0,0 +1,240 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <opentelemetry/sdk/trace/simple_processor.h>
+#include <opentelemetry/sdk/trace/tracer_provider.h>
+#include <opentelemetry/trace/provider.h>
+#include <opentelemetry/trace/span.h>
+
+#include <proton/messaging_handler.hpp>
+
+// Using an exporter that simply dumps span data to stdout.
+#include <opentelemetry/exporters/ostream/span_exporter.h>
+#include <proton/annotation_key.hpp>
+#include <proton/delivery.hpp>
+#include <proton/message.hpp>
+#include <proton/receiver.hpp>
+#include <proton/sender.hpp>
+#include <proton/source.hpp>
+#include <proton/target.hpp>
+#include <proton/tracing.hpp>
+#include <proton/tracker.hpp>
+#include <proton/transfer.hpp>
+
+#include "opentelemetry/nostd/unique_ptr.h"
+
+#include "opentelemetry/baggage/propagation/baggage_propagator.h"
+#include "opentelemetry/context/propagation/global_propagator.h"
+#include "opentelemetry/context/propagation/text_map_propagator.h"
+#include "opentelemetry/trace/propagation/http_trace_context.h"
+
+#include <iostream>
+#include <sstream>
+#include <memory>
+
+namespace proton
+{
+namespace trace = opentelemetry::trace;
+namespace nostd = opentelemetry::nostd;
+namespace common = opentelemetry::common;
+namespace context = opentelemetry::context;
+
+const std::string kContextKey = "CONTEXT";
+std::map<binary, nostd::shared_ptr<trace::Span>> tag_span;
+
+template <typename T>
+class AMQPTextMapCarrier
+    : public opentelemetry::context::propagation::TextMapCarrier {
+  public:
+    AMQPTextMapCarrier<T>(T *message_annotations)
+        : message_annotations_(message_annotations) {}
+    virtual nostd::string_view
+    Get(nostd::string_view key) const noexcept override {
+        std::string key_to_compare = key.data();
+
+        if (message_annotations_->exists(annotation_key(key_to_compare))) {
+            value extracted_value =
+                message_annotations_->get(annotation_key(key_to_compare));
+            std::string extracted_string = to_string(extracted_value);
+            extracted_strings.push_back(extracted_string);
+            nostd::string_view final_extracted_string =
+                nostd::string_view(extracted_strings.back());
+
+            return final_extracted_string;
+        }
+        else
+        {
+            std::cout << "Key does not exists" << std::endl;
+        }
+        return "";
+    }
+
+    virtual void Set(nostd::string_view key,
+                     nostd::string_view val) noexcept override {
+
+        message_annotations_->put(annotation_key(std::string(key)),
+                                  value(std::string(val)));
+    }
+
+    T *message_annotations_;
+    mutable std::vector<std::string> extracted_strings;
+};
+
+void initTracer(std::unique_ptr<sdktrace::SpanExporter> exporter)
+{
+  // auto exporter = std::unique_ptr<sdktrace::SpanExporter>(
+  //     new opentelemetry::exporter::trace::OStreamSpanExporter);
+  auto processor = std::unique_ptr<sdktrace::SpanProcessor>(
+      new sdktrace::SimpleSpanProcessor(std::move(exporter)));
+  auto provider = nostd::shared_ptr<opentelemetry::trace::TracerProvider>(
+      new sdktrace::TracerProvider(std::move(processor)));
+
+  // Set the global trace provider
+  opentelemetry::trace::Provider::SetTracerProvider(provider);
+
+  // set global propagator
+  opentelemetry::context::propagation::GlobalTextMapPropagator::
+      SetGlobalPropagator(
+          nostd::shared_ptr<
+              opentelemetry::context::propagation::TextMapPropagator>(
+              new opentelemetry::trace::propagation::HttpTraceContext()));
+}
+
+nostd::shared_ptr<trace::Tracer> get_tracer()
+{
+  auto provider = trace::Provider::GetTracerProvider();
+  nostd::shared_ptr<opentelemetry::v0::trace::Tracer> tracer =
+      provider->GetTracer("qpid-tracer");
+  return tracer;
+}
+
+void on_message_span(message &message, delivery &d) {
+
+    opentelemetry::trace::StartSpanOptions options;
+    options.kind          = opentelemetry::trace::SpanKind::kConsumer;
+
+    // extract context from AMQP message annotations
+    const AMQPTextMapCarrier<proton::message::annotation_map> carrier(
+        &message.message_annotations());
+
+    nostd::shared_ptr<
+        opentelemetry::v0::context::propagation::TextMapPropagator>
+        prop = opentelemetry::context::propagation::GlobalTextMapPropagator::
+            GetGlobalPropagator();
+    context::Context current_ctx =
+        opentelemetry::context::RuntimeContext::GetCurrent();
+
+    context::Context new_context = prop->Extract(carrier, current_ctx);
+    options.parent =
+        opentelemetry::trace::propagation::GetSpan(new_context)->GetContext();
+
+    binary tag_in_binary = d.tag();
+    std::string tag_in_string = std::string(d.tag());
+    std::stringstream ss;
+    for(int i=0; i<(int)tag_in_string.length(); ++i)
+        ss << std::hex << (int)tag_in_binary[i];
+    std::string delivery_tag = ss.str();
+
+    receiver r = d.receiver();
+    source s = r.source();
+    std::string s_addr = s.address();
+
+    transfer tt(d);
+    std::string delivery_state = to_string(tt.state());
+
+    nostd::shared_ptr<trace::Span> span =
+        get_tracer()->StartSpan("amqp-message-received",
+                                {{"Delivery_tag", delivery_tag},
+                                 {"Source_address", s_addr}},
+                                options);
+
+    trace::Scope scope = get_tracer()->WithActiveSpan(span);
+
+    if (delivery_state == "unknown")
+    {
+        span->AddEvent("Delivery state is unknown");
+    }
+    if (delivery_state == "Received")
+    {
+        span->AddEvent("Delivery is received");
+    }

Review comment:
       A delivery state is unlikely to ever be known at this point (creating 
the span, before firing on_message), the client or application will typically 
set it once they are given the message and finish processing it. That may be 
before on_message returns if they do it explicitly, or they may have the 
'auto-accept' enabled (it is by default) and it is done by the client just 
after on_message returns, or they may disable that and do it manually later 
after some long processing completes.
   
   For the JMS client the same selection of possibilities (and others) are 
true, and we cant easily access the state or span later anyway (and cant leave 
it open forever either, and need to create a new active span for the next 
message delivery). We also have cases there that the proton clients do not, 
where we specifically *dont* deliver the message to the application.
   
   So what I did was, when onMessage returns (or the message wasnt given to 
onMessage at all) and I am completing the delivery span, I simply logged a 
summary event relating to the onMessage invocation, not the literal delivery 
state as it may well not be known yet, adding a description of what happened 
with e.g 'event = DELIVERED' (or EXPIRED, REDELIVERIES_EXCEEDED...or 
APPLICATION_ERROR if they throw an exception from onMessage).
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to