eolivelli commented on a change in pull request #8992:
URL: https://github.com/apache/pulsar/pull/8992#discussion_r546070668
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1192,6 +1193,17 @@ protected void handleProducer(final CommandProducer cmdProducer) {
}
return null;
});
+
+ producerQueuedFuture.thenRun(() -> {
+ // If the producer is queued waiting, we will get an immediate notification
+ // that we need to pass to client
+ if (isActive()) {
+ log.info("[{}] Producer is waiting in qeuue: {}", remoteAddress, producer);
Review comment:
Typo: "qeuue" should be "queue".
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
##########
@@ -468,6 +469,19 @@ protected void handleProducerSuccess(CommandProducerSuccess success) {
success.getRequestId(), success.getProducerName());
}
long requestId = success.getRequestId();
+ if (!success.getProducerReady()) {
+ // We got a success operation but the producer is not ready. This means that the producer has been queued up
+ // in broker. We need to leave the future pending until we get the final confirmation. We just mark that
+ // we have received a response, in order to avoid the timeout.
+ TimedCompletableFuture<?> requestFuture = (TimedCompletableFuture<?>) pendingRequests.get(requestId);
+ if (requestFuture != null) {
+ log.info("{} Producer {} has been queued up at broker. request: {}", ctx.channel(),
+ success.getProducerName(), requestId);
+ requestFuture.markAsResponded();
Review comment:
What happens if the PulsarClient is closed or there is a network error?
Are we guaranteed to fail rather than wait forever?
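
To make the concern concrete, here is a minimal, self-contained sketch of the behavior I would expect. It is not the actual ClientCnx code: `QueuedProducerSketch`, `TrackedFuture`, `checkTimeouts` and `onConnectionClosed` are hypothetical names, and `TrackedFuture` is only an assumed stand-in for `TimedCompletableFuture`. The idea is that a request marked as responded is exempt from the timeout sweep, but must still be failed when the connection goes away or the client is closed.

```java
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch, not the real ClientCnx.
class QueuedProducerSketch {

    // Assumed stand-in for TimedCompletableFuture: remembers whether any response arrived.
    static class TrackedFuture<T> extends CompletableFuture<T> {
        private volatile boolean responded = false;
        void markAsResponded() { responded = true; }
        boolean hasGotResponse() { return responded; }
    }

    private final Map<Long, TrackedFuture<String>> pendingRequests = new ConcurrentHashMap<>();

    TrackedFuture<String> newRequest(long requestId) {
        TrackedFuture<String> future = new TrackedFuture<>();
        pendingRequests.put(requestId, future);
        return future;
    }

    // Broker answered "success, but producer not ready": keep the future pending.
    void onProducerQueued(long requestId) {
        TrackedFuture<String> future = pendingRequests.get(requestId);
        if (future != null) {
            future.markAsResponded();
        }
    }

    // Timeout sweep: fails only requests that never received any response.
    void checkTimeouts() {
        pendingRequests.entrySet().removeIf(entry -> {
            if (entry.getValue().hasGotResponse()) {
                return false; // queued producer, exempt from the timeout
            }
            entry.getValue().completeExceptionally(new TimeoutException("request timed out"));
            return true;
        });
    }

    // Connection lost or client closed: every pending request must fail,
    // including the ones that were marked as responded.
    void onConnectionClosed(Throwable cause) {
        pendingRequests.values().forEach(f -> f.completeExceptionally(cause));
        pendingRequests.clear();
    }

    public static void main(String[] args) {
        QueuedProducerSketch cnx = new QueuedProducerSketch();
        TrackedFuture<String> queued = cnx.newRequest(1L);
        cnx.onProducerQueued(1L);
        cnx.checkTimeouts();
        System.out.println("done after timeout sweep: " + queued.isDone());
        cnx.onConnectionClosed(new IOException("connection closed"));
        System.out.println("failed after close: " + queued.isCompletedExceptionally());
    }
}
```

If the real close and channel-inactive paths already fail everything in `pendingRequests` regardless of the responded flag, then the queued future cannot hang; a test covering that case would be good to have.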