lhotari commented on issue #25936:
URL: https://github.com/apache/pulsar/issues/25936#issuecomment-4626513842

   > [@lhotari](https://github.com/lhotari) Thank you. Yes, we are in the 
process of upgrading. As you know, we have to deal with the current situation. 
if you could share your view on the proposal here [#25936 
(comment)](https://github.com/apache/pulsar/issues/25936#issuecomment-4626330592)
 , that would be great.
   
   There shouldn't be a `.get(10, TimeUnit.SECONDS)` call in the code.
   
   For clarity, this would be the way:
   ```java
       public static Producer<byte[]> 
createExclusiveProducerWithRetry(PulsarClient client,
                                                                       String 
topic,
                                                                       String 
producerName,
                                                                       
Supplier<Boolean> isLeader,
                                                                       int 
sleepInBetweenMs) throws NotLeaderAnymore {
           try {
               int tries = 0;
               do {
                   try {
                       CompletableFuture<Producer<byte[]>> producerFuture = 
client.newProducer().topic(topic)
                               .accessMode(ProducerAccessMode.Exclusive)
                               .enableBatching(false)
                               .blockIfQueueFull(true)
                               .compressionType(CompressionType.LZ4)
                               .producerName(producerName)
                               .createAsync();
                       return 
FutureUtil.getAndCleanupOnInterrupt(producerFuture, Producer::closeAsync);
                   } catch (Exception e) {
                       log.info().attr("topic", topic).exception(e)
                               .log("Encountered exception while creating 
exclusive producer");
                   }
                   tries++;
                   if (tries % 6 == 0) {
                       log.debug().attr("topic", topic)
                               .attr("attempts", tries)
                               .log("Failed to acquire exclusive producer."
                                       + " Will retry if we are still the 
leader.");
                   }
                   Thread.sleep(sleepInBetweenMs);
               } while (isLeader.get());
           } catch (InterruptedException e) {
               throw new RuntimeException("Failed to create exclusive producer 
on topic " + topic, e);
           }
   
           throw new NotLeaderAnymore();
       }
   ```
   
   The `FutureUtil.getAndCleanupOnInterrupt` handles the `InterruptedException` 
and that's sufficient.
   The timeout will need to rely on Pulsar client's operationTimeout. That can 
be configured with `brokerClient_operationTimeoutMs` in `functions_worker.yml`.
   
   
https://github.com/apache/pulsar/blob/cf2faefd3ba7b1700921e4e104d02a057d316130/conf/functions_worker.yml#L460-L462
   
   
https://github.com/apache/pulsar/blob/cf2faefd3ba7b1700921e4e104d02a057d316130/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java#L297-L301
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to