This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new dd8e60fc golang: some optimizations regarding transaction messages and pushconsumer (#1021)
dd8e60fc is described below

commit dd8e60fc2a83104a1c2ce158f2034d5e827963ae
Author: guyinyou <36399867+guyin...@users.noreply.github.com>
AuthorDate: Mon Jun 23 14:33:48 2025 +0800

    golang: some optimizations regarding transaction messages and pushconsumer (#1021)
    
    Co-authored-by: guyinyou <guyinyou....@alibaba-inc.com>
---
 golang/consumer_service.go | 6 ++++++
 golang/producer.go         | 6 ------
 golang/push_consumer.go    | 1 +
 3 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/golang/consumer_service.go b/golang/consumer_service.go
index 846bd04c..4bb42959 100644
--- a/golang/consumer_service.go
+++ b/golang/consumer_service.go
@@ -25,6 +25,7 @@ import (
 type ConsumeService interface {
        consume(ProcessQueue, []*MessageView)
        consumeWithDuration(*MessageView, time.Duration, func(ConsumerResult, error))
+       Shutdown() error
 }
 
 type baseConsumeService struct {
@@ -55,6 +56,11 @@ func (bcs *baseConsumeService) consumeWithDuration(messageView *MessageView, dur
        time.AfterFunc(duration, func() { bcs.consumptionExecutor.Submit(task) })
 }
 
+func (bcs *baseConsumeService) Shutdown() error {
+       bcs.consumptionExecutor.Shutdown()
+       return nil
+}
+
 func (bcs *baseConsumeService) newConsumeTask(clientId string, messageListener MessageListener, messageView *MessageView, messageInterceptor MessageInterceptor, callback func(ConsumerResult, error)) func() {
        return func() {
                consumeResult := FAILURE
diff --git a/golang/producer.go b/golang/producer.go
index 58d4a226..b30dc5b7 100644
--- a/golang/producer.go
+++ b/golang/producer.go
@@ -235,12 +235,6 @@ func (p *defaultProducer) send1(ctx context.Context, topic string, messageType v
                                topic, messageIds, maxAttempts, attempt, endpoints, utils.GetRequestID(ctx))
                        return nil, err
                }
-               // No need more attempts for transactional message.
-               if messageType == v2.MessageType_TRANSACTION {
-                       p.cli.log.Errorf("failed to send transactional message finally, topic=%s, messageId(s)=%v,  maxAttempts=%d, attempt=%d, endpoints=%s, requestId=%s",
-                               topic, messageIds, maxAttempts, attempt, endpoints, utils.GetRequestID(ctx))
-                       return nil, err
-               }
                // Try to do more attempts.
                nextAttempt := attempt + 1
                // Retry immediately if the request is not throttled.
diff --git a/golang/push_consumer.go b/golang/push_consumer.go
index 75a9c26e..f6a446a7 100644
--- a/golang/push_consumer.go
+++ b/golang/push_consumer.go
@@ -499,6 +499,7 @@ func (pc *defaultPushConsumer) dropProcessQueue(mqstr utils.MessageQueueStr) {
 }
 
 func (pc *defaultPushConsumer) GracefulStop() error {
+       pc.consumerService.Shutdown()
        return pc.cli.GracefulStop()
 }
 

Reply via email to