devinbost commented on a change in pull request #10154:
URL: https://github.com/apache/pulsar/pull/10154#discussion_r612084685



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
##########
@@ -480,27 +481,54 @@ public void recordMetric(String metricName, double value) {
 
         if (producer == null) {
 
-            Producer<O> newProducer = ((ProducerBuilderImpl<O>) pulsar.getProducerBuilder().clone())
-                    .schema(schema)
-                    .blockIfQueueFull(true)
-                    .enableBatching(true)
-                    .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
-                    .compressionType(CompressionType.LZ4)
-                    .hashingScheme(HashingScheme.Murmur3_32Hash) //
-                    .messageRoutingMode(MessageRoutingMode.CustomPartition)
-                    .messageRouter(FunctionResultRouter.of())
-                    // set send timeout to be infinity to prevent potential deadlock with consumer
-                    // that might happen when consumer is blocked due to unacked messages
-                    .sendTimeout(0, TimeUnit.SECONDS)
-                    .topic(topicName)
-                    .properties(InstanceUtils.getProperties(componentType,
-                            FunctionCommon.getFullyQualifiedName(
-                                    this.config.getFunctionDetails().getTenant(),
-                                    this.config.getFunctionDetails().getNamespace(),
-                                    this.config.getFunctionDetails().getName()),
-                            this.config.getInstanceId()))
-                    .create();
-
+            Producer<O> newProducer = null;
+
+            if (this.config.getFunctionDetails() != null && this.config.getFunctionDetails().getSink() != null &&
+            this.config.getFunctionDetails().getSink().getProducerSpec() != null){

Review comment:
       @jerrypeng Thanks for reviewing the PR. I want to make sure I understand
your suggestion. I'm hearing a couple of things, so please correct anything
I've misunderstood:
   1. Instead of null-checking
`this.config.getFunctionDetails().getSink().getProducerSpec()`, I could skip
the check and pass the protobuf properties through directly. Their defaults
match the existing configuration, but it sounds like you're concerned that
doing so could still change existing behavior?
   2. I could define a new method, `newProducerBuilder()`, that inherits the
protobuf settings (instead of passing them through directly, as in item 1),
but it sounds like there are other settings I'd want it to inherit as well,
such as those related to authentication?
   
   Would functions that want the new settings call `newProducerBuilder()`
instead of `getProducer()`? It sounds like that would require a code change in
every existing function that we want to move to the new settings. Is that
right?
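
To make the compatibility concern in item 1 concrete, here is a minimal, self-contained sketch of one way to layer optional overrides on top of the existing defaults. Everything in it is a hypothetical stand-in: `Settings`, `Overrides`, and `resolve` are illustrative names, not Pulsar's `ProducerBuilder` or the generated `ProducerSpec` class. The two fields were picked only because `enableBatching(true)` and `batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)` appear in the removed lines of the diff above.

```java
public class ProducerSettingsSketch {

    /** Hypothetical, simplified view of the resolved producer settings. */
    static final class Settings {
        final boolean batchingEnabled;
        final long batchingMaxPublishDelayMs;

        Settings(boolean batchingEnabled, long batchingMaxPublishDelayMs) {
            this.batchingEnabled = batchingEnabled;
            this.batchingMaxPublishDelayMs = batchingMaxPublishDelayMs;
        }
    }

    /** Hypothetical per-function overrides; a null field means "not specified". */
    static final class Overrides {
        final Boolean batchingEnabled;
        final Long batchingMaxPublishDelayMs;

        Overrides(Boolean batchingEnabled, Long batchingMaxPublishDelayMs) {
            this.batchingEnabled = batchingEnabled;
            this.batchingMaxPublishDelayMs = batchingMaxPublishDelayMs;
        }
    }

    // Defaults taken from the removed lines of the diff: batching on, 10 ms delay.
    static final Settings DEFAULTS = new Settings(true, 10L);

    /** Applies only the fields that were explicitly set; everything else keeps its default. */
    static Settings resolve(Overrides overrides) {
        if (overrides == null) {
            // No overrides at all: existing functions keep today's behavior.
            return DEFAULTS;
        }
        boolean batching = overrides.batchingEnabled != null
                ? overrides.batchingEnabled
                : DEFAULTS.batchingEnabled;
        long delayMs = overrides.batchingMaxPublishDelayMs != null
                ? overrides.batchingMaxPublishDelayMs
                : DEFAULTS.batchingMaxPublishDelayMs;
        return new Settings(batching, delayMs);
    }

    public static void main(String[] args) {
        Settings unchanged = resolve(null);
        Settings tuned = resolve(new Overrides(null, 1L));
        System.out.println(unchanged.batchingEnabled + " " + unchanged.batchingMaxPublishDelayMs);
        System.out.println(tuned.batchingEnabled + " " + tuned.batchingMaxPublishDelayMs);
    }
}
```

The sketch relies on "not specified" being distinguishable from an explicit value. It assumes nothing about how the real protobuf message represents unset fields, which is the part that would decide whether passing the values straight through is safe.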




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

