This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new aaea614 Make producer.Send() to also return MessageID (#141)
aaea614 is described below
commit aaea614f95bb0ffa1215f9706abd1cbe10647034
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Dec 23 14:45:22 2019 -0800
Make producer.Send() to also return MessageID (#141)
* Make producer.Send() to also return MessageID
* fixed interface
---
examples/producer/producer.go | 4 +++-
pulsar/consumer_regex_test.go | 2 +-
pulsar/consumer_test.go | 24 ++++++++++++------------
pulsar/producer.go | 2 +-
pulsar/producer_impl.go | 2 +-
pulsar/producer_partition.go | 8 +++++---
pulsar/producer_test.go | 15 ++++++++++-----
7 files changed, 33 insertions(+), 24 deletions(-)
diff --git a/examples/producer/producer.go b/examples/producer/producer.go
index 24ca58d..770aa83 100644
--- a/examples/producer/producer.go
+++ b/examples/producer/producer.go
@@ -48,10 +48,12 @@ func main() {
ctx := context.Background()
for i := 0; i < 10; i++ {
- if err := producer.Send(ctx, &pulsar.ProducerMessage{
+ if msgId, err := producer.Send(ctx, &pulsar.ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
}); err != nil {
log.Fatal(err)
+ } else {
+ log.Println("Published message: ", msgId)
}
}
}
diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go
index 52a5576..97c0a55 100644
--- a/pulsar/consumer_regex_test.go
+++ b/pulsar/consumer_regex_test.go
@@ -363,7 +363,7 @@ func genMessages(p Producer, num int, msgFn func(idx int)
string) error {
m := &ProducerMessage{
Payload: []byte(msgFn(i)),
}
- if err := p.Send(ctx, m); err != nil {
+ if _, err := p.Send(ctx, m); err != nil {
return err
}
}
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 9bc2bd6..6e389bd 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -63,7 +63,7 @@ func TestProducerConsumer(t *testing.T) {
// send 10 messages
for i := 0; i < 10; i++ {
- if err := producer.Send(ctx, &ProducerMessage{
+ if _, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
Key: "pulsar",
Properties: map[string]string{
@@ -153,7 +153,7 @@ func TestBatchMessageReceive(t *testing.T) {
msg := &ProducerMessage{
Payload: []byte(messageContent),
}
- err := producer.Send(ctx, msg)
+ _, err := producer.Send(ctx, msg)
assert.Nil(t, err)
}
@@ -221,12 +221,12 @@ func TestConsumerSubscriptionEarliestPosition(t
*testing.T) {
// send message
ctx := context.Background()
- err = producer.Send(ctx, &ProducerMessage{
+ _, err = producer.Send(ctx, &ProducerMessage{
Payload: []byte("msg-1-content-1"),
})
assert.Nil(t, err)
- err = producer.Send(ctx, &ProducerMessage{
+ _, err = producer.Send(ctx, &ProducerMessage{
Payload: []byte("msg-1-content-2"),
})
assert.Nil(t, err)
@@ -281,7 +281,7 @@ func TestConsumerKeyShared(t *testing.T) {
ctx := context.Background()
for i := 0; i < 10; i++ {
- err := producer.Send(ctx, &ProducerMessage{
+ _, err := producer.Send(ctx, &ProducerMessage{
Key: fmt.Sprintf("key-shared-%d", i%3),
Payload: []byte(fmt.Sprintf("value-%d", i)),
})
@@ -348,7 +348,7 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
ctx := context.Background()
for i := 0; i < 10; i++ {
- err := producer.Send(ctx, &ProducerMessage{
+ _, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
})
assert.Nil(t, err)
@@ -435,7 +435,7 @@ func TestConsumerShared(t *testing.T) {
// send 10 messages with unique payloads
for i := 0; i < 10; i++ {
- if err := producer.Send(context.Background(), &ProducerMessage{
+ if _, err := producer.Send(context.Background(),
&ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
}); err != nil {
log.Fatal(err)
@@ -496,7 +496,7 @@ func TestConsumerEventTime(t *testing.T) {
defer consumer.Close()
et := timeFromUnixTimestampMillis(uint64(5))
- err = producer.Send(ctx, &ProducerMessage{
+ _, err = producer.Send(ctx, &ProducerMessage{
Payload: []byte("test"),
EventTime: &et,
})
@@ -533,7 +533,7 @@ func TestConsumerFlow(t *testing.T) {
assert.Nil(t, err)
for msgNum := 0; msgNum < 100; msgNum++ {
- if err := producer.Send(ctx, &ProducerMessage{
+ if _, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("msg-content-%d", msgNum)),
}); err != nil {
t.Fatal(err)
@@ -574,7 +574,7 @@ func TestConsumerAck(t *testing.T) {
const N = 100
for i := 0; i < N; i++ {
- if err := producer.Send(ctx, &ProducerMessage{
+ if _, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
}); err != nil {
t.Fatal(err)
@@ -641,7 +641,7 @@ func TestConsumerNack(t *testing.T) {
const N = 100
for i := 0; i < N; i++ {
- if err := producer.Send(ctx, &ProducerMessage{
+ if _, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
}); err != nil {
t.Fatal(err)
@@ -701,7 +701,7 @@ func TestConsumerCompression(t *testing.T) {
const N = 100
for i := 0; i < N; i++ {
- if err := producer.Send(ctx, &ProducerMessage{
+ if _, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
}); err != nil {
t.Fatal(err)
diff --git a/pulsar/producer.go b/pulsar/producer.go
index cb024fa..6a68e7b 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -122,7 +122,7 @@ type Producer interface {
// This call will be blocking until is successfully acknowledged by the
Pulsar broker.
// Example:
// producer.Send(ctx, pulsar.ProducerMessage{ Payload: myPayload })
- Send(context.Context, *ProducerMessage) error
+ Send(context.Context, *ProducerMessage) (MessageID, error)
// SendAsync a message in asynchronous mode
// The callback will report back the message being published and
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index ef0119e..ce97c22 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -124,7 +124,7 @@ func (p *producer) NumPartitions() uint32 {
return uint32(len(p.producers))
}
-func (p *producer) Send(ctx context.Context, msg *ProducerMessage) error {
+func (p *producer) Send(ctx context.Context, msg *ProducerMessage) (MessageID,
error) {
partition := p.messageRouter(msg, p)
return p.producers[partition].Send(ctx, msg)
}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index c38766a..47dff85 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -336,25 +336,27 @@ func (p *partitionProducer) internalFlush(fr
*flushRequest) {
pi.Unlock()
}
-func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage)
error {
+func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage)
(MessageID, error) {
wg := sync.WaitGroup{}
wg.Add(1)
var err error
+ var msgId MessageID
p.internalSendAsync(ctx, msg, func(ID MessageID, message
*ProducerMessage, e error) {
err = e
+ msgId = ID
wg.Done()
}, true)
// When sending synchronously we flush immediately to avoid
// the increased latency and reduced throughput of batching
if err = p.Flush(); err != nil {
- return err
+ return nil, err
}
wg.Wait()
- return err
+ return msgId, err
}
func (p *partitionProducer) SendAsync(ctx context.Context, msg
*ProducerMessage,
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 1986004..189902c 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -96,11 +96,12 @@ func TestSimpleProducer(t *testing.T) {
defer producer.Close()
for i := 0; i < 10; i++ {
- err = producer.Send(context.Background(), &ProducerMessage{
+ ID, err := producer.Send(context.Background(), &ProducerMessage{
Payload: []byte("hello"),
})
assert.NoError(t, err)
+ assert.NotNil(t, ID)
}
}
@@ -180,11 +181,12 @@ func TestProducerCompression(t *testing.T) {
defer producer.Close()
for i := 0; i < 10; i++ {
- err = producer.Send(context.Background(),
&ProducerMessage{
+ ID, err := producer.Send(context.Background(),
&ProducerMessage{
Payload: []byte("hello"),
})
assert.NoError(t, err)
+ assert.NotNil(t, ID)
}
})
}
@@ -208,11 +210,12 @@ func TestProducerLastSequenceID(t *testing.T) {
assert.Equal(t, int64(-1), producer.LastSequenceID())
for i := 0; i < 10; i++ {
- err = producer.Send(context.Background(), &ProducerMessage{
+ ID, err := producer.Send(context.Background(), &ProducerMessage{
Payload: []byte("hello"),
})
assert.NoError(t, err)
+ assert.NotNil(t, ID)
assert.Equal(t, int64(i), producer.LastSequenceID())
}
}
@@ -239,11 +242,12 @@ func TestEventTime(t *testing.T) {
defer consumer.Close()
eventTime := timeFromUnixTimestampMillis(uint64(1565161612))
- err = producer.Send(context.Background(), &ProducerMessage{
+ ID, err := producer.Send(context.Background(), &ProducerMessage{
Payload: []byte(fmt.Sprintf("test-event-time")),
EventTime: &eventTime,
})
assert.Nil(t, err)
+ assert.NotNil(t, ID)
msg, err := consumer.Receive(context.Background())
assert.Nil(t, err)
@@ -457,10 +461,11 @@ func TestMessageRouter(t *testing.T) {
ctx := context.Background()
- err = producer.Send(ctx, &ProducerMessage{
+ ID, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte("hello"),
})
assert.Nil(t, err)
+ assert.NotNil(t, ID)
fmt.Println("PUBLISHED")