This is an automated email from the ASF dual-hosted git repository. zixuan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new d8e2743914d [improve][fn] Introduce NewOutputMessageWithError to enable error handling (#24122) d8e2743914d is described below commit d8e2743914d0c2f6685af253825279a558efe848 Author: Simon Beeli <simon.be...@gmx.ch> AuthorDate: Fri Mar 28 11:31:47 2025 +0100 [improve][fn] Introduce NewOutputMessageWithError to enable error handling (#24122) Co-authored-by: Simon Beeli <simon.be...@bsi-software.com> Co-authored-by: Zixuan Liu <node...@gmail.com> --- pulsar-function-go/pf/context.go | 20 +++++++++++----- pulsar-function-go/pf/context_test.go | 45 +++++++++++++++++++++++++++++++++++ pulsar-function-go/pf/instance.go | 9 +++++++ pulsar-function-go/pf/util_test.go | 4 ++-- 4 files changed, 70 insertions(+), 8 deletions(-) diff --git a/pulsar-function-go/pf/context.go b/pulsar-function-go/pf/context.go index 0b269a568ff..69959cb3c1a 100644 --- a/pulsar-function-go/pf/context.go +++ b/pulsar-function-go/pf/context.go @@ -34,12 +34,13 @@ import ( // message, what are our operating constraints, etc can be accessed by the // executing function type FunctionContext struct { - instanceConf *instanceConf - userConfigs map[string]interface{} - logAppender *LogAppender - outputMessage func(topic string) pulsar.Producer - userMetrics sync.Map - record pulsar.Message + instanceConf *instanceConf + userConfigs map[string]interface{} + logAppender *LogAppender + outputMessage func(topic string) pulsar.Producer + outputMessageWithError func(topic string) (pulsar.Producer, error) + userMetrics sync.Map + record pulsar.Message } // NewFuncContext returns a new Function context @@ -161,6 +162,13 @@ func (c *FunctionContext) NewOutputMessage(topicName string) pulsar.Producer { return c.outputMessage(topicName) } +// NewOutputMessageWithError send message to the topic and returns a potential error +// @param topicName: The name of the topic for output message +// @return A Pulsar producer for the given topic and an error, if any. +func (c *FunctionContext) NewOutputMessageWithError(topicName string) (pulsar.Producer, error) { + return c.outputMessageWithError(topicName) +} + // SetCurrentRecord sets the current message into the function context called // for each message before executing a handler function func (c *FunctionContext) SetCurrentRecord(record pulsar.Message) { diff --git a/pulsar-function-go/pf/context_test.go b/pulsar-function-go/pf/context_test.go index 52a8abc3117..0c9f78775ca 100644 --- a/pulsar-function-go/pf/context_test.go +++ b/pulsar-function-go/pf/context_test.go @@ -21,6 +21,8 @@ package pf import ( "context" + "errors" + "fmt" "testing" "time" @@ -82,3 +84,46 @@ func TestFunctionContext_NewOutputMessage(t *testing.T) { actualProducer := fc.NewOutputMessage(publishTopic) assert.IsType(t, &MockPulsarProducer{}, actualProducer) } + +func TestFunctionContext_NewOutputMessageWithError(t *testing.T) { + testErr := errors.New("test error") + + testCases := []struct { + name string + outputFunc func(topic string) (pulsar.Producer, error) + expectedError error + expectedProducerType *MockPulsarProducer + }{ + + { + name: "Test producer", + outputFunc: func(topic string) (pulsar.Producer, error) { return &MockPulsarProducer{}, nil }, + expectedError: nil, + expectedProducerType: &MockPulsarProducer{}, + }, + { + name: "Test error", + outputFunc: func(topic string) (pulsar.Producer, error) { return nil, errors.New("test error") }, + expectedError: testErr, + expectedProducerType: nil, + }, + } + + for i, testCase := range testCases { + t.Run(fmt.Sprintf("testCase[%d] %s", i, testCase.name), func(t *testing.T) { + + fc := NewFuncContext() + publishTopic := "publish-topic" + + fc.outputMessageWithError = testCase.outputFunc + + actualProducer, err := fc.NewOutputMessageWithError(publishTopic) + if testCase.expectedProducerType == nil { + assert.Nil(t, actualProducer) + } else { + assert.IsType(t, testCase.expectedProducerType, actualProducer) + } + assert.Equal(t, testCase.expectedError, err) + }) + } +} diff --git a/pulsar-function-go/pf/instance.go b/pulsar-function-go/pf/instance.go index 1064aece46f..4c6294d22bd 100644 --- a/pulsar-function-go/pf/instance.go +++ b/pulsar-function-go/pf/instance.go @@ -77,6 +77,15 @@ func newGoInstance() *goInstance { return producer } + goInstance.context.outputMessageWithError = func(topic string) (pulsar.Producer, error) { + producer, err := goInstance.getProducer(topic) + if err != nil { + log.Errorf("getting producer failed, error is:%v", err) + return nil, err + } + return producer, nil + } + goInstance.lastHealthCheckTS = now.UnixNano() goInstance.properties = make(map[string]string) goInstance.stats = NewStatWithLabelValues(goInstance.getMetricsLabels()...) diff --git a/pulsar-function-go/pf/util_test.go b/pulsar-function-go/pf/util_test.go index fa0871b49ff..d6d3a2b54b8 100644 --- a/pulsar-function-go/pf/util_test.go +++ b/pulsar-function-go/pf/util_test.go @@ -44,8 +44,8 @@ func TestUtils(t *testing.T) { expectedFQFN := getDefaultSubscriptionName(tenant, namespace, name) assert.Equal(t, expectedFQFN, fqfn) - actualtMap := getProperties(fqfn, 100) - assert.Equal(t, propertiesMap, actualtMap) + actualMap := getProperties(fqfn, 100) + assert.Equal(t, propertiesMap, actualMap) expectedRes := getFullyQualifiedInstanceID(tenant, namespace, name, instanceID) assert.Equal(t, expectedRes, "pulsar/function/go:100")