This is an automated email from the ASF dual-hosted git repository. chenhang pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4591b884d638951c9dda8ae87bd2c630b191e3ea Author: Yunze Xu <[email protected]> AuthorDate: Wed Nov 3 23:10:15 2021 +0800 [C++] Fix request timeout for GetLastMessageId doesn't work (#12586) * Fix request timeout for GetLastMessageId doesn't work * Fix CentOS 7 build error * Revert refactors * Remove redundant clear for listeners * Use swap instead of move (cherry picked from commit a54c6c003c626cb16d90200ad81dd3ec37be2133) --- pulsar-client-cpp/lib/ClientConnection.cc | 7 ++- pulsar-client-cpp/lib/Future.h | 32 +++++++----- pulsar-client-cpp/tests/PromiseTest.cc | 84 +++++++++++++++++++++++++++++++ 3 files changed, 109 insertions(+), 14 deletions(-) diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc index 6e5245e..21fdd64 100644 --- a/pulsar-client-cpp/lib/ClientConnection.cc +++ b/pulsar-client-cpp/lib/ClientConnection.cc @@ -1597,7 +1597,12 @@ Future<Result, MessageId> ClientConnection::newGetLastMessageId(uint64_t consume pendingGetLastMessageIdRequests_.insert(std::make_pair(requestId, promise)); lock.unlock(); - sendRequestWithId(Commands::newGetLastMessageId(consumerId, requestId), requestId); + sendRequestWithId(Commands::newGetLastMessageId(consumerId, requestId), requestId) + .addListener([promise](Result result, const ResponseData& data) { + if (result != ResultOk) { + promise.setFailed(result); + } + }); return promise.getFuture(); } diff --git a/pulsar-client-cpp/lib/Future.h b/pulsar-client-cpp/lib/Future.h index cafb63f..b695e5e 100644 --- a/pulsar-client-cpp/lib/Future.h +++ b/pulsar-client-cpp/lib/Future.h @@ -90,7 +90,8 @@ class Promise { public: Promise() : state_(std::make_shared<InternalState<Result, Type> >()) {} - bool setValue(const Type& value) { + bool setValue(const Type& value) const { + static Result DEFAULT_RESULT; InternalState<Result, Type>* state = state_.get(); Lock lock(state->mutex); @@ -99,21 +100,24 @@ class Promise { } state->value = value; - state->result = Result(); + state->result = DEFAULT_RESULT; state->complete = true; - typename std::list<ListenerCallback>::iterator it; - for (it = state->listeners.begin(); it != state->listeners.end(); ++it) { - ListenerCallback& callback = *it; - callback(state->result, state->value); + decltype(state->listeners) listeners; + listeners.swap(state->listeners); + + lock.unlock(); + + for (auto& callback : listeners) { + callback(DEFAULT_RESULT, value); } - state->listeners.clear(); state->condition.notify_all(); return true; } - bool setFailed(Result result) { + bool setFailed(Result result) const { + static Type DEFAULT_VALUE; InternalState<Result, Type>* state = state_.get(); Lock lock(state->mutex); @@ -124,13 +128,15 @@ class Promise { state->result = result; state->complete = true; - typename std::list<ListenerCallback>::iterator it; - for (it = state->listeners.begin(); it != state->listeners.end(); ++it) { - ListenerCallback& callback = *it; - callback(state->result, state->value); + decltype(state->listeners) listeners; + listeners.swap(state->listeners); + + lock.unlock(); + + for (auto& callback : listeners) { + callback(result, DEFAULT_VALUE); } - state->listeners.clear(); state->condition.notify_all(); return true; } diff --git a/pulsar-client-cpp/tests/PromiseTest.cc b/pulsar-client-cpp/tests/PromiseTest.cc new file mode 100644 index 0000000..73c6f8c --- /dev/null +++ b/pulsar-client-cpp/tests/PromiseTest.cc @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include <gtest/gtest.h> +#include <lib/Future.h> +#include <chrono> +#include <string> +#include <thread> +#include <vector> + +using namespace pulsar; + +TEST(PromiseTest, testSetValue) { + Promise<int, std::string> promise; + std::thread t{[promise] { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + promise.setValue("hello"); + }}; + t.detach(); + + std::string value; + ASSERT_EQ(promise.getFuture().get(value), 0); + ASSERT_EQ(value, "hello"); +} + +TEST(PromiseTest, testSetFailed) { + Promise<int, std::string> promise; + std::thread t{[promise] { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + promise.setFailed(-1); + }}; + t.detach(); + + std::string value; + ASSERT_EQ(promise.getFuture().get(value), -1); + ASSERT_EQ(value, ""); +} + +TEST(PromiseTest, testListeners) { + Promise<int, std::string> promise; + auto future = promise.getFuture(); + + bool resultSetFailed = true; + bool resultSetValue = true; + std::vector<int> results; + std::vector<std::string> values; + + future + .addListener([promise, &resultSetFailed, &results, &values](int result, const std::string& value) { + resultSetFailed = promise.setFailed(-1L); + results.emplace_back(result); + values.emplace_back(value); + }) + .addListener([promise, &resultSetValue, &results, &values](int result, const std::string& value) { + resultSetValue = promise.setValue("WRONG"); + results.emplace_back(result); + values.emplace_back(value); + }); + + promise.setValue("hello"); + std::string value; + ASSERT_EQ(future.get(value), 0); + ASSERT_EQ(value, "hello"); + + ASSERT_FALSE(resultSetFailed); + ASSERT_FALSE(resultSetValue); + ASSERT_EQ(results, (std::vector<int>(2, 0))); + ASSERT_EQ(values, (std::vector<std::string>(2, "hello"))); +}
