github-actions[bot] commented on code in PR #60051: URL: https://github.com/apache/doris/pull/60051#discussion_r2882323655
########## fe/fe-core/src/main/java/org/apache/doris/load/routineload/KinesisProgress.java: ########## @@ -0,0 +1,285 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.routineload; + +import org.apache.doris.common.DdlException; +import org.apache.doris.common.Pair; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.thrift.TKinesisRLTaskProgress; + +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.gson.Gson; +import com.google.gson.annotations.SerializedName; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Progress tracking for Kinesis Routine Load jobs. + * + * Kinesis uses sequence numbers instead of offsets like Kafka. + * A sequence number is a unique identifier for each record within a shard. + * Sequence numbers are string representations of 128-bit integers. 
+ * + * Special position values: + * - TRIM_HORIZON: Start from the oldest record in the shard + * - LATEST: Start from the newest record (records arriving after the iterator is created) + * - AT_TIMESTAMP: Start from a specific timestamp + * - Specific sequence number: Start from or after a specific sequence number + */ +public class KinesisProgress extends RoutineLoadProgress { + private static final Logger LOG = LogManager.getLogger(KinesisProgress.class); + + // Special position constants + public static final String POSITION_TRIM_HORIZON = "TRIM_HORIZON"; + public static final String POSITION_LATEST = "LATEST"; + public static final String POSITION_AT_TIMESTAMP = "AT_TIMESTAMP"; + + // Internal representation for special positions + // Using negative values since sequence numbers are always positive + public static final String TRIM_HORIZON_VAL = "-2"; + public static final String LATEST_VAL = "-1"; + + /** + * Map from shard ID to sequence number. + * The sequence number saved here is the next sequence number to be consumed. + * + * Note: Unlike Kafka partitions which are integers, Kinesis shard IDs are strings + * like "shardId-000000000000". + */ + @SerializedName(value = "shardToSeqNum") + private ConcurrentMap<String, String> shardIdToSequenceNumber = Maps.newConcurrentMap(); + + // MillisBehindLatest per shard, reported by BE from GetRecords response. + // Not persisted — refreshed every task commit. Used only for lag display and scheduling. 
+ private ConcurrentMap<String, Long> shardIdToMillsBehindLatest = Maps.newConcurrentMap(); + + private ReentrantLock lock = new ReentrantLock(true); + + public KinesisProgress() { + super(LoadDataSourceType.KINESIS); + } + + public KinesisProgress(TKinesisRLTaskProgress tKinesisRLTaskProgress) { + super(LoadDataSourceType.KINESIS); + this.shardIdToSequenceNumber = new ConcurrentHashMap<>(); + if (tKinesisRLTaskProgress.getShardCmtSeqNum() != null) { + shardIdToSequenceNumber.putAll(tKinesisRLTaskProgress.getShardCmtSeqNum()); + } + if (tKinesisRLTaskProgress.isSetShardMillsBehindLatest()) { + this.shardIdToMillsBehindLatest = new ConcurrentHashMap<>( + tKinesisRLTaskProgress.getShardMillsBehindLatest()); + } + } + + public KinesisProgress(Map<String, String> shardIdToSequenceNumber) { + super(LoadDataSourceType.KINESIS); + this.shardIdToSequenceNumber = new ConcurrentHashMap<>(); + this.shardIdToSequenceNumber.putAll(shardIdToSequenceNumber); + } + + public KinesisProgress(ConcurrentMap<String, String> shardIdToSequenceNumber) { + super(LoadDataSourceType.KINESIS); + this.shardIdToSequenceNumber = shardIdToSequenceNumber; + } + + /** + * Get sequence numbers for specified shard IDs. + */ + public ConcurrentMap<String, String> getShardIdToSequenceNumber(List<String> shardIds) { + ConcurrentMap<String, String> result = Maps.newConcurrentMap(); + for (Map.Entry<String, String> entry : shardIdToSequenceNumber.entrySet()) { + for (String shardId : shardIds) { + if (entry.getKey().equals(shardId)) { + result.put(shardId, entry.getValue()); + } + } + } + return result; + } + + /** + * Add a shard with its starting position. 
+ */ + public void addShardPosition(Pair<String, String> shardPosition) { + shardIdToSequenceNumber.put(shardPosition.first, shardPosition.second); Review Comment: **MEDIUM: Thread safety concern — `addShardPosition()` does not acquire the lock.** The `update()` method (line 261) uses `this.lock` for thread safety, but `addShardPosition()`, `modifyPosition()`, and `getShardIdToSequenceNumber()` do NOT acquire the lock despite modifying/reading the same `shardIdToSequenceNumber` map. While `shardIdToSequenceNumber` is a `ConcurrentMap` (safe for individual operations), compound read-modify-write sequences across multiple map entries need external synchronization. For example, `modifyPosition()` calls `clear()` + multiple `put()` which is not atomic. Either: 1. Use the lock consistently in all mutating methods, or 2. Document why lock-free access is safe for these specific call sites. ########## KINESIS_IMPLEMENTATION_GUIDE.md: ########## @@ -0,0 +1,952 @@ +# Apache Doris Routine Load 的 AWS Kinesis 支持 - 实现指南 + Review Comment: **HIGH: This file should not be committed.** 1. Contains hardcoded local developer paths: `/mnt/disk2/huangruixin/apache/doris/` 2. Contains AI authorship attribution: `"Author: Claude Sonnet 4.5 (Anthropic)"` 3. Contains emojis in section headers, inconsistent with repo style 4. Per AGENTS.md commit standards: environment/documentation files unrelated to code functionality should not be committed Please remove this file from the PR. 
########## be/src/runtime/routine_load/data_consumer.cpp: ########## @@ -556,4 +571,439 @@ bool KafkaDataConsumer::match(std::shared_ptr<StreamLoadContext> ctx) { return true; } +// ==================== AWS Kinesis Data Consumer Implementation ==================== + +KinesisDataConsumer::KinesisDataConsumer(std::shared_ptr<StreamLoadContext> ctx) + : _region(ctx->kinesis_info->region), + _stream(ctx->kinesis_info->stream), + _endpoint(ctx->kinesis_info->endpoint) { + VLOG_NOTICE << "construct Kinesis consumer: stream=" << _stream << ", region=" << _region; +} + +KinesisDataConsumer::~KinesisDataConsumer() { + VLOG_NOTICE << "destruct Kinesis consumer: stream=" << _stream; + // AWS SDK client managed by shared_ptr, will be automatically cleaned up +} + +Status KinesisDataConsumer::init(std::shared_ptr<StreamLoadContext> ctx) { + std::unique_lock<std::mutex> l(_lock); + if (_init) { + return Status::OK(); // Already initialized (idempotent) + } + + // Store custom properties (AWS credentials, etc.) + _custom_properties.insert(ctx->kinesis_info->properties.begin(), + ctx->kinesis_info->properties.end()); + + // Create AWS Kinesis client + RETURN_IF_ERROR(_create_kinesis_client(ctx)); + + VLOG_NOTICE << "finished to init Kinesis consumer. 
stream=" << _stream + << ", region=" << _region << ", " << ctx->brief(); + _init = true; + return Status::OK(); +} + +Status KinesisDataConsumer::_create_kinesis_client(std::shared_ptr<StreamLoadContext> ctx) { + // Reuse S3ClientFactory's credential provider logic + // This supports all AWS authentication methods: + // - Simple AK/SK + // - IAM instance profile (EC2) + // - STS assume role + // - Session tokens + // - Environment variables + // - Default credential chain + + S3ClientConf s3_conf; + s3_conf.region = _region; + s3_conf.endpoint = _endpoint; + + // Parse AWS credentials from properties + auto it_ak = _custom_properties.find("aws.access.key.id"); + auto it_sk = _custom_properties.find("aws.secret.access.key"); + auto it_token = _custom_properties.find("aws.session.token"); + auto it_role_arn = _custom_properties.find("aws.iam.role.arn"); + auto it_external_id = _custom_properties.find("aws.external.id"); + auto it_provider = _custom_properties.find("aws.credentials.provider"); + + if (it_ak != _custom_properties.end()) { + s3_conf.ak = it_ak->second; + } + if (it_sk != _custom_properties.end()) { + s3_conf.sk = it_sk->second; + } + if (it_token != _custom_properties.end()) { + s3_conf.token = it_token->second; + } + if (it_role_arn != _custom_properties.end()) { + s3_conf.role_arn = it_role_arn->second; + } + if (it_external_id != _custom_properties.end()) { + s3_conf.external_id = it_external_id->second; + } + if (it_provider != _custom_properties.end()) { + // Map provider type string to enum + if (it_provider->second == "instance_profile") { + s3_conf.cred_provider_type = CredProviderType::InstanceProfile; + } else if (it_provider->second == "env") { + s3_conf.cred_provider_type = CredProviderType::Env; + } else if (it_provider->second == "simple") { + s3_conf.cred_provider_type = CredProviderType::Simple; + } + } + + // Create AWS ClientConfiguration + Aws::Client::ClientConfiguration aws_config = S3ClientFactory::getClientConfiguration(); + 
aws_config.region = _region; + + if (!_endpoint.empty()) { + aws_config.endpointOverride = _endpoint; + } + + // Set timeouts from properties or use defaults + auto it_request_timeout = _custom_properties.find("aws.request.timeout.ms"); + if (it_request_timeout != _custom_properties.end()) { + aws_config.requestTimeoutMs = std::stoi(it_request_timeout->second); + } else { + aws_config.requestTimeoutMs = 30000; // 30s default + } + + auto it_conn_timeout = _custom_properties.find("aws.connection.timeout.ms"); + if (it_conn_timeout != _custom_properties.end()) { + aws_config.connectTimeoutMs = std::stoi(it_conn_timeout->second); + } + + // Get credentials provider (reuses S3 infrastructure) + auto credentials_provider = S3ClientFactory::instance().get_aws_credentials_provider(s3_conf); + + // Create Kinesis client + _kinesis_client = + std::make_shared<Aws::Kinesis::KinesisClient>(credentials_provider, aws_config); + + if (!_kinesis_client) { + return Status::InternalError("Failed to create AWS Kinesis client for stream: {}, region: {}", + _stream, _region); + } + + LOG(INFO) << "Created Kinesis client for stream: " << _stream << ", region: " << _region; + return Status::OK(); +} + +Status KinesisDataConsumer::assign_shards( + const std::map<std::string, std::string>& shard_sequence_numbers, + const std::string& stream_name, std::shared_ptr<StreamLoadContext> ctx) { + DCHECK(_kinesis_client); + + std::stringstream ss; + ss << "Assigning shards to Kinesis consumer " << _id << ": "; + + for (auto& entry : shard_sequence_numbers) { + const std::string& shard_id = entry.first; + const std::string& sequence_number = entry.second; + + // Get shard iterator for this shard + std::string iterator; + RETURN_IF_ERROR(_get_shard_iterator(shard_id, sequence_number, &iterator)); + + _shard_iterators[shard_id] = iterator; + _consuming_shard_ids.insert(shard_id); + + ss << "[" << shard_id << ": " << sequence_number << "] "; + } + + LOG(INFO) << ss.str(); + return Status::OK(); +} + 
+Status KinesisDataConsumer::_get_shard_iterator(const std::string& shard_id, + const std::string& sequence_number, + std::string* iterator) { + Aws::Kinesis::Model::GetShardIteratorRequest request; + request.SetStreamName(_stream); + request.SetShardId(shard_id); + + // Determine iterator type based on sequence number + if (sequence_number.empty() || sequence_number == "TRIM_HORIZON" || + sequence_number == "-2") { + // Start from oldest record in shard + request.SetShardIteratorType(Aws::Kinesis::Model::ShardIteratorType::TRIM_HORIZON); + } else if (sequence_number == "LATEST" || sequence_number == "-1") { + // Start from newest record (records arriving after iterator creation) + request.SetShardIteratorType(Aws::Kinesis::Model::ShardIteratorType::LATEST); + } else { + // Resume from specific sequence number + request.SetShardIteratorType( + Aws::Kinesis::Model::ShardIteratorType::AFTER_SEQUENCE_NUMBER); + request.SetStartingSequenceNumber(sequence_number); + } + + auto outcome = _kinesis_client->GetShardIterator(request); + if (!outcome.IsSuccess()) { + auto& error = outcome.GetError(); + return Status::InternalError( + "Failed to get shard iterator for shard {}: {} ({})", shard_id, + error.GetMessage(), static_cast<int>(error.GetErrorType())); + } + + *iterator = outcome.GetResult().GetShardIterator(); + VLOG_NOTICE << "Got shard iterator for shard: " << shard_id; + return Status::OK(); +} + +Status KinesisDataConsumer::group_consume( + BlockingQueue<std::shared_ptr<Aws::Kinesis::Model::Record>>* queue, + int64_t max_running_time_ms) { + static constexpr int MAX_RETRY_TIMES_FOR_TRANSPORT_FAILURE = 3; + static constexpr int RATE_LIMIT_BACKOFF_MS = 1000; // 1 second + static constexpr int KINESIS_GET_RECORDS_LIMIT = 1000; // Max 10000 + static constexpr int INTER_SHARD_SLEEP_MS = 10; // Small sleep between shards + + int64_t left_time = max_running_time_ms; + LOG(INFO) << "start Kinesis consumer: " << _id << ", grp: " << _grp_id + << ", stream: " << _stream << ", 
max running time(ms): " << left_time; + + int64_t received_rows = 0; + int64_t put_rows = 0; + int32_t retry_times = 0; + Status st = Status::OK(); + bool done = false; + + MonotonicStopWatch consumer_watch; + MonotonicStopWatch watch; + watch.start(); + + while (true) { + // Check cancellation flag + { + std::unique_lock<std::mutex> l(_lock); + if (_cancelled) { + break; + } + } + + if (left_time <= 0) { + break; + } + + // Round-robin through all active shards + for (auto it = _consuming_shard_ids.begin(); it != _consuming_shard_ids.end() && !done;) { + const std::string& shard_id = *it; + auto iter_it = _shard_iterators.find(shard_id); + + if (iter_it == _shard_iterators.end() || iter_it->second.empty()) { + // Shard exhausted (closed due to split/merge), remove from active set + LOG(INFO) << "Shard exhausted: " << shard_id; + it = _consuming_shard_ids.erase(it); + continue; + } + + // Call Kinesis GetRecords API + consumer_watch.start(); + + Aws::Kinesis::Model::GetRecordsRequest request; Review Comment: **LOW: `retry_times` counter is shared across all shards.** The `retry_times` variable is shared across the round-robin shard loop but is reset on *any* successful `GetRecords` call (any shard). This means: - Shard A fails with a retriable error (retry_times=1) - Shard B succeeds (retry_times reset to 0) - Shard A fails again (retry_times=1 again, not 2) This effectively removes the retry limit protection for individual shards. Consider tracking retry counts per shard. ########## fe/fe-core/src/main/java/org/apache/doris/load/routineload/KinesisRoutineLoadJob.java: ########## @@ -0,0 +1,785 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.routineload; + +import org.apache.doris.analysis.ImportColumnDesc; +import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.RandomDistributionInfo; +import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.InternalErrorCode; +import org.apache.doris.common.LoadException; +import org.apache.doris.common.Pair; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.common.util.LogBuilder; +import org.apache.doris.common.util.LogKey; +import org.apache.doris.datasource.kinesis.KinesisUtil; +import org.apache.doris.load.routineload.kinesis.KinesisConfiguration; +import org.apache.doris.load.routineload.kinesis.KinesisDataSourceProperties; +import org.apache.doris.nereids.load.NereidsImportColumnDesc; +import org.apache.doris.nereids.load.NereidsLoadTaskInfo; +import org.apache.doris.nereids.load.NereidsLoadUtils; +import org.apache.doris.nereids.load.NereidsRoutineLoadTaskInfo; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.plans.commands.AlterRoutineLoadCommand; +import 
org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo; +import org.apache.doris.persist.AlterRoutineLoadJobOperationLog; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.thrift.TFileCompressType; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; +import org.apache.doris.transaction.TransactionState; +import org.apache.doris.transaction.TransactionStatus; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.annotations.SerializedName; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** + * KinesisRoutineLoadJob is a RoutineLoadJob that fetches data from AWS Kinesis streams. 
+ * + * Key concepts: + * - Stream: Named collection of data records (similar to Kafka topic) + * - Shard: Sequence of data records in a stream (similar to Kafka partition) + * - Sequence Number: Unique identifier for each record within a shard (similar to Kafka offset) + * - Consumer: Application that reads from a stream + * + * The progress tracks sequence numbers for each shard, represented as: + * {"shardId-000000000000": "49590338271490256608559692538361571095921575989136588802", ...} + */ +public class KinesisRoutineLoadJob extends RoutineLoadJob { + private static final Logger LOG = LogManager.getLogger(KinesisRoutineLoadJob.class); + + public static final String KINESIS_FILE_CATALOG = "kinesis"; + + @SerializedName("rg") + private String region; + @SerializedName("stm") + private String stream; + @SerializedName("ep") + private String endpoint; + + // optional, user want to load shards(Kafka's cskp). + @SerializedName("csks") + private List<String> customKinesisShards = Lists.newArrayList(); + + // current shards being consumed. + // updated periodically because shards may split or merge. + private List<String> currentKinesisShards = Lists.newArrayList(); + + // Default starting position for new shards. + // Values: TRIM_HORIZON, LATEST, or a timestamp string. + private String kinesisDefaultPosition = ""; + + // custom Kinesis properties including AWS credentials and client settings. + @SerializedName("prop") + private Map<String, String> customProperties = Maps.newHashMap(); + private Map<String, String> convertedCustomProperties = Maps.newHashMap(); + + // The latest offset of each partition fetched from kinesis server. + // Will be updated periodically by calling hasMoreDataToConsume() + private Map<String, Long> cachedShardWithMillsBehindLatest = Maps.newConcurrentMap(); + + // newly discovered shards from Kinesis. 
+ private List<String> newCurrentKinesisShards = Lists.newArrayList(); + + public KinesisRoutineLoadJob() { + // For serialization + super(-1, LoadDataSourceType.KINESIS); + } + + public KinesisRoutineLoadJob(Long id, String name, long dbId, long tableId, + String region, String stream, UserIdentity userIdentity) { + super(id, name, dbId, tableId, LoadDataSourceType.KINESIS, userIdentity); + this.region = region; + this.stream = stream; + this.progress = new KinesisProgress(); + } + + public KinesisRoutineLoadJob(Long id, String name, long dbId, + String region, String stream, + UserIdentity userIdentity, boolean isMultiTable) { + super(id, name, dbId, LoadDataSourceType.KINESIS, userIdentity); + this.region = region; + this.stream = stream; + this.progress = new KinesisProgress(); + setMultiTable(isMultiTable); + } + + public String getRegion() { + return region; + } + + public String getStream() { + return stream; + } + + public String getEndpoint() { + return endpoint; + } + + public Map<String, String> getConvertedCustomProperties() { + return convertedCustomProperties; + } + + @Override + public void prepare() throws UserException { + // should reset converted properties each time the job being prepared. + // because the file info can be changed anytime. 
+ convertCustomProperties(true); + } + + private void convertCustomProperties(boolean rebuild) throws DdlException { + if (customProperties.isEmpty()) { + return; + } + + if (!rebuild && !convertedCustomProperties.isEmpty()) { + return; + } + + if (rebuild) { + convertedCustomProperties.clear(); + } + + for (Map.Entry<String, String> entry : customProperties.entrySet()) { + convertedCustomProperties.put(entry.getKey(), entry.getValue()); + } + + // Handle default position + if (convertedCustomProperties.containsKey(KinesisConfiguration.KINESIS_DEFAULT_POSITION.getName())) { + kinesisDefaultPosition = convertedCustomProperties.remove( + KinesisConfiguration.KINESIS_DEFAULT_POSITION.getName()); + } + } + + private String convertedDefaultPosition() { + if (this.kinesisDefaultPosition.isEmpty()) { + return KinesisProgress.POSITION_LATEST; + } + return this.kinesisDefaultPosition; + } + + @Override + public void divideRoutineLoadJob(int currentConcurrentTaskNum) throws UserException { + List<RoutineLoadTaskInfo> result = new ArrayList<>(); + writeLock(); + try { + if (state == JobState.NEED_SCHEDULE) { + // Divide shards into tasks + for (int i = 0; i < currentConcurrentTaskNum; i++) { + Map<String, String> taskKinesisProgress = Maps.newHashMap(); + for (int j = i; j < currentKinesisShards.size(); j = j + currentConcurrentTaskNum) { + String shardId = currentKinesisShards.get(j); + taskKinesisProgress.put(shardId, + ((KinesisProgress) progress).getSequenceNumberByShard(shardId)); + } + KinesisTaskInfo kinesisTaskInfo = new KinesisTaskInfo(UUID.randomUUID(), id, + getTimeout() * 1000, taskKinesisProgress, isMultiTable(), -1, false); + routineLoadTaskInfoList.add(kinesisTaskInfo); + result.add(kinesisTaskInfo); + } + // Change job state to running + if (!result.isEmpty()) { + unprotectUpdateState(JobState.RUNNING, null, false); + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Ignore to divide routine load job while job state {}", state); + } + } + // Save task 
into queue of needScheduleTasks + Env.getCurrentEnv().getRoutineLoadTaskScheduler().addTasksInQueue(result); + } finally { + writeUnlock(); + } + } + + @Override + public int calculateCurrentConcurrentTaskNum() { + int shardNum = currentKinesisShards.size(); + if (desireTaskConcurrentNum == 0) { + desireTaskConcurrentNum = Config.max_routine_load_task_concurrent_num; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("current concurrent task number is min" + + "(shard num: {}, desire task concurrent num: {}, config: {})", + shardNum, desireTaskConcurrentNum, Config.max_routine_load_task_concurrent_num); + } + currentTaskConcurrentNum = Math.min(shardNum, Math.min(desireTaskConcurrentNum, + Config.max_routine_load_task_concurrent_num)); + return currentTaskConcurrentNum; + } + + @Override + protected boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment, + TransactionState txnState, + TransactionState.TxnStatusChangeReason txnStatusChangeReason) { + if (txnState.getTransactionStatus() == TransactionStatus.COMMITTED + || txnState.getTransactionStatus() == TransactionStatus.VISIBLE) { + return true; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("no need to update the progress of kinesis routine load. txn status: {}, " + + "txnStatusChangeReason: {}, task: {}, job: {}", + txnState.getTransactionStatus(), txnStatusChangeReason, + DebugUtil.printId(rlTaskTxnCommitAttachment.getTaskId()), id); + } + return false; + } + + private void updateProgressAndOffsetsCache(RLTaskTxnCommitAttachment attachment) { + KinesisProgress taskProgress = (KinesisProgress) attachment.getProgress(); + + // Update cachedShardWithMillsBehindLatest from the MillisBehindLatest values + // reported by BE's GetRecords response. Keep the maximum value across concurrent tasks + // to avoid a stale (lower) value from one task overwriting a fresher value from another. 
+ taskProgress.getShardIdToMillsBehindLatest().forEach((shardId, millis) -> + cachedShardWithMillsBehindLatest.merge(shardId, millis, Math::max)); + + this.progress.update(attachment); + } + + @Override + protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws UserException { + updateProgressAndOffsetsCache(attachment); + super.updateProgress(attachment); + } + + @Override + protected void replayUpdateProgress(RLTaskTxnCommitAttachment attachment) { + super.replayUpdateProgress(attachment); + updateProgressAndOffsetsCache(attachment); + } + + @Override + protected RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoadTaskInfo, boolean delaySchedule) { + KinesisTaskInfo oldKinesisTaskInfo = (KinesisTaskInfo) routineLoadTaskInfo; + // Add new task + KinesisTaskInfo kinesisTaskInfo = new KinesisTaskInfo(oldKinesisTaskInfo, + ((KinesisProgress) progress).getShardIdToSequenceNumber(oldKinesisTaskInfo.getShards()), + isMultiTable()); + kinesisTaskInfo.setDelaySchedule(delaySchedule); + // Remove old task + routineLoadTaskInfoList.remove(routineLoadTaskInfo); + // Add new task + routineLoadTaskInfoList.add(kinesisTaskInfo); + return kinesisTaskInfo; + } + + @Override + protected void unprotectUpdateProgress() throws UserException { + updateNewShardProgress(); + } + + @Override + protected boolean refreshKafkaPartitions(boolean needAutoResume) throws UserException { + // For Kinesis, we refresh shards instead of Kafka partitions + if (this.state == JobState.RUNNING || this.state == JobState.NEED_SCHEDULE || needAutoResume) { + if (customKinesisShards != null && !customKinesisShards.isEmpty()) { + return true; + } + return updateKinesisShards(); + } + return true; + } + + private boolean updateKinesisShards() throws UserException { + try { + this.newCurrentKinesisShards = getAllKinesisShards(); + } catch (Exception e) { + String msg = e.getMessage() + + " may be Kinesis properties set in job is error" + + " or no shard in this stream that 
should check Kinesis"; + LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) + .add("error_msg", msg) + .build(), e); + if (this.state == JobState.NEED_SCHEDULE) { + unprotectUpdateState(JobState.PAUSED, + new ErrorReason(InternalErrorCode.PARTITIONS_ERR, msg), + false /* not replay */); + } + return false; + } + return true; + } + + @Override + protected boolean unprotectNeedReschedule() throws UserException { + if (this.state == JobState.RUNNING || this.state == JobState.NEED_SCHEDULE) { + return isKinesisShardsChanged(); + } + return false; + } + + private boolean isKinesisShardsChanged() throws UserException { + if (CollectionUtils.isNotEmpty(customKinesisShards)) { + currentKinesisShards = customKinesisShards; + return false; + } + + Preconditions.checkNotNull(this.newCurrentKinesisShards); + if (new HashSet<>(currentKinesisShards).containsAll(this.newCurrentKinesisShards)) { + if (currentKinesisShards.size() > this.newCurrentKinesisShards.size()) { + currentKinesisShards = this.newCurrentKinesisShards; + if (LOG.isDebugEnabled()) { + LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) + .add("current_kinesis_shards", Joiner.on(",").join(currentKinesisShards)) + .add("msg", "current kinesis shards has been changed") + .build()); + } + return true; + } else { + // Check if progress is consistent + for (String shardId : currentKinesisShards) { + if (!((KinesisProgress) progress).containsShard(shardId)) { + return true; + } + } + return false; + } + } else { + currentKinesisShards = this.newCurrentKinesisShards; + if (LOG.isDebugEnabled()) { + LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) + .add("current_kinesis_shards", Joiner.on(",").join(currentKinesisShards)) + .add("msg", "current kinesis shards has been changed") + .build()); + } + return true; + } + } + + @Override + protected boolean needAutoResume() { + writeLock(); + try { + if (this.state == JobState.PAUSED) { + return ScheduleRule.isNeedAutoSchedule(this); + } + return false; + } finally { 
+ writeUnlock(); + } + } + + @Override + public String getStatistic() { + Map<String, Object> summary = this.jobStatistic.summary(); + Gson gson = new GsonBuilder().disableHtmlEscaping().create(); + return gson.toJson(summary); + } + + /** + * Get all shard IDs from the Kinesis stream. + * Delegates to a BE node via gRPC, which calls AWS ListShards API using the SDK. + */ + private List<String> getAllKinesisShards() throws UserException { + convertCustomProperties(false); + if (!customKinesisShards.isEmpty()) { + return customKinesisShards; + } + return KinesisUtil.getAllKinesisShards(region, stream, endpoint, convertedCustomProperties); + } + + /** + * Create a KinesisRoutineLoadJob from CreateRoutineLoadInfo. + */ + public static KinesisRoutineLoadJob fromCreateInfo(CreateRoutineLoadInfo info, ConnectContext ctx) + throws UserException { + Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(info.getDBName()); + + long id = Env.getCurrentEnv().getNextId(); + KinesisDataSourceProperties kinesisProperties = + (KinesisDataSourceProperties) info.getDataSourceProperties(); + KinesisRoutineLoadJob kinesisRoutineLoadJob; + + if (kinesisProperties.isMultiTable()) { + kinesisRoutineLoadJob = new KinesisRoutineLoadJob(id, info.getName(), + db.getId(), + kinesisProperties.getRegion(), kinesisProperties.getStream(), + ctx.getCurrentUserIdentity(), true); + } else { + OlapTable olapTable = db.getOlapTableOrDdlException(info.getTableName()); + checkMeta(olapTable, info.getRoutineLoadDesc()); + // Check load_to_single_tablet compatibility + if (info.isLoadToSingleTablet() + && !(olapTable.getDefaultDistributionInfo() instanceof RandomDistributionInfo)) { + throw new DdlException( + "if load_to_single_tablet set to true, the olap table must be with random distribution"); + } + long tableId = olapTable.getId(); + kinesisRoutineLoadJob = new KinesisRoutineLoadJob(id, info.getName(), + db.getId(), tableId, + kinesisProperties.getRegion(), kinesisProperties.getStream(), 
+                ctx.getCurrentUserIdentity()); + } + + kinesisRoutineLoadJob.setOptional(info); + kinesisRoutineLoadJob.checkCustomProperties(); + + return kinesisRoutineLoadJob; + } + + private void checkCustomProperties() throws DdlException { + // Validate custom properties if needed Review Comment: **LOW: `checkCustomProperties()` is empty.** ```java private void checkCustomProperties() throws DdlException { // Validate custom properties if needed } ``` This method is called during `fromCreateInfo()` but does nothing. Consider either implementing validation (e.g., verify required credentials are present) or removing the call until it's needed. ########## be/src/runtime/routine_load/data_consumer.cpp: ########## @@ -22,6 +22,20 @@ #include <gen_cpp/internal_service.pb.h> #include <librdkafka/rdkafkacpp.h> +// AWS Kinesis SDK includes +#include <aws/core/client/ClientConfiguration.h> +#include <aws/core/utils/Outcome.h> +#include <aws/kinesis/KinesisClient.h> +#include <aws/kinesis/KinesisErrors.h> Review Comment: **MEDIUM: Heavy AWS SDK includes in `.cpp` file that are also included in the header (`data_consumer.h`).** These AWS SDK headers (`KinesisClient.h`, `GetRecordsRequest.h`, etc.) are included in both `data_consumer.h` and `data_consumer.cpp`. Since `data_consumer.h` is included by many other translation units, this will significantly increase compile times. Consider using forward declarations in the header and only including the full AWS SDK headers in the `.cpp` file. For example, forward-declare `namespace Aws { namespace Kinesis { class KinesisClient; } }` in the header. ########## fe/fe-core/src/main/java/org/apache/doris/load/routineload/KinesisTaskInfo.java: ########## @@ -0,0 +1,256 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.routineload; + +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Table; +import org.apache.doris.common.Config; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.nereids.load.NereidsLoadTaskInfo; +import org.apache.doris.nereids.load.NereidsStreamLoadPlanner; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TKinesisLoadInfo; +import org.apache.doris.thrift.TLoadSourceType; +import org.apache.doris.thrift.TPipelineFragmentParams; +import org.apache.doris.thrift.TPipelineWorkloadGroup; +import org.apache.doris.thrift.TPlanFragment; +import org.apache.doris.thrift.TRoutineLoadTask; +import org.apache.doris.thrift.TUniqueId; + +import com.google.common.base.Joiner; +import com.google.common.collect.Maps; +import com.google.gson.Gson; +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; + +/** + * Task info for Kinesis Routine Load. 
+ * + * Each task is responsible for consuming data from one or more Kinesis shards. + * The task tracks the sequence number for each shard and reports progress back + * to the FE after successful consumption. + */ +public class KinesisTaskInfo extends RoutineLoadTaskInfo { + private static final Logger LOG = LogManager.getLogger(KinesisTaskInfo.class); + + private RoutineLoadManager routineLoadManager = Env.getCurrentEnv().getRoutineLoadManager(); + Review Comment: **MEDIUM: `routineLoadManager` is captured at construction time, not at use time.** ```java private RoutineLoadManager routineLoadManager = Env.getCurrentEnv().getRoutineLoadManager(); ``` If `KinesisTaskInfo` is constructed during FE startup / EditLog replay when `Env` is not fully initialized, `getRoutineLoadManager()` may return null, causing NPE when methods like `hasMoreDataToConsume()` are called. The Kafka equivalent (`KafkaTaskInfo`) fetches the manager at each call site. Consider doing the same here: ```java private RoutineLoadManager getRoutineLoadManager() { return Env.getCurrentEnv().getRoutineLoadManager(); } ``` ########## be/src/runtime/routine_load/data_consumer.cpp: ########## @@ -556,4 +571,439 @@ bool KafkaDataConsumer::match(std::shared_ptr<StreamLoadContext> ctx) { return true; } +// ==================== AWS Kinesis Data Consumer Implementation ==================== + +KinesisDataConsumer::KinesisDataConsumer(std::shared_ptr<StreamLoadContext> ctx) + : _region(ctx->kinesis_info->region), + _stream(ctx->kinesis_info->stream), + _endpoint(ctx->kinesis_info->endpoint) { + VLOG_NOTICE << "construct Kinesis consumer: stream=" << _stream << ", region=" << _region; +} + +KinesisDataConsumer::~KinesisDataConsumer() { + VLOG_NOTICE << "destruct Kinesis consumer: stream=" << _stream; + // AWS SDK client managed by shared_ptr, will be automatically cleaned up +} + +Status KinesisDataConsumer::init(std::shared_ptr<StreamLoadContext> ctx) { + std::unique_lock<std::mutex> l(_lock); + if (_init) { 
+ return Status::OK(); // Already initialized (idempotent) + } + + // Store custom properties (AWS credentials, etc.) + _custom_properties.insert(ctx->kinesis_info->properties.begin(), + ctx->kinesis_info->properties.end()); + + // Create AWS Kinesis client + RETURN_IF_ERROR(_create_kinesis_client(ctx)); + + VLOG_NOTICE << "finished to init Kinesis consumer. stream=" << _stream + << ", region=" << _region << ", " << ctx->brief(); + _init = true; + return Status::OK(); +} + +Status KinesisDataConsumer::_create_kinesis_client(std::shared_ptr<StreamLoadContext> ctx) { + // Reuse S3ClientFactory's credential provider logic + // This supports all AWS authentication methods: + // - Simple AK/SK + // - IAM instance profile (EC2) + // - STS assume role + // - Session tokens + // - Environment variables + // - Default credential chain + + S3ClientConf s3_conf; + s3_conf.region = _region; + s3_conf.endpoint = _endpoint; + + // Parse AWS credentials from properties + auto it_ak = _custom_properties.find("aws.access.key.id"); + auto it_sk = _custom_properties.find("aws.secret.access.key"); + auto it_token = _custom_properties.find("aws.session.token"); + auto it_role_arn = _custom_properties.find("aws.iam.role.arn"); + auto it_external_id = _custom_properties.find("aws.external.id"); + auto it_provider = _custom_properties.find("aws.credentials.provider"); + + if (it_ak != _custom_properties.end()) { + s3_conf.ak = it_ak->second; + } + if (it_sk != _custom_properties.end()) { + s3_conf.sk = it_sk->second; + } + if (it_token != _custom_properties.end()) { + s3_conf.token = it_token->second; + } + if (it_role_arn != _custom_properties.end()) { Review Comment: **HIGH: `std::stoi()` without try/catch will crash BE on invalid input.** If the user provides a non-numeric value for `aws.request.timeout.ms` (e.g., `"abc"`), `std::stoi()` will throw `std::invalid_argument`, which is uncaught here. This will propagate up and crash the BE process. 
Same issue on the `aws.connection.timeout.ms` path a few lines below. Fix: Wrap in try/catch or use a safe parsing function that returns a Status on failure. ########## be/src/runtime/routine_load/data_consumer_pool.cpp: ########## @@ -63,6 +63,9 @@ Status DataConsumerPool::get_consumer(std::shared_ptr<StreamLoadContext> ctx, case TLoadSourceType::KAFKA: consumer = std::make_shared<KafkaDataConsumer>(ctx); break; + case TLoadSourceType::KINESIS: + consumer = std::make_shared<KinesisDataConsumer>(ctx); Review Comment: **BLOCKER: Kinesis tasks will never execute.** `get_consumer_grp()` (called unconditionally by `exec_task()` before any source-type branching) has a hard check at line 83: ```cpp if (ctx->load_src_type != TLoadSourceType::KAFKA) { return Status::InternalError("PAUSE: Currently only support consumer group for Kafka data source"); } ``` This means **every Kinesis task will fail immediately** with this error before any Kinesis-specific code in `exec_task()` is reached. All the Kinesis pipe creation, shard assignment, and consumption code below is effectively dead code. To fix this, you need to either: 1. Restructure `exec_task()` so Kinesis tasks bypass `get_consumer_grp()` entirely and obtain a single consumer via `get_consumer()`, OR 2. Create a `KinesisDataConsumerGroup` class and extend `get_consumer_grp()` to handle Kinesis. Also note that even if this barrier is removed, there is no `KinesisDataConsumerGroup::start_all()` that reads records from the `BlockingQueue` and writes them to the `KinesisConsumerPipe`. The data path from Kinesis SDK → pipe → StreamLoad is broken. 
########## be/src/runtime/routine_load/data_consumer.cpp: ########## @@ -556,4 +571,439 @@ bool KafkaDataConsumer::match(std::shared_ptr<StreamLoadContext> ctx) { return true; } +// ==================== AWS Kinesis Data Consumer Implementation ==================== + +KinesisDataConsumer::KinesisDataConsumer(std::shared_ptr<StreamLoadContext> ctx) + : _region(ctx->kinesis_info->region), + _stream(ctx->kinesis_info->stream), + _endpoint(ctx->kinesis_info->endpoint) { + VLOG_NOTICE << "construct Kinesis consumer: stream=" << _stream << ", region=" << _region; +} + +KinesisDataConsumer::~KinesisDataConsumer() { + VLOG_NOTICE << "destruct Kinesis consumer: stream=" << _stream; + // AWS SDK client managed by shared_ptr, will be automatically cleaned up +} + +Status KinesisDataConsumer::init(std::shared_ptr<StreamLoadContext> ctx) { + std::unique_lock<std::mutex> l(_lock); + if (_init) { + return Status::OK(); // Already initialized (idempotent) + } + + // Store custom properties (AWS credentials, etc.) + _custom_properties.insert(ctx->kinesis_info->properties.begin(), + ctx->kinesis_info->properties.end()); + + // Create AWS Kinesis client + RETURN_IF_ERROR(_create_kinesis_client(ctx)); + + VLOG_NOTICE << "finished to init Kinesis consumer. 
stream=" << _stream + << ", region=" << _region << ", " << ctx->brief(); + _init = true; + return Status::OK(); +} + +Status KinesisDataConsumer::_create_kinesis_client(std::shared_ptr<StreamLoadContext> ctx) { + // Reuse S3ClientFactory's credential provider logic + // This supports all AWS authentication methods: + // - Simple AK/SK + // - IAM instance profile (EC2) + // - STS assume role + // - Session tokens + // - Environment variables + // - Default credential chain + + S3ClientConf s3_conf; + s3_conf.region = _region; + s3_conf.endpoint = _endpoint; + + // Parse AWS credentials from properties + auto it_ak = _custom_properties.find("aws.access.key.id"); + auto it_sk = _custom_properties.find("aws.secret.access.key"); + auto it_token = _custom_properties.find("aws.session.token"); + auto it_role_arn = _custom_properties.find("aws.iam.role.arn"); + auto it_external_id = _custom_properties.find("aws.external.id"); + auto it_provider = _custom_properties.find("aws.credentials.provider"); + + if (it_ak != _custom_properties.end()) { + s3_conf.ak = it_ak->second; + } + if (it_sk != _custom_properties.end()) { + s3_conf.sk = it_sk->second; + } + if (it_token != _custom_properties.end()) { + s3_conf.token = it_token->second; + } + if (it_role_arn != _custom_properties.end()) { + s3_conf.role_arn = it_role_arn->second; + } + if (it_external_id != _custom_properties.end()) { + s3_conf.external_id = it_external_id->second; + } + if (it_provider != _custom_properties.end()) { + // Map provider type string to enum + if (it_provider->second == "instance_profile") { + s3_conf.cred_provider_type = CredProviderType::InstanceProfile; + } else if (it_provider->second == "env") { + s3_conf.cred_provider_type = CredProviderType::Env; + } else if (it_provider->second == "simple") { + s3_conf.cred_provider_type = CredProviderType::Simple; + } + } + + // Create AWS ClientConfiguration + Aws::Client::ClientConfiguration aws_config = S3ClientFactory::getClientConfiguration(); + 
aws_config.region = _region; + + if (!_endpoint.empty()) { + aws_config.endpointOverride = _endpoint; + } + + // Set timeouts from properties or use defaults + auto it_request_timeout = _custom_properties.find("aws.request.timeout.ms"); + if (it_request_timeout != _custom_properties.end()) { + aws_config.requestTimeoutMs = std::stoi(it_request_timeout->second); + } else { Review Comment: **HIGH: Use `DORIS_CHECK` instead of `DCHECK`.** Per AGENTS.md coding standards: invariant assertions should use `DORIS_CHECK`, not `DCHECK`. `DCHECK` disappears in RELEASE builds, providing no protection in production. ```cpp // Should be: DORIS_CHECK(_kinesis_client); ``` ########## be/src/runtime/routine_load/data_consumer.cpp: ########## @@ -556,4 +571,439 @@ bool KafkaDataConsumer::match(std::shared_ptr<StreamLoadContext> ctx) { return true; } +// ==================== AWS Kinesis Data Consumer Implementation ==================== + +KinesisDataConsumer::KinesisDataConsumer(std::shared_ptr<StreamLoadContext> ctx) + : _region(ctx->kinesis_info->region), + _stream(ctx->kinesis_info->stream), + _endpoint(ctx->kinesis_info->endpoint) { + VLOG_NOTICE << "construct Kinesis consumer: stream=" << _stream << ", region=" << _region; +} + +KinesisDataConsumer::~KinesisDataConsumer() { + VLOG_NOTICE << "destruct Kinesis consumer: stream=" << _stream; + // AWS SDK client managed by shared_ptr, will be automatically cleaned up +} + +Status KinesisDataConsumer::init(std::shared_ptr<StreamLoadContext> ctx) { + std::unique_lock<std::mutex> l(_lock); + if (_init) { + return Status::OK(); // Already initialized (idempotent) + } + + // Store custom properties (AWS credentials, etc.) + _custom_properties.insert(ctx->kinesis_info->properties.begin(), + ctx->kinesis_info->properties.end()); + + // Create AWS Kinesis client + RETURN_IF_ERROR(_create_kinesis_client(ctx)); + + VLOG_NOTICE << "finished to init Kinesis consumer. 
stream=" << _stream + << ", region=" << _region << ", " << ctx->brief(); + _init = true; + return Status::OK(); +} + +Status KinesisDataConsumer::_create_kinesis_client(std::shared_ptr<StreamLoadContext> ctx) { + // Reuse S3ClientFactory's credential provider logic + // This supports all AWS authentication methods: + // - Simple AK/SK + // - IAM instance profile (EC2) + // - STS assume role + // - Session tokens + // - Environment variables + // - Default credential chain + + S3ClientConf s3_conf; + s3_conf.region = _region; + s3_conf.endpoint = _endpoint; + + // Parse AWS credentials from properties + auto it_ak = _custom_properties.find("aws.access.key.id"); + auto it_sk = _custom_properties.find("aws.secret.access.key"); + auto it_token = _custom_properties.find("aws.session.token"); + auto it_role_arn = _custom_properties.find("aws.iam.role.arn"); + auto it_external_id = _custom_properties.find("aws.external.id"); + auto it_provider = _custom_properties.find("aws.credentials.provider"); + + if (it_ak != _custom_properties.end()) { + s3_conf.ak = it_ak->second; + } + if (it_sk != _custom_properties.end()) { + s3_conf.sk = it_sk->second; + } + if (it_token != _custom_properties.end()) { + s3_conf.token = it_token->second; + } + if (it_role_arn != _custom_properties.end()) { + s3_conf.role_arn = it_role_arn->second; + } + if (it_external_id != _custom_properties.end()) { + s3_conf.external_id = it_external_id->second; + } + if (it_provider != _custom_properties.end()) { + // Map provider type string to enum + if (it_provider->second == "instance_profile") { + s3_conf.cred_provider_type = CredProviderType::InstanceProfile; + } else if (it_provider->second == "env") { + s3_conf.cred_provider_type = CredProviderType::Env; + } else if (it_provider->second == "simple") { + s3_conf.cred_provider_type = CredProviderType::Simple; + } + } + + // Create AWS ClientConfiguration + Aws::Client::ClientConfiguration aws_config = S3ClientFactory::getClientConfiguration(); + 
aws_config.region = _region; + + if (!_endpoint.empty()) { + aws_config.endpointOverride = _endpoint; + } + + // Set timeouts from properties or use defaults + auto it_request_timeout = _custom_properties.find("aws.request.timeout.ms"); + if (it_request_timeout != _custom_properties.end()) { + aws_config.requestTimeoutMs = std::stoi(it_request_timeout->second); + } else { + aws_config.requestTimeoutMs = 30000; // 30s default + } + + auto it_conn_timeout = _custom_properties.find("aws.connection.timeout.ms"); + if (it_conn_timeout != _custom_properties.end()) { + aws_config.connectTimeoutMs = std::stoi(it_conn_timeout->second); + } + + // Get credentials provider (reuses S3 infrastructure) + auto credentials_provider = S3ClientFactory::instance().get_aws_credentials_provider(s3_conf); + + // Create Kinesis client + _kinesis_client = + std::make_shared<Aws::Kinesis::KinesisClient>(credentials_provider, aws_config); + + if (!_kinesis_client) { + return Status::InternalError("Failed to create AWS Kinesis client for stream: {}, region: {}", + _stream, _region); + } + + LOG(INFO) << "Created Kinesis client for stream: " << _stream << ", region: " << _region; + return Status::OK(); +} + +Status KinesisDataConsumer::assign_shards( + const std::map<std::string, std::string>& shard_sequence_numbers, + const std::string& stream_name, std::shared_ptr<StreamLoadContext> ctx) { + DCHECK(_kinesis_client); + + std::stringstream ss; + ss << "Assigning shards to Kinesis consumer " << _id << ": "; + + for (auto& entry : shard_sequence_numbers) { + const std::string& shard_id = entry.first; + const std::string& sequence_number = entry.second; + + // Get shard iterator for this shard + std::string iterator; + RETURN_IF_ERROR(_get_shard_iterator(shard_id, sequence_number, &iterator)); + + _shard_iterators[shard_id] = iterator; + _consuming_shard_ids.insert(shard_id); + + ss << "[" << shard_id << ": " << sequence_number << "] "; + } + + LOG(INFO) << ss.str(); + return Status::OK(); +} + 
+Status KinesisDataConsumer::_get_shard_iterator(const std::string& shard_id, + const std::string& sequence_number, + std::string* iterator) { + Aws::Kinesis::Model::GetShardIteratorRequest request; + request.SetStreamName(_stream); + request.SetShardId(shard_id); + + // Determine iterator type based on sequence number + if (sequence_number.empty() || sequence_number == "TRIM_HORIZON" || + sequence_number == "-2") { + // Start from oldest record in shard + request.SetShardIteratorType(Aws::Kinesis::Model::ShardIteratorType::TRIM_HORIZON); + } else if (sequence_number == "LATEST" || sequence_number == "-1") { + // Start from newest record (records arriving after iterator creation) + request.SetShardIteratorType(Aws::Kinesis::Model::ShardIteratorType::LATEST); + } else { + // Resume from specific sequence number + request.SetShardIteratorType( + Aws::Kinesis::Model::ShardIteratorType::AFTER_SEQUENCE_NUMBER); + request.SetStartingSequenceNumber(sequence_number); + } + + auto outcome = _kinesis_client->GetShardIterator(request); + if (!outcome.IsSuccess()) { + auto& error = outcome.GetError(); + return Status::InternalError( + "Failed to get shard iterator for shard {}: {} ({})", shard_id, + error.GetMessage(), static_cast<int>(error.GetErrorType())); + } + + *iterator = outcome.GetResult().GetShardIterator(); + VLOG_NOTICE << "Got shard iterator for shard: " << shard_id; + return Status::OK(); +} + +Status KinesisDataConsumer::group_consume( + BlockingQueue<std::shared_ptr<Aws::Kinesis::Model::Record>>* queue, + int64_t max_running_time_ms) { + static constexpr int MAX_RETRY_TIMES_FOR_TRANSPORT_FAILURE = 3; + static constexpr int RATE_LIMIT_BACKOFF_MS = 1000; // 1 second + static constexpr int KINESIS_GET_RECORDS_LIMIT = 1000; // Max 10000 + static constexpr int INTER_SHARD_SLEEP_MS = 10; // Small sleep between shards + + int64_t left_time = max_running_time_ms; + LOG(INFO) << "start Kinesis consumer: " << _id << ", grp: " << _grp_id + << ", stream: " << _stream << ", 
max running time(ms): " << left_time; + + int64_t received_rows = 0; + int64_t put_rows = 0; + int32_t retry_times = 0; + Status st = Status::OK(); + bool done = false; + + MonotonicStopWatch consumer_watch; + MonotonicStopWatch watch; + watch.start(); + + while (true) { + // Check cancellation flag + { + std::unique_lock<std::mutex> l(_lock); + if (_cancelled) { + break; + } + } + + if (left_time <= 0) { + break; + } + + // Round-robin through all active shards + for (auto it = _consuming_shard_ids.begin(); it != _consuming_shard_ids.end() && !done;) { + const std::string& shard_id = *it; + auto iter_it = _shard_iterators.find(shard_id); + + if (iter_it == _shard_iterators.end() || iter_it->second.empty()) { + // Shard exhausted (closed due to split/merge), remove from active set + LOG(INFO) << "Shard exhausted: " << shard_id; + it = _consuming_shard_ids.erase(it); + continue; + } + + // Call Kinesis GetRecords API + consumer_watch.start(); + + Aws::Kinesis::Model::GetRecordsRequest request; + request.SetShardIterator(iter_it->second); + request.SetLimit(KINESIS_GET_RECORDS_LIMIT); + + auto outcome = _kinesis_client->GetRecords(request); + consumer_watch.stop(); + + // Track metrics (reuse Kafka metrics, they're generic) + DorisMetrics::instance()->routine_load_get_msg_count->increment(1); + DorisMetrics::instance()->routine_load_get_msg_latency->increment( + consumer_watch.elapsed_time() / 1000 / 1000); + + if (!outcome.IsSuccess()) { + auto& error = outcome.GetError(); + + // Handle throttling (ProvisionedThroughputExceededException) + if (error.GetErrorType() == + Aws::Kinesis::KinesisErrors::PROVISIONED_THROUGHPUT_EXCEEDED) { + LOG(INFO) << "Kinesis rate limit exceeded for shard: " << shard_id + << ", backing off " << RATE_LIMIT_BACKOFF_MS << "ms"; + std::this_thread::sleep_for(std::chrono::milliseconds(RATE_LIMIT_BACKOFF_MS)); + ++it; // Move to next shard, will retry this one next round + continue; + } + + // Handle retriable errors + if 
(_is_retriable_error(error)) { + LOG(INFO) << "Kinesis retriable error for shard " << shard_id << ": " + << error.GetMessage() << ", retry times: " << retry_times++; + if (retry_times <= MAX_RETRY_TIMES_FOR_TRANSPORT_FAILURE) { + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + continue; + } + } + + // Fatal error + LOG(WARNING) << "Kinesis consume failed for shard " << shard_id << ": " + << error.GetMessage() << " (" << static_cast<int>(error.GetErrorType()) + << ")"; + st = Status::InternalError("Kinesis GetRecords failed for shard {}: {}", shard_id, + error.GetMessage()); + done = true; + break; + } + + // Reset retry counter on success + retry_times = 0; + + // Process records + auto& result = outcome.GetResult(); + RETURN_IF_ERROR(_process_records(result, queue, &received_rows, &put_rows)); + + // Track MillisBehindLatest for this shard (used by FE for lag monitoring & scheduling) + _millis_behind_latest[shard_id] = result.GetMillisBehindLatest(); + + // Update shard iterator for next call + std::string next_iterator = result.GetNextShardIterator(); + if (next_iterator.empty()) { + // Shard is closed (split/merge), remove from active set + LOG(INFO) << "Shard closed: " << shard_id << " (split/merge detected)"; + _shard_iterators.erase(shard_id); + it = _consuming_shard_ids.erase(it); + } else { + _shard_iterators[shard_id] = next_iterator; + ++it; + } + + // Check if all shards are exhausted + if (_consuming_shard_ids.empty()) { + LOG(INFO) << "All shards exhausted for consumer: " << _id; + done = true; + break; + } + + // Small sleep to avoid tight loop + std::this_thread::sleep_for(std::chrono::milliseconds(INTER_SHARD_SLEEP_MS)); + } + + left_time = max_running_time_ms - watch.elapsed_time() / 1000 / 1000; + if (done) { + break; + } Review Comment: **MEDIUM: Unnecessary record copy — performance concern.** ```cpp auto record_ptr = std::make_shared<Aws::Kinesis::Model::Record>(record); ``` This copies the entire `Record` object including its 
data buffer. For high-throughput streams, this is a significant performance bottleneck. The `records` vector from `GetRecords` could be iterated with move semantics instead: ```cpp // If result.GetRecords() returned by value, you could use: auto record_ptr = std::make_shared<Aws::Kinesis::Model::Record>(std::move(record)); ``` Note: The const reference from `GetRecords()` prevents moving. Consider copying the data payload bytes directly to the pipe instead of queueing entire `Record` objects. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
