flowchartsman commented on PR #20587:
URL: https://github.com/apache/pulsar/pull/20587#issuecomment-1602864557
So initially I thought you were attempting to modify the primary output
producer, but if you are talking about testing sending on other topics, then I
still think another approach would be better. My primary concern is with adding
a new, potentially confusing, context method to the API that could cause
problems when used outside of a testing use-case.
I could see some utility in modifying the producers settings for other
output messages, but I think that any further functionality in this direction
should avoid using the consuming "output message" nomenclature, which is an
artifact of trying to make it look like the Java API and should probably be
deprecated.
I realize this doesn't solve your problem with regards to testing, and I
think that `pf` definitely needs a better testing story, but I think that
deserves a separate issue. We might be able to create a dedicated library that
would consume pulsar function types and let you inspect the messages. In the
meantime, what you might be able to do is to use an approach similar to what I
ended up with, which is to make your pulsar function closure wrapper around a
function for the actual business logic, returning a type that encapsulates all
of the messages you want to send, and then you can test that output instead.
something like:
```go
type FunctionResults struct {
Output []byte
TopicMessages []TopicMsg
}
func (fr *FunctionResults) AddAdditionalMessage(topic string, payload
[]byte) {
fr.TopicMessages = append(fr.TopicMessages, TopicMsg{Topic: topic,
Payload: payload})
}
type TopicMsg struct {
Topic string
Payload []byte
}
func PulsarFunction(ctx context.Context, in []byte) ([]byte, error) {
fctx, ok := pf.FromContext(ctx)
if !ok {
return nil, errors.New("get Go Functions Context error")
}
res, err := MyLogic(ctx, in)
if err != nil {
return nil, err
}
for _, additionalMsg := range res.TopicMessages {
topicP := fctx.NewOutputMessage(additionalMsg.Topic)
_, err := topicP.Send(ctx, &pulsar.ProducerMessage{
Payload: additionalMsg.Payload,
})
if err != nil {
return nil, fmt.Errorf("error sending message to %s:
%v", additionalMsg.Topic, err)
}
}
return res.Output, nil
}
func MyLogic(ctx context.Context, in []byte) (*FunctionResults, error) {
res := &FunctionResults{}
// your work here
res.AddAdditionalMessage("sometopic", []byte("some output"))
res.Output = []byte("this is my primary output")
return res, nil
}
func main() {
pf.Start(PulsarFunction)
}
```
You could even make a couple of convenience functions for returning only
primary output or only a single topic message:
```go
func TopicMessage(topic string, payload []byte) *FunctionResults {
return &FunctionResults{
TopicMessages: []TopicMsg{{
Topic: topic,
Payload: payload,
}},
}
}
func OutputMessage(payload []byte) *FunctionResults {
return &FunctionResults{
Output: payload,
}
}
```
Sorry if I misunderstood your initial proposal.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]