This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 0de4792  Add open tracing to pulsar go clinet (#518)
0de4792 is described below

commit 0de47927ca02e5567074f557063ad128da4e3125
Author: Marais <[email protected]>
AuthorDate: Mon Jul 12 10:25:40 2021 +0300

    Add open tracing to pulsar go clinet (#518)
    
    Here is the PR to add open tracing to the pulsar go client directly.
    This is replicating what the java tracer does:
    https://github.com/streamnative/pulsar-tracing
    The usage is stipulated in the readme.md file
    
    Co-authored-by: Marais Kruger <[email protected]>
---
 go.mod                                             |   3 +-
 go.sum                                             |   2 +
 .../internal/pulsartracing/consumer_interceptor.go |  58 ++++++++++
 .../pulsartracing/consumer_interceptor_test.go     |  89 +++++++++++++++
 .../pulsartracing/message_carrier_adaptors.go      |  84 ++++++++++++++
 .../internal/pulsartracing/message_carrier_util.go |  89 +++++++++++++++
 .../pulsartracing/message_carrier_util_test.go     | 121 +++++++++++++++++++++
 .../internal/pulsartracing/producer_interceptor.go |  57 ++++++++++
 .../pulsartracing/producer_interceptor_test.go     |  69 ++++++++++++
 pulsar/internal/pulsartracing/readme.md            |  40 +++++++
 pulsar/internal/pulsartracing/span-enrichment.go   |  50 +++++++++
 11 files changed, 660 insertions(+), 2 deletions(-)

diff --git a/go.mod b/go.mod
index 6d53cc6..67a7bb7 100644
--- a/go.mod
+++ b/go.mod
@@ -16,8 +16,7 @@ require (
        github.com/klauspost/compress v1.10.8
        github.com/kr/pretty v0.2.0 // indirect
        github.com/linkedin/goavro/v2 v2.9.8
-       github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // 
indirect
-       github.com/modern-go/reflect2 v1.0.1 // indirect
+       github.com/opentracing/opentracing-go v1.2.0
        github.com/pierrec/lz4 v2.0.5+incompatible
        github.com/pkg/errors v0.9.1
        github.com/prometheus/client_golang v1.7.1
diff --git a/go.sum b/go.sum
index b2818ee..2ccd39e 100644
--- a/go.sum
+++ b/go.sum
@@ -125,6 +125,8 @@ github.com/onsi/ginkgo v1.14.0/go.mod 
h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9k
 github.com/onsi/gomega v1.7.1/go.mod 
h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
 github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE=
 github.com/onsi/gomega v1.10.1/go.mod 
h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
+github.com/opentracing/opentracing-go v1.2.0 
h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
+github.com/opentracing/opentracing-go v1.2.0/go.mod 
h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
 github.com/pierrec/lz4 v2.0.5+incompatible 
h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
 github.com/pierrec/lz4 v2.0.5+incompatible/go.mod 
h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
 github.com/pkg/errors v0.8.0/go.mod 
h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
diff --git a/pulsar/internal/pulsartracing/consumer_interceptor.go 
b/pulsar/internal/pulsartracing/consumer_interceptor.go
new file mode 100644
index 0000000..3969d45
--- /dev/null
+++ b/pulsar/internal/pulsartracing/consumer_interceptor.go
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsartracing
+
+import (
+       "context"
+
+       "github.com/apache/pulsar-client-go/pulsar"
+       "github.com/opentracing/opentracing-go"
+)
+
+const fromPrefix = "From__"
+
+type ConsumerInterceptor struct {
+}
+
+func (t *ConsumerInterceptor) BeforeConsume(message pulsar.ConsumerMessage) {
+       buildAndInjectChildSpan(message).Finish()
+}
+
+func (t *ConsumerInterceptor) OnAcknowledge(consumer pulsar.Consumer, msgID 
pulsar.MessageID) {}
+
+func (t *ConsumerInterceptor) OnNegativeAcksSend(consumer pulsar.Consumer, 
msgIDs []pulsar.MessageID) {
+}
+
+func buildAndInjectChildSpan(message pulsar.ConsumerMessage) opentracing.Span {
+       tracer := opentracing.GlobalTracer()
+       parentContext := ExtractSpanContextFromConsumerMessage(message)
+
+       var span opentracing.Span
+
+       var startSpanOptions []opentracing.StartSpanOption
+       if parentContext != nil {
+               startSpanOptions = 
[]opentracing.StartSpanOption{opentracing.FollowsFrom(parentContext)}
+       }
+
+       span = 
tracer.StartSpan(fromPrefix+message.Topic()+"__"+message.Subscription(), 
startSpanOptions...)
+
+       enrichConsumerSpan(&message, span)
+       
InjectConsumerMessageSpanContext(opentracing.ContextWithSpan(context.Background(),
 span), message)
+
+       return span
+}
diff --git a/pulsar/internal/pulsartracing/consumer_interceptor_test.go 
b/pulsar/internal/pulsartracing/consumer_interceptor_test.go
new file mode 100644
index 0000000..b15a926
--- /dev/null
+++ b/pulsar/internal/pulsartracing/consumer_interceptor_test.go
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsartracing
+
+import (
+       "context"
+       "testing"
+       "time"
+
+       "github.com/apache/pulsar-client-go/pulsar"
+       "github.com/opentracing/opentracing-go"
+       "github.com/opentracing/opentracing-go/mocktracer"
+       "github.com/stretchr/testify/assert"
+)
+
+func TestConsumerBuildAndInjectChildSpan(t *testing.T) {
+       tracer := mocktracer.New()
+
+       opentracing.SetGlobalTracer(tracer)
+
+       message := pulsar.ConsumerMessage{
+               Consumer: &mockConsumer{},
+               Message: &mockConsumerMessage{
+                       properties: map[string]string{},
+               },
+       }
+
+       span := buildAndInjectChildSpan(message)
+       assert.NotNil(t, span)
+       assert.True(t, len(message.Properties()) > 0)
+}
+
+type mockConsumer struct {
+}
+
+func (c *mockConsumer) Subscription() string {
+       return ""
+}
+
+func (c *mockConsumer) Unsubscribe() error {
+       return nil
+}
+
+func (c *mockConsumer) Receive(ctx context.Context) (message pulsar.Message, 
err error) {
+       return nil, nil
+}
+
+func (c *mockConsumer) Chan() <-chan pulsar.ConsumerMessage {
+       return nil
+}
+
+func (c *mockConsumer) Ack(msg pulsar.Message) {}
+
+func (c *mockConsumer) AckID(msgID pulsar.MessageID) {}
+
+func (c *mockConsumer) ReconsumeLater(msg pulsar.Message, delay time.Duration) 
{}
+
+func (c *mockConsumer) Nack(msg pulsar.Message) {}
+
+func (c *mockConsumer) NackID(msgID pulsar.MessageID) {}
+
+func (c *mockConsumer) Close() {}
+
+func (c *mockConsumer) Seek(msgID pulsar.MessageID) error {
+       return nil
+}
+
+func (c *mockConsumer) SeekByTime(time time.Time) error {
+       return nil
+}
+
+func (c *mockConsumer) Name() string {
+       return ""
+}
diff --git a/pulsar/internal/pulsartracing/message_carrier_adaptors.go 
b/pulsar/internal/pulsartracing/message_carrier_adaptors.go
new file mode 100644
index 0000000..3b6394b
--- /dev/null
+++ b/pulsar/internal/pulsartracing/message_carrier_adaptors.go
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsartracing
+
+import (
+       "errors"
+
+       "github.com/apache/pulsar-client-go/pulsar"
+)
+
+// ProducerMessageExtractAdapter Implements TextMap Interface
+type ProducerMessageExtractAdapter struct {
+       message *pulsar.ProducerMessage
+}
+
+func (a *ProducerMessageExtractAdapter) ForeachKey(handler func(key, val 
string) error) error {
+       for k, v := range (*a.message).Properties {
+               if err := handler(k, v); err != nil {
+                       return err
+               }
+       }
+
+       return nil
+}
+
+func (a *ProducerMessageExtractAdapter) Set(key, val string) {}
+
+// ProducerMessageInjectAdapter Implements TextMap Interface
+type ProducerMessageInjectAdapter struct {
+       message *pulsar.ProducerMessage
+}
+
+func (a *ProducerMessageInjectAdapter) ForeachKey(handler func(key, val 
string) error) error {
+       return errors.New("iterator should never be used with Tracer.inject()")
+}
+
+func (a *ProducerMessageInjectAdapter) Set(key, val string) {
+       a.message.Properties[key] = val
+}
+
+// ConsumerMessageExtractAdapter Implements TextMap Interface
+type ConsumerMessageExtractAdapter struct {
+       message pulsar.ConsumerMessage
+}
+
+func (a *ConsumerMessageExtractAdapter) ForeachKey(handler func(key, val 
string) error) error {
+       for k, v := range a.message.Properties() {
+               if err := handler(k, v); err != nil {
+                       return err
+               }
+       }
+
+       return nil
+}
+
+func (a *ConsumerMessageExtractAdapter) Set(key, val string) {}
+
+// ConsumerMessageInjectAdapter Implements TextMap Interface
+type ConsumerMessageInjectAdapter struct {
+       message pulsar.ConsumerMessage
+}
+
+func (a *ConsumerMessageInjectAdapter) ForeachKey(handler func(key, val 
string) error) error {
+       return errors.New("iterator should never be used with tracer.inject()")
+}
+
+func (a *ConsumerMessageInjectAdapter) Set(key, val string) {
+       a.message.Properties()[key] = val
+}
diff --git a/pulsar/internal/pulsartracing/message_carrier_util.go 
b/pulsar/internal/pulsartracing/message_carrier_util.go
new file mode 100644
index 0000000..d1fd0dd
--- /dev/null
+++ b/pulsar/internal/pulsartracing/message_carrier_util.go
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsartracing
+
+import (
+       "context"
+
+       "github.com/apache/pulsar-client-go/pulsar"
+       "github.com/opentracing/opentracing-go"
+       log "github.com/sirupsen/logrus"
+)
+
+func InjectProducerMessageSpanContext(ctx context.Context, message 
*pulsar.ProducerMessage) {
+       injectAdapter := &ProducerMessageInjectAdapter{message}
+
+       span := opentracing.SpanFromContext(ctx)
+
+       err := opentracing.GlobalTracer().Inject(span.Context(), 
opentracing.TextMap, injectAdapter)
+
+       if err != nil {
+               log.Error("could not inject span context into pulsar message", 
err)
+       }
+}
+
+func ExtractSpanContextFromProducerMessage(message *pulsar.ProducerMessage) 
opentracing.SpanContext {
+       extractAdapter := &ProducerMessageExtractAdapter{message}
+
+       spanContext, err := 
opentracing.GlobalTracer().Extract(opentracing.TextMap, extractAdapter)
+
+       if err != nil {
+               log.Error("could not extract span context from pulsar message", 
err)
+       }
+
+       return spanContext
+}
+
+func ExtractSpanContextFromConsumerMessage(message pulsar.ConsumerMessage) 
opentracing.SpanContext {
+       extractAdapter := &ConsumerMessageExtractAdapter{message}
+
+       spanContext, err := 
opentracing.GlobalTracer().Extract(opentracing.TextMap, extractAdapter)
+
+       if err != nil {
+               log.Error("could not extract span context from pulsar message", 
err)
+       }
+
+       return spanContext
+}
+
+func InjectConsumerMessageSpanContext(ctx context.Context, message 
pulsar.ConsumerMessage) {
+       injectAdapter := &ConsumerMessageInjectAdapter{message}
+       span := opentracing.SpanFromContext(ctx)
+
+       if span == nil {
+               log.Warn("no span could be extracted from context, nothing will 
be injected into the message properties")
+               return
+       }
+
+       err := opentracing.GlobalTracer().Inject(span.Context(), 
opentracing.TextMap, injectAdapter)
+
+       if err != nil {
+               log.Error("could not inject span context into pulsar message", 
err)
+       }
+}
+
+func CreateSpanFromMessage(cm *pulsar.ConsumerMessage, tracer 
opentracing.Tracer, label string) opentracing.Span {
+       parentSpan := ExtractSpanContextFromConsumerMessage(*cm)
+       var span opentracing.Span
+       if parentSpan != nil {
+               span = tracer.StartSpan(label, opentracing.ChildOf(parentSpan))
+       } else {
+               span = tracer.StartSpan(label)
+       }
+       return span
+}
diff --git a/pulsar/internal/pulsartracing/message_carrier_util_test.go 
b/pulsar/internal/pulsartracing/message_carrier_util_test.go
new file mode 100644
index 0000000..9fe608d
--- /dev/null
+++ b/pulsar/internal/pulsartracing/message_carrier_util_test.go
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsartracing
+
+import (
+       "context"
+       "testing"
+       "time"
+
+       "github.com/apache/pulsar-client-go/pulsar"
+       "github.com/opentracing/opentracing-go"
+       "github.com/opentracing/opentracing-go/mocktracer"
+       "github.com/stretchr/testify/assert"
+)
+
+func TestProducerMessageInjectAndExtract(t *testing.T) {
+       message := &pulsar.ProducerMessage{
+               Properties: map[string]string{},
+       }
+
+       tracer := mocktracer.New()
+
+       opentracing.SetGlobalTracer(tracer)
+
+       span := tracer.StartSpan("test")
+
+       
InjectProducerMessageSpanContext(opentracing.ContextWithSpan(context.Background(),
 span), message)
+       assert.True(t, len(message.Properties) > 0)
+       extractedSpanContext := ExtractSpanContextFromProducerMessage(message)
+       assert.Equal(t, span.Context(), extractedSpanContext)
+}
+
+func TestConsumerMessageInjectAndExtract(t *testing.T) {
+       message := pulsar.ConsumerMessage{
+               Message: &mockConsumerMessage{
+                       properties: map[string]string{},
+               },
+       }
+
+       tracer := mocktracer.New()
+
+       opentracing.SetGlobalTracer(tracer)
+
+       span := tracer.StartSpan("test")
+
+       
InjectConsumerMessageSpanContext(opentracing.ContextWithSpan(context.Background(),
 span), message)
+       assert.True(t, len(message.Properties()) > 0)
+       extractedSpanContext := ExtractSpanContextFromConsumerMessage(message)
+       assert.Equal(t, span.Context(), extractedSpanContext)
+}
+
+type mockConsumerMessage struct {
+       properties map[string]string
+}
+
+func (msg *mockConsumerMessage) Topic() string {
+       return ""
+}
+
+func (msg *mockConsumerMessage) Properties() map[string]string {
+       return msg.properties
+}
+
+func (msg *mockConsumerMessage) Payload() []byte {
+       return nil
+}
+
+func (msg *mockConsumerMessage) ID() pulsar.MessageID {
+       return nil
+}
+
+func (msg *mockConsumerMessage) PublishTime() time.Time {
+       return time.Time{}
+}
+
+func (msg *mockConsumerMessage) EventTime() time.Time {
+       return time.Time{}
+}
+
+func (msg *mockConsumerMessage) Key() string {
+       return ""
+}
+
+func (msg *mockConsumerMessage) OrderingKey() string {
+       return ""
+}
+
+func (msg *mockConsumerMessage) RedeliveryCount() uint32 {
+       return 0
+}
+
+func (msg *mockConsumerMessage) IsReplicated() bool {
+       return false
+}
+
+func (msg *mockConsumerMessage) GetReplicatedFrom() string {
+       return ""
+}
+
+func (msg *mockConsumerMessage) GetSchemaValue(v interface{}) error {
+       return nil
+}
+
+func (msg *mockConsumerMessage) ProducerName() string {
+       return ""
+}
diff --git a/pulsar/internal/pulsartracing/producer_interceptor.go 
b/pulsar/internal/pulsartracing/producer_interceptor.go
new file mode 100644
index 0000000..b400e57
--- /dev/null
+++ b/pulsar/internal/pulsartracing/producer_interceptor.go
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsartracing
+
+import (
+       "context"
+
+       "github.com/apache/pulsar-client-go/pulsar"
+       "github.com/opentracing/opentracing-go"
+)
+
+const toPrefix = "To__"
+
+type ProducerInterceptor struct {
+}
+
+func (t *ProducerInterceptor) BeforeSend(producer pulsar.Producer, message 
*pulsar.ProducerMessage) {
+       buildAndInjectSpan(message, producer).Finish()
+}
+
+func (t *ProducerInterceptor) OnSendAcknowledgement(producer pulsar.Producer, 
message *pulsar.ProducerMessage, msgID pulsar.MessageID) {
+}
+
+func buildAndInjectSpan(message *pulsar.ProducerMessage, producer 
pulsar.Producer) opentracing.Span {
+       tracer := opentracing.GlobalTracer()
+       spanContext := ExtractSpanContextFromProducerMessage(message)
+
+       var span opentracing.Span
+
+       var startSpanOptions []opentracing.StartSpanOption
+       if spanContext != nil {
+               startSpanOptions = 
[]opentracing.StartSpanOption{opentracing.FollowsFrom(spanContext)}
+       }
+
+       span = tracer.StartSpan(toPrefix+producer.Topic(), startSpanOptions...)
+
+       enrichProducerSpan(message, producer, span)
+
+       
InjectProducerMessageSpanContext(opentracing.ContextWithSpan(context.Background(),
 span), message)
+
+       return span
+}
diff --git a/pulsar/internal/pulsartracing/producer_interceptor_test.go 
b/pulsar/internal/pulsartracing/producer_interceptor_test.go
new file mode 100644
index 0000000..b146e4e
--- /dev/null
+++ b/pulsar/internal/pulsartracing/producer_interceptor_test.go
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsartracing
+
+import (
+       "context"
+       "testing"
+
+       "github.com/apache/pulsar-client-go/pulsar"
+       "github.com/opentracing/opentracing-go"
+       "github.com/opentracing/opentracing-go/mocktracer"
+       "github.com/stretchr/testify/assert"
+)
+
+func TestProducerBuildAndInjectSpan(t *testing.T) {
+       tracer := mocktracer.New()
+       opentracing.SetGlobalTracer(tracer)
+
+       message := &pulsar.ProducerMessage{
+               Properties: map[string]string{},
+       }
+
+       span := buildAndInjectSpan(message, &mockProducer{})
+       assert.NotNil(t, span)
+       assert.True(t, len(message.Properties) > 0)
+}
+
+type mockProducer struct {
+}
+
+func (p *mockProducer) Topic() string {
+       return ""
+}
+
+func (p *mockProducer) Name() string {
+       return ""
+}
+
+func (p *mockProducer) Send(context.Context, *pulsar.ProducerMessage) 
(pulsar.MessageID, error) {
+       return nil, nil
+}
+
+func (p *mockProducer) SendAsync(context.Context, *pulsar.ProducerMessage, 
func(pulsar.MessageID, *pulsar.ProducerMessage, error)) {
+}
+
+func (p *mockProducer) LastSequenceID() int64 {
+       return 0
+}
+
+func (p *mockProducer) Flush() error {
+       return nil
+}
+
+func (p *mockProducer) Close() {}
diff --git a/pulsar/internal/pulsartracing/readme.md 
b/pulsar/internal/pulsartracing/readme.md
new file mode 100644
index 0000000..319587e
--- /dev/null
+++ b/pulsar/internal/pulsartracing/readme.md
@@ -0,0 +1,40 @@
+### Usage
+
+#### Interceptors based solution
+
+```go
+// create new tracer
+// register tracer with GlobalTracer
+opentracing.SetGlobalTracer(tracer)
+```
+
+**Producer**
+
+```go
+tracingInterceptor := &pulsartracing.ProducerInterceptor{}
+
+options := pulsar.ProducerOptions{
+Topic:            topicName,
+Interceptors:     pulsar.ProducerInterceptors{tracingInterceptor},
+}
+```
+
+**Consumer**
+```go
+tracingInterceptor := &pulsartracing.ConsumerInterceptor{}
+
+options := pulsar.ConsumerOptions{
+Topics:           topicName,
+SubscriptionName: subscriptionName,
+Type:             pulsar.Shared,
+Interceptors:      pulsar.ConsumerInterceptors{tracingInterceptor},
+}
+
+
+// to create span with message as parent span
+span := pulsartracing.CreateSpanFromMessage(message, tracer, "child_span")
+```
+
+## License
+
+[Apache 2.0 License](./LICENSE).
\ No newline at end of file
diff --git a/pulsar/internal/pulsartracing/span-enrichment.go 
b/pulsar/internal/pulsartracing/span-enrichment.go
new file mode 100644
index 0000000..e75c398
--- /dev/null
+++ b/pulsar/internal/pulsartracing/span-enrichment.go
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsartracing
+
+import (
+       "github.com/apache/pulsar-client-go/pulsar"
+       "github.com/opentracing/opentracing-go"
+)
+
+func enrichConsumerSpan(message *pulsar.ConsumerMessage, span 
opentracing.Span) {
+       spanCommonTags(span)
+
+       for k, v := range message.Properties() {
+               span.SetTag(k, v)
+       }
+       span.SetTag("message_bus.destination", message.Topic())
+       span.SetTag("messageId", message.ID())
+       span.SetTag("subscription", message.Subscription())
+}
+
+func enrichProducerSpan(message *pulsar.ProducerMessage, producer 
pulsar.Producer, span opentracing.Span) {
+       spanCommonTags(span)
+
+       for k, v := range message.Properties {
+               span.SetTag(k, v)
+       }
+       span.SetTag("span.kind", "producer")
+       span.SetTag("message_bus.destination", producer.Topic())
+       span.SetTag("sequenceId", producer.LastSequenceID())
+}
+
+func spanCommonTags(span opentracing.Span) {
+       span.SetTag("component", "pulsar-client-go")
+       span.SetTag("peer.service", "pulsar-broker")
+}

Reply via email to