This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 0de4792 Add open tracing to pulsar go client (#518)
0de4792 is described below
commit 0de47927ca02e5567074f557063ad128da4e3125
Author: Marais <[email protected]>
AuthorDate: Mon Jul 12 10:25:40 2021 +0300
Add open tracing to pulsar go client (#518)
This PR adds OpenTracing support directly to the Pulsar Go client.
It replicates what the Java tracer does:
https://github.com/streamnative/pulsar-tracing
Usage is described in the readme.md file.
Co-authored-by: Marais Kruger <[email protected]>
---
go.mod | 3 +-
go.sum | 2 +
.../internal/pulsartracing/consumer_interceptor.go | 58 ++++++++++
.../pulsartracing/consumer_interceptor_test.go | 89 +++++++++++++++
.../pulsartracing/message_carrier_adaptors.go | 84 ++++++++++++++
.../internal/pulsartracing/message_carrier_util.go | 89 +++++++++++++++
.../pulsartracing/message_carrier_util_test.go | 121 +++++++++++++++++++++
.../internal/pulsartracing/producer_interceptor.go | 57 ++++++++++
.../pulsartracing/producer_interceptor_test.go | 69 ++++++++++++
pulsar/internal/pulsartracing/readme.md | 40 +++++++
pulsar/internal/pulsartracing/span-enrichment.go | 50 +++++++++
11 files changed, 660 insertions(+), 2 deletions(-)
diff --git a/go.mod b/go.mod
index 6d53cc6..67a7bb7 100644
--- a/go.mod
+++ b/go.mod
@@ -16,8 +16,7 @@ require (
github.com/klauspost/compress v1.10.8
github.com/kr/pretty v0.2.0 // indirect
github.com/linkedin/goavro/v2 v2.9.8
- github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd //
indirect
- github.com/modern-go/reflect2 v1.0.1 // indirect
+ github.com/opentracing/opentracing-go v1.2.0
github.com/pierrec/lz4 v2.0.5+incompatible
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.7.1
diff --git a/go.sum b/go.sum
index b2818ee..2ccd39e 100644
--- a/go.sum
+++ b/go.sum
@@ -125,6 +125,8 @@ github.com/onsi/ginkgo v1.14.0/go.mod
h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9k
github.com/onsi/gomega v1.7.1/go.mod
h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE=
github.com/onsi/gomega v1.10.1/go.mod
h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
+github.com/opentracing/opentracing-go v1.2.0
h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
+github.com/opentracing/opentracing-go v1.2.0/go.mod
h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
github.com/pierrec/lz4 v2.0.5+incompatible
h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod
h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.0/go.mod
h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
diff --git a/pulsar/internal/pulsartracing/consumer_interceptor.go
b/pulsar/internal/pulsartracing/consumer_interceptor.go
new file mode 100644
index 0000000..3969d45
--- /dev/null
+++ b/pulsar/internal/pulsartracing/consumer_interceptor.go
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsartracing
+
+import (
+ "context"
+
+ "github.com/apache/pulsar-client-go/pulsar"
+ "github.com/opentracing/opentracing-go"
+)
+
+const fromPrefix = "From__"
+
+type ConsumerInterceptor struct {
+}
+
+func (t *ConsumerInterceptor) BeforeConsume(message pulsar.ConsumerMessage) {
+ buildAndInjectChildSpan(message).Finish()
+}
+
+func (t *ConsumerInterceptor) OnAcknowledge(consumer pulsar.Consumer, msgID
pulsar.MessageID) {}
+
+func (t *ConsumerInterceptor) OnNegativeAcksSend(consumer pulsar.Consumer,
msgIDs []pulsar.MessageID) {
+}
+
+func buildAndInjectChildSpan(message pulsar.ConsumerMessage) opentracing.Span {
+ tracer := opentracing.GlobalTracer()
+ parentContext := ExtractSpanContextFromConsumerMessage(message)
+
+ var span opentracing.Span
+
+ var startSpanOptions []opentracing.StartSpanOption
+ if parentContext != nil {
+ startSpanOptions =
[]opentracing.StartSpanOption{opentracing.FollowsFrom(parentContext)}
+ }
+
+ span =
tracer.StartSpan(fromPrefix+message.Topic()+"__"+message.Subscription(),
startSpanOptions...)
+
+ enrichConsumerSpan(&message, span)
+
InjectConsumerMessageSpanContext(opentracing.ContextWithSpan(context.Background(),
span), message)
+
+ return span
+}
diff --git a/pulsar/internal/pulsartracing/consumer_interceptor_test.go
b/pulsar/internal/pulsartracing/consumer_interceptor_test.go
new file mode 100644
index 0000000..b15a926
--- /dev/null
+++ b/pulsar/internal/pulsartracing/consumer_interceptor_test.go
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsartracing
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/apache/pulsar-client-go/pulsar"
+ "github.com/opentracing/opentracing-go"
+ "github.com/opentracing/opentracing-go/mocktracer"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestConsumerBuildAndInjectChildSpan(t *testing.T) {
+ tracer := mocktracer.New()
+
+ opentracing.SetGlobalTracer(tracer)
+
+ message := pulsar.ConsumerMessage{
+ Consumer: &mockConsumer{},
+ Message: &mockConsumerMessage{
+ properties: map[string]string{},
+ },
+ }
+
+ span := buildAndInjectChildSpan(message)
+ assert.NotNil(t, span)
+ assert.True(t, len(message.Properties()) > 0)
+}
+
+type mockConsumer struct {
+}
+
+func (c *mockConsumer) Subscription() string {
+ return ""
+}
+
+func (c *mockConsumer) Unsubscribe() error {
+ return nil
+}
+
+func (c *mockConsumer) Receive(ctx context.Context) (message pulsar.Message,
err error) {
+ return nil, nil
+}
+
+func (c *mockConsumer) Chan() <-chan pulsar.ConsumerMessage {
+ return nil
+}
+
+func (c *mockConsumer) Ack(msg pulsar.Message) {}
+
+func (c *mockConsumer) AckID(msgID pulsar.MessageID) {}
+
+func (c *mockConsumer) ReconsumeLater(msg pulsar.Message, delay time.Duration)
{}
+
+func (c *mockConsumer) Nack(msg pulsar.Message) {}
+
+func (c *mockConsumer) NackID(msgID pulsar.MessageID) {}
+
+func (c *mockConsumer) Close() {}
+
+func (c *mockConsumer) Seek(msgID pulsar.MessageID) error {
+ return nil
+}
+
+func (c *mockConsumer) SeekByTime(time time.Time) error {
+ return nil
+}
+
+func (c *mockConsumer) Name() string {
+ return ""
+}
diff --git a/pulsar/internal/pulsartracing/message_carrier_adaptors.go
b/pulsar/internal/pulsartracing/message_carrier_adaptors.go
new file mode 100644
index 0000000..3b6394b
--- /dev/null
+++ b/pulsar/internal/pulsartracing/message_carrier_adaptors.go
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsartracing
+
+import (
+ "errors"
+
+ "github.com/apache/pulsar-client-go/pulsar"
+)
+
+// ProducerMessageExtractAdapter Implements TextMap Interface
+type ProducerMessageExtractAdapter struct {
+ message *pulsar.ProducerMessage
+}
+
+func (a *ProducerMessageExtractAdapter) ForeachKey(handler func(key, val
string) error) error {
+ for k, v := range (*a.message).Properties {
+ if err := handler(k, v); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (a *ProducerMessageExtractAdapter) Set(key, val string) {}
+
+// ProducerMessageInjectAdapter Implements TextMap Interface
+type ProducerMessageInjectAdapter struct {
+ message *pulsar.ProducerMessage
+}
+
+func (a *ProducerMessageInjectAdapter) ForeachKey(handler func(key, val
string) error) error {
+ return errors.New("iterator should never be used with Tracer.inject()")
+}
+
+func (a *ProducerMessageInjectAdapter) Set(key, val string) {
+ a.message.Properties[key] = val
+}
+
+// ConsumerMessageExtractAdapter Implements TextMap Interface
+type ConsumerMessageExtractAdapter struct {
+ message pulsar.ConsumerMessage
+}
+
+func (a *ConsumerMessageExtractAdapter) ForeachKey(handler func(key, val
string) error) error {
+ for k, v := range a.message.Properties() {
+ if err := handler(k, v); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (a *ConsumerMessageExtractAdapter) Set(key, val string) {}
+
+// ConsumerMessageInjectAdapter Implements TextMap Interface
+type ConsumerMessageInjectAdapter struct {
+ message pulsar.ConsumerMessage
+}
+
+func (a *ConsumerMessageInjectAdapter) ForeachKey(handler func(key, val
string) error) error {
+ return errors.New("iterator should never be used with tracer.inject()")
+}
+
+func (a *ConsumerMessageInjectAdapter) Set(key, val string) {
+ a.message.Properties()[key] = val
+}
diff --git a/pulsar/internal/pulsartracing/message_carrier_util.go
b/pulsar/internal/pulsartracing/message_carrier_util.go
new file mode 100644
index 0000000..d1fd0dd
--- /dev/null
+++ b/pulsar/internal/pulsartracing/message_carrier_util.go
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsartracing
+
+import (
+ "context"
+
+ "github.com/apache/pulsar-client-go/pulsar"
+ "github.com/opentracing/opentracing-go"
+ log "github.com/sirupsen/logrus"
+)
+
+func InjectProducerMessageSpanContext(ctx context.Context, message
*pulsar.ProducerMessage) {
+ injectAdapter := &ProducerMessageInjectAdapter{message}
+
+ span := opentracing.SpanFromContext(ctx)
+
+ err := opentracing.GlobalTracer().Inject(span.Context(),
opentracing.TextMap, injectAdapter)
+
+ if err != nil {
+ log.Error("could not inject span context into pulsar message",
err)
+ }
+}
+
+func ExtractSpanContextFromProducerMessage(message *pulsar.ProducerMessage)
opentracing.SpanContext {
+ extractAdapter := &ProducerMessageExtractAdapter{message}
+
+ spanContext, err :=
opentracing.GlobalTracer().Extract(opentracing.TextMap, extractAdapter)
+
+ if err != nil {
+ log.Error("could not extract span context from pulsar message",
err)
+ }
+
+ return spanContext
+}
+
+func ExtractSpanContextFromConsumerMessage(message pulsar.ConsumerMessage)
opentracing.SpanContext {
+ extractAdapter := &ConsumerMessageExtractAdapter{message}
+
+ spanContext, err :=
opentracing.GlobalTracer().Extract(opentracing.TextMap, extractAdapter)
+
+ if err != nil {
+ log.Error("could not extract span context from pulsar message",
err)
+ }
+
+ return spanContext
+}
+
+func InjectConsumerMessageSpanContext(ctx context.Context, message
pulsar.ConsumerMessage) {
+ injectAdapter := &ConsumerMessageInjectAdapter{message}
+ span := opentracing.SpanFromContext(ctx)
+
+ if span == nil {
+ log.Warn("no span could be extracted from context, nothing will
be injected into the message properties")
+ return
+ }
+
+ err := opentracing.GlobalTracer().Inject(span.Context(),
opentracing.TextMap, injectAdapter)
+
+ if err != nil {
+ log.Error("could not inject span context into pulsar message",
err)
+ }
+}
+
+func CreateSpanFromMessage(cm *pulsar.ConsumerMessage, tracer
opentracing.Tracer, label string) opentracing.Span {
+ parentSpan := ExtractSpanContextFromConsumerMessage(*cm)
+ var span opentracing.Span
+ if parentSpan != nil {
+ span = tracer.StartSpan(label, opentracing.ChildOf(parentSpan))
+ } else {
+ span = tracer.StartSpan(label)
+ }
+ return span
+}
diff --git a/pulsar/internal/pulsartracing/message_carrier_util_test.go
b/pulsar/internal/pulsartracing/message_carrier_util_test.go
new file mode 100644
index 0000000..9fe608d
--- /dev/null
+++ b/pulsar/internal/pulsartracing/message_carrier_util_test.go
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsartracing
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/apache/pulsar-client-go/pulsar"
+ "github.com/opentracing/opentracing-go"
+ "github.com/opentracing/opentracing-go/mocktracer"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestProducerMessageInjectAndExtract(t *testing.T) {
+ message := &pulsar.ProducerMessage{
+ Properties: map[string]string{},
+ }
+
+ tracer := mocktracer.New()
+
+ opentracing.SetGlobalTracer(tracer)
+
+ span := tracer.StartSpan("test")
+
+
InjectProducerMessageSpanContext(opentracing.ContextWithSpan(context.Background(),
span), message)
+ assert.True(t, len(message.Properties) > 0)
+ extractedSpanContext := ExtractSpanContextFromProducerMessage(message)
+ assert.Equal(t, span.Context(), extractedSpanContext)
+}
+
+func TestConsumerMessageInjectAndExtract(t *testing.T) {
+ message := pulsar.ConsumerMessage{
+ Message: &mockConsumerMessage{
+ properties: map[string]string{},
+ },
+ }
+
+ tracer := mocktracer.New()
+
+ opentracing.SetGlobalTracer(tracer)
+
+ span := tracer.StartSpan("test")
+
+
InjectConsumerMessageSpanContext(opentracing.ContextWithSpan(context.Background(),
span), message)
+ assert.True(t, len(message.Properties()) > 0)
+ extractedSpanContext := ExtractSpanContextFromConsumerMessage(message)
+ assert.Equal(t, span.Context(), extractedSpanContext)
+}
+
+type mockConsumerMessage struct {
+ properties map[string]string
+}
+
+func (msg *mockConsumerMessage) Topic() string {
+ return ""
+}
+
+func (msg *mockConsumerMessage) Properties() map[string]string {
+ return msg.properties
+}
+
+func (msg *mockConsumerMessage) Payload() []byte {
+ return nil
+}
+
+func (msg *mockConsumerMessage) ID() pulsar.MessageID {
+ return nil
+}
+
+func (msg *mockConsumerMessage) PublishTime() time.Time {
+ return time.Time{}
+}
+
+func (msg *mockConsumerMessage) EventTime() time.Time {
+ return time.Time{}
+}
+
+func (msg *mockConsumerMessage) Key() string {
+ return ""
+}
+
+func (msg *mockConsumerMessage) OrderingKey() string {
+ return ""
+}
+
+func (msg *mockConsumerMessage) RedeliveryCount() uint32 {
+ return 0
+}
+
+func (msg *mockConsumerMessage) IsReplicated() bool {
+ return false
+}
+
+func (msg *mockConsumerMessage) GetReplicatedFrom() string {
+ return ""
+}
+
+func (msg *mockConsumerMessage) GetSchemaValue(v interface{}) error {
+ return nil
+}
+
+func (msg *mockConsumerMessage) ProducerName() string {
+ return ""
+}
diff --git a/pulsar/internal/pulsartracing/producer_interceptor.go
b/pulsar/internal/pulsartracing/producer_interceptor.go
new file mode 100644
index 0000000..b400e57
--- /dev/null
+++ b/pulsar/internal/pulsartracing/producer_interceptor.go
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsartracing
+
+import (
+ "context"
+
+ "github.com/apache/pulsar-client-go/pulsar"
+ "github.com/opentracing/opentracing-go"
+)
+
+const toPrefix = "To__"
+
+type ProducerInterceptor struct {
+}
+
+func (t *ProducerInterceptor) BeforeSend(producer pulsar.Producer, message
*pulsar.ProducerMessage) {
+ buildAndInjectSpan(message, producer).Finish()
+}
+
+func (t *ProducerInterceptor) OnSendAcknowledgement(producer pulsar.Producer,
message *pulsar.ProducerMessage, msgID pulsar.MessageID) {
+}
+
+func buildAndInjectSpan(message *pulsar.ProducerMessage, producer
pulsar.Producer) opentracing.Span {
+ tracer := opentracing.GlobalTracer()
+ spanContext := ExtractSpanContextFromProducerMessage(message)
+
+ var span opentracing.Span
+
+ var startSpanOptions []opentracing.StartSpanOption
+ if spanContext != nil {
+ startSpanOptions =
[]opentracing.StartSpanOption{opentracing.FollowsFrom(spanContext)}
+ }
+
+ span = tracer.StartSpan(toPrefix+producer.Topic(), startSpanOptions...)
+
+ enrichProducerSpan(message, producer, span)
+
+
InjectProducerMessageSpanContext(opentracing.ContextWithSpan(context.Background(),
span), message)
+
+ return span
+}
diff --git a/pulsar/internal/pulsartracing/producer_interceptor_test.go
b/pulsar/internal/pulsartracing/producer_interceptor_test.go
new file mode 100644
index 0000000..b146e4e
--- /dev/null
+++ b/pulsar/internal/pulsartracing/producer_interceptor_test.go
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsartracing
+
+import (
+ "context"
+ "testing"
+
+ "github.com/apache/pulsar-client-go/pulsar"
+ "github.com/opentracing/opentracing-go"
+ "github.com/opentracing/opentracing-go/mocktracer"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestProducerBuildAndInjectSpan(t *testing.T) {
+ tracer := mocktracer.New()
+ opentracing.SetGlobalTracer(tracer)
+
+ message := &pulsar.ProducerMessage{
+ Properties: map[string]string{},
+ }
+
+ span := buildAndInjectSpan(message, &mockProducer{})
+ assert.NotNil(t, span)
+ assert.True(t, len(message.Properties) > 0)
+}
+
+type mockProducer struct {
+}
+
+func (p *mockProducer) Topic() string {
+ return ""
+}
+
+func (p *mockProducer) Name() string {
+ return ""
+}
+
+func (p *mockProducer) Send(context.Context, *pulsar.ProducerMessage)
(pulsar.MessageID, error) {
+ return nil, nil
+}
+
+func (p *mockProducer) SendAsync(context.Context, *pulsar.ProducerMessage,
func(pulsar.MessageID, *pulsar.ProducerMessage, error)) {
+}
+
+func (p *mockProducer) LastSequenceID() int64 {
+ return 0
+}
+
+func (p *mockProducer) Flush() error {
+ return nil
+}
+
+func (p *mockProducer) Close() {}
diff --git a/pulsar/internal/pulsartracing/readme.md
b/pulsar/internal/pulsartracing/readme.md
new file mode 100644
index 0000000..319587e
--- /dev/null
+++ b/pulsar/internal/pulsartracing/readme.md
@@ -0,0 +1,40 @@
+### Usage
+
+#### Interceptor-based solution
+
+```go
+// create new tracer
+// register tracer with GlobalTracer
+opentracing.SetGlobalTracer(tracer)
+```
+
+**Producer**
+
+```go
+tracingInterceptor := &pulsartracing.ProducerInterceptor{}
+
+options := pulsar.ProducerOptions{
+Topic: topicName,
+Interceptors: pulsar.ProducerInterceptors{tracingInterceptor},
+}
+```
+
+**Consumer**
+```go
+tracingInterceptor := &pulsartracing.ConsumerInterceptor{}
+
+options := pulsar.ConsumerOptions{
+Topic: topicName,
+SubscriptionName: subscriptionName,
+Type: pulsar.Shared,
+Interceptors: pulsar.ConsumerInterceptors{tracingInterceptor},
+}
+
+
+// To create a span that uses the span context carried by the message as its parent:
+span := pulsartracing.CreateSpanFromMessage(message, tracer, "child_span")
+```
+
+## License
+
+[Apache 2.0 License](../../../LICENSE).
\ No newline at end of file
diff --git a/pulsar/internal/pulsartracing/span-enrichment.go
b/pulsar/internal/pulsartracing/span-enrichment.go
new file mode 100644
index 0000000..e75c398
--- /dev/null
+++ b/pulsar/internal/pulsartracing/span-enrichment.go
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsartracing
+
+import (
+ "github.com/apache/pulsar-client-go/pulsar"
+ "github.com/opentracing/opentracing-go"
+)
+
+func enrichConsumerSpan(message *pulsar.ConsumerMessage, span
opentracing.Span) {
+ spanCommonTags(span)
+
+ for k, v := range message.Properties() {
+ span.SetTag(k, v)
+ }
+ span.SetTag("message_bus.destination", message.Topic())
+ span.SetTag("messageId", message.ID())
+ span.SetTag("subscription", message.Subscription())
+}
+
+func enrichProducerSpan(message *pulsar.ProducerMessage, producer
pulsar.Producer, span opentracing.Span) {
+ spanCommonTags(span)
+
+ for k, v := range message.Properties {
+ span.SetTag(k, v)
+ }
+ span.SetTag("span.kind", "producer")
+ span.SetTag("message_bus.destination", producer.Topic())
+ span.SetTag("sequenceId", producer.LastSequenceID())
+}
+
+func spanCommonTags(span opentracing.Span) {
+ span.SetTag("component", "pulsar-client-go")
+ span.SetTag("peer.service", "pulsar-broker")
+}