Copilot commented on code in PR #61324: URL: https://github.com/apache/doris/pull/61324#discussion_r3007350964
########## be/src/runtime/aws_msk_iam_auth.cpp: ########## @@ -0,0 +1,506 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/aws_msk_iam_auth.h" + +#include <aws/core/auth/AWSCredentials.h> +#include <aws/core/auth/AWSCredentialsProvider.h> +#include <aws/core/auth/AWSCredentialsProviderChain.h> +#include <aws/core/auth/STSCredentialsProvider.h> +#include <aws/core/platform/Environment.h> +#include <aws/identity-management/auth/STSAssumeRoleCredentialsProvider.h> +#include <aws/sts/STSClient.h> +#include <aws/sts/model/AssumeRoleRequest.h> +#include <openssl/hmac.h> +#include <openssl/sha.h> + +#include <algorithm> +#include <chrono> +#include <iomanip> +#include <sstream> + +#include "common/logging.h" + +namespace doris { + +AwsMskIamAuth::AwsMskIamAuth(Config config) : _config(std::move(config)) { + _credentials_provider = _create_credentials_provider(); +} + +std::shared_ptr<Aws::Auth::AWSCredentialsProvider> AwsMskIamAuth::_create_credentials_provider() { + if (!_config.role_arn.empty() && !_config.access_key.empty() && !_config.secret_key.empty()) { + LOG(INFO) << "Using AWS STS Assume Role with explicit credentials (cross-account): " + << _config.role_arn << " (Access Key ID: " << 
_config.access_key.substr(0, 4) + << "****)"; + + Aws::Client::ClientConfiguration client_config; + if (!_config.region.empty()) { + client_config.region = _config.region; + } + + // Use explicit AK/SK as base credentials to assume the role + Aws::Auth::AWSCredentials base_credentials(_config.access_key, _config.secret_key); + auto base_provider = + std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(base_credentials); + + auto sts_client = std::make_shared<Aws::STS::STSClient>(base_provider, client_config); + + return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>( + _config.role_arn, Aws::String(), /* external_id */ Aws::String(), + Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS, sts_client); + } + // 2. Explicit AK/SK credentials (direct access) + if (!_config.access_key.empty() && !_config.secret_key.empty()) { + LOG(INFO) << "Using explicit AWS credentials (Access Key ID: " + << _config.access_key.substr(0, 4) << "****)"; + + Aws::Auth::AWSCredentials credentials(_config.access_key, _config.secret_key); + + return std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(credentials); + } + // 3. Assume Role with Instance Profile (for same-account access from within AWS) + if (!_config.role_arn.empty()) { + LOG(INFO) << "Using AWS STS Assume Role with Instance Profile: " << _config.role_arn; + + Aws::Client::ClientConfiguration client_config; + if (!_config.region.empty()) { + client_config.region = _config.region; + } + + auto sts_client = std::make_shared<Aws::STS::STSClient>( + std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>(), client_config); + + return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>( + _config.role_arn, Aws::String(), /* external_id */ Aws::String(), + Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS, sts_client); + } + // 4. 
AWS Profile (reads from ~/.aws/credentials) + if (!_config.profile_name.empty()) { + LOG(INFO) << "Using AWS Profile: " << _config.profile_name; + + return std::make_shared<Aws::Auth::ProfileConfigFileAWSCredentialsProvider>( + _config.profile_name.c_str()); + } + // 5. Custom Credentials Provider + if (!_config.credentials_provider.empty()) { + LOG(INFO) << "Using custom credentials provider: " << _config.credentials_provider; + + // Parse credentials provider type string + std::string provider_upper = _config.credentials_provider; + std::transform(provider_upper.begin(), provider_upper.end(), provider_upper.begin(), + ::toupper); + + if (provider_upper == "ENV" || provider_upper == "ENVIRONMENT") { + return std::make_shared<Aws::Auth::EnvironmentAWSCredentialsProvider>(); + } else if (provider_upper == "INSTANCE_PROFILE" || provider_upper == "INSTANCEPROFILE") { + return std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>(); + } else if (provider_upper == "CONTAINER" || provider_upper == "ECS") { + return std::make_shared<Aws::Auth::TaskRoleCredentialsProvider>( + Aws::Environment::GetEnv("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI").c_str()); + } else if (provider_upper == "DEFAULT") { + return std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>(); + } else { + LOG(WARNING) << "Unknown credentials provider type: " << _config.credentials_provider + << ", falling back to default credentials provider chain"; + return std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>(); + } + } + // No valid credentials configuration found + LOG(ERROR) << "AWS MSK IAM authentication requires credentials. 
Please provide."; + return nullptr; +} + +Status AwsMskIamAuth::get_credentials(Aws::Auth::AWSCredentials* credentials) { + std::lock_guard<std::mutex> lock(_mutex); + + if (!_credentials_provider) { + return Status::InternalError("AWS credentials provider not initialized"); + } + + // Refresh if needed + if (_should_refresh_credentials()) { + _cached_credentials = _credentials_provider->GetAWSCredentials(); + if (_cached_credentials.GetAWSAccessKeyId().empty()) { + return Status::InternalError("Failed to get AWS credentials"); + } + + // Set expiry time (assume 1 hour for instance profile, or use the credentials expiration) + _credentials_expiry = std::chrono::system_clock::now() + std::chrono::hours(1); + + LOG(INFO) << "Refreshed AWS credentials for MSK IAM authentication"; + } + + *credentials = _cached_credentials; + return Status::OK(); +} + +bool AwsMskIamAuth::_should_refresh_credentials() { + auto now = std::chrono::system_clock::now(); + auto refresh_time = + _credentials_expiry - std::chrono::milliseconds(_config.token_refresh_margin_ms); + return now >= refresh_time || _cached_credentials.GetAWSAccessKeyId().empty(); +} + +Status AwsMskIamAuth::generate_token(const std::string& broker_hostname, std::string* token, + int64_t* token_lifetime_ms) { + Aws::Auth::AWSCredentials credentials; + RETURN_IF_ERROR(get_credentials(&credentials)); + + std::string timestamp = _get_timestamp(); + std::string date_stamp = _get_date_stamp(timestamp); + + // AWS MSK IAM token is a base64-encoded presigned URL + // Reference: https://github.com/aws/aws-msk-iam-sasl-signer-python + + // Token expiry in seconds (900 seconds = 15 minutes, matching AWS MSK IAM signer reference) + static constexpr int TOKEN_EXPIRY_SECONDS = 900; + + // Build the endpoint URL + std::string endpoint_url = "https://kafka." 
+ _config.region + ".amazonaws.com/"; + + // Build credential scope + std::string credential_scope = + date_stamp + "/" + _config.region + "/kafka-cluster/aws4_request"; + + // Build the canonical query string (sorted alphabetically) + // IMPORTANT: All query parameters must be included in the signature calculation + // Session Token must be in canonical query string if using temporary credentials + std::stringstream canonical_query_ss; + canonical_query_ss << "Action=kafka-cluster%3AConnect"; // URL-encoded : + + // Add Algorithm + canonical_query_ss << "&X-Amz-Algorithm=AWS4-HMAC-SHA256"; + + // Add Credential + std::string credential = std::string(credentials.GetAWSAccessKeyId()) + "/" + credential_scope; + canonical_query_ss << "&X-Amz-Credential=" << _url_encode(credential); + + // Add Date + canonical_query_ss << "&X-Amz-Date=" << timestamp; + + // Add Expires + canonical_query_ss << "&X-Amz-Expires=" << TOKEN_EXPIRY_SECONDS; + + // Add Security Token if present (MUST be before signature calculation) + if (!credentials.GetSessionToken().empty()) { + canonical_query_ss << "&X-Amz-Security-Token=" + << _url_encode(std::string(credentials.GetSessionToken())); + } + + // Add SignedHeaders + canonical_query_ss << "&X-Amz-SignedHeaders=host"; + + std::string canonical_query_string = canonical_query_ss.str(); + + // Build the canonical headers + std::string host = "kafka." 
+ _config.region + ".amazonaws.com"; + std::string canonical_headers = "host:" + host + "\n"; + std::string signed_headers = "host"; + + // Build the canonical request + std::string method = "GET"; + std::string uri = "/"; + std::string payload_hash = _sha256(""); + + std::string canonical_request = method + "\n" + uri + "\n" + canonical_query_string + "\n" + + canonical_headers + "\n" + signed_headers + "\n" + payload_hash; + + // Build the string to sign + std::string algorithm = "AWS4-HMAC-SHA256"; + std::string canonical_request_hash = _sha256(canonical_request); + std::string string_to_sign = + algorithm + "\n" + timestamp + "\n" + credential_scope + "\n" + canonical_request_hash; + + // Calculate signature + std::string signing_key = _calculate_signing_key(std::string(credentials.GetAWSSecretKey()), + date_stamp, _config.region, "kafka-cluster"); + std::string signature = _hmac_sha256_hex(signing_key, string_to_sign); + + // Build the final presigned URL + // All parameters are already in canonical_query_string, just add signature + // Then add User-Agent AFTER signature (not part of signed content, matching reference impl) + std::string signed_url = endpoint_url + "?" + canonical_query_string + + "&X-Amz-Signature=" + signature + + "&User-Agent=doris-msk-iam-auth%2F1.0"; + + // Base64url encode the signed URL (without padding) + *token = _base64url_encode(signed_url); + + // Token lifetime in milliseconds + *token_lifetime_ms = TOKEN_EXPIRY_SECONDS * 1000; + + LOG(INFO) << "Generated AWS MSK IAM token, presigned URL: " << signed_url; Review Comment: generate_token() logs the full presigned URL at INFO level. This URL contains SigV4 query params (including access key id, signature, and potentially session token), which should be treated as sensitive and must not be written to logs. Please remove this log or redact sensitive query parameters (or log only high-level metadata like region/expiry). 
```suggestion LOG(INFO) << "Generated AWS MSK IAM token for broker " << broker_hostname << " in region " << _config.region << ", validity " << TOKEN_EXPIRY_SECONDS << " seconds."; ``` ########## be/src/runtime/aws_msk_iam_auth.h: ########## @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <aws/core/auth/AWSCredentialsProvider.h> +#include <librdkafka/rdkafkacpp.h> + +#include <memory> +#include <mutex> +#include <string> +#include <unordered_map> + +#include "common/status.h" + +namespace doris { +/** + * AWS MSK IAM authentication token generator. + * + * This class generates SASL/OAUTHBEARER tokens for AWS MSK IAM authentication. + * It uses AWS SDK for C++ to obtain IAM credentials and generates signed tokens + * that can be used with librdkafka's OAUTHBEARER mechanism. 
+ */ +class AwsMskIamAuth { +public: + struct Config { + std::string region; // AWS region (e.g., "us-east-1"), required + std::string access_key; // AWS Access Key ID (optional, for explicit credentials) + std::string secret_key; // AWS Secret Access Key (optional, for explicit credentials) + std::string role_arn; // IAM role ARN (optional, for assume role) + std::string profile_name; // AWS profile name (optional, reads from ~/.aws/credentials) + std::string + credentials_provider; // Credentials provider type (optional, e.g., "ENV", "INSTANCE_PROFILE") + int token_refresh_margin_ms = 60000; // Refresh token 60s before expiry + }; + + explicit AwsMskIamAuth(Config config); + ~AwsMskIamAuth() = default; + + /** + * Generate AWS MSK IAM authentication token. + * + * The token is a base64url-encoded presigned URL following AWS SigV4 format: + * https://kafka.<region>.amazonaws.com/?Action=kafka-cluster:Connect + * &X-Amz-Algorithm=AWS4-HMAC-SHA256 + * &X-Amz-Credential=<access-key>/<date>/<region>/kafka-cluster/aws4_request + * &X-Amz-Date=<timestamp> + * &X-Amz-Expires=900 + * &X-Amz-SignedHeaders=host + * &X-Amz-Signature=<signature> + * &X-Amz-Security-Token=<session-token> // if using temporary credentials + * &User-Agent=doris-msk-iam-auth/1.0 + * + * Reference: https://github.com/aws/aws-msk-iam-sasl-signer-python + * + * @param broker_hostname The MSK broker hostname (used for logging, not in token) + * @param token Output: base64url-encoded signed URL token + * @param token_lifetime_ms Output: token lifetime in milliseconds (3600000ms = 1 hour) + * @return Status indicating success or failure + */ + Status generate_token(const std::string& broker_hostname, std::string* token, + int64_t* token_lifetime_ms); + + /** + * Get current AWS credentials. + * This will refresh credentials if they are expired or about to expire. 
+ */ + Status get_credentials(Aws::Auth::AWSCredentials* credentials); + +private: + // Create AWS credentials provider based on configuration + std::shared_ptr<Aws::Auth::AWSCredentialsProvider> _create_credentials_provider(); + + // HMAC-SHA256 returning hex string + std::string _hmac_sha256_hex(const std::string& key, const std::string& data); + + std::string _url_encode(const std::string& value); + + std::string _base64url_encode(const std::string& input); + + // Calculate AWS SigV4 signing key + std::string _calculate_signing_key(const std::string& secret_key, const std::string& date_stamp, + const std::string& region, const std::string& service); + + std::string _hmac_sha256(const std::string& key, const std::string& data); + + std::string _sha256(const std::string& data); + + std::string _get_timestamp(); + + std::string _get_date_stamp(const std::string& timestamp); + + bool _should_refresh_credentials(); + + Config _config; + std::shared_ptr<Aws::Auth::AWSCredentialsProvider> _credentials_provider; + std::mutex _mutex; + Aws::Auth::AWSCredentials _cached_credentials; + std::chrono::time_point<std::chrono::system_clock> _credentials_expiry; +}; Review Comment: aws_msk_iam_auth.h declares _credentials_expiry as a std::chrono::time_point but doesn’t include <chrono>. Since aws_msk_iam_auth.cpp includes this header before including <chrono>, the header only compiles because <chrono> happens to be pulled in transitively (e.g. via <mutex> or the AWS SDK headers). The standard does not guarantee that, so a different toolchain or include order can turn this into a compile error (missing std::chrono definition). Add #include <chrono> directly to aws_msk_iam_auth.h (include what you use).
########## be/src/load/routine_load/data_consumer.cpp: ########## @@ -315,6 +336,20 @@ Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue, } Status KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids) { + if (_aws_msk_oauth_callback) { + // Trigger OAuth token refresh by polling the event loop + // librdkafka's OAUTHBEARER callback is only triggered through consume()/poll() + // Without this, metadata() will fail because broker is waiting for OAuth token + LOG(INFO) << "Polling to trigger OAuth token refresh before metadata request"; + int max_poll_attempts = 10; // 10 × 500ms = 5 seconds max + for (int i = 0; i < max_poll_attempts; i++) { + RdKafka::Message* msg = _k_consumer->consume(500); + if (msg) { + delete msg; // We don't expect messages before partition assignment + } Review Comment: get_partition_meta() uses _k_consumer->consume() in a loop to trigger the OAUTHBEARER refresh callback. consume() can have side effects (it may consume real messages / advance internal state) and adds a fixed ~5s delay (10×500ms) to every metadata request. Prefer driving the librdkafka event loop via poll() instead of consume(), and consider breaking early once the token has been set / authentication succeeds. Caveat before applying the suggestion below: librdkafka's C++ API documents that Handle::poll() must not be used with RdKafka::KafkaConsumer, because that consumer's callbacks are meant to be served by KafkaConsumer::consume(). Verify that poll() really triggers the OAUTHBEARER refresh on this consumer without side effects. Alternatively, calling sasl_background_callbacks_enable() (librdkafka >= 1.9) lets the token-refresh callback run on librdkafka's background thread and removes the need for this polling loop. ```suggestion // Trigger OAuth token refresh by polling the event loop. // librdkafka's OAUTHBEARER callback is only triggered through consume()/poll(). // Without this, metadata() may fail because the broker is waiting for an OAuth token. LOG(INFO) << "Polling to trigger OAuth token refresh before metadata request"; int max_poll_attempts = 10; // 10 × 500ms = 5 seconds max for (int i = 0; i < max_poll_attempts; i++) { // Use poll() instead of consume() to avoid consuming real messages // or advancing the consumer's internal state.
_k_consumer->poll(500); ``` ########## be/test/runtime/aws_msk_iam_auth_test.cpp: ########## @@ -0,0 +1,186 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/aws_msk_iam_auth.h" + +#include <gtest/gtest.h> + +#include <memory> +#include <string> + +#include "common/status.h" + +namespace doris { + +class AwsMskIamAuthTest : public testing::Test { +protected: + void SetUp() override { + // Setup test configuration + config.region = "us-east-1"; + } + + AwsMskIamAuth::Config config; +}; + +TEST_F(AwsMskIamAuthTest, TestConfigCreation) { + // Test basic configuration creation + AwsMskIamAuth auth(config); + + // This test just ensures the object can be created without crashing + ASSERT_TRUE(true); +} + +TEST_F(AwsMskIamAuthTest, TestTokenGeneration) { + // This test requires AWS credentials to be available + // In a real environment, you would mock the AWS SDK or use test credentials + + AwsMskIamAuth auth(config); + std::string token; + int64_t token_lifetime_ms; + + std::string broker_hostname = "b-1.test-msk.us-east-1.amazonaws.com"; + + // In a real test environment with credentials, this should succeed + // For CI/CD without credentials, we expect it to fail gracefully + Status status = 
auth.generate_token(broker_hostname, &token, &token_lifetime_ms); + + if (status.ok()) { + // If we have credentials, verify token properties + ASSERT_FALSE(token.empty()); + ASSERT_GT(token_lifetime_ms, 0); + ASSERT_LT(token_lifetime_ms, 3600000); // Less than 1 hour + + // Token should be valid JSON + ASSERT_NE(token.find("version"), std::string::npos); + ASSERT_NE(token.find("host"), std::string::npos); + ASSERT_NE(token.find(broker_hostname), std::string::npos); Review Comment: TestTokenGeneration asserts the generated token contains JSON fields like "version"/"host", but AwsMskIamAuth::generate_token() returns a base64url-encoded presigned URL (not JSON). If this test ever runs in an environment with AWS credentials (status.ok()), it will fail. Consider decoding the token and validating expected URL components instead, or adjust the assertion to match the actual token format. ```suggestion // AwsMskIamAuth::generate_token() returns a base64url-encoded presigned URL, // not JSON. Here we only validate basic properties of the returned token. ``` ########## be/src/runtime/aws_msk_iam_auth.cpp: ########## @@ -0,0 +1,506 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "runtime/aws_msk_iam_auth.h" + +#include <aws/core/auth/AWSCredentials.h> +#include <aws/core/auth/AWSCredentialsProvider.h> +#include <aws/core/auth/AWSCredentialsProviderChain.h> +#include <aws/core/auth/STSCredentialsProvider.h> +#include <aws/core/platform/Environment.h> +#include <aws/identity-management/auth/STSAssumeRoleCredentialsProvider.h> +#include <aws/sts/STSClient.h> +#include <aws/sts/model/AssumeRoleRequest.h> +#include <openssl/hmac.h> +#include <openssl/sha.h> + +#include <algorithm> +#include <chrono> +#include <iomanip> +#include <sstream> + +#include "common/logging.h" + +namespace doris { + +AwsMskIamAuth::AwsMskIamAuth(Config config) : _config(std::move(config)) { + _credentials_provider = _create_credentials_provider(); +} + +std::shared_ptr<Aws::Auth::AWSCredentialsProvider> AwsMskIamAuth::_create_credentials_provider() { + if (!_config.role_arn.empty() && !_config.access_key.empty() && !_config.secret_key.empty()) { + LOG(INFO) << "Using AWS STS Assume Role with explicit credentials (cross-account): " + << _config.role_arn << " (Access Key ID: " << _config.access_key.substr(0, 4) + << "****)"; + + Aws::Client::ClientConfiguration client_config; + if (!_config.region.empty()) { + client_config.region = _config.region; + } + + // Use explicit AK/SK as base credentials to assume the role + Aws::Auth::AWSCredentials base_credentials(_config.access_key, _config.secret_key); + auto base_provider = + std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(base_credentials); + + auto sts_client = std::make_shared<Aws::STS::STSClient>(base_provider, client_config); + + return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>( + _config.role_arn, Aws::String(), /* external_id */ Aws::String(), + Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS, sts_client); + } + // 2. 
Explicit AK/SK credentials (direct access) + if (!_config.access_key.empty() && !_config.secret_key.empty()) { + LOG(INFO) << "Using explicit AWS credentials (Access Key ID: " + << _config.access_key.substr(0, 4) << "****)"; + + Aws::Auth::AWSCredentials credentials(_config.access_key, _config.secret_key); + + return std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(credentials); + } + // 3. Assume Role with Instance Profile (for same-account access from within AWS) + if (!_config.role_arn.empty()) { + LOG(INFO) << "Using AWS STS Assume Role with Instance Profile: " << _config.role_arn; + + Aws::Client::ClientConfiguration client_config; + if (!_config.region.empty()) { + client_config.region = _config.region; + } + + auto sts_client = std::make_shared<Aws::STS::STSClient>( + std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>(), client_config); + + return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>( + _config.role_arn, Aws::String(), /* external_id */ Aws::String(), + Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS, sts_client); + } + // 4. AWS Profile (reads from ~/.aws/credentials) + if (!_config.profile_name.empty()) { + LOG(INFO) << "Using AWS Profile: " << _config.profile_name; + + return std::make_shared<Aws::Auth::ProfileConfigFileAWSCredentialsProvider>( + _config.profile_name.c_str()); + } + // 5. 
Custom Credentials Provider + if (!_config.credentials_provider.empty()) { + LOG(INFO) << "Using custom credentials provider: " << _config.credentials_provider; + + // Parse credentials provider type string + std::string provider_upper = _config.credentials_provider; + std::transform(provider_upper.begin(), provider_upper.end(), provider_upper.begin(), + ::toupper); + + if (provider_upper == "ENV" || provider_upper == "ENVIRONMENT") { + return std::make_shared<Aws::Auth::EnvironmentAWSCredentialsProvider>(); + } else if (provider_upper == "INSTANCE_PROFILE" || provider_upper == "INSTANCEPROFILE") { + return std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>(); + } else if (provider_upper == "CONTAINER" || provider_upper == "ECS") { + return std::make_shared<Aws::Auth::TaskRoleCredentialsProvider>( + Aws::Environment::GetEnv("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI").c_str()); + } else if (provider_upper == "DEFAULT") { + return std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>(); + } else { + LOG(WARNING) << "Unknown credentials provider type: " << _config.credentials_provider + << ", falling back to default credentials provider chain"; + return std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>(); + } + } + // No valid credentials configuration found + LOG(ERROR) << "AWS MSK IAM authentication requires credentials. 
Please provide."; + return nullptr; +} + +Status AwsMskIamAuth::get_credentials(Aws::Auth::AWSCredentials* credentials) { + std::lock_guard<std::mutex> lock(_mutex); + + if (!_credentials_provider) { + return Status::InternalError("AWS credentials provider not initialized"); + } + + // Refresh if needed + if (_should_refresh_credentials()) { + _cached_credentials = _credentials_provider->GetAWSCredentials(); + if (_cached_credentials.GetAWSAccessKeyId().empty()) { + return Status::InternalError("Failed to get AWS credentials"); + } + + // Set expiry time (assume 1 hour for instance profile, or use the credentials expiration) + _credentials_expiry = std::chrono::system_clock::now() + std::chrono::hours(1); + + LOG(INFO) << "Refreshed AWS credentials for MSK IAM authentication"; + } + + *credentials = _cached_credentials; + return Status::OK(); +} + +bool AwsMskIamAuth::_should_refresh_credentials() { + auto now = std::chrono::system_clock::now(); + auto refresh_time = + _credentials_expiry - std::chrono::milliseconds(_config.token_refresh_margin_ms); + return now >= refresh_time || _cached_credentials.GetAWSAccessKeyId().empty(); +} + +Status AwsMskIamAuth::generate_token(const std::string& broker_hostname, std::string* token, + int64_t* token_lifetime_ms) { + Aws::Auth::AWSCredentials credentials; + RETURN_IF_ERROR(get_credentials(&credentials)); + + std::string timestamp = _get_timestamp(); + std::string date_stamp = _get_date_stamp(timestamp); + + // AWS MSK IAM token is a base64-encoded presigned URL + // Reference: https://github.com/aws/aws-msk-iam-sasl-signer-python + + // Token expiry in seconds (900 seconds = 15 minutes, matching AWS MSK IAM signer reference) + static constexpr int TOKEN_EXPIRY_SECONDS = 900; + + // Build the endpoint URL + std::string endpoint_url = "https://kafka." 
+ _config.region + ".amazonaws.com/"; + + // Build credential scope + std::string credential_scope = + date_stamp + "/" + _config.region + "/kafka-cluster/aws4_request"; + + // Build the canonical query string (sorted alphabetically) + // IMPORTANT: All query parameters must be included in the signature calculation + // Session Token must be in canonical query string if using temporary credentials + std::stringstream canonical_query_ss; + canonical_query_ss << "Action=kafka-cluster%3AConnect"; // URL-encoded : + + // Add Algorithm + canonical_query_ss << "&X-Amz-Algorithm=AWS4-HMAC-SHA256"; + + // Add Credential + std::string credential = std::string(credentials.GetAWSAccessKeyId()) + "/" + credential_scope; + canonical_query_ss << "&X-Amz-Credential=" << _url_encode(credential); + + // Add Date + canonical_query_ss << "&X-Amz-Date=" << timestamp; + + // Add Expires + canonical_query_ss << "&X-Amz-Expires=" << TOKEN_EXPIRY_SECONDS; + + // Add Security Token if present (MUST be before signature calculation) + if (!credentials.GetSessionToken().empty()) { + canonical_query_ss << "&X-Amz-Security-Token=" + << _url_encode(std::string(credentials.GetSessionToken())); + } + + // Add SignedHeaders + canonical_query_ss << "&X-Amz-SignedHeaders=host"; + + std::string canonical_query_string = canonical_query_ss.str(); + + // Build the canonical headers + std::string host = "kafka." 
+ _config.region + ".amazonaws.com"; + std::string canonical_headers = "host:" + host + "\n"; + std::string signed_headers = "host"; + + // Build the canonical request + std::string method = "GET"; + std::string uri = "/"; + std::string payload_hash = _sha256(""); + + std::string canonical_request = method + "\n" + uri + "\n" + canonical_query_string + "\n" + + canonical_headers + "\n" + signed_headers + "\n" + payload_hash; + + // Build the string to sign + std::string algorithm = "AWS4-HMAC-SHA256"; + std::string canonical_request_hash = _sha256(canonical_request); + std::string string_to_sign = + algorithm + "\n" + timestamp + "\n" + credential_scope + "\n" + canonical_request_hash; + + // Calculate signature + std::string signing_key = _calculate_signing_key(std::string(credentials.GetAWSSecretKey()), + date_stamp, _config.region, "kafka-cluster"); + std::string signature = _hmac_sha256_hex(signing_key, string_to_sign); + + // Build the final presigned URL + // All parameters are already in canonical_query_string, just add signature + // Then add User-Agent AFTER signature (not part of signed content, matching reference impl) + std::string signed_url = endpoint_url + "?" 
+ canonical_query_string + + "&X-Amz-Signature=" + signature + + "&User-Agent=doris-msk-iam-auth%2F1.0"; + + // Base64url encode the signed URL (without padding) + *token = _base64url_encode(signed_url); + + // Token lifetime in milliseconds + *token_lifetime_ms = TOKEN_EXPIRY_SECONDS * 1000; + + LOG(INFO) << "Generated AWS MSK IAM token, presigned URL: " << signed_url; + return Status::OK(); +} + +std::string AwsMskIamAuth::_hmac_sha256_hex(const std::string& key, const std::string& data) { + std::string raw = _hmac_sha256(key, data); + std::stringstream ss; + for (unsigned char c : raw) { + ss << std::hex << std::setw(2) << std::setfill('0') << static_cast<int>(c); + } + return ss.str(); +} + +std::string AwsMskIamAuth::_url_encode(const std::string& value) { + std::ostringstream escaped; + escaped.fill('0'); + escaped << std::hex; + + for (char c : value) { + // Keep alphanumeric and other accepted characters intact + if (isalnum(static_cast<unsigned char>(c)) || c == '-' || c == '_' || c == '.' 
|| + c == '~') { + escaped << c; + } else { + // Any other characters are percent-encoded + escaped << std::uppercase; + escaped << '%' << std::setw(2) << static_cast<int>(static_cast<unsigned char>(c)); + escaped << std::nouppercase; + } + } + + return escaped.str(); +} + +std::string AwsMskIamAuth::_base64url_encode(const std::string& input) { + // Standard base64 alphabet + static const char* base64_chars = + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + + std::string result; + result.reserve(((input.size() + 2) / 3) * 4); + + const unsigned char* bytes = reinterpret_cast<const unsigned char*>(input.c_str()); + size_t len = input.size(); + + for (size_t i = 0; i < len; i += 3) { + uint32_t n = static_cast<uint32_t>(bytes[i]) << 16; + if (i + 1 < len) n |= static_cast<uint32_t>(bytes[i + 1]) << 8; + if (i + 2 < len) n |= static_cast<uint32_t>(bytes[i + 2]); + + result += base64_chars[(n >> 18) & 0x3F]; + result += base64_chars[(n >> 12) & 0x3F]; + if (i + 1 < len) result += base64_chars[(n >> 6) & 0x3F]; + if (i + 2 < len) result += base64_chars[n & 0x3F]; + } + + // Convert to URL-safe base64 (replace + with -, / with _) + // and remove padding (=) + for (char& c : result) { + if (c == '+') + c = '-'; + else if (c == '/') + c = '_'; + } + + return result; +} + +std::string AwsMskIamAuth::_calculate_signing_key(const std::string& secret_key, + const std::string& date_stamp, + const std::string& region, + const std::string& service) { + std::string k_secret = "AWS4" + secret_key; + std::string k_date = _hmac_sha256(k_secret, date_stamp); + std::string k_region = _hmac_sha256(k_date, region); + std::string k_service = _hmac_sha256(k_region, service); + std::string k_signing = _hmac_sha256(k_service, "aws4_request"); + return k_signing; +} + +std::string AwsMskIamAuth::_hmac_sha256(const std::string& key, const std::string& data) { + unsigned char* digest; + digest = HMAC(EVP_sha256(), key.c_str(), static_cast<int>(key.length()), + 
reinterpret_cast<const unsigned char*>(data.c_str()), data.length(), nullptr, + nullptr); + return {reinterpret_cast<char*>(digest), SHA256_DIGEST_LENGTH}; +} + +std::string AwsMskIamAuth::_sha256(const std::string& data) { + unsigned char hash[SHA256_DIGEST_LENGTH]; + SHA256(reinterpret_cast<const unsigned char*>(data.c_str()), data.length(), hash); + + std::stringstream ss; + for (unsigned char i : hash) { + ss << std::hex << std::setw(2) << std::setfill('0') << (int)i; + } + return ss.str(); +} + +std::string AwsMskIamAuth::_get_timestamp() { + auto now = std::chrono::system_clock::now(); + auto time_t_now = std::chrono::system_clock::to_time_t(now); + std::tm tm_now; + gmtime_r(&time_t_now, &tm_now); + + std::stringstream ss; + ss << std::put_time(&tm_now, "%Y%m%dT%H%M%SZ"); + return ss.str(); +} + +std::string AwsMskIamAuth::_get_date_stamp(const std::string& timestamp) { + // Extract YYYYMMDD from YYYYMMDDTHHMMSSz + return timestamp.substr(0, 8); +} + +// AwsMskIamOAuthCallback implementation + +namespace { +// Property keys for AWS MSK IAM authentication +constexpr const char* PROP_SECURITY_PROTOCOL = "security.protocol"; +constexpr const char* PROP_SASL_MECHANISM = "sasl.mechanism"; +constexpr const char* PROP_AWS_REGION = "aws.region"; +constexpr const char* PROP_AWS_ACCESS_KEY = "aws.access.key"; +constexpr const char* PROP_AWS_SECRET_KEY = "aws.secret.key"; +constexpr const char* PROP_AWS_ROLE_ARN = "aws.msk.iam.role.arn"; +constexpr const char* PROP_AWS_PROFILE_NAME = "aws.profile.name"; +constexpr const char* PROP_AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider"; +} // namespace + +std::unique_ptr<AwsMskIamOAuthCallback> AwsMskIamOAuthCallback::create_from_properties( + const std::unordered_map<std::string, std::string>& custom_properties, + const std::string& brokers) { + auto security_protocol_it = custom_properties.find(PROP_SECURITY_PROTOCOL); + auto sasl_mechanism_it = custom_properties.find(PROP_SASL_MECHANISM); + bool is_sasl_ssl = 
security_protocol_it != custom_properties.end() && + security_protocol_it->second == "SASL_SSL"; + bool is_oauthbearer = sasl_mechanism_it != custom_properties.end() && + sasl_mechanism_it->second == "OAUTHBEARER"; + + if (!is_sasl_ssl || !is_oauthbearer) { + return nullptr; + } Review Comment: AwsMskIamOAuthCallback::create_from_properties() checks security.protocol and sasl.mechanism with exact-case string comparisons ("SASL_SSL" / "OAUTHBEARER"). FE validation uses case-insensitive checks and may pass through lower/other casing, which would prevent the callback from being enabled and break MSK IAM auth. Consider normalizing to upper case or using a case-insensitive comparison here. ########## be/src/load/routine_load/data_consumer.cpp: ########## @@ -99,6 +100,15 @@ Status KafkaDataConsumer::init(std::shared_ptr<StreamLoadContext> ctx) { } for (auto& item : ctx->kafka_info->properties) { + _custom_properties.emplace(item.first, item.second); + + // AWS properties (aws.*) are Doris-specific for MSK IAM authentication + // and should not be passed to librdkafka + if (starts_with(item.second, "AWS:")) { + LOG(INFO) << "Skipping AWS property for librdkafka: " << item.first; Review Comment: This AWS-property filter checks the *value* for an "AWS:" prefix, but nothing else in the codebase appears to produce values with that prefix (search for "AWS:" only finds this spot). As written, AWS properties (keys like aws.region) will still flow through set_conf(), and the comment/logging is misleading. If the intent is to avoid passing Doris-specific keys to librdkafka, filter by key prefix (e.g., starts_with(item.first, "aws.")) or remove this block since set_conf() already ignores unknown configs. 
```suggestion if (starts_with(item.first, "aws.")) { LOG(INFO) << "Skipping AWS-specific Kafka property for librdkafka: " << item.first; ``` ########## fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaConfiguration.java: ########## @@ -46,6 +46,20 @@ public enum KafkaConfiguration { KAFKA_TEXT_TABLE_NAME_FIELD_DELIMITER("kafka_text_table_name_field_delimiter", ",", value -> value.replace(" ", "")); + + // Common Kafka property names used for security and authentication + // These are passed through property.* prefix and validated for correctness + public static final String SECURITY_PROTOCOL = "security.protocol"; + public static final String SASL_MECHANISM = "sasl.mechanism"; + public static final String SASL_JAAS_CONFIG = "sasl.jaas.config"; + + // AWS MSK IAM authentication properties + public static final String AWS_REGION = "aws.region"; + public static final String AWS_ACCESS_KEY = "aws.access_key"; + public static final String AWS_SECRET_KEY = "aws.secret_key"; + public static final String AWS_MSK_IAM_ROLE_ARN = "aws. role_arn"; + public static final String AWS_PROFILE_NAME = "aws.profile_name"; Review Comment: KafkaConfiguration AWS-related constant values don’t match the keys used elsewhere in this PR (and in tests/BE). E.g. access/secret keys use underscores ("aws.access_key"/"aws.secret_key"), role ARN has an embedded space ("aws. role_arn"), and profile uses underscore ("aws.profile_name"). This breaks FE validation (containsKey checks won’t match) and will cause the new FE unit tests to fail. Please align these constants with the actual property names ("aws.access.key", "aws.secret.key", "aws.msk.iam.role.arn", "aws.profile.name"). 
```suggestion public static final String AWS_ACCESS_KEY = "aws.access.key"; public static final String AWS_SECRET_KEY = "aws.secret.key"; public static final String AWS_MSK_IAM_ROLE_ARN = "aws.msk.iam.role.arn"; public static final String AWS_PROFILE_NAME = "aws.profile.name"; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
