flowchartsman commented on PR #20587:
URL: https://github.com/apache/pulsar/pull/20587#issuecomment-1602864557

   So initially I thought you were attempting to modify the primary output 
producer, but if you are talking about testing sending on other topics, then I 
still think another approach would be better. My primary concern is with adding 
a new, potentially confusing, context method to the API that could cause 
problems when used outside of a testing use-case.
   
   I could see some utility in modifying the producer's settings for other 
output messages, but I think that any further functionality in this direction 
should avoid using the confusing "output message" nomenclature, which is an 
artifact of trying to make it look like the Java API and should probably be 
deprecated.
   
   I realize this doesn't solve your problem with regard to testing, and I 
think that `pf` definitely needs a better testing story, but I think that 
deserves a separate issue. We might be able to create a dedicated library that 
would consume Pulsar function types and let you inspect the messages. In the 
meantime, you might be able to use an approach similar to what I ended up 
with: make your Pulsar function a thin wrapper around a function that holds 
the actual business logic and returns a type encapsulating all of the 
messages you want to send. You can then test that output instead.
   
   Something like:
   
   ```go
   package main
   
   import (
        "context"
        "errors"
        "fmt"
   
        "github.com/apache/pulsar-client-go/pulsar"
        "github.com/apache/pulsar/pulsar-function-go/pf"
   )
   
   // FunctionResults collects everything the business logic wants to send.
   type FunctionResults struct {
        Output        []byte     // primary output of the function
        TopicMessages []TopicMsg // extra messages for other topics
   }
   
   // AddAdditionalMessage queues a message for a topic other than the primary output.
   func (fr *FunctionResults) AddAdditionalMessage(topic string, payload []byte) {
        fr.TopicMessages = append(fr.TopicMessages, TopicMsg{Topic: topic, Payload: payload})
   }
   
   type TopicMsg struct {
        Topic   string
        Payload []byte
   }
   
   // PulsarFunction is the thin wrapper: it runs the logic, then does the sending.
   func PulsarFunction(ctx context.Context, in []byte) ([]byte, error) {
        fctx, ok := pf.FromContext(ctx)
        if !ok {
                return nil, errors.New("get Go Functions Context error")
        }
        res, err := MyLogic(ctx, in)
        if err != nil {
                return nil, err
        }
        for _, additionalMsg := range res.TopicMessages {
                topicP := fctx.NewOutputMessage(additionalMsg.Topic)
                _, err := topicP.Send(ctx, &pulsar.ProducerMessage{
                        Payload: additionalMsg.Payload,
                })
                if err != nil {
                        return nil, fmt.Errorf("error sending message to %s: %w", additionalMsg.Topic, err)
                }
        }
        return res.Output, nil
   }
   
   // MyLogic holds the business logic and has no dependency on pf, so it can be tested directly.
   func MyLogic(ctx context.Context, in []byte) (*FunctionResults, error) {
        res := &FunctionResults{}
        // your work here
        res.AddAdditionalMessage("sometopic", []byte("some output"))
        res.Output = []byte("this is my primary output")
        return res, nil
   }
   
   func main() {
        pf.Start(PulsarFunction)
   }
   ```
   
   You could even make a couple of convenience functions for returning only 
primary output or only a single topic message:
   
   ```go
   func TopicMessage(topic string, payload []byte) *FunctionResults {
        return &FunctionResults{
                TopicMessages: []TopicMsg{{
                        Topic:   topic,
                        Payload: payload,
                }},
        }
   }
   
   func OutputMessage(payload []byte) *FunctionResults {
        return &FunctionResults{
                Output: payload,
        }
   }
   ```
   
   Sorry if I misunderstood your initial proposal.

