michaeljmarshall commented on code in PR #20737:
URL: https://github.com/apache/pulsar/pull/20737#discussion_r1266202496


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -827,39 +826,41 @@ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
         final CompletableFuture<Void> future = new CompletableFuture<>();
         synchronized (this) {
             setClientCnx(cnx);
-            ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(),
-                    priorityLevel, consumerName, isDurable, startMessageIdData, metadata, readCompacted,
-                    conf.isReplicateSubscriptionState(),
-                    InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
-                    startMessageRollbackDuration, si, createTopicIfDoesNotExist, conf.getKeySharedPolicy(),
-                    // Use the current epoch to subscribe.
-                    conf.getSubscriptionProperties(), CONSUMER_EPOCH.get(this));
-
-            cnx.sendRequestWithId(request, requestId).thenRun(() -> {
-                synchronized (ConsumerImpl.this) {
-                    if (changeToReadyState()) {
-                        consumerIsReconnectedToBroker(cnx, currentSize);
-                    } else {
-                        // Consumer was closed while reconnecting, close the connection to make sure the broker
-                        // drops the consumer on its side
-                        setState(State.Closed);
-                        deregisterFromClientCnx();
-                        client.cleanupConsumer(this);
-                        cnx.channel().close();
-                        future.complete(null);
-                        return;
+            final SchemaInfo finalSi = si;
+            sendCloseConsumerCommand(cnx).thenCompose(ignore -> {

Review Comment:
   This is a fundamental change to the protocol. It feels like there must be a 
bug somewhere else.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -872,20 +873,17 @@ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
                 log.warn("[{}][{}] Failed to subscribe to topic on {}", topic,
                         subscription, cnx.channel().remoteAddress());
 
-                if (e.getCause() instanceof PulsarClientException.TimeoutException) {
-                    // Creating the consumer has timed out. We need to ensure the broker closes the consumer
-                    // in case it was indeed created, otherwise it might prevent new create consumer operation,
-                    // since we are not necessarily closing the connection.
-                    long closeRequestId = client.newRequestId();
-                    ByteBuf cmd = Commands.newCloseConsumer(consumerId, closeRequestId);
-                    cnx.sendRequestWithId(cmd, closeRequestId);
-                }
-
                 if (e.getCause() instanceof PulsarClientException
                         && PulsarClientException.isRetriableError(e.getCause())
                         && System.currentTimeMillis() < SUBSCRIBE_DEADLINE_UPDATER.get(ConsumerImpl.this)) {
                     future.completeExceptionally(e.getCause());
                 } else if (!subscribeFuture.isDone()) {
+                    // Creating the consumer has timed out. We need to ensure the broker closes the consumer
+                    // in case it was indeed created, otherwise it might prevent new create consumer operation,
+                    // since we are not necessarily closing the connection.
+                    if (e.getCause() instanceof PulsarClientException.TimeoutException) {
+                        sendCloseConsumerCommand(cnx);
+                    }

Review Comment:
   By moving this logic here, we won't _always_ send the `CLOSE_CONSUMER` 
protocol message after the initial connection is made, which will be 
problematic for timeouts that happen after load balancing. As the comment in 
the code suggests, the broker expects a client to send the `CLOSE_CONSUMER` 
command before sending a second `SUBSCRIBE` command.
   
   I am concerned that moving this logic here could lead to unintended 
consequences in the client.
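   
   To make the concern concrete, here is a minimal toy model of the ordering 
   described above. It is not Pulsar code: `ToyBroker`, 
   `CloseBeforeResubscribeSketch`, and `retryAfterTimeout` are hypothetical 
   names, and the sketch assumes the broker simply rejects a `SUBSCRIBE` for a 
   consumer id it still has registered. Real broker behavior is more involved.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model only (an assumption, not the Pulsar broker): tracks consumer ids on one connection. */
final class ToyBroker {
    private final Set<Long> activeConsumers = new HashSet<>();

    /** Accepts a SUBSCRIBE only if the consumer id is not already registered. */
    boolean subscribe(long consumerId) {
        return activeConsumers.add(consumerId);
    }

    /** Models CLOSE_CONSUMER; closing an unknown id is a no-op in this sketch. */
    void closeConsumer(long consumerId) {
        activeConsumers.remove(consumerId);
    }
}

final class CloseBeforeResubscribeSketch {
    /**
     * Simulates a SUBSCRIBE that the broker processed but the client saw as a
     * timeout, followed by a retry on the same connection.
     * Returns whether the retried SUBSCRIBE is accepted.
     */
    static boolean retryAfterTimeout(boolean sendCloseFirst) {
        ToyBroker broker = new ToyBroker();
        long consumerId = 1L;

        // The broker created the consumer, but the client timed out waiting for the ack.
        broker.subscribe(consumerId);

        if (sendCloseFirst) {
            // The client tells the broker to drop the possibly created consumer.
            broker.closeConsumer(consumerId);
        }
        return broker.subscribe(consumerId);
    }

    public static void main(String[] args) {
        System.out.println("retry without close accepted: " + retryAfterTimeout(false));
        System.out.println("retry with close accepted: " + retryAfterTimeout(true));
    }
}
```

   In this model the retry is accepted only when the close is sent first, 
   which is why skipping the close on some timeout paths looks risky to me.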



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
