This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d8e2743914d [improve][fn] Introduce NewOutputMessageWithError to 
enable error handling (#24122)
d8e2743914d is described below

commit d8e2743914d0c2f6685af253825279a558efe848
Author: Simon Beeli <simon.be...@gmx.ch>
AuthorDate: Fri Mar 28 11:31:47 2025 +0100

    [improve][fn] Introduce NewOutputMessageWithError to enable error handling 
(#24122)
    
    Co-authored-by: Simon Beeli <simon.be...@bsi-software.com>
    Co-authored-by: Zixuan Liu <node...@gmail.com>
---
 pulsar-function-go/pf/context.go      | 20 +++++++++++-----
 pulsar-function-go/pf/context_test.go | 45 +++++++++++++++++++++++++++++++++++
 pulsar-function-go/pf/instance.go     |  9 +++++++
 pulsar-function-go/pf/util_test.go    |  4 ++--
 4 files changed, 70 insertions(+), 8 deletions(-)

diff --git a/pulsar-function-go/pf/context.go b/pulsar-function-go/pf/context.go
index 0b269a568ff..69959cb3c1a 100644
--- a/pulsar-function-go/pf/context.go
+++ b/pulsar-function-go/pf/context.go
@@ -34,12 +34,13 @@ import (
 // message, what are our operating constraints, etc can be accessed by the
 // executing function
 type FunctionContext struct {
-       instanceConf  *instanceConf
-       userConfigs   map[string]interface{}
-       logAppender   *LogAppender
-       outputMessage func(topic string) pulsar.Producer
-       userMetrics   sync.Map
-       record        pulsar.Message
+       instanceConf           *instanceConf
+       userConfigs            map[string]interface{}
+       logAppender            *LogAppender
+       outputMessage          func(topic string) pulsar.Producer
+       outputMessageWithError func(topic string) (pulsar.Producer, error)
+       userMetrics            sync.Map
+       record                 pulsar.Message
 }
 
 // NewFuncContext returns a new Function context
@@ -161,6 +162,13 @@ func (c *FunctionContext) NewOutputMessage(topicName 
string) pulsar.Producer {
        return c.outputMessage(topicName)
 }
 
+// NewOutputMessageWithError returns a producer for the given topic, or an 
error
+// @param topicName: The name of the topic for output message
+// @return A Pulsar producer for the given topic and an error, if any.
+func (c *FunctionContext) NewOutputMessageWithError(topicName string) 
(pulsar.Producer, error) {
+       return c.outputMessageWithError(topicName)
+}
+
 // SetCurrentRecord sets the current message into the function context called
 // for each message before executing a handler function
 func (c *FunctionContext) SetCurrentRecord(record pulsar.Message) {
diff --git a/pulsar-function-go/pf/context_test.go 
b/pulsar-function-go/pf/context_test.go
index 52a8abc3117..0c9f78775ca 100644
--- a/pulsar-function-go/pf/context_test.go
+++ b/pulsar-function-go/pf/context_test.go
@@ -21,6 +21,8 @@ package pf
 
 import (
        "context"
+       "errors"
+       "fmt"
        "testing"
        "time"
 
@@ -82,3 +84,46 @@ func TestFunctionContext_NewOutputMessage(t *testing.T) {
        actualProducer := fc.NewOutputMessage(publishTopic)
        assert.IsType(t, &MockPulsarProducer{}, actualProducer)
 }
+
+func TestFunctionContext_NewOutputMessageWithError(t *testing.T) {
+       testErr := errors.New("test error")
+
+       testCases := []struct {
+               name                 string
+               outputFunc           func(topic string) (pulsar.Producer, error)
+               expectedError        error
+               expectedProducerType *MockPulsarProducer
+       }{
+
+               {
+                       name:                 "Test producer",
+                       outputFunc:           func(topic string) 
(pulsar.Producer, error) { return &MockPulsarProducer{}, nil },
+                       expectedError:        nil,
+                       expectedProducerType: &MockPulsarProducer{},
+               },
+               {
+                       name:                 "Test error",
+                       outputFunc:           func(topic string) 
(pulsar.Producer, error) { return nil, errors.New("test error") },
+                       expectedError:        testErr,
+                       expectedProducerType: nil,
+               },
+       }
+
+       for i, testCase := range testCases {
+               t.Run(fmt.Sprintf("testCase[%d] %s", i, testCase.name), func(t 
*testing.T) {
+
+                       fc := NewFuncContext()
+                       publishTopic := "publish-topic"
+
+                       fc.outputMessageWithError = testCase.outputFunc
+
+                       actualProducer, err := 
fc.NewOutputMessageWithError(publishTopic)
+                       if testCase.expectedProducerType == nil {
+                               assert.Nil(t, actualProducer)
+                       } else {
+                               assert.IsType(t, testCase.expectedProducerType, 
actualProducer)
+                       }
+                       assert.Equal(t, testCase.expectedError, err)
+               })
+       }
+}
diff --git a/pulsar-function-go/pf/instance.go 
b/pulsar-function-go/pf/instance.go
index 1064aece46f..4c6294d22bd 100644
--- a/pulsar-function-go/pf/instance.go
+++ b/pulsar-function-go/pf/instance.go
@@ -77,6 +77,15 @@ func newGoInstance() *goInstance {
                return producer
        }
 
+       goInstance.context.outputMessageWithError = func(topic string) 
(pulsar.Producer, error) {
+               producer, err := goInstance.getProducer(topic)
+               if err != nil {
+                       log.Errorf("getting producer failed, error is:%v", err)
+                       return nil, err
+               }
+               return producer, nil
+       }
+
        goInstance.lastHealthCheckTS = now.UnixNano()
        goInstance.properties = make(map[string]string)
        goInstance.stats = 
NewStatWithLabelValues(goInstance.getMetricsLabels()...)
diff --git a/pulsar-function-go/pf/util_test.go 
b/pulsar-function-go/pf/util_test.go
index fa0871b49ff..d6d3a2b54b8 100644
--- a/pulsar-function-go/pf/util_test.go
+++ b/pulsar-function-go/pf/util_test.go
@@ -44,8 +44,8 @@ func TestUtils(t *testing.T) {
        expectedFQFN := getDefaultSubscriptionName(tenant, namespace, name)
        assert.Equal(t, expectedFQFN, fqfn)
 
-       actualtMap := getProperties(fqfn, 100)
-       assert.Equal(t, propertiesMap, actualtMap)
+       actualMap := getProperties(fqfn, 100)
+       assert.Equal(t, propertiesMap, actualMap)
 
        expectedRes := getFullyQualifiedInstanceID(tenant, namespace, name, 
instanceID)
        assert.Equal(t, expectedRes, "pulsar/function/go:100")

Reply via email to