This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new aaea614  Make producer.Send() to also return MessageID (#141)
aaea614 is described below

commit aaea614f95bb0ffa1215f9706abd1cbe10647034
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Dec 23 14:45:22 2019 -0800

    Make producer.Send() to also return MessageID (#141)
    
    * Make producer.Send() to also return MessageID
    
    * fixed interface
---
 examples/producer/producer.go |  4 +++-
 pulsar/consumer_regex_test.go |  2 +-
 pulsar/consumer_test.go       | 24 ++++++++++++------------
 pulsar/producer.go            |  2 +-
 pulsar/producer_impl.go       |  2 +-
 pulsar/producer_partition.go  |  8 +++++---
 pulsar/producer_test.go       | 15 ++++++++++-----
 7 files changed, 33 insertions(+), 24 deletions(-)

diff --git a/examples/producer/producer.go b/examples/producer/producer.go
index 24ca58d..770aa83 100644
--- a/examples/producer/producer.go
+++ b/examples/producer/producer.go
@@ -48,10 +48,12 @@ func main() {
        ctx := context.Background()
 
        for i := 0; i < 10; i++ {
-               if err := producer.Send(ctx, &pulsar.ProducerMessage{
+               if msgId, err := producer.Send(ctx, &pulsar.ProducerMessage{
                        Payload: []byte(fmt.Sprintf("hello-%d", i)),
                }); err != nil {
                        log.Fatal(err)
+               } else {
+                       log.Println("Published message: ", msgId)
                }
        }
 }
diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go
index 52a5576..97c0a55 100644
--- a/pulsar/consumer_regex_test.go
+++ b/pulsar/consumer_regex_test.go
@@ -363,7 +363,7 @@ func genMessages(p Producer, num int, msgFn func(idx int) 
string) error {
                m := &ProducerMessage{
                        Payload: []byte(msgFn(i)),
                }
-               if err := p.Send(ctx, m); err != nil {
+               if _, err := p.Send(ctx, m); err != nil {
                        return err
                }
        }
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 9bc2bd6..6e389bd 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -63,7 +63,7 @@ func TestProducerConsumer(t *testing.T) {
 
        // send 10 messages
        for i := 0; i < 10; i++ {
-               if err := producer.Send(ctx, &ProducerMessage{
+               if _, err := producer.Send(ctx, &ProducerMessage{
                        Payload: []byte(fmt.Sprintf("hello-%d", i)),
                        Key:     "pulsar",
                        Properties: map[string]string{
@@ -153,7 +153,7 @@ func TestBatchMessageReceive(t *testing.T) {
                msg := &ProducerMessage{
                        Payload: []byte(messageContent),
                }
-               err := producer.Send(ctx, msg)
+               _, err := producer.Send(ctx, msg)
                assert.Nil(t, err)
        }
 
@@ -221,12 +221,12 @@ func TestConsumerSubscriptionEarliestPosition(t 
*testing.T) {
 
        // send message
        ctx := context.Background()
-       err = producer.Send(ctx, &ProducerMessage{
+       _, err = producer.Send(ctx, &ProducerMessage{
                Payload: []byte("msg-1-content-1"),
        })
        assert.Nil(t, err)
 
-       err = producer.Send(ctx, &ProducerMessage{
+       _, err = producer.Send(ctx, &ProducerMessage{
                Payload: []byte("msg-1-content-2"),
        })
        assert.Nil(t, err)
@@ -281,7 +281,7 @@ func TestConsumerKeyShared(t *testing.T) {
 
        ctx := context.Background()
        for i := 0; i < 10; i++ {
-               err := producer.Send(ctx, &ProducerMessage{
+               _, err := producer.Send(ctx, &ProducerMessage{
                        Key:     fmt.Sprintf("key-shared-%d", i%3),
                        Payload: []byte(fmt.Sprintf("value-%d", i)),
                })
@@ -348,7 +348,7 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
 
        ctx := context.Background()
        for i := 0; i < 10; i++ {
-               err := producer.Send(ctx, &ProducerMessage{
+               _, err := producer.Send(ctx, &ProducerMessage{
                        Payload: []byte(fmt.Sprintf("hello-%d", i)),
                })
                assert.Nil(t, err)
@@ -435,7 +435,7 @@ func TestConsumerShared(t *testing.T) {
 
        // send 10 messages with unique payloads
        for i := 0; i < 10; i++ {
-               if err := producer.Send(context.Background(), &ProducerMessage{
+               if _, err := producer.Send(context.Background(), 
&ProducerMessage{
                        Payload: []byte(fmt.Sprintf("hello-%d", i)),
                }); err != nil {
                        log.Fatal(err)
@@ -496,7 +496,7 @@ func TestConsumerEventTime(t *testing.T) {
        defer consumer.Close()
 
        et := timeFromUnixTimestampMillis(uint64(5))
-       err = producer.Send(ctx, &ProducerMessage{
+       _, err = producer.Send(ctx, &ProducerMessage{
                Payload:   []byte("test"),
                EventTime: &et,
        })
@@ -533,7 +533,7 @@ func TestConsumerFlow(t *testing.T) {
        assert.Nil(t, err)
 
        for msgNum := 0; msgNum < 100; msgNum++ {
-               if err := producer.Send(ctx, &ProducerMessage{
+               if _, err := producer.Send(ctx, &ProducerMessage{
                        Payload: []byte(fmt.Sprintf("msg-content-%d", msgNum)),
                }); err != nil {
                        t.Fatal(err)
@@ -574,7 +574,7 @@ func TestConsumerAck(t *testing.T) {
        const N = 100
 
        for i := 0; i < N; i++ {
-               if err := producer.Send(ctx, &ProducerMessage{
+               if _, err := producer.Send(ctx, &ProducerMessage{
                        Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
                }); err != nil {
                        t.Fatal(err)
@@ -641,7 +641,7 @@ func TestConsumerNack(t *testing.T) {
        const N = 100
 
        for i := 0; i < N; i++ {
-               if err := producer.Send(ctx, &ProducerMessage{
+               if _, err := producer.Send(ctx, &ProducerMessage{
                        Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
                }); err != nil {
                        t.Fatal(err)
@@ -701,7 +701,7 @@ func TestConsumerCompression(t *testing.T) {
        const N = 100
 
        for i := 0; i < N; i++ {
-               if err := producer.Send(ctx, &ProducerMessage{
+               if _, err := producer.Send(ctx, &ProducerMessage{
                        Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
                }); err != nil {
                        t.Fatal(err)
diff --git a/pulsar/producer.go b/pulsar/producer.go
index cb024fa..6a68e7b 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -122,7 +122,7 @@ type Producer interface {
        // This call will be blocking until is successfully acknowledged by the 
Pulsar broker.
        // Example:
        // producer.Send(ctx, pulsar.ProducerMessage{ Payload: myPayload })
-       Send(context.Context, *ProducerMessage) error
+       Send(context.Context, *ProducerMessage) (MessageID, error)
 
        // SendAsync a message in asynchronous mode
        // The callback will report back the message being published and
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index ef0119e..ce97c22 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -124,7 +124,7 @@ func (p *producer) NumPartitions() uint32 {
        return uint32(len(p.producers))
 }
 
-func (p *producer) Send(ctx context.Context, msg *ProducerMessage) error {
+func (p *producer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, 
error) {
        partition := p.messageRouter(msg, p)
        return p.producers[partition].Send(ctx, msg)
 }
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index c38766a..47dff85 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -336,25 +336,27 @@ func (p *partitionProducer) internalFlush(fr 
*flushRequest) {
        pi.Unlock()
 }
 
-func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) 
error {
+func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) 
(MessageID, error) {
        wg := sync.WaitGroup{}
        wg.Add(1)
 
        var err error
+       var msgId MessageID
 
        p.internalSendAsync(ctx, msg, func(ID MessageID, message 
*ProducerMessage, e error) {
                err = e
+               msgId = ID
                wg.Done()
        }, true)
 
        // When sending synchronously we flush immediately to avoid
        // the increased latency and reduced throughput of batching
        if err = p.Flush(); err != nil {
-               return err
+               return nil, err
        }
 
        wg.Wait()
-       return err
+       return msgId, err
 }
 
 func (p *partitionProducer) SendAsync(ctx context.Context, msg 
*ProducerMessage,
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 1986004..189902c 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -96,11 +96,12 @@ func TestSimpleProducer(t *testing.T) {
        defer producer.Close()
 
        for i := 0; i < 10; i++ {
-               err = producer.Send(context.Background(), &ProducerMessage{
+               ID, err := producer.Send(context.Background(), &ProducerMessage{
                        Payload: []byte("hello"),
                })
 
                assert.NoError(t, err)
+               assert.NotNil(t, ID)
        }
 }
 
@@ -180,11 +181,12 @@ func TestProducerCompression(t *testing.T) {
                        defer producer.Close()
 
                        for i := 0; i < 10; i++ {
-                               err = producer.Send(context.Background(), 
&ProducerMessage{
+                               ID, err := producer.Send(context.Background(), 
&ProducerMessage{
                                        Payload: []byte("hello"),
                                })
 
                                assert.NoError(t, err)
+                               assert.NotNil(t, ID)
                        }
                })
        }
@@ -208,11 +210,12 @@ func TestProducerLastSequenceID(t *testing.T) {
        assert.Equal(t, int64(-1), producer.LastSequenceID())
 
        for i := 0; i < 10; i++ {
-               err = producer.Send(context.Background(), &ProducerMessage{
+               ID, err := producer.Send(context.Background(), &ProducerMessage{
                        Payload: []byte("hello"),
                })
 
                assert.NoError(t, err)
+               assert.NotNil(t, ID)
                assert.Equal(t, int64(i), producer.LastSequenceID())
        }
 }
@@ -239,11 +242,12 @@ func TestEventTime(t *testing.T) {
        defer consumer.Close()
 
        eventTime := timeFromUnixTimestampMillis(uint64(1565161612))
-       err = producer.Send(context.Background(), &ProducerMessage{
+       ID, err := producer.Send(context.Background(), &ProducerMessage{
                Payload:   []byte(fmt.Sprintf("test-event-time")),
                EventTime: &eventTime,
        })
        assert.Nil(t, err)
+       assert.NotNil(t, ID)
 
        msg, err := consumer.Receive(context.Background())
        assert.Nil(t, err)
@@ -457,10 +461,11 @@ func TestMessageRouter(t *testing.T) {
 
        ctx := context.Background()
 
-       err = producer.Send(ctx, &ProducerMessage{
+       ID, err := producer.Send(ctx, &ProducerMessage{
                Payload: []byte("hello"),
        })
        assert.Nil(t, err)
+       assert.NotNil(t, ID)
 
        fmt.Println("PUBLISHED")
 

Reply via email to