wolfstudy commented on a change in pull request #400:
URL: https://github.com/apache/pulsar-client-go/pull/400#discussion_r532302829



##########
File path: pulsar/consumer_test.go
##########
@@ -1712,3 +1712,162 @@ func TestConsumerName(t *testing.T) {
 
        assert.Equal(consumerName, consumer.Name())
 }
+
+func TestKeyBasedBatchProducerConsumerKeyShared(t *testing.T) {
+       const MsgBatchCount = 100
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := 
"persistent://public/default/test-key-based-batch-with-key-shared"
+
+       consumer1, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: "sub-1",
+               Type:             KeyShared,
+       })
+       assert.Nil(t, err)
+       defer consumer1.Close()
+
+       consumer2, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: "sub-1",
+               Type:             KeyShared,
+       })
+       assert.Nil(t, err)
+       defer consumer2.Close()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:               topic,
+               DisableBatching:     false,
+               BatcherBuilderType:  KeyBasedBatchBuilder,
+               BatchingMaxMessages: 10,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       ctx := context.Background()
+       keys := []string{"key1", "key2", "key3"}
+       for i := 0; i < MsgBatchCount; i++ {
+               for _, k := range keys {
+                       producer.SendAsync(ctx, &ProducerMessage{
+                               Key:     k,
+                               Payload: []byte(fmt.Sprintf("value-%d", i)),
+                       }, func(id MessageID, producerMessage *ProducerMessage, 
err error) {
+                               assert.Nil(t, err)
+                       },
+                       )
+               }
+       }
+
+       receivedConsumer1 := 0
+       receivedConsumer2 := 0
+       consumer1Keys := make(map[string]int)
+       consumer2Keys := make(map[string]int)
+       for (receivedConsumer1 + receivedConsumer2) < 300 {
+               select {
+               case cm, ok := <-consumer1.Chan():
+                       if !ok {
+                               break
+                       }
+                       receivedConsumer1++
+                       if cnt, has := consumer1Keys[cm.Key()]; !has {
+                               consumer1Keys[cm.Key()] = 1
+                       } else {
+                               consumer1Keys[cm.Key()] = cnt + 1
+                       }
+                       consumer1.Ack(cm.Message)
+               case cm, ok := <-consumer2.Chan():
+                       if !ok {
+                               break
+                       }
+                       receivedConsumer2++
+                       if cnt, has := consumer2Keys[cm.Key()]; !has {
+                               consumer2Keys[cm.Key()] = 1
+                       } else {
+                               consumer2Keys[cm.Key()] = cnt + 1
+                       }
+                       consumer2.Ack(cm.Message)
+               }
+       }
+
+       assert.NotEqual(t, 0, receivedConsumer1)
+       assert.NotEqual(t, 0, receivedConsumer2)
+       assert.Equal(t, len(consumer1Keys)*MsgBatchCount, receivedConsumer1)
+       assert.Equal(t, len(consumer2Keys)*MsgBatchCount, receivedConsumer2)
+
+       fmt.Printf("TestKeyBasedBatchProducerConsumerKeyShared received 
messages consumer1: %d consumser2: %d\n",
+               receivedConsumer1, receivedConsumer2)
+       assert.Equal(t, 300, receivedConsumer1+receivedConsumer2)
+
+       fmt.Printf("TestKeyBasedBatchProducerConsumerKeyShared received 
messages keys consumer1: %v consumser2: %v\n",
+               consumer1Keys, consumer2Keys)
+}
+
+func TestOrderingOfKeyBasedBatchProducerConsumerKeyShared(t *testing.T) {
+       const MsgBatchCount = 10
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := 
"persistent://public/default/test-ordering-of-key-based-batch-with-key-shared"
+
+       consumer1, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: "sub-1",
+               Type:             KeyShared,
+       })
+       assert.Nil(t, err)
+       defer consumer1.Close()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:                   topic,
+               DisableBatching:         false,
+               BatcherBuilderType:      KeyBasedBatchBuilder,
+               BatchingMaxMessages:     30,
+               BatchingMaxPublishDelay: time.Second * 5,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       ctx := context.Background()
+       keys := []string{"key1", "key2", "key3"}
+       for i := 0; i < MsgBatchCount; i++ {
+               for _, k := range keys {
+                       producer.SendAsync(ctx, &ProducerMessage{
+                               Key:     k,
+                               Payload: []byte(fmt.Sprintf("value-%d", i)),
+                       }, func(id MessageID, producerMessage *ProducerMessage, 
err error) {
+                               assert.Nil(t, err)
+                       },
+                       )
+               }
+       }
+
+       var receivedKey string
+       var receivedMessageIndex int
+       for i := 0; i < len(keys)*MsgBatchCount; i++ {
+               cm, ok := <-consumer1.Chan()
+               if !ok {
+                       break
+               }
+               if receivedKey != cm.Key() {
+                       receivedKey = cm.Key()
+                       receivedMessageIndex = 0
+               }
+               assert.Equal(
+                       t, fmt.Sprintf("value-%d", receivedMessageIndex%10),
+                       string(cm.Payload()),
+               )
+               consumer1.Ack(cm.Message)
+               receivedMessageIndex++
+       }
+
+       // TODO: add OrderingKey support

