BewareMyPower commented on code in PR #1340:
URL: https://github.com/apache/pulsar-client-go/pull/1340#discussion_r1972862055


##########
pulsar/reader_test.go:
##########
@@ -1070,3 +1070,159 @@ func TestReaderNextReturnsOnClosedConsumer(t 
*testing.T) {
        assert.ErrorAs(t, err, &e)
        assert.Equal(t, ConsumerClosed, e.Result())
 }
+
+func testReaderSeekByIDWithHasNext(t *testing.T, startMessageID MessageID, 
startMessageIDInclusive bool) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+       ctx := context.Background()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: true,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       // send 100 messages
+       var lastMsgID MessageID
+       for i := 0; i < 10; i++ {
+               lastMsgID, err = producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               })
+               fmt.Println(lastMsgID.String())
+               assert.NoError(t, err)
+               assert.NotNil(t, lastMsgID)
+       }
+
+       reader, err := client.CreateReader(ReaderOptions{
+               Topic:                   topic,
+               StartMessageID:          startMessageID,
+               StartMessageIDInclusive: startMessageIDInclusive,
+       })
+       assert.Nil(t, err)
+       defer reader.Close()
+
+       // Seek to last message ID
+       err = reader.Seek(lastMsgID)
+       assert.NoError(t, err)
+
+       if startMessageIDInclusive {
+               assert.True(t, reader.HasNext())
+               ctx, cancel := context.WithTimeout(context.Background(), 
1*time.Second)
+               msg, err := reader.Next(ctx)
+               assert.NoError(t, err)
+               assert.NotNil(t, msg)
+               assert.True(t, messageIDCompare(lastMsgID, msg.ID()) == 0)
+               cancel()
+       } else {
+               assert.False(t, reader.HasNext())
+               ctx, cancel := context.WithTimeout(context.Background(), 
1*time.Second)
+               msg, err := reader.Next(ctx)
+               assert.Error(t, err)
+               assert.Nil(t, msg)
+               cancel()
+       }
+
+}
+
+func TestReaderWithSeekByID(t *testing.T) {
+       testReaderSeekByIDWithHasNext(t, EarliestMessageID(), false)
+       testReaderSeekByIDWithHasNext(t, EarliestMessageID(), true)
+       testReaderSeekByIDWithHasNext(t, LatestMessageID(), false)
+       testReaderSeekByIDWithHasNext(t, LatestMessageID(), true)
+}
+
+func testReaderSeekByTimeWithHasNext(t *testing.T, startMessageID MessageID) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+       ctx := context.Background()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: true,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       // 1. send 10 messages
+       var lastMsgID MessageID
+       for i := 0; i < 10; i++ {
+               lastMsgID, err = producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               })
+               fmt.Println(lastMsgID.String())
+               assert.NoError(t, err)
+
+               assert.NotNil(t, lastMsgID)
+       }
+
+       // 2. create reader
+       reader, err := client.CreateReader(ReaderOptions{
+               Topic:                   topic,
+               StartMessageID:          startMessageID,
+               StartMessageIDInclusive: false,
+       })
+       assert.Nil(t, err)
+       defer reader.Close()
+
+       // 3. Seek time to now
+       reader.SeekByTime(time.Now())
+
+       // 4. Should not receive msg
+       {
+               assert.False(t, reader.HasNext())
+               timeoutCtx, cancel := context.WithTimeout(context.Background(), 
1*time.Second)
+               msg, err := reader.Next(timeoutCtx)
+               assert.Error(t, err)
+               assert.Nil(t, msg)
+               cancel()
+       }
+
+       // 5. send more 10 messages
+       for i := 0; i < 10; i++ {
+               lastMsgID, err = producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello2-%d", i)),
+               })
+               fmt.Println(lastMsgID.String())
+               assert.NoError(t, err)
+               assert.NotNil(t, lastMsgID)
+       }
+
+       // 6. Assert these messages are received
+       for i := 0; i < 10; i++ {
+               assert.True(t, reader.HasNext())
+               msg, err := reader.Next(context.Background())
+               assert.NoError(t, err)
+               assert.Equal(t, fmt.Sprintf("hello2-%d", i), 
string(msg.Payload()))
+       }
+
+       // assert not more msg
+       {
+               assert.False(t, reader.HasNext())
+               timeoutCtx, cancel := context.WithTimeout(context.Background(), 
1*time.Second)
+               msg, err := reader.Next(timeoutCtx)
+               assert.Error(t, err)
+               assert.Nil(t, msg)
+               cancel()
+       }
+}
+func TestReaderWithSeekByTime(t *testing.T) {
+       testReaderSeekByTimeWithHasNext(t, EarliestMessageID())
+       testReaderSeekByTimeWithHasNext(t, EarliestMessageID())
+       testReaderSeekByTimeWithHasNext(t, LatestMessageID())
+       testReaderSeekByTimeWithHasNext(t, LatestMessageID())

Review Comment:
   I don't understand why each call is run twice. BTW, for parameterized 
tests, you can call `T.Run` so that each sub-test has its own test 
name and can fail independently. See the example here: 
https://github.com/apache/pulsar-client-go/blob/020b55c1dce4475fe8781f4fb6f858de903844b8/pulsar/consumer_test.go#L4760



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to