xiangzilv123 commented on issue #3226: Unclosed consumer after calling closeAsync URL: https://github.com/apache/pulsar/issues/3226#issuecomment-449265114 ```java package test; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.SubscriptionType; import java.util.concurrent.TimeUnit; public class TestMain { private static void waitTillNextMinute() throws InterruptedException { System.out.println("waiting for next minute"); long m = System.currentTimeMillis() / (60 * 1000); do { Thread.sleep(1000); } while (System.currentTimeMillis() / (60 * 1000) == m); } public static void main(String[] args) throws Exception { String url = "pulsar://localhost:6650"; String topic = "test-topic"; String subscriptionName = "test-subscription"; PulsarClient pulsarClient = PulsarClient.builder() .serviceUrl(url) .build(); System.out.println("client created"); while (true) { Consumer<byte[]> consumer = pulsarClient.newConsumer() .topic(topic) .ackTimeout(2 * 60, TimeUnit.MINUTES) .subscriptionName(subscriptionName) .subscriptionType(SubscriptionType.Shared) .subscribe(); System.out.println("consumer created"); waitTillNextMinute(); System.out.println("will unsub or close"); Consumer<byte[]> ref = consumer; consumer.unsubscribeAsync().whenComplete((r, e) -> { if (e == null) { System.out.println("unsub succeeded"); } else { e.printStackTrace(); } ref.closeAsync().whenComplete((r2, e2) -> { if (e2 == null) { System.out.println("close succeeded"); } else { e2.printStackTrace(); } }); }); waitTillNextMinute(); } } } ```
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
