flowchartsman commented on a change in pull request #8327:
URL: https://github.com/apache/pulsar/pull/8327#discussion_r511014789



##########
File path: pulsar-function-go/pf/context.go
##########
@@ -119,6 +120,12 @@ func (c *FunctionContext) GetUserConfMap() 
map[string]interface{} {
        return c.userConfigs
 }
 
+// NewOutputMessage send message to the topic
+// @param topicName: The name of the topic for output message
+func (c *FunctionContext) NewOutputMessage(topicName string) pulsar.Producer {

Review comment:
       I can understand the desire for naming parity, but I wonder whether or 
not it is better to call this something different, given how different the 
concepts are.  In Go, if I see `New<Foo>` I expect that I am going to be 
receiving a pointer to a new struct of that type, or at least an interface of 
some kind with that name.  In this case, I would expect to get a new Message.
   
   I think I would probably recommend something like the following:
   
   ```go
   func (c *FunctionContext) SendTopic(topicName string, message 
*pulsar.ProducerMessage)
   ```
   
   or even
   ```go
   func (c *FunctionContext) SendTopicMessage(topicName string, message 
*pulsar.ProducerMessage)
   func (c *FunctionContext) SendTopic(topicName string, payload []byte)
   ```
   
   which, given the way producers are currently packaged, might benefit from a 
registry of producers per topic:
   
   ```go
   var producerRegistry = map[string]pulsar.Producer{}
   func (gi *goInstance) getProducer(topicName string) (pulsar.Producer, error) 
{
           if found := producerRegistry[topicName]; found != nil {
                   return found, nil
           }
        properties := getProperties(getDefaultSubscriptionName(
                gi.context.instanceConf.funcDetails.Tenant,
                gi.context.instanceConf.funcDetails.Namespace,
                gi.context.instanceConf.funcDetails.Name), 
gi.context.instanceConf.instanceID)
   
   
        producer, err := gi.client.CreateProducer(pulsar.ProducerOptions{
                Topic:                   topicName,
                Properties:              properties,
                CompressionType:         pulsar.LZ4,
                BatchingMaxPublishDelay: time.Millisecond * 10,
                // Set send timeout to be infinity to prevent potential 
deadlock with consumer
                // that might happen when consumer is blocked due to unacked 
messages
        })
        if err != nil {
                gi.stats.incrTotalSysExceptions(err)
                log.Errorf("create producer error:%s", err.Error())
                return nil, err
        }
           producerRegistry[topicName] = producer
        return producer, err
   }
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to