BewareMyPower commented on code in PR #1502:
URL: https://github.com/apache/pulsar-client-go/pull/1502#discussion_r3332108851


##########
pulsar/blue_green_migration_test.go:
##########
@@ -159,14 +161,29 @@ func testTopicMigrate(
                        }
 
                        pm := ProducerMessage{Payload: 
[]byte(fmt.Sprintf("hello-%d", i))}
+                       retryStarted := time.Now()
 
                        for true {
                                ctx, cancel := 
context.WithTimeout(context.Background(), 1*time.Second)
-                               defer cancel()
                                _, err := producer.Send(ctx, &pm)
+                               cancel()
                                if err == nil {
                                        break
                                }
+                               if errors.Is(err, ErrTopicTerminated) || 
errors.Is(err, ErrProducerClosed) {
+                                       select {
+                                       case errCh <- fmt.Errorf("producer 
became terminal during migration at message %d: %w", i, err):
+                                       default:
+                                       }

Review Comment:
   Why did you use a select block rather than a simple `errCh <- 
fmt.Errorf(...)`?



##########
pulsar/blue_green_migration_test.go:
##########
@@ -159,14 +161,29 @@ func testTopicMigrate(
                        }
 
                        pm := ProducerMessage{Payload: 
[]byte(fmt.Sprintf("hello-%d", i))}
+                       retryStarted := time.Now()
 
                        for true {
                                ctx, cancel := 
context.WithTimeout(context.Background(), 1*time.Second)
-                               defer cancel()
                                _, err := producer.Send(ctx, &pm)
+                               cancel()
                                if err == nil {
                                        break
                                }
+                               if errors.Is(err, ErrTopicTerminated) || 
errors.Is(err, ErrProducerClosed) {
+                                       select {
+                                       case errCh <- fmt.Errorf("producer 
became terminal during migration at message %d: %w", i, err):
+                                       default:
+                                       }
+                                       return
+                               }
+                               if time.Since(retryStarted) > 30*time.Second {
+                                       select {
+                                       case errCh <- fmt.Errorf("producer send 
retry exceeded 30s at message %d: %w", i, err):
+                                       default:
+                                       }

Review Comment:
   The same question as in my previous comment.



##########
pulsar/blue_green_migration_test.go:
##########
@@ -185,16 +202,24 @@ func testTopicMigrate(
                                wgUnload.Wait()
                        }
 
+                       retryStarted := time.Now()
                        for true {
                                ctx, cancel := 
context.WithTimeout(context.Background(), 1*time.Second)
-                               defer cancel()
                                m, err := consumer.Receive(ctx)
+                               cancel()

Review Comment:
   The original `defer cancel()` call hits a typical pitfall in Go: a deferred 
function is not called until the enclosing function returns, so if the loop 
iterates many times, many contexts can remain uncancelled until then.
   
   A better approach is to wrap the loop body in an anonymous function and keep 
`defer cancel()` unchanged:
   
   ```go
   err := func() error {
       ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
       defer cancel()
       /* ... */
   }()
   ```



##########
pulsar/blue_green_migration_test.go:
##########
@@ -185,16 +202,24 @@ func testTopicMigrate(
                                wgUnload.Wait()
                        }
 
+                       retryStarted := time.Now()
                        for true {
                                ctx, cancel := 
context.WithTimeout(context.Background(), 1*time.Second)
-                               defer cancel()
                                m, err := consumer.Receive(ctx)
+                               cancel()
                                if err == nil {
                                        err = consumer.Ack(m)
                                        if err == nil {
                                                break
                                        }
                                }
+                               if time.Since(retryStarted) > 30*time.Second {
+                                       select {
+                                       case errCh <- fmt.Errorf("consumer 
receive/ack retry exceeded 30s at message %d: %w", i, err):
+                                       default:
+                                       }

Review Comment:
   The same question



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to