github-actions[bot] commented on code in PR #61325: URL: https://github.com/apache/doris/pull/61325#discussion_r3239824906
########## fe/fe-core/src/main/java/org/apache/doris/load/routineload/kinesis/KinesisDataSourceProperties.java: ########## @@ -0,0 +1,411 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.routineload.kinesis; + +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Pair; +import org.apache.doris.common.UserException; +import org.apache.doris.load.routineload.AbstractDataSourceProperties; +import org.apache.doris.load.routineload.LoadDataSourceType; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.gson.annotations.SerializedName; +import lombok.Getter; +import lombok.Setter; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * AWS Kinesis data source properties for Routine Load. + * + * Parameters: + * - kinesis_stream: Name of the Kinesis stream (required) + * - kinesis_shards: Comma-separated list of shard IDs (optional) + * - kinesis_shards_pos: Comma-separated list of positions for each shard (optional) + * - aws.region: AWS region (required) + * - aws.endpoint: Custom Kinesis endpoint URL (optional, e.g. LocalStack) + * - aws.access_key: AWS access key (optional) + * - aws.secret_key: AWS secret key (optional) + * - aws.session_key: AWS session token (optional) + * - aws.role_arn: IAM role ARN (optional) + * - property.kinesis_default_pos: Default position for new shards (optional) + * - property.*: Other pass-through parameters for AWS SDK configuration + * + * Example usage in SQL: + * CREATE ROUTINE LOAD my_job ON my_table + * FROM KINESIS ( + * "aws.region" = "us-east-1", + * "aws.access_key" = "AKIAIOSFODNN7EXAMPLE", + * "aws.secret_key" = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", + * "kinesis_stream" = "my-stream", + * "property.kinesis_default_pos" = "TRIM_HORIZON" + * ); + */ +public class KinesisDataSourceProperties extends AbstractDataSourceProperties { + + /** + * List of shard IDs with their starting sequence numbers. + * Pair<ShardId, SequenceNumber> + * SequenceNumber can be: + * - Actual sequence number string + * - TRIM_HORIZON_VAL (-2) for oldest record + * - LATEST_VAL (-1) for newest record + */ + @Getter + @Setter + @SerializedName(value = "kinesisShardPositions") + private List<Pair<String, String>> kinesisShardPositions = Lists.newArrayList(); + + /** + * Custom Kinesis properties for advanced configuration. + * Includes AWS credentials and client configuration. + */ + @Getter + @SerializedName(value = "customKinesisProperties") + private Map<String, String> customKinesisProperties; + + /** + * Whether positions are specified as timestamps. + */ + @Getter + @SerializedName(value = "isPositionsForTimes") + private boolean isPositionsForTimes = false; + + /** + * AWS region for the Kinesis stream. + */ + @Getter + @SerializedName(value = "region") + private String region; + + /** + * Name of the Kinesis stream. + */ + @Getter + @SerializedName(value = "stream") + private String stream; + + /** + * Optional endpoint URL for custom endpoints. + */ + @Getter + @SerializedName(value = "endpoint") + private String endpoint; + + // Standard position constants (similar to Kafka's OFFSET_BEGINNING/OFFSET_END) + public static final String POSITION_TRIM_HORIZON = "TRIM_HORIZON"; + public static final String POSITION_LATEST = "LATEST"; + + // Configurable data source properties that can be set by user + // Keep compatibility with existing ALTER/SHOW output key "kinesis_endpoint". + private static final String LEGACY_KINESIS_ENDPOINT_KEY = "kinesis_endpoint"; + + private static final ImmutableSet<String> CONFIGURABLE_DATA_SOURCE_PROPERTIES_SET = + new ImmutableSet.Builder<String>() + .add(KinesisConfiguration.KINESIS_REGION.getName()) + .add(KinesisConfiguration.KINESIS_ENDPOINT.getName()) + .add(LEGACY_KINESIS_ENDPOINT_KEY) + .add(KinesisConfiguration.KINESIS_STREAM.getName()) + .add(KinesisConfiguration.KINESIS_SHARDS.getName()) + .add(KinesisConfiguration.KINESIS_POSITIONS.getName()) + .add(KinesisConfiguration.KINESIS_DEFAULT_POSITION.getName()) + .add(KinesisConfiguration.KINESIS_ACCESS_KEY.getName()) + .add(KinesisConfiguration.KINESIS_SECRET_KEY.getName()) + .add(KinesisConfiguration.KINESIS_SESSION_TOKEN.getName()) + .add(KinesisConfiguration.KINESIS_ROLE_ARN.getName()) + .build(); + + public KinesisDataSourceProperties(Map<String, String> dataSourceProperties, boolean multiLoad) { + super(dataSourceProperties, multiLoad); + } + + public KinesisDataSourceProperties(Map<String, String> originalDataSourceProperties) { + super(originalDataSourceProperties); + } + + @Override + protected String getDataSourceType() { + return LoadDataSourceType.KINESIS.name(); + } + + @Override + protected List<String> getRequiredProperties() { + return Arrays.asList( + KinesisConfiguration.KINESIS_REGION.getName(), + KinesisConfiguration.KINESIS_STREAM.getName() + ); + } + + @Override + public void convertAndCheckDataSourceProperties() throws UserException { + // Check for invalid properties - accept property.* parameters as pass-through + Optional<String> invalidProperty = originalDataSourceProperties.keySet().stream() + .filter(key -> !CONFIGURABLE_DATA_SOURCE_PROPERTIES_SET.contains(key)) + .filter(key -> !key.startsWith("property.")) + .findFirst(); + if (invalidProperty.isPresent()) { + throw new AnalysisException(invalidProperty.get() + " is invalid Kinesis property or cannot be set"); + } + + // Parse region (required) + this.region = KinesisConfiguration.KINESIS_REGION.getParameterValue( + originalDataSourceProperties.get(KinesisConfiguration.KINESIS_REGION.getName())); + if (!isAlter() && StringUtils.isBlank(region)) { + throw new AnalysisException(KinesisConfiguration.KINESIS_REGION.getName() + " is a required property"); + } + if (StringUtils.isNotBlank(region)) { + validateRegion(region); + } + + // Parse custom endpoint (optional) + this.endpoint = KinesisConfiguration.KINESIS_ENDPOINT.getParameterValue( + originalDataSourceProperties.containsKey(KinesisConfiguration.KINESIS_ENDPOINT.getName()) + ? originalDataSourceProperties.get(KinesisConfiguration.KINESIS_ENDPOINT.getName()) + : originalDataSourceProperties.get(LEGACY_KINESIS_ENDPOINT_KEY)); + + // Parse stream name (required) + this.stream = KinesisConfiguration.KINESIS_STREAM.getParameterValue( + originalDataSourceProperties.get(KinesisConfiguration.KINESIS_STREAM.getName())); + if (!isAlter() && StringUtils.isBlank(stream)) { + throw new AnalysisException(KinesisConfiguration.KINESIS_STREAM.getName() + " is a required property"); + } + + // Parse custom properties (property.* parameters) + analyzeCustomProperties(); + + // Parse AWS credentials from direct parameters + parseAwsCredentials(); + + // Validate AWS authentication configuration + validateAwsAuthConfig(); + + // Parse shards + List<String> shards = KinesisConfiguration.KINESIS_SHARDS.getParameterValue( + originalDataSourceProperties.get(KinesisConfiguration.KINESIS_SHARDS.getName())); + if (CollectionUtils.isNotEmpty(shards)) { + analyzeKinesisShardProperty(shards); + } + + // Parse positions + List<String> positions = KinesisConfiguration.KINESIS_POSITIONS.getParameterValue( + originalDataSourceProperties.get(KinesisConfiguration.KINESIS_POSITIONS.getName())); + // Get default position from customKinesisProperties (already parsed from "property." prefix) + String defaultPositionString = customKinesisProperties.get("kinesis_default_pos"); + + // Validate that positions and default_position are not both set + if (CollectionUtils.isNotEmpty(positions) && StringUtils.isNotBlank(defaultPositionString)) { + throw new AnalysisException("Only one of " + KinesisConfiguration.KINESIS_POSITIONS.getName() + + " and property.kinesis_default_pos can be set."); + } + + // For alter operation, shards and positions must be set together + if (isAlter() && CollectionUtils.isNotEmpty(shards) && CollectionUtils.isEmpty(positions) + && StringUtils.isBlank(defaultPositionString)) { + throw new AnalysisException("Must set position or default position with shard property"); + } + + // Process positions + if (CollectionUtils.isNotEmpty(positions)) { + this.isPositionsForTimes = analyzeKinesisPositionProperty(positions); + return; + } + this.isPositionsForTimes = analyzeKinesisDefaultPositionProperty(); + if (CollectionUtils.isNotEmpty(kinesisShardPositions)) { + setDefaultPositionForShards(this.kinesisShardPositions, defaultPositionString, this.isPositionsForTimes); + } + } + + /** + * Validate AWS region format. + */ + private void validateRegion(String region) throws AnalysisException { + // AWS regions follow patterns like: us-east-1, eu-west-2, ap-southeast-1, cn-north-1 + if (!region.matches("^[a-z]{2}(-[a-z]+)?-[a-z]+-\\d$")) { + throw new AnalysisException("Invalid AWS region format: " + region + + ". Expected format like: us-east-1, eu-west-2, cn-north-1"); + } + } + + /** + * Parse and store custom Kinesis properties. + * All property.* parameters are passed through to BE. + */ + private void analyzeCustomProperties() throws AnalysisException { + this.customKinesisProperties = new HashMap<>(); + + // Store all property.* parameters (strip the "property." prefix for BE) + for (Map.Entry<String, String> entry : originalDataSourceProperties.entrySet()) { + String key = entry.getKey(); + if (key.startsWith("property.")) { + // Strip "property." prefix and pass through to BE + String actualKey = key.substring("property.".length()); + customKinesisProperties.put(actualKey, entry.getValue()); + } + } + } + + /** + * Parse AWS credentials from direct parameters and add to customKinesisProperties. + */ + private void parseAwsCredentials() { + String accessKey = originalDataSourceProperties.get(KinesisConfiguration.KINESIS_ACCESS_KEY.getName()); + if (StringUtils.isNotBlank(accessKey)) { + customKinesisProperties.put("aws.access_key", accessKey); + } + + String secretKey = originalDataSourceProperties.get(KinesisConfiguration.KINESIS_SECRET_KEY.getName()); + if (StringUtils.isNotBlank(secretKey)) { + customKinesisProperties.put("aws.secret_key", secretKey); + } + + String sessionToken = originalDataSourceProperties.get(KinesisConfiguration.KINESIS_SESSION_TOKEN.getName()); + if (StringUtils.isNotBlank(sessionToken)) { + customKinesisProperties.put("aws.session_key", sessionToken); + } + + String roleArn = originalDataSourceProperties.get(KinesisConfiguration.KINESIS_ROLE_ARN.getName()); + if (StringUtils.isNotBlank(roleArn)) { + customKinesisProperties.put("aws.role_arn", roleArn); + } + } + + /** + * Validate AWS authentication configuration. + * At least one authentication method must be provided: + * 1. Access key + Secret key + * 2. IAM Role ARN + * 3. AWS Profile name + * 4. Default credential chain (EC2 instance profile, environment variables, etc.) + */ + private void validateAwsAuthConfig() throws AnalysisException { + String accessKey = customKinesisProperties.get("aws.access_key"); + String secretKey = customKinesisProperties.get("aws.secret_key"); + String roleArn = customKinesisProperties.get("aws.role_arn"); + + // If access key is provided, secret key must also be provided + if (StringUtils.isNotBlank(accessKey) && StringUtils.isBlank(secretKey)) { + throw new AnalysisException("When property.aws.access_key is set, property.aws.secret_key " + + "must also be set"); + } + if (StringUtils.isNotBlank(secretKey) && StringUtils.isBlank(accessKey)) { + throw new AnalysisException("When property.aws.secret_key is set, property.aws.access_key " + + "must also be set"); + } + + // If external ID is provided, role ARN must be provided + String externalId = customKinesisProperties.get("aws.external.id"); + if (StringUtils.isNotBlank(externalId) && StringUtils.isBlank(roleArn)) { + throw new AnalysisException("When property.aws.external.id is set, property.aws.role_arn must also be set"); + } + + // Note: We don't require any authentication config because the default credential chain + // can be used in EC2/EKS environments with instance profiles or service accounts + } + + /** + * Initialize shard positions with default values. + */ + private void analyzeKinesisShardProperty(List<String> shards) { + shards.forEach(shardId -> this.kinesisShardPositions.add(Pair.of(shardId, POSITION_LATEST))); + } + + /** + * Parse position property and set positions for each shard. + * All positions are interpreted as sequence-number semantics: + * TRIM_HORIZON, LATEST, or explicit sequence number. + */ + private boolean analyzeKinesisPositionProperty(List<String> positions) throws UserException { + if (positions.size() != kinesisShardPositions.size()) { + throw new AnalysisException("Number of shards must equal number of positions"); + } + + for (int i = 0; i < positions.size(); i++) { + String position = positions.get(i); + validatePosition(position); + kinesisShardPositions.get(i).second = position; + } + return false; + } + + /** + * Validate position value. + */ + private void validatePosition(String position) throws AnalysisException { + if (!position.equalsIgnoreCase(POSITION_TRIM_HORIZON) + && !position.equalsIgnoreCase(POSITION_LATEST) + && !isValidSequenceNumber(position)) { + throw new AnalysisException(KinesisConfiguration.KINESIS_POSITIONS.getName() + + " must be TRIM_HORIZON, LATEST, or a valid sequence number. Got: " + position); + } + } + + /** + * Check if the string is a valid Kinesis sequence number. + * Kinesis sequence numbers are numeric strings. + */ + private boolean isValidSequenceNumber(String position) { + try { + // Kinesis sequence numbers are large numeric strings + new java.math.BigInteger(position); + return true; + } catch (NumberFormatException e) { + return false; + } + } + + /** + * Analyze default position property. + * Default position uses sequence-number semantics: + * TRIM_HORIZON, LATEST, or explicit sequence number. + */ + private boolean analyzeKinesisDefaultPositionProperty() throws AnalysisException { + customKinesisProperties.putIfAbsent("kinesis_default_pos", POSITION_LATEST); Review Comment: This default is also injected for ALTER requests where the user did not provide `property.kinesis_default_pos`. For example, a job created with `property.kinesis_default_pos = TRIM_HORIZON` and then altered only to change `aws.region` or `aws.endpoint` arrives here with no default in `originalDataSourceProperties`, so this line adds `kinesis_default_pos=LATEST`; `modifyPropertiesInternal()` then merges it into the persisted job properties and `convertCustomProperties(true)` updates `kinesisDefaultPosition`. That silently changes the job definition and, after any source reset/new initial shard discovery, can start from `LATEST` instead of the user's existing default. Please only synthesize the create-time default on CREATE, or preserve the existing job value when ALTER omits this property. ########## fe/fe-core/src/main/java/org/apache/doris/load/routineload/kinesis/KinesisRoutineLoadJob.java: ########## @@ -0,0 +1,893 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.routineload.kinesis; + +import org.apache.doris.analysis.ExprToSqlVisitor; +import org.apache.doris.analysis.ImportColumnDesc; +import org.apache.doris.analysis.ToSqlParams; +import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.RandomDistributionInfo; +import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.InternalErrorCode; +import org.apache.doris.common.LoadException; +import org.apache.doris.common.Pair; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.common.util.LogBuilder; +import org.apache.doris.common.util.LogKey; +import org.apache.doris.datasource.kinesis.KinesisUtil; +import org.apache.doris.load.routineload.ErrorReason; +import org.apache.doris.load.routineload.LoadDataSourceType; +import org.apache.doris.load.routineload.RLTaskTxnCommitAttachment; +import org.apache.doris.load.routineload.RoutineLoadJob; +import org.apache.doris.load.routineload.RoutineLoadTaskInfo; +import org.apache.doris.load.routineload.ScheduleRule; +import org.apache.doris.nereids.load.NereidsImportColumnDesc; +import org.apache.doris.nereids.load.NereidsLoadTaskInfo; +import org.apache.doris.nereids.load.NereidsLoadUtils; +import org.apache.doris.nereids.load.NereidsRoutineLoadTaskInfo; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.plans.commands.AlterRoutineLoadCommand; +import org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo; +import org.apache.doris.persist.AlterRoutineLoadJobOperationLog; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.thrift.TFileCompressType; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; +import org.apache.doris.transaction.TransactionState; +import org.apache.doris.transaction.TransactionStatus; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.annotations.SerializedName; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +/** + * KinesisRoutineLoadJob is a RoutineLoadJob that fetches data from AWS Kinesis streams. + * + * Key concepts: + * - Stream: Named collection of data records (similar to Kafka topic) + * - Shard: Sequence of data records in a stream (similar to Kafka partition) + * - Sequence Number: Unique identifier for each record within a shard (similar to Kafka offset) + * - Consumer: Application that reads from a stream + * + * The progress tracks sequence numbers for each shard, represented as: + * {"shardId-000000000000": "49590338271490256608559692538361571095921575989136588802", ...} + */ +public class KinesisRoutineLoadJob extends RoutineLoadJob { + private static final Logger LOG = LogManager.getLogger(KinesisRoutineLoadJob.class); + + public static final String KINESIS_FILE_CATALOG = "kinesis"; + + @SerializedName("rg") + private String region; + @SerializedName("stm") + private String stream; + @SerializedName("ep") + private String endpoint; + + // optional, user want to load shards(Kafka's cskp). + @SerializedName("csks") + private List<String> customKinesisShards = Lists.newArrayList(); + + // OPEN shards - actively receiving new data + @SerializedName("opks") + private List<String> openKinesisShards = Lists.newArrayList(); + + // CLOSED shards with unconsumed data - no longer receiving new data but still have data to consume + @SerializedName("clks") + private List<String> closedKinesisShards = Lists.newArrayList(); + + // Default starting position for new shards. + // Values: TRIM_HORIZON, LATEST, or a timestamp string. + private String kinesisDefaultPosition = ""; + + // custom Kinesis properties including AWS credentials and client settings. + @SerializedName("prop") + private Map<String, String> customProperties = Maps.newHashMap(); + private Map<String, String> convertedCustomProperties = Maps.newHashMap(); + + // The latest offset of each partition fetched from kinesis server. + // Will be updated periodically by calling hasMoreDataToConsume() + private Map<String, Long> cachedShardWithMillsBehindLatest = Maps.newConcurrentMap(); + + // newly discovered shards from Kinesis. + private List<String> newCurrentKinesisShards = Lists.newArrayList(); + + public KinesisRoutineLoadJob() { + // For serialization + super(-1, LoadDataSourceType.KINESIS); + } + + public KinesisRoutineLoadJob(Long id, String name, long dbId, long tableId, + String region, String stream, UserIdentity userIdentity) { + super(id, name, dbId, tableId, LoadDataSourceType.KINESIS, userIdentity); + this.region = region; + this.stream = stream; + this.progress = new KinesisProgress(); + } + + public KinesisRoutineLoadJob(Long id, String name, long dbId, + String region, String stream, + UserIdentity userIdentity, boolean isMultiTable) { + super(id, name, dbId, LoadDataSourceType.KINESIS, userIdentity); + this.region = region; + this.stream = stream; + this.progress = new KinesisProgress(); + setMultiTable(isMultiTable); + } + + public String getRegion() { + return region; + } + + public String getStream() { + return stream; + } + + public String getEndpoint() { + return endpoint; + } + + public Map<String, String> getConvertedCustomProperties() { + return convertedCustomProperties; + } + + @Override + public void prepare() throws UserException { + // should reset converted properties each time the job being prepared. + // because the file info can be changed anytime. + convertCustomProperties(true); + } + + private void convertCustomProperties(boolean rebuild) throws DdlException { + if (customProperties.isEmpty()) { + return; + } + + if (!rebuild && !convertedCustomProperties.isEmpty()) { + return; + } + + if (rebuild) { + convertedCustomProperties.clear(); + } + + for (Map.Entry<String, String> entry : customProperties.entrySet()) { + convertedCustomProperties.put(entry.getKey(), entry.getValue()); + } + + // Handle default position + if (convertedCustomProperties.containsKey("kinesis_default_pos")) { + kinesisDefaultPosition = convertedCustomProperties.get("kinesis_default_pos"); + // Keep it in convertedCustomProperties so BE can use it + } + } + + private String convertedDefaultPosition() { + if (this.kinesisDefaultPosition.isEmpty()) { + return KinesisProgress.POSITION_LATEST; + } + return this.kinesisDefaultPosition; + } + + @Override + public void divideRoutineLoadJob(int currentConcurrentTaskNum) throws UserException { + List<RoutineLoadTaskInfo> result = new ArrayList<>(); + writeLock(); + try { + if (state == JobState.NEED_SCHEDULE) { + // Combine open and closed shards for task assignment + List<String> allShards = Lists.newArrayList(); + allShards.addAll(openKinesisShards); + allShards.addAll(closedKinesisShards); + + // Divide shards into tasks + for (int i = 0; i < currentConcurrentTaskNum; i++) { + Map<String, String> taskKinesisProgress = Maps.newHashMap(); + for (int j = i; j < allShards.size(); j = j + currentConcurrentTaskNum) { + String shardId = allShards.get(j); + taskKinesisProgress.put(shardId, + ((KinesisProgress) progress).getSequenceNumberByShard(shardId)); + } + KinesisTaskInfo kinesisTaskInfo = new KinesisTaskInfo(UUID.randomUUID(), id, + getTimeout() * 1000, taskKinesisProgress, isMultiTable(), -1, false); + routineLoadTaskInfoList.add(kinesisTaskInfo); + result.add(kinesisTaskInfo); + } + // Change job state to running + if (!result.isEmpty()) { + unprotectUpdateState(JobState.RUNNING, null, false); + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Ignore to divide routine load job while job state {}", state); + } + } + // Save task into queue of needScheduleTasks + Env.getCurrentEnv().getRoutineLoadTaskScheduler().addTasksInQueue(result); + } finally { + writeUnlock(); + } + } + + @Override + public int calculateCurrentConcurrentTaskNum() { + int shardNum = openKinesisShards.size() + closedKinesisShards.size(); + if (desireTaskConcurrentNum == 0) { + desireTaskConcurrentNum = Config.max_routine_load_task_concurrent_num; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("current concurrent task number is min" + + "(shard num: {}, desire task concurrent num: {}, config: {})", + shardNum, desireTaskConcurrentNum, Config.max_routine_load_task_concurrent_num); + } + currentTaskConcurrentNum = Math.min(shardNum, Math.min(desireTaskConcurrentNum, + Config.max_routine_load_task_concurrent_num)); + return currentTaskConcurrentNum; + } + + @Override + protected boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment, + TransactionState txnState, + TransactionState.TxnStatusChangeReason txnStatusChangeReason) { + if (txnState.getTransactionStatus() == TransactionStatus.COMMITTED + || txnState.getTransactionStatus() == TransactionStatus.VISIBLE) { + return true; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("no need to update the progress of kinesis routine load. txn status: {}, " + + "txnStatusChangeReason: {}, task: {}, job: {}", + txnState.getTransactionStatus(), txnStatusChangeReason, + DebugUtil.printId(rlTaskTxnCommitAttachment.getTaskId()), id); + } + return false; + } + + private void updateProgressAndOffsetsCache(RLTaskTxnCommitAttachment attachment) { + KinesisProgress taskProgress = (KinesisProgress) attachment.getProgress(); + + // Keep the latest observed MillisBehindLatest per shard instead of the historical max. + taskProgress.getShardIdToMillsBehindLatest().forEach(cachedShardWithMillsBehindLatest::put); + + // Handle closed shards: move from open to closed list + if (taskProgress.getClosedShardIds() != null && !taskProgress.getClosedShardIds().isEmpty()) { + for (String closedShardId : taskProgress.getClosedShardIds()) { + if (openKinesisShards.remove(closedShardId)) { + if (!closedKinesisShards.contains(closedShardId)) { + closedKinesisShards.add(closedShardId); + LOG.info("Moved shard from open to closed: {}, job: {}", closedShardId, id); + } + } + } + } + + // Update progress (this will remove fully consumed shards from progress) + this.progress.update(attachment); + + if (taskProgress.getClosedShardIds() != null && !taskProgress.getClosedShardIds().isEmpty()) { + taskProgress.getClosedShardIds().forEach(cachedShardWithMillsBehindLatest::remove); + } + + // Remove fully consumed shards from closed list + closedKinesisShards.removeIf(shardId -> !((KinesisProgress) progress).containsShard(shardId)); + } + + @Override + protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws UserException { + updateProgressAndOffsetsCache(attachment); + super.updateProgress(attachment); + } + + @Override + protected void replayUpdateProgress(RLTaskTxnCommitAttachment attachment) { + super.replayUpdateProgress(attachment); + updateProgressAndOffsetsCache(attachment); + } + + @Override + protected RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoadTaskInfo, boolean delaySchedule) { + KinesisTaskInfo oldKinesisTaskInfo = (KinesisTaskInfo) routineLoadTaskInfo; + // Add new task + KinesisTaskInfo kinesisTaskInfo = new KinesisTaskInfo(oldKinesisTaskInfo, + ((KinesisProgress) progress).getShardIdToSequenceNumber(oldKinesisTaskInfo.getShards()), + isMultiTable()); + kinesisTaskInfo.setDelaySchedule(delaySchedule); + // Remove old task + routineLoadTaskInfoList.remove(routineLoadTaskInfo); + // Add new task + routineLoadTaskInfoList.add(kinesisTaskInfo); + return kinesisTaskInfo; + } + + @Override + protected void unprotectUpdateProgress() throws UserException { + updateNewShardProgress(); + } + + @Override + protected boolean refreshKafkaPartitions(boolean needAutoResume) throws UserException { + // For Kinesis, we refresh shards instead of Kafka partitions + if (this.state == JobState.RUNNING || this.state == JobState.NEED_SCHEDULE || needAutoResume) { + if (customKinesisShards != null && !customKinesisShards.isEmpty()) { + return true; + } + return updateKinesisShards(); + } + return true; + } + + private boolean updateKinesisShards() throws UserException { + try { + this.newCurrentKinesisShards = getAllKinesisShards(); + } catch (Exception e) { + String msg = e.getMessage() + + " may be Kinesis properties set in job is error" + + " or no shard in this stream that should check Kinesis"; + LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) + .add("error_msg", msg) + .build(), e); + if (this.state == JobState.NEED_SCHEDULE) { + unprotectUpdateState(JobState.PAUSED, + new ErrorReason(InternalErrorCode.PARTITIONS_ERR, msg), + false /* not replay */); + } + return false; + } + return true; + } + + @Override + protected boolean unprotectNeedReschedule() throws UserException { + if (this.state == JobState.RUNNING || this.state == JobState.NEED_SCHEDULE) { + return isKinesisShardsChanged(); + } + return false; + } + + private boolean isKinesisShardsChanged() throws UserException { + if (CollectionUtils.isNotEmpty(customKinesisShards)) { + if (Config.isCloudMode() && (openKinesisShards.isEmpty() && closedKinesisShards.isEmpty())) { + updateCloudProgress(); + } + openKinesisShards = customKinesisShards; + closedKinesisShards.clear(); + return false; + } + + Preconditions.checkNotNull(this.newCurrentKinesisShards); + + // newCurrentKinesisShards contains only OPEN shards. When an existing shard disappears + // from this list but still has progress, it has become a retired parent shard after + // split/merge and must continue draining from closedKinesisShards. + Set<String> newShards = new HashSet<>(this.newCurrentKinesisShards); + if (syncShardTrackingFromLatestOpenShards(newShards)) { + return true; + } + + // Check if progress is consistent for all tracked shards (open + closed) + Set<String> allTrackedShards = new HashSet<>(newShards); + allTrackedShards.addAll(closedKinesisShards); + for (String shardId : allTrackedShards) { + if (!((KinesisProgress) progress).containsShard(shardId)) { + return true; + } + } + return false; + } + + private boolean syncShardTrackingFromLatestOpenShards(Set<String> latestOpenShards) { + Set<String> currentOpenShards = new HashSet<>(openKinesisShards); + Set<String> currentClosedShards = new HashSet<>(closedKinesisShards); + Set<String> updatedClosedShards = new HashSet<>(currentClosedShards); + + for (String shardId : currentOpenShards) { + if (!latestOpenShards.contains(shardId) && ((KinesisProgress) progress).containsShard(shardId)) { + if (updatedClosedShards.add(shardId)) { + LOG.info("Moved shard from open to closed after shard refresh: {}, job: {}", shardId, id); + } + } + } + + boolean openChanged = !latestOpenShards.equals(currentOpenShards); + boolean closedChanged = !updatedClosedShards.equals(currentClosedShards); + if (!openChanged && !closedChanged) { + return false; + } + + openKinesisShards = new ArrayList<>(latestOpenShards); + closedKinesisShards = new ArrayList<>(updatedClosedShards); + if (LOG.isDebugEnabled()) { + LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) + .add("open_kinesis_shards", Joiner.on(",").join(openKinesisShards)) + .add("closed_kinesis_shards", Joiner.on(",").join(closedKinesisShards)) + .add("msg", "kinesis shards changed") + .build()); + } + return true; + } + + @Override + protected boolean needAutoResume() { + writeLock(); + try { + if (this.state == JobState.PAUSED) { + return ScheduleRule.isNeedAutoSchedule(this); + } + return false; + } finally { + writeUnlock(); + } + } + + @Override + public String getStatistic() { + Map<String, Object> summary = this.jobStatistic.summary(); + readLock(); + try { + summary.put("openShardNum", openKinesisShards.size()); + summary.put("closedShardNum", closedKinesisShards.size()); + summary.put("trackedShardNum", ((KinesisProgress) progress).getShardIdToSequenceNumber().size()); + summary.put("cachedMillisBehindLatestShardNum", cachedShardWithMillsBehindLatest.size()); + summary.put("totalMillisBehindLatest", totalLag()); + long maxMillisBehindLatest = cachedShardWithMillsBehindLatest.values().stream() + .filter(lag -> lag >= 0) + .mapToLong(v -> v) + .max() + .orElse(-1L); + summary.put("maxMillisBehindLatest", maxMillisBehindLatest); + } finally { + readUnlock(); + } + Gson gson = new GsonBuilder().disableHtmlEscaping().create(); + return gson.toJson(summary); + } + + /** + * Get all shards from the Kinesis stream. + * Delegates to a BE node via gRPC, which calls AWS ListShards API using the SDK. + */ + private List<String> getAllKinesisShards() throws UserException { + convertCustomProperties(false); + if (!customKinesisShards.isEmpty()) { + return customKinesisShards; + } + return KinesisUtil.getAllKinesisShards(region, stream, endpoint, convertedCustomProperties); + } + + /** + * Create a KinesisRoutineLoadJob from CreateRoutineLoadInfo. + */ + public static KinesisRoutineLoadJob fromCreateInfo(CreateRoutineLoadInfo info, ConnectContext ctx) + throws UserException { + if (Config.isCloudMode()) { + throw new DdlException("Kinesis routine load does not support cloud mode"); + } + Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(info.getDBName()); + + long id = Env.getCurrentEnv().getNextId(); + KinesisDataSourceProperties kinesisProperties = + (KinesisDataSourceProperties) info.getDataSourceProperties(); + KinesisRoutineLoadJob kinesisRoutineLoadJob; + + if (kinesisProperties.isMultiTable()) { + kinesisRoutineLoadJob = new KinesisRoutineLoadJob(id, info.getName(), + db.getId(), + kinesisProperties.getRegion(), kinesisProperties.getStream(), + ctx.getCurrentUserIdentity(), true); + } else { + OlapTable olapTable = db.getOlapTableOrDdlException(info.getTableName()); + checkMeta(olapTable, info.getRoutineLoadDesc()); + // Check load_to_single_tablet compatibility + if (info.isLoadToSingleTablet() + && !(olapTable.getDefaultDistributionInfo() instanceof RandomDistributionInfo)) { + throw new DdlException( + "if load_to_single_tablet set to true, the olap table must be with random distribution"); + } + long tableId = olapTable.getId(); + kinesisRoutineLoadJob = new KinesisRoutineLoadJob(id, info.getName(), + db.getId(), tableId, + kinesisProperties.getRegion(), kinesisProperties.getStream(), + ctx.getCurrentUserIdentity()); + } + + kinesisRoutineLoadJob.setOptional(info); + kinesisRoutineLoadJob.checkCustomProperties(); + + return kinesisRoutineLoadJob; + } + + private void checkCustomProperties() throws DdlException { + // Validate custom properties if needed + } + + private void updateNewShardProgress() throws UserException { + // Check if this is initial setup (no shards in progress yet) + boolean isInitialSetup = !((KinesisProgress) progress).hasShards(); + + // Combine open and closed shards + List<String> allShards = Lists.newArrayList(); + allShards.addAll(openKinesisShards); + allShards.addAll(closedKinesisShards); + + for (String shardId : allShards) { + if (!((KinesisProgress) progress).containsShard(shardId)) { + String startPosition; + + if (isInitialSetup) { + // Initial shards: use user-configured default position + startPosition = convertedDefaultPosition(); + } else { + // New shards discovered later: always use TRIM_HORIZON to avoid data loss + startPosition = KinesisProgress.TRIM_HORIZON_VAL; + LOG.info("New shard detected: {}, starting from TRIM_HORIZON to avoid data loss", + shardId); + } + + ((KinesisProgress) progress).addShardPosition(Pair.of(shardId, startPosition)); + if (LOG.isDebugEnabled()) { + LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) + .add("kinesis_shard_id", shardId) + .add("begin_position", startPosition) + .add("is_initial_setup", isInitialSetup) + .add("msg", "The new shard has been added in job")); + } + } + } + } + + private List<Pair<String, String>> getNewShardPositionsFromDefault(List<String> newShards) + throws UserException { + List<Pair<String, String>> shardPositions = Lists.newArrayList(); + String defaultPosition = convertedDefaultPosition(); + for (String shardId : newShards) { + shardPositions.add(Pair.of(shardId, defaultPosition)); + } + return shardPositions; + } + + protected void setOptional(CreateRoutineLoadInfo info) throws UserException { + super.setOptional(info); + KinesisDataSourceProperties kinesisDataSourceProperties = + (KinesisDataSourceProperties) info.getDataSourceProperties(); + + // Set endpoint if provided + if (kinesisDataSourceProperties.getEndpoint() != null) { + this.endpoint = kinesisDataSourceProperties.getEndpoint(); + } + + // Set custom shards and positions + if (CollectionUtils.isNotEmpty(kinesisDataSourceProperties.getKinesisShardPositions())) { + setCustomKinesisShards(kinesisDataSourceProperties); + } + + // Set custom properties + if (MapUtils.isNotEmpty(kinesisDataSourceProperties.getCustomKinesisProperties())) { + setCustomKinesisProperties(kinesisDataSourceProperties.getCustomKinesisProperties()); + } + } + + private void setCustomKinesisShards(KinesisDataSourceProperties kinesisDataSourceProperties) throws LoadException { + List<Pair<String, String>> shardPositions = kinesisDataSourceProperties.getKinesisShardPositions(); + for (Pair<String, String> shardPosition : shardPositions) { + this.customKinesisShards.add(shardPosition.first); + ((KinesisProgress) progress).addShardPosition(shardPosition); + } + } + + private void setCustomKinesisProperties(Map<String, String> kinesisProperties) { + this.customProperties = kinesisProperties; + } + + @Override + public String dataSourcePropertiesJsonToString() { + Map<String, String> dataSourceProperties = Maps.newHashMap(); + dataSourceProperties.put("region", region); + dataSourceProperties.put("stream", stream); + if (endpoint != null) { + dataSourceProperties.put("endpoint", endpoint); + } + List<String> sortedOpenShards = Lists.newArrayList(openKinesisShards); + Collections.sort(sortedOpenShards); + dataSourceProperties.put("openKinesisShards", Joiner.on(",").join(sortedOpenShards)); + + List<String> sortedClosedShards = Lists.newArrayList(closedKinesisShards); + Collections.sort(sortedClosedShards); + dataSourceProperties.put("closedKinesisShards", Joiner.on(",").join(sortedClosedShards)); + + Gson gson = new GsonBuilder().disableHtmlEscaping().create(); + return gson.toJson(dataSourceProperties); + } + + @Override + public String customPropertiesJsonToString() { + Gson gson = new GsonBuilder().disableHtmlEscaping().create(); + // Mask sensitive information + Map<String, String> maskedProperties = new HashMap<>(customProperties); + if (maskedProperties.containsKey("aws.secret_key")) { + maskedProperties.put("aws.secret_key", "******"); + } + if (maskedProperties.containsKey("aws.session_key")) { + maskedProperties.put("aws.session_key", "******"); + } + return gson.toJson(maskedProperties); + } + + @Override + public Map<String, String> getDataSourceProperties() { + Map<String, String> dataSourceProperties = Maps.newHashMap(); + dataSourceProperties.put(KinesisConfiguration.KINESIS_REGION.getName(), region); + dataSourceProperties.put("kinesis_stream", stream); + if (endpoint != null) { + dataSourceProperties.put("kinesis_endpoint", endpoint); + } + return dataSourceProperties; + } + + @Override + public Map<String, String> getCustomProperties() { + Map<String, String> ret = new HashMap<>(); + customProperties.forEach((k, v) -> { + // Mask sensitive values + if (k.equals("aws.secret_key") || k.equals("aws.session_key")) { + ret.put("property." + k, "******"); + } else { + ret.put("property." + k, v); + } + }); + return ret; + } + + @Override + public void modifyProperties(AlterRoutineLoadCommand command) throws UserException { + Map<String, String> jobProperties = command.getAnalyzedJobProperties(); + KinesisDataSourceProperties dataSourceProperties = + (KinesisDataSourceProperties) command.getDataSourceProperties(); + + writeLock(); + try { + if (getState() != JobState.PAUSED) { + throw new DdlException("Only supports modification of PAUSED jobs"); + } + + modifyPropertiesInternal(jobProperties, dataSourceProperties); + + AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(this.id, + jobProperties, dataSourceProperties); + Env.getCurrentEnv().getEditLog().logAlterRoutineLoadJob(log); + } finally { + writeUnlock(); + } + } + + private void modifyPropertiesInternal(Map<String, String> jobProperties, + KinesisDataSourceProperties dataSourceProperties) + throws UserException { + if (dataSourceProperties != null) { + List<Pair<String, String>> shardPositions = Lists.newArrayList(); + Map<String, String> customKinesisProperties = Maps.newHashMap(); + boolean resetProgress = false; + boolean hasExplicitShardPositions = false; + + if (MapUtils.isNotEmpty(dataSourceProperties.getOriginalDataSourceProperties())) { + shardPositions = dataSourceProperties.getKinesisShardPositions(); + customKinesisProperties = dataSourceProperties.getCustomKinesisProperties(); + hasExplicitShardPositions = !shardPositions.isEmpty(); + } + + // Update custom properties + if (!customKinesisProperties.isEmpty()) { + this.customProperties.putAll(customKinesisProperties); + convertCustomProperties(true); + } + + // Modify stream if provided + if (!Strings.isNullOrEmpty(dataSourceProperties.getStream())) { + this.stream = dataSourceProperties.getStream(); + resetProgress = true; + } + + // Modify region if provided + if (!Strings.isNullOrEmpty(dataSourceProperties.getRegion())) { Review Comment: Changing the region changes the Kinesis stream identity just as much as changing the stream name, but `resetProgress` remains false here. A paused job on `us-east-1/streamA` with tracked shard IDs and sequence numbers can be altered to `us-west-2/streamA`; after resume, FE keeps `openKinesisShards`, `closedKinesisShards`, `customKinesisShards`, and `progress` from the old stream and sends those old shard IDs to BE. `GetShardIterator` will then fail with a shard-not-found error (or, if IDs overlap, resume from an unrelated sequence), leaving the job paused or consuming the wrong position. The endpoint branch below has the same problem. Region/endpoint changes should reset progress and clear dynamic/custom shard tracking unless explicit shard positions for the new source are supplied. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
