This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new b6c2743eff1 [fix][fn] Go functions must retrieve consumers by 
non-particioned topic ID (#20413)
b6c2743eff1 is described below

commit b6c2743eff1cfd51bcdae12b7dd2e6134ef72f7f
Author: Andy Walker <[email protected]>
AuthorDate: Tue May 30 23:03:36 2023 -0400

    [fix][fn] Go functions must retrieve consumers by non-particioned topic ID 
(#20413)
    
    Co-authored-by: Andy Walker <[email protected]>
    (cherry picked from commit fb1b46e5e993d77c583f715d2bea2eadbb052a81)
---
 pulsar-function-go/pf/instance.go | 18 ++++++++++++++++--
 pulsar-function-go/pf/util.go     | 11 +++++++++++
 2 files changed, 27 insertions(+), 2 deletions(-)

diff --git a/pulsar-function-go/pf/instance.go 
b/pulsar-function-go/pf/instance.go
index c5b7500803d..3a00182585e 100644
--- a/pulsar-function-go/pf/instance.go
+++ b/pulsar-function-go/pf/instance.go
@@ -388,11 +388,25 @@ func (gi *goInstance) processResult(msgInput 
pulsar.Message, output []byte) {
 // ackInputMessage doesn't produce any result, or the user doesn't want the 
result.
 func (gi *goInstance) ackInputMessage(inputMessage pulsar.Message) {
        log.Debugf("ack input message topic name is: %s", inputMessage.Topic())
-       gi.consumers[inputMessage.Topic()].Ack(inputMessage)
+       gi.respondMessage(inputMessage, true)
 }
 
 func (gi *goInstance) nackInputMessage(inputMessage pulsar.Message) {
-       gi.consumers[inputMessage.Topic()].Nack(inputMessage)
+       gi.respondMessage(inputMessage, false)
+}
+
+func (gi *goInstance) respondMessage(inputMessage pulsar.Message, ack bool) {
+       topicName, err := ParseTopicName(inputMessage.Topic())
+       if err != nil {
+               log.Errorf("unable respond to message ID %s - invalid topic: 
%v", messageIDStr(inputMessage), err)
+               return
+       }
+       // consumers are indexed by topic name only (no partition)
+       if ack {
+               gi.consumers[topicName.NameWithoutPartition()].Ack(inputMessage)
+               return
+       }
+       gi.consumers[topicName.NameWithoutPartition()].Nack(inputMessage)
 }
 
 func getIdleTimeout(timeoutMilliSecond time.Duration) time.Duration {
diff --git a/pulsar-function-go/pf/util.go b/pulsar-function-go/pf/util.go
index d5b32da8411..1d1aa1cab93 100644
--- a/pulsar-function-go/pf/util.go
+++ b/pulsar-function-go/pf/util.go
@@ -21,6 +21,8 @@ package pf
 
 import (
        "fmt"
+
+       "github.com/apache/pulsar-client-go/pulsar"
 )
 
 func getProperties(fullyQualifiedName string, instanceID int) 
map[string]string {
@@ -39,3 +41,12 @@ func getDefaultSubscriptionName(tenant, namespace, name 
string) string {
 func getFullyQualifiedInstanceID(tenant, namespace, name string, instanceID 
int) string {
        return fmt.Sprintf("%s/%s/%s:%d", tenant, namespace, name, instanceID)
 }
+
+func messageIDStr(msg pulsar.Message) string {
+       // <ledger ID>:<entry ID>:<partition index>:<batch index>
+       return fmt.Sprintf("%d:%d:%d:%d",
+               msg.ID().LedgerID(),
+               msg.ID().EntryID(),
+               msg.ID().PartitionIdx(),
+               msg.ID().BatchIdx())
+}

Reply via email to