github-actions[bot] commented on code in PR #61324:
URL: https://github.com/apache/doris/pull/61324#discussion_r3007358631
##########
fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaConfiguration.java:
##########
@@ -46,6 +46,20 @@ public enum KafkaConfiguration {
KAFKA_TEXT_TABLE_NAME_FIELD_DELIMITER("kafka_text_table_name_field_delimiter",
",",
value -> value.replace(" ", ""));
+
+ // Common Kafka property names used for security and authentication
+ // These are passed through property.* prefix and validated for correctness
+ public static final String SECURITY_PROTOCOL = "security.protocol";
+ public static final String SASL_MECHANISM = "sasl.mechanism";
+ public static final String SASL_JAAS_CONFIG = "sasl.jaas.config";
+
+ // AWS MSK IAM authentication properties
+ public static final String AWS_REGION = "aws.region";
+ public static final String AWS_ACCESS_KEY = "aws.access_key";
+ public static final String AWS_SECRET_KEY = "aws.secret_key";
Review Comment:
**Bug (Critical — FE-BE Mismatch):** These four constant values do **not
match** the property keys used on the BE side
(`be/src/runtime/aws_msk_iam_auth.cpp` lines 368-372), nor do they match the
property names used in the PR's SQL examples or FE tests.
| Constant | Current FE value | BE lookup key | SQL example |
|---|---|---|---|
| `AWS_ACCESS_KEY` | `"aws.access_key"` | `"aws.access.key"` | `"aws.access.key"` |
| `AWS_SECRET_KEY` | `"aws.secret_key"` | `"aws.secret.key"` | `"aws.secret.key"` |
| `AWS_MSK_IAM_ROLE_ARN` | `"aws. role_arn"` (**note the space!**) | `"aws.msk.iam.role.arn"` | `"aws.msk.iam.role.arn"` |
| `AWS_PROFILE_NAME` | `"aws.profile_name"` | `"aws.profile.name"` | `"aws.profile.name"` |
**Consequences:**
1. FE validation in `validateAwsMskIamConfig()` uses these constants to
check `customKafkaProperties.containsKey(...)`. Since properties arrive via the
`"property."` prefix path and get stripped to dot-style keys (e.g.,
`"aws.access.key"`), the `containsKey` lookups with underscore-style constants
will **never match**.
2. This means `hasAccessKey`, `hasSecretKey`, `hasRoleArn`, `hasProfileName`
are **always false** in the validation code, making most validation checks
ineffective.
3. At least 2 FE tests should fail: `testMissingRegionWithRoleArn` (expects
exception but `isAwsMskIam` is false since role ARN key doesn't match), and
`testPublicAccessWithExplicitCredentials` (expects success but validation can't
see the credentials).
Fix all four constants to match the dot-style names:
```java
public static final String AWS_ACCESS_KEY = "aws.access.key";
public static final String AWS_SECRET_KEY = "aws.secret.key";
public static final String AWS_MSK_IAM_ROLE_ARN = "aws.msk.iam.role.arn";
public static final String AWS_PROFILE_NAME = "aws.profile.name";
```
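To make the failure mode concrete, here is a minimal sketch of the lookup mismatch. It is written in C++ only to stay consistent with the BE snippets in this review (the FE code itself is Java), and `strip_property_prefix` and `has_key` are hypothetical stand-ins for the FE's `"property."` handling and `containsKey`, not the actual Doris code.

```cpp
#include <string>
#include <unordered_map>

using PropertyMap = std::unordered_map<std::string, std::string>;

// Hypothetical stand-in for the "property." prefix path: keeps only the
// prefixed entries and stores them under the key with the prefix removed.
PropertyMap strip_property_prefix(const PropertyMap& raw) {
    static const std::string kPrefix = "property.";
    PropertyMap stripped;
    for (const auto& entry : raw) {
        if (entry.first.compare(0, kPrefix.size(), kPrefix) == 0) {
            stripped.emplace(entry.first.substr(kPrefix.size()), entry.second);
        }
    }
    return stripped;
}

// Mirrors a containsKey-style check against the stripped map.
bool has_key(const PropertyMap& props, const std::string& key) {
    return props.find(key) != props.end();
}
```

With a user-supplied `"property.aws.access.key"`, the stripped map holds `"aws.access.key"`, so `has_key(props, "aws.access_key")` is false while `has_key(props, "aws.access.key")` is true.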
##########
be/src/load/routine_load/data_consumer.cpp:
##########
@@ -99,6 +100,15 @@ Status
KafkaDataConsumer::init(std::shared_ptr<StreamLoadContext> ctx) {
}
for (auto& item : ctx->kafka_info->properties) {
+ _custom_properties.emplace(item.first, item.second);
+
+ // AWS properties (aws.*) are Doris-specific for MSK IAM authentication
+ // and should not be passed to librdkafka
+ if (starts_with(item.second, "AWS:")) {
+ LOG(INFO) << "Skipping AWS property for librdkafka: " <<
item.first;
Review Comment:
**Bug (High):** This checks `item.second` (the property **value**), but AWS
property values are things like `"us-east-1"`, `"AKIAIOSFODNN7EXAMPLE"`, etc. —
none start with `"AWS:"`. This filter is dead code and never matches.
The intent appears to be filtering AWS properties by **key** prefix. The
correct check should be:
```cpp
if (starts_with(item.first, "aws.")) {
    LOG(INFO) << "Skipping AWS property for librdkafka: " << item.first;
    continue;
}
```
Currently the AWS properties are passed to `set_conf()`, which calls
`conf->set()` on librdkafka. librdkafka does not recognize these keys and
returns `CONF_UNKNOWN`, which `set_conf()` maps to `Status::OK()`, so the
feature happens to work despite this bug. But this is fragile: it relies on
unknown keys being tolerated, and a future librdkafka version could treat them
as errors.
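A self-contained sketch of key-based filtering, assuming the properties arrive as a string map. `has_prefix` and `split_properties` are illustrative names rather than existing Doris helpers.

```cpp
#include <map>
#include <string>
#include <utility>

using Properties = std::map<std::string, std::string>;

// Returns true when `key` begins with `prefix`.
bool has_prefix(const std::string& key, const std::string& prefix) {
    return key.compare(0, prefix.size(), prefix) == 0;
}

// Splits properties by KEY prefix: first = Doris-only "aws.*" properties,
// second = everything that may be forwarded to librdkafka.
std::pair<Properties, Properties> split_properties(const Properties& props) {
    Properties aws_props;
    Properties kafka_props;
    for (const auto& item : props) {
        if (has_prefix(item.first, "aws.")) {
            aws_props.insert(item);
        } else {
            kafka_props.insert(item);
        }
    }
    return {aws_props, kafka_props};
}
```

Only the key decides the split, so a value that happens to start with `"AWS:"` has no effect on where a property ends up.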
##########
be/src/load/routine_load/data_consumer.cpp:
##########
@@ -315,6 +336,20 @@ Status
KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
}
Status KafkaDataConsumer::get_partition_meta(std::vector<int32_t>*
partition_ids) {
+ if (_aws_msk_oauth_callback) {
+ // Trigger OAuth token refresh by polling the event loop
+ // librdkafka's OAUTHBEARER callback is only triggered through
consume()/poll()
+ // Without this, metadata() will fail because broker is waiting for
OAuth token
+ LOG(INFO) << "Polling to trigger OAuth token refresh before metadata
request";
+ int max_poll_attempts = 10; // 10 × 500ms = 5 seconds max
+ for (int i = 0; i < max_poll_attempts; i++) {
Review Comment:
**Performance concern (Medium):** This unconditionally polls for 10 × 500ms
= 5 seconds every time `get_partition_meta()` is called with OAuth enabled.
This adds significant latency to partition discovery.
Consider:
1. Breaking out of the loop early once the token refresh callback has been
invoked (e.g., check a flag set by the callback).
2. Reducing the iteration count or timeout.
3. Using `_k_consumer->poll(500)` instead of `consume(500)` since we only
need the event loop to trigger the callback, not actual messages.
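Option 1 could look roughly like the sketch below. It assumes the OAuth refresh callback sets an atomic flag; `token_ready` and `poll_once` are hypothetical names, with `poll_once` standing in for a single timed `consume()`/`poll()` call on the consumer.

```cpp
#include <atomic>
#include <functional>

// Polls until `token_ready` becomes true or `max_attempts` polls have run.
// Returns the number of polls actually performed.
int poll_until_token_ready(const std::atomic<bool>& token_ready, int max_attempts,
                           const std::function<void()>& poll_once) {
    int attempts = 0;
    while (attempts < max_attempts && !token_ready.load(std::memory_order_acquire)) {
        poll_once();  // the refresh callback is expected to fire inside this call
        ++attempts;
    }
    return attempts;
}
```

If the flag is already set from an earlier refresh, the loop body never runs, so repeated `get_partition_meta()` calls would not pay the polling cost again.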
##########
be/src/runtime/aws_msk_iam_auth.cpp:
##########
@@ -0,0 +1,506 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/aws_msk_iam_auth.h"
+
+#include <aws/core/auth/AWSCredentials.h>
+#include <aws/core/auth/AWSCredentialsProvider.h>
+#include <aws/core/auth/AWSCredentialsProviderChain.h>
+#include <aws/core/auth/STSCredentialsProvider.h>
+#include <aws/core/platform/Environment.h>
+#include <aws/identity-management/auth/STSAssumeRoleCredentialsProvider.h>
+#include <aws/sts/STSClient.h>
+#include <aws/sts/model/AssumeRoleRequest.h>
+#include <openssl/hmac.h>
+#include <openssl/sha.h>
+
+#include <algorithm>
+#include <chrono>
+#include <iomanip>
+#include <sstream>
+
+#include "common/logging.h"
+
+namespace doris {
+
+AwsMskIamAuth::AwsMskIamAuth(Config config) : _config(std::move(config)) {
+ _credentials_provider = _create_credentials_provider();
+}
+
+std::shared_ptr<Aws::Auth::AWSCredentialsProvider>
AwsMskIamAuth::_create_credentials_provider() {
+ if (!_config.role_arn.empty() && !_config.access_key.empty() &&
!_config.secret_key.empty()) {
+ LOG(INFO) << "Using AWS STS Assume Role with explicit credentials
(cross-account): "
+ << _config.role_arn << " (Access Key ID: " <<
_config.access_key.substr(0, 4)
+ << "****)";
+
+ Aws::Client::ClientConfiguration client_config;
+ if (!_config.region.empty()) {
+ client_config.region = _config.region;
+ }
+
+ // Use explicit AK/SK as base credentials to assume the role
+ Aws::Auth::AWSCredentials base_credentials(_config.access_key,
_config.secret_key);
+ auto base_provider =
+
std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(base_credentials);
+
+ auto sts_client = std::make_shared<Aws::STS::STSClient>(base_provider,
client_config);
+
+ return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>(
+ _config.role_arn, Aws::String(), /* external_id */
Aws::String(),
+ Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS, sts_client);
+ }
+ // 2. Explicit AK/SK credentials (direct access)
+ if (!_config.access_key.empty() && !_config.secret_key.empty()) {
+ LOG(INFO) << "Using explicit AWS credentials (Access Key ID: "
+ << _config.access_key.substr(0, 4) << "****)";
+
+ Aws::Auth::AWSCredentials credentials(_config.access_key,
_config.secret_key);
+
+ return
std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(credentials);
+ }
+ // 3. Assume Role with Instance Profile (for same-account access from
within AWS)
+ if (!_config.role_arn.empty()) {
+ LOG(INFO) << "Using AWS STS Assume Role with Instance Profile: " <<
_config.role_arn;
+
+ Aws::Client::ClientConfiguration client_config;
+ if (!_config.region.empty()) {
+ client_config.region = _config.region;
+ }
+
+ auto sts_client = std::make_shared<Aws::STS::STSClient>(
+
std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>(),
client_config);
+
+ return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>(
+ _config.role_arn, Aws::String(), /* external_id */
Aws::String(),
+ Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS, sts_client);
+ }
+ // 4. AWS Profile (reads from ~/.aws/credentials)
+ if (!_config.profile_name.empty()) {
+ LOG(INFO) << "Using AWS Profile: " << _config.profile_name;
+
+ return
std::make_shared<Aws::Auth::ProfileConfigFileAWSCredentialsProvider>(
+ _config.profile_name.c_str());
+ }
+ // 5. Custom Credentials Provider
+ if (!_config.credentials_provider.empty()) {
+ LOG(INFO) << "Using custom credentials provider: " <<
_config.credentials_provider;
+
+ // Parse credentials provider type string
+ std::string provider_upper = _config.credentials_provider;
+ std::transform(provider_upper.begin(), provider_upper.end(),
provider_upper.begin(),
+ ::toupper);
+
+ if (provider_upper == "ENV" || provider_upper == "ENVIRONMENT") {
+ return
std::make_shared<Aws::Auth::EnvironmentAWSCredentialsProvider>();
+ } else if (provider_upper == "INSTANCE_PROFILE" || provider_upper ==
"INSTANCEPROFILE") {
+ return
std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>();
+ } else if (provider_upper == "CONTAINER" || provider_upper == "ECS") {
+ return std::make_shared<Aws::Auth::TaskRoleCredentialsProvider>(
+
Aws::Environment::GetEnv("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI").c_str());
+ } else if (provider_upper == "DEFAULT") {
+ return
std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
+ } else {
+ LOG(WARNING) << "Unknown credentials provider type: " <<
_config.credentials_provider
+ << ", falling back to default credentials provider
chain";
+ return
std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
+ }
+ }
+ // No valid credentials configuration found
+ LOG(ERROR) << "AWS MSK IAM authentication requires credentials. Please
provide.";
+ return nullptr;
+}
+
+Status AwsMskIamAuth::get_credentials(Aws::Auth::AWSCredentials* credentials) {
+ std::lock_guard<std::mutex> lock(_mutex);
+
+ if (!_credentials_provider) {
+ return Status::InternalError("AWS credentials provider not
initialized");
+ }
+
+ // Refresh if needed
+ if (_should_refresh_credentials()) {
+ _cached_credentials = _credentials_provider->GetAWSCredentials();
+ if (_cached_credentials.GetAWSAccessKeyId().empty()) {
+ return Status::InternalError("Failed to get AWS credentials");
+ }
+
+ // Set expiry time (assume 1 hour for instance profile, or use the
credentials expiration)
+ _credentials_expiry = std::chrono::system_clock::now() +
std::chrono::hours(1);
+
+ LOG(INFO) << "Refreshed AWS credentials for MSK IAM authentication";
+ }
+
+ *credentials = _cached_credentials;
+ return Status::OK();
+}
+
+bool AwsMskIamAuth::_should_refresh_credentials() {
+ auto now = std::chrono::system_clock::now();
+ auto refresh_time =
+ _credentials_expiry -
std::chrono::milliseconds(_config.token_refresh_margin_ms);
+ return now >= refresh_time ||
_cached_credentials.GetAWSAccessKeyId().empty();
+}
+
+Status AwsMskIamAuth::generate_token(const std::string& broker_hostname,
std::string* token,
+ int64_t* token_lifetime_ms) {
+ Aws::Auth::AWSCredentials credentials;
+ RETURN_IF_ERROR(get_credentials(&credentials));
+
+ std::string timestamp = _get_timestamp();
+ std::string date_stamp = _get_date_stamp(timestamp);
+
+ // AWS MSK IAM token is a base64-encoded presigned URL
+ // Reference: https://github.com/aws/aws-msk-iam-sasl-signer-python
+
+ // Token expiry in seconds (900 seconds = 15 minutes, matching AWS MSK IAM
signer reference)
+ static constexpr int TOKEN_EXPIRY_SECONDS = 900;
+
+ // Build the endpoint URL
+ std::string endpoint_url = "https://kafka." + _config.region +
".amazonaws.com/";
+
+ // Build credential scope
+ std::string credential_scope =
+ date_stamp + "/" + _config.region + "/kafka-cluster/aws4_request";
+
+ // Build the canonical query string (sorted alphabetically)
+ // IMPORTANT: All query parameters must be included in the signature
calculation
+ // Session Token must be in canonical query string if using temporary
credentials
+ std::stringstream canonical_query_ss;
+ canonical_query_ss << "Action=kafka-cluster%3AConnect"; // URL-encoded :
+
+ // Add Algorithm
+ canonical_query_ss << "&X-Amz-Algorithm=AWS4-HMAC-SHA256";
+
+ // Add Credential
+ std::string credential = std::string(credentials.GetAWSAccessKeyId()) +
"/" + credential_scope;
+ canonical_query_ss << "&X-Amz-Credential=" << _url_encode(credential);
+
+ // Add Date
+ canonical_query_ss << "&X-Amz-Date=" << timestamp;
+
+ // Add Expires
+ canonical_query_ss << "&X-Amz-Expires=" << TOKEN_EXPIRY_SECONDS;
+
+ // Add Security Token if present (MUST be before signature calculation)
+ if (!credentials.GetSessionToken().empty()) {
+ canonical_query_ss << "&X-Amz-Security-Token="
+ <<
_url_encode(std::string(credentials.GetSessionToken()));
+ }
+
+ // Add SignedHeaders
+ canonical_query_ss << "&X-Amz-SignedHeaders=host";
+
+ std::string canonical_query_string = canonical_query_ss.str();
+
+ // Build the canonical headers
+ std::string host = "kafka." + _config.region + ".amazonaws.com";
+ std::string canonical_headers = "host:" + host + "\n";
+ std::string signed_headers = "host";
+
+ // Build the canonical request
+ std::string method = "GET";
+ std::string uri = "/";
+ std::string payload_hash = _sha256("");
+
+ std::string canonical_request = method + "\n" + uri + "\n" +
canonical_query_string + "\n" +
+ canonical_headers + "\n" + signed_headers
+ "\n" + payload_hash;
+
+ // Build the string to sign
+ std::string algorithm = "AWS4-HMAC-SHA256";
+ std::string canonical_request_hash = _sha256(canonical_request);
+ std::string string_to_sign =
+ algorithm + "\n" + timestamp + "\n" + credential_scope + "\n" +
canonical_request_hash;
+
+ // Calculate signature
+ std::string signing_key =
_calculate_signing_key(std::string(credentials.GetAWSSecretKey()),
+ date_stamp,
_config.region, "kafka-cluster");
+ std::string signature = _hmac_sha256_hex(signing_key, string_to_sign);
+
+ // Build the final presigned URL
+ // All parameters are already in canonical_query_string, just add signature
+ // Then add User-Agent AFTER signature (not part of signed content,
matching reference impl)
+ std::string signed_url = endpoint_url + "?" + canonical_query_string +
+ "&X-Amz-Signature=" + signature +
+ "&User-Agent=doris-msk-iam-auth%2F1.0";
+
+ // Base64url encode the signed URL (without padding)
+ *token = _base64url_encode(signed_url);
+
+ // Token lifetime in milliseconds
+ *token_lifetime_ms = TOKEN_EXPIRY_SECONDS * 1000;
+
+ LOG(INFO) << "Generated AWS MSK IAM token, presigned URL: " << signed_url;
+ return Status::OK();
Review Comment:
**Security concern (Medium):** The presigned URL contains security-sensitive
information:
- `X-Amz-Credential` includes the AWS Access Key ID
- `X-Amz-Security-Token` (when using temporary credentials) contains the
full session token
- The signed URL as a whole is the token (it is what gets base64url-encoded),
so anyone who can read the log can reuse it until it expires (900 seconds)
Logging this at INFO level means it will appear in production logs every time
a token is generated. Consider reducing to `VLOG` or at minimum masking the
credential values:
```cpp
VLOG_NOTICE << "Generated AWS MSK IAM token for region: " << _config.region;
```
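If the URL has to stay in the log for debugging, masking could be sketched as follows. `redact_query_param` and `redact_presigned_url` are hypothetical helpers, not part of the PR, and the parameter list is an assumption based on the query string built in `generate_token()`.

```cpp
#include <cstddef>
#include <initializer_list>
#include <string>

// Replaces the value of query parameter `name` in `url` with "REDACTED".
// Only matches a parameter that starts right after '?' or '&'.
std::string redact_query_param(const std::string& url, const std::string& name) {
    const std::string needle = name + "=";
    std::size_t pos = url.find(needle);
    while (pos != std::string::npos) {
        const bool at_param_start = pos > 0 && (url[pos - 1] == '?' || url[pos - 1] == '&');
        if (at_param_start) {
            const std::size_t value_begin = pos + needle.size();
            std::size_t value_end = url.find('&', value_begin);
            if (value_end == std::string::npos) value_end = url.size();
            return url.substr(0, value_begin) + "REDACTED" + url.substr(value_end);
        }
        pos = url.find(needle, pos + 1);
    }
    return url;  // parameter not present: nothing to redact
}

// Masks the parameters that carry credentials or the signature.
std::string redact_presigned_url(const std::string& url) {
    std::string out = url;
    for (const char* name : {"X-Amz-Credential", "X-Amz-Security-Token", "X-Amz-Signature"}) {
        out = redact_query_param(out, name);
    }
    return out;
}
```

Masking `X-Amz-Signature` as well keeps the logged URL from being usable as a token, while the region, date, and expiry remain visible for debugging.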
##########
be/src/runtime/aws_msk_iam_auth.cpp:
##########
@@ -0,0 +1,506 @@
+#include "runtime/aws_msk_iam_auth.h"
+
+#include <aws/core/auth/AWSCredentials.h>
+#include <aws/core/auth/AWSCredentialsProvider.h>
+#include <aws/core/auth/AWSCredentialsProviderChain.h>
+#include <aws/core/auth/STSCredentialsProvider.h>
+#include <aws/core/platform/Environment.h>
+#include <aws/identity-management/auth/STSAssumeRoleCredentialsProvider.h>
+#include <aws/sts/STSClient.h>
+#include <aws/sts/model/AssumeRoleRequest.h>
+#include <openssl/hmac.h>
+#include <openssl/sha.h>
+
+#include <algorithm>
+#include <chrono>
+#include <iomanip>
+#include <sstream>
+
+#include "common/logging.h"
+
+namespace doris {
+
+AwsMskIamAuth::AwsMskIamAuth(Config config) : _config(std::move(config)) {
+ _credentials_provider = _create_credentials_provider();
+}
+
+std::shared_ptr<Aws::Auth::AWSCredentialsProvider>
AwsMskIamAuth::_create_credentials_provider() {
+ if (!_config.role_arn.empty() && !_config.access_key.empty() &&
!_config.secret_key.empty()) {
+ LOG(INFO) << "Using AWS STS Assume Role with explicit credentials
(cross-account): "
+ << _config.role_arn << " (Access Key ID: " <<
_config.access_key.substr(0, 4)
+ << "****)";
+
+ Aws::Client::ClientConfiguration client_config;
+ if (!_config.region.empty()) {
+ client_config.region = _config.region;
+ }
+
+ // Use explicit AK/SK as base credentials to assume the role
+ Aws::Auth::AWSCredentials base_credentials(_config.access_key,
_config.secret_key);
+ auto base_provider =
+
std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(base_credentials);
+
+ auto sts_client = std::make_shared<Aws::STS::STSClient>(base_provider,
client_config);
+
+ return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>(
+ _config.role_arn, Aws::String(), /* external_id */
Aws::String(),
+ Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS, sts_client);
+ }
+ // 2. Explicit AK/SK credentials (direct access)
+ if (!_config.access_key.empty() && !_config.secret_key.empty()) {
+ LOG(INFO) << "Using explicit AWS credentials (Access Key ID: "
+ << _config.access_key.substr(0, 4) << "****)";
+
+ Aws::Auth::AWSCredentials credentials(_config.access_key,
_config.secret_key);
+
+ return
std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(credentials);
+ }
+ // 3. Assume Role with Instance Profile (for same-account access from
within AWS)
+ if (!_config.role_arn.empty()) {
+ LOG(INFO) << "Using AWS STS Assume Role with Instance Profile: " <<
_config.role_arn;
+
+ Aws::Client::ClientConfiguration client_config;
+ if (!_config.region.empty()) {
+ client_config.region = _config.region;
+ }
+
+ auto sts_client = std::make_shared<Aws::STS::STSClient>(
+
std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>(),
client_config);
+
+ return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>(
+ _config.role_arn, Aws::String(), /* external_id */
Aws::String(),
+ Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS, sts_client);
+ }
+ // 4. AWS Profile (reads from ~/.aws/credentials)
+ if (!_config.profile_name.empty()) {
+ LOG(INFO) << "Using AWS Profile: " << _config.profile_name;
+
+ return
std::make_shared<Aws::Auth::ProfileConfigFileAWSCredentialsProvider>(
+ _config.profile_name.c_str());
+ }
+ // 5. Custom Credentials Provider
+ if (!_config.credentials_provider.empty()) {
+ LOG(INFO) << "Using custom credentials provider: " <<
_config.credentials_provider;
+
+ // Parse credentials provider type string
+ std::string provider_upper = _config.credentials_provider;
+ std::transform(provider_upper.begin(), provider_upper.end(),
provider_upper.begin(),
+ ::toupper);
+
+ if (provider_upper == "ENV" || provider_upper == "ENVIRONMENT") {
+ return
std::make_shared<Aws::Auth::EnvironmentAWSCredentialsProvider>();
+ } else if (provider_upper == "INSTANCE_PROFILE" || provider_upper ==
"INSTANCEPROFILE") {
+ return
std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>();
+ } else if (provider_upper == "CONTAINER" || provider_upper == "ECS") {
+ return std::make_shared<Aws::Auth::TaskRoleCredentialsProvider>(
+
Aws::Environment::GetEnv("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI").c_str());
+ } else if (provider_upper == "DEFAULT") {
+ return
std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
+ } else {
+ LOG(WARNING) << "Unknown credentials provider type: " <<
_config.credentials_provider
+ << ", falling back to default credentials provider
chain";
+ return
std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
+ }
+ }
+ // No valid credentials configuration found
+ LOG(ERROR) << "AWS MSK IAM authentication requires credentials. Please
provide.";
+ return nullptr;
+}
+
+Status AwsMskIamAuth::get_credentials(Aws::Auth::AWSCredentials* credentials) {
+ std::lock_guard<std::mutex> lock(_mutex);
+
+ if (!_credentials_provider) {
+ return Status::InternalError("AWS credentials provider not
initialized");
+ }
+
+ // Refresh if needed
+ if (_should_refresh_credentials()) {
+ _cached_credentials = _credentials_provider->GetAWSCredentials();
+ if (_cached_credentials.GetAWSAccessKeyId().empty()) {
+ return Status::InternalError("Failed to get AWS credentials");
+ }
+
+ // Set expiry time (assume 1 hour for instance profile, or use the
credentials expiration)
+ _credentials_expiry = std::chrono::system_clock::now() +
std::chrono::hours(1);
+
+ LOG(INFO) << "Refreshed AWS credentials for MSK IAM authentication";
+ }
+
+ *credentials = _cached_credentials;
+ return Status::OK();
+}
+
+bool AwsMskIamAuth::_should_refresh_credentials() {
+ auto now = std::chrono::system_clock::now();
+ auto refresh_time =
+ _credentials_expiry -
std::chrono::milliseconds(_config.token_refresh_margin_ms);
+ return now >= refresh_time ||
_cached_credentials.GetAWSAccessKeyId().empty();
+}
+
+Status AwsMskIamAuth::generate_token(const std::string& broker_hostname,
std::string* token,
+ int64_t* token_lifetime_ms) {
+ Aws::Auth::AWSCredentials credentials;
+ RETURN_IF_ERROR(get_credentials(&credentials));
+
+ std::string timestamp = _get_timestamp();
+ std::string date_stamp = _get_date_stamp(timestamp);
+
+ // AWS MSK IAM token is a base64-encoded presigned URL
+ // Reference: https://github.com/aws/aws-msk-iam-sasl-signer-python
+
+ // Token expiry in seconds (900 seconds = 15 minutes, matching AWS MSK IAM
signer reference)
+ static constexpr int TOKEN_EXPIRY_SECONDS = 900;
+
+ // Build the endpoint URL
+ std::string endpoint_url = "https://kafka." + _config.region +
".amazonaws.com/";
+
+ // Build credential scope
+ std::string credential_scope =
+ date_stamp + "/" + _config.region + "/kafka-cluster/aws4_request";
+
+ // Build the canonical query string (sorted alphabetically)
+ // IMPORTANT: All query parameters must be included in the signature
calculation
+ // Session Token must be in canonical query string if using temporary
credentials
+ std::stringstream canonical_query_ss;
+ canonical_query_ss << "Action=kafka-cluster%3AConnect"; // URL-encoded :
+
+ // Add Algorithm
+ canonical_query_ss << "&X-Amz-Algorithm=AWS4-HMAC-SHA256";
+
+ // Add Credential
+ std::string credential = std::string(credentials.GetAWSAccessKeyId()) +
"/" + credential_scope;
+ canonical_query_ss << "&X-Amz-Credential=" << _url_encode(credential);
+
+ // Add Date
+ canonical_query_ss << "&X-Amz-Date=" << timestamp;
+
+ // Add Expires
+ canonical_query_ss << "&X-Amz-Expires=" << TOKEN_EXPIRY_SECONDS;
+
+ // Add Security Token if present (MUST be before signature calculation)
+ if (!credentials.GetSessionToken().empty()) {
+ canonical_query_ss << "&X-Amz-Security-Token="
+ <<
_url_encode(std::string(credentials.GetSessionToken()));
+ }
+
+ // Add SignedHeaders
+ canonical_query_ss << "&X-Amz-SignedHeaders=host";
+
+ std::string canonical_query_string = canonical_query_ss.str();
+
+ // Build the canonical headers
+ std::string host = "kafka." + _config.region + ".amazonaws.com";
+ std::string canonical_headers = "host:" + host + "\n";
+ std::string signed_headers = "host";
+
+ // Build the canonical request
+ std::string method = "GET";
+ std::string uri = "/";
+ std::string payload_hash = _sha256("");
+
+ std::string canonical_request = method + "\n" + uri + "\n" +
canonical_query_string + "\n" +
+ canonical_headers + "\n" + signed_headers
+ "\n" + payload_hash;
+
+ // Build the string to sign
+ std::string algorithm = "AWS4-HMAC-SHA256";
+ std::string canonical_request_hash = _sha256(canonical_request);
+ std::string string_to_sign =
+ algorithm + "\n" + timestamp + "\n" + credential_scope + "\n" +
canonical_request_hash;
+
+ // Calculate signature
+ std::string signing_key =
_calculate_signing_key(std::string(credentials.GetAWSSecretKey()),
+ date_stamp,
_config.region, "kafka-cluster");
+ std::string signature = _hmac_sha256_hex(signing_key, string_to_sign);
+
+ // Build the final presigned URL
+ // All parameters are already in canonical_query_string, just add signature
+ // Then add User-Agent AFTER signature (not part of signed content,
matching reference impl)
+ std::string signed_url = endpoint_url + "?" + canonical_query_string +
+ "&X-Amz-Signature=" + signature +
+ "&User-Agent=doris-msk-iam-auth%2F1.0";
+
+ // Base64url encode the signed URL (without padding)
+ *token = _base64url_encode(signed_url);
+
+ // Token lifetime in milliseconds
+ *token_lifetime_ms = TOKEN_EXPIRY_SECONDS * 1000;
+
+ LOG(INFO) << "Generated AWS MSK IAM token, presigned URL: " << signed_url;
+ return Status::OK();
+}
+
+std::string AwsMskIamAuth::_hmac_sha256_hex(const std::string& key, const
std::string& data) {
+ std::string raw = _hmac_sha256(key, data);
+ std::stringstream ss;
+ for (unsigned char c : raw) {
+ ss << std::hex << std::setw(2) << std::setfill('0') <<
static_cast<int>(c);
+ }
+ return ss.str();
+}
+
+std::string AwsMskIamAuth::_url_encode(const std::string& value) {
+ std::ostringstream escaped;
+ escaped.fill('0');
+ escaped << std::hex;
+
+ for (char c : value) {
+ // Keep alphanumeric and other accepted characters intact
+ if (isalnum(static_cast<unsigned char>(c)) || c == '-' || c == '_' ||
c == '.' ||
+ c == '~') {
+ escaped << c;
+ } else {
+ // Any other characters are percent-encoded
+ escaped << std::uppercase;
+ escaped << '%' << std::setw(2) <<
static_cast<int>(static_cast<unsigned char>(c));
+ escaped << std::nouppercase;
+ }
+ }
+
+ return escaped.str();
+}
+
+std::string AwsMskIamAuth::_base64url_encode(const std::string& input) {
+ // Standard base64 alphabet
+ static const char* base64_chars =
+ "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
+
+ std::string result;
+ result.reserve(((input.size() + 2) / 3) * 4);
+
+ const unsigned char* bytes = reinterpret_cast<const unsigned
char*>(input.c_str());
+ size_t len = input.size();
+
+ for (size_t i = 0; i < len; i += 3) {
+ uint32_t n = static_cast<uint32_t>(bytes[i]) << 16;
+ if (i + 1 < len) n |= static_cast<uint32_t>(bytes[i + 1]) << 8;
+ if (i + 2 < len) n |= static_cast<uint32_t>(bytes[i + 2]);
+
+ result += base64_chars[(n >> 18) & 0x3F];
+ result += base64_chars[(n >> 12) & 0x3F];
+ if (i + 1 < len) result += base64_chars[(n >> 6) & 0x3F];
+ if (i + 2 < len) result += base64_chars[n & 0x3F];
+ }
+
+ // Convert to URL-safe base64 (replace + with -, / with _)
+ // and remove padding (=)
+ for (char& c : result) {
+ if (c == '+')
+ c = '-';
+ else if (c == '/')
+ c = '_';
+ }
+
+ return result;
+}
+
+std::string AwsMskIamAuth::_calculate_signing_key(const std::string&
secret_key,
+ const std::string&
date_stamp,
+ const std::string& region,
+ const std::string& service) {
+ std::string k_secret = "AWS4" + secret_key;
+ std::string k_date = _hmac_sha256(k_secret, date_stamp);
+ std::string k_region = _hmac_sha256(k_date, region);
+ std::string k_service = _hmac_sha256(k_region, service);
+ std::string k_signing = _hmac_sha256(k_service, "aws4_request");
+ return k_signing;
+}
+
+std::string AwsMskIamAuth::_hmac_sha256(const std::string& key, const
std::string& data) {
+ unsigned char* digest;
Review Comment:
**Bug (Critical — Thread Safety):** `HMAC()` is called with `md=nullptr`
(lines 330-331), which makes it return a pointer to a **process-global static
buffer**. OpenSSL documentation explicitly states: *"passing a NULL value for
md to use the static array is not thread safe."*
The signing operations (`_calculate_signing_key` calls `_hmac_sha256` 4
times, `_hmac_sha256_hex` calls it once) all execute **outside** `_mutex` (the
mutex only protects `get_credentials()`). When multiple Kafka routine load jobs
with MSK IAM auth are running, concurrent calls to this function will race on
the same static buffer, producing corrupted HMAC results and invalid AWS
signatures.
Fix: provide a caller-owned buffer:
```cpp
std::string AwsMskIamAuth::_hmac_sha256(const std::string& key, const std::string& data) {
    unsigned char digest[EVP_MAX_MD_SIZE];
    unsigned int digest_len = 0;
    unsigned char* result =
            HMAC(EVP_sha256(), key.c_str(), static_cast<int>(key.length()),
                 reinterpret_cast<const unsigned char*>(data.c_str()), data.length(),
                 digest, &digest_len);
    if (result == nullptr) {
        return {};
    }
    return {reinterpret_cast<char*>(digest), digest_len};
}
```
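The hazard can be illustrated without OpenSSL. The sketch below uses a toy digest (not HMAC, and all names are made up for illustration) to contrast a function that returns a pointer into a shared static buffer with one that writes into a caller-owned buffer.

```cpp
#include <cstddef>
#include <string>

constexpr std::size_t kDigestLen = 4;

// Toy "digest" (NOT a real hash): bytes derived from the input length.
void toy_digest(const std::string& data, unsigned char* out) {
    for (std::size_t i = 0; i < kDigestLen; ++i) {
        out[i] = static_cast<unsigned char>(data.size() + i);
    }
}

// Pattern A, analogous to HMAC(..., md = NULL, ...): every caller receives a
// pointer into the same static buffer.
const unsigned char* digest_shared(const std::string& data) {
    static unsigned char buffer[kDigestLen];
    toy_digest(data, buffer);
    return buffer;
}

// Pattern B, analogous to the fix above: the result is written to a local
// buffer and copied out, so each caller owns its bytes.
std::string digest_owned(const std::string& data) {
    unsigned char buffer[kDigestLen];
    toy_digest(data, buffer);
    return std::string(reinterpret_cast<const char*>(buffer), kDigestLen);
}
```

Even on a single thread, a second `digest_shared()` call overwrites the bytes the first result points to. With concurrent callers the same aliasing becomes a data race, which is the situation described for `_hmac_sha256`.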