michaeljmarshall commented on a change in pull request #12195:
URL: https://github.com/apache/pulsar/pull/12195#discussion_r719891727



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -843,72 +843,78 @@ protected WriteInEventLoopCallback newObject(Handle<WriteInEventLoopCallback> ha
 
     @Override
     public CompletableFuture<Void> closeAsync() {
-        final State currentState = getAndUpdateState(state -> {
-            if (state == State.Closed) {
-                return state;
-            }
-            return State.Closing;
-        });
-
-        if (currentState == State.Closed || currentState == State.Closing) {
-            return CompletableFuture.completedFuture(null);
-        }
-
-        Timeout timeout = sendTimeout;
-        if (timeout != null) {
-            timeout.cancel();
-            sendTimeout = null;
-        }
+        CompletableFuture<Void> flushAndCloseFuture = new CompletableFuture<>();
+        flushAsync().thenRun(() -> {
+            final State currentState = getAndUpdateState(state -> {
+                if (state == State.Closed) {
+                    return state;
+                }
+                return State.Closing;
+            });

Review comment:
       I think we should set the state of the producer to `Closing` before 
triggering the flush. Otherwise, a message could be added to 
`batchMessageContainer` after the flush and before the state is set to 
`Closing`.
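
       To illustrate the ordering, here is a minimal sketch using a toy class. It is not the real `ProducerImpl`: `ToyProducer`, `send`, `flush`, `buffered`, and `sent` are hypothetical names, and the locking is simplified. The only point is that flipping the state before flushing leaves no window in which a new message can be buffered and then left behind.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Toy stand-in for the producer; none of these names are the real ProducerImpl API.
class ToyProducer {
    enum State { Ready, Closing, Closed }

    private final AtomicReference<State> state = new AtomicReference<>(State.Ready);
    private final List<String> batch = new ArrayList<>(); // stands in for batchMessageContainer
    private final List<String> sent = new ArrayList<>();  // messages written to the "connection"

    // Buffers a message only while the producer is still Ready.
    synchronized boolean send(String msg) {
        if (state.get() != State.Ready) {
            return false;
        }
        batch.add(msg);
        return true;
    }

    // Moves whatever is buffered onto the "connection".
    synchronized void flush() {
        sent.addAll(batch);
        batch.clear();
    }

    // Flip the state first, then flush. A send() that already passed its state
    // check holds the lock until it has buffered, so flush() still picks it up;
    // any later send() sees Closing and is rejected.
    void close() {
        State previous = state.getAndUpdate(s -> s == State.Closed ? s : State.Closing);
        if (previous != State.Ready) {
            return; // already closing or closed
        }
        flush();
        state.set(State.Closed);
    }

    synchronized int buffered() { return batch.size(); }
    synchronized List<String> sent() { return new ArrayList<>(sent); }
    State state() { return state.get(); }

    public static void main(String[] args) {
        ToyProducer p = new ToyProducer();
        p.send("a");
        p.close();
        System.out.println("accepted after close: " + p.send("b"));
        System.out.println("sent: " + p.sent() + ", still buffered: " + p.buffered());
    }
}
```

       With the order reversed (flush, then set `Closing`), a `send` landing between the two steps would sit in the buffer with nothing left to flush it.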

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -843,72 +843,78 @@ protected WriteInEventLoopCallback newObject(Handle<WriteInEventLoopCallback> ha
 
     @Override
     public CompletableFuture<Void> closeAsync() {
-        final State currentState = getAndUpdateState(state -> {
-            if (state == State.Closed) {
-                return state;
-            }
-            return State.Closing;
-        });
-
-        if (currentState == State.Closed || currentState == State.Closing) {
-            return CompletableFuture.completedFuture(null);
-        }
-
-        Timeout timeout = sendTimeout;
-        if (timeout != null) {
-            timeout.cancel();
-            sendTimeout = null;
-        }
+        CompletableFuture<Void> flushAndCloseFuture = new CompletableFuture<>();
+        flushAsync().thenRun(() -> {

Review comment:
       I am pretty sure we want to call `triggerFlush` instead of `flushAsync`. 
The future returned by `flushAsync` won't complete until the message has been 
delivered and acked. All we're worried about is ensuring that the buffered 
message has been sent on the TCP connection before we send the `CLOSE_PRODUCER` 
command. `triggerFlush` gives us these semantics.
   
   As a consequence, we won't need to wrap the majority of this method in a 
callback, and we won't need the `flushAndCloseFuture` variable.
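
   A rough sketch of the difference, again with a toy model rather than the real client: `ToyFlushProducer`, `wire`, and `ackAll` are hypothetical, and the two flush methods only mimic the semantics described above (write to the connection versus write and wait for acks).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Toy model only: the method names echo the review comment, not the real ProducerImpl.
class ToyFlushProducer {
    private final List<String> batch = new ArrayList<>();
    private final List<String> wire = new ArrayList<>(); // ordered frames on the "TCP connection"
    private final List<CompletableFuture<Void>> pendingAcks = new ArrayList<>();

    void send(String msg) { batch.add(msg); }

    // Writes the buffered messages to the wire and returns immediately.
    void triggerFlush() {
        for (String msg : batch) {
            wire.add("SEND " + msg);
            pendingAcks.add(new CompletableFuture<>());
        }
        batch.clear();
    }

    // Same write, but the returned future completes only after every ack arrives.
    CompletableFuture<Void> flushAsync() {
        triggerFlush();
        return CompletableFuture.allOf(pendingAcks.toArray(new CompletableFuture[0]));
    }

    // Close only needs ordering on the wire, so it does not wait for acks
    // and needs no callback wrapping.
    void close() {
        triggerFlush();
        wire.add("CLOSE_PRODUCER");
    }

    // Simulates the broker acknowledging everything sent so far.
    void ackAll() { pendingAcks.forEach(f -> f.complete(null)); }

    List<String> wire() { return new ArrayList<>(wire); }

    public static void main(String[] args) {
        ToyFlushProducer p = new ToyFlushProducer();
        p.send("m1");
        p.close();
        System.out.println(p.wire());
    }
}
```

   In this model `close()` stays a straight-line method because the ordering guarantee comes from the connection itself, not from waiting on a future.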



