cckellogg commented on a change in pull request #168:
[Issue-148][pulsar-client-go] add seek by messageID
URL: https://github.com/apache/pulsar-client-go/pull/168#discussion_r364349981
##########
File path: pulsar/consumer_test.go
##########
@@ -202,192 +157,160 @@ func TestConsumerWithInvalidConf(t *testing.T) {
}
func TestConsumerSubscriptionEarliestPosition(t *testing.T) {
- client, err := NewClient(ClientOptions{
- URL: lookupURL,
- })
-
- assert.Nil(t, err)
+ client := createClient(t)
defer client.Close()
- topicName := fmt.Sprintf("testSeek-%d", time.Now().Unix())
- subName := "test-subscription-initial-earliest-position"
+ topicName := newTopicName()
- // create producer
- producer, err := client.CreateProducer(ProducerOptions{
+ producer := createProducer(t, client, ProducerOptions{
Topic: topicName,
})
- assert.Nil(t, err)
defer producer.Close()
- // send message
- ctx := context.Background()
- _, err = producer.Send(ctx, &ProducerMessage{
- Payload: []byte("msg-1-content-1"),
- })
- assert.Nil(t, err)
+ times := 2
+ msgPayload := "hello"
- _, err = producer.Send(ctx, &ProducerMessage{
- Payload: []byte("msg-1-content-2"),
+ ctx := context.Background()
+ // send before create consumer to simulate SubscriptionPositionEarliest
+ send(t, producer, ctx, times, &ProducerMessage{
+ Payload: []byte(msgPayload),
})
- assert.Nil(t, err)
- // create consumer
- consumer, err := client.Subscribe(ConsumerOptions{
+ consumer := createConsumer(t, client, ConsumerOptions{
Topic: topicName,
- SubscriptionName: subName,
+ SubscriptionName:
"test-subscription-initial-earliest-position",
SubscriptionInitialPosition: SubscriptionPositionEarliest,
})
- assert.Nil(t, err)
defer consumer.Close()
- msg, err := consumer.Receive(ctx)
- assert.Nil(t, err)
-
- assert.Equal(t, "msg-1-content-1", string(msg.Payload()))
+ receive(t, consumer, ctx, times, func(t *testing.T, msg Message, i int)
{
+ expectMsg := fmt.Sprintf("%s-%d", msgPayload, i)
+ assert.Equal(t, []byte(expectMsg), msg.Payload())
+ })
}
func TestConsumerKeyShared(t *testing.T) {
- client, err := NewClient(ClientOptions{
- URL: lookupURL,
- })
- assert.Nil(t, err)
+ client := createClient(t)
defer client.Close()
- topic := "persistent://public/default/test-topic-6"
+ topic := newTopicName()
- consumer1, err := client.Subscribe(ConsumerOptions{
+ consumer1 := createConsumer(t, client, ConsumerOptions{
Topic: topic,
SubscriptionName: "sub-1",
Type: KeyShared,
})
- assert.Nil(t, err)
defer consumer1.Close()
- consumer2, err := client.Subscribe(ConsumerOptions{
+ consumer2 := createConsumer(t, client, ConsumerOptions{
Topic: topic,
SubscriptionName: "sub-1",
Type: KeyShared,
})
- assert.Nil(t, err)
defer consumer2.Close()
- // create producer
- producer, err := client.CreateProducer(ProducerOptions{
+ producer := createProducer(t, client, ProducerOptions{
Topic: topic,
DisableBatching: true,
})
- assert.Nil(t, err)
defer producer.Close()
+ msgPayload := "hello"
+ msgKey := "pulsar"
+
ctx := context.Background()
- for i := 0; i < 10; i++ {
- _, err := producer.Send(ctx, &ProducerMessage{
- Key: fmt.Sprintf("key-shared-%d", i%3),
- Payload: []byte(fmt.Sprintf("value-%d", i)),
- })
- assert.Nil(t, err)
- }
+ send(t, producer, ctx, 3, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("%s-%s", msgPayload,
"A")),
+ Key: fmt.Sprintf("%s-%s", msgKey, "A"),
+ })
+ send(t, producer, ctx, 3, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("%s-%s", msgPayload,
"B")),
+ Key: fmt.Sprintf("%s-%s", msgKey, "B"),
+ })
- receivedConsumer1 := 0
- receivedConsumer2 := 0
- for (receivedConsumer1 + receivedConsumer2) < 10 {
- select {
- case cm, ok := <-consumer1.Chan():
- if !ok {
- break
- }
- receivedConsumer1++
- consumer1.Ack(cm.Message)
- case cm, ok := <-consumer2.Chan():
- if !ok {
- break
- }
- receivedConsumer2++
- consumer2.Ack(cm.Message)
+ previousMsg, previousKey := "", ""
+ receive(t, consumer1, ctx, 3, func(t *testing.T, msg Message, i int) {
Review comment:
This seems way more complex than the previous test. To me, the previous test
is easier to reason about.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services