This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push: new efed983c3e8 [C++] Remove the flaky and meaningless tests (#15271) efed983c3e8 is described below commit efed983c3e8f3ab9ee69ac069203bcdb8c8affc2 Author: Yunze Xu <xyzinfern...@163.com> AuthorDate: Fri Apr 22 23:48:42 2022 +0800 [C++] Remove the flaky and meaningless tests (#15271) Fixes #13849 Fixes #14848 ### Motivation #11570 adds a `testSendAsyncCloseAsyncConcurrentlyWithLazyProducers` for the case that some `sendAsync` calls that are invoked after `closeAsync` is called in another thread must complete with `ResultAlreadyClosed`. It's flaky because the synchronization between two threads is not strict. This test uses `sendStartLatch` for the order of `sendAsync` and `closeAsync`: ``` sendAsync 0,1,...,9 -> sendStartLatch is done -> closeAsync ``` However, it cannot guarantee the rest `sendAsync` calls happen after `closeAsync` is called. If so, all `sendAsync` calls will complete with `ResultOk`. On the other hand, this test is meaningless because it requires strict synchronization between two threads so there is no need to run `sendAsync` and `closeAsync` in two threads. The verification of this test is also wrong, see https://github.com/apache/pulsar/issues/13849#issuecomment-1079098248. When `closeAsync` is called, the previous `sendAsync` calls might not complete, so all `sendAsync` will complete with `ResultAlreadyClosed`, not only those called after `closeAsync`. In addition, this PR also tries to fix the flaky `testReferenceCount`, which assumes too strictly. ### Modifications - Remove `testSendAsyncCloseAsyncConcurrentlyWithLazyProducers` - Only check the reference count is greater than 0 instead of equal to 1 (cherry picked from commit eeea9ca1f6eeef1248b7fe8f36be30be835d2480) --- pulsar-client-cpp/tests/ClientTest.cc | 2 +- pulsar-client-cpp/tests/ProducerTest.cc | 83 --------------------------------- 2 files changed, 1 insertion(+), 84 deletions(-) diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc index 1ba0164ad87..364e170f896 100644 --- a/pulsar-client-cpp/tests/ClientTest.cc +++ b/pulsar-client-cpp/tests/ClientTest.cc @@ -211,7 +211,7 @@ TEST(ClientTest, testReferenceCount) { LOG_INFO("Reference count of the reader's underlying consumer: " << consumers[1].use_count()); readerWeakPtr = PulsarFriend::getReaderImplWeakPtr(reader); - ASSERT_EQ(readerWeakPtr.use_count(), 1); + ASSERT_TRUE(readerWeakPtr.use_count() > 0); LOG_INFO("Reference count of the reader: " << readerWeakPtr.use_count()); } diff --git a/pulsar-client-cpp/tests/ProducerTest.cc b/pulsar-client-cpp/tests/ProducerTest.cc index 258811fcdaf..9ddca1f7042 100644 --- a/pulsar-client-cpp/tests/ProducerTest.cc +++ b/pulsar-client-cpp/tests/ProducerTest.cc @@ -159,89 +159,6 @@ TEST(ProducerTest, testSendAsyncAfterCloseAsyncWithLazyProducers) { ASSERT_EQ(ResultOk, result); } -TEST(ProducerTest, testSendAsyncCloseAsyncConcurrentlyWithLazyProducers) { - // run sendAsync and closeAsync concurrently and verify that all sendAsync callbacks are called - // and that messages sent after closeAsync is invoked receive ResultAlreadyClosed. - for (int run = 0; run < 20; run++) { - LOG_INFO("Start of run " << run); - Client client(serviceUrl); - const std::string partitionedTopic = - "testProducerIsConnectedPartitioned-" + std::to_string(time(nullptr)); - - int res = makePutRequest( - adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions", "10"); - ASSERT_TRUE(res == 204 || res == 409) << "res: " << res; - - ProducerConfiguration producerConfiguration; - producerConfiguration.setLazyStartPartitionedProducers(true); - producerConfiguration.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition); - producerConfiguration.setBatchingEnabled(true); - Producer producer; - ASSERT_EQ(ResultOk, client.createProducer(partitionedTopic, producerConfiguration, producer)); - - int sendCount = 100; - std::vector<Promise<Result, MessageId>> promises(sendCount); - Promise<bool, Result> promiseClose; - - // only call closeAsync once at least 10 messages have been sent - Latch sendStartLatch(10); - Latch closeLatch(1); - int closedAt = 0; - - std::thread t1([&]() { - for (int i = 0; i < sendCount; i++) { - sendStartLatch.countdown(); - Message msg = MessageBuilder().setContent("test").build(); - - if (closeLatch.getCount() == 0 && closedAt == 0) { - closedAt = i; - LOG_INFO("closedAt set to " << closedAt) - } - - producer.sendAsync(msg, WaitForCallbackValue<MessageId>(promises[i])); - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } - }); - - std::thread t2([&]() { - sendStartLatch.wait(std::chrono::milliseconds(1000)); - LOG_INFO("Closing"); - producer.closeAsync(WaitForCallback(promiseClose)); - LOG_INFO("Close called"); - closeLatch.countdown(); - Result result; - promiseClose.getFuture().get(result); - ASSERT_EQ(ResultOk, result); - LOG_INFO("Closed"); - }); - - t1.join(); - t2.join(); - - // make sure that all messages after the moment when closeAsync was invoked - // return AlreadyClosed - for (int i = 0; i < sendCount; i++) { - LOG_DEBUG("Checking " << i) - - // whether a message was sent successfully or not, it's callback - // must have been invoked - ASSERT_EQ(true, promises[i].isComplete()); - MessageId mi; - Result res = promises[i].getFuture().get(mi); - LOG_DEBUG("Result is " << res); - - // for the messages sent after closeAsync was invoked, they - // should all return ResultAlreadyClosed - if (i >= closedAt) { - ASSERT_EQ(ResultAlreadyClosed, res); - } - } - - client.close(); - LOG_INFO("End of run " << run); - } -} - TEST(ProducerTest, testBacklogQuotasExceeded) { std::string ns = "public/test-backlog-quotas"; std::string topic = ns + "/testBacklogQuotasExceeded" + std::to_string(time(nullptr));