lhotari commented on issue #25936: URL: https://github.com/apache/pulsar/issues/25936#issuecomment-4626513842
> [@lhotari](https://github.com/lhotari) Thank you. Yes, we are in the process of upgrading. As you know, we have to deal with the current situation. if you could share your view on the proposal here [#25936 (comment)](https://github.com/apache/pulsar/issues/25936#issuecomment-4626330592) , that would be great. There shouldn't be a `.get(10, TimeUnit.SECONDS)` call in the code. For clarity, this would be the way: ```java public static Producer<byte[]> createExclusiveProducerWithRetry(PulsarClient client, String topic, String producerName, Supplier<Boolean> isLeader, int sleepInBetweenMs) throws NotLeaderAnymore { try { int tries = 0; do { try { CompletableFuture<Producer<byte[]>> producerFuture = client.newProducer().topic(topic) .accessMode(ProducerAccessMode.Exclusive) .enableBatching(false) .blockIfQueueFull(true) .compressionType(CompressionType.LZ4) .producerName(producerName) .createAsync(); return FutureUtil.getAndCleanupOnInterrupt(producerFuture, Producer::closeAsync); } catch (Exception e) { log.info().attr("topic", topic).exception(e) .log("Encountered exception while creating exclusive producer"); } tries++; if (tries % 6 == 0) { log.debug().attr("topic", topic) .attr("attempts", tries) .log("Failed to acquire exclusive producer." + " Will retry if we are still the leader."); } Thread.sleep(sleepInBetweenMs); } while (isLeader.get()); } catch (InterruptedException e) { throw new RuntimeException("Failed to create exclusive producer on topic " + topic, e); } throw new NotLeaderAnymore(); } ``` The `FutureUtil.getAndCleanupOnInterrupt` handles the `InterruptedException` and that's sufficient. The timeout will need to rely on Pulsar client's operationTimeout. That can be configured with `brokerClient_operationTimeoutMs` in `functions_worker.yml`. https://github.com/apache/pulsar/blob/cf2faefd3ba7b1700921e4e104d02a057d316130/conf/functions_worker.yml#L460-L462 https://github.com/apache/pulsar/blob/cf2faefd3ba7b1700921e4e104d02a057d316130/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java#L297-L301 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
