github-actions[bot] commented on code in PR #61324:
URL: https://github.com/apache/doris/pull/61324#discussion_r3016559940
##########
fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaDataSourceProperties.java:
##########
@@ -233,16 +238,74 @@ private void analyzeKafkaPartitionProperty(List<Integer>
partitions) {
private void analyzeCustomProperties() throws AnalysisException {
this.customKafkaProperties = new HashMap<>();
for (Map.Entry<String, String> dataSourceProperty :
originalDataSourceProperties.entrySet()) {
- if
(dataSourceProperty.getKey().startsWith(CUSTOM_KAFKA_PROPERTY_PREFIX)) {
- String propertyKey = dataSourceProperty.getKey();
- String propertyValue = dataSourceProperty.getValue();
+ String propertyKey = dataSourceProperty.getKey();
+ String propertyValue = dataSourceProperty.getValue();
+ if (propertyKey.startsWith(CUSTOM_KAFKA_PROPERTY_PREFIX)) {
+ // property.xxx -> xxx (strip "property." prefix)
String[] propertyValueArr = propertyKey.split("\\.");
if (propertyValueArr.length < 2) {
throw new AnalysisException("kafka property value could
not be a empty string");
}
this.customKafkaProperties.put(propertyKey.substring(propertyKey.indexOf(".") +
1), propertyValue);
+ } else if (propertyKey.startsWith(AWS_PROPERTY_PREFIX)) {
+ // aws.xxx -> aws.xxx (keep as-is, no prefix stripping)
+ // AWS properties are Doris-specific and passed directly to BE
+ this.customKafkaProperties.put(propertyKey, propertyValue);
+ }
+ }
+
+ // Validate AWS MSK IAM authentication configuration if present
+ validateAwsMskIamConfig();
+ }
+
+ private void validateAwsMskIamConfig() throws AnalysisException {
+ String securityProtocol =
customKafkaProperties.get(KafkaConfiguration.SECURITY_PROTOCOL);
+ String saslMechanism =
customKafkaProperties.get(KafkaConfiguration.SASL_MECHANISM);
+
+ // Check if AWS MSK IAM authentication is being used
+ boolean hasAwsRegion =
customKafkaProperties.containsKey(KafkaConfiguration.AWS_REGION);
+ boolean hasAccessKey =
customKafkaProperties.containsKey(KafkaConfiguration.AWS_ACCESS_KEY);
+ boolean hasSecretKey =
customKafkaProperties.containsKey(KafkaConfiguration.AWS_SECRET_KEY);
+ boolean hasRoleArn =
customKafkaProperties.containsKey(KafkaConfiguration.AWS_MSK_IAM_ROLE_ARN);
+ boolean hasProfileName =
customKafkaProperties.containsKey(KafkaConfiguration.AWS_PROFILE_NAME);
+ boolean hasCredentialsProvider =
customKafkaProperties.containsKey(KafkaConfiguration.AWS_CREDENTIALS_PROVIDER);
+
+ boolean isAwsMskIam = hasAwsRegion || hasAccessKey || hasSecretKey ||
hasRoleArn
+ || hasProfileName || hasCredentialsProvider;
+
+ // If AWS-related property is set, validate the complete configuration
+ if (isAwsMskIam) {
+ // aws.region is required for AWS MSK IAM authentication
+ if (!hasAwsRegion) {
+ throw new AnalysisException("When using AWS MSK IAM
authentication, "
+ + "'aws.region' is required. Please specify the AWS
region.");
+ }
+
+ // security.protocol should be SASL_SSL for AWS MSK IAM
+ if (securityProtocol == null ||
!"SASL_SSL".equalsIgnoreCase(securityProtocol)) {
+ throw new AnalysisException("For AWS MSK IAM authentication, "
+ + "'property.security.protocol' must be 'SASL_SSL' "
+ + "(SSL encryption + SASL authentication required), "
+ + "but got: " + securityProtocol);
+ }
+
+ // sasl.mechanism should be OAUTHBEARER for AWS MSK IAM
+ if (saslMechanism == null ||
!"OAUTHBEARER".equalsIgnoreCase(saslMechanism)) {
+ throw new AnalysisException("For AWS MSK IAM authentication, "
+ + "'property.sasl.mechanism' must be 'OAUTHBEARER',
but got: " + saslMechanism + ". "
+ + "Other SASL mechanisms are not supported now.");
+ }
+
+ // Detect if this is public (internet) access or internal AWS
access
Review Comment:
The explicit AK/SK pairing check is only enforced inside the
`isPublicAccess` branch. For the normal (private) MSK path, `aws.access_key`
without `aws.secret_key` (or vice versa) still passes FE validation. BE then
drops the partial credentials in `create_from_properties()`, which returns
`nullptr`, so the consumer reaches librdkafka with `OAUTHBEARER` configured but
no token refresh callback. That turns what should be a deterministic
analysis-time error into a runtime authentication failure. Please reject partial
explicit credentials regardless of whether the brokers are public or private,
for example with `if (hasAccessKey != hasSecretKey) { throw new
AnalysisException(...); }` right after the `aws.region` check. The
public-access branch can stay as an additional restriction if needed.
##########
be/src/runtime/aws_msk_iam_auth.cpp:
##########
@@ -0,0 +1,495 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/aws_msk_iam_auth.h"
+
+#include <aws/core/auth/AWSCredentials.h>
+#include <aws/core/auth/AWSCredentialsProvider.h>
+#include <aws/core/auth/AWSCredentialsProviderChain.h>
+#include <aws/core/auth/STSCredentialsProvider.h>
+#include <aws/core/platform/Environment.h>
+#include <aws/identity-management/auth/STSAssumeRoleCredentialsProvider.h>
+#include <aws/sts/STSClient.h>
+#include <aws/sts/model/AssumeRoleRequest.h>
+#include <openssl/hmac.h>
+#include <openssl/sha.h>
+
+#include <algorithm>
+#include <chrono>
+#include <iomanip>
+#include <sstream>
+
+#include "common/logging.h"
+
+namespace doris {
+
+AwsMskIamAuth::AwsMskIamAuth(Config config) : _config(std::move(config)) {
+ _credentials_provider = _create_credentials_provider();
+}
+
+std::shared_ptr<Aws::Auth::AWSCredentialsProvider>
AwsMskIamAuth::_create_credentials_provider() {
+ if (!_config.role_arn.empty() && !_config.access_key.empty() &&
!_config.secret_key.empty()) {
+ LOG(INFO) << "Using AWS STS Assume Role with explicit credentials
(cross-account): "
+ << _config.role_arn << " (Access Key ID: " <<
_config.access_key.substr(0, 4)
+ << "****)";
+
+ Aws::Client::ClientConfiguration client_config;
+ if (!_config.region.empty()) {
+ client_config.region = _config.region;
+ }
+
+ // Use explicit AK/SK as base credentials to assume the role
+ Aws::Auth::AWSCredentials base_credentials(_config.access_key,
_config.secret_key);
+ auto base_provider =
+
std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(base_credentials);
+
+ auto sts_client = std::make_shared<Aws::STS::STSClient>(base_provider,
client_config);
+
+ return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>(
+ _config.role_arn, Aws::String(), /* external_id */
Aws::String(),
+ Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS, sts_client);
+ }
+ // 2. Explicit AK/SK credentials (direct access)
+ if (!_config.access_key.empty() && !_config.secret_key.empty()) {
+ LOG(INFO) << "Using explicit AWS credentials (Access Key ID: "
+ << _config.access_key.substr(0, 4) << "****)";
+
+ Aws::Auth::AWSCredentials credentials(_config.access_key,
_config.secret_key);
+
+ return
std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(credentials);
+ }
+ // 3. Assume Role with Instance Profile (for same-account access from
within AWS)
+ if (!_config.role_arn.empty()) {
+ LOG(INFO) << "Using AWS STS Assume Role with Instance Profile: " <<
_config.role_arn;
+
+ Aws::Client::ClientConfiguration client_config;
+ if (!_config.region.empty()) {
+ client_config.region = _config.region;
+ }
+
+ auto sts_client = std::make_shared<Aws::STS::STSClient>(
+
std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>(),
client_config);
+
+ return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>(
+ _config.role_arn, Aws::String(), /* external_id */
Aws::String(),
+ Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS, sts_client);
+ }
+ // 4. AWS Profile (reads from ~/.aws/credentials)
+ if (!_config.profile_name.empty()) {
+ LOG(INFO) << "Using AWS Profile: " << _config.profile_name;
+
+ return
std::make_shared<Aws::Auth::ProfileConfigFileAWSCredentialsProvider>(
+ _config.profile_name.c_str());
+ }
+ // 5. Custom Credentials Provider
+ if (!_config.credentials_provider.empty()) {
+ LOG(INFO) << "Using custom credentials provider: " <<
_config.credentials_provider;
+
+ // Parse credentials provider type string
+ std::string provider_upper = _config.credentials_provider;
+ std::transform(provider_upper.begin(), provider_upper.end(),
provider_upper.begin(),
+ ::toupper);
+
+ if (provider_upper == "ENV" || provider_upper == "ENVIRONMENT") {
+ return
std::make_shared<Aws::Auth::EnvironmentAWSCredentialsProvider>();
+ } else if (provider_upper == "INSTANCE_PROFILE" || provider_upper ==
"INSTANCEPROFILE") {
+ return
std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>();
+ } else if (provider_upper == "CONTAINER" || provider_upper == "ECS") {
+ return std::make_shared<Aws::Auth::TaskRoleCredentialsProvider>(
+
Aws::Environment::GetEnv("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI").c_str());
+ } else if (provider_upper == "DEFAULT") {
+ return
std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
+ } else {
+ LOG(WARNING) << "Unknown credentials provider type: " <<
_config.credentials_provider
+ << ", falling back to default credentials provider
chain";
+ return
std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
+ }
+ }
+ // No valid credentials configuration found
+ LOG(ERROR) << "AWS MSK IAM authentication requires credentials. Please
provide.";
+ return nullptr;
+}
+
+Status AwsMskIamAuth::get_credentials(Aws::Auth::AWSCredentials* credentials) {
+ std::lock_guard<std::mutex> lock(_mutex);
+
+ if (!_credentials_provider) {
+ return Status::InternalError("AWS credentials provider not
initialized");
+ }
+
+ // Refresh if needed
+ if (_should_refresh_credentials()) {
+ _cached_credentials = _credentials_provider->GetAWSCredentials();
+ if (_cached_credentials.GetAWSAccessKeyId().empty()) {
+ return Status::InternalError("Failed to get AWS credentials");
+ }
+
+ // Set expiry time (assume 1 hour for instance profile, or use the
credentials expiration)
+ _credentials_expiry = std::chrono::system_clock::now() +
std::chrono::hours(1);
Review Comment:
Using a fixed one-hour cache TTL here breaks credential sources whose sessions
last less than one hour. Routine Load refreshes the OAuth token every 15
minutes, but `get_credentials()` keeps returning `_cached_credentials` until
this synthetic deadline is reached. An STS/session credential that expires
earlier will therefore still be used to sign new tokens after it has become
invalid. The comment on the preceding line says "or use the credentials
expiration", but the code never does. Please either query the provider on every
refresh and let the SDK cache internally, or track the real credential
expiration (e.g. via `Aws::Auth::AWSCredentials::GetExpiration()`) instead of
hard-coding `+ std::chrono::hours(1)`.
##########
regression-test/conf/regression-conf.groovy:
##########
@@ -96,6 +96,7 @@ excludeDirectories = "000_the_start_sentinel_do_not_touch," +
// keep this line
"segcompaction_p2," +
"workload_manager_p1," +
"cloud_p0/cache," +
+ "load_p0/msk_load," +
Review Comment:
This excludes the new `load_p0/msk_load` suites from every regression run,
so the PR's end-to-end coverage is disabled by the same change that adds it. It
also modifies `regression-conf.groovy`, which AGENTS.md treats as local
environment configuration that should not be committed. Please drop this
exclusion and rely on the in-suite `Strings.isNullOrEmpty(...)` guards to skip
the suites when the external MSK environment is unavailable.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]