freeznet commented on a change in pull request #400:
URL: https://github.com/apache/pulsar-client-go/pull/400#discussion_r532323506
##########
File path: pulsar/consumer_test.go
##########
@@ -1712,3 +1712,162 @@ func TestConsumerName(t *testing.T) {
assert.Equal(consumerName, consumer.Name())
}
+
+func TestKeyBasedBatchProducerConsumerKeyShared(t *testing.T) {
+ const MsgBatchCount = 100
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic :=
"persistent://public/default/test-key-based-batch-with-key-shared"
+
+ consumer1, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "sub-1",
+ Type: KeyShared,
+ })
+ assert.Nil(t, err)
+ defer consumer1.Close()
+
+ consumer2, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "sub-1",
+ Type: KeyShared,
+ })
+ assert.Nil(t, err)
+ defer consumer2.Close()
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: false,
+ BatcherBuilderType: KeyBasedBatchBuilder,
+ BatchingMaxMessages: 10,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ ctx := context.Background()
+ keys := []string{"key1", "key2", "key3"}
+ for i := 0; i < MsgBatchCount; i++ {
+ for _, k := range keys {
+ producer.SendAsync(ctx, &ProducerMessage{
+ Key: k,
+ Payload: []byte(fmt.Sprintf("value-%d", i)),
+ }, func(id MessageID, producerMessage *ProducerMessage,
err error) {
+ assert.Nil(t, err)
+ },
+ )
+ }
+ }
+
+ receivedConsumer1 := 0
+ receivedConsumer2 := 0
+ consumer1Keys := make(map[string]int)
+ consumer2Keys := make(map[string]int)
+ for (receivedConsumer1 + receivedConsumer2) < 300 {
+ select {
+ case cm, ok := <-consumer1.Chan():
+ if !ok {
+ break
+ }
+ receivedConsumer1++
+ if cnt, has := consumer1Keys[cm.Key()]; !has {
+ consumer1Keys[cm.Key()] = 1
+ } else {
+ consumer1Keys[cm.Key()] = cnt + 1
+ }
+ consumer1.Ack(cm.Message)
+ case cm, ok := <-consumer2.Chan():
+ if !ok {
+ break
+ }
+ receivedConsumer2++
+ if cnt, has := consumer2Keys[cm.Key()]; !has {
+ consumer2Keys[cm.Key()] = 1
+ } else {
+ consumer2Keys[cm.Key()] = cnt + 1
+ }
+ consumer2.Ack(cm.Message)
+ }
+ }
+
+ assert.NotEqual(t, 0, receivedConsumer1)
+ assert.NotEqual(t, 0, receivedConsumer2)
+ assert.Equal(t, len(consumer1Keys)*MsgBatchCount, receivedConsumer1)
+ assert.Equal(t, len(consumer2Keys)*MsgBatchCount, receivedConsumer2)
+
+ fmt.Printf("TestKeyBasedBatchProducerConsumerKeyShared received
messages consumer1: %d consumser2: %d\n",
+ receivedConsumer1, receivedConsumer2)
+ assert.Equal(t, 300, receivedConsumer1+receivedConsumer2)
+
+ fmt.Printf("TestKeyBasedBatchProducerConsumerKeyShared received
messages keys consumer1: %v consumser2: %v\n",
+ consumer1Keys, consumer2Keys)
+}
Review comment:
@wolfstudy thanks for comment. The intention for this test is to check
if `KeyBasedBatchBuilder` works ok (batch message with key works, which
different from normal batch builder), the order of messages is test with
`TestOrderingOfKeyBasedBatchProducerConsumerKeyShared`. But I can make some
changes to add order test in `TestKeyBasedBatchProducerConsumerKeyShared` as
well.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]