jerrypeng commented on a change in pull request #10154:
URL: https://github.com/apache/pulsar/pull/10154#discussion_r612026409
##########
File path:
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
##########
@@ -480,27 +481,54 @@ public void recordMetric(String metricName, double value) {
         if (producer == null) {
-            Producer<O> newProducer = ((ProducerBuilderImpl<O>) pulsar.getProducerBuilder().clone())
-                    .schema(schema)
-                    .blockIfQueueFull(true)
-                    .enableBatching(true)
-                    .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
-                    .compressionType(CompressionType.LZ4)
-                    .hashingScheme(HashingScheme.Murmur3_32Hash) //
-                    .messageRoutingMode(MessageRoutingMode.CustomPartition)
-                    .messageRouter(FunctionResultRouter.of())
-                    // set send timeout to be infinity to prevent potential deadlock with consumer
-                    // that might happen when consumer is blocked due to unacked messages
-                    .sendTimeout(0, TimeUnit.SECONDS)
-                    .topic(topicName)
-                    .properties(InstanceUtils.getProperties(componentType,
-                            FunctionCommon.getFullyQualifiedName(
-                                    this.config.getFunctionDetails().getTenant(),
-                                    this.config.getFunctionDetails().getNamespace(),
-                                    this.config.getFunctionDetails().getName()),
-                            this.config.getInstanceId()))
-                    .create();
-
+            Producer<O> newProducer = null;
+
+            if (this.config.getFunctionDetails() != null && this.config.getFunctionDetails().getSink() != null &&
+                    this.config.getFunctionDetails().getSink().getProducerSpec() != null){
Review comment:
`this.config.getFunctionDetails().getSink().getProducerSpec()` will
never return null, so that part of the condition is always true. This is
technically changing the existing behavior of `publish` when called from
`Context`. I don't think this is the right change.
If a user wants to use a custom producer to produce from within the function
(instead of just returning a value), the user should be able to do one of the
following:
1. Instantiate a PulsarClient, create a producer, and send messages (this is
already possible today).
2. Call a new method, e.g. `newProducerBuilder()`, that we can add to the
Context. The returned builder would automatically inherit some of the settings,
such as authentication, from the function.
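
To illustrate option 2, here is a minimal, self-contained sketch of the idea. It deliberately does not use the Pulsar API: `SketchContext`, `SketchProducerBuilder`, and the setting keys (`authPlugin`, `sendTimeoutSeconds`, `topic`, `enableBatching`) are hypothetical stand-ins invented for this example. It only shows the shape of the proposal, where each call to `newProducerBuilder()` returns a fresh builder pre-populated with function-level settings that the user can then extend or override.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class NewProducerBuilderSketch {

    // Hypothetical stand-in for a producer builder; NOT Pulsar's ProducerBuilder.
    static final class SketchProducerBuilder {
        private final Map<String, String> settings = new LinkedHashMap<>();

        // Sets or overrides one setting and returns this builder for chaining.
        SketchProducerBuilder set(String key, String value) {
            settings.put(key, value);
            return this;
        }

        // Returns a copy of the effective settings (stands in for create()).
        Map<String, String> build() {
            return new LinkedHashMap<>(settings);
        }
    }

    // Hypothetical stand-in for the function Context.
    static final class SketchContext {
        private final Map<String, String> functionDefaults;

        SketchContext(Map<String, String> functionDefaults) {
            this.functionDefaults = new LinkedHashMap<>(functionDefaults);
        }

        // Each call returns a new builder seeded with the function's settings,
        // so user customizations never leak into the function's own producer.
        SketchProducerBuilder newProducerBuilder() {
            SketchProducerBuilder builder = new SketchProducerBuilder();
            functionDefaults.forEach(builder::set);
            return builder;
        }
    }

    public static void main(String[] args) {
        Map<String, String> defaults = new LinkedHashMap<>();
        defaults.put("authPlugin", "token");       // assumed inherited setting
        defaults.put("sendTimeoutSeconds", "0");   // assumed inherited setting
        SketchContext context = new SketchContext(defaults);

        // User code: inherit the defaults, then customize.
        Map<String, String> custom = context.newProducerBuilder()
                .set("topic", "my-topic")
                .set("enableBatching", "false")
                .build();
        System.out.println("custom producer settings: " + custom);

        // A second builder is unaffected by the customizations above.
        System.out.println("fresh builder settings:   "
                + context.newProducerBuilder().build());
    }
}
```

With this shape, the existing `publish` path keeps its current producer configuration, and custom producer settings are opt-in through the separate builder.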
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]