This is an automated email from the ASF dual-hosted git repository.
guangning pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 522af14 [C++] Handle OAuth 2.0 exceptional cases gracefully (#12335)
522af14 is described below
commit 522af14a098a05862e61f19228a7b291fcd7cba6
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Oct 13 11:05:26 2021 +0800
[C++] Handle OAuth 2.0 exceptional cases gracefully (#12335)
Fixes #12324
Currently if any error happened during OAuth 2.0 authentication in C++
client, a runtime error would be thrown and could only be caught when creating
an `AuthOauth` object, but could not be caught in `Client`'s method like
`createProducer`. It's not graceful. What's worse, there's no way for Python
client that is a wrapper of C++ client to caught this exception.
When `ClientCredentialFlow::authenticate` returns an invalid
`Oauth2TokenResult`, catch the `runtime_error` thrown in `Oauth2CachedToken`'s
constructor and returns `ResultAuthenticationError` as
`AuthOauth2::getAuthData`'s returned value. Since `getAuthData` always returns
`ResultOk` before this PR, the related docs are also modified.
Then when a CONNECT or AUTH_RESPONSE command is created, expose the result
of `getAuthData`. If it's not `ResultOk`, close the connection and complete the
connection's future with the result. After that, the `Client`'s API will be
completed with the result.
In addition, this PR also makes the error code of libcurl human readable by
configuring `CURLOPT_ERRORBUFFER`.
- [x] Make sure that the change passes the CI checks.
This change added tests `AuthPluginTest.testOauth2Failure` to verify when
OAuth 2.0 authentication failed, the `createProducer` would return
`ResultAuthenticationError` without any exception thrown.
(cherry picked from commit 06b68bb1d5859fd969f66c29888feda271817ec5)
---
pulsar-client-cpp/include/pulsar/Authentication.h | 6 +--
pulsar-client-cpp/lib/BinaryProtoLookupService.cc | 2 +-
pulsar-client-cpp/lib/ClientConnection.cc | 21 ++++++--
pulsar-client-cpp/lib/ClientConnection.h | 2 +-
pulsar-client-cpp/lib/Commands.cc | 18 +++++--
pulsar-client-cpp/lib/Commands.h | 4 +-
pulsar-client-cpp/lib/HTTPLookupService.cc | 5 +-
pulsar-client-cpp/lib/auth/AuthOauth2.cc | 42 +++++++++++----
pulsar-client-cpp/tests/AuthPluginTest.cc | 62 ++++++++++++++++++-----
9 files changed, 121 insertions(+), 41 deletions(-)
diff --git a/pulsar-client-cpp/include/pulsar/Authentication.h
b/pulsar-client-cpp/include/pulsar/Authentication.h
index 991e35b..185ac33 100644
--- a/pulsar-client-cpp/include/pulsar/Authentication.h
+++ b/pulsar-client-cpp/include/pulsar/Authentication.h
@@ -98,7 +98,7 @@ class PULSAR_PUBLIC Authentication {
*
* @param[out] authDataContent the shared pointer of AuthenticationData.
The content of AuthenticationData
* is changed to the internal data of the current instance.
- * @return ResultOk
+ * @return ResultOk or ResultAuthenticationError if authentication failed
*/
virtual Result getAuthData(AuthenticationDataPtr& authDataContent) {
authDataContent = authData_;
@@ -450,7 +450,7 @@ class CachedToken {
/**
* Get AuthenticationData from the current instance
*
- * @return ResultOk
+ * @return ResultOk or ResultAuthenticationError if authentication failed
*/
virtual AuthenticationDataPtr getAuthData() = 0;
@@ -504,7 +504,7 @@ class PULSAR_PUBLIC AuthOauth2 : public Authentication {
*
* @param[out] authDataOauth2 the shared pointer of AuthenticationData.
The content of AuthenticationData
* is changed to the internal data of the current instance.
- * @return ResultOk
+ * @return ResultOk or ResultAuthenticationError if authentication failed
*/
Result getAuthData(AuthenticationDataPtr& authDataOauth2);
diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
index 069e0e2..c7b7b09 100644
--- a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
+++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
@@ -130,7 +130,7 @@ void
BinaryProtoLookupService::sendPartitionMetadataLookupRequest(const std::str
const
ClientConnectionWeakPtr& clientCnx,
LookupDataResultPromisePtr promise) {
if (result != ResultOk) {
- promise->setFailed(ResultConnectError);
+ promise->setFailed(result);
Future<Result, LookupDataResultPtr> future = promise->getFuture();
return;
}
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc
b/pulsar-client-cpp/lib/ClientConnection.cc
index 82cb48c..3bcacc1 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -467,7 +467,14 @@ void ClientConnection::handleHandshake(const
boost::system::error_code& err) {
}
bool connectingThroughProxy = logicalAddress_ != physicalAddress_;
- SharedBuffer buffer = Commands::newConnect(authentication_,
logicalAddress_, connectingThroughProxy);
+ Result result = ResultOk;
+ SharedBuffer buffer =
+ Commands::newConnect(authentication_, logicalAddress_,
connectingThroughProxy, result);
+ if (result != ResultOk) {
+ LOG_ERROR(cnxString_ << "Failed to establish connection: " << result);
+ close(result);
+ return;
+ }
// Send CONNECT command to broker
asyncWrite(buffer.const_asio_buffer(),
std::bind(&ClientConnection::handleSentPulsarConnect,
shared_from_this(),
std::placeholders::_1, buffer));
@@ -1133,7 +1140,13 @@ void ClientConnection::handleIncomingCommand() {
case BaseCommand::AUTH_CHALLENGE: {
LOG_DEBUG(cnxString_ << "Received auth challenge from
broker");
- SharedBuffer buffer =
Commands::newAuthResponse(authentication_);
+ Result result;
+ SharedBuffer buffer =
Commands::newAuthResponse(authentication_, result);
+ if (result != ResultOk) {
+ LOG_ERROR(cnxString_ << "Failed to send auth response:
" << result);
+ close(result);
+ break;
+ }
asyncWrite(buffer.const_asio_buffer(),
std::bind(&ClientConnection::handleSentAuthResponse, shared_from_this(),
std::placeholders::_1, buffer));
@@ -1458,7 +1471,7 @@ void ClientConnection::handleConsumerStatsTimeout(const
boost::system::error_cod
startConsumerStatsTimer(consumerStatsRequests);
}
-void ClientConnection::close() {
+void ClientConnection::close(Result result) {
Lock lock(mutex_);
if (isClosed()) {
return;
@@ -1515,7 +1528,7 @@ void ClientConnection::close() {
HandlerBase::handleDisconnection(ResultConnectError,
shared_from_this(), it->second);
}
- connectPromise_.setFailed(ResultConnectError);
+ connectPromise_.setFailed(result);
// Fail all pending requests, all these type are map whose value type
contains the Promise object
for (auto& kv : pendingRequests) {
diff --git a/pulsar-client-cpp/lib/ClientConnection.h
b/pulsar-client-cpp/lib/ClientConnection.h
index a20219c..cd88f6e 100644
--- a/pulsar-client-cpp/lib/ClientConnection.h
+++ b/pulsar-client-cpp/lib/ClientConnection.h
@@ -113,7 +113,7 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
*/
void tcpConnectAsync();
- void close();
+ void close(Result result = ResultConnectError);
bool isClosed() const;
diff --git a/pulsar-client-cpp/lib/Commands.cc
b/pulsar-client-cpp/lib/Commands.cc
index 5b17d24..b90f5a8 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -209,7 +209,7 @@ PairSharedBuffer Commands::newSend(SharedBuffer& headers,
BaseCommand& cmd, uint
}
SharedBuffer Commands::newConnect(const AuthenticationPtr& authentication,
const std::string& logicalAddress,
- bool connectingThroughProxy) {
+ bool connectingThroughProxy, Result& result)
{
BaseCommand cmd;
cmd.set_type(BaseCommand::CONNECT);
CommandConnect* connect = cmd.mutable_connect();
@@ -226,13 +226,18 @@ SharedBuffer Commands::newConnect(const
AuthenticationPtr& authentication, const
}
AuthenticationDataPtr authDataContent;
- if (authentication->getAuthData(authDataContent) == ResultOk &&
authDataContent->hasDataFromCommand()) {
+ result = authentication->getAuthData(authDataContent);
+ if (result != ResultOk) {
+ return SharedBuffer{};
+ }
+
+ if (authDataContent->hasDataFromCommand()) {
connect->set_auth_data(authDataContent->getCommandData());
}
return writeMessageWithSize(cmd);
}
-SharedBuffer Commands::newAuthResponse(const AuthenticationPtr&
authentication) {
+SharedBuffer Commands::newAuthResponse(const AuthenticationPtr&
authentication, Result& result) {
BaseCommand cmd;
cmd.set_type(BaseCommand::AUTH_RESPONSE);
CommandAuthResponse* authResponse = cmd.mutable_authresponse();
@@ -242,7 +247,12 @@ SharedBuffer Commands::newAuthResponse(const
AuthenticationPtr& authentication)
authData->set_auth_method_name(authentication->getAuthMethodName());
AuthenticationDataPtr authDataContent;
- if (authentication->getAuthData(authDataContent) == ResultOk &&
authDataContent->hasDataFromCommand()) {
+ result = authentication->getAuthData(authDataContent);
+ if (result != ResultOk) {
+ return SharedBuffer{};
+ }
+
+ if (authDataContent->hasDataFromCommand()) {
authData->set_auth_data(authDataContent->getCommandData());
}
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index 02ebaad..c3d8343 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -70,9 +70,9 @@ class Commands {
const static int checksumSize = 4;
static SharedBuffer newConnect(const AuthenticationPtr& authentication,
const std::string& logicalAddress,
- bool connectingThroughProxy);
+ bool connectingThroughProxy, Result&
result);
- static SharedBuffer newAuthResponse(const AuthenticationPtr&
authentication);
+ static SharedBuffer newAuthResponse(const AuthenticationPtr&
authentication, Result& result);
static SharedBuffer newPartitionMetadataRequest(const std::string& topic,
uint64_t requestId);
diff --git a/pulsar-client-cpp/lib/HTTPLookupService.cc
b/pulsar-client-cpp/lib/HTTPLookupService.cc
index e881ac4..45e56d0 100644
--- a/pulsar-client-cpp/lib/HTTPLookupService.cc
+++ b/pulsar-client-cpp/lib/HTTPLookupService.cc
@@ -190,10 +190,7 @@ Result HTTPLookupService::sendHTTPRequest(const
std::string completeUrl, std::st
AuthenticationDataPtr authDataContent;
Result authResult = authenticationPtr_->getAuthData(authDataContent);
if (authResult != ResultOk) {
- LOG_ERROR(
- "All Authentication methods should have AuthenticationData and
return true on getAuthData for "
- "url "
- << completeUrl);
+ LOG_ERROR("Failed to getAuthData: " << authResult);
curl_easy_cleanup(handle);
return authResult;
}
diff --git a/pulsar-client-cpp/lib/auth/AuthOauth2.cc
b/pulsar-client-cpp/lib/auth/AuthOauth2.cc
index c7cc2bf..a9a1498 100644
--- a/pulsar-client-cpp/lib/auth/AuthOauth2.cc
+++ b/pulsar-client-cpp/lib/auth/AuthOauth2.cc
@@ -159,6 +159,10 @@ static size_t curlWriteCallback(void* contents, size_t
size, size_t nmemb, void*
}
void ClientCredentialFlow::initialize() {
+ if (issuerUrl_.empty()) {
+ LOG_ERROR("Failed to initialize ClientCredentialFlow: issuer_url is
not set");
+ return;
+ }
if (!keyFile_.isValid()) {
return;
}
@@ -188,6 +192,9 @@ void ClientCredentialFlow::initialize() {
curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 0L);
curl_easy_setopt(handle, CURLOPT_SSL_VERIFYHOST, 0L);
+ char errorBuffer[CURL_ERROR_SIZE];
+ curl_easy_setopt(handle, CURLOPT_ERRORBUFFER, errorBuffer);
+
// Make get call to server
res = curl_easy_perform(handle);
@@ -217,8 +224,8 @@ void ClientCredentialFlow::initialize() {
}
break;
default:
- LOG_ERROR("Response failed for getting the well-known
configuration " << issuerUrl_
-
<< ". Error Code " << res);
+ LOG_ERROR("Response failed for getting the well-known
configuration "
+ << issuerUrl_ << ". Error Code " << res << ": " <<
errorBuffer);
break;
}
// Free header list
@@ -282,6 +289,9 @@ Oauth2TokenResultPtr ClientCredentialFlow::authenticate() {
curl_easy_setopt(handle, CURLOPT_POSTFIELDS, jsonBody.c_str());
+ char errorBuffer[CURL_ERROR_SIZE];
+ curl_easy_setopt(handle, CURLOPT_ERRORBUFFER, errorBuffer);
+
// Make get call to server
res = curl_easy_perform(handle);
@@ -302,19 +312,26 @@ Oauth2TokenResultPtr ClientCredentialFlow::authenticate()
{
break;
}
-
resultPtr->setAccessToken(root.get<std::string>("access_token"));
- resultPtr->setExpiresIn(root.get<uint32_t>("expires_in"));
-
- LOG_DEBUG("access_token: " << resultPtr->getAccessToken()
- << " expires_in: " <<
resultPtr->getExpiresIn());
+
resultPtr->setAccessToken(root.get<std::string>("access_token", ""));
+ resultPtr->setExpiresIn(
+ root.get<uint32_t>("expires_in",
Oauth2TokenResult::undefined_expiration));
+
resultPtr->setRefreshToken(root.get<std::string>("refresh_token", ""));
+ resultPtr->setIdToken(root.get<std::string>("id_token", ""));
+
+ if (!resultPtr->getAccessToken().empty()) {
+ LOG_DEBUG("access_token: " << resultPtr->getAccessToken()
+ << " expires_in: " <<
resultPtr->getExpiresIn());
+ } else {
+ LOG_ERROR("Response doesn't contain access_token, the
response is: " << responseData);
+ }
} else {
LOG_ERROR("Response failed for issuerurl " << issuerUrl_ << ".
response Code "
<< response_code <<
" passedin: " << jsonBody);
}
break;
default:
- LOG_ERROR("Response failed for issuerurl " << issuerUrl_ << ".
Error Code " << res
- << " passedin: " <<
jsonBody);
+ LOG_ERROR("Response failed for issuerurl " << issuerUrl_ << ".
ErrorCode " << res << ": "
+ << errorBuffer << "
passedin: " << jsonBody);
break;
}
// Free header list
@@ -362,7 +379,12 @@ const std::string AuthOauth2::getAuthMethodName() const {
return "token"; }
Result AuthOauth2::getAuthData(AuthenticationDataPtr& authDataContent) {
if (cachedTokenPtr_ == nullptr || cachedTokenPtr_->isExpired()) {
- cachedTokenPtr_ = CachedTokenPtr(new
Oauth2CachedToken(flowPtr_->authenticate()));
+ try {
+ cachedTokenPtr_ = CachedTokenPtr(new
Oauth2CachedToken(flowPtr_->authenticate()));
+ } catch (const std::runtime_error& e) {
+ // The real error logs have already been printed in authenticate()
+ return ResultAuthenticationError;
+ }
}
authDataContent = cachedTokenPtr_->getAuthData();
diff --git a/pulsar-client-cpp/tests/AuthPluginTest.cc
b/pulsar-client-cpp/tests/AuthPluginTest.cc
index d15a383..5794723 100644
--- a/pulsar-client-cpp/tests/AuthPluginTest.cc
+++ b/pulsar-client-cpp/tests/AuthPluginTest.cc
@@ -364,18 +364,10 @@ TEST(AuthPluginTest, testOauth2WrongSecret) {
"client_secret": "rT7ps7WY8uhdVuBTKWZkttwLdQotmdEliaM5rLfmgNibvqziZ",
"audience": "https://dev-kt-aa9ne.us.auth0.com/api/v2/"})";
- int expectedTokenLength = 3379;
- LOG_INFO("PARAMS: " << params);
- pulsar::AuthenticationPtr auth = pulsar::AuthOauth2::create(params);
- ASSERT_EQ(auth->getAuthMethodName(), "token");
-
- auth->getAuthData(data);
-
- FAIL() << "Expected fail for wrong secret when to get token from
server";
-
- } catch (...) {
- // expected
- }
+ LOG_INFO("PARAMS: " << params);
+ pulsar::AuthenticationPtr auth = pulsar::AuthOauth2::create(params);
+ ASSERT_EQ(auth->getAuthMethodName(), "token");
+ ASSERT_EQ(auth->getAuthData(data), ResultAuthenticationError);
}
TEST(AuthPluginTest, testOauth2CredentialFile) {
@@ -427,3 +419,49 @@ TEST(AuthPluginTest, testOauth2RequestBody) {
ClientCredentialFlow flow2(params);
ASSERT_EQ(flow2.generateJsonBody(), expectedJson);
}
+
+TEST(AuthPluginTest, testOauth2Failure) {
+ ParamMap params;
+ auto addKeyValue = [&](const std::string& key, const std::string& value) {
+ params[key] = value;
+ LOG_INFO("Configure \"" << key << "\" to \"" << value << "\"");
+ };
+
+ auto createClient = [&]() -> Client {
+ ClientConfiguration conf;
+ conf.setAuth(AuthOauth2::create(params));
+ return {"pulsar://localhost:6650", conf};
+ };
+
+ const std::string topic = "AuthPluginTest-testOauth2Failure";
+ Producer producer;
+
+ // No issuer_url
+ auto client1 = createClient();
+ ASSERT_EQ(client1.createProducer(topic, producer),
ResultAuthenticationError);
+ client1.close();
+
+ // Invalid issuer_url
+ addKeyValue("issuer_url", "hello");
+ auto client2 = createClient();
+ ASSERT_EQ(client2.createProducer(topic, producer),
ResultAuthenticationError);
+ client2.close();
+
+ addKeyValue("issuer_url", "https://google.com");
+ auto client3 = createClient();
+ ASSERT_EQ(client3.createProducer(topic, producer),
ResultAuthenticationError);
+ client3.close();
+
+ // No client id and secret
+ addKeyValue("issuer_url", "https://dev-kt-aa9ne.us.auth0.com");
+ auto client4 = createClient();
+ ASSERT_EQ(client4.createProducer(topic, producer),
ResultAuthenticationError);
+ client4.close();
+
+ // Invalid client_id and client_secret
+ addKeyValue("client_id", "my_id");
+ addKeyValue("client_secret", "my-secret");
+ auto client5 = createClient();
+ ASSERT_EQ(client5.createProducer(topic, producer),
ResultAuthenticationError);
+ client5.close();
+}