yihua commented on code in PR #18224: URL: https://github.com/apache/hudi/pull/18224#discussion_r3036403582
########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KinesisSource.java: ########## @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.table.checkpoint.Checkpoint; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.utilities.config.KinesisSourceConfig; +import org.apache.hudi.utilities.exception.HoodieReadFromSourceException; +import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics; +import org.apache.hudi.utilities.schema.SchemaProvider; +import org.apache.hudi.utilities.sources.helpers.KinesisDeaggregator; +import org.apache.hudi.utilities.sources.helpers.KinesisOffsetGen; +import org.apache.hudi.utilities.streamer.StreamContext; + +import lombok.extern.slf4j.Slf4j; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; +import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; +import 
software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; +import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException; +import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException; +import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.ThreadLocalRandom; + +import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys; + +@Slf4j +public abstract class KinesisSource<T> extends Source<T> { + + protected static final String METRIC_NAME_KINESIS_MESSAGE_IN_COUNT = "kinesisMessageInCount"; + + protected final HoodieIngestionMetrics metrics; + protected final SchemaProvider schemaProvider; + protected KinesisOffsetGen offsetGen; + protected final boolean shouldAddMetaFields; + /** Checkpoint data (shardId -> sequenceNumber) collected during toBatch execution. Set by subclasses. 
*/ + protected Map<String, String> lastCheckpointData; + + protected KinesisSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, + SourceType sourceType, HoodieIngestionMetrics metrics, StreamContext streamContext) { + super(props, sparkContext, sparkSession, sourceType, streamContext); + this.schemaProvider = streamContext.getSchemaProvider(); + this.metrics = metrics; + this.shouldAddMetaFields = getBooleanWithAltKeys(props, KinesisSourceConfig.KINESIS_APPEND_OFFSETS); + } + + @Override + protected final InputBatch<T> fetchNewData(Option<String> lastCkptStr, long sourceLimit) { + throw new UnsupportedOperationException("KinesisSource#fetchNewData should not be called"); + } + + @Override + protected InputBatch<T> readFromCheckpoint(Option<Checkpoint> lastCheckpoint, long sourceLimit) { + // STEP 1: Collect all available shards for the stream: open/closed shards. + KinesisOffsetGen.KinesisShardRange[] allOpenClosedShardRanges = offsetGen.getNextShardRanges(lastCheckpoint, sourceLimit); + // STEP 2: Filter out shards with no unread records to avoid unnecessary GetRecords calls. + boolean useLatestStartingPositionStrategy = + offsetGen.getStartingPositionStrategy() == KinesisSourceConfig.KinesisStartingPositionStrategy.LATEST; + int numShardsBeforeFilter = allOpenClosedShardRanges.length; + KinesisOffsetGen.KinesisShardRange[] shardRangesWithUnreadRecords = Arrays.stream(allOpenClosedShardRanges) + .filter(range -> range.hasUnreadRecords(useLatestStartingPositionStrategy)) + .toArray(KinesisOffsetGen.KinesisShardRange[]::new); + if (numShardsBeforeFilter > shardRangesWithUnreadRecords.length) { + log.info("Filtered {} shards with no unread records, {} shards remain", + numShardsBeforeFilter - shardRangesWithUnreadRecords.length, shardRangesWithUnreadRecords.length); + } + // When nothing to read, return empty batch and previous checkpoint if any. 
+ if (shardRangesWithUnreadRecords.length == 0) { + metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KINESIS_MESSAGE_IN_COUNT, 0); + String checkpointStr = lastCheckpoint.isPresent() ? lastCheckpoint.get().getCheckpointKey() : ""; + return new InputBatch<>(Option.empty(), checkpointStr); + } + // STEP 3: Otherwise, do the read. + T batch = toBatch(shardRangesWithUnreadRecords, sourceLimit); + // STEP 4: Generate checkpoint. + // Pass allOpenClosedShardRanges so filtered-out shards are preserved in the checkpoint; otherwise + // next run would re-read them from TRIM_HORIZON and cause duplicates + String checkpointStr = createCheckpointFromBatch(batch, shardRangesWithUnreadRecords, allOpenClosedShardRanges); + // STEP 5: Emit metrics. + long totalMsgs = getRecordCount(batch); + metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KINESIS_MESSAGE_IN_COUNT, totalMsgs); + log.info("Read {} records from Kinesis stream {} with {} shards, checkpoint: {}", + totalMsgs, offsetGen.getStreamName(), shardRangesWithUnreadRecords.length, checkpointStr); + + return new InputBatch<>(Option.of(batch), checkpointStr); + } + + /** Upper bound on consecutive empty GetRecords responses before giving up on a shard. */ + private static final int MAX_EMPTY_RESPONSES_FROM_GET_RECORDS = 100; + + /** + * Lazy iterator over records from a single Kinesis shard. + * + * <p>Records are fetched one GetRecords page at a time; the next page is only requested once all + * records from the current page have been consumed. This avoids holding the full shard batch in + * executor memory simultaneously with the caller's output collection. + * + * <p>After {@link #hasNext()} returns {@code false} callers must read + * {@link #getLastSequenceNumber()} and {@link #isReachedEndOfShard()} to obtain checkpoint state. 
+ * + * <p><b>lastSequenceNumber correctness invariant:</b> the sequence number is taken from the last + * <em>raw</em> Kinesis record (pre-deaggregation) of a page and is only committed once all + * deaggregated records from that page have been yielded. This guarantees the checkpoint never + * advances past records that have not yet been returned to the caller. + */ + public static class ShardRecordIterator implements Iterator<Record> { + private final KinesisClient client; + private final String shardId; + private final int maxRecordsPerRequest; + private final long requestIntervalMs; + private final long maxTotalRecords; + private final boolean enableDeaggregation; + private final long retryInitialIntervalMs; + private final long retryMaxIntervalMs; + private final long throttleTimeoutMs; + + /** Current position in the Kinesis shard; null means the shard is exhausted. */ + private String shardIteratorStr; + /** Records from the most recently fetched page, ready to be yielded. */ + private Iterator<Record> currentPage = Collections.emptyIterator(); + /** + * Raw lastSeq of the page currently being consumed. Moved to {@link #lastSequenceNumber} only + * when the page iterator is fully exhausted, ensuring the checkpoint never skips records. + */ + private String pendingPageLastSeq = null; + /** Checkpoint-safe lastSeq: reflects only fully-consumed pages. */ + private String lastSequenceNumber = null; + private boolean reachedEndOfShard = false; + /** True once no further GetRecords calls should be made. */ + private boolean fetchingDone = false; + private long totalConsumed = 0; + private int emptyPageCount = 0; + + /** + * Dynamically tuned records-per-request limit. + * Halved on each ProvisionedThroughputExceededException and held there for the rest of the shard read. + */ + private int currentMaxRecords; + /** Epoch ms of the last successful GetRecords call; used to enforce {@link #throttleTimeoutMs}. 
*/ + private long lastSuccessTimeMs; + + public ShardRecordIterator(String initialShardIterator, KinesisClient client, String shardId, + int maxRecordsPerRequest, long requestIntervalMs, long maxTotalRecords, boolean enableDeaggregation, + long retryInitialIntervalMs, long retryMaxIntervalMs, long throttleTimeoutMs) { + this.shardIteratorStr = initialShardIterator; + this.client = client; + this.shardId = shardId; + this.maxRecordsPerRequest = maxRecordsPerRequest; + this.requestIntervalMs = requestIntervalMs; + this.maxTotalRecords = maxTotalRecords; + this.enableDeaggregation = enableDeaggregation; + this.retryInitialIntervalMs = retryInitialIntervalMs; + this.retryMaxIntervalMs = retryMaxIntervalMs; + this.throttleTimeoutMs = throttleTimeoutMs; + this.currentMaxRecords = maxRecordsPerRequest; + this.lastSuccessTimeMs = System.currentTimeMillis(); + } + + @Override + public boolean hasNext() { + while (true) { + if (currentPage.hasNext()) { + return true; + } + // Current page fully consumed: commit its lastSeq before moving on. + commitPendingPageLastSeq(); + if (fetchingDone) { + return false; + } + fetchNextPage(); + // Loop: if the page was empty, try fetching again (up to MAX_EMPTY_RESPONSES limit). 
+ } + } + + @Override + public Record next() { + if (!hasNext()) { + throw new NoSuchElementException("No more records for shard " + shardId); + } + totalConsumed++; + return currentPage.next(); + } + + public Option<String> getLastSequenceNumber() { + return Option.ofNullable(lastSequenceNumber); + } + + public boolean isReachedEndOfShard() { + return reachedEndOfShard; + } + + private void commitPendingPageLastSeq() { + if (pendingPageLastSeq != null) { + lastSequenceNumber = pendingPageLastSeq; + pendingPageLastSeq = null; + } + } + + private void fetchNextPage() { + if (shardIteratorStr == null || totalConsumed >= maxTotalRecords) { + fetchingDone = true; + return; + } + GetRecordsResponse response; + int attempt = 0; + while (true) { + try { + response = client.getRecords( + GetRecordsRequest.builder() + .shardIterator(shardIteratorStr) + .limit(Math.min(currentMaxRecords, (int) (maxTotalRecords - totalConsumed))) + .build()); + lastSuccessTimeMs = System.currentTimeMillis(); + break; + } catch (ExpiredIteratorException e) { + log.warn("Shard iterator expired for {} during GetRecords, stopping read", shardId); + fetchingDone = true; + return; + } catch (ProvisionedThroughputExceededException e) { + long nowMs = System.currentTimeMillis(); + if (nowMs - lastSuccessTimeMs > throttleTimeoutMs) { + throw new HoodieReadFromSourceException( + "Kinesis throughput exceeded for shard " + shardId + ": no successful fetch within " + + throttleTimeoutMs + " ms. Last successful fetch, or first fetch was " + (nowMs - lastSuccessTimeMs) + " ms ago.", e); + } + // Halve the per-request limit to reduce pressure; floor at 1. + int prevLimit = currentMaxRecords; + currentMaxRecords = Math.max(1, currentMaxRecords / 2); + // Use attempt count only to compute exponential backoff delay, not as a stop condition. 
+ long waitMs = Math.min(retryInitialIntervalMs * (1L << Math.min(attempt, 30)), retryMaxIntervalMs); + waitMs += ThreadLocalRandom.current().nextInt(500); + log.warn("Throughput exceeded for shard {}: halving records/request from {} to {}, retry after {} ms " + + "(no success for {} ms, will give up after {} ms)", + shardId, prevLimit, currentMaxRecords, waitMs, nowMs - lastSuccessTimeMs, throttleTimeoutMs); + try { + Thread.sleep(waitMs); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new HoodieReadFromSourceException("Interrupted while backing off for shard " + shardId, ie); + } + attempt++; + } + } + + List<Record> rawRecords = response.records(); + // Update before empty check: null nextShardIterator signals end-of-shard even on a 0-record response. + shardIteratorStr = response.nextShardIterator(); + + if (!rawRecords.isEmpty()) { + // pendingPageLastSeq is from raw records (pre-deaggregation) per the checkpoint invariant. Review Comment: 🤖 `GetRecordsResponse.millisBehindLatest()` returns a boxed `Long` in the AWS SDK v2, which can be `null` (e.g., with LocalStack or custom endpoints). The `== 0` comparison will auto-unbox and throw a `NullPointerException`. Could you guard with a null check, e.g. `Long millis = response.millisBehindLatest(); if (millis != null && millis == 0)`? ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKinesisSource.java: ########## @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.utilities.config.KinesisSourceConfig; +import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics; +import org.apache.hudi.utilities.schema.SchemaProvider; +import org.apache.hudi.utilities.sources.helpers.KinesisOffsetGen; +import org.apache.hudi.utilities.sources.helpers.KinesisReadConfig; +import org.apache.hudi.utilities.streamer.DefaultStreamContext; +import org.apache.hudi.utilities.streamer.StreamContext; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.Record; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys; +import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys; +import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys; +import static 
org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys; +import static org.apache.hudi.utilities.sources.helpers.KinesisOffsetGen.calculateNumEvents; + +/** + * Source to read JSON data from AWS Kinesis Data Streams using Spark. + */ +@Slf4j +public class JsonKinesisSource extends KinesisSource<JavaRDD<String>> { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + /** Metadata-only summary for checkpoint; avoids bringing records to driver. */ + @AllArgsConstructor + @Getter + private static class ShardFetchSummary implements Serializable { + private final String shardId; + private final Option<String> lastSequenceNumber; + // UTC in milliseconds. + private final Option<Long> lastArrivalTime; + private final int recordCount; + private final boolean reachedEndOfShard; + } + + /** + * Per-shard fetch result stored in the persisted RDD. + * Records are eagerly materialized as List<String> so that both fields survive RDD spill + * to disk: List and ShardFetchSummary are fully serializable, unlike a transient Iterable. + */ + @AllArgsConstructor + @Getter + private static class ShardFetchResult implements Serializable { + private final List<String> records; + private final ShardFetchSummary summary; + } + + /** Persisted fetch RDD - must be unpersisted in releaseResources to avoid memory leak. */ + private transient org.apache.spark.api.java.JavaRDD<ShardFetchResult> persistedFetchRdd; + /** Record count from fetch, avoids redundant batch.count() Spark job. */ + private long lastRecordCount; + /** Shard IDs where the executor observed nextShardIterator==null (end-of-shard reached). */ + protected Set<String> shardsReachedEnd; + /** Arrival time (epoch millis) of the record with last sequence number, per shard. 
*/ + protected Map<String, Long> lastArrivalTimes; + + public JsonKinesisSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, + SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) { + this(properties, sparkContext, sparkSession, metrics, + new DefaultStreamContext(schemaProvider, Option.empty())); + } + + public JsonKinesisSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, + HoodieIngestionMetrics metrics, StreamContext streamContext) { + super(properties, sparkContext, sparkSession, SourceType.JSON, metrics, + new DefaultStreamContext(streamContext.getSchemaProvider(), streamContext.getSourceProfileSupplier())); + this.offsetGen = new KinesisOffsetGen(props); + } + + @Override + protected JavaRDD<String> toBatch(KinesisOffsetGen.KinesisShardRange[] shardRanges, long sourceLimit) { + long numEvents = calculateNumEvents(sourceLimit, props); + KinesisReadConfig readConfig = new KinesisReadConfig( + offsetGen.getStreamName(), + offsetGen.getRegion(), + offsetGen.getEndpointUrl().orElse(null), + getStringWithAltKeys(props, KinesisSourceConfig.KINESIS_ACCESS_KEY, null), + getStringWithAltKeys(props, KinesisSourceConfig.KINESIS_SECRET_KEY, null), + offsetGen.getStartingPositionStrategy(), + shouldAddMetaFields, + getBooleanWithAltKeys(props, KinesisSourceConfig.KINESIS_ENABLE_DEAGGREGATION), + getIntWithAltKeys(props, KinesisSourceConfig.KINESIS_MAX_RECORDS_PER_REQUEST), + getLongWithAltKeys(props, KinesisSourceConfig.KINESIS_GET_RECORDS_INTERVAL_MS), + // NOTE that: Evenly set the max events per shard. + shardRanges.length > 0 ? 
Math.max(1, numEvents / shardRanges.length) : numEvents, + getLongWithAltKeys(props, KinesisSourceConfig.KINESIS_RETRY_INITIAL_INTERVAL_MS), + getLongWithAltKeys(props, KinesisSourceConfig.KINESIS_RETRY_MAX_INTERVAL_MS), + getLongWithAltKeys(props, KinesisSourceConfig.KINESIS_THROTTLE_TIMEOUT_MS)); + + JavaRDD<ShardFetchResult> fetchRdd = sparkContext.parallelize( + java.util.Arrays.asList(shardRanges), shardRanges.length) + .mapPartitions(shardRangeIt -> { + List<ShardFetchResult> results = new ArrayList<>(); + try (KinesisClient client = KinesisOffsetGen.createKinesisClient( Review Comment: 🤖 When `shardRanges.length` is 0, `sparkContext.parallelize(..., 0)` is called with 0 partitions. Is this a valid state? I believe Spark may throw an `IllegalArgumentException` for 0 partitions. It looks like `readFromCheckpoint` filters empty ranges before calling `toBatch`, but worth confirming this invariant or adding a guard. ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisReadConfig.java: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.utilities.sources.helpers; + +import org.apache.hudi.utilities.config.KinesisSourceConfig; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +import java.io.Serializable; + +/** + * Serializable configuration for Kinesis reads, used in Spark closures to avoid + * capturing non-serializable KinesisOffsetGen. + */ +@AllArgsConstructor +@Getter +public class KinesisReadConfig implements Serializable { + + private static final long serialVersionUID = 1L; + + private final String streamName; + private final String region; + private final String endpointUrl; // null if not set + private final String accessKey; // null if not set + private final String secretKey; // null if not set + private final KinesisSourceConfig.KinesisStartingPositionStrategy startingPosition; Review Comment: 🤖 Storing `secretKey` as a plain serializable field means it could appear in Spark UI serialized task descriptions or executor logs. Have you considered using a transient field with lazy resolution, or at least ensuring `toString()` is overridden to redact it? ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KinesisSource.java: ########## @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.table.checkpoint.Checkpoint; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.utilities.config.KinesisSourceConfig; +import org.apache.hudi.utilities.exception.HoodieReadFromSourceException; +import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics; +import org.apache.hudi.utilities.schema.SchemaProvider; +import org.apache.hudi.utilities.sources.helpers.KinesisDeaggregator; +import org.apache.hudi.utilities.sources.helpers.KinesisOffsetGen; +import org.apache.hudi.utilities.streamer.StreamContext; + +import lombok.extern.slf4j.Slf4j; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; +import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; +import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException; +import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException; +import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.ThreadLocalRandom; + +import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys; + +@Slf4j +public abstract 
class KinesisSource<T> extends Source<T> { + + protected static final String METRIC_NAME_KINESIS_MESSAGE_IN_COUNT = "kinesisMessageInCount"; + + protected final HoodieIngestionMetrics metrics; + protected final SchemaProvider schemaProvider; + protected KinesisOffsetGen offsetGen; + protected final boolean shouldAddMetaFields; + /** Checkpoint data (shardId -> sequenceNumber) collected during toBatch execution. Set by subclasses. */ + protected Map<String, String> lastCheckpointData; + + protected KinesisSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, + SourceType sourceType, HoodieIngestionMetrics metrics, StreamContext streamContext) { + super(props, sparkContext, sparkSession, sourceType, streamContext); + this.schemaProvider = streamContext.getSchemaProvider(); + this.metrics = metrics; + this.shouldAddMetaFields = getBooleanWithAltKeys(props, KinesisSourceConfig.KINESIS_APPEND_OFFSETS); + } + + @Override + protected final InputBatch<T> fetchNewData(Option<String> lastCkptStr, long sourceLimit) { + throw new UnsupportedOperationException("KinesisSource#fetchNewData should not be called"); + } + + @Override + protected InputBatch<T> readFromCheckpoint(Option<Checkpoint> lastCheckpoint, long sourceLimit) { + // STEP 1: Collect all available shards for the stream: open/closed shards. + KinesisOffsetGen.KinesisShardRange[] allOpenClosedShardRanges = offsetGen.getNextShardRanges(lastCheckpoint, sourceLimit); + // STEP 2: Filter out shards with no unread records to avoid unnecessary GetRecords calls. 
+ boolean useLatestStartingPositionStrategy = + offsetGen.getStartingPositionStrategy() == KinesisSourceConfig.KinesisStartingPositionStrategy.LATEST; + int numShardsBeforeFilter = allOpenClosedShardRanges.length; + KinesisOffsetGen.KinesisShardRange[] shardRangesWithUnreadRecords = Arrays.stream(allOpenClosedShardRanges) + .filter(range -> range.hasUnreadRecords(useLatestStartingPositionStrategy)) + .toArray(KinesisOffsetGen.KinesisShardRange[]::new); + if (numShardsBeforeFilter > shardRangesWithUnreadRecords.length) { + log.info("Filtered {} shards with no unread records, {} shards remain", + numShardsBeforeFilter - shardRangesWithUnreadRecords.length, shardRangesWithUnreadRecords.length); + } + // When nothing to read, return empty batch and previous checkpoint if any. + if (shardRangesWithUnreadRecords.length == 0) { + metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KINESIS_MESSAGE_IN_COUNT, 0); + String checkpointStr = lastCheckpoint.isPresent() ? lastCheckpoint.get().getCheckpointKey() : ""; + return new InputBatch<>(Option.empty(), checkpointStr); + } + // STEP 3: Otherwise, do the read. + T batch = toBatch(shardRangesWithUnreadRecords, sourceLimit); + // STEP 4: Generate checkpoint. + // Pass allOpenClosedShardRanges so filtered-out shards are preserved in the checkpoint; otherwise + // next run would re-read them from TRIM_HORIZON and cause duplicates + String checkpointStr = createCheckpointFromBatch(batch, shardRangesWithUnreadRecords, allOpenClosedShardRanges); + // STEP 5: Emit metrics. 
+ long totalMsgs = getRecordCount(batch); + metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KINESIS_MESSAGE_IN_COUNT, totalMsgs); + log.info("Read {} records from Kinesis stream {} with {} shards, checkpoint: {}", + totalMsgs, offsetGen.getStreamName(), shardRangesWithUnreadRecords.length, checkpointStr); + + return new InputBatch<>(Option.of(batch), checkpointStr); + } + + /** Upper bound on consecutive empty GetRecords responses before giving up on a shard. */ + private static final int MAX_EMPTY_RESPONSES_FROM_GET_RECORDS = 100; + + /** + * Lazy iterator over records from a single Kinesis shard. + * + * <p>Records are fetched one GetRecords page at a time; the next page is only requested once all + * records from the current page have been consumed. This avoids holding the full shard batch in + * executor memory simultaneously with the caller's output collection. + * + * <p>After {@link #hasNext()} returns {@code false} callers must read + * {@link #getLastSequenceNumber()} and {@link #isReachedEndOfShard()} to obtain checkpoint state. + * + * <p><b>lastSequenceNumber correctness invariant:</b> the sequence number is taken from the last + * <em>raw</em> Kinesis record (pre-deaggregation) of a page and is only committed once all + * deaggregated records from that page have been yielded. This guarantees the checkpoint never + * advances past records that have not yet been returned to the caller. + */ + public static class ShardRecordIterator implements Iterator<Record> { + private final KinesisClient client; + private final String shardId; + private final int maxRecordsPerRequest; + private final long requestIntervalMs; + private final long maxTotalRecords; + private final boolean enableDeaggregation; + private final long retryInitialIntervalMs; + private final long retryMaxIntervalMs; + private final long throttleTimeoutMs; + + /** Current position in the Kinesis shard; null means the shard is exhausted. 
*/ + private String shardIteratorStr; + /** Records from the most recently fetched page, ready to be yielded. */ + private Iterator<Record> currentPage = Collections.emptyIterator(); + /** + * Raw lastSeq of the page currently being consumed. Moved to {@link #lastSequenceNumber} only + * when the page iterator is fully exhausted, ensuring the checkpoint never skips records. + */ + private String pendingPageLastSeq = null; + /** Checkpoint-safe lastSeq: reflects only fully-consumed pages. */ + private String lastSequenceNumber = null; + private boolean reachedEndOfShard = false; + /** True once no further GetRecords calls should be made. */ + private boolean fetchingDone = false; + private long totalConsumed = 0; + private int emptyPageCount = 0; + + /** + * Dynamically tuned records-per-request limit. + * Halved on each ProvisionedThroughputExceededException and held there for the rest of the shard read. + */ + private int currentMaxRecords; + /** Epoch ms of the last successful GetRecords call; used to enforce {@link #throttleTimeoutMs}. 
*/ + private long lastSuccessTimeMs; + + public ShardRecordIterator(String initialShardIterator, KinesisClient client, String shardId, + int maxRecordsPerRequest, long requestIntervalMs, long maxTotalRecords, boolean enableDeaggregation, + long retryInitialIntervalMs, long retryMaxIntervalMs, long throttleTimeoutMs) { + this.shardIteratorStr = initialShardIterator; + this.client = client; + this.shardId = shardId; + this.maxRecordsPerRequest = maxRecordsPerRequest; + this.requestIntervalMs = requestIntervalMs; + this.maxTotalRecords = maxTotalRecords; + this.enableDeaggregation = enableDeaggregation; + this.retryInitialIntervalMs = retryInitialIntervalMs; + this.retryMaxIntervalMs = retryMaxIntervalMs; + this.throttleTimeoutMs = throttleTimeoutMs; + this.currentMaxRecords = maxRecordsPerRequest; + this.lastSuccessTimeMs = System.currentTimeMillis(); + } + + @Override + public boolean hasNext() { + while (true) { + if (currentPage.hasNext()) { + return true; + } + // Current page fully consumed: commit its lastSeq before moving on. + commitPendingPageLastSeq(); + if (fetchingDone) { + return false; + } + fetchNextPage(); + // Loop: if the page was empty, try fetching again (up to MAX_EMPTY_RESPONSES limit). + } + } + + @Override + public Record next() { + if (!hasNext()) { + throw new NoSuchElementException("No more records for shard " + shardId); + } + totalConsumed++; + return currentPage.next(); + } + + public Option<String> getLastSequenceNumber() { + return Option.ofNullable(lastSequenceNumber); + } + + public boolean isReachedEndOfShard() { + return reachedEndOfShard; + } + + private void commitPendingPageLastSeq() { + if (pendingPageLastSeq != null) { + lastSequenceNumber = pendingPageLastSeq; + pendingPageLastSeq = null; + } + } + + private void fetchNextPage() { Review Comment: 🤖 The cast `(int) (maxTotalRecords - totalConsumed)` can overflow if `maxTotalRecords` exceeds `Integer.MAX_VALUE` (it's a `long` config). 
An overflowed negative value would be picked by `Math.min` and passed to the Kinesis API. Consider using `(int) Math.min(currentMaxRecords, maxTotalRecords - totalConsumed)` to keep the arithmetic in `long` space before truncating. ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKinesisSource.java: ########## @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.utilities.sources; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.utilities.config.KinesisSourceConfig; +import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics; +import org.apache.hudi.utilities.schema.SchemaProvider; +import org.apache.hudi.utilities.sources.helpers.KinesisOffsetGen; +import org.apache.hudi.utilities.sources.helpers.KinesisReadConfig; +import org.apache.hudi.utilities.streamer.DefaultStreamContext; +import org.apache.hudi.utilities.streamer.StreamContext; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.Record; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys; +import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys; +import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys; +import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys; +import static org.apache.hudi.utilities.sources.helpers.KinesisOffsetGen.calculateNumEvents; + +/** + * Source to read JSON data from AWS Kinesis Data Streams using Spark. 
+ */ +@Slf4j +public class JsonKinesisSource extends KinesisSource<JavaRDD<String>> { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + /** Metadata-only summary for checkpoint; avoids bringing records to driver. */ + @AllArgsConstructor + @Getter + private static class ShardFetchSummary implements Serializable { + private final String shardId; + private final Option<String> lastSequenceNumber; + // UTC in milliseconds. + private final Option<Long> lastArrivalTime; + private final int recordCount; + private final boolean reachedEndOfShard; + } + + /** + * Per-shard fetch result stored in the persisted RDD. + * Records are eagerly materialized as List<String> so that both fields survive RDD spill + * to disk: List and ShardFetchSummary are fully serializable, unlike a transient Iterable. + */ + @AllArgsConstructor + @Getter + private static class ShardFetchResult implements Serializable { + private final List<String> records; + private final ShardFetchSummary summary; + } + + /** Persisted fetch RDD - must be unpersisted in releaseResources to avoid memory leak. */ + private transient org.apache.spark.api.java.JavaRDD<ShardFetchResult> persistedFetchRdd; + /** Record count from fetch, avoids redundant batch.count() Spark job. */ + private long lastRecordCount; + /** Shard IDs where the executor observed nextShardIterator==null (end-of-shard reached). */ + protected Set<String> shardsReachedEnd; + /** Arrival time (epoch millis) of the record with last sequence number, per shard. 
*/ + protected Map<String, Long> lastArrivalTimes; + + public JsonKinesisSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, + SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) { + this(properties, sparkContext, sparkSession, metrics, + new DefaultStreamContext(schemaProvider, Option.empty())); + } + + public JsonKinesisSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, + HoodieIngestionMetrics metrics, StreamContext streamContext) { + super(properties, sparkContext, sparkSession, SourceType.JSON, metrics, + new DefaultStreamContext(streamContext.getSchemaProvider(), streamContext.getSourceProfileSupplier())); + this.offsetGen = new KinesisOffsetGen(props); + } + + @Override + protected JavaRDD<String> toBatch(KinesisOffsetGen.KinesisShardRange[] shardRanges, long sourceLimit) { + long numEvents = calculateNumEvents(sourceLimit, props); + KinesisReadConfig readConfig = new KinesisReadConfig( + offsetGen.getStreamName(), + offsetGen.getRegion(), + offsetGen.getEndpointUrl().orElse(null), + getStringWithAltKeys(props, KinesisSourceConfig.KINESIS_ACCESS_KEY, null), + getStringWithAltKeys(props, KinesisSourceConfig.KINESIS_SECRET_KEY, null), + offsetGen.getStartingPositionStrategy(), + shouldAddMetaFields, + getBooleanWithAltKeys(props, KinesisSourceConfig.KINESIS_ENABLE_DEAGGREGATION), + getIntWithAltKeys(props, KinesisSourceConfig.KINESIS_MAX_RECORDS_PER_REQUEST), + getLongWithAltKeys(props, KinesisSourceConfig.KINESIS_GET_RECORDS_INTERVAL_MS), + // NOTE that: Evenly set the max events per shard. + shardRanges.length > 0 ? 
Math.max(1, numEvents / shardRanges.length) : numEvents, + getLongWithAltKeys(props, KinesisSourceConfig.KINESIS_RETRY_INITIAL_INTERVAL_MS), + getLongWithAltKeys(props, KinesisSourceConfig.KINESIS_RETRY_MAX_INTERVAL_MS), + getLongWithAltKeys(props, KinesisSourceConfig.KINESIS_THROTTLE_TIMEOUT_MS)); + + JavaRDD<ShardFetchResult> fetchRdd = sparkContext.parallelize( + java.util.Arrays.asList(shardRanges), shardRanges.length) + .mapPartitions(shardRangeIt -> { + List<ShardFetchResult> results = new ArrayList<>(); + try (KinesisClient client = KinesisOffsetGen.createKinesisClient( + readConfig.getRegion(), readConfig.getEndpointUrl(), + readConfig.getAccessKey(), readConfig.getSecretKey())) { + while (shardRangeIt.hasNext()) { + KinesisOffsetGen.KinesisShardRange range = shardRangeIt.next(); + // Lazy iterator: fetches one GetRecords page at a time, keeping only one page in + // executor memory instead of the full shard batch. Records are GC-eligible as soon + // as they are converted to JSON strings below. 
+ KinesisSource.ShardRecordIterator recordIt = KinesisSource.readShardRecords( + client, readConfig.getStreamName(), range, readConfig.getStartingPosition(), + readConfig.getMaxRecordsPerRequest(), readConfig.getIntervalMilliSeconds(), + readConfig.getMaxRecordsPerShard(), readConfig.isEnableDeaggregation(), + readConfig.getRetryInitialIntervalMs(), readConfig.getRetryMaxIntervalMs(), + readConfig.getThrottleTimeoutMs()); + + String shardId = range.getShardId(); + boolean addMetaFields = readConfig.isShouldAddMetaFields(); + List<String> jsonRecords = new ArrayList<>(); + long numNull = 0; + java.time.Instant lastArrivalTimestamp = null; + while (recordIt.hasNext()) { + Record r = recordIt.next(); + lastArrivalTimestamp = r.approximateArrivalTimestamp(); + String s = recordToJsonStatic(r, shardId, addMetaFields); + if (s != null) { + jsonRecords.add(s); + } else { + numNull++; + } + } + if (numNull > 0) { + log.warn("There are {} null strings for shard id {}", numNull, shardId); + } + // Capture the arrival time of the last record (same record whose sequence number + // becomes the checkpoint lastSeq) so it can be embedded in the checkpoint. + Option<Long> lastArrivalTime = lastArrivalTimestamp != null + ? Option.of(lastArrivalTimestamp.toEpochMilli()) : Option.empty(); + // recordCount reflects actual output records (null-filtered), not raw Kinesis count. + // NOTE: getLastSequenceNumber/isReachedEndOfShard are final only after hasNext()==false. 
+ ShardFetchSummary summary = new ShardFetchSummary(shardId, + recordIt.getLastSequenceNumber(), lastArrivalTime, + jsonRecords.size(), recordIt.isReachedEndOfShard()); + results.add(new ShardFetchResult(jsonRecords, summary)); + } + } + return results.iterator(); + }); + + if (persistedFetchRdd != null) { + persistedFetchRdd.unpersist(); + persistedFetchRdd = null; + } + boolean persistFetchRdd = getBooleanWithAltKeys(props, KinesisSourceConfig.KINESIS_PERSIST_FETCH_RDD); + if (persistFetchRdd) { + fetchRdd.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK()); + persistedFetchRdd = fetchRdd; + } else { + log.debug("{} is false: fetch RDD is not persisted. The same Kinesis fetch may run twice (checkpoint + " + + "record write), which can cause duplicate records to be written. Set to true for correct behavior.", + KinesisSourceConfig.KINESIS_PERSIST_FETCH_RDD.key()); + } + // Guard: if anything below throws, unpersist immediately so the cached RDD doesn't leak. + boolean succeeded = false; + try { + // Collect basic information that will be used to construct the final checkpoint. + collectCheckpointInfo(fetchRdd); + // RDD for the shard data. 
+ JavaRDD<String> recordRdd = fetchRdd.flatMap(r -> r.getRecords().iterator()); + // Apply minimum partitions for downstream parallelism (similar to Kafka source) + long manualPartitions = getLongWithAltKeys(props, KinesisSourceConfig.KINESIS_SOURCE_MANUAL_PARTITIONS); + if (manualPartitions > 0) { + int targetPartitions = (int) manualPartitions; + log.info("Repartitioning from {} shards to {} partitions (manualPartitions={})", + shardRanges.length, targetPartitions, manualPartitions); + recordRdd = recordRdd.repartition(targetPartitions); + } + succeeded = true; + return recordRdd; + } finally { + if (!succeeded) { + releaseResources(); + } + } + } + + private static String recordToJsonStatic(Record record, String shardId, boolean shouldAddMetaFields) { + String dataStr = record.data().asUtf8String(); + // Pure empty or null records in Kinesis is not meaningful. + if (dataStr == null || dataStr.trim().isEmpty()) { + return null; + } + if (shouldAddMetaFields) { + try { + ObjectNode node = (ObjectNode) OBJECT_MAPPER.readTree(dataStr); + node.put("_hoodie_kinesis_source_sequence_number", record.sequenceNumber()); + node.put("_hoodie_kinesis_source_shard_id", shardId); + node.put("_hoodie_kinesis_source_partition_key", record.partitionKey()); + if (record.approximateArrivalTimestamp() != null) { + node.put("_hoodie_kinesis_source_timestamp", + record.approximateArrivalTimestamp().toEpochMilli()); + } + return OBJECT_MAPPER.writeValueAsString(node); + } catch (Exception e) { + // We can disable the flag for mitigation. 
+ throw new HoodieException("Failed to add metadata fields", e); + } + } + return dataStr; + } + + private Map<String, String> buildCheckpointFromSummaries(List<ShardFetchSummary> summaries) { + Map<String, String> checkpoint = new HashMap<>(); + for (ShardFetchSummary s : summaries) { + if (s.getLastSequenceNumber().isPresent()) { + checkpoint.put(s.getShardId(), s.getLastSequenceNumber().get()); + } + } + return checkpoint; + } + + private Map<String, Long> buildArrivalTimesFromSummaries(List<ShardFetchSummary> summaries) { + Map<String, Long> arrivalTimes = new HashMap<>(); + for (ShardFetchSummary s : summaries) { + if (s.getLastArrivalTime().isPresent()) { Review Comment: 🤖 If `lastCheckpointData` is null (e.g., because `collectCheckpointInfo` was never called or an earlier step failed), this would throw a NullPointerException on `lastCheckpointData.containsKey(...)`. This could happen if `toBatch` threw before reaching `collectCheckpointInfo` but the framework still called `createCheckpointFromBatch`. Consider initializing `lastCheckpointData` to `Collections.emptyMap()` or adding a null guard. ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisOffsetGen.java: ########## @@ -0,0 +1,521 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources.helpers; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.table.checkpoint.Checkpoint; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.utilities.config.KinesisSourceConfig; +import org.apache.hudi.utilities.exception.HoodieReadFromSourceException; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.KinesisClientBuilder; +import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException; +import software.amazon.awssdk.services.kinesis.model.LimitExceededException; +import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; +import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; +import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; +import software.amazon.awssdk.services.kinesis.model.Shard; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties; +import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys; +import static 
org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys; +import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys; + +/** + * Helper for reading from Kinesis Data Streams and managing checkpoints. + * Checkpoint format: streamName,shardId:sequenceNumber,shardId:sequenceNumber,... + */ +@Slf4j +@Getter +public class KinesisOffsetGen { + + public static class CheckpointUtils { + /** Separator between lastSeq and endSeq for closed shards. Seq numbers are numeric, so this is safe. */ + private static final String END_SEQ_SEPARATOR = "|"; + /** + * Separator between lastSeq and arrivalTime (epoch millis of the record with last sequence number). + * '@' is used as it is absent from numeric Kinesis sequence numbers and visually distinct from '|'. + */ + private static final String ARRIVAL_TIME_SEPARATOR = "@"; + /** + * Kinesis checkpoint pattern. + * Format: streamName,shardId:lastSeq[@arrivalTime][|endSeq],... + * For closed shards we store lastSeq|endSeq (or lastSeq@arrivalTime|endSeq with arrival time) + * so we can detect data loss when shard expires. + */ + private static final Pattern PATTERN = Pattern.compile(".*,.*:.*"); + + /** + * Parse checkpoint string to shardId -> value map. + * Value format: lastSeq, or lastSeq@arrivalTime, or lastSeq|endSeq, or lastSeq@arrivalTime|endSeq. + */ + public static Map<String, String> strToOffsets(String checkpointStr) { + Map<String, String> offsetMap = new HashMap<>(); + String[] splits = checkpointStr.split(","); + for (int i = 1; i < splits.length; i++) { + String part = splits[i]; + int colonIdx = part.indexOf(':'); + if (colonIdx > 0 && colonIdx < part.length() - 1) { + String shardId = part.substring(0, colonIdx); + String value = part.substring(colonIdx + 1); + offsetMap.put(shardId, value); + } + } + return offsetMap; + } + + /** + * Extract lastSeq from a checkpoint value. + * Handles formats: "lastSeq", "lastSeq|endSeq", "lastSeq@arrivalTime", "lastSeq@arrivalTime|endSeq". 
+ */ + public static String getLastSeqFromValue(String value) { + if (value == null || value.isEmpty()) { + return value; + } + int arrivalSep = value.indexOf(ARRIVAL_TIME_SEPARATOR); + if (arrivalSep >= 0) { + return value.substring(0, arrivalSep); + } + int endSep = value.indexOf(END_SEQ_SEPARATOR); + return endSep >= 0 ? value.substring(0, endSep) : value; + } + + /** + * Extract arrivalTime (epoch millis) from a checkpoint value if present, otherwise null. + * Handles formats: "lastSeq@arrivalTime" and "lastSeq@arrivalTime|endSeq". + */ + public static Long getArrivalTimeFromValue(String value) { + if (value == null || value.isEmpty()) { + return null; + } + int arrivalSep = value.indexOf(ARRIVAL_TIME_SEPARATOR); + if (arrivalSep < 0) { + return null; + } + String rest = value.substring(arrivalSep + ARRIVAL_TIME_SEPARATOR.length()); + int endSep = rest.indexOf(END_SEQ_SEPARATOR); + String arrivalStr = endSep >= 0 ? rest.substring(0, endSep) : rest; + try { + return Long.parseLong(arrivalStr); + } catch (NumberFormatException e) { + return null; + } + } + + /** + * Extract endSeq from a checkpoint value if present. Returns null for open shards. + * Handles formats: "lastSeq|endSeq" and "lastSeq@arrivalTime|endSeq". + * Since '@' and '|' are distinct, '|' always unambiguously marks the endSeq regardless of + * whether an arrival time is present. + */ + public static String getEndSeqFromValue(String value) { + if (value == null || value.isEmpty()) { + return null; + } + int endSep = value.indexOf(END_SEQ_SEPARATOR); + return endSep >= 0 && endSep < value.length() - 1 ? value.substring(endSep + 1) : null; + } + + /** + * Parse a checkpoint value into (lastSeq, endSeq). Combines {@link #getLastSeqFromValue} and + * {@link #getEndSeqFromValue} into a single call to avoid parsing the value string twice. 
+ * @return Pair where left=lastSeq (empty Option when absent), right=endSeq (empty Option for open shards) + */ + public static Pair<Option<String>, Option<String>> parseCheckpointValue(String value) { + return Pair.of(Option.ofNullable(getLastSeqFromValue(value)), + Option.ofNullable(getEndSeqFromValue(value))); + } + + /** + * Build checkpoint value without arrival time: "lastSeq" or "lastSeq|endSeq". + */ + public static String buildCheckpointValue(String lastSeq, String endSeq) { + return buildCheckpointValue(lastSeq, null, endSeq); + } + + /** + * Build checkpoint value with optional arrival time. + * Format: lastSeq[@arrivalTime][|endSeq] + */ + public static String buildCheckpointValue(String lastSeq, Long arrivalTime, String endSeq) { + StringBuilder sb = new StringBuilder(lastSeq != null ? lastSeq : ""); + if (arrivalTime != null) { + sb.append(ARRIVAL_TIME_SEPARATOR).append(arrivalTime); + } + if (endSeq != null && !endSeq.isEmpty()) { + sb.append(END_SEQ_SEPARATOR).append(endSeq); + } + return sb.toString(); + } + + /** + * String representation of checkpoint. + * Format: streamName,shardId:value,shardId:value,... where value is lastSeq or lastSeq|endSeq. + */ + public static String offsetsToStr(String streamName, Map<String, String> shardToValue) { + String parts = shardToValue.entrySet().stream() + .sorted(Map.Entry.comparingByKey()) + .map(e -> e.getKey() + ":" + e.getValue()) + .collect(Collectors.joining(",")); + return streamName + "," + parts; + } + + /** + * Returns true when {@code lastCheckpointStr} is a well-formed Kinesis checkpoint for {@code streamName}. + * Checks both format (streamName,shardId:seq,...) and that the embedded stream name matches. 
+ */ + public static boolean isValidStreamCheckpoint(Option<String> lastCheckpointStr, String streamName) { + return lastCheckpointStr.isPresent() + && PATTERN.matcher(lastCheckpointStr.get()).matches() + && lastCheckpointStr.get().startsWith(streamName + ","); + } + } + + /** + * Represents a shard to read from, with optional starting sequence number. + * For closed shards, endingSequenceNumber is set so we can store it in the checkpoint + * and later detect data loss when the shard expires. + */ + @AllArgsConstructor + @Getter + public static class KinesisShardRange implements java.io.Serializable { + private final String shardId; + /** If empty, use TRIM_HORIZON or LATEST based on config. */ + private final Option<String> startingSequenceNumber; + /** For closed shards: the shard's ending sequence number. Empty for open shards. */ + private final Option<String> endingSequenceNumber; + + public static KinesisShardRange of(String shardId, Option<String> seqNum) { + return new KinesisShardRange(shardId, seqNum, Option.empty()); + } + + public static KinesisShardRange of(String shardId, Option<String> seqNum, Option<String> endSeq) { + return new KinesisShardRange(shardId, seqNum, endSeq); + } + + /** + * Returns true if this range may have unread records, false if we can definitively determine it has none. + * Uses conservative default (useLatestWhenNoCheckpoint=false). + */ + public boolean hasUnreadRecords() { + return hasUnreadRecords(false); + } + + /** + * Returns true if this range may have unread records, false if we can definitively determine it has none. 
+ * <ul> + * <li>Open shard: always true (may have new records)</li> + * <li>Closed shard, lastSeq >= endSeq: false (fully consumed)</li> + * <li>Closed shard, no checkpoint and useLatest: false (at LATEST tip, closed shard has no records)</li> + * <li>Otherwise: true (may have unread records or cannot definitively say)</li> + * </ul> + * + * @param useLatestWhenNoCheckpoint when startingSequenceNumber is empty, true means we use LATEST + * (start at tip); for a closed shard there are no records to read + */ + public boolean hasUnreadRecords(boolean useLatestWhenNoCheckpoint) { + String lastSeq = startingSequenceNumber.orElse(null); + String endSeq = endingSequenceNumber.orElse(null); + + // CASE 1: Open shard: may have records + if (endSeq == null || endSeq.isEmpty()) { + return true; + } + // CASE 2: Closed shard with no checkpoint + if (lastSeq == null || lastSeq.isEmpty()) { + return !useLatestWhenNoCheckpoint; + } Review Comment: 🤖 Sequence number comparison via `String.compareTo()` is lexicographic, not numeric. Kinesis sequence numbers are large decimal strings that are not guaranteed to be fixed-width. For example, if two sequence numbers have different digit counts, `"99".compareTo("100") > 0` even though 99 < 100. This affects `hasUnreadRecords`, `checkDataLossOnExpiredShards`, and `checkDataLossOnAvailableShards`. Could you use `new BigInteger(lastSeq).compareTo(new BigInteger(endSeq))` for correctness? ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisOffsetGen.java: ########## @@ -0,0 +1,521 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources.helpers; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.table.checkpoint.Checkpoint; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.utilities.config.KinesisSourceConfig; +import org.apache.hudi.utilities.exception.HoodieReadFromSourceException; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.KinesisClientBuilder; +import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException; +import software.amazon.awssdk.services.kinesis.model.LimitExceededException; +import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; +import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; +import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; +import software.amazon.awssdk.services.kinesis.model.Shard; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; 
+import java.util.Set; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties; +import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys; +import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys; +import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys; + +/** + * Helper for reading from Kinesis Data Streams and managing checkpoints. + * Checkpoint format: streamName,shardId:sequenceNumber,shardId:sequenceNumber,... + */ +@Slf4j +@Getter +public class KinesisOffsetGen { + + public static class CheckpointUtils { + /** Separator between lastSeq and endSeq for closed shards. Seq numbers are numeric, so this is safe. */ + private static final String END_SEQ_SEPARATOR = "|"; + /** + * Separator between lastSeq and arrivalTime (epoch millis of the record with last sequence number). + * '@' is used as it is absent from numeric Kinesis sequence numbers and visually distinct from '|'. + */ + private static final String ARRIVAL_TIME_SEPARATOR = "@"; + /** + * Kinesis checkpoint pattern. + * Format: streamName,shardId:lastSeq[@arrivalTime][|endSeq],... + * For closed shards we store lastSeq|endSeq (or lastSeq@arrivalTime|endSeq with arrival time) + * so we can detect data loss when shard expires. + */ + private static final Pattern PATTERN = Pattern.compile(".*,.*:.*"); + + /** + * Parse checkpoint string to shardId -> value map. + * Value format: lastSeq, or lastSeq@arrivalTime, or lastSeq|endSeq, or lastSeq@arrivalTime|endSeq. 
+ */ + public static Map<String, String> strToOffsets(String checkpointStr) { + Map<String, String> offsetMap = new HashMap<>(); + String[] splits = checkpointStr.split(","); + for (int i = 1; i < splits.length; i++) { + String part = splits[i]; + int colonIdx = part.indexOf(':'); + if (colonIdx > 0 && colonIdx < part.length() - 1) { + String shardId = part.substring(0, colonIdx); + String value = part.substring(colonIdx + 1); + offsetMap.put(shardId, value); + } + } + return offsetMap; + } + + /** + * Extract lastSeq from a checkpoint value. + * Handles formats: "lastSeq", "lastSeq|endSeq", "lastSeq@arrivalTime", "lastSeq@arrivalTime|endSeq". + */ + public static String getLastSeqFromValue(String value) { + if (value == null || value.isEmpty()) { + return value; + } + int arrivalSep = value.indexOf(ARRIVAL_TIME_SEPARATOR); + if (arrivalSep >= 0) { + return value.substring(0, arrivalSep); + } + int endSep = value.indexOf(END_SEQ_SEPARATOR); + return endSep >= 0 ? value.substring(0, endSep) : value; + } + + /** + * Extract arrivalTime (epoch millis) from a checkpoint value if present, otherwise null. + * Handles formats: "lastSeq@arrivalTime" and "lastSeq@arrivalTime|endSeq". + */ + public static Long getArrivalTimeFromValue(String value) { + if (value == null || value.isEmpty()) { + return null; + } + int arrivalSep = value.indexOf(ARRIVAL_TIME_SEPARATOR); + if (arrivalSep < 0) { + return null; + } + String rest = value.substring(arrivalSep + ARRIVAL_TIME_SEPARATOR.length()); + int endSep = rest.indexOf(END_SEQ_SEPARATOR); + String arrivalStr = endSep >= 0 ? rest.substring(0, endSep) : rest; + try { + return Long.parseLong(arrivalStr); + } catch (NumberFormatException e) { + return null; + } + } + + /** + * Extract endSeq from a checkpoint value if present. Returns null for open shards. + * Handles formats: "lastSeq|endSeq" and "lastSeq@arrivalTime|endSeq". 
+ * Since '@' and '|' are distinct, '|' always unambiguously marks the endSeq regardless of + * whether an arrival time is present. + */ + public static String getEndSeqFromValue(String value) { + if (value == null || value.isEmpty()) { + return null; + } + int endSep = value.indexOf(END_SEQ_SEPARATOR); + return endSep >= 0 && endSep < value.length() - 1 ? value.substring(endSep + 1) : null; + } + + /** + * Parse a checkpoint value into (lastSeq, endSeq). Combines {@link #getLastSeqFromValue} and + * {@link #getEndSeqFromValue} into a single call to avoid parsing the value string twice. + * @return Pair where left=lastSeq (empty Option when absent), right=endSeq (empty Option for open shards) + */ + public static Pair<Option<String>, Option<String>> parseCheckpointValue(String value) { + return Pair.of(Option.ofNullable(getLastSeqFromValue(value)), + Option.ofNullable(getEndSeqFromValue(value))); + } + + /** + * Build checkpoint value without arrival time: "lastSeq" or "lastSeq|endSeq". + */ + public static String buildCheckpointValue(String lastSeq, String endSeq) { + return buildCheckpointValue(lastSeq, null, endSeq); + } + + /** + * Build checkpoint value with optional arrival time. + * Format: lastSeq[@arrivalTime][|endSeq] + */ + public static String buildCheckpointValue(String lastSeq, Long arrivalTime, String endSeq) { + StringBuilder sb = new StringBuilder(lastSeq != null ? lastSeq : ""); + if (arrivalTime != null) { + sb.append(ARRIVAL_TIME_SEPARATOR).append(arrivalTime); + } + if (endSeq != null && !endSeq.isEmpty()) { + sb.append(END_SEQ_SEPARATOR).append(endSeq); + } + return sb.toString(); + } + + /** + * String representation of checkpoint. + * Format: streamName,shardId:value,shardId:value,... where value is lastSeq or lastSeq|endSeq. 
+ */ + public static String offsetsToStr(String streamName, Map<String, String> shardToValue) { + String parts = shardToValue.entrySet().stream() + .sorted(Map.Entry.comparingByKey()) + .map(e -> e.getKey() + ":" + e.getValue()) + .collect(Collectors.joining(",")); + return streamName + "," + parts; + } + + /** + * Returns true when {@code lastCheckpointStr} is a well-formed Kinesis checkpoint for {@code streamName}. + * Checks both format (streamName,shardId:seq,...) and that the embedded stream name matches. + */ + public static boolean isValidStreamCheckpoint(Option<String> lastCheckpointStr, String streamName) { + return lastCheckpointStr.isPresent() + && PATTERN.matcher(lastCheckpointStr.get()).matches() + && lastCheckpointStr.get().startsWith(streamName + ","); + } + } + + /** + * Represents a shard to read from, with optional starting sequence number. + * For closed shards, endingSequenceNumber is set so we can store it in the checkpoint + * and later detect data loss when the shard expires. + */ + @AllArgsConstructor + @Getter + public static class KinesisShardRange implements java.io.Serializable { + private final String shardId; + /** If empty, use TRIM_HORIZON or LATEST based on config. */ + private final Option<String> startingSequenceNumber; + /** For closed shards: the shard's ending sequence number. Empty for open shards. */ + private final Option<String> endingSequenceNumber; + + public static KinesisShardRange of(String shardId, Option<String> seqNum) { + return new KinesisShardRange(shardId, seqNum, Option.empty()); + } + + public static KinesisShardRange of(String shardId, Option<String> seqNum, Option<String> endSeq) { + return new KinesisShardRange(shardId, seqNum, endSeq); + } + + /** + * Returns true if this range may have unread records, false if we can definitively determine it has none. + * Uses conservative default (useLatestWhenNoCheckpoint=false). 
+ */ + public boolean hasUnreadRecords() { + return hasUnreadRecords(false); + } + + /** + * Returns true if this range may have unread records, false if we can definitively determine it has none. + * <ul> + * <li>Open shard: always true (may have new records)</li> + * <li>Closed shard, lastSeq >= endSeq (compared numerically): false (fully consumed)</li> + * <li>Closed shard, no checkpoint and useLatest: false (at LATEST tip, closed shard has no records)</li> + * <li>Otherwise: true (may have unread records or cannot definitively say)</li> + * </ul> + * + * @param useLatestWhenNoCheckpoint when startingSequenceNumber is empty, true means we use LATEST + * (start at tip); for a closed shard there are no records to read + * @return false only when the range is known to be fully consumed, true otherwise + */ + public boolean hasUnreadRecords(boolean useLatestWhenNoCheckpoint) { + String lastSeq = startingSequenceNumber.orElse(null); + String endSeq = endingSequenceNumber.orElse(null); + + // CASE 1: Open shard: may have records + if (endSeq == null || endSeq.isEmpty()) { + return true; + } + // CASE 2: Closed shard with no checkpoint + if (lastSeq == null || lastSeq.isEmpty()) { + return !useLatestWhenNoCheckpoint; + } + // CASE 3: Closed shard: lastSeq >= endSeq means fully consumed. + // Sequence numbers are decimal strings whose length can differ, so they are compared as numbers; + // String.compareTo is lexicographic and would order "9" after "10". + if (compareSequenceNumbers(lastSeq, endSeq) >= 0) { + return false; + } + // CASE 4: lastSeq < endSeq: may have unread records + return true; + } + + /** + * Compares two sequence numbers by numeric value. + * Falls back to lexicographic comparison (the previous behavior) when either value is not a decimal integer. + */ + private static int compareSequenceNumbers(String left, String right) { + try { + return new java.math.BigInteger(left).compareTo(new java.math.BigInteger(right)); + } catch (NumberFormatException ignored) { + // Not a plain decimal number: keep the original String ordering instead of failing the read. + return left.compareTo(right); + } + } + } + + /** Name of the Kinesis Data Stream to consume from (e.g. "my-stream"). */ + private final String streamName; + /** AWS region where the Kinesis stream is provisioned (e.g. "us-east-1"). Required because KinesisClient is region-scoped — each client instance connects to exactly one regional endpoint. */ + private final String region; + /** Optional custom Kinesis endpoint URL, used for localstack or VPC endpoints. Empty when using the default AWS endpoint. */ + private final Option<String> endpointUrl; + /** Strategy that determines where to start reading when no prior checkpoint exists (LATEST, TRIM_HORIZON, etc.). 
*/ + private final KinesisSourceConfig.KinesisStartingPositionStrategy startingPositionStrategy; + /** Raw configuration properties passed from HoodieStreamer; used to resolve Kinesis-specific and shared settings. */ + private final TypedProperties props; + + /** Creates an offset generator from the given properties. Stream name and region are passed to checkRequiredConfigProperties before being read; the endpoint URL is read with a null default and wrapped in an Option. */ public KinesisOffsetGen(TypedProperties props) { + this.props = props; + checkRequiredConfigProperties(props, + Arrays.asList(KinesisSourceConfig.KINESIS_STREAM_NAME, KinesisSourceConfig.KINESIS_REGION)); + this.streamName = getStringWithAltKeys(props, KinesisSourceConfig.KINESIS_STREAM_NAME); + this.region = getStringWithAltKeys(props, KinesisSourceConfig.KINESIS_REGION); + this.endpointUrl = Option.ofNullable(getStringWithAltKeys(props, KinesisSourceConfig.KINESIS_ENDPOINT_URL, null)); + /* NOTE(review): the boolean argument presumably makes the lookup fall back to the config default value - confirm against getStringWithAltKeys. */ this.startingPositionStrategy = KinesisSourceConfig.KinesisStartingPositionStrategy.fromString( + getStringWithAltKeys(props, KinesisSourceConfig.KINESIS_STARTING_POSITION, true)); + } + + /** + * Builds a Kinesis client from explicit parameters. Used by both the instance method + * {@link #createKinesisClient()} and by {@link org.apache.hudi.utilities.sources.JsonKinesisSource} + * from serializable {@link KinesisReadConfig} in Spark closures. 
+ */ + public static KinesisClient createKinesisClient(String region, String endpointUrl, + String accessKey, String secretKey) { + KinesisClientBuilder builder = KinesisClient.builder().region(Region.of(region)); + /* Override the endpoint only when a non-empty URL is supplied. */ if (endpointUrl != null && !endpointUrl.isEmpty()) { + builder = builder.endpointOverride(URI.create(endpointUrl)); + } + /* Static credentials are set only when both keys are non-empty; otherwise no credentials provider is configured here. */ if (accessKey != null && !accessKey.isEmpty() && secretKey != null && !secretKey.isEmpty()) { + builder = builder.credentialsProvider( + StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey))); + } + return builder.build(); + } + + /** Builds a Kinesis client from the region and optional endpoint held by this instance and the access and secret keys read from the properties (null when not set). */ public KinesisClient createKinesisClient() { + String accessKey = getStringWithAltKeys(props, KinesisSourceConfig.KINESIS_ACCESS_KEY, null); + String secretKey = getStringWithAltKeys(props, KinesisSourceConfig.KINESIS_SECRET_KEY, null); + return createKinesisClient(region, endpointUrl.orElse(null), accessKey, secretKey); + } + + /** + * Lists the shards returned by ListShards for the stream, following nextToken until every page is read. + * The result contains both open and closed shards; nothing is filtered out here. + * Note: AWS API disallows streamName and nextToken in the same request. + */ + public List<Shard> listShards(KinesisClient client) { + List<Shard> allShards = new ArrayList<>(); + String nextToken = null; + do { + ListShardsRequest request = nextToken != null + ? 
ListShardsRequest.builder().nextToken(nextToken).build() + : ListShardsRequest.builder().streamName(streamName).build(); + ListShardsResponse response; + try { + response = client.listShards(request); + } catch (ResourceNotFoundException e) { + throw new HoodieReadFromSourceException("Kinesis stream " + streamName + " not found", e); + } catch (ProvisionedThroughputExceededException e) { + throw new HoodieReadFromSourceException("Kinesis throughput exceeded listing shards for " + streamName, e); + } catch (LimitExceededException e) { + throw new HoodieReadFromSourceException("Kinesis limit exceeded listing shards: " + e.getMessage(), e); + } + allShards.addAll(response.shards()); + nextToken = response.nextToken(); + } while (nextToken != null); + + // Include both open and closed shards. Closed shards (e.g., from resharding) may still contain + // unread records within the retention period. GetRecords works on closed shards until all data + // is consumed, at which point NextShardIterator returns null. + long numOpenShards = allShards.stream() + .filter(s -> s.sequenceNumberRange() != null && s.sequenceNumberRange().endingSequenceNumber() == null) + .count(); + log.info("Found {} shards for stream {} ({} open, {} closed)", + allShards.size(), streamName, numOpenShards, allShards.size() - numOpenShards); + logShardSequenceRanges(allShards); + return allShards; + } + + /** + * Logs each shard's start/end sequence number so they can be used when resetting the checkpoint. + */ + private void logShardSequenceRanges(List<Shard> shards) { + for (Shard shard : shards) { + String startSeq = (shard.sequenceNumberRange() != null && shard.sequenceNumberRange().startingSequenceNumber() != null) + ? shard.sequenceNumberRange().startingSequenceNumber() : "n/a"; + String endSeq = (shard.sequenceNumberRange() != null && shard.sequenceNumberRange().endingSequenceNumber() != null) + ? 
shard.sequenceNumberRange().endingSequenceNumber() : null; + if (endSeq != null) { + log.info("Shard {}: startSeq={}, endSeq={} (for checkpoint reset: {}:{}|{})", + shard.shardId(), startSeq, endSeq, shard.shardId(), startSeq, endSeq); + } else { + log.info("Shard {}: startSeq={}, endSeq=open (for checkpoint reset from start: {}:{})", + shard.shardId(), startSeq, shard.shardId(), startSeq); + } + } + } + + /** + * Get shard ranges to read, based on checkpoint and limits. + */ + public KinesisShardRange[] getNextShardRanges(Option<Checkpoint> lastCheckpoint, long sourceLimit) { + long numEvents = calculateNumEvents(sourceLimit, props); + + try (KinesisClient client = createKinesisClient()) { + // STEP 1: List all open and closed shards from the server. + // Note: no expired shards. + List<Shard> shards = listShards(client); + if (shards.isEmpty()) { + return new KinesisShardRange[0]; + } + // STEP 2: parse last checkpoint if exists. + Map<String, String> fromSequenceNumbers = new HashMap<>(); + Option<String> lastCheckpointStr = lastCheckpoint.isPresent() + ? Option.of(lastCheckpoint.get().getCheckpointKey()) : Option.empty(); + if (CheckpointUtils.isValidStreamCheckpoint(lastCheckpointStr, streamName)) { + Map<String, String> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get()); + if (!checkpointOffsets.isEmpty()) { + // Check for expired shards (checkpoint references shards no longer in stream, e.g., past retention) + Set<String> availableShardIds = shards.stream().map(Shard::shardId).collect(Collectors.toSet()); + List<String> expiredShardIds = checkpointOffsets.keySet().stream() + .filter(id -> !availableShardIds.contains(id)) + .collect(Collectors.toList()); + // Handle expired shards that exist in the last checkpoint. + // This is important to detect data loss. + if (!expiredShardIds.isEmpty()) { + checkDataLossOnExpiredShards(expiredShardIds, checkpointOffsets); + } + // Handle regular case. + // Parse lastSeq for open and closed shards. 
+ // For closed shards, even if all their records have been consumed, they are still included. + for (String shardId : availableShardIds) { + if (checkpointOffsets.containsKey(shardId)) { + Option<String> lastSeqOpt = Option.ofNullable( + CheckpointUtils.getLastSeqFromValue(checkpointOffsets.get(shardId))) + .flatMap(seq -> seq.isEmpty() ? Option.empty() : Option.of(seq)); + lastSeqOpt.ifPresent(seq -> fromSequenceNumbers.put(shardId, seq)); + } + } + // Check if any available shard's checkpoint lastSeq has fallen behind the shard's Review Comment: 🤖 The Javadoc says this detects when checkpoint falls behind the trim horizon, but `shard.sequenceNumberRange().startingSequenceNumber()` is the shard's creation-time starting sequence number, not the current trim horizon. I believe `ListShards` doesn't expose the trim horizon — so this check would only fire if the checkpoint somehow references a sequence number before the shard was created, which shouldn't happen. Is this intended as a best-effort sanity check, or did you expect `startingSequenceNumber` to advance with retention? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
