This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5991a1d685e4057b9c41c3973afbb7c568f188eb Author: Dream95 <[email protected]> AuthorDate: Mon May 25 20:28:29 2026 +0800 [fix][fn] Fix Go function runtime to continue after user exceptions and add neg-ack tests (#25867) Signed-off-by: Dream95 <[email protected]> (cherry picked from commit a5c1029668e182b05579d223683d5e6ea82f84fe) --- pulsar-function-go/pf/instance.go | 41 +++++++++++++++------- pulsar-function-go/pf/instance_test.go | 32 +++++++++++++++++ .../docker-images/latest-version-image/Dockerfile | 1 + .../go-examples/exceptionFunc/exceptionFunc.go | 41 ++++++++++++++++++++++ .../integration/functions/PulsarFunctionsTest.java | 4 +++ .../functions/PulsarFunctionsTestBase.java | 1 + .../functions/go/PulsarFunctionsGoTest.java | 5 +++ 7 files changed, 112 insertions(+), 13 deletions(-) diff --git a/pulsar-function-go/pf/instance.go b/pulsar-function-go/pf/instance.go index af8a4e0157b..2cdfc8a6e94 100644 --- a/pulsar-function-go/pf/instance.go +++ b/pulsar-function-go/pf/instance.go @@ -164,7 +164,6 @@ CLOSE: case cm := <-channel: msgInput := cm.Message atMostOnce := gi.context.instanceConf.funcDetails.ProcessingGuarantees == pb.ProcessingGuarantees_ATMOST_ONCE - atLeastOnce := gi.context.instanceConf.funcDetails.ProcessingGuarantees == pb.ProcessingGuarantees_ATLEAST_ONCE autoAck := gi.context.instanceConf.funcDetails.AutoAck //nolint:staticcheck if autoAck && atMostOnce { gi.ackInputMessage(msgInput) @@ -177,12 +176,8 @@ CLOSE: output, err := gi.handlerMsg(msgInput) if err != nil { - log.Errorf("handler message error:%v", err) - if autoAck && atLeastOnce { - gi.nackInputMessage(msgInput) - } - gi.stats.incrTotalUserExceptions(err) - return err + gi.handleUserError(msgInput, err) + continue } gi.stats.processTimeEnd() @@ -391,6 +386,29 @@ func (gi *goInstance) setupConsumer() (chan pulsar.ConsumerMessage, error) { return channel, nil } +func (gi *goInstance) shouldNackInputOnFailure() bool { + guarantee := gi.context.instanceConf.funcDetails.ProcessingGuarantees + return guarantee == pb.ProcessingGuarantees_ATLEAST_ONCE || + guarantee == pb.ProcessingGuarantees_MANUAL +} + +func (gi *goInstance) handleUserError(msgInput pulsar.Message, err error) { + log.Errorf("handler message error:%v", err) + if gi.shouldNackInputOnFailure() { + gi.nackInputMessage(msgInput) + } + gi.stats.incrTotalUserExceptions(err) + gi.stats.processTimeEnd() +} + +func (gi *goInstance) handlePublishError(msgInput pulsar.Message, err error) { + if gi.context.instanceConf.funcDetails.ProcessingGuarantees == pb.ProcessingGuarantees_ATLEAST_ONCE { + gi.nackInputMessage(msgInput) + } + gi.stats.incrTotalSysExceptions(err) + log.Errorf("failed to publish output message: %v", err) +} + func (gi *goInstance) handlerMsg(input pulsar.Message) (output []byte, err error) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -420,11 +438,8 @@ func (gi *goInstance) processResult(msgInput pulsar.Message, output []byte) { // semantics, ensure we nack so someone else can get it, in case we are the only handler. Then mark // exception and fail out. if err != nil { - if autoAck && atLeastOnce { - gi.nackInputMessage(msgInput) - } - gi.stats.incrTotalSysExceptions(err) - log.Fatal(err) + gi.handlePublishError(msgInput, err) + return } // Otherwise the message succeeded. If the SDK is entrusted with responding and we are using // atLeastOnce delivery semantics, ack the message. @@ -437,7 +452,7 @@ func (gi *goInstance) processResult(msgInput pulsar.Message, output []byte) { return } - // No output from the function or no output topic. Ack if we need to and mark the success before rturning. + // No output from the function or no output topic. Ack if we need to and mark the success before returning. if autoAck && atLeastOnce { gi.ackInputMessage(msgInput) } diff --git a/pulsar-function-go/pf/instance_test.go b/pulsar-function-go/pf/instance_test.go index bf45ae3a891..447d3a22f8d 100644 --- a/pulsar-function-go/pf/instance_test.go +++ b/pulsar-function-go/pf/instance_test.go @@ -27,6 +27,8 @@ import ( "time" "github.com/stretchr/testify/assert" + + pb "github.com/apache/pulsar/pulsar-function-go/pb" ) func testProcessSpawnerHealthCheckTimer( @@ -115,3 +117,33 @@ func Test_goInstance_handlerMsg(t *testing.T) { assert.Equal(t, "output", string(output)) assert.Equal(t, message, fc.record) } + +func newTestGoInstance(guarantee pb.ProcessingGuarantees) *goInstance { + return &goInstance{ + context: &FunctionContext{ + instanceConf: &instanceConf{ + funcDetails: pb.FunctionDetails{ + ProcessingGuarantees: guarantee, + }, + }, + }, + } +} + +func TestShouldNackInputOnFailure(t *testing.T) { + tests := []struct { + name string + guarantee pb.ProcessingGuarantees + want bool + }{ + {"atLeastOnce", pb.ProcessingGuarantees_ATLEAST_ONCE, true}, + {"manual", pb.ProcessingGuarantees_MANUAL, true}, + {"atMostOnce", pb.ProcessingGuarantees_ATMOST_ONCE, false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + instance := newTestGoInstance(tt.guarantee) + assert.Equal(t, tt.want, instance.shouldNackInputOnFailure()) + }) + } +} diff --git a/tests/docker-images/latest-version-image/Dockerfile b/tests/docker-images/latest-version-image/Dockerfile index 6fe2ef656a4..08587af16c7 100644 --- a/tests/docker-images/latest-version-image/Dockerfile +++ b/tests/docker-images/latest-version-image/Dockerfile @@ -24,6 +24,7 @@ ARG GOLANG_IMAGE FROM $GOLANG_IMAGE as pulsar-function-go COPY target/pulsar-function-go/ /go/src/github.com/apache/pulsar/pulsar-function-go +COPY go-examples/exceptionFunc/ /go/src/github.com/apache/pulsar/pulsar-function-go/examples/exceptionFunc/ RUN cd /go/src/github.com/apache/pulsar/pulsar-function-go && go install ./... RUN cd /go/src/github.com/apache/pulsar/pulsar-function-go/pf && go install RUN cd /go/src/github.com/apache/pulsar/pulsar-function-go/examples && go install ./... diff --git a/tests/docker-images/latest-version-image/go-examples/exceptionFunc/exceptionFunc.go b/tests/docker-images/latest-version-image/go-examples/exceptionFunc/exceptionFunc.go new file mode 100644 index 00000000000..80ace6e21b9 --- /dev/null +++ b/tests/docker-images/latest-version-image/go-examples/exceptionFunc/exceptionFunc.go @@ -0,0 +1,41 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +package main + +import ( + "context" + "errors" + + "github.com/apache/pulsar/pulsar-function-go/pf" +) + +var i int + +func HandleException(ctx context.Context, in []byte) ([]byte, error) { + i++ + if i%10 == 0 { + return nil, errors.New("test") + } + return []byte(string(in) + "!"), nil +} + +func main() { + pf.Start(HandleException) +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index fdda3144e8b..82418d9d9dc 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -404,6 +404,10 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { submitFunction( runtime, inputTopicName, outputTopicName, functionName, EXCEPTION_FUNCTION_PYTHON_FILE, EXCEPTION_PYTHON_CLASS, schema, null); + } else if (runtime == Runtime.GO) { + submitFunction( + runtime, inputTopicName, outputTopicName, functionName, EXCEPTION_GO_FILE, + null, schema, null); } else { submitFunction( runtime, inputTopicName, outputTopicName, functionName, null, EXCEPTION_JAVA_CLASS, schema, null); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java index 288ced63ae5..84b31dca9a0 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java @@ -78,6 +78,7 @@ public abstract class PulsarFunctionsTestBase extends PulsarTestSuite { public static final String EXCLAMATION_GO_FILE = "exclamationFunc"; public static final String PUBLISH_FUNCTION_GO_FILE = "exclamationFunc"; + public static final String EXCEPTION_GO_FILE = "exceptionFunc"; public static final String LOGGING_JAVA_CLASS = "org.apache.pulsar.functions.api.examples.LoggingFunction"; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/go/PulsarFunctionsGoTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/go/PulsarFunctionsGoTest.java index 0550fd94ebe..6e631adafbf 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/go/PulsarFunctionsGoTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/go/PulsarFunctionsGoTest.java @@ -39,4 +39,9 @@ public abstract class PulsarFunctionsGoTest extends PulsarFunctionsTest { testExclamationFunction(Runtime.GO, false, false, true, false); } + @Test(groups = {"go_function", "function"}) + public void testGoFunctionNegAck() throws Exception { + testFunctionNegAck(Runtime.GO); + } + }
