devinbost commented on a change in pull request #10154:
URL: https://github.com/apache/pulsar/pull/10154#discussion_r612084685
##########
File path:
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
##########
@@ -480,27 +481,54 @@ public void recordMetric(String metricName, double value)
{
if (producer == null) {
- Producer<O> newProducer = ((ProducerBuilderImpl<O>)
pulsar.getProducerBuilder().clone())
- .schema(schema)
- .blockIfQueueFull(true)
- .enableBatching(true)
- .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
- .compressionType(CompressionType.LZ4)
- .hashingScheme(HashingScheme.Murmur3_32Hash) //
- .messageRoutingMode(MessageRoutingMode.CustomPartition)
- .messageRouter(FunctionResultRouter.of())
- // set send timeout to be infinity to prevent potential
deadlock with consumer
- // that might happen when consumer is blocked due to
unacked messages
- .sendTimeout(0, TimeUnit.SECONDS)
- .topic(topicName)
- .properties(InstanceUtils.getProperties(componentType,
- FunctionCommon.getFullyQualifiedName(
-
this.config.getFunctionDetails().getTenant(),
-
this.config.getFunctionDetails().getNamespace(),
-
this.config.getFunctionDetails().getName()),
- this.config.getInstanceId()))
- .create();
-
+ Producer<O> newProducer = null;
+
+ if (this.config.getFunctionDetails() != null &&
this.config.getFunctionDetails().getSink() != null &&
+ this.config.getFunctionDetails().getSink().getProducerSpec() !=
null){
Review comment:
@jerrypeng Thanks for reviewing the PR. I want to better understand your
suggestion. I'm hearing several things, so please correct any of my
misunderstandings:
1. Instead of performing the null check on
`this.config.getFunctionDetails().getSink().getProducerSpec()`, I could skip
the null check and just pass through the protobuf properties directly. The
defaults they use match the existing configurations, but it sounds like there's
a concern that it could still change existing behavior?
2. You're saying that I could define a new method, `newProducerBuilder()` to
inherit the protobuf settings (instead of passing them through directly, like I
mentioned above), but it sounds like there are other settings I'd want to
inherit as well (such as relating to authentication)?
Would the `newProducerBuilder()` method be used instead of `getProducer()`
by functions that want to use the new settings? It sounds like that would
require a code change for any existing function that we want to update to use
the new settings. Is that right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]