Copilot commented on code in PR #1456:
URL: https://github.com/apache/pulsar-client-go/pull/1456#discussion_r2689351938


##########
pulsar/consumer_regex.go:
##########
@@ -454,6 +476,11 @@ func (c *regexConsumer) topics() ([]string, error) {
        }
 
        filtered := filterTopics(topics, c.pattern)
+
+       if c.options.RetryEnable {

Review Comment:
   Consider nil-checking c.options.DLQ before dereferencing 
c.options.DLQ.RetryLetterTopic. The current code path through newConsumer 
initializes DLQ whenever RetryEnable is true, so the check is purely defensive, 
but it rules out a nil-pointer panic if that invariant ever changes.
   ```suggestion
        if c.options.RetryEnable && c.options.DLQ != nil {
   ```
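
   As a minimal, self-contained sketch of the same guard: the types below are 
trimmed, hypothetical stand-ins that only mirror the field names mentioned in 
this comment (RetryEnable, DLQ, RetryLetterTopic), and retryLetterTopic is an 
illustrative helper, not a function from the PR.

```go
package main

import "fmt"

// dlqPolicy and consumerOptions are hypothetical stand-ins that carry only
// the fields named in the review comment.
type dlqPolicy struct {
	RetryLetterTopic string
}

type consumerOptions struct {
	RetryEnable bool
	DLQ         *dlqPolicy
}

// retryLetterTopic returns the configured retry topic. It reports ok=false
// when retry is disabled or when no DLQ policy is set, so callers never
// dereference a nil DLQ pointer.
func retryLetterTopic(o consumerOptions) (topic string, ok bool) {
	if !o.RetryEnable || o.DLQ == nil {
		return "", false
	}
	return o.DLQ.RetryLetterTopic, true
}

func main() {
	withDLQ := consumerOptions{
		RetryEnable: true,
		DLQ:         &dlqPolicy{RetryLetterTopic: "example-RETRY"},
	}
	noDLQ := consumerOptions{RetryEnable: true} // DLQ left nil on purpose

	topic, ok := retryLetterTopic(withDLQ)
	fmt.Println(topic, ok)

	topic, ok = retryLetterTopic(noDLQ)
	fmt.Println(topic, ok)
}
```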



##########
pulsar/consumer_regex.go:
##########
@@ -186,12 +186,34 @@ func (c *regexConsumer) Ack(msg Message) error {
        return c.AckID(msg.ID())
 }
 
-func (c *regexConsumer) ReconsumeLater(_ Message, _ time.Duration) {
-       c.log.Warnf("regexp consumer not support ReconsumeLater yet.")
+func (c *regexConsumer) ReconsumeLater(msg Message, delay time.Duration) {
+       c.ReconsumeLaterWithCustomProperties(msg, map[string]string{}, delay)
 }
 
-func (c *regexConsumer) ReconsumeLaterWithCustomProperties(_ Message, _ map[string]string, _ time.Duration) {
-       c.log.Warnf("regexp consumer not support ReconsumeLaterWithCustomProperties yet.")
+func (c *regexConsumer) ReconsumeLaterWithCustomProperties(msg Message, customProperties map[string]string,
+       delay time.Duration) {
+       names, err := validateTopicNames(msg.Topic())
+       if err != nil {
+               c.log.Errorf("validate msg topic %q failed: %v", msg.Topic(), err)
+               return
+       }
+       if len(names) != 1 {
+               c.log.Errorf("invalid msg topic %q names: %+v ", msg.Topic(), names)
+               return
+       }
+
+       tn := names[0]
+       fqdnTopic := internal.TopicNameWithoutPartitionPart(tn)
+       consumer, ok := c.consumers[fqdnTopic]
+       if !ok {
+               // check to see if the topic with the partition part is in the consumers
+               // this can happen when the consumer is configured to consume from a specific partition
+               if consumer, ok = c.consumers[tn.Name]; !ok {
+                       c.log.Warnf("consumer of topic %s not exist unexpectedly", msg.Topic())
+                       return
+               }
+       }
+       consumer.ReconsumeLaterWithCustomProperties(msg, customProperties, delay)

Review Comment:
   There's a potential data race here. The consumers map is read without 
holding consumersLock, while the discover() method (which runs in a separate 
goroutine) modifies this map with the lock held. A map read that overlaps a 
write is unsafe in Go and can crash the process with a "concurrent map read 
and map write" fatal error. Both lookups in c.consumers should be performed 
while holding consumersLock.
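
   One possible shape for the fix, as a self-contained sketch rather than the 
PR's actual code: registry, reconsumer, printConsumer, put and find are 
hypothetical names, and consumersLock is assumed here to be a plain sync.Mutex. 
The lookup tries the partition-less key and then the partitioned key under the 
lock, and the caller invokes the consumer only after the lock is released.

```go
package main

import (
	"fmt"
	"sync"
)

// reconsumer is a hypothetical, minimal stand-in for a per-topic consumer.
type reconsumer interface {
	Reconsume(msgID string)
}

type printConsumer struct{ topic string }

func (p printConsumer) Reconsume(msgID string) {
	fmt.Printf("reconsume %s via %s\n", msgID, p.topic)
}

// registry is a hypothetical stand-in for the part of regexConsumer that
// owns the consumers map and the lock guarding it.
type registry struct {
	consumersLock sync.Mutex // assumed to be a plain mutex for this sketch
	consumers     map[string]reconsumer
}

// put registers a consumer under the lock, as a discovery goroutine would.
func (r *registry) put(topic string, c reconsumer) {
	r.consumersLock.Lock()
	defer r.consumersLock.Unlock()
	r.consumers[topic] = c
}

// find returns the consumer stored under the first matching key. Every map
// read happens while consumersLock is held.
func (r *registry) find(keys ...string) (reconsumer, bool) {
	r.consumersLock.Lock()
	defer r.consumersLock.Unlock()
	for _, k := range keys {
		if c, ok := r.consumers[k]; ok {
			return c, true
		}
	}
	return nil, false
}

func main() {
	r := &registry{consumers: make(map[string]reconsumer)}

	// Writer goroutine: simulates topic discovery adding consumers.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < 100; i++ {
			topic := fmt.Sprintf("topic-%d", i)
			r.put(topic, printConsumer{topic: topic})
		}
	}()

	// Concurrent readers are safe because find takes the same lock.
	for i := 0; i < 100; i++ {
		r.find("topic-0")
	}
	wg.Wait()

	// Try the partition-less key first, then the partitioned one, and call
	// the consumer outside the lock.
	if c, ok := r.find("topic-5", "topic-5-partition-0"); ok {
		c.Reconsume("msg-1")
	}
}
```

Calling the consumer after find returns keeps the critical section short, so a slow reconsume does not block discovery. Running a variant without the lock under `go run -race` is a quick way to confirm the race the comment describes.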



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
