This is an automated email from the ASF dual-hosted git repository.
ferenc-csaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
The following commit(s) were added to refs/heads/main by this push:
new ede02da [FLINK-39660] Fix Netty event loop threads blocking and race
condition in `FanOutKinesisShardSubscription`
ede02da is described below
commit ede02dae4f0f98efe8bc174872f94db8194442b1
Author: chenylee-aws <[email protected]>
AuthorDate: Tue May 26 12:03:42 2026 -0700
[FLINK-39660] Fix Netty event loop threads blocking and race condition in
`FanOutKinesisShardSubscription`
---
.../kinesis/source/KinesisStreamsSource.java | 3 +-
.../fanout/FanOutKinesisShardSplitReader.java | 17 +-
.../fanout/FanOutKinesisShardSubscription.java | 457 ++++++-----
.../fanout/FanOutKinesisShardSubscriptionTest.java | 893 ++++++++++++++++-----
4 files changed, 959 insertions(+), 411 deletions(-)
diff --git
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java
index e07add1..69c8065 100644
---
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java
+++
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java
@@ -72,7 +72,6 @@ import
software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerRespo
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
-import software.amazon.awssdk.utils.AttributeMap;
import java.time.Duration;
import java.util.Map;
@@ -271,7 +270,7 @@ public class KinesisStreamsSource<T>
SdkAsyncHttpClient asyncHttpClient =
AWSGeneralUtil.createAsyncHttpClient(
- AttributeMap.builder().build(),
NettyNioAsyncHttpClient.builder());
+ kinesisClientProperties,
NettyNioAsyncHttpClient.builder());
KinesisAsyncClient kinesisAsyncClient =
AWSClientUtil.createAwsAsyncClient(
kinesisClientProperties,
diff --git
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java
index c0aefee..a8baaab 100644
---
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java
+++
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java
@@ -32,6 +32,7 @@ import
software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import static
org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_SUBSCRIPTION_TIMEOUT;
@@ -44,6 +45,7 @@ public class FanOutKinesisShardSplitReader extends
KinesisShardSplitReaderBase {
private final AsyncStreamProxy asyncStreamProxy;
private final String consumerArn;
private final Duration subscriptionTimeout;
+ private final ScheduledThreadPoolExecutor timeoutScheduler;
private final Map<String, FanOutKinesisShardSubscription>
splitSubscriptions = new HashMap<>();
@@ -56,6 +58,15 @@ public class FanOutKinesisShardSplitReader extends
KinesisShardSplitReaderBase {
this.asyncStreamProxy = asyncStreamProxy;
this.consumerArn = consumerArn;
this.subscriptionTimeout =
configuration.get(EFO_CONSUMER_SUBSCRIPTION_TIMEOUT);
+ this.timeoutScheduler =
+ new ScheduledThreadPoolExecutor(
+ 1,
+ r -> {
+ Thread t = new Thread(r,
"subscription-timeout-scheduler");
+ t.setDaemon(true);
+ return t;
+ });
+ this.timeoutScheduler.setRemoveOnCancelPolicy(true);
}
@Override
@@ -71,6 +82,7 @@ public class FanOutKinesisShardSplitReader extends
KinesisShardSplitReaderBase {
boolean shardCompleted = event.continuationSequenceNumber() == null;
if (shardCompleted) {
splitSubscriptions.remove(splitState.getShardId());
+ subscription.close();
}
return new RecordBatch(event.records(), event.millisBehindLatest(),
shardCompleted);
}
@@ -85,7 +97,8 @@ public class FanOutKinesisShardSplitReader extends
KinesisShardSplitReaderBase {
consumerArn,
split.getShardId(),
split.getStartingPosition(),
- subscriptionTimeout);
+ subscriptionTimeout,
+ timeoutScheduler);
subscription.activateSubscription();
splitSubscriptions.put(split.splitId(), subscription);
}
@@ -93,6 +106,8 @@ public class FanOutKinesisShardSplitReader extends
KinesisShardSplitReaderBase {
@Override
public void close() throws Exception {
+
splitSubscriptions.values().forEach(FanOutKinesisShardSubscription::close);
+ timeoutScheduler.shutdownNow();
asyncStreamProxy.close();
}
}
diff --git
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java
index 9674951..72df724 100644
---
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java
+++
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java
@@ -44,12 +44,11 @@ import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -70,132 +69,171 @@ public class FanOutKinesisShardSubscription {
TimeoutException.class,
IOException.class,
LimitExceededException.class);
-
+ private final ScheduledExecutorService timeoutScheduler;
private final AsyncStreamProxy kinesis;
private final String consumerArn;
private final String shardId;
-
private final Duration subscriptionTimeout;
- // Queue is meant for eager retrieval of records from the Kinesis stream.
We will always have 2
- // record batches available on next read.
- private final BlockingQueue<SubscribeToShardEvent> eventQueue = new
LinkedBlockingQueue<>(2);
+ /**
+ * Number of events to keep in flight per subscriber. Pipelining the fetch
overlaps the server's
+ * next-event work with the consumer's drain work. Must match the capacity
of {@link
+ * #eventQueue}.
+ */
+ private static final int PREFETCH = 2;
+
+ private final BlockingQueue<SubscribeToShardEvent> eventQueue =
+ new LinkedBlockingQueue<>(PREFETCH);
private final AtomicReference<Throwable> subscriptionException = new
AtomicReference<>();
- // Store the current starting position for this subscription. Will be
updated each time new
- // batch of records is consumed
- private StartingPosition startingPosition;
+ // All fields below are guarded by lockObject
+ private final Object lockObject = new Object();
+ private ScheduledFuture<?> timeoutFuture;
private FanOutShardSubscriber shardSubscriber;
+ private boolean closed = false;
+ private StartingPosition startingPosition;
public FanOutKinesisShardSubscription(
AsyncStreamProxy kinesis,
String consumerArn,
String shardId,
StartingPosition startingPosition,
- Duration subscriptionTimeout) {
+ Duration subscriptionTimeout,
+ ScheduledExecutorService timeoutScheduler) {
this.kinesis = kinesis;
this.consumerArn = consumerArn;
this.shardId = shardId;
this.startingPosition = startingPosition;
this.subscriptionTimeout = subscriptionTimeout;
+ this.timeoutScheduler = timeoutScheduler;
}
/** Method to allow eager activation of the subscription. */
public void activateSubscription() {
- LOG.info(
- "Activating subscription to shard {} with starting position {}
for consumer {}.",
- shardId,
- startingPosition,
- consumerArn);
- if (shardSubscriber != null
- && shardSubscriber.getSubscriptionState() ==
SubscriptionState.SUBSCRIBED) {
- LOG.warn("Skipping activation of subscription since it is already
active.");
- return;
- }
+ synchronized (lockObject) {
+ if (closed) {
+ LOG.info("Subscription for shard {} is closed; skipping
activation.", shardId);
+ return;
+ }
+ if (startingPosition == null) {
+ LOG.info(
+ "Shard {} has been completely consumed (shard end).
Skipping re-subscription.",
+ shardId);
+ return;
+ }
+ if (shardSubscriber != null) {
+ LOG.warn(
+ "Shard {} Skipping activation of subscription since
one is already active or in progress.",
+ shardId);
+ return;
+ }
- // We have to use our own CountDownLatch to wait for subscription to
be acquired because
- // subscription event is tracked via the handler.
- CountDownLatch waitForSubscriptionLatch = new CountDownLatch(1);
- shardSubscriber = new FanOutShardSubscriber(waitForSubscriptionLatch);
- SubscribeToShardResponseHandler responseHandler =
- SubscribeToShardResponseHandler.builder()
- .subscriber(() -> shardSubscriber)
- .onError(
- throwable -> {
- // Errors that occur when obtaining a
subscription are thrown
- // here.
- // After subscription is acquired, these
errors can be ignored.
- if (waitForSubscriptionLatch.getCount() >
0) {
- terminateSubscription(throwable);
- waitForSubscriptionLatch.countDown();
+ LOG.info(
+ "Activating subscription to shard {} with starting
position {} for consumer {}.",
+ shardId,
+ startingPosition,
+ consumerArn);
+
+ FanOutShardSubscriber subscriber = new FanOutShardSubscriber();
+ shardSubscriber = subscriber;
+
+ SubscribeToShardResponseHandler responseHandler =
+ SubscribeToShardResponseHandler.builder()
+ .subscriber(() -> subscriber)
+ .onError(
+ throwable -> {
+ synchronized (lockObject) {
+ if (!disposeIfActive(subscriber)) {
+ return;
+ }
+ }
+ LOG.error(
+ "Error onError subscribing to
shard {} with "
+ + "starting position
{} for consumer {}.",
+ shardId,
+ startingPosition,
+ consumerArn,
+ throwable);
+ setSubscriptionException(throwable);
+ })
+ .build();
+
+ cancelTimeoutFuture();
+ timeoutFuture =
+ timeoutScheduler.schedule(
+ () -> {
+ String errorMessage =
+ "Timeout when subscribing to shard "
+ + shardId
+ + " with starting position "
+ + startingPosition
+ + " for consumer "
+ + consumerArn
+ + ".";
+ synchronized (lockObject) {
+ // The timeout future was cancelled
between firing and
+ // acquiring the lock (e.g. onSubscribe
succeeded, or another
+ // error path disposed the subscriber). Do
nothing.
+ if (timeoutFuture == null) {
+ return;
}
- })
- .build();
-
- // We don't need to keep track of the future here because we monitor
subscription success
- // using our own CountDownLatch
- kinesis.subscribeToShard(consumerArn, shardId, startingPosition,
responseHandler)
- .exceptionally(
- throwable -> {
- // If consumer exists and is still activating, we
want to countdown.
- if (ExceptionUtils.findThrowable(
- throwable,
ResourceInUseException.class)
- .isPresent()) {
- waitForSubscriptionLatch.countDown();
+ if (!disposeIfActive(subscriber)) {
+ return;
+ }
+ }
+ LOG.error(errorMessage);
+ setSubscriptionException(new
TimeoutException(errorMessage));
+ },
+ subscriptionTimeout.toMillis(),
+ TimeUnit.MILLISECONDS);
+
+ kinesis.subscribeToShard(consumerArn, shardId, startingPosition,
responseHandler)
+ .exceptionally(
+ throwable -> {
+ synchronized (lockObject) {
+ if (!disposeIfActive(subscriber)) {
+ return null;
+ }
+ }
+ LOG.error(
+ "Error exceptionally subscribing to
shard {} with starting position {} for "
+ + "consumer {}.",
+ shardId,
+ startingPosition,
+ consumerArn,
+ throwable);
+ setSubscriptionException(throwable);
return null;
- }
- LOG.error(
- "Error subscribing to shard {} with
starting position {} for consumer {}.",
- shardId,
- startingPosition,
- consumerArn,
- throwable);
- terminateSubscription(throwable);
- return null;
- });
-
- // We have to handle timeout for subscriptions separately because Java
8 does not support a
- // fluent orTimeout() methods on CompletableFuture.
- CompletableFuture.runAsync(
- () -> {
- try {
- if (waitForSubscriptionLatch.await(
- subscriptionTimeout.toMillis(),
TimeUnit.MILLISECONDS)) {
- LOG.info(
- "Successfully subscribed to shard {} with
starting position {} for consumer {}.",
- shardId,
- startingPosition,
- consumerArn);
- // Request first batch of records.
- shardSubscriber.requestRecords();
-
- } else {
- String errorMessage =
- "Timeout when subscribing to shard "
- + shardId
- + " with starting position "
- + startingPosition
- + " for consumer "
- + consumerArn
- + ".";
- LOG.error(errorMessage);
- terminateSubscription(new
TimeoutException(errorMessage));
- }
- } catch (InterruptedException e) {
- LOG.warn("Interrupted while waiting for subscription
to complete.", e);
- terminateSubscription(e);
- Thread.currentThread().interrupt();
- }
- });
+ });
+ }
+ }
+
+ // Must be called while holding lockObject
+ private void cancelTimeoutFuture() {
+ if (timeoutFuture != null) {
+ timeoutFuture.cancel(false);
+ timeoutFuture = null;
+ }
+ }
+
+ // Must be called while holding lockObject
+ private boolean disposeIfActive(FanOutShardSubscriber subscriber) {
+ if (shardSubscriber != subscriber) {
+ return false;
+ }
+ cancelTimeoutFuture();
+ shardSubscriber.cancelSubscription();
+ shardSubscriber = null;
+ return true;
}
- private void terminateSubscription(Throwable t) {
+ private void setSubscriptionException(Throwable t) {
if (!subscriptionException.compareAndSet(null, t)) {
LOG.warn(
- "Another subscription exception has been queued, ignoring
subsequent exceptions",
+ "Another subscription exception has been queued for
shardId {}, ignoring subsequent exceptions",
+ shardId,
t);
}
- shardSubscriber.cancel();
}
/**
@@ -209,10 +247,6 @@ public class FanOutKinesisShardSubscription {
public SubscribeToShardEvent nextEvent() {
Throwable throwable = subscriptionException.getAndSet(null);
if (throwable != null) {
- // If consumer is still activating, we want to wait.
- if (ExceptionUtils.findThrowable(throwable,
ResourceInUseException.class).isPresent()) {
- return null;
- }
// We don't want to wrap ResourceNotFoundExceptions because it is
handled via a
// try-catch loop
if (throwable instanceof ResourceNotFoundException) {
@@ -226,46 +260,29 @@ public class FanOutKinesisShardSubscription {
.findFirst();
if (recoverableException.isPresent()) {
LOG.warn(
- "Recoverable exception encountered while subscribing
to shard. Ignoring.",
+ "Recoverable exception encountered for shard {} while
subscribing to shard. Ignoring: {}",
+ shardId,
recoverableException.get());
- shardSubscriber.cancel();
activateSubscription();
return null;
}
- LOG.error("Subscription encountered unrecoverable exception.",
throwable);
+ LOG.error("Subscription encountered unrecoverable exception. {}",
shardId, throwable);
throw new KinesisStreamsSourceException(
"Subscription encountered unrecoverable exception.",
throwable);
}
- final SubscriptionState state =
- Optional.ofNullable(shardSubscriber)
- .map(FanOutShardSubscriber::getSubscriptionState)
- .orElse(SubscriptionState.NOT_STARTED);
-
- switch (state) {
- case NOT_STARTED:
- LOG.debug(
- "Subscription to shard {} for consumer {} is not yet
active. Skipping.",
- shardId,
- consumerArn);
- return null;
- case COMPLETED:
- if (shardSubscriber.isShardEndReached()) {
- LOG.info(
- "Subscription reached SHARD_END for shard {} for
consumer {}.",
- shardId,
- consumerArn);
- return null;
- }
- LOG.info(
- "Subscription expired to shard {} for consumer {}.
Restarting.",
- shardId,
- consumerArn);
- activateSubscription();
- return null;
- case SUBSCRIBED:
- return eventQueue.poll();
- default:
- throw new IllegalStateException("Unknown subscription state: "
+ state);
+
+ return pollAndRequestNext();
+ }
+
+ private SubscribeToShardEvent pollAndRequestNext() {
+ synchronized (lockObject) {
+ SubscribeToShardEvent event = eventQueue.poll();
+ // If shardSubscriber is null, the subscriber has either completed
or been disposed.
+ // In either case, do not issue a follow-up request.
+ if (event != null && shardSubscriber != null) {
+ shardSubscriber.requestRecords();
+ }
+ return event;
}
}
@@ -274,61 +291,52 @@ public class FanOutKinesisShardSubscription {
* Streams.
*/
private class FanOutShardSubscriber implements
Subscriber<SubscribeToShardEventStream> {
- private final CountDownLatch subscriptionLatch;
- private Subscription subscription;
-
- private final AtomicReference<SubscriptionState> subscriptionState =
- new AtomicReference<>(SubscriptionState.NOT_STARTED);
- private final AtomicBoolean isShardEnd = new AtomicBoolean(false);
-
- private FanOutShardSubscriber(CountDownLatch subscriptionLatch) {
- this.subscriptionLatch = subscriptionLatch;
- }
- /**
- * Fetch the state that the subscriber is in.
- *
- * @return Subscription state for the subscriber.
- */
- public SubscriptionState getSubscriptionState() {
- return subscriptionState.get();
- }
-
- /**
- * Boolean whether this subscriber has reached the end of a shard.
- *
- * @return True if ShardEnd. false otherwise.
- */
- public boolean isShardEndReached() {
- return isShardEnd.get();
- }
+ private Subscription subscription;
public void requestRecords() {
- subscription.request(1);
- }
-
- public void cancel() {
- if (this.subscriptionState.get() == SubscriptionState.COMPLETED) {
- LOG.warn("Trying to cancel inactive subscription. Ignoring.");
- return;
+ // subscription can be null if onSubscribe has not yet fired on a
freshly activated
+ // subscriber. In that case the initial request(1) will be issued
from onSubscribe
+ // itself, so it is safe to skip here.
+ if (subscription != null) {
+ subscription.request(1);
}
+ }
+ public void cancelSubscription() {
if (subscription != null) {
subscription.cancel();
}
- this.subscriptionState.set(SubscriptionState.COMPLETED);
}
@Override
public void onSubscribe(Subscription subscription) {
- LOG.info(
- "Successfully subscribed to shard {} at {} using consumer
{}.",
- shardId,
- startingPosition,
- consumerArn);
- this.subscription = subscription;
- this.subscriptionState.set(SubscriptionState.SUBSCRIBED);
- subscriptionLatch.countDown();
+ synchronized (lockObject) {
+ if (shardSubscriber != this) {
+ // Timeout/error disposed this subscriber and a new one
was created before SDK
+ // called onSubscribe
+ subscription.cancel();
+ return;
+ }
+ cancelTimeoutFuture();
+ this.subscription = subscription;
+
+ int priming = PREFETCH - eventQueue.size();
+ if (priming > 0) {
+ subscription.request(priming);
+ } else {
+ LOG.debug(
+ "Shard {} reactivated with {} buffered event(s).
Skipping initial "
+ + "priming; request(1) will come from the
consumer-drain path.",
+ shardId,
+ eventQueue.size());
+ }
+ LOG.info(
+ "Successfully subscribed to shard {} at {} using
consumer {}.",
+ shardId,
+ startingPosition,
+ consumerArn);
+ }
}
@Override
@@ -337,31 +345,58 @@ public class FanOutKinesisShardSubscription {
new SubscribeToShardResponseHandler.Visitor() {
@Override
public void visit(SubscribeToShardEvent event) {
- try {
+ synchronized (lockObject) {
+ if (shardSubscriber !=
FanOutShardSubscriber.this) {
+ LOG.warn(
+ "Ignoring late event for shard {}
from a disposed "
+ + "subscriber; it will be
re-delivered after "
+ + "reactivation.",
+ shardId);
+ return;
+ }
+
LOG.debug(
"Received event: {}, {}",
event.getClass().getSimpleName(),
event);
- eventQueue.put(event);
- if (event.continuationSequenceNumber() ==
null) {
- isShardEnd.set(true);
+ // Non-blocking offer. Under the prefetch
discipline maintained
+ // by onSubscribe (primes PREFETCH -
queue.size() requests) and
+ // pollAndRequestNext (issues request(1) after
each consumer
+ // drain), the invariant queue.size +
outstanding == PREFETCH
+ // holds in steady state, so the queue is
guaranteed to have
+ // room for each delivered event. If offer()
ever returns false
+ // it indicates a protocol / state invariant
violation (e.g. the
+ // server delivered an unrequested event) -
fail loud rather
+ // than block the Netty event loop. The
subscription will be
+ // reactivated from the previous
startingPosition (which has
+ // not yet been advanced below) and the server
will re-deliver
+ // this event.
+ if (!eventQueue.offer(event)) {
+ LOG.error(
+ "Event queue overflow for shard
{}; server delivered "
+ + "an unrequested event.
Failing subscription "
+ + "to recover.",
+ shardId);
+
+ if
(disposeIfActive(FanOutShardSubscriber.this)) {
+ setSubscriptionException(
+ new IOException(
+ "Event queue overflow
for shard "
+ + shardId
+ + "; server
delivered an "
+ + "unrequested
event."));
+ }
return;
}
- // Update the starting position in case we
have to recreate the
- // subscription
- startingPosition =
-
StartingPosition.continueFromSequenceNumber(
-
event.continuationSequenceNumber());
-
- // Replace the record just consumed in the
Queue
- requestRecords();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new KinesisStreamsSourceException(
- "Interrupted while adding Kinesis
record to internal buffer.",
- e);
+ if (event.continuationSequenceNumber() ==
null) {
+ startingPosition = null;
+ } else {
+ startingPosition =
+
StartingPosition.continueFromSequenceNumber(
+
event.continuationSequenceNumber());
+ }
}
}
});
@@ -369,24 +404,36 @@ public class FanOutKinesisShardSubscription {
@Override
public void onError(Throwable throwable) {
- if (!subscriptionException.compareAndSet(null, throwable)) {
- LOG.warn(
- "Another subscription exception has been queued,
ignoring subsequent exceptions",
- throwable);
+ synchronized (lockObject) {
+ if (!disposeIfActive(this)) {
+ return;
+ }
}
+ setSubscriptionException(throwable);
}
@Override
public void onComplete() {
- LOG.info("Subscription complete - {} ({})", shardId, consumerArn);
- this.subscriptionState.set(SubscriptionState.COMPLETED);
+ synchronized (lockObject) {
+ if (shardSubscriber != this) {
+ LOG.warn(
+ "Ignoring late onComplete for shard {} from a
disposed subscriber.",
+ shardId);
+ return;
+ }
+ LOG.info("Subscription complete - {} ({})", shardId,
consumerArn);
+ shardSubscriber = null;
+ }
+ activateSubscription();
}
}
- /** States that the {@code FanOutShardSubscriber} may be in. */
- private enum SubscriptionState {
- NOT_STARTED,
- SUBSCRIBED,
- COMPLETED
+ public void close() {
+ synchronized (lockObject) {
+ closed = true;
+ if (shardSubscriber != null) {
+ disposeIfActive(shardSubscriber);
+ }
+ }
}
}
diff --git
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscriptionTest.java
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscriptionTest.java
index b6d66b7..3496d63 100644
---
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscriptionTest.java
+++
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscriptionTest.java
@@ -21,267 +21,754 @@ package
org.apache.flink.connector.kinesis.source.reader.fanout;
import
org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException;
import org.apache.flink.connector.kinesis.source.proxy.AsyncStreamProxy;
import org.apache.flink.connector.kinesis.source.split.StartingPosition;
-import
org.apache.flink.connector.kinesis.source.util.FakeKinesisFanOutBehaviorsFactory;
import org.junit.jupiter.api.Test;
+import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
+import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
-import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse;
+import
software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
import
software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
+import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import static
org.apache.flink.connector.kinesis.source.util.TestUtil.CONSUMER_ARN;
-import static
org.apache.flink.connector.kinesis.source.util.TestUtil.generateShardId;
+import static org.apache.flink.connector.kinesis.source.util.TestUtil.SHARD_ID;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.awaitility.Awaitility.await;
-/** Tests for {@link FanOutKinesisShardSubscription}. */
+/**
+ * Tests for {@link FanOutKinesisShardSubscription}.
+ *
+ * <p>These tests focus on the concurrent lifecycle of a subscription:
activation guards, error
+ * disposal, timeout handling, and the identity-based cleanup that prevents
the connection-leak bug.
+ */
class FanOutKinesisShardSubscriptionTest {
- private static final String TEST_SHARD_ID = generateShardId(1);
- private static final Duration SUBSCRIPTION_TIMEOUT = Duration.ofSeconds(5);
+ private static final Duration DEFAULT_TIMEOUT = Duration.ofMillis(500);
+ private static final Duration LONG_TIMEOUT = Duration.ofSeconds(10);
+
+ // ----- Happy path -----
@Test
- void testNextEventReturnsNullBeforeActivation() {
- AsyncStreamProxy proxy =
FakeKinesisFanOutBehaviorsFactory.boundedShard().build();
- FanOutKinesisShardSubscription subscription =
- new FanOutKinesisShardSubscription(
- proxy,
- CONSUMER_ARN,
- TEST_SHARD_ID,
- StartingPosition.fromStart(),
- SUBSCRIPTION_TIMEOUT);
+ void nextEventReturnsNullBeforeActivation() {
+ ScriptedProxy proxy = new ScriptedProxy();
+ FanOutKinesisShardSubscription subscription = newSubscription(proxy);
+ // Before activateSubscription is called, nextEvent returns null
assertThat(subscription.nextEvent()).isNull();
+ assertThat(proxy.subscribeCallCount()).isEqualTo(0);
}
@Test
- void testResourceNotFoundExceptionThrown() {
- AsyncStreamProxy proxy =
-
FakeKinesisFanOutBehaviorsFactory.resourceNotFoundWhenObtainingSubscription();
- FanOutKinesisShardSubscription subscription =
- new FanOutKinesisShardSubscription(
- proxy,
- CONSUMER_ARN,
- TEST_SHARD_ID,
- StartingPosition.fromStart(),
- SUBSCRIPTION_TIMEOUT);
+ void activatesAndDeliversEvents() throws Exception {
+ ScriptedProxy proxy = new ScriptedProxy();
+ FanOutKinesisShardSubscription subscription = newSubscription(proxy);
subscription.activateSubscription();
+ ScriptedSubscription s1 = proxy.awaitSubscription();
+
+ // Deliver onSubscribe and an event with records
+ s1.onSubscribeDelivered();
+ SubscribeToShardEvent event =
+ SubscribeToShardEvent.builder()
+ .records(record("seq-1"))
+ .continuationSequenceNumber("cont-1")
+ .build();
+ s1.deliverEvent(event);
+
+ SubscribeToShardEvent received = pollEvent(subscription);
+ assertThat(received).isNotNull();
+ assertThat(received.continuationSequenceNumber()).isEqualTo("cont-1");
+ }
- // Poll until exception surfaces
- assertThatThrownBy(
- () -> {
- for (int i = 0; i < 200; i++) {
- subscription.nextEvent();
- Thread.sleep(50);
- }
- })
- .isInstanceOf(ResourceNotFoundException.class);
+ @Test
+ void onCompleteTriggersReactivationForOngoingShard() throws Exception {
+ ScriptedProxy proxy = new ScriptedProxy();
+ FanOutKinesisShardSubscription subscription = newSubscription(proxy);
+
+ subscription.activateSubscription();
+ ScriptedSubscription s1 = proxy.awaitSubscription();
+ s1.onSubscribeDelivered();
+ // deliver an event with a non-null continuation so the shard is not
at end
+ s1.deliverEvent(
+ SubscribeToShardEvent.builder()
+ .records(record("seq-1"))
+ .continuationSequenceNumber("cont-1")
+ .build());
+ pollEvent(subscription);
+
+ // Natural end of subscription triggers EFO rotation
+ s1.onComplete();
+
+ // A second subscribeToShard call must occur
+ ScriptedSubscription s2 = proxy.awaitSubscription();
+ assertThat(proxy.subscribeCallCount()).isEqualTo(2);
+ // The new activation should resume from where s1 left off
+ assertThat(s2.startingPosition)
+
.isEqualTo(StartingPosition.continueFromSequenceNumber("cont-1"));
}
@Test
- void testUnrecoverableExceptionWrappedInSourceException() throws Exception
{
- AsyncStreamProxy proxy =
- new AsyncStreamProxy() {
- @Override
- public CompletableFuture<Void> subscribeToShard(
- String consumerArn,
- String shardId,
- StartingPosition startingPosition,
- SubscribeToShardResponseHandler responseHandler) {
- responseHandler.exceptionOccurred(
- new IllegalStateException("unrecoverable"));
- return CompletableFuture.completedFuture(null);
- }
-
- @Override
- public void close() {}
- };
- FanOutKinesisShardSubscription subscription =
- new FanOutKinesisShardSubscription(
- proxy,
- CONSUMER_ARN,
- TEST_SHARD_ID,
- StartingPosition.fromStart(),
- SUBSCRIPTION_TIMEOUT);
+ void shardEndPreventsReactivation() throws Exception {
+ ScriptedProxy proxy = new ScriptedProxy();
+ FanOutKinesisShardSubscription subscription = newSubscription(proxy);
subscription.activateSubscription();
+ ScriptedSubscription s1 = proxy.awaitSubscription();
+ s1.onSubscribeDelivered();
+ // Deliver event with null continuation number to signal shard end
+ s1.deliverEvent(
+ SubscribeToShardEvent.builder()
+ .records(record("seq-1"))
+ .continuationSequenceNumber(null)
+ .build());
+ pollEvent(subscription);
+
+ // onComplete normally triggers reactivation, but not when
startingPosition is null
+ s1.onComplete();
+
+ // There should be no further subscribe attempts.
+ // We give the scheduler a moment to make any mistake visible.
+ waitShort();
+ assertThat(proxy.subscribeCallCount()).isEqualTo(1);
+ }
- assertThatThrownBy(
- () -> {
- for (int i = 0; i < 200; i++) {
- subscription.nextEvent();
- Thread.sleep(50);
- }
- })
+ // ----- Concurrent activation prevention -----
+
+ @Test
+ void activateIsNoOpWhenSubscriptionAlreadyInFlight() throws Exception {
+ ScriptedProxy proxy = new ScriptedProxy();
+ // Important: use a long timeout so the first subscribe doesn't time
out mid-test
+ FanOutKinesisShardSubscription subscription = newSubscription(proxy,
LONG_TIMEOUT);
+
+ subscription.activateSubscription();
+ proxy.awaitSubscription();
+ assertThat(proxy.subscribeCallCount()).isEqualTo(1);
+
+ // Now call activateSubscription again. The first subscription has not
received
+ // onSubscribe yet, so the broken main-branch guard
(subscriptionActive) would
+ // allow a second subscribe to fire. The fixed guard (shardSubscriber
!= null)
+ // must block it.
+ subscription.activateSubscription();
+ subscription.activateSubscription();
+ subscription.activateSubscription();
+
+ waitShort();
+ assertThat(proxy.subscribeCallCount()).isEqualTo(1);
+ }
+
+ @Test
+ void activateIsNoOpAfterOnSubscribeSucceeds() throws Exception {
+ ScriptedProxy proxy = new ScriptedProxy();
+ FanOutKinesisShardSubscription subscription = newSubscription(proxy,
LONG_TIMEOUT);
+
+ subscription.activateSubscription();
+ ScriptedSubscription s1 = proxy.awaitSubscription();
+ s1.onSubscribeDelivered();
+
+ subscription.activateSubscription();
+ subscription.activateSubscription();
+
+ waitShort();
+ assertThat(proxy.subscribeCallCount()).isEqualTo(1);
+ }
+
+ // ----- Error handling and retries -----
+
+ @Test
+ void recoverableErrorTriggersRetry() throws Exception {
+ ScriptedProxy proxy = new ScriptedProxy();
+ FanOutKinesisShardSubscription subscription = newSubscription(proxy);
+
+ subscription.activateSubscription();
+ ScriptedSubscription s1 = proxy.awaitSubscription();
+
+ // Fail the subscription via the future path (common for acquire
timeouts)
+ s1.completeExceptionally(new IOException("simulated connection
reset"));
+
+ // nextEvent must drain the exception, classify as recoverable, and
retry
+ SubscribeToShardEvent e = subscription.nextEvent();
+ assertThat(e).isNull();
+
+ ScriptedSubscription s2 = proxy.awaitSubscription();
+ assertThat(s2).isNotSameAs(s1);
+ assertThat(proxy.subscribeCallCount()).isEqualTo(2);
+ }
+
+ @Test
+ void unrecoverableErrorPropagatesFromNextEvent() throws Exception {
+ ScriptedProxy proxy = new ScriptedProxy();
+ FanOutKinesisShardSubscription subscription = newSubscription(proxy);
+
+ subscription.activateSubscription();
+ ScriptedSubscription s1 = proxy.awaitSubscription();
+
+ // Use a non-recoverable runtime exception
+ s1.completeExceptionally(new RuntimeException("nope"));
+
+ assertThatThrownBy(subscription::nextEvent)
.isInstanceOf(KinesisStreamsSourceException.class)
.hasMessageContaining("unrecoverable");
}
@Test
- void testSubscriptionTimeoutTerminatesSubscription() throws Exception {
- AsyncStreamProxy proxy =
- new AsyncStreamProxy() {
- @Override
- public CompletableFuture<Void> subscribeToShard(
- String consumerArn,
- String shardId,
- StartingPosition startingPosition,
- SubscribeToShardResponseHandler responseHandler) {
- return new CompletableFuture<>();
- }
-
- @Override
- public void close() {}
- };
- FanOutKinesisShardSubscription subscription =
- new FanOutKinesisShardSubscription(
- proxy,
- CONSUMER_ARN,
- TEST_SHARD_ID,
- StartingPosition.fromStart(),
- Duration.ofMillis(200));
+ void resourceNotFoundIsRethrownDirectly() throws Exception {
+ ScriptedProxy proxy = new ScriptedProxy();
+ FanOutKinesisShardSubscription subscription = newSubscription(proxy);
subscription.activateSubscription();
+ ScriptedSubscription s1 = proxy.awaitSubscription();
- // Wait for timeout to trigger, then poll - should recover
- Thread.sleep(500);
- SubscribeToShardEvent event = subscription.nextEvent();
- assertThat(event).isNull();
+
s1.completeExceptionally(ResourceNotFoundException.builder().message("gone").build());
+
+
assertThatThrownBy(subscription::nextEvent).isInstanceOf(ResourceNotFoundException.class);
}
+ // ----- Dual error path dedup (disposeIfActive identity check) -----
+
@Test
- void testExpiredSubscriptionResubscribes() throws Exception {
- AtomicInteger subscribeCount = new AtomicInteger(0);
- AsyncStreamProxy proxy =
- new AsyncStreamProxy() {
- @Override
- public CompletableFuture<Void> subscribeToShard(
- String consumerArn,
- String shardId,
- StartingPosition startingPosition,
- SubscribeToShardResponseHandler responseHandler) {
- subscribeCount.incrementAndGet();
- return CompletableFuture.supplyAsync(
- () -> {
- responseHandler.responseReceived(
-
SubscribeToShardResponse.builder().build());
- responseHandler.onEventStream(
- subscriber -> {
- subscriber.onSubscribe(
- new Subscription() {
- @Override
- public void
request(long n) {
- // Complete
without sending any
- // events
(simulates 5-min expiry)
-
subscriber.onComplete();
- }
-
- @Override
- public void
cancel() {}
- });
- });
- return null;
- });
- }
-
- @Override
- public void close() {}
- };
+ void dualErrorPathQueuesAtMostOneException() throws Exception {
+ ScriptedProxy proxy = new ScriptedProxy();
+ FanOutKinesisShardSubscription subscription = newSubscription(proxy);
+
+ subscription.activateSubscription();
+ ScriptedSubscription s1 = proxy.awaitSubscription();
+
+ // Both error signals fire for the same subscribe (as AWS SDK does in
practice)
+ s1.fireHandlerError(new IOException("handler error"));
+ s1.completeExceptionally(new IOException("future error"));
+
+ // First nextEvent drains exactly one exception and retries
+ assertThat(subscription.nextEvent()).isNull();
+
+ // After retry, there must be exactly ONE new subscribe (not one per
error signal)
+ ScriptedSubscription s2 = proxy.awaitSubscription();
+ assertThat(s2).isNotSameAs(s1);
+ assertThat(proxy.subscribeCallCount()).isEqualTo(2);
+
+ // Second nextEvent must not find a lingering exception from the
losing error path
+ assertThat(subscription.nextEvent()).isNull();
+ waitShort();
+ // Still only 2 subscribes (no runaway retry)
+ assertThat(proxy.subscribeCallCount()).isEqualTo(2);
+ }
+
+ // ----- Subscription timeout handling -----
+
+ @Test
+ void subscriptionTimeoutTriggersCleanupAndRetry() throws Exception {
+ ScriptedProxy proxy = new ScriptedProxy();
+ FanOutKinesisShardSubscription subscription =
+ newSubscription(proxy, Duration.ofMillis(100));
+ subscription.activateSubscription();
+ ScriptedSubscription s1 = proxy.awaitSubscription();
+ // Deliberately do NOT call s1.onSubscribeDelivered() — the latch will
time out
+
+ // Wait for the timeout to fire and queue a TimeoutException
+ await().atMost(Duration.ofSeconds(2))
+ .untilAsserted(
+ () -> {
+ SubscribeToShardEvent e = subscription.nextEvent();
+ // After the timeout, nextEvent should drain the
exception and fire a
+ // retry.
+ // Keep polling until the retry shows up.
+
assertThat(proxy.subscribeCallCount()).isGreaterThanOrEqualTo(2);
+ assertThat(e).isNull();
+ });
+ }
+
+ // ----- Stale onSubscribe (race: timeout before onSubscribe) -----
+
+ @Test
+ void lateOnSubscribeOnStaleSubscriberIsCancelled() throws Exception {
+ ScriptedProxy proxy = new ScriptedProxy();
FanOutKinesisShardSubscription subscription =
- new FanOutKinesisShardSubscription(
- proxy,
- CONSUMER_ARN,
- TEST_SHARD_ID,
- StartingPosition.fromStart(),
- SUBSCRIPTION_TIMEOUT);
+ newSubscription(proxy, Duration.ofMillis(100));
subscription.activateSubscription();
- Thread.sleep(500);
+ ScriptedSubscription s1 = proxy.awaitSubscription();
+ // Let the timeout fire (no onSubscribe)
+ await().atMost(Duration.ofSeconds(2))
+ .until(() -> subscription.nextEvent() == null &&
proxy.subscribeCallCount() >= 2);
+
+ // Now s1 is stale; a new subscription s2 has been created
+ ScriptedSubscription s2 = proxy.latestSubscription();
+ assertThat(s2).isNotSameAs(s1);
+
+ // Late onSubscribe delivery on the STALE subscriber must result in its
+ // Subscription being cancelled (to free the underlying HTTP/2 stream
slot).
+ s1.onSubscribeDelivered();
+ assertThat(s1.subscription.isCancelled()).isTrue();
+ }
- // nextEvent() should detect COMPLETED without shard-end and trigger
resubscription
- subscription.nextEvent();
- Thread.sleep(500);
+ // ----- Pull-based backpressure: at most one event in flight per
subscriber -----
- assertThat(subscribeCount.get()).isEqualTo(2);
+ @Test
+ void initialRequestIssuedOnlyOnceAfterOnSubscribe() throws Exception {
+ ScriptedProxy proxy = new ScriptedProxy();
+ FanOutKinesisShardSubscription subscription = newSubscription(proxy,
LONG_TIMEOUT);
+
+ subscription.activateSubscription();
+ ScriptedSubscription s1 = proxy.awaitSubscription();
+ s1.onSubscribeDelivered();
+
+ // After onSubscribe on an empty queue, exactly PREFETCH requests must
have been issued.
+ // The "max-one-in-flight" variant would prime with 1; depth-2
pipelining primes with 2.
+ assertThat(s1.subscription.getRequestedCount()).isEqualTo(2);
+
+ // No drain yet → no further requests
+ waitShort();
+ assertThat(s1.subscription.getRequestedCount()).isEqualTo(2);
}
@Test
- void testShardEndDoesNotResubscribe() throws Exception {
- AtomicInteger subscribeCount = new AtomicInteger(0);
- AsyncStreamProxy proxy =
- new AsyncStreamProxy() {
- @Override
- public CompletableFuture<Void> subscribeToShard(
- String consumerArn,
- String shardId,
- StartingPosition startingPosition,
- SubscribeToShardResponseHandler responseHandler) {
- subscribeCount.incrementAndGet();
- return CompletableFuture.supplyAsync(
- () -> {
- responseHandler.responseReceived(
-
SubscribeToShardResponse.builder().build());
- responseHandler.onEventStream(
- subscriber -> {
- subscriber.onSubscribe(
- new Subscription() {
- private boolean
sent = false;
-
- @Override
- public void
request(long n) {
- if (!sent) {
- sent =
true;
- // Send
event with null
- //
continuation (shard end)
-
subscriber.onNext(
-
SubscribeToShardEvent
-
.builder()
-
.millisBehindLatest(
-
0L)
-
.continuationSequenceNumber(
-
null)
-
.build());
- } else {
-
subscriber.onComplete();
- }
- }
-
- @Override
- public void
cancel() {}
- });
- });
- return null;
- });
- }
-
- @Override
- public void close() {}
- };
+ void requestOneAfterEachConsumerDrain() throws Exception {
+ ScriptedProxy proxy = new ScriptedProxy();
+ FanOutKinesisShardSubscription subscription = newSubscription(proxy,
LONG_TIMEOUT);
- FanOutKinesisShardSubscription subscription =
- new FanOutKinesisShardSubscription(
- proxy,
- CONSUMER_ARN,
- TEST_SHARD_ID,
- StartingPosition.fromStart(),
- SUBSCRIPTION_TIMEOUT);
+ subscription.activateSubscription();
+ ScriptedSubscription s1 = proxy.awaitSubscription();
+ s1.onSubscribeDelivered();
+
+ // Initial priming: PREFETCH (=2) outstanding requests.
+ assertThat(s1.subscription.getRequestedCount()).isEqualTo(2);
+
+ // Server delivers one event; onNext must NOT issue another request
(backpressure).
+ s1.deliverEvent(
+ SubscribeToShardEvent.builder()
+ .records(record("seq-1"))
+ .continuationSequenceNumber("cont-1")
+ .build());
+ waitShort();
+ assertThat(s1.subscription.getRequestedCount()).isEqualTo(2);
+
+ // Consumer drains via nextEvent → exactly one more request(1) must
fire
+ SubscribeToShardEvent drained = pollEvent(subscription);
+ assertThat(drained).isNotNull();
+ assertThat(s1.subscription.getRequestedCount()).isEqualTo(3);
+
+ // Second event/drain cycle
+ s1.deliverEvent(
+ SubscribeToShardEvent.builder()
+ .records(record("seq-2"))
+ .continuationSequenceNumber("cont-2")
+ .build());
+ assertThat(pollEvent(subscription)).isNotNull();
+ assertThat(s1.subscription.getRequestedCount()).isEqualTo(4);
+ }
+
+ @Test
+ void pipelineDepthMatchesPrefetch() throws Exception {
+ // Verifies the invariant: queue.size + outstanding == PREFETCH (=2).
Fills the queue to
+ // capacity, then drains one at a time, checking the queue never
overflows and
+ // request counts increment as expected.
+ ScriptedProxy proxy = new ScriptedProxy();
+ FanOutKinesisShardSubscription subscription = newSubscription(proxy,
LONG_TIMEOUT);
subscription.activateSubscription();
- Thread.sleep(500);
+ ScriptedSubscription s1 = proxy.awaitSubscription();
+ s1.onSubscribeDelivered();
+ assertThat(s1.subscription.getRequestedCount()).isEqualTo(2);
+
+ // Deliver 2 events back-to-back, filling the queue to PREFETCH.
+ s1.deliverEvent(
+ SubscribeToShardEvent.builder()
+ .records(record("seq-1"))
+ .continuationSequenceNumber("cont-1")
+ .build());
+ s1.deliverEvent(
+ SubscribeToShardEvent.builder()
+ .records(record("seq-2"))
+ .continuationSequenceNumber("cont-2")
+ .build());
+
+ // No further requests should have fired yet; outstanding = 0, queue =
2, sum = 2.
+ waitShort();
+ assertThat(s1.subscription.getRequestedCount()).isEqualTo(2);
+
+ // Drain event 1 → request(1). Outstanding = 1, queue = 1, sum = 2.
+ SubscribeToShardEvent e1 = pollEvent(subscription);
+ assertThat(e1.continuationSequenceNumber()).isEqualTo("cont-1");
+ assertThat(s1.subscription.getRequestedCount()).isEqualTo(3);
+
+ // Server delivers event 3; queue = 2 again.
+ s1.deliverEvent(
+ SubscribeToShardEvent.builder()
+ .records(record("seq-3"))
+ .continuationSequenceNumber("cont-3")
+ .build());
+ waitShort();
+ assertThat(s1.subscription.getRequestedCount()).isEqualTo(3);
+
+ // Drain event 2 → request(1). Sum stays at 2.
+ SubscribeToShardEvent e2 = pollEvent(subscription);
+ assertThat(e2.continuationSequenceNumber()).isEqualTo("cont-2");
+ assertThat(s1.subscription.getRequestedCount()).isEqualTo(4);
+ }
- // Drain the shard-end event from the queue
- subscription.nextEvent();
- Thread.sleep(500);
+ @Test
+ void nextEventOnEmptyQueueDoesNotRequestMore() throws Exception {
+ ScriptedProxy proxy = new ScriptedProxy();
+ FanOutKinesisShardSubscription subscription = newSubscription(proxy,
LONG_TIMEOUT);
- // Should not have resubscribed — shard has ended
- assertThat(subscribeCount.get()).isEqualTo(1);
+ subscription.activateSubscription();
+ ScriptedSubscription s1 = proxy.awaitSubscription();
+ s1.onSubscribeDelivered();
+ int priming = s1.subscription.getRequestedCount();
+
+ // nextEvent on an empty queue returns null and must NOT issue a
spurious request(1)
+ assertThat(subscription.nextEvent()).isNull();
+ assertThat(subscription.nextEvent()).isNull();
assertThat(subscription.nextEvent()).isNull();
+ assertThat(s1.subscription.getRequestedCount()).isEqualTo(priming);
+ }
+
+ // ----- Reactivation invariant: onSubscribe must not re-prime if queue
has leftover events
+ // -----
+
+ @Test
+ void
onSubscribeWithBufferedEventFromPreviousSubscriberDoesNotPrimeRequest() throws
Exception {
+ ScriptedProxy proxy = new ScriptedProxy();
+ FanOutKinesisShardSubscription subscription = newSubscription(proxy,
LONG_TIMEOUT);
+
+ // s1 activates and delivers one event that gets buffered
+ subscription.activateSubscription();
+ ScriptedSubscription s1 = proxy.awaitSubscription();
+ s1.onSubscribeDelivered();
+ assertThat(s1.subscription.getRequestedCount()).isEqualTo(2);
+
+ s1.deliverEvent(
+ SubscribeToShardEvent.builder()
+ .records(record("seq-1"))
+ .continuationSequenceNumber("cont-1")
+ .build());
+
+ // Before draining, s1 errors (handler path). This disposes s1 but
leaves event1
+ // in the shared queue.
+ s1.fireHandlerError(new IOException("connection closed"));
+
+ // nextEvent drains the exception and reactivates. We should now have
s2 pending.
+ assertThat(subscription.nextEvent()).isNull();
+ ScriptedSubscription s2 = proxy.awaitSubscription();
+ assertThat(s2).isNotSameAs(s1);
+
+ // s2's onSubscribe fires. The queue has 1 leftover event, so s2
primes only
+ // (PREFETCH - 1) = 1 to preserve the queue.size + outstanding ==
PREFETCH invariant.
+ s2.onSubscribeDelivered();
+ assertThat(s2.subscription.getRequestedCount())
+ .as("Reactivation with 1 buffered event primes only
PREFETCH-1")
+ .isEqualTo(1);
+
+ // Consumer drains the leftover event; pollAndRequestNext adds another
request(1),
+ // bringing outstanding on s2 back to PREFETCH (=2).
+ SubscribeToShardEvent drained = pollEvent(subscription);
+ assertThat(drained.continuationSequenceNumber()).isEqualTo("cont-1");
+ assertThat(s2.subscription.getRequestedCount())
+ .as("After consumer drain, total requests equal PREFETCH")
+ .isEqualTo(2);
+ }
+
+ @Test
+ void onSubscribeWithEmptyQueuePrimesRequestImmediately() throws Exception {
+ // Counterpart of the above: in the normal case (queue empty on
onSubscribe),
+ // priming equals PREFETCH.
+ ScriptedProxy proxy = new ScriptedProxy();
+ FanOutKinesisShardSubscription subscription = newSubscription(proxy,
LONG_TIMEOUT);
+
+ subscription.activateSubscription();
+ ScriptedSubscription s1 = proxy.awaitSubscription();
+ s1.onSubscribeDelivered();
+
+ assertThat(s1.subscription.getRequestedCount())
+ .as("Normal activation with empty queue primes PREFETCH
requests")
+ .isEqualTo(2);
+ }
+
+ // ----- close() shutdown behavior -----
+
+ @Test
+ void closeCancelsActiveSubscription() throws Exception {
+ ScriptedProxy proxy = new ScriptedProxy();
+ FanOutKinesisShardSubscription subscription = newSubscription(proxy,
LONG_TIMEOUT);
+
+ subscription.activateSubscription();
+ ScriptedSubscription s1 = proxy.awaitSubscription();
+ s1.onSubscribeDelivered();
+ assertThat(s1.subscription.isCancelled()).isFalse();
+
+ subscription.close();
+
+ // close must cancel the active reactive Subscription so the
underlying HTTP/2
+ // stream slot is released promptly.
+ assertThat(s1.subscription.isCancelled()).isTrue();
+ }
+
+ @Test
+ void activateAfterCloseIsNoOp() throws Exception {
+ ScriptedProxy proxy = new ScriptedProxy();
+ FanOutKinesisShardSubscription subscription = newSubscription(proxy,
LONG_TIMEOUT);
+
+ subscription.close();
+
+ // Any further activation attempts must be silently ignored.
+ subscription.activateSubscription();
+ subscription.activateSubscription();
+
+ waitShort();
+ assertThat(proxy.subscribeCallCount()).isEqualTo(0);
+ }
+
+ @Test
+ void closeIsIdempotent() throws Exception {
+ ScriptedProxy proxy = new ScriptedProxy();
+ FanOutKinesisShardSubscription subscription = newSubscription(proxy,
LONG_TIMEOUT);
+
+ subscription.activateSubscription();
+ ScriptedSubscription s1 = proxy.awaitSubscription();
+ s1.onSubscribeDelivered();
+
+ // Two close() calls must not throw and must leave state consistent.
+ subscription.close();
+ subscription.close();
+
+ assertThat(s1.subscription.isCancelled()).isTrue();
+ }
+
+ // ----- Stale onNext drop (identity check in onNext) -----
+
+ @Test
+ void onNextFromStaleSubscriberIsDropped() throws Exception {
+ ScriptedProxy proxy = new ScriptedProxy();
+ FanOutKinesisShardSubscription subscription = newSubscription(proxy,
LONG_TIMEOUT);
+
+ subscription.activateSubscription();
+ ScriptedSubscription s1 = proxy.awaitSubscription();
+ s1.onSubscribeDelivered();
+
+ // Disposal via error path retires s1
+ s1.fireHandlerError(new IOException("dispose s1"));
+
+ // Drain the error and trigger reactivation
+ assertThat(subscription.nextEvent()).isNull();
+ ScriptedSubscription s2 = proxy.awaitSubscription();
+ s2.onSubscribeDelivered();
+
+ // The AWS SDK may deliver a late onNext on the disposed subscriber
before cancel
+ // propagates. Such events must be silently dropped (identity check in
onNext).
+ s1.deliverEvent(
+ SubscribeToShardEvent.builder()
+ .records(record("stale"))
+ .continuationSequenceNumber("stale-cont")
+ .build());
+
+ // nextEvent must NOT return the stale event
+ waitShort();
+ assertThat(subscription.nextEvent()).isNull();
+
+ // The active subscriber can still deliver events normally
+ s2.deliverEvent(
+ SubscribeToShardEvent.builder()
+ .records(record("live"))
+ .continuationSequenceNumber("live-cont")
+ .build());
+ SubscribeToShardEvent got = pollEvent(subscription);
+ assertThat(got.continuationSequenceNumber()).isEqualTo("live-cont");
+ }
+
+ @Test
+ void onCompleteFromStaleSubscriberDoesNotDisposeActiveSubscriber() throws
Exception {
+ ScriptedProxy proxy = new ScriptedProxy();
+ FanOutKinesisShardSubscription subscription = newSubscription(proxy,
LONG_TIMEOUT);
+
+ subscription.activateSubscription();
+ ScriptedSubscription s1 = proxy.awaitSubscription();
+ s1.onSubscribeDelivered();
+
+ s1.fireHandlerError(new IOException("dispose s1"));
+ assertThat(subscription.nextEvent()).isNull();
+ ScriptedSubscription s2 = proxy.awaitSubscription();
+ s2.onSubscribeDelivered();
+
+ s1.onComplete();
+
+ waitShort();
+ assertThat(proxy.subscribeCallCount()).isEqualTo(2);
+
+ s2.deliverEvent(
+ SubscribeToShardEvent.builder()
+ .records(record("live"))
+ .continuationSequenceNumber("live-cont")
+ .build());
+ SubscribeToShardEvent got = pollEvent(subscription);
+ assertThat(got.continuationSequenceNumber()).isEqualTo("live-cont");
+ }
+
+ private FanOutKinesisShardSubscription newSubscription(AsyncStreamProxy
proxy) {
+ return newSubscription(proxy, DEFAULT_TIMEOUT);
+ }
+
+ private FanOutKinesisShardSubscription newSubscription(
+ AsyncStreamProxy proxy, Duration subscriptionTimeout) {
+ ScheduledThreadPoolExecutor timeoutScheduler = new
ScheduledThreadPoolExecutor(1);
+ timeoutScheduler.setRemoveOnCancelPolicy(true);
+ return new FanOutKinesisShardSubscription(
+ proxy,
+ CONSUMER_ARN,
+ SHARD_ID,
+ StartingPosition.fromStart(),
+ subscriptionTimeout,
+ timeoutScheduler);
+ }
+
+ private static Record record(String sequenceNumber) {
+ return
Record.builder().sequenceNumber(sequenceNumber).partitionKey("pk").build();
+ }
+
+ private static SubscribeToShardEvent
pollEvent(FanOutKinesisShardSubscription subscription) {
+ return
await().atMost(Duration.ofSeconds(2)).until(subscription::nextEvent, e -> e !=
null);
+ }
+
+ private static void waitShort() throws InterruptedException {
+ // Give the async machinery a window to do anything incorrect.
+ // Not a timing assertion — just bounded patience for "nothing should
happen".
+ Thread.sleep(100);
+ }
+
+ // ----- Programmable fake AsyncStreamProxy -----
+
+ /**
+ * A fake {@link AsyncStreamProxy} that records subscribe calls and
exposes each one via a
+ * {@link ScriptedSubscription} handle for the test to drive events and
errors.
+ */
+ private static final class ScriptedProxy implements AsyncStreamProxy {
+ private final ConcurrentLinkedQueue<ScriptedSubscription> calls =
+ new ConcurrentLinkedQueue<>();
+
+ @Override
+ public CompletableFuture<Void> subscribeToShard(
+ String consumerArn,
+ String shardId,
+ StartingPosition startingPosition,
+ SubscribeToShardResponseHandler responseHandler) {
+ ScriptedSubscription call = new
ScriptedSubscription(startingPosition, responseHandler);
+ calls.add(call);
+ return call.future;
+ }
+
+ @Override
+ public void close() {}
+
+ int subscribeCallCount() {
+ return calls.size();
+ }
+
+ ScriptedSubscription awaitSubscription() {
+ return await().atMost(Duration.ofSeconds(2))
+ .until(
+ () -> {
+ // Return the last unseen call
+ for (ScriptedSubscription c : calls) {
+ if (!c.observed) {
+ c.observed = true;
+ return c;
+ }
+ }
+ return null;
+ },
+ c -> c != null);
+ }
+
+ ScriptedSubscription latestSubscription() {
+ ScriptedSubscription last = null;
+ for (ScriptedSubscription c : calls) {
+ last = c;
+ }
+ return last;
+ }
+ }
+
+ /** A handle for the test to drive a specific {@code subscribeToShard}
call. */
+ private static final class ScriptedSubscription {
+ final StartingPosition startingPosition;
+ final SubscribeToShardResponseHandler handler;
+ final CompletableFuture<Void> future = new CompletableFuture<>();
+ final TestSubscription subscription = new TestSubscription();
+ volatile boolean observed = false;
+
+ private final AtomicReference<Subscriber<? super
SubscribeToShardEventStream>>
+ subscriberRef = new AtomicReference<>();
+
+ ScriptedSubscription(
+ StartingPosition startingPosition,
SubscribeToShardResponseHandler handler) {
+ this.startingPosition = startingPosition;
+ this.handler = handler;
+ // Start the event stream handshake: the handler.onEventStream
callback tells us
+ // the Subscriber instance we should deliver signals to.
+ handler.onEventStream(subscriberRef::set);
+ }
+
+ void onSubscribeDelivered() {
+ Subscriber<? super SubscribeToShardEventStream> s =
awaitSubscriber();
+ s.onSubscribe(subscription);
+ }
+
+ void deliverEvent(SubscribeToShardEvent event) {
+ Subscriber<? super SubscribeToShardEventStream> s =
awaitSubscriber();
+ // SubscribeToShardEvent itself implements
SubscribeToShardEventStream, so we can
+ // pass it directly. Its accept() dispatches to
visitor.visit(this).
+ s.onNext(event);
+ }
+
+ void onComplete() {
+ Subscriber<? super SubscribeToShardEventStream> s =
awaitSubscriber();
+ s.onComplete();
+ }
+
+ void fireHandlerError(Throwable t) {
+ handler.exceptionOccurred(t);
+ }
+
+ void completeExceptionally(Throwable t) {
+ future.completeExceptionally(t);
+ }
+
+ private Subscriber<? super SubscribeToShardEventStream>
awaitSubscriber() {
+ return
await().atMost(Duration.ofSeconds(2)).until(subscriberRef::get, s -> s != null);
+ }
+ }
+
+ /** A {@link Subscription} that records whether cancel was called. */
+ private static final class TestSubscription implements Subscription {
+ private final AtomicInteger requested = new AtomicInteger(0);
+ private final AtomicBoolean cancelled = new AtomicBoolean(false);
+
+ @Override
+ public void request(long n) {
+ requested.addAndGet((int) n);
+ }
+
+ @Override
+ public void cancel() {
+ cancelled.set(true);
+ }
+
+ boolean isCancelled() {
+ return cancelled.get();
+ }
+
+ int getRequestedCount() {
+ return requested.get();
+ }
}
}