chenylee-aws commented on code in PR #247:
URL:
https://github.com/apache/flink-connector-aws/pull/247#discussion_r3299456111
##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java:
##########
@@ -337,56 +353,89 @@ public void onNext(SubscribeToShardEventStream
subscribeToShardEventStream) {
new SubscribeToShardResponseHandler.Visitor() {
@Override
public void visit(SubscribeToShardEvent event) {
- try {
+ synchronized (lockObject) {
+ if (shardSubscriber !=
FanOutShardSubscriber.this) {
+ LOG.warn(
+ "Ignoring late event for shard {}
from a disposed "
+ + "subscriber; it will be
re-delivered after "
+ + "reactivation.",
+ shardId);
+ return;
+ }
+
LOG.debug(
"Received event: {}, {}",
event.getClass().getSimpleName(),
event);
- eventQueue.put(event);
- if (event.continuationSequenceNumber() ==
null) {
- isShardEnd.set(true);
+ // Non-blocking offer. Under the prefetch
discipline maintained
+ // by onSubscribe (primes PREFETCH -
queue.size() requests) and
+ // pollAndRequestNext (issues request(1) after
each consumer
+ // drain), the invariant queue.size +
outstanding == PREFETCH
+ // holds in steady state, so the queue is
guaranteed to have
+ // room for each delivered event. If offer()
ever returns false
+ // it indicates a protocol / state invariant
violation (e.g. the
+ // server delivered an unrequested event) -
fail loud rather
+ // than block the Netty event loop. The
subscription will be
+ // reactivated from the previous
startingPosition (which has
+ // not yet been advanced below) and the server
will re-deliver
+ // this event.
+ if (!eventQueue.offer(event)) {
+ LOG.error(
+ "Event queue overflow for shard
{}; server delivered "
+ + "an unrequested event.
Failing subscription "
+ + "to recover.",
+ shardId);
+
+ if
(disposeIfActive(FanOutShardSubscriber.this)) {
+ setSubscriptionException(
+ new IOException(
+ "Event queue overflow
for shard "
+ + shardId
+ + "; server
delivered an "
+ + "unrequested
event."));
+ }
return;
}
- // Update the starting position in case we
have to recreate the
- // subscription
- startingPosition =
-
StartingPosition.continueFromSequenceNumber(
-
event.continuationSequenceNumber());
-
- // Replace the record just consumed in the
Queue
- requestRecords();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new KinesisStreamsSourceException(
- "Interrupted while adding Kinesis
record to internal buffer.",
- e);
+ if (event.continuationSequenceNumber() ==
null) {
+ startingPosition = null;
+ } else {
+ startingPosition =
+
StartingPosition.continueFromSequenceNumber(
+
event.continuationSequenceNumber());
+ }
}
}
});
}
@Override
public void onError(Throwable throwable) {
- if (!subscriptionException.compareAndSet(null, throwable)) {
- LOG.warn(
- "Another subscription exception has been queued,
ignoring subsequent exceptions",
- throwable);
+ synchronized (lockObject) {
+ if (!disposeIfActive(this)) {
+ return;
+ }
}
+ setSubscriptionException(throwable);
}
@Override
public void onComplete() {
- LOG.info("Subscription complete - {} ({})", shardId, consumerArn);
- this.subscriptionState.set(SubscriptionState.COMPLETED);
+ synchronized (lockObject) {
+ LOG.info("Subscription complete - {} ({})", shardId,
consumerArn);
+ shardSubscriber = null;
+ }
+ activateSubscription();
}
}
- /** States that the {@code FanOutShardSubscriber} may be in. */
- private enum SubscriptionState {
- NOT_STARTED,
- SUBSCRIBED,
- COMPLETED
+ public void close() {
+ synchronized (lockObject) {
+ closed = true;
+ if (shardSubscriber != null) {
Review Comment:
I don't think there's a reachable state where `shardSubscriber == null` but
`timeoutFuture != null`. Every path other than `onComplete` that nulls
`shardSubscriber` goes through `disposeIfActive`, which already calls
`cancelTimeoutFuture()`. For `onComplete`, the timeout should already have been
cancelled by `onSubscribe`. So I'd prefer to keep it as-is and avoid calling
`cancelTimeoutFuture()` in too many places.
##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java:
##########
@@ -337,56 +353,89 @@ public void onNext(SubscribeToShardEventStream
subscribeToShardEventStream) {
new SubscribeToShardResponseHandler.Visitor() {
@Override
public void visit(SubscribeToShardEvent event) {
- try {
+ synchronized (lockObject) {
+ if (shardSubscriber !=
FanOutShardSubscriber.this) {
+ LOG.warn(
+ "Ignoring late event for shard {}
from a disposed "
+ + "subscriber; it will be
re-delivered after "
+ + "reactivation.",
+ shardId);
+ return;
+ }
+
LOG.debug(
"Received event: {}, {}",
event.getClass().getSimpleName(),
event);
- eventQueue.put(event);
- if (event.continuationSequenceNumber() ==
null) {
- isShardEnd.set(true);
+ // Non-blocking offer. Under the prefetch
discipline maintained
+ // by onSubscribe (primes PREFETCH -
queue.size() requests) and
+ // pollAndRequestNext (issues request(1) after
each consumer
+ // drain), the invariant queue.size +
outstanding == PREFETCH
+ // holds in steady state, so the queue is
guaranteed to have
+ // room for each delivered event. If offer()
ever returns false
+ // it indicates a protocol / state invariant
violation (e.g. the
+ // server delivered an unrequested event) -
fail loud rather
+ // than block the Netty event loop. The
subscription will be
+ // reactivated from the previous
startingPosition (which has
+ // not yet been advanced below) and the server
will re-deliver
+ // this event.
+ if (!eventQueue.offer(event)) {
+ LOG.error(
+ "Event queue overflow for shard
{}; server delivered "
+ + "an unrequested event.
Failing subscription "
+ + "to recover.",
+ shardId);
+
+ if
(disposeIfActive(FanOutShardSubscriber.this)) {
+ setSubscriptionException(
+ new IOException(
+ "Event queue overflow
for shard "
+ + shardId
+ + "; server
delivered an "
+ + "unrequested
event."));
+ }
return;
}
- // Update the starting position in case we
have to recreate the
- // subscription
- startingPosition =
-
StartingPosition.continueFromSequenceNumber(
-
event.continuationSequenceNumber());
-
- // Replace the record just consumed in the
Queue
- requestRecords();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new KinesisStreamsSourceException(
- "Interrupted while adding Kinesis
record to internal buffer.",
- e);
+ if (event.continuationSequenceNumber() ==
null) {
+ startingPosition = null;
+ } else {
+ startingPosition =
+
StartingPosition.continueFromSequenceNumber(
+
event.continuationSequenceNumber());
+ }
}
}
});
}
@Override
public void onError(Throwable throwable) {
- if (!subscriptionException.compareAndSet(null, throwable)) {
- LOG.warn(
- "Another subscription exception has been queued,
ignoring subsequent exceptions",
- throwable);
+ synchronized (lockObject) {
+ if (!disposeIfActive(this)) {
+ return;
+ }
}
+ setSubscriptionException(throwable);
}
@Override
public void onComplete() {
- LOG.info("Subscription complete - {} ({})", shardId, consumerArn);
- this.subscriptionState.set(SubscriptionState.COMPLETED);
+ synchronized (lockObject) {
+ LOG.info("Subscription complete - {} ({})", shardId,
consumerArn);
Review Comment:
Good catch. Applied this change.
##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java:
##########
@@ -70,22 +70,38 @@ public class FanOutKinesisShardSubscription {
TimeoutException.class,
IOException.class,
LimitExceededException.class);
+ private static final ScheduledExecutorService TIMEOUT_SCHEDULER =
+ new ScheduledThreadPoolExecutor(
+ 1,
+ r -> {
+ Thread t = new Thread(r,
"subscription-timeout-scheduler");
+ t.setDaemon(true);
+ return t;
+ });
Review Comment:
Thanks for the suggestion. Moving it to the caller.
##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscriptionTest.java:
##########
Review Comment:
Thanks, added it to the tests.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]