This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 6a08460 Fix partition key not set in batch builder (#175)
6a08460 is described below
commit 6a0846083957a5982d443aa7945c57802c06a3d5
Author: 冉小龙 <[email protected]>
AuthorDate: Fri Jan 17 04:48:50 2020 +0800
Fix partition key not set in batch builder (#175)
Signed-off-by: xiaolong.ran <[email protected]>
---
pulsar/consumer_test.go | 9 ++++++---
pulsar/internal/batch_builder.go | 3 ++-
pulsar/producer_partition.go | 2 +-
3 files changed, 9 insertions(+), 5 deletions(-)
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 6e389bd..52ca7c2 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -280,7 +280,7 @@ func TestConsumerKeyShared(t *testing.T) {
defer producer.Close()
ctx := context.Background()
- for i := 0; i < 10; i++ {
+ for i := 0; i < 100; i++ {
_, err := producer.Send(ctx, &ProducerMessage{
Key: fmt.Sprintf("key-shared-%d", i%3),
Payload: []byte(fmt.Sprintf("value-%d", i)),
@@ -290,7 +290,7 @@ func TestConsumerKeyShared(t *testing.T) {
receivedConsumer1 := 0
receivedConsumer2 := 0
- for (receivedConsumer1 + receivedConsumer2) < 10 {
+ for (receivedConsumer1 + receivedConsumer2) < 100 {
select {
case cm, ok := <-consumer1.Chan():
if !ok {
@@ -307,9 +307,12 @@ func TestConsumerKeyShared(t *testing.T) {
}
}
+ assert.NotEqual(t, 0, receivedConsumer1)
+ assert.NotEqual(t, 0, receivedConsumer2)
+
fmt.Printf("TestConsumerKeyShared received messages consumer1: %d consumser2: %d\n",
receivedConsumer1, receivedConsumer2)
- assert.Equal(t, 10, receivedConsumer1+receivedConsumer2)
+ assert.Equal(t, 100, receivedConsumer1+receivedConsumer2)
}
func TestPartitionTopicsConsumerPubSub(t *testing.T) {
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 9bbf8bd..d9990b1 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -104,7 +104,7 @@ func (bb *BatchBuilder) hasSpace(payload []byte) bool {
}
// Add will add single message to batch.
-func (bb *BatchBuilder) Add(metadata proto.Message, sequenceID uint64, payload []byte,
+func (bb *BatchBuilder) Add(metadata *pb.SingleMessageMetadata, sequenceID uint64, payload []byte,
callback interface{}, replicateTo []string, deliverAt time.Time) bool {
if replicateTo != nil && bb.numMessages != 0 {
// If the current batch is not empty and we're trying to set the replication clusters,
@@ -125,6 +125,7 @@ func (bb *BatchBuilder) Add(metadata proto.Message, sequenceID uint64, payload [
bb.msgMetadata.SequenceId = proto.Uint64(sequenceID)
bb.msgMetadata.ProducerName = &bb.producerName
bb.msgMetadata.ReplicateTo = replicateTo
+ bb.msgMetadata.PartitionKey = metadata.PartitionKey
if deliverAt.UnixNano() > 0 {
bb.msgMetadata.DeliverAtTime = proto.Int64(int64(TimestampMillis(deliverAt)))
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index db6c5d9..a24ff66 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -254,7 +254,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
}
if msg.Key != "" {
- smm.PartitionKey = &msg.Key
+ smm.PartitionKey = proto.String(msg.Key)
}
if msg.Properties != nil {