This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a08460  Fix partition key not set in batch builder (#175)
6a08460 is described below

commit 6a0846083957a5982d443aa7945c57802c06a3d5
Author: 冉小龙 <[email protected]>
AuthorDate: Fri Jan 17 04:48:50 2020 +0800

    Fix partition key not set in batch builder (#175)
    
    Signed-off-by: xiaolong.ran <[email protected]>
---
 pulsar/consumer_test.go          | 9 ++++++---
 pulsar/internal/batch_builder.go | 3 ++-
 pulsar/producer_partition.go     | 2 +-
 3 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 6e389bd..52ca7c2 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -280,7 +280,7 @@ func TestConsumerKeyShared(t *testing.T) {
        defer producer.Close()
 
        ctx := context.Background()
-       for i := 0; i < 10; i++ {
+       for i := 0; i < 100; i++ {
                _, err := producer.Send(ctx, &ProducerMessage{
                        Key:     fmt.Sprintf("key-shared-%d", i%3),
                        Payload: []byte(fmt.Sprintf("value-%d", i)),
@@ -290,7 +290,7 @@ func TestConsumerKeyShared(t *testing.T) {
 
        receivedConsumer1 := 0
        receivedConsumer2 := 0
-       for (receivedConsumer1 + receivedConsumer2) < 10 {
+       for (receivedConsumer1 + receivedConsumer2) < 100 {
                select {
                case cm, ok := <-consumer1.Chan():
                        if !ok {
@@ -307,9 +307,12 @@ func TestConsumerKeyShared(t *testing.T) {
                }
        }
 
+       assert.NotEqual(t, 0, receivedConsumer1)
+       assert.NotEqual(t, 0, receivedConsumer2)
+
        fmt.Printf("TestConsumerKeyShared received messages consumer1: %d consumser2: %d\n",
                receivedConsumer1, receivedConsumer2)
-       assert.Equal(t, 10, receivedConsumer1+receivedConsumer2)
+       assert.Equal(t, 100, receivedConsumer1+receivedConsumer2)
 }
 
 func TestPartitionTopicsConsumerPubSub(t *testing.T) {
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 9bbf8bd..d9990b1 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -104,7 +104,7 @@ func (bb *BatchBuilder) hasSpace(payload []byte) bool {
 }
 
 // Add will add single message to batch.
-func (bb *BatchBuilder) Add(metadata proto.Message, sequenceID uint64, payload []byte,
+func (bb *BatchBuilder) Add(metadata *pb.SingleMessageMetadata, sequenceID uint64, payload []byte,
        callback interface{}, replicateTo []string, deliverAt time.Time) bool {
        if replicateTo != nil && bb.numMessages != 0 {
                // If the current batch is not empty and we're trying to set the replication clusters,
@@ -125,6 +125,7 @@ func (bb *BatchBuilder) Add(metadata proto.Message, sequenceID uint64, payload [
                bb.msgMetadata.SequenceId = proto.Uint64(sequenceID)
                bb.msgMetadata.ProducerName = &bb.producerName
                bb.msgMetadata.ReplicateTo = replicateTo
+               bb.msgMetadata.PartitionKey = metadata.PartitionKey
 
                if deliverAt.UnixNano() > 0 {
                        bb.msgMetadata.DeliverAtTime = proto.Int64(int64(TimestampMillis(deliverAt)))
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index db6c5d9..a24ff66 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -254,7 +254,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
        }
 
        if msg.Key != "" {
-               smm.PartitionKey = &msg.Key
+               smm.PartitionKey = proto.String(msg.Key)
        }
 
        if msg.Properties != nil {

Reply via email to