wolfstudy commented on a change in pull request #6023:
URL: https://github.com/apache/pulsar/pull/6023#discussion_r428176058
##########
File path: pulsar-client-go/pulsar/producer_test.go
##########
@@ -416,3 +416,71 @@ func TestProducer_SendAndGetMsgID(t *testing.T) {
assert.NotNil(t, IsNil(msgID))
}
}
+
+func TestProducer_DelayMessage(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: "pulsar://localhost:6650",
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topicName := "test-send-with-message-id"
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ failoverConsumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "sub-delay-message-failover",
+ })
+ assert.Nil(t, err)
+ defer failoverConsumer.Close()
+
+ sharedConsumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "sub-delay-message-shared",
+ Type: Shared,
+ })
+ assert.Nil(t, err)
+ defer sharedConsumer.Close()
+
+ ctx := context.Background()
+
+ delay := time.Second*5
+ begin := time.Now()
+ fmt.Printf("begin %v\n", begin)
+ for i := 0; i < 10; i++ {
+ err := producer.Send(ctx, ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ DeliverAfter: delay,
+ })
+ fmt.Printf("send message %d\n", i)
+ assert.Nil(t, err)
+ }
+
+ // Failover consumer will receive the messages immediately while
+ // the shared consumer will get them after the delay
+ for i := 0; i < 10; i++ {
+ msg, err := failoverConsumer.Receive(ctx)
+ assert.Nil(t, err)
+ fmt.Printf("message: %s\n",msg.Payload())
Review comment:
Please replace` fmt.Printf` with `t.Logf()`.
##########
File path: pulsar-client-go/pulsar/producer_test.go
##########
@@ -416,3 +416,71 @@ func TestProducer_SendAndGetMsgID(t *testing.T) {
assert.NotNil(t, IsNil(msgID))
}
}
+
+func TestProducer_DelayMessage(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: "pulsar://localhost:6650",
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topicName := "test-send-with-message-id"
Review comment:
This topic name is the same as the topic name in another test case, in
order to avoid test conflicts, please replace it
##########
File path: pulsar-client-go/pulsar/producer_test.go
##########
@@ -416,3 +416,71 @@ func TestProducer_SendAndGetMsgID(t *testing.T) {
assert.NotNil(t, IsNil(msgID))
}
}
+
+func TestProducer_DelayMessage(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: "pulsar://localhost:6650",
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topicName := "test-send-with-message-id"
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ failoverConsumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "sub-delay-message-failover",
+ })
+ assert.Nil(t, err)
+ defer failoverConsumer.Close()
+
+ sharedConsumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "sub-delay-message-shared",
+ Type: Shared,
+ })
+ assert.Nil(t, err)
+ defer sharedConsumer.Close()
+
+ ctx := context.Background()
+
+ delay := time.Second*5
+ begin := time.Now()
+ fmt.Printf("begin %v\n", begin)
+ for i := 0; i < 10; i++ {
+ err := producer.Send(ctx, ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ DeliverAfter: delay,
+ })
+ fmt.Printf("send message %d\n", i)
+ assert.Nil(t, err)
+ }
+
+ // Failover consumer will receive the messages immediately while
+ // the shared consumer will get them after the delay
+ for i := 0; i < 10; i++ {
+ msg, err := failoverConsumer.Receive(ctx)
+ assert.Nil(t, err)
+ fmt.Printf("message: %s\n",msg.Payload())
+ err = failoverConsumer.Ack(msg)
+ assert.Nil(t, err)
+
+ fmt.Printf("after %v\n", time.Now())
Review comment:
Please replace` fmt.Printf` with `t.Logf()`.
##########
File path: pulsar-client-go/pulsar/producer_test.go
##########
@@ -416,3 +416,71 @@ func TestProducer_SendAndGetMsgID(t *testing.T) {
assert.NotNil(t, IsNil(msgID))
}
}
+
+func TestProducer_DelayMessage(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: "pulsar://localhost:6650",
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topicName := "test-send-with-message-id"
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ failoverConsumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "sub-delay-message-failover",
+ })
+ assert.Nil(t, err)
+ defer failoverConsumer.Close()
+
+ sharedConsumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "sub-delay-message-shared",
+ Type: Shared,
+ })
+ assert.Nil(t, err)
+ defer sharedConsumer.Close()
+
+ ctx := context.Background()
+
+ delay := time.Second*5
+ begin := time.Now()
+ fmt.Printf("begin %v\n", begin)
+ for i := 0; i < 10; i++ {
+ err := producer.Send(ctx, ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ DeliverAfter: delay,
+ })
+ fmt.Printf("send message %d\n", i)
+ assert.Nil(t, err)
+ }
+
+ // Failover consumer will receive the messages immediately while
+ // the shared consumer will get them after the delay
+ for i := 0; i < 10; i++ {
+ msg, err := failoverConsumer.Receive(ctx)
+ assert.Nil(t, err)
+ fmt.Printf("message: %s\n",msg.Payload())
+ err = failoverConsumer.Ack(msg)
+ assert.Nil(t, err)
+
+ fmt.Printf("after %v\n", time.Now())
+ assert.True(t, time.Since(begin) < delay)
+ }
+
+ for i := 0; i < 10; i++ {
+ msg, err := sharedConsumer.Receive(ctx)
+ assert.Nil(t, err)
+ fmt.Printf("message: %s\n",msg.Payload())
Review comment:
Please replace` fmt.Printf` with `t.Logf()`.
##########
File path: pulsar-client-go/pulsar/producer_test.go
##########
@@ -416,3 +416,71 @@ func TestProducer_SendAndGetMsgID(t *testing.T) {
assert.NotNil(t, IsNil(msgID))
}
}
+
+func TestProducer_DelayMessage(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: "pulsar://localhost:6650",
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topicName := "test-send-with-message-id"
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ failoverConsumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "sub-delay-message-failover",
+ })
+ assert.Nil(t, err)
+ defer failoverConsumer.Close()
+
+ sharedConsumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "sub-delay-message-shared",
+ Type: Shared,
+ })
+ assert.Nil(t, err)
+ defer sharedConsumer.Close()
+
+ ctx := context.Background()
+
+ delay := time.Second*5
+ begin := time.Now()
+ fmt.Printf("begin %v\n", begin)
+ for i := 0; i < 10; i++ {
+ err := producer.Send(ctx, ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ DeliverAfter: delay,
+ })
+ fmt.Printf("send message %d\n", i)
Review comment:
Can we replace `fmt.Printf` with `t.Logf()`?
##########
File path: pulsar-client-go/pulsar/producer_test.go
##########
@@ -416,3 +416,71 @@ func TestProducer_SendAndGetMsgID(t *testing.T) {
assert.NotNil(t, IsNil(msgID))
}
}
+
+func TestProducer_DelayMessage(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: "pulsar://localhost:6650",
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topicName := "test-send-with-message-id"
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ failoverConsumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "sub-delay-message-failover",
+ })
+ assert.Nil(t, err)
+ defer failoverConsumer.Close()
+
+ sharedConsumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "sub-delay-message-shared",
+ Type: Shared,
+ })
+ assert.Nil(t, err)
+ defer sharedConsumer.Close()
+
+ ctx := context.Background()
+
+ delay := time.Second*5
+ begin := time.Now()
+ fmt.Printf("begin %v\n", begin)
+ for i := 0; i < 10; i++ {
+ err := producer.Send(ctx, ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ DeliverAfter: delay,
+ })
+ fmt.Printf("send message %d\n", i)
+ assert.Nil(t, err)
+ }
+
+ // Failover consumer will receive the messages immediately while
+ // the shared consumer will get them after the delay
+ for i := 0; i < 10; i++ {
+ msg, err := failoverConsumer.Receive(ctx)
+ assert.Nil(t, err)
+ fmt.Printf("message: %s\n",msg.Payload())
+ err = failoverConsumer.Ack(msg)
+ assert.Nil(t, err)
+
+ fmt.Printf("after %v\n", time.Now())
+ assert.True(t, time.Since(begin) < delay)
+ }
+
+ for i := 0; i < 10; i++ {
+ msg, err := sharedConsumer.Receive(ctx)
+ assert.Nil(t, err)
+ fmt.Printf("message: %s\n",msg.Payload())
+ err = sharedConsumer.Ack(msg)
+ assert.Nil(t, err)
+
+ fmt.Printf("after %v\n", time.Now())
Review comment:
Please replace` fmt.Printf` with `t.Logf()`.
##########
File path: pulsar-client-go/pulsar/producer_test.go
##########
@@ -416,3 +416,71 @@ func TestProducer_SendAndGetMsgID(t *testing.T) {
assert.NotNil(t, IsNil(msgID))
}
}
+
+func TestProducer_DelayMessage(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: "pulsar://localhost:6650",
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topicName := "test-send-with-message-id"
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ failoverConsumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "sub-delay-message-failover",
+ })
+ assert.Nil(t, err)
+ defer failoverConsumer.Close()
+
+ sharedConsumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "sub-delay-message-shared",
+ Type: Shared,
+ })
+ assert.Nil(t, err)
+ defer sharedConsumer.Close()
+
+ ctx := context.Background()
+
+ delay := time.Second*5
+ begin := time.Now()
+ fmt.Printf("begin %v\n", begin)
Review comment:
Please replace` fmt.Printf` with `t.Logf()`.
##########
File path: pulsar-client-go/pulsar/producer_test.go
##########
@@ -416,3 +416,71 @@ func TestProducer_SendAndGetMsgID(t *testing.T) {
assert.NotNil(t, IsNil(msgID))
}
}
+
+func TestProducer_DelayMessage(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: "pulsar://localhost:6650",
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topicName := "test-send-with-message-id"
Review comment:
```
func newTopicName() string {
return fmt.Sprintf("my-topic-%v", time.Now().Nanosecond())
}
```
##########
File path: pulsar-client-go/pulsar/producer_test.go
##########
@@ -416,3 +416,71 @@ func TestProducer_SendAndGetMsgID(t *testing.T) {
assert.NotNil(t, IsNil(msgID))
}
}
+
+func TestProducer_DelayMessage(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: "pulsar://localhost:6650",
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topicName := "test-send-with-message-id"
Review comment:
Can we rename the topic name?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]