This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 673ed2ce70a Revert "[fix][client-c++] Close `messages_` when
PartitionedConsumer is closed (#16887)"
673ed2ce70a is described below
commit 673ed2ce70a7936473390f174c3a2a7f204a0699
Author: Yunze Xu <[email protected]>
AuthorDate: Sat Aug 6 01:42:49 2022 +0800
Revert "[fix][client-c++] Close `messages_` when PartitionedConsumer is
closed (#16887)"
This reverts commit 62a3590028988243372a0ab9f4bcb163bcd95135.
---
pulsar-client-cpp/lib/ConsumerImpl.cc | 4 ---
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 4 ---
pulsar-client-cpp/lib/PartitionedConsumerImpl.cc | 7 ----
pulsar-client-cpp/tests/ConsumerTest.cc | 43 ------------------------
4 files changed, 58 deletions(-)
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc
b/pulsar-client-cpp/lib/ConsumerImpl.cc
index ec145a247ae..1f4a6b5b988 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -701,10 +701,6 @@ Result ConsumerImpl::receiveHelper(Message& msg, int
timeout) {
messageProcessed(msg);
return ResultOk;
} else {
- Lock lock(mutex_);
- if (state_ != Ready) {
- return ResultAlreadyClosed;
- }
return ResultTimeout;
}
}
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 361a863d247..0ae86d5879a 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -493,10 +493,6 @@ Result MultiTopicsConsumerImpl::receive(Message& msg, int
timeout) {
unAckedMessageTrackerPtr_->add(msg.getMessageId());
return ResultOk;
} else {
- lock.lock();
- if (state_ != Ready) {
- return ResultAlreadyClosed;
- }
return ResultTimeout;
}
}
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index 48f1e4a398b..e43b5090e43 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -106,10 +106,6 @@ Result PartitionedConsumerImpl::receive(Message& msg, int
timeout) {
unAckedMessageTrackerPtr_->add(msg.getMessageId());
return ResultOk;
} else {
- lock.lock();
- if (state_ != Ready) {
- return ResultAlreadyClosed;
- }
return ResultTimeout;
}
}
@@ -432,9 +428,6 @@ void PartitionedConsumerImpl::messageReceived(Consumer
consumer, const Message&
void PartitionedConsumerImpl::failPendingReceiveCallback() {
Message msg;
-
- messages_.close();
-
Lock lock(pendingReceiveMutex_);
while (!pendingReceives_.empty()) {
ReceiveCallback callback = pendingReceives_.front();
diff --git a/pulsar-client-cpp/tests/ConsumerTest.cc
b/pulsar-client-cpp/tests/ConsumerTest.cc
index 3d9fc291ff7..4680281478b 100644
--- a/pulsar-client-cpp/tests/ConsumerTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerTest.cc
@@ -505,47 +505,4 @@ TEST(ConsumerTest, testIsConnected) {
ASSERT_FALSE(consumer.isConnected());
}
-TEST(ConsumerTest, testPartitionsWithCloseUnblock) {
- Client client(lookupUrl);
- const std::string partitionedTopic = "testPartitionsWithCloseUnblock" +
std::to_string(time(nullptr));
- constexpr int numPartitions = 2;
-
- int res =
- makePutRequest(adminUrl + "admin/v2/persistent/public/default/" +
partitionedTopic + "/partitions",
- std::to_string(numPartitions));
- ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
-
- Consumer consumer;
- ConsumerConfiguration consumerConfig;
- ASSERT_EQ(ResultOk, client.subscribe(partitionedTopic, "SubscriptionName",
consumerConfig, consumer));
-
- // send messages
- ProducerConfiguration producerConfig;
- Producer producer;
- ASSERT_EQ(ResultOk, client.createProducer(partitionedTopic,
producerConfig, producer));
- Message msg = MessageBuilder().setContent("message").build();
- ASSERT_EQ(ResultOk, producer.send(msg));
-
- producer.close();
-
- // receive message on another thread
- pulsar::Latch latch(1);
- auto thread = std::thread([&]() {
- Message msg;
- ASSERT_EQ(ResultOk, consumer.receive(msg, 10 * 1000));
- consumer.acknowledge(msg.getMessageId());
- ASSERT_EQ(ResultAlreadyClosed, consumer.receive(msg, 10 * 1000));
- latch.countdown();
- });
-
- std::this_thread::sleep_for(std::chrono::seconds(1));
-
- consumer.close();
-
- bool wasUnblocked = latch.wait(std::chrono::milliseconds(100));
-
- ASSERT_TRUE(wasUnblocked);
- thread.join();
-}
-
} // namespace pulsar