This is an automated email from the ASF dual-hosted git repository. ferenc-csaky pushed a commit to branch v6.0 in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
commit d8d14ac10d24c832834494590a092fd099970fd2 Author: pelaezryan <[email protected]> AuthorDate: Thu Apr 30 03:23:28 2026 -0700 [FLINK-39540][Connectors/Kinesis][6.x] Addressed bugs for EFO subscriptions when they are completed (#243) * Removed resubscription for EFO subscriptions when they are completed * Added unit tests for Kinesis EFO subscriptions * Removed cancel() since this blocked subscriptions from restarting when necessary (e.g. 5 minute subscription timeout) * Addressed flink resubscriptions for resharding/shardend and timeouts * Addressed revision comments and added java docs * Ran mvn spotless to address formatting --------- Co-authored-by: Ryan Pelaez <[email protected]> (cherry picked from commit c662ddd044d7ed69d995a84a39045cc8181aba8c) --- .../fanout/FanOutKinesisShardSubscription.java | 87 +++++-- .../fanout/FanOutKinesisShardSubscriptionTest.java | 287 +++++++++++++++++++++ 2 files changed, 358 insertions(+), 16 deletions(-) diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java index a299e50..9674951 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java @@ -80,7 +80,6 @@ public class FanOutKinesisShardSubscription { // Queue is meant for eager retrieval of records from the Kinesis stream. We will always have 2 // record batches available on next read. private final BlockingQueue<SubscribeToShardEvent> eventQueue = new LinkedBlockingQueue<>(2); - private final AtomicBoolean subscriptionActive = new AtomicBoolean(false); private final AtomicReference<Throwable> subscriptionException = new AtomicReference<>(); // Store the current starting position for this subscription. Will be updated each time new @@ -108,7 +107,8 @@ public class FanOutKinesisShardSubscription { shardId, startingPosition, consumerArn); - if (subscriptionActive.get()) { + if (shardSubscriber != null + && shardSubscriber.getSubscriptionState() == SubscriptionState.SUBSCRIBED) { LOG.warn("Skipping activation of subscription since it is already active."); return; } @@ -166,9 +166,9 @@ public class FanOutKinesisShardSubscription { shardId, startingPosition, consumerArn); - subscriptionActive.set(true); // Request first batch of records. shardSubscriber.requestRecords(); + } else { String errorMessage = "Timeout when subscribing to shard " @@ -236,16 +236,37 @@ public class FanOutKinesisShardSubscription { throw new KinesisStreamsSourceException( "Subscription encountered unrecoverable exception.", throwable); } + final SubscriptionState state = + Optional.ofNullable(shardSubscriber) + .map(FanOutShardSubscriber::getSubscriptionState) + .orElse(SubscriptionState.NOT_STARTED); - if (!subscriptionActive.get()) { - LOG.debug( - "Subscription to shard {} for consumer {} is not yet active. Skipping.", - shardId, - consumerArn); - return null; + switch (state) { + case NOT_STARTED: + LOG.debug( + "Subscription to shard {} for consumer {} is not yet active. Skipping.", + shardId, + consumerArn); + return null; + case COMPLETED: + if (shardSubscriber.isShardEndReached()) { + LOG.info( + "Subscription reached SHARD_END for shard {} for consumer {}.", + shardId, + consumerArn); + return null; + } + LOG.info( + "Subscription expired to shard {} for consumer {}. Restarting.", + shardId, + consumerArn); + activateSubscription(); + return null; + case SUBSCRIBED: + return eventQueue.poll(); + default: + throw new IllegalStateException("Unknown subscription state: " + state); } - - return eventQueue.poll(); } /** @@ -254,26 +275,48 @@ public class FanOutKinesisShardSubscription { */ private class FanOutShardSubscriber implements Subscriber<SubscribeToShardEventStream> { private final CountDownLatch subscriptionLatch; - private Subscription subscription; + private final AtomicReference<SubscriptionState> subscriptionState = + new AtomicReference<>(SubscriptionState.NOT_STARTED); + private final AtomicBoolean isShardEnd = new AtomicBoolean(false); + private FanOutShardSubscriber(CountDownLatch subscriptionLatch) { this.subscriptionLatch = subscriptionLatch; } + /** + * Fetch the state that the subscriber is in. + * + * @return Subscription state for the subscriber. + */ + public SubscriptionState getSubscriptionState() { + return subscriptionState.get(); + } + + /** + * Boolean whether this subscriber has reached the end of a shard. + * + * @return True if ShardEnd. false otherwise. + */ + public boolean isShardEndReached() { + return isShardEnd.get(); + } + public void requestRecords() { subscription.request(1); } public void cancel() { - if (!subscriptionActive.get()) { + if (this.subscriptionState.get() == SubscriptionState.COMPLETED) { LOG.warn("Trying to cancel inactive subscription. Ignoring."); return; } - subscriptionActive.set(false); + if (subscription != null) { subscription.cancel(); } + this.subscriptionState.set(SubscriptionState.COMPLETED); } @Override @@ -284,6 +327,7 @@ public class FanOutKinesisShardSubscription { startingPosition, consumerArn); this.subscription = subscription; + this.subscriptionState.set(SubscriptionState.SUBSCRIBED); subscriptionLatch.countDown(); } @@ -300,6 +344,11 @@ public class FanOutKinesisShardSubscription { event); eventQueue.put(event); + if (event.continuationSequenceNumber() == null) { + isShardEnd.set(true); + return; + } + // Update the starting position in case we have to recreate the // subscription startingPosition = @@ -330,8 +379,14 @@ public class FanOutKinesisShardSubscription { @Override public void onComplete() { LOG.info("Subscription complete - {} ({})", shardId, consumerArn); - cancel(); - activateSubscription(); + this.subscriptionState.set(SubscriptionState.COMPLETED); } } + + /** States that the {@code FanOutShardSubscriber} may be in. */ + private enum SubscriptionState { + NOT_STARTED, + SUBSCRIBED, + COMPLETED + } } diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscriptionTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscriptionTest.java new file mode 100644 index 0000000..b6d66b7 --- /dev/null +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscriptionTest.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.source.reader.fanout; + +import org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException; +import org.apache.flink.connector.kinesis.source.proxy.AsyncStreamProxy; +import org.apache.flink.connector.kinesis.source.split.StartingPosition; +import org.apache.flink.connector.kinesis.source.util.FakeKinesisFanOutBehaviorsFactory; + +import org.junit.jupiter.api.Test; +import org.reactivestreams.Subscription; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.connector.kinesis.source.util.TestUtil.CONSUMER_ARN; +import static org.apache.flink.connector.kinesis.source.util.TestUtil.generateShardId; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link FanOutKinesisShardSubscription}. */ +class FanOutKinesisShardSubscriptionTest { + + private static final String TEST_SHARD_ID = generateShardId(1); + private static final Duration SUBSCRIPTION_TIMEOUT = Duration.ofSeconds(5); + + @Test + void testNextEventReturnsNullBeforeActivation() { + AsyncStreamProxy proxy = FakeKinesisFanOutBehaviorsFactory.boundedShard().build(); + FanOutKinesisShardSubscription subscription = + new FanOutKinesisShardSubscription( + proxy, + CONSUMER_ARN, + TEST_SHARD_ID, + StartingPosition.fromStart(), + SUBSCRIPTION_TIMEOUT); + + assertThat(subscription.nextEvent()).isNull(); + } + + @Test + void testResourceNotFoundExceptionThrown() { + AsyncStreamProxy proxy = + FakeKinesisFanOutBehaviorsFactory.resourceNotFoundWhenObtainingSubscription(); + FanOutKinesisShardSubscription subscription = + new FanOutKinesisShardSubscription( + proxy, + CONSUMER_ARN, + TEST_SHARD_ID, + StartingPosition.fromStart(), + SUBSCRIPTION_TIMEOUT); + + subscription.activateSubscription(); + + // Poll until exception surfaces + assertThatThrownBy( + () -> { + for (int i = 0; i < 200; i++) { + subscription.nextEvent(); + Thread.sleep(50); + } + }) + .isInstanceOf(ResourceNotFoundException.class); + } + + @Test + void testUnrecoverableExceptionWrappedInSourceException() throws Exception { + AsyncStreamProxy proxy = + new AsyncStreamProxy() { + @Override + public CompletableFuture<Void> subscribeToShard( + String consumerArn, + String shardId, + StartingPosition startingPosition, + SubscribeToShardResponseHandler responseHandler) { + responseHandler.exceptionOccurred( + new IllegalStateException("unrecoverable")); + return CompletableFuture.completedFuture(null); + } + + @Override + public void close() {} + }; + FanOutKinesisShardSubscription subscription = + new FanOutKinesisShardSubscription( + proxy, + CONSUMER_ARN, + TEST_SHARD_ID, + StartingPosition.fromStart(), + SUBSCRIPTION_TIMEOUT); + + subscription.activateSubscription(); + + assertThatThrownBy( + () -> { + for (int i = 0; i < 200; i++) { + subscription.nextEvent(); + Thread.sleep(50); + } + }) + .isInstanceOf(KinesisStreamsSourceException.class) + .hasMessageContaining("unrecoverable"); + } + + @Test + void testSubscriptionTimeoutTerminatesSubscription() throws Exception { + AsyncStreamProxy proxy = + new AsyncStreamProxy() { + @Override + public CompletableFuture<Void> subscribeToShard( + String consumerArn, + String shardId, + StartingPosition startingPosition, + SubscribeToShardResponseHandler responseHandler) { + return new CompletableFuture<>(); + } + + @Override + public void close() {} + }; + FanOutKinesisShardSubscription subscription = + new FanOutKinesisShardSubscription( + proxy, + CONSUMER_ARN, + TEST_SHARD_ID, + StartingPosition.fromStart(), + Duration.ofMillis(200)); + + subscription.activateSubscription(); + + // Wait for timeout to trigger, then poll - should recover + Thread.sleep(500); + SubscribeToShardEvent event = subscription.nextEvent(); + assertThat(event).isNull(); + } + + @Test + void testExpiredSubscriptionResubscribes() throws Exception { + AtomicInteger subscribeCount = new AtomicInteger(0); + AsyncStreamProxy proxy = + new AsyncStreamProxy() { + @Override + public CompletableFuture<Void> subscribeToShard( + String consumerArn, + String shardId, + StartingPosition startingPosition, + SubscribeToShardResponseHandler responseHandler) { + subscribeCount.incrementAndGet(); + return CompletableFuture.supplyAsync( + () -> { + responseHandler.responseReceived( + SubscribeToShardResponse.builder().build()); + responseHandler.onEventStream( + subscriber -> { + subscriber.onSubscribe( + new Subscription() { + @Override + public void request(long n) { + // Complete without sending any + // events (simulates 5-min expiry) + subscriber.onComplete(); + } + + @Override + public void cancel() {} + }); + }); + return null; + }); + } + + @Override + public void close() {} + }; + + FanOutKinesisShardSubscription subscription = + new FanOutKinesisShardSubscription( + proxy, + CONSUMER_ARN, + TEST_SHARD_ID, + StartingPosition.fromStart(), + SUBSCRIPTION_TIMEOUT); + + subscription.activateSubscription(); + Thread.sleep(500); + + // nextEvent() should detect COMPLETED without shard-end and trigger resubscription + subscription.nextEvent(); + Thread.sleep(500); + + assertThat(subscribeCount.get()).isEqualTo(2); + } + + @Test + void testShardEndDoesNotResubscribe() throws Exception { + AtomicInteger subscribeCount = new AtomicInteger(0); + AsyncStreamProxy proxy = + new AsyncStreamProxy() { + @Override + public CompletableFuture<Void> subscribeToShard( + String consumerArn, + String shardId, + StartingPosition startingPosition, + SubscribeToShardResponseHandler responseHandler) { + subscribeCount.incrementAndGet(); + return CompletableFuture.supplyAsync( + () -> { + responseHandler.responseReceived( + SubscribeToShardResponse.builder().build()); + responseHandler.onEventStream( + subscriber -> { + subscriber.onSubscribe( + new Subscription() { + private boolean sent = false; + + @Override + public void request(long n) { + if (!sent) { + sent = true; + // Send event with null + // continuation (shard end) + subscriber.onNext( + SubscribeToShardEvent + .builder() + .millisBehindLatest( + 0L) + .continuationSequenceNumber( + null) + .build()); + } else { + subscriber.onComplete(); + } + } + + @Override + public void cancel() {} + }); + }); + return null; + }); + } + + @Override + public void close() {} + }; + + FanOutKinesisShardSubscription subscription = + new FanOutKinesisShardSubscription( + proxy, + CONSUMER_ARN, + TEST_SHARD_ID, + StartingPosition.fromStart(), + SUBSCRIPTION_TIMEOUT); + + subscription.activateSubscription(); + Thread.sleep(500); + + // Drain the shard-end event from the queue + subscription.nextEvent(); + Thread.sleep(500); + + // Should not have resubscribed — shard has ended + assertThat(subscribeCount.get()).isEqualTo(1); + assertThat(subscription.nextEvent()).isNull(); + } +}
