This is an automated email from the ASF dual-hosted git repository.

cckellogg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 887853e  add epoch to handle create producer timeout (#582)
887853e is described below

commit 887853e5c8f734bbfefce77f4444bed8877c86d0
Author: Rui Fu <[email protected]>
AuthorDate: Thu Aug 12 03:48:33 2021 +0800

    add epoch to handle create producer timeout (#582)
    
    * add epoch to producer
    
    * fix CI
    
    * address comments
    
    * address comments
    
    * update style
    
    * better logging
---
 pulsar/producer_partition.go | 24 +++++++++++++++++-------
 pulsar/producer_test.go      | 36 ++++++++++++++++++++++++++++++++++++
 2 files changed, 53 insertions(+), 7 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index abec4fc..ca6850d 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -79,6 +79,8 @@ type partitionProducer struct {
        schemaInfo       *SchemaInfo
        partitionIdx     int32
        metrics          *internal.TopicMetrics
+
+       epoch uint64
 }
 
 func newPartitionProducer(client *client, topic string, options 
*ProducerOptions, partitionIdx int,
@@ -114,6 +116,7 @@ func newPartitionProducer(client *client, topic string, 
options *ProducerOptions
                lastSequenceID:   -1,
                partitionIdx:     int32(partitionIdx),
                metrics:          metrics,
+               epoch:            0,
        }
        p.setProducerState(producerInit)
 
@@ -176,12 +179,16 @@ func (p *partitionProducer) grabCnx() error {
                p.log.Debug("The partition consumer schema is nil")
        }
 
+       userProvidedProducerName := p.producerName != ""
+
        cmdProducer := &pb.CommandProducer{
-               RequestId:  proto.Uint64(id),
-               Topic:      proto.String(p.topic),
-               Encrypted:  nil,
-               ProducerId: proto.Uint64(p.producerID),
-               Schema:     pbSchema,
+               RequestId:                proto.Uint64(id),
+               Topic:                    proto.String(p.topic),
+               Encrypted:                nil,
+               ProducerId:               proto.Uint64(p.producerID),
+               Schema:                   pbSchema,
+               Epoch:                    
proto.Uint64(atomic.LoadUint64(&p.epoch)),
+               UserProvidedProducerName: proto.Bool(userProvidedProducerName),
        }
 
        if p.producerName != "" {
@@ -230,7 +237,10 @@ func (p *partitionProducer) grabCnx() error {
        }
        p.cnx = res.Cnx
        p.cnx.RegisterListener(p.producerID, p)
-       p.log.WithField("cnx", res.Cnx.ID()).Debug("Connected producer")
+       p.log.WithFields(log.Fields{
+               "cnx":   res.Cnx.ID(),
+               "epoch": atomic.LoadUint64(&p.epoch),
+       }).Debug("Connected producer")
 
        pendingItems := p.pendingQueue.ReadableSlice()
        viewSize := len(pendingItems)
@@ -298,7 +308,7 @@ func (p *partitionProducer) reconnectToBroker() {
                d := backoff.Next()
                p.log.Info("Reconnecting to broker in ", d)
                time.Sleep(d)
-
+               atomic.AddUint64(&p.epoch, 1)
                err := p.grabCnx()
                if err == nil {
                        // Successfully reconnected
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index bbe8028..dc7a5ef 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -1127,3 +1127,39 @@ func TestProducerSendAfterClose(t *testing.T) {
        assert.Nil(t, ID)
        assert.Error(t, err)
 }
+
+func TestExactlyOnceWithProducerNameSpecified(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: serviceURL,
+       })
+       assert.NoError(t, err)
+       defer client.Close()
+
+       topicName := newTopicName()
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: topicName,
+               Name:  "p-name-1",
+       })
+
+       assert.NoError(t, err)
+       assert.NotNil(t, producer)
+       defer producer.Close()
+
+       producer2, err := client.CreateProducer(ProducerOptions{
+               Topic: topicName,
+               Name:  "p-name-2",
+       })
+
+       assert.NoError(t, err)
+       assert.NotNil(t, producer2)
+       defer producer2.Close()
+
+       producer3, err := client.CreateProducer(ProducerOptions{
+               Topic: topicName,
+               Name:  "p-name-2",
+       })
+
+       assert.NotNil(t, err)
+       assert.Nil(t, producer3)
+}

Reply via email to