chenylee-aws opened a new pull request, #247:
URL: https://github.com/apache/flink-connector-aws/pull/247

   …ng behavior and concurrent activation race in FanOutKinesisShardSubscription
   
   ## Purpose of the change
   
   Fix FLINK-39660 (https://issues.apache.org/jira/browse/FLINK-39660): a cascading failure in the Kinesis EFO subscription lifecycle. Under transient disruptions (a slow downstream consumer, a GC pause, a brief network error), the connector enters a self-sustaining retry storm that stalls consumption across many shards, leaks HTTP/2 streams, and saturates the JVM common ForkJoinPool. See the JIRA for the observed symptoms and an illustrative stack trace.
   
     This PR addresses the root causes, mostly in FanOutKinesisShardSubscription:
     
     - The Netty event loop no longer blocks on record processing. Switched from blocking eventQueue.put() to non-blocking offer(), with a prefetch-based pull model: onSubscribe primes PREFETCH - queueSize requests, and request(1) is issued after each consumer drain. The invariant queue.size + outstandingRequests == PREFETCH prevents queue overflow by construction.
     - Concurrent activations on the same shard are prevented. Replaced the subscriptionActive-based guard (which only blocked after onSubscribe) with an identity-based guard on shardSubscriber. Activations during the pending window are now correctly rejected.
     - Identity-based disposal. All four error paths (responseHandler.onError, .exceptionally, the timeout callback, Subscriber.onError) funnel through disposeIfActive(subscriber); only the winning path cleans up, and losing paths are no-ops instead of corrupting state.
     - cancel() now runs unconditionally. Removed the subscriptionActive gate that made cancel() a no-op during the pending window; failed subscriptions now promptly release their HTTP/2 stream slot.
     - The timeout watcher moved off the common ForkJoinPool onto a dedicated single-thread ScheduledExecutorService, so the common pool is no longer saturated by latch watchers.
     - Timeout/onSubscribe race closed. Added a timeoutFuture == null guard inside the timeout callback so that a late-firing timeout cannot tear down a subscription that has just been successfully established.
     - Stale onNext deliveries are dropped via an identity check. If the AWS SDK delivers an event to a disposed subscriber, the event is silently ignored (the server will redeliver it on the replacement subscription).
     - Added close(), invoked from FanOutKinesisShardSplitReader.close(), for clean shutdown: pending subscriptions are disposed and further activations are rejected.
     - KinesisStreamsSource now passes user-configured properties to the Netty HTTP client instead of an empty AttributeMap, so http-client.max-concurrency and related settings actually apply.
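
The prefetch-based pull model from the first bullet can be illustrated with a minimal sketch, under stated assumptions. It is not the connector's implementation: PrefetchBuffer, drainOne() and depth() are hypothetical names, the constructor argument prefetch stands in for PREFETCH, and the JDK's java.util.concurrent.Flow.Subscription is used in place of the Reactive Streams Subscription the AWS SDK hands to the subscriber (both expose the same request(long) contract).

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Flow;

/**
 * Hypothetical sketch of a prefetch-based pull model (not the connector's code).
 * Invariant after every signal: queue.size() + outstandingRequests == prefetch.
 */
class PrefetchBuffer<T> {
    private final int prefetch;
    private final BlockingQueue<T> queue;
    private Flow.Subscription subscription;
    private long outstandingRequests; // requested upstream, not yet delivered

    PrefetchBuffer(int prefetch) {
        this.prefetch = prefetch;
        this.queue = new ArrayBlockingQueue<>(prefetch);
    }

    /** (Re)activation: prime only the free slots, because queued leftovers still count. */
    synchronized void onSubscribe(Flow.Subscription s) {
        subscription = s;
        outstandingRequests = prefetch - queue.size();
        if (outstandingRequests > 0) {
            s.request(outstandingRequests);
        }
    }

    /** Network thread: offer() never blocks and, while the invariant holds, never fails. */
    synchronized void onNext(T item) {
        if (!queue.offer(item)) {
            throw new IllegalStateException("upstream delivered more than was requested");
        }
        outstandingRequests--;
    }

    /** Consumer thread: take one item and request exactly one replacement for it. */
    synchronized T drainOne() {
        T item = queue.poll();
        if (item != null) {
            outstandingRequests++;
            subscription.request(1);
        }
        return item; // empty queue: no spurious request is issued
    }

    /** Pipeline depth; stays equal to prefetch throughout a drain cycle. */
    synchronized long depth() {
        return queue.size() + outstandingRequests;
    }
}
```

Because request(1) is only issued after an item leaves the queue, upstream can never have more items in flight than there are free slots, which is why onNext can use offer() instead of a blocking put().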
     
   
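
The identity-based activation guard and disposal can be sketched with compare-and-set on an AtomicReference. This is a hypothetical illustration: ShardActivationGuard, Subscriber, acceptIfCurrent() and the cleanup counter are assumed names, and only disposeIfActive echoes a name from the description. The real cleanup (cancelling the subscription, reporting the error) is reduced here to incrementing a counter.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical sketch of identity-based activation and disposal for one shard.
 * Each activation installs a fresh Subscriber; only the installed instance may
 * deliver records or be disposed, so stale and duplicate signals become no-ops.
 */
class ShardActivationGuard {
    /** Per-activation token, compared by identity (==), never by equals(). */
    static final class Subscriber { }

    private final AtomicReference<Subscriber> current = new AtomicReference<>();
    private final AtomicInteger cleanups = new AtomicInteger();
    private volatile boolean closed;

    /** Rejected while any subscriber is installed, including one still waiting for onSubscribe. */
    Subscriber activate() {
        if (closed) {
            return null;
        }
        Subscriber s = new Subscriber();
        if (!current.compareAndSet(null, s)) {
            return null;
        }
        if (closed) { // close() ran concurrently; undo this activation
            disposeIfActive(s);
            return null;
        }
        return s;
    }

    /** All error paths funnel here; only the first call for a given subscriber cleans up. */
    boolean disposeIfActive(Subscriber s) {
        if (s != null && current.compareAndSet(s, null)) {
            cleanups.incrementAndGet(); // stand-in for cancel() + error reporting
            return true;
        }
        return false; // losing path or stale subscriber: no-op
    }

    /** An onNext delivered to a disposed subscriber is dropped. */
    boolean acceptIfCurrent(Subscriber s) {
        return s != null && current.get() == s;
    }

    void close() {
        closed = true;
        Subscriber s = current.get();
        if (s != null) {
            disposeIfActive(s);
        }
    }

    int cleanups() {
        return cleanups.get();
    }
}
```

Using the subscriber instance itself as the key means a late callback from a previous activation can never match the current one, even though both belong to the same shard.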
   
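
The timeout handling can likewise be sketched, assuming the watcher runs on a dedicated single-thread ScheduledExecutorService and the callback re-checks timeoutFuture == null under the same lock that onSubscribe uses to clear it. SubscriptionTimeout, start(), isTimedOut() and the daemon thread name are hypothetical; in the real code the timeout branch would dispose the subscriber rather than set a flag.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch: subscription-establishment timeout on a dedicated
 * single-thread scheduler rather than a watcher parked on ForkJoinPool.commonPool().
 */
class SubscriptionTimeout {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "efo-subscription-timeout");
                t.setDaemon(true); // do not keep the JVM alive
                return t;
            });
    private ScheduledFuture<?> timeoutFuture; // guarded by this
    private boolean established;
    private boolean timedOut;

    synchronized void start(long timeoutMillis) {
        timeoutFuture = scheduler.schedule(this::onTimeout, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Successful onSubscribe: clear the future first so a late-firing timeout sees null. */
    synchronized void onSubscribe() {
        ScheduledFuture<?> f = timeoutFuture;
        timeoutFuture = null;
        if (f != null) {
            f.cancel(false);
        }
        established = true;
    }

    /** Timeout callback; package-private only so the race can be exercised directly. */
    synchronized void onTimeout() {
        if (timeoutFuture == null) {
            return; // onSubscribe already won: leave the live subscription alone
        }
        timeoutFuture = null;
        timedOut = true; // real code would dispose the subscriber here
    }

    synchronized boolean isEstablished() { return established; }

    synchronized boolean isTimedOut() { return timedOut; }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

cancel(false) alone does not close the race: if the timeout task is already executing and waiting for the lock, cancelling its future does not stop it, so the null check inside the callback is what prevents the teardown.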
   ## Verifying this change
   
   This change added tests and can be verified as follows:
     
     - Added 22 unit tests in FanOutKinesisShardSubscriptionTest covering:
       - Happy-path activation, EFO 5-minute rotation, shard-end handling
       - Concurrent activation prevention (the main correctness bug)
       - Recoverable / unrecoverable / ResourceNotFoundException error paths
       - Dual error-path deduplication via identity check
       - Subscription timeout cleanup and retry
       - Stale onSubscribe / stale onNext handling after disposal
       - close() behavior and idempotency
       - Pull-based backpressure (initial request equals PREFETCH, request-per-drain, no spurious requests on an empty queue)
       - Reactivation invariant (priming equals PREFETCH - queueSize, not always PREFETCH)
       - End-to-end pipeline depth check (queue.size + outstanding == PREFETCH throughout a full drain cycle)
     
   
   
     - Tests use a scripted AsyncStreamProxy fake that lets each test deterministically drive onSubscribe, onNext, onComplete, handler.onError, and future.completeExceptionally signals. They rely on Awaitility polling instead of Thread.sleep.
     - Load-tested against a 200-shard stream at parallelism 18 for ~90 minutes, both before and after the change. With the fix: no concurrent activations observed, no zombie subscriptions, no ForkJoinPool common-pool saturation in thread dumps, and no Netty event-loop blocking on put.
    


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