Review comment:
       Thanks @freeznet work for this, can you help to create an issue to track 
this issue?

##########
File path: pulsar/consumer_test.go
##########
@@ -1712,3 +1712,162 @@ func TestConsumerName(t *testing.T) {
 
        assert.Equal(consumerName, consumer.Name())
 }
+
+func TestKeyBasedBatchProducerConsumerKeyShared(t *testing.T) {
+       const MsgBatchCount = 100
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := 
"persistent://public/default/test-key-based-batch-with-key-shared"
+
+       consumer1, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: "sub-1",
+               Type:             KeyShared,
+       })
+       assert.Nil(t, err)
+       defer consumer1.Close()
+
+       consumer2, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: "sub-1",
+               Type:             KeyShared,
+       })
+       assert.Nil(t, err)
+       defer consumer2.Close()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:               topic,
+               DisableBatching:     false,
+               BatcherBuilderType:  KeyBasedBatchBuilder,
+               BatchingMaxMessages: 10,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       ctx := context.Background()
+       keys := []string{"key1", "key2", "key3"}
+       for i := 0; i < MsgBatchCount; i++ {
+               for _, k := range keys {
+                       producer.SendAsync(ctx, &ProducerMessage{
+                               Key:     k,
+                               Payload: []byte(fmt.Sprintf("value-%d", i)),
+                       }, func(id MessageID, producerMessage *ProducerMessage, 
err error) {
+                               assert.Nil(t, err)
+                       },
+                       )
+               }
+       }
+
+       receivedConsumer1 := 0
+       receivedConsumer2 := 0
+       consumer1Keys := make(map[string]int)
+       consumer2Keys := make(map[string]int)
+       for (receivedConsumer1 + receivedConsumer2) < 300 {
+               select {
+               case cm, ok := <-consumer1.Chan():
+                       if !ok {
+                               break
+                       }
+                       receivedConsumer1++
+                       if cnt, has := consumer1Keys[cm.Key()]; !has {
+                               consumer1Keys[cm.Key()] = 1
+                       } else {
+                               consumer1Keys[cm.Key()] = cnt + 1
+                       }
+                       consumer1.Ack(cm.Message)
+               case cm, ok := <-consumer2.Chan():
+                       if !ok {
+                               break
+                       }
+                       receivedConsumer2++
+                       if cnt, has := consumer2Keys[cm.Key()]; !has {
+                               consumer2Keys[cm.Key()] = 1
+                       } else {
+                               consumer2Keys[cm.Key()] = cnt + 1
+                       }
+                       consumer2.Ack(cm.Message)
+               }
+       }
+
+       assert.NotEqual(t, 0, receivedConsumer1)
+       assert.NotEqual(t, 0, receivedConsumer2)
+       assert.Equal(t, len(consumer1Keys)*MsgBatchCount, receivedConsumer1)
+       assert.Equal(t, len(consumer2Keys)*MsgBatchCount, receivedConsumer2)
+
+       fmt.Printf("TestKeyBasedBatchProducerConsumerKeyShared received 
messages consumer1: %d consumser2: %d\n",
+               receivedConsumer1, receivedConsumer2)
+       assert.Equal(t, 300, receivedConsumer1+receivedConsumer2)
+
+       fmt.Printf("TestKeyBasedBatchProducerConsumerKeyShared received 
messages keys consumer1: %v consumser2: %v\n",
+               consumer1Keys, consumer2Keys)
+}

Review comment:
       In this test case, how do we determine that the order of messages 
received by the consumer is correct?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to