This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new b6c2743eff1 [fix][fn] Go functions must retrieve consumers by
non-partitioned topic ID (#20413)
b6c2743eff1 is described below
commit b6c2743eff1cfd51bcdae12b7dd2e6134ef72f7f
Author: Andy Walker <[email protected]>
AuthorDate: Tue May 30 23:03:36 2023 -0400
[fix][fn] Go functions must retrieve consumers by non-partitioned topic ID
(#20413)
Co-authored-by: Andy Walker <[email protected]>
(cherry picked from commit fb1b46e5e993d77c583f715d2bea2eadbb052a81)
---
pulsar-function-go/pf/instance.go | 18 ++++++++++++++++--
pulsar-function-go/pf/util.go | 11 +++++++++++
2 files changed, 27 insertions(+), 2 deletions(-)
diff --git a/pulsar-function-go/pf/instance.go
b/pulsar-function-go/pf/instance.go
index c5b7500803d..3a00182585e 100644
--- a/pulsar-function-go/pf/instance.go
+++ b/pulsar-function-go/pf/instance.go
@@ -388,11 +388,25 @@ func (gi *goInstance) processResult(msgInput
pulsar.Message, output []byte) {
// ackInputMessage doesn't produce any result, or the user doesn't want the
result.
func (gi *goInstance) ackInputMessage(inputMessage pulsar.Message) {
log.Debugf("ack input message topic name is: %s", inputMessage.Topic())
- gi.consumers[inputMessage.Topic()].Ack(inputMessage)
+ gi.respondMessage(inputMessage, true)
}
func (gi *goInstance) nackInputMessage(inputMessage pulsar.Message) {
- gi.consumers[inputMessage.Topic()].Nack(inputMessage)
+ gi.respondMessage(inputMessage, false)
+}
+
+func (gi *goInstance) respondMessage(inputMessage pulsar.Message, ack bool) {
+ topicName, err := ParseTopicName(inputMessage.Topic())
+ if err != nil {
+ log.Errorf("unable respond to message ID %s - invalid topic:
%v", messageIDStr(inputMessage), err)
+ return
+ }
+ // consumers are indexed by topic name only (no partition)
+ if ack {
+ gi.consumers[topicName.NameWithoutPartition()].Ack(inputMessage)
+ return
+ }
+ gi.consumers[topicName.NameWithoutPartition()].Nack(inputMessage)
}
func getIdleTimeout(timeoutMilliSecond time.Duration) time.Duration {
diff --git a/pulsar-function-go/pf/util.go b/pulsar-function-go/pf/util.go
index d5b32da8411..1d1aa1cab93 100644
--- a/pulsar-function-go/pf/util.go
+++ b/pulsar-function-go/pf/util.go
@@ -21,6 +21,8 @@ package pf
import (
"fmt"
+
+ "github.com/apache/pulsar-client-go/pulsar"
)
func getProperties(fullyQualifiedName string, instanceID int)
map[string]string {
@@ -39,3 +41,12 @@ func getDefaultSubscriptionName(tenant, namespace, name
string) string {
func getFullyQualifiedInstanceID(tenant, namespace, name string, instanceID
int) string {
return fmt.Sprintf("%s/%s/%s:%d", tenant, namespace, name, instanceID)
}
+
+func messageIDStr(msg pulsar.Message) string {
+ // <ledger ID>:<entry ID>:<partition index>:<batch index>
+ return fmt.Sprintf("%d:%d:%d:%d",
+ msg.ID().LedgerID(),
+ msg.ID().EntryID(),
+ msg.ID().PartitionIdx(),
+ msg.ID().BatchIdx())
+}