yihua commented on code in PR #18224: URL: https://github.com/apache/hudi/pull/18224#discussion_r2842613314
########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKinesisSource.java: ########## @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.utilities.config.KinesisSourceConfig; +import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics; +import org.apache.hudi.utilities.schema.SchemaProvider; +import org.apache.hudi.utilities.sources.helpers.KinesisOffsetGen; +import org.apache.hudi.utilities.sources.helpers.KinesisReadConfig; +import org.apache.hudi.utilities.streamer.DefaultStreamContext; +import org.apache.hudi.utilities.streamer.StreamContext; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import 
software.amazon.awssdk.services.kinesis.model.Record; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys; +import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys; +import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys; +import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys; +import static org.apache.hudi.utilities.sources.helpers.KinesisOffsetGen.LOCALSTACK_END_SEQ_SENTINEL; + +/** + * Source to read JSON data from AWS Kinesis Data Streams using Spark. + */ +@Slf4j +public class JsonKinesisSource extends KinesisSource<JavaRDD<String>> { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + /** Result from reading a single shard in a partition. */ + @AllArgsConstructor + @Getter + private static class ShardFetchResult implements Serializable { + private final List<String> records; + private final String shardId; + private final Option<String> lastSequenceNumber; + private final boolean reachedEndOfShard; + } + + /** Metadata-only summary for checkpoint; avoids bringing records to driver. */ + @AllArgsConstructor + @Getter + private static class ShardFetchSummary implements Serializable { + private final String shardId; + private final Option<String> lastSequenceNumber; + private final int recordCount; + private final boolean reachedEndOfShard; + } + + /** Persisted fetch RDD - must be unpersisted in releaseResources to avoid memory leak. */ + private transient org.apache.spark.api.java.JavaRDD<ShardFetchResult> persistedFetchRdd; + /** Record count from fetch, avoids redundant batch.count() Spark job. */ + private long lastRecordCount; + /** Shard IDs where the executor observed nextShardIterator==null (end-of-shard reached). 
*/ + protected Set<String> shardsReachedEnd; + + public JsonKinesisSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, + SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) { + this(properties, sparkContext, sparkSession, metrics, + new DefaultStreamContext(schemaProvider, Option.empty())); + } + + public JsonKinesisSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, + HoodieIngestionMetrics metrics, StreamContext streamContext) { + super(properties, sparkContext, sparkSession, SourceType.JSON, metrics, + new DefaultStreamContext(streamContext.getSchemaProvider(), streamContext.getSourceProfileSupplier())); + this.offsetGen = new KinesisOffsetGen(props); + } + + @Override + protected JavaRDD<String> toBatch(KinesisOffsetGen.KinesisShardRange[] shardRanges) { + KinesisReadConfig readConfig = new KinesisReadConfig( + offsetGen.getStreamName(), + offsetGen.getRegion(), + offsetGen.getEndpointUrl().orElse(null), + getStringWithAltKeys(props, KinesisSourceConfig.KINESIS_ACCESS_KEY, null), + getStringWithAltKeys(props, KinesisSourceConfig.KINESIS_SECRET_KEY, null), + offsetGen.getStartingPosition(), + shouldAddOffsets, + getBooleanWithAltKeys(props, KinesisSourceConfig.KINESIS_ENABLE_DEAGGREGATION), + getIntWithAltKeys(props, KinesisSourceConfig.KINESIS_GET_RECORDS_MAX_RECORDS), + getLongWithAltKeys(props, KinesisSourceConfig.KINESIS_GET_RECORDS_INTERVAL_MS), + // Evenly set the max events per shard. + shardRanges.length > 0 ? 
Math.max(1, getLongWithAltKeys(props, KinesisSourceConfig.MAX_EVENTS_FROM_KINESIS_SOURCE) / shardRanges.length) : Long.MAX_VALUE); + + JavaRDD<ShardFetchResult> fetchRdd = sparkContext.parallelize( + java.util.Arrays.asList(shardRanges), shardRanges.length) + .mapPartitions(shardRangeIt -> { + List<ShardFetchResult> results = new ArrayList<>(); + try (KinesisClient client = createKinesisClientFromConfig(readConfig)) { + while (shardRangeIt.hasNext()) { + KinesisOffsetGen.KinesisShardRange range = shardRangeIt.next(); + KinesisOffsetGen.ShardReadResult readResult = KinesisOffsetGen.readShardRecords( + client, readConfig.getStreamName(), range, readConfig.getStartingPosition(), + readConfig.getMaxRecordsPerRequest(), readConfig.getIntervalMs(), readConfig.getMaxRecordsPerShard(), + readConfig.isEnableDeaggregation()); + + List<String> recordStrings = new ArrayList<>(); + for (Record r : readResult.getRecords()) { + String json = recordToJsonStatic(r, range.getShardId(), readConfig.isShouldAddOffsets()); + if (json != null) { + recordStrings.add(json); + } + } + results.add(new ShardFetchResult(recordStrings, range.getShardId(), + readResult.getLastSequenceNumber(), readResult.isReachedEndOfShard())); + } + } + return results.iterator(); + }); + + // Cache so we can both get records and checkpoint from the same RDD + fetchRdd.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK()); + persistedFetchRdd = fetchRdd; + + // RDD for the shard data. 
+ JavaRDD<String> recordRdd = fetchRdd.flatMap(r -> r.getRecords().iterator()); + // Apply minimum partitions for downstream parallelism (similar to Kafka source) + long minPartitions = getLongWithAltKeys(props, KinesisSourceConfig.KINESIS_SOURCE_MIN_PARTITIONS); + if (minPartitions > 0 && minPartitions > shardRanges.length) { + int targetPartitions = (int) minPartitions; + log.info("Repartitioning from {} shards to {} partitions (minPartitions={})", + shardRanges.length, targetPartitions, minPartitions); + recordRdd = recordRdd.repartition(targetPartitions); + } + + // Information for checkpoint generation. + List<ShardFetchSummary> summaries = fetchRdd + .map(r -> new ShardFetchSummary(r.getShardId(), r.getLastSequenceNumber(), r.getRecords().size(), + r.isReachedEndOfShard())) + .collect(); + lastCheckpointData = buildCheckpointFromSummaries(summaries); + Set<String> reached = new HashSet<>(); + for (ShardFetchSummary s : summaries) { + if (s.isReachedEndOfShard()) { + reached.add(s.getShardId()); + } + } + shardsReachedEnd = reached; + lastRecordCount = summaries.stream().mapToLong(ShardFetchSummary::getRecordCount).sum(); + + return recordRdd; + } + + private static KinesisClient createKinesisClientFromConfig(KinesisReadConfig config) { + software.amazon.awssdk.services.kinesis.KinesisClientBuilder builder = + KinesisClient.builder().region(software.amazon.awssdk.regions.Region.of(config.getRegion())); + if (config.getEndpointUrl() != null && !config.getEndpointUrl().isEmpty()) { + builder = builder.endpointOverride(java.net.URI.create(config.getEndpointUrl())); + } + if (config.getAccessKey() != null && !config.getAccessKey().isEmpty() + && config.getSecretKey() != null && !config.getSecretKey().isEmpty()) { + builder = builder.credentialsProvider( + software.amazon.awssdk.auth.credentials.StaticCredentialsProvider.create( + software.amazon.awssdk.auth.credentials.AwsBasicCredentials.create( + config.getAccessKey(), config.getSecretKey()))); + } + return 
builder.build(); + } + + private static String recordToJsonStatic(Record record, String shardId, boolean shouldAddOffsets) { + String dataStr = record.data().asUtf8String(); + // Pure empty or null records in Kinesis is not meaningful. + if (dataStr == null || dataStr.trim().isEmpty()) { + return null; + } + if (shouldAddOffsets) { + try { + ObjectNode node = (ObjectNode) OBJECT_MAPPER.readTree(dataStr); + node.put("_hoodie_kinesis_source_sequence_number", record.sequenceNumber()); + node.put("_hoodie_kinesis_source_shard_id", shardId); + node.put("_hoodie_kinesis_source_partition_key", record.partitionKey()); + if (record.approximateArrivalTimestamp() != null) { + node.put("_hoodie_kinesis_source_timestamp", + record.approximateArrivalTimestamp().toEpochMilli()); + } + return OBJECT_MAPPER.writeValueAsString(node); + } catch (Exception e) { Review Comment: This `catch (Exception e)` silently drops the error and returns the raw string without any logging. If offset appending fails (e.g., data isn't valid JSON, or it's a JSON array rather than an object), you'd have no way to tell why some records have offsets and others don't. Could you at least log a warning here so data quality issues are debuggable? ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KinesisSourceConfig.java: ########## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.config; + +import org.apache.hudi.common.config.ConfigClassProperty; +import org.apache.hudi.common.config.ConfigGroups; +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.config.HoodieConfig; + +import javax.annotation.concurrent.Immutable; + +import static org.apache.hudi.common.util.ConfigUtils.DELTA_STREAMER_CONFIG_PREFIX; +import static org.apache.hudi.common.util.ConfigUtils.STREAMER_CONFIG_PREFIX; + +/** + * Kinesis Source Configs for Hudi Streamer. + */ +@Immutable +@ConfigClassProperty(name = "Kinesis Source Configs", + groupName = ConfigGroups.Names.HUDI_STREAMER, + subGroupName = ConfigGroups.SubGroupNames.DELTA_STREAMER_SOURCE, + description = "Configurations controlling the behavior of Kinesis source in Hudi Streamer.") +public class KinesisSourceConfig extends HoodieConfig { + + public static final String KINESIS_CHECKPOINT_TYPE_STRING = "string"; + + private static final String PREFIX = STREAMER_CONFIG_PREFIX + "source.kinesis."; + private static final String OLD_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "source.kinesis."; + + public static final ConfigProperty<String> KINESIS_STREAM_NAME = ConfigProperty + .key(PREFIX + "stream.name") + .noDefaultValue() + .withAlternatives(OLD_PREFIX + "stream.name") Review Comment: The `.withAlternatives(OLD_PREFIX + ...)` call should be removed: this is a brand-new config, so there is no legacy deltastreamer key to alias. The same applies to the other configs in this class. 
########## hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KinesisSourceConfig.java: ########## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.config; + +import org.apache.hudi.common.config.ConfigClassProperty; +import org.apache.hudi.common.config.ConfigGroups; +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.config.HoodieConfig; + +import javax.annotation.concurrent.Immutable; + +import static org.apache.hudi.common.util.ConfigUtils.DELTA_STREAMER_CONFIG_PREFIX; +import static org.apache.hudi.common.util.ConfigUtils.STREAMER_CONFIG_PREFIX; + +/** + * Kinesis Source Configs for Hudi Streamer. 
+ */ +@Immutable +@ConfigClassProperty(name = "Kinesis Source Configs", + groupName = ConfigGroups.Names.HUDI_STREAMER, + subGroupName = ConfigGroups.SubGroupNames.DELTA_STREAMER_SOURCE, + description = "Configurations controlling the behavior of Kinesis source in Hudi Streamer.") +public class KinesisSourceConfig extends HoodieConfig { + + public static final String KINESIS_CHECKPOINT_TYPE_STRING = "string"; + + private static final String PREFIX = STREAMER_CONFIG_PREFIX + "source.kinesis."; + private static final String OLD_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "source.kinesis."; + + public static final ConfigProperty<String> KINESIS_STREAM_NAME = ConfigProperty + .key(PREFIX + "stream.name") + .noDefaultValue() + .withAlternatives(OLD_PREFIX + "stream.name") + .withDocumentation("Kinesis Data Streams stream name."); + + public static final ConfigProperty<String> KINESIS_REGION = ConfigProperty + .key(PREFIX + "region") + .noDefaultValue() + .withAlternatives(OLD_PREFIX + "region") + .withDocumentation("AWS region for the Kinesis stream (e.g., us-east-1)."); + + public static final ConfigProperty<String> KINESIS_ENDPOINT_URL = ConfigProperty + .key(PREFIX + "endpoint.url") + .noDefaultValue() + .withAlternatives(OLD_PREFIX + "endpoint.url") + .markAdvanced() + .withDocumentation("Custom endpoint URL for Kinesis (e.g., for localstack). " + + "If not set, uses the default AWS endpoint for the region."); + + public static final ConfigProperty<String> KINESIS_ACCESS_KEY = ConfigProperty + .key(PREFIX + "access.key") + .noDefaultValue() + .withAlternatives(OLD_PREFIX + "access.key") + .markAdvanced() + .withDocumentation("AWS access key for Kinesis. Used when connecting to custom endpoints (e.g., LocalStack). 
" + + "If not set with endpoint, uses the default AWS credential chain."); + + public static final ConfigProperty<String> KINESIS_SECRET_KEY = ConfigProperty + .key(PREFIX + "secret.key") + .noDefaultValue() + .withAlternatives(OLD_PREFIX + "secret.key") + .markAdvanced() + .withDocumentation("AWS secret key for Kinesis. Used when connecting to custom endpoints (e.g., LocalStack). " + + "If not set with endpoint, uses the default AWS credential chain."); + + public static final ConfigProperty<Long> MAX_EVENTS_FROM_KINESIS_SOURCE = ConfigProperty + .key(STREAMER_CONFIG_PREFIX + "kinesis.source.maxEvents") + .defaultValue(5000000L) + .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "kinesis.source.maxEvents") + .markAdvanced() + .withDocumentation("Maximum number of records obtained in each batch from Kinesis."); + + public static final ConfigProperty<Long> KINESIS_SOURCE_MIN_PARTITIONS = ConfigProperty + .key(PREFIX + "minPartitions") + .defaultValue(0L) + .withAlternatives(OLD_PREFIX + "minPartitions") Review Comment: ```suggestion .withAlternatives(OLD_PREFIX + "min.partitions") ``` ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisOffsetGen.java: ########## @@ -0,0 +1,500 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources.helpers; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.table.checkpoint.Checkpoint; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.utilities.exception.HoodieReadFromSourceException; +import org.apache.hudi.utilities.config.KinesisSourceConfig; +import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.KinesisClientBuilder; +import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; +import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; +import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException; +import software.amazon.awssdk.services.kinesis.model.LimitExceededException; +import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; +import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; +import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; +import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; + +import java.net.URI; +import java.util.ArrayList; +import 
java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties; +import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys; +import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys; +import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys; + +/** + * Helper for reading from Kinesis Data Streams and managing checkpoints. + * Checkpoint format: streamName,shardId:sequenceNumber,shardId:sequenceNumber,... + */ +@Slf4j +@Getter +public class KinesisOffsetGen { + + public static class CheckpointUtils { + /** Separator between lastSeq and endSeq for closed shards. Seq numbers are numeric, so this is safe. */ + private static final String END_SEQ_SEPARATOR = "|"; + /** + * Kinesis checkpoint pattern. + * Format: streamName,shardId:lastSeq,shardId:lastSeq|endSeq,... + * For closed shards we store lastSeq|endSeq so we can detect data loss when shard expires. + */ + private static final Pattern PATTERN = Pattern.compile(".*,.*:.*"); + + /** + * Parse checkpoint string to shardId -> value map. Value is lastSeq or lastSeq|endSeq for closed shards. + */ + public static Map<String, String> strToOffsets(String checkpointStr) { + Map<String, String> offsetMap = new HashMap<>(); + String[] splits = checkpointStr.split(","); + for (int i = 1; i < splits.length; i++) { + String part = splits[i]; + int colonIdx = part.indexOf(':'); + if (colonIdx > 0 && colonIdx < part.length() - 1) { + String shardId = part.substring(0, colonIdx); + String value = part.substring(colonIdx + 1); + offsetMap.put(shardId, value); + } + } + return offsetMap; + } + + /** + * Extract lastSeq from checkpoint value (which may be "lastSeq" or "lastSeq|endSeq"). 
+ */ + public static String getLastSeqFromValue(String value) { + if (value == null || value.isEmpty()) { + return value; + } + int sep = value.indexOf(END_SEQ_SEPARATOR); + return sep >= 0 ? value.substring(0, sep) : value; + } + + /** + * Extract endSeq from checkpoint value if present. Returns null for open shards. + */ + public static String getEndSeqFromValue(String value) { + if (value == null || value.isEmpty()) { + return null; + } + int sep = value.indexOf(END_SEQ_SEPARATOR); + return sep >= 0 && sep < value.length() - 1 ? value.substring(sep + 1) : null; + } + + /** + * Build checkpoint value: "lastSeq" or "lastSeq|endSeq" when endSeq is present (closed shards). + */ + public static String buildCheckpointValue(String lastSeq, String endSeq) { + if (endSeq != null && !endSeq.isEmpty()) { + return lastSeq + END_SEQ_SEPARATOR + endSeq; + } + return lastSeq; + } + + /** + * String representation of checkpoint. + * Format: streamName,shardId:value,shardId:value,... where value is lastSeq or lastSeq|endSeq. + */ + public static String offsetsToStr(String streamName, Map<String, String> shardToValue) { + String parts = shardToValue.entrySet().stream() + .sorted(Map.Entry.comparingByKey()) + .map(e -> e.getKey() + ":" + e.getValue()) + .collect(Collectors.joining(",")); + return streamName + "," + parts; + } + + public static boolean checkStreamCheckpoint(Option<String> lastCheckpointStr) { + return lastCheckpointStr.isPresent() && PATTERN.matcher(lastCheckpointStr.get()).matches(); + } + } + + /** LocalStack returns Long.MAX_VALUE for closed shards' endingSequenceNumber; real AWS returns actual value. */ + public static final String LOCALSTACK_END_SEQ_SENTINEL = "9223372036854775807"; + + /** + * Represents a shard to read from, with optional starting sequence number. + * For closed shards, endingSequenceNumber is set so we can store it in the checkpoint + * and later detect data loss when the shard expires. 
+ */ + @AllArgsConstructor + @Getter + public static class KinesisShardRange implements java.io.Serializable { + private final String shardId; + /** If empty, use TRIM_HORIZON or LATEST based on config. */ + private final Option<String> startingSequenceNumber; + /** For closed shards: the shard's ending sequence number. Empty for open shards. */ + private final Option<String> endingSequenceNumber; + + public static KinesisShardRange of(String shardId, Option<String> seqNum) { + return new KinesisShardRange(shardId, seqNum, Option.empty()); + } + + public static KinesisShardRange of(String shardId, Option<String> seqNum, Option<String> endSeq) { + return new KinesisShardRange(shardId, seqNum, endSeq); + } + + /** + * Returns true if this range may have unread records, false if we can definitively determine it has none. + * Uses conservative default (useLatestWhenNoCheckpoint=false). + */ + public boolean hasUnreadRecords() { + return hasUnreadRecords(false); + } + + /** + * Returns true if this range may have unread records, false if we can definitively determine it has none. 
+ * <ul> + * <li>Open shard: always true (may have new records)</li> + * <li>Closed shard, lastSeq >= endSeq: false (fully consumed)</li> + * <li>Closed shard, no checkpoint and useLatest: false (at LATEST tip, closed shard has no records)</li> + * <li>Closed shard with LocalStack endSeq sentinel and lastSeq equals sentinel: false (fully consumed)</li> + * <li>Otherwise: true (may have unread records or cannot definitively say)</li> + * </ul> + * + * @param useLatestWhenNoCheckpoint when startingSequenceNumber is empty, true means we use LATEST + * (start at tip); for a closed shard there are no records to read + */ + public boolean hasUnreadRecords(boolean useLatestWhenNoCheckpoint) { + String lastSeq = startingSequenceNumber.orElse(null); + String endSeq = endingSequenceNumber.orElse(null); + + // Open shard: may have records + if (endSeq == null || endSeq.isEmpty()) { + return true; + } + + // Closed shard with no checkpoint + if (lastSeq == null || lastSeq.isEmpty()) { + return !useLatestWhenNoCheckpoint; + } + + // Closed shard: lastSeq >= endSeq means fully consumed + if (lastSeq.compareTo(endSeq) >= 0) { + return false; + } + + // LocalStack sentinel: when lastSeq equals sentinel, we've fully consumed + if (LOCALSTACK_END_SEQ_SENTINEL.equals(endSeq) && LOCALSTACK_END_SEQ_SENTINEL.equals(lastSeq)) { + return false; + } + + // lastSeq < endSeq or ambiguous (e.g. 
LocalStack sentinel): may have unread records + return true; + } + } + + private final String streamName; + private final String region; + private final Option<String> endpointUrl; + private final KinesisSourceConfig.KinesisStartingPosition startingPosition; + private final TypedProperties props; + + public KinesisOffsetGen(TypedProperties props) { + this.props = props; + checkRequiredConfigProperties(props, + Arrays.asList(KinesisSourceConfig.KINESIS_STREAM_NAME, KinesisSourceConfig.KINESIS_REGION)); + this.streamName = getStringWithAltKeys(props, KinesisSourceConfig.KINESIS_STREAM_NAME); + this.region = getStringWithAltKeys(props, KinesisSourceConfig.KINESIS_REGION); + this.endpointUrl = Option.ofNullable(getStringWithAltKeys(props, KinesisSourceConfig.KINESIS_ENDPOINT_URL, null)); + String posStr = getStringWithAltKeys(props, KinesisSourceConfig.KINESIS_STARTING_POSITION, true); + String normalized = posStr.toUpperCase().replace("EARLIEST", "TRIM_HORIZON"); + this.startingPosition = KinesisSourceConfig.KinesisStartingPosition.valueOf(normalized); + } + + public KinesisClient createKinesisClient() { + KinesisClientBuilder builder = KinesisClient.builder().region(Region.of(region)); + if (endpointUrl.isPresent()) { + builder = builder.endpointOverride(URI.create(endpointUrl.get())); + } + String accessKey = getStringWithAltKeys(props, KinesisSourceConfig.KINESIS_ACCESS_KEY, null); + String secretKey = getStringWithAltKeys(props, KinesisSourceConfig.KINESIS_SECRET_KEY, null); + if (accessKey != null && !accessKey.isEmpty() && secretKey != null && !secretKey.isEmpty()) { + builder = builder.credentialsProvider( + StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey))); + } + return builder.build(); + } + + /** + * List all active shards for the stream. + * Note: AWS API disallows streamName and nextToken in the same request. 
+ */ + public List<Shard> listShards(KinesisClient client) { + List<Shard> allShards = new ArrayList<>(); + String nextToken = null; + do { + ListShardsRequest request = nextToken != null + ? ListShardsRequest.builder().nextToken(nextToken).build() + : ListShardsRequest.builder().streamName(streamName).build(); + ListShardsResponse response; + try { + response = client.listShards(request); + } catch (ResourceNotFoundException e) { + throw new HoodieReadFromSourceException("Kinesis stream " + streamName + " not found", e); + } catch (ProvisionedThroughputExceededException e) { + throw new HoodieReadFromSourceException("Kinesis throughput exceeded listing shards for " + streamName, e); + } catch (LimitExceededException e) { + throw new HoodieReadFromSourceException("Kinesis limit exceeded listing shards: " + e.getMessage(), e); + } + allShards.addAll(response.shards()); + nextToken = response.nextToken(); + } while (nextToken != null); + + // Include both open and closed shards. Closed shards (e.g., from resharding) may still contain + // unread records within the retention period. GetRecords works on closed shards until all data + // is consumed, at which point NextShardIterator returns null. + long openCount = allShards.stream() + .filter(s -> s.sequenceNumberRange() != null && s.sequenceNumberRange().endingSequenceNumber() == null) + .count(); + log.info("Found {} shards for stream {} ({} open, {} closed)", + allShards.size(), streamName, openCount, allShards.size() - openCount); + return allShards; + } + + /** + * Get shard ranges to read, based on checkpoint and limits. + */ + public KinesisShardRange[] getNextShardRanges(Option<Checkpoint> lastCheckpoint, + long sourceLimit, + HoodieIngestionMetrics metrics) { + long maxEvents = getLongWithAltKeys(props, KinesisSourceConfig.MAX_EVENTS_FROM_KINESIS_SOURCE); + long numEvents = sourceLimit == Long.MAX_VALUE ? 
maxEvents : Math.min(sourceLimit, maxEvents); + long minPartitions = getLongWithAltKeys(props, KinesisSourceConfig.KINESIS_SOURCE_MIN_PARTITIONS); + log.info("getNextShardRanges set config {} to {}", KinesisSourceConfig.KINESIS_SOURCE_MIN_PARTITIONS.key(), minPartitions); + + try (KinesisClient client = createKinesisClient()) { + // List all open and closed shards. + List<Shard> shards = listShards(client); + if (shards.isEmpty()) { + return new KinesisShardRange[0]; + } + + Map<String, String> fromSequenceNumbers = new HashMap<>(); + Option<String> lastCheckpointStr = lastCheckpoint.isPresent() + ? Option.of(lastCheckpoint.get().getCheckpointKey()) : Option.empty(); + + // CASE: last checkpoint exists. + if (lastCheckpointStr.isPresent() && CheckpointUtils.checkStreamCheckpoint(lastCheckpointStr)) { + Map<String, String> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get()); + if (!checkpointOffsets.isEmpty() && lastCheckpointStr.get().startsWith(streamName + ",")) { + // Check for expired shards (checkpoint references shards no longer in stream, e.g., past retention) + Set<String> availableShardIds = shards.stream().map(Shard::shardId).collect(Collectors.toSet()); + List<String> expiredShardIds = checkpointOffsets.keySet().stream() + .filter(id -> !availableShardIds.contains(id)) + .collect(Collectors.toList()); + // Handle expired shards that exist in the last checkpoint. + if (!expiredShardIds.isEmpty()) { + boolean failOnDataLoss = getBooleanWithAltKeys(props, KinesisSourceConfig.ENABLE_FAIL_ON_DATA_LOSS); + for (String shardId : expiredShardIds) { + String value = checkpointOffsets.get(shardId); + String lastSeq = CheckpointUtils.getLastSeqFromValue(value); + String endSeq = CheckpointUtils.getEndSeqFromValue(value); + boolean fullyConsumed; + if (endSeq != null) { + // CASE 1: lastSeq >= endSeq: all records have been consumed. 
+ fullyConsumed = lastSeq != null && lastSeq.compareTo(endSeq) >= 0; + } else { + // CASE 2: lastSeq < endSeq: some records haven't been consumed. + // CASE 3: endSeq == null: open shard. + fullyConsumed = false; + } + if (fullyConsumed) { + log.info("Expired shard {} was fully consumed (lastSeq >= endSeq); pruning from checkpoint", + shardId); + } else { + if (failOnDataLoss) { + throw new HoodieReadFromSourceException("Checkpoint references expired shard " + shardId + + " with unread data (lastSeq < endSeq or no endSeq stored). Data loss may have occurred. " + + "Set " + KinesisSourceConfig.ENABLE_FAIL_ON_DATA_LOSS.key() + "=false to continue."); + } + log.warn("Expired shard {} may have unread data; pruning and continuing (failOnDataLoss=false)", + shardId); + } + } + } + // Parse lastSeq for open and closed shards. + // For closed shards, even if all their records have been consumed, they are still included. + for (String shardId : availableShardIds) { + if (checkpointOffsets.containsKey(shardId)) { + String lastSeq = CheckpointUtils.getLastSeqFromValue(checkpointOffsets.get(shardId)); + if (lastSeq != null && !lastSeq.isEmpty()) { + fromSequenceNumbers.put(shardId, lastSeq); + } + } + } + } + } + + List<KinesisShardRange> ranges = new ArrayList<>(); + for (Shard shard : shards) { + String shardId = shard.shardId(); + Option<String> startSeq = fromSequenceNumbers.containsKey(shardId) + ? Option.of(fromSequenceNumbers.get(shardId)) + : Option.empty(); + Option<String> endSeq = (shard.sequenceNumberRange() != null + && shard.sequenceNumberRange().endingSequenceNumber() != null) + ? Option.of(shard.sequenceNumberRange().endingSequenceNumber()) + : Option.empty(); + ranges.add(KinesisShardRange.of(shardId, startSeq, endSeq)); + } + + int targetParallelism = minPartitions > 0 + ? 
(int) Math.max(minPartitions, ranges.size()) + : ranges.size(); + metrics.updateStreamerSourceParallelism(targetParallelism); + + log.info("About to read up to {} events from {} shards in stream {} (target parallelism: {})", + numEvents, ranges.size(), streamName, targetParallelism); + return ranges.toArray(new KinesisShardRange[0]); + } catch (ResourceNotFoundException e) { + throw new HoodieReadFromSourceException("Kinesis stream " + streamName + " not found", e); + } catch (ProvisionedThroughputExceededException e) { + throw new HoodieReadFromSourceException("Kinesis throughput exceeded for stream " + streamName, e); + } catch (InvalidArgumentException e) { + throw new HoodieReadFromSourceException("Invalid Kinesis request: " + e.getMessage(), e); + } catch (LimitExceededException e) { + throw new HoodieReadFromSourceException("Kinesis limit exceeded: " + e.getMessage(), e); + } + } + + /** + * Result of reading from a shard: records and the last sequence number for checkpoint. + */ + @AllArgsConstructor + @Getter + public static class ShardReadResult implements java.io.Serializable { + private final List<Record> records; + private final Option<String> lastSequenceNumber; + /** True when nextShardIterator was null, meaning the shard has no more records to return. */ + private final boolean reachedEndOfShard; + } + + /** + * Read records from a single shard. 
+ * @param enableDeaggregation when true, de-aggregates KPL records into individual user records + */ + public static ShardReadResult readShardRecords(KinesisClient client, String streamName, + KinesisShardRange range, KinesisSourceConfig.KinesisStartingPosition defaultPosition, + int maxRecordsPerRequest, long intervalMs, long maxTotalRecords, + boolean enableDeaggregation) throws InterruptedException { + String shardIterator; + try { + shardIterator = getShardIterator(client, streamName, range, defaultPosition); + } catch (InvalidArgumentException e) { + // GetShardIterator throws InvalidArgumentException (not ExpiredIteratorException) when the + // requested sequence number is past the stream's retention window. + throw new HoodieReadFromSourceException("Sequence number in checkpoint is expired or invalid for shard " + + range.getShardId() + ". Reset the checkpoint to recover.", e); + } catch (ResourceNotFoundException e) { + throw new HoodieReadFromSourceException("Shard or stream not found: " + range.getShardId(), e); + } catch (ProvisionedThroughputExceededException e) { + throw new HoodieReadFromSourceException("Kinesis throughput exceeded reading shard " + range.getShardId(), e); + } + List<Record> allRecords = new ArrayList<>(); + String lastSequenceNumber = null; + int requestCount = 0; + + while (allRecords.size() < maxTotalRecords && shardIterator != null) { + GetRecordsResponse response; + try { + response = client.getRecords( + GetRecordsRequest.builder() + .shardIterator(shardIterator) + .limit(Math.min(maxRecordsPerRequest, (int) (maxTotalRecords - allRecords.size()))) + .build()); + } catch (ExpiredIteratorException e) { + log.warn("Shard iterator expired for {} during GetRecords, stopping read", range.getShardId()); + break; + } catch (ProvisionedThroughputExceededException e) { + throw new HoodieReadFromSourceException("Kinesis throughput exceeded reading shard " + range.getShardId(), e); + } + + List<Record> records = response.records(); + // 
Update shardIterator before the empty check so its null-ness correctly reflects end-of-shard + // even when the final response carries 0 records (closed shard fully exhausted). + shardIterator = response.nextShardIterator(); + // CASE 1: No records returned: stop polling. nextShardIterator can be non-null when at LATEST with no new + // data; continuing would cause an infinite loop of empty GetRecords calls. + if (records.isEmpty()) { + break; + } + // CASE 2: records returned. + List<Record> toAdd = enableDeaggregation ? KinesisDeaggregator.deaggregate(records) : records; + for (Record r : toAdd) { + allRecords.add(r); + } + // Checkpoint uses the last Kinesis record's sequence number (from raw records, not deaggregated) + lastSequenceNumber = records.get(records.size() - 1).sequenceNumber(); + + requestCount++; + // This is for rate limiting + if (shardIterator != null && intervalMs > 0) { + Thread.sleep(intervalMs); + } + } Review Comment: After deaggregation, `allRecords` can significantly exceed `maxTotalRecords` since one aggregated record can expand into many user records, but the while-loop only checks the limit before fetching. With aggressive KPL aggregation ratios (e.g., 100:1), a shard could return far more records than the configured per-shard limit. Have you considered truncating `toAdd` to `maxTotalRecords - allRecords.size()` before adding? ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KinesisSourceConfig.java: ########## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.config; + +import org.apache.hudi.common.config.ConfigClassProperty; +import org.apache.hudi.common.config.ConfigGroups; +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.config.HoodieConfig; + +import javax.annotation.concurrent.Immutable; + +import static org.apache.hudi.common.util.ConfigUtils.DELTA_STREAMER_CONFIG_PREFIX; +import static org.apache.hudi.common.util.ConfigUtils.STREAMER_CONFIG_PREFIX; + +/** + * Kinesis Source Configs for Hudi Streamer. + */ +@Immutable +@ConfigClassProperty(name = "Kinesis Source Configs", + groupName = ConfigGroups.Names.HUDI_STREAMER, + subGroupName = ConfigGroups.SubGroupNames.DELTA_STREAMER_SOURCE, + description = "Configurations controlling the behavior of Kinesis source in Hudi Streamer.") +public class KinesisSourceConfig extends HoodieConfig { + + public static final String KINESIS_CHECKPOINT_TYPE_STRING = "string"; + + private static final String PREFIX = STREAMER_CONFIG_PREFIX + "source.kinesis."; + private static final String OLD_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "source.kinesis."; + + public static final ConfigProperty<String> KINESIS_STREAM_NAME = ConfigProperty + .key(PREFIX + "stream.name") + .noDefaultValue() + .withAlternatives(OLD_PREFIX + "stream.name") + .withDocumentation("Kinesis Data Streams stream name."); + + public static final ConfigProperty<String> KINESIS_REGION = ConfigProperty + .key(PREFIX + "region") + .noDefaultValue() + .withAlternatives(OLD_PREFIX + "region") + .withDocumentation("AWS region for the 
Kinesis stream (e.g., us-east-1)."); + + public static final ConfigProperty<String> KINESIS_ENDPOINT_URL = ConfigProperty + .key(PREFIX + "endpoint.url") + .noDefaultValue() + .withAlternatives(OLD_PREFIX + "endpoint.url") + .markAdvanced() + .withDocumentation("Custom endpoint URL for Kinesis (e.g., for localstack). " + + "If not set, uses the default AWS endpoint for the region."); + + public static final ConfigProperty<String> KINESIS_ACCESS_KEY = ConfigProperty + .key(PREFIX + "access.key") + .noDefaultValue() + .withAlternatives(OLD_PREFIX + "access.key") + .markAdvanced() + .withDocumentation("AWS access key for Kinesis. Used when connecting to custom endpoints (e.g., LocalStack). " + + "If not set with endpoint, uses the default AWS credential chain."); + + public static final ConfigProperty<String> KINESIS_SECRET_KEY = ConfigProperty + .key(PREFIX + "secret.key") + .noDefaultValue() + .withAlternatives(OLD_PREFIX + "secret.key") + .markAdvanced() + .withDocumentation("AWS secret key for Kinesis. Used when connecting to custom endpoints (e.g., LocalStack). " + + "If not set with endpoint, uses the default AWS credential chain."); + + public static final ConfigProperty<Long> MAX_EVENTS_FROM_KINESIS_SOURCE = ConfigProperty + .key(STREAMER_CONFIG_PREFIX + "kinesis.source.maxEvents") Review Comment: ```suggestion .key(STREAMER_CONFIG_PREFIX + "kinesis.source.max.events") ``` ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KinesisSourceConfig.java: ########## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.config; + +import org.apache.hudi.common.config.ConfigClassProperty; +import org.apache.hudi.common.config.ConfigGroups; +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.config.HoodieConfig; + +import javax.annotation.concurrent.Immutable; + +import static org.apache.hudi.common.util.ConfigUtils.DELTA_STREAMER_CONFIG_PREFIX; +import static org.apache.hudi.common.util.ConfigUtils.STREAMER_CONFIG_PREFIX; + +/** + * Kinesis Source Configs for Hudi Streamer. + */ +@Immutable +@ConfigClassProperty(name = "Kinesis Source Configs", + groupName = ConfigGroups.Names.HUDI_STREAMER, + subGroupName = ConfigGroups.SubGroupNames.DELTA_STREAMER_SOURCE, + description = "Configurations controlling the behavior of Kinesis source in Hudi Streamer.") +public class KinesisSourceConfig extends HoodieConfig { + + public static final String KINESIS_CHECKPOINT_TYPE_STRING = "string"; + + private static final String PREFIX = STREAMER_CONFIG_PREFIX + "source.kinesis."; + private static final String OLD_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "source.kinesis."; + + public static final ConfigProperty<String> KINESIS_STREAM_NAME = ConfigProperty + .key(PREFIX + "stream.name") + .noDefaultValue() + .withAlternatives(OLD_PREFIX + "stream.name") + .withDocumentation("Kinesis Data Streams stream name."); + + public static final ConfigProperty<String> KINESIS_REGION = ConfigProperty + .key(PREFIX + "region") + .noDefaultValue() + .withAlternatives(OLD_PREFIX + "region") + .withDocumentation("AWS region for the 
Kinesis stream (e.g., us-east-1)."); Review Comment: Should this be marked `.markAdvanced()`? The AWS SDK should automatically resolve the region to use by default. ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KinesisSourceConfig.java: ########## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.config; + +import org.apache.hudi.common.config.ConfigClassProperty; +import org.apache.hudi.common.config.ConfigGroups; +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.config.HoodieConfig; + +import javax.annotation.concurrent.Immutable; + +import static org.apache.hudi.common.util.ConfigUtils.DELTA_STREAMER_CONFIG_PREFIX; +import static org.apache.hudi.common.util.ConfigUtils.STREAMER_CONFIG_PREFIX; + +/** + * Kinesis Source Configs for Hudi Streamer. 
+ */ +@Immutable +@ConfigClassProperty(name = "Kinesis Source Configs", + groupName = ConfigGroups.Names.HUDI_STREAMER, + subGroupName = ConfigGroups.SubGroupNames.DELTA_STREAMER_SOURCE, + description = "Configurations controlling the behavior of Kinesis source in Hudi Streamer.") +public class KinesisSourceConfig extends HoodieConfig { + + public static final String KINESIS_CHECKPOINT_TYPE_STRING = "string"; + + private static final String PREFIX = STREAMER_CONFIG_PREFIX + "source.kinesis."; + private static final String OLD_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "source.kinesis."; Review Comment: New streamer configs should not support the deprecated prefix. ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKinesisSource.java: ########## @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.utilities.sources; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.utilities.config.KinesisSourceConfig; +import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics; +import org.apache.hudi.utilities.schema.SchemaProvider; +import org.apache.hudi.utilities.sources.helpers.KinesisOffsetGen; +import org.apache.hudi.utilities.sources.helpers.KinesisReadConfig; +import org.apache.hudi.utilities.streamer.DefaultStreamContext; +import org.apache.hudi.utilities.streamer.StreamContext; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.Record; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys; +import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys; +import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys; +import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys; +import static org.apache.hudi.utilities.sources.helpers.KinesisOffsetGen.LOCALSTACK_END_SEQ_SENTINEL; + +/** + * Source to read JSON data from AWS Kinesis Data Streams using Spark. + */ +@Slf4j +public class JsonKinesisSource extends KinesisSource<JavaRDD<String>> { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + /** Result from reading a single shard in a partition. 
*/ + @AllArgsConstructor + @Getter + private static class ShardFetchResult implements Serializable { + private final List<String> records; + private final String shardId; + private final Option<String> lastSequenceNumber; + private final boolean reachedEndOfShard; + } + + /** Metadata-only summary for checkpoint; avoids bringing records to driver. */ + @AllArgsConstructor + @Getter + private static class ShardFetchSummary implements Serializable { + private final String shardId; + private final Option<String> lastSequenceNumber; + private final int recordCount; + private final boolean reachedEndOfShard; + } + + /** Persisted fetch RDD - must be unpersisted in releaseResources to avoid memory leak. */ + private transient org.apache.spark.api.java.JavaRDD<ShardFetchResult> persistedFetchRdd; + /** Record count from fetch, avoids redundant batch.count() Spark job. */ + private long lastRecordCount; + /** Shard IDs where the executor observed nextShardIterator==null (end-of-shard reached). */ + protected Set<String> shardsReachedEnd; + + public JsonKinesisSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, + SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) { + this(properties, sparkContext, sparkSession, metrics, + new DefaultStreamContext(schemaProvider, Option.empty())); + } + + public JsonKinesisSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, + HoodieIngestionMetrics metrics, StreamContext streamContext) { + super(properties, sparkContext, sparkSession, SourceType.JSON, metrics, + new DefaultStreamContext(streamContext.getSchemaProvider(), streamContext.getSourceProfileSupplier())); + this.offsetGen = new KinesisOffsetGen(props); + } + + @Override + protected JavaRDD<String> toBatch(KinesisOffsetGen.KinesisShardRange[] shardRanges) { + KinesisReadConfig readConfig = new KinesisReadConfig( + offsetGen.getStreamName(), + offsetGen.getRegion(), + 
offsetGen.getEndpointUrl().orElse(null), + getStringWithAltKeys(props, KinesisSourceConfig.KINESIS_ACCESS_KEY, null), + getStringWithAltKeys(props, KinesisSourceConfig.KINESIS_SECRET_KEY, null), + offsetGen.getStartingPosition(), + shouldAddOffsets, + getBooleanWithAltKeys(props, KinesisSourceConfig.KINESIS_ENABLE_DEAGGREGATION), + getIntWithAltKeys(props, KinesisSourceConfig.KINESIS_GET_RECORDS_MAX_RECORDS), + getLongWithAltKeys(props, KinesisSourceConfig.KINESIS_GET_RECORDS_INTERVAL_MS), + // Evenly set the max events per shard. + shardRanges.length > 0 ? Math.max(1, getLongWithAltKeys(props, KinesisSourceConfig.MAX_EVENTS_FROM_KINESIS_SOURCE) / shardRanges.length) : Long.MAX_VALUE); + + JavaRDD<ShardFetchResult> fetchRdd = sparkContext.parallelize( + java.util.Arrays.asList(shardRanges), shardRanges.length) + .mapPartitions(shardRangeIt -> { + List<ShardFetchResult> results = new ArrayList<>(); + try (KinesisClient client = createKinesisClientFromConfig(readConfig)) { + while (shardRangeIt.hasNext()) { + KinesisOffsetGen.KinesisShardRange range = shardRangeIt.next(); + KinesisOffsetGen.ShardReadResult readResult = KinesisOffsetGen.readShardRecords( + client, readConfig.getStreamName(), range, readConfig.getStartingPosition(), + readConfig.getMaxRecordsPerRequest(), readConfig.getIntervalMs(), readConfig.getMaxRecordsPerShard(), + readConfig.isEnableDeaggregation()); + + List<String> recordStrings = new ArrayList<>(); + for (Record r : readResult.getRecords()) { + String json = recordToJsonStatic(r, range.getShardId(), readConfig.isShouldAddOffsets()); + if (json != null) { + recordStrings.add(json); + } + } + results.add(new ShardFetchResult(recordStrings, range.getShardId(), + readResult.getLastSequenceNumber(), readResult.isReachedEndOfShard())); + } + } + return results.iterator(); + }); + + // Cache so we can both get records and checkpoint from the same RDD + fetchRdd.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK()); Review Comment: If 
`toBatch` is called a second time (e.g., during retry), the previous `persistedFetchRdd` is overwritten without being unpersisted first, leaking Spark storage memory. Could you add `if (persistedFetchRdd != null) { persistedFetchRdd.unpersist(); }` before the persist call, or is it handled somewhere else? ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KinesisSourceConfig.java: ########## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.config; + +import org.apache.hudi.common.config.ConfigClassProperty; +import org.apache.hudi.common.config.ConfigGroups; +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.config.HoodieConfig; + +import javax.annotation.concurrent.Immutable; + +import static org.apache.hudi.common.util.ConfigUtils.DELTA_STREAMER_CONFIG_PREFIX; +import static org.apache.hudi.common.util.ConfigUtils.STREAMER_CONFIG_PREFIX; + +/** + * Kinesis Source Configs for Hudi Streamer. 
+ */ +@Immutable +@ConfigClassProperty(name = "Kinesis Source Configs", + groupName = ConfigGroups.Names.HUDI_STREAMER, + subGroupName = ConfigGroups.SubGroupNames.DELTA_STREAMER_SOURCE, + description = "Configurations controlling the behavior of Kinesis source in Hudi Streamer.") +public class KinesisSourceConfig extends HoodieConfig { + + public static final String KINESIS_CHECKPOINT_TYPE_STRING = "string"; + + private static final String PREFIX = STREAMER_CONFIG_PREFIX + "source.kinesis."; + private static final String OLD_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "source.kinesis."; + + public static final ConfigProperty<String> KINESIS_STREAM_NAME = ConfigProperty + .key(PREFIX + "stream.name") + .noDefaultValue() + .withAlternatives(OLD_PREFIX + "stream.name") + .withDocumentation("Kinesis Data Streams stream name."); Review Comment: Add `.sinceVersion("1.2.0")`. Same for other configs. ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KinesisSourceConfig.java: ########## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.utilities.config; + +import org.apache.hudi.common.config.ConfigClassProperty; +import org.apache.hudi.common.config.ConfigGroups; +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.config.HoodieConfig; + +import javax.annotation.concurrent.Immutable; + +import static org.apache.hudi.common.util.ConfigUtils.DELTA_STREAMER_CONFIG_PREFIX; +import static org.apache.hudi.common.util.ConfigUtils.STREAMER_CONFIG_PREFIX; + +/** + * Kinesis Source Configs for Hudi Streamer. + */ +@Immutable +@ConfigClassProperty(name = "Kinesis Source Configs", + groupName = ConfigGroups.Names.HUDI_STREAMER, + subGroupName = ConfigGroups.SubGroupNames.DELTA_STREAMER_SOURCE, + description = "Configurations controlling the behavior of Kinesis source in Hudi Streamer.") +public class KinesisSourceConfig extends HoodieConfig { + + public static final String KINESIS_CHECKPOINT_TYPE_STRING = "string"; + + private static final String PREFIX = STREAMER_CONFIG_PREFIX + "source.kinesis."; + private static final String OLD_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "source.kinesis."; + + public static final ConfigProperty<String> KINESIS_STREAM_NAME = ConfigProperty + .key(PREFIX + "stream.name") + .noDefaultValue() + .withAlternatives(OLD_PREFIX + "stream.name") + .withDocumentation("Kinesis Data Streams stream name."); + + public static final ConfigProperty<String> KINESIS_REGION = ConfigProperty + .key(PREFIX + "region") + .noDefaultValue() + .withAlternatives(OLD_PREFIX + "region") + .withDocumentation("AWS region for the Kinesis stream (e.g., us-east-1)."); + + public static final ConfigProperty<String> KINESIS_ENDPOINT_URL = ConfigProperty + .key(PREFIX + "endpoint.url") + .noDefaultValue() + .withAlternatives(OLD_PREFIX + "endpoint.url") + .markAdvanced() + .withDocumentation("Custom endpoint URL for Kinesis (e.g., for localstack). 
" + + "If not set, uses the default AWS endpoint for the region."); + + public static final ConfigProperty<String> KINESIS_ACCESS_KEY = ConfigProperty + .key(PREFIX + "access.key") + .noDefaultValue() + .withAlternatives(OLD_PREFIX + "access.key") + .markAdvanced() + .withDocumentation("AWS access key for Kinesis. Used when connecting to custom endpoints (e.g., LocalStack). " + + "If not set with endpoint, uses the default AWS credential chain."); + + public static final ConfigProperty<String> KINESIS_SECRET_KEY = ConfigProperty + .key(PREFIX + "secret.key") + .noDefaultValue() + .withAlternatives(OLD_PREFIX + "secret.key") + .markAdvanced() + .withDocumentation("AWS secret key for Kinesis. Used when connecting to custom endpoints (e.g., LocalStack). " + + "If not set with endpoint, uses the default AWS credential chain."); + + public static final ConfigProperty<Long> MAX_EVENTS_FROM_KINESIS_SOURCE = ConfigProperty + .key(STREAMER_CONFIG_PREFIX + "kinesis.source.maxEvents") + .defaultValue(5000000L) + .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "kinesis.source.maxEvents") + .markAdvanced() + .withDocumentation("Maximum number of records obtained in each batch from Kinesis."); + + public static final ConfigProperty<Long> KINESIS_SOURCE_MIN_PARTITIONS = ConfigProperty + .key(PREFIX + "minPartitions") + .defaultValue(0L) + .withAlternatives(OLD_PREFIX + "minPartitions") + .markAdvanced() + .withDocumentation("Desired minimum number of Spark partitions when reading from Kinesis. " + + "By default, Hudi has a 1-1 mapping of Kinesis shards to Spark partitions. " + + "If set to a value greater than the number of shards, the result RDD will be repartitioned " + + "to increase downstream parallelism. 
Use 0 for 1-1 mapping."); + + public static final ConfigProperty<Boolean> KINESIS_APPEND_OFFSETS = ConfigProperty + .key(PREFIX + "append.offsets") + .defaultValue(false) + .withAlternatives(OLD_PREFIX + "append.offsets") + .markAdvanced() + .withDocumentation("When enabled, appends Kinesis metadata (sequence number, shard id, arrival timestamp, partition key) to records."); Review Comment: Are these fields prepended to the target Hudi table's schema so that schema evolution can still work? ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKinesisSource.java: ########## @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.utilities.sources; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.utilities.config.KinesisSourceConfig; +import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics; +import org.apache.hudi.utilities.schema.SchemaProvider; +import org.apache.hudi.utilities.sources.helpers.KinesisOffsetGen; +import org.apache.hudi.utilities.sources.helpers.KinesisReadConfig; +import org.apache.hudi.utilities.streamer.DefaultStreamContext; +import org.apache.hudi.utilities.streamer.StreamContext; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.Record; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys; +import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys; +import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys; +import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys; +import static org.apache.hudi.utilities.sources.helpers.KinesisOffsetGen.LOCALSTACK_END_SEQ_SENTINEL; + +/** + * Source to read JSON data from AWS Kinesis Data Streams using Spark. + */ +@Slf4j +public class JsonKinesisSource extends KinesisSource<JavaRDD<String>> { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + /** Result from reading a single shard in a partition. 
*/ + @AllArgsConstructor + @Getter + private static class ShardFetchResult implements Serializable { + private final List<String> records; + private final String shardId; + private final Option<String> lastSequenceNumber; + private final boolean reachedEndOfShard; + } + + /** Metadata-only summary for checkpoint; avoids bringing records to driver. */ + @AllArgsConstructor + @Getter + private static class ShardFetchSummary implements Serializable { + private final String shardId; + private final Option<String> lastSequenceNumber; + private final int recordCount; + private final boolean reachedEndOfShard; + } + + /** Persisted fetch RDD - must be unpersisted in releaseResources to avoid memory leak. */ + private transient org.apache.spark.api.java.JavaRDD<ShardFetchResult> persistedFetchRdd; + /** Record count from fetch, avoids redundant batch.count() Spark job. */ + private long lastRecordCount; + /** Shard IDs where the executor observed nextShardIterator==null (end-of-shard reached). */ + protected Set<String> shardsReachedEnd; + + public JsonKinesisSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, + SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) { + this(properties, sparkContext, sparkSession, metrics, + new DefaultStreamContext(schemaProvider, Option.empty())); + } + + public JsonKinesisSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, + HoodieIngestionMetrics metrics, StreamContext streamContext) { + super(properties, sparkContext, sparkSession, SourceType.JSON, metrics, + new DefaultStreamContext(streamContext.getSchemaProvider(), streamContext.getSourceProfileSupplier())); + this.offsetGen = new KinesisOffsetGen(props); + } + + @Override + protected JavaRDD<String> toBatch(KinesisOffsetGen.KinesisShardRange[] shardRanges) { + KinesisReadConfig readConfig = new KinesisReadConfig( + offsetGen.getStreamName(), + offsetGen.getRegion(), + 
offsetGen.getEndpointUrl().orElse(null), + getStringWithAltKeys(props, KinesisSourceConfig.KINESIS_ACCESS_KEY, null), + getStringWithAltKeys(props, KinesisSourceConfig.KINESIS_SECRET_KEY, null), + offsetGen.getStartingPosition(), + shouldAddOffsets, + getBooleanWithAltKeys(props, KinesisSourceConfig.KINESIS_ENABLE_DEAGGREGATION), + getIntWithAltKeys(props, KinesisSourceConfig.KINESIS_GET_RECORDS_MAX_RECORDS), + getLongWithAltKeys(props, KinesisSourceConfig.KINESIS_GET_RECORDS_INTERVAL_MS), + // Evenly set the max events per shard. + shardRanges.length > 0 ? Math.max(1, getLongWithAltKeys(props, KinesisSourceConfig.MAX_EVENTS_FROM_KINESIS_SOURCE) / shardRanges.length) : Long.MAX_VALUE); + + JavaRDD<ShardFetchResult> fetchRdd = sparkContext.parallelize( + java.util.Arrays.asList(shardRanges), shardRanges.length) + .mapPartitions(shardRangeIt -> { + List<ShardFetchResult> results = new ArrayList<>(); + try (KinesisClient client = createKinesisClientFromConfig(readConfig)) { + while (shardRangeIt.hasNext()) { + KinesisOffsetGen.KinesisShardRange range = shardRangeIt.next(); + KinesisOffsetGen.ShardReadResult readResult = KinesisOffsetGen.readShardRecords( + client, readConfig.getStreamName(), range, readConfig.getStartingPosition(), + readConfig.getMaxRecordsPerRequest(), readConfig.getIntervalMs(), readConfig.getMaxRecordsPerShard(), + readConfig.isEnableDeaggregation()); + + List<String> recordStrings = new ArrayList<>(); + for (Record r : readResult.getRecords()) { + String json = recordToJsonStatic(r, range.getShardId(), readConfig.isShouldAddOffsets()); + if (json != null) { + recordStrings.add(json); + } + } + results.add(new ShardFetchResult(recordStrings, range.getShardId(), + readResult.getLastSequenceNumber(), readResult.isReachedEndOfShard())); + } + } + return results.iterator(); + }); + + // Cache so we can both get records and checkpoint from the same RDD + fetchRdd.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK()); + persistedFetchRdd = 
fetchRdd; + + // RDD for the shard data. + JavaRDD<String> recordRdd = fetchRdd.flatMap(r -> r.getRecords().iterator()); + // Apply minimum partitions for downstream parallelism (similar to Kafka source) + long minPartitions = getLongWithAltKeys(props, KinesisSourceConfig.KINESIS_SOURCE_MIN_PARTITIONS); + if (minPartitions > 0 && minPartitions > shardRanges.length) { + int targetPartitions = (int) minPartitions; + log.info("Repartitioning from {} shards to {} partitions (minPartitions={})", + shardRanges.length, targetPartitions, minPartitions); + recordRdd = recordRdd.repartition(targetPartitions); + } + + // Information for checkpoint generation. + List<ShardFetchSummary> summaries = fetchRdd + .map(r -> new ShardFetchSummary(r.getShardId(), r.getLastSequenceNumber(), r.getRecords().size(), + r.isReachedEndOfShard())) + .collect(); + lastCheckpointData = buildCheckpointFromSummaries(summaries); + Set<String> reached = new HashSet<>(); + for (ShardFetchSummary s : summaries) { + if (s.isReachedEndOfShard()) { + reached.add(s.getShardId()); + } + } + shardsReachedEnd = reached; + lastRecordCount = summaries.stream().mapToLong(ShardFetchSummary::getRecordCount).sum(); + + return recordRdd; + } + + private static KinesisClient createKinesisClientFromConfig(KinesisReadConfig config) { + software.amazon.awssdk.services.kinesis.KinesisClientBuilder builder = + KinesisClient.builder().region(software.amazon.awssdk.regions.Region.of(config.getRegion())); + if (config.getEndpointUrl() != null && !config.getEndpointUrl().isEmpty()) { + builder = builder.endpointOverride(java.net.URI.create(config.getEndpointUrl())); + } + if (config.getAccessKey() != null && !config.getAccessKey().isEmpty() + && config.getSecretKey() != null && !config.getSecretKey().isEmpty()) { + builder = builder.credentialsProvider( + software.amazon.awssdk.auth.credentials.StaticCredentialsProvider.create( + software.amazon.awssdk.auth.credentials.AwsBasicCredentials.create( + config.getAccessKey(), 
config.getSecretKey()))); + } + return builder.build(); + } + + private static String recordToJsonStatic(Record record, String shardId, boolean shouldAddOffsets) { + String dataStr = record.data().asUtf8String(); + // Pure empty or null records in Kinesis is not meaningful. + if (dataStr == null || dataStr.trim().isEmpty()) { + return null; + } + if (shouldAddOffsets) { + try { + ObjectNode node = (ObjectNode) OBJECT_MAPPER.readTree(dataStr); + node.put("_hoodie_kinesis_source_sequence_number", record.sequenceNumber()); + node.put("_hoodie_kinesis_source_shard_id", shardId); + node.put("_hoodie_kinesis_source_partition_key", record.partitionKey()); + if (record.approximateArrivalTimestamp() != null) { + node.put("_hoodie_kinesis_source_timestamp", + record.approximateArrivalTimestamp().toEpochMilli()); + } + return OBJECT_MAPPER.writeValueAsString(node); + } catch (Exception e) { + return dataStr; + } + } + return dataStr; + } + + private Map<String, String> buildCheckpointFromSummaries(List<ShardFetchSummary> summaries) { + Map<String, String> checkpoint = new HashMap<>(); + for (ShardFetchSummary s : summaries) { + if (s.getLastSequenceNumber().isPresent()) { + checkpoint.put(s.getShardId(), s.getLastSequenceNumber().get()); + } + } + return checkpoint; + } + + @Override + protected String createCheckpointFromBatch(JavaRDD<String> batch, + KinesisOffsetGen.KinesisShardRange[] shardRangesRead, + KinesisOffsetGen.KinesisShardRange[] allShardRanges) { + // Build checkpoint from ALL shards so filtered-out (fully consumed) shards are preserved. + // Otherwise the next run would omit them from the checkpoint and re-read from TRIM_HORIZON. 
+ Map<String, String> fullCheckpoint = new HashMap<>(); + for (KinesisOffsetGen.KinesisShardRange range : allShardRanges) { + String lastSeq; + String endSeq; + if (lastCheckpointData != null && lastCheckpointData.containsKey(range.getShardId())) { + // Shard we read from: use data from the read + lastSeq = lastCheckpointData.get(range.getShardId()); + endSeq = range.getEndingSequenceNumber().orElse(null); + } else { + // Filtered shard (not read): preserve from range so next run won't re-read + lastSeq = range.getStartingSequenceNumber().orElse(""); + endSeq = range.getEndingSequenceNumber().orElse(null); + } + // for test only + // LocalStack returns Long.MAX_VALUE for closed shards; use lastSeq as endSeq so we can detect + // "fully consumed" when the parent shard expires (lastSeq >= endSeq). + if (LOCALSTACK_END_SEQ_SENTINEL.equals(endSeq) && lastSeq != null && !lastSeq.isEmpty()) { + endSeq = lastSeq; + } Review Comment: This sentinel check is commented "for test only" but runs in the production code path. Could we remove this from production code and use a different way to handle it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
