devinbost edited a comment on issue #6198: Flaky-test: PulsarStateTest.testSinkState
URL: https://github.com/apache/pulsar/issues/6198#issuecomment-581674740

The approach used in SimpleProducerConsumerTest.testAsyncProducerAndAsyncAck(..) looks like this:

```
// Asynchronously produce messages
for (int i = 0; i < 10; i++) {
    final String message = "my-message-" + i;
    Future<MessageId> future = producer.sendAsync(message.getBytes());
    futures.add(future);
}

log.info("Waiting for async publish to complete");
for (Future<MessageId> future : futures) {
    future.get();
}

Message<byte[]> msg = null;
Set<String> messageSet = Sets.newHashSet();
for (int i = 0; i < 10; i++) {
    msg = consumer.receive(5, TimeUnit.SECONDS);
    String receivedMessage = new String(msg.getData());
    log.info("Received message: [{}]", receivedMessage);
    String expectedMessage = "my-message-" + i;
    testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
}

// Asynchronously acknowledge up to and including the last message
Future<Void> ackFuture = consumer.acknowledgeCumulativeAsync(msg);
log.info("Waiting for async ack to complete");
ackFuture.get();
consumer.close();
```

If we wait for the futures to complete like that, does that really guarantee that the publish was **fully** completed? (i.e., once `future.get()` returns, is the message guaranteed to be available for a consumer to receive?)

If so, this approach could be used instead. However, I am not yet convinced that it fully closes the loop. How would we determine whether it does?
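
One way to probe that question is to make the test fail loudly when a message is not consumable after its send future has completed. The snippet above calls `msg.getData()` without a null check, and `consumer.receive(5, TimeUnit.SECONDS)` returns null on timeout, so a gap would currently surface only as a NullPointerException. Below is a minimal, JDK-only sketch of the shape of such a check. The in-memory queue is a hypothetical stand-in for the topic, and `PublishThenConsumeSketch` and `roundTrip` are names made up for illustration; it says nothing about what the broker actually guarantees.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PublishThenConsumeSketch {

    /** Sends count messages asynchronously, waits for every future, then consumes them. */
    public static List<String> roundTrip(int count) {
        // Hypothetical in-memory "topic"; NOT a Pulsar API.
        BlockingQueue<String> topic = new LinkedBlockingQueue<>();
        // A single sender thread keeps the send order deterministic.
        ExecutorService sender = Executors.newSingleThreadExecutor();
        try {
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                final String message = "my-message-" + i;
                // Each future completes only after its message is in the queue.
                futures.add(CompletableFuture.runAsync(() -> topic.add(message), sender));
            }

            // Bounded wait, so a stuck send fails the test instead of hanging it.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                    .get(10, TimeUnit.SECONDS);

            List<String> received = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                String msg = topic.poll(5, TimeUnit.SECONDS);
                // Explicit check: a null here means the loop was not closed.
                if (msg == null) {
                    throw new IllegalStateException(
                            "message " + i + " not consumable after its send future completed");
                }
                received.add(msg);
            }
            return received;
        } catch (Exception e) {
            throw new IllegalStateException("round trip failed", e);
        } finally {
            sender.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(10));
    }
}
```

In the real test, the equivalent would be asserting that `msg` is non-null immediately after each `consumer.receive(5, TimeUnit.SECONDS)`. If that assertion never trips across many runs, that is evidence (not proof) that a completed send future implies the message is consumable.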
