[ https://issues.apache.org/jira/browse/FLINK-38096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Taranpreet Kaur updated FLINK-38096:
------------------------------------
    Attachment: AsyncSinkHangDemo.java
                DummyAsyncSink.java
                TokenBucketProvider.java
                TokenBucketRateLimitingStrategy.java

> AsyncSinkWriter may hang when configured with a custom rate limiting strategy
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-38096
>                 URL: https://issues.apache.org/jira/browse/FLINK-38096
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common
>    Affects Versions: 2.0.0, 1.20.0, 1.20.1, 1.20.2
>            Reporter: Michał Misiewicz
>            Priority: Major
>         Attachments: AsyncSinkHangDemo.java, DummyAsyncSink.java, 
> TokenBucketProvider.java, TokenBucketRateLimitingStrategy.java
>
>
> [AsyncSinkWriter|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java]
> may hang when it is configured with a custom rate-limiting strategy that can
> block new requests even when no other requests are in flight. This can happen
> with strategies that enforce time-based limits, such as a cap on the number of
> API requests per interval.
> Consider the following RateLimitingStrategy implementation, written in Scala and
> based on [bucket4j|https://github.com/bucket4j/bucket4j]:
> {code:scala}
> package some.pipeline.ratelimit
> import com.typesafe.scalalogging.LazyLogging
> import io.github.bucket4j.Bucket
> import org.apache.flink.connector.base.sink.writer.strategy.{
>   RateLimitingStrategy,
>   RequestInfo,
>   ResultInfo
> }
> import java.io.Serializable
> class TokenBucketRateLimitingStrategy(
>     maxInFlightRequests: Int,
>     tokensPerSecond: Long,
>     tokensPerMinute: Long
> ) extends RateLimitingStrategy
>     with LazyLogging
>     with Serializable {
>   @transient
>   private var currentInFlightRequests = 0
>   @transient
>   private lazy val bucket: Bucket = TokenBucketProvider.getInstance(
>     "TokenBucketRateLimitingStrategy",
>     tokensPerSecond,
>     tokensPerMinute
>   )
>   override def shouldBlock(requestInfo: RequestInfo): Boolean = {
>     currentInFlightRequests >= maxInFlightRequests || areTokensNotAvailable(requestInfo)
>   }
>   private def areTokensNotAvailable(requestInfo: RequestInfo): Boolean = {
>     val batchSize = requestInfo.getBatchSize
>     if (batchSize <= 0) {
>       logger.debug(s"Received request with invalid batch size: $batchSize, allowing to proceed")
>       return false
>     }
>     !bucket.estimateAbilityToConsume(batchSize).canBeConsumed
>   }
>     ...
> } {code}
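To make the failure mode concrete without Flink or bucket4j on the classpath, here is a minimal Java sketch of the same blocking rule. `SimpleRequestInfo`, `SimpleTokenBucket`, `TokenBucketStrategySketch` and `TokenBucketDemo` are hypothetical names; the bucket uses a simplified whole-second refill rather than bucket4j's algorithm; and consuming tokens in `registerInFlightRequest` is an assumption, since that part of the strategy is elided above. The demo shows `shouldBlock()` returning true with zero requests in flight, and turning false only after time passes.

```java
import java.util.function.LongSupplier;

// Hypothetical stand-in for Flink's RequestInfo: only the batch size is modelled.
final class SimpleRequestInfo {
    final int batchSize;
    SimpleRequestInfo(int batchSize) { this.batchSize = batchSize; }
}

// Hypothetical token bucket (not bucket4j): adds tokensPerSecond tokens for
// every full second that has elapsed, up to capacity.
final class SimpleTokenBucket {
    private final long capacity;
    private final long tokensPerSecond;
    private final LongSupplier clockMillis;
    private long tokens;
    private long lastRefillMillis;

    SimpleTokenBucket(long capacity, long tokensPerSecond, LongSupplier clockMillis) {
        this.capacity = capacity;
        this.tokensPerSecond = tokensPerSecond;
        this.clockMillis = clockMillis;
        this.tokens = capacity;
        this.lastRefillMillis = clockMillis.getAsLong();
    }

    private void refill() {
        long elapsedSeconds = (clockMillis.getAsLong() - lastRefillMillis) / 1000;
        if (elapsedSeconds > 0) {
            tokens = Math.min(capacity, tokens + elapsedSeconds * tokensPerSecond);
            lastRefillMillis += elapsedSeconds * 1000;
        }
    }

    boolean canConsume(long n) { refill(); return tokens >= n; }

    void consume(long n) { refill(); tokens = Math.max(0, tokens - n); }
}

// Mirrors the Scala strategy's blocking rule: block on too many in-flight
// requests OR on an empty bucket. The second condition ignores in-flight
// requests, so shouldBlock() can stay true while nothing is in flight.
final class TokenBucketStrategySketch {
    private final int maxInFlightRequests;
    private final SimpleTokenBucket bucket;
    private int currentInFlightRequests = 0;

    TokenBucketStrategySketch(int maxInFlightRequests, SimpleTokenBucket bucket) {
        this.maxInFlightRequests = maxInFlightRequests;
        this.bucket = bucket;
    }

    boolean shouldBlock(SimpleRequestInfo info) {
        return currentInFlightRequests >= maxInFlightRequests || tokensNotAvailable(info);
    }

    private boolean tokensNotAvailable(SimpleRequestInfo info) {
        return info.batchSize > 0 && !bucket.canConsume(info.batchSize);
    }

    // Assumption: tokens are consumed when a request starts (elided in the original).
    void registerInFlightRequest(SimpleRequestInfo info) {
        currentInFlightRequests++;
        bucket.consume(info.batchSize);
    }

    void registerCompletedRequest() { currentInFlightRequests--; }
}

final class TokenBucketDemo {
    // Returns {blocked with nothing in flight, blocked one second later}.
    static boolean[] run() {
        long[] nowMillis = {0L}; // manual clock so the demo is deterministic
        SimpleTokenBucket bucket = new SimpleTokenBucket(10, 10, () -> nowMillis[0]);
        TokenBucketStrategySketch strategy = new TokenBucketStrategySketch(4, bucket);
        SimpleRequestInfo batch = new SimpleRequestInfo(10);

        strategy.registerInFlightRequest(batch); // uses up all 10 tokens
        strategy.registerCompletedRequest();     // nothing is in flight any more
        boolean blockedWhileIdle = strategy.shouldBlock(batch);

        nowMillis[0] += 1000;                    // one second later the bucket has refilled
        boolean blockedAfterRefill = strategy.shouldBlock(batch);
        return new boolean[] {blockedWhileIdle, blockedAfterRefill};
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println("blocked with nothing in flight: " + result[0]); // true
        System.out.println("blocked after refill: " + result[1]);           // false
    }
}
```

Because the refill is driven purely by the clock, nothing in the writer's own state changes between the two checks; whoever calls `shouldBlock()` has to come back and ask again.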
>  
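> AsyncSinkWriter may hang in
> [mailboxExecutor.yield()|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L356]
> when all tokens have already been consumed and no requests are in flight.
> yield() blocks until another mail is available in the task mailbox. With nothing
> in flight, no request-completion callback will enqueue one, and the token bucket
> refills over time without producing a mail, so the loop may never get to
> re-evaluate shouldBlock():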
> {code:java}
> private void flush() throws InterruptedException {
>     RequestInfo requestInfo = createRequestInfo();
>     while (rateLimitingStrategy.shouldBlock(requestInfo)) {
>         mailboxExecutor.yield();
>         requestInfo = createRequestInfo();
>     }
>     ...
> }{code}
>  
> Once the rate limit kicks in, this can stall the task during checkpointing, so
> checkpoints time out and the job fails.
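The hang can be reproduced with a simplified model of the loop in flush(). In this sketch, which is an assumption rather than Flink code, the mailbox is a plain `LinkedBlockingQueue<Runnable>` and the hypothetical `yieldWithTimeout` stands in for `MailboxExecutor.yield()`; its timeout exists only so the demo terminates instead of hanging. The hypothetical strategy blocks on its first check only, so any re-evaluation of `shouldBlock` would let the flush proceed.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

final class MailboxHangSketch {
    // Stand-in for the task mailbox: completion callbacks of in-flight requests
    // would arrive here. Token refills do not produce any mail.
    private final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();

    // Queue a mail, e.g. the completion callback of an in-flight request.
    void enqueue(Runnable mail) {
        mailbox.add(mail);
    }

    // Simulated MailboxExecutor.yield(): wait for the next mail and run it.
    // The real yield() waits without a timeout; the timeout here only lets the
    // sketch report the stall instead of hanging.
    private boolean yieldWithTimeout(long timeoutMillis) {
        try {
            Runnable mail = mailbox.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (mail == null) {
                return false;
            }
            mail.run();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Same shape as the loop in flush(): yield unconditionally while blocked.
    // Returns true if the loop exits, false if it stalls inside the yield.
    boolean flush(BooleanSupplier shouldBlock) {
        while (shouldBlock.getAsBoolean()) {
            if (!yieldWithTimeout(200)) {
                return false; // the real yield() would still be waiting here
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Hypothetical strategy: blocks on the first check only, i.e. the tokens
        // would already be back if the loop ever re-evaluated shouldBlock().
        int[] idleChecks = {0};
        boolean idle = new MailboxHangSketch().flush(() -> idleChecks[0]++ == 0);

        // Same strategy, but one request is in flight and its completion is queued.
        MailboxHangSketch busy = new MailboxHangSketch();
        busy.enqueue(() -> { });
        int[] busyChecks = {0};
        boolean withMail = busy.flush(() -> busyChecks[0]++ == 0);

        System.out.println("nothing in flight: " + (idle ? "flushed" : "stalled in yield()"));
        System.out.println("one request in flight: " + (withMail ? "flushed" : "stalled in yield()"));
    }
}
```

With an empty mailbox the loop stalls even though the next shouldBlock() call would return false; with one queued completion it runs that mail, re-checks, and exits.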
> Solution:
> Ensure mailboxExecutor.yield() is called only when there are requests in flight,
> by replacing it with the existing private method
> [yieldIfThereExistsInFlightRequests()|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L443].
> With nothing in flight, the loop then re-evaluates shouldBlock() instead of
> blocking, and continues once tokens become available again:
> {code:java}
> private void flush() throws InterruptedException {
>     RequestInfo requestInfo = createRequestInfo();
>     while (rateLimitingStrategy.shouldBlock(requestInfo)) {
>         yieldIfThereExistsInFlightRequests();
>         requestInfo = createRequestInfo();
>     }
>     ...
> } {code}
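The effect of the proposed change can be sketched the same way. `FixedFlushSketch` is hypothetical: it models each in-flight request as a completion mail already sitting in a `LinkedBlockingQueue`, and its `yieldIfThereExistsInFlightRequests()` waits on that queue only while the in-flight count is positive, mirroring the guard described above rather than reproducing Flink's code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BooleanSupplier;

final class FixedFlushSketch {
    // Stand-in for the task mailbox; each in-flight request's completion is one mail.
    private final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();
    private int inFlightRequestsCount = 0;

    // Hypothetical helper: send a request whose completion callback is already queued.
    void startRequest() {
        inFlightRequestsCount++;
        mailbox.add(() -> inFlightRequestsCount--);
    }

    int inFlight() {
        return inFlightRequestsCount;
    }

    // Guarded yield: only wait on the mailbox while a completion can still arrive.
    private void yieldIfThereExistsInFlightRequests() {
        if (inFlightRequestsCount > 0) {
            try {
                mailbox.take().run(); // stand-in for mailboxExecutor.yield()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while yielding", e);
            }
        }
    }

    // Fixed loop from flush(): returns how many times it went round the loop.
    int flush(BooleanSupplier shouldBlock) {
        int iterations = 0;
        while (shouldBlock.getAsBoolean()) {
            yieldIfThereExistsInFlightRequests();
            iterations++;
        }
        return iterations;
    }

    public static void main(String[] args) {
        FixedFlushSketch writer = new FixedFlushSketch();
        writer.startRequest(); // one request in flight
        int[] checks = {0};
        // Hypothetical strategy: blocks for the first three checks, then tokens are back.
        int iterations = writer.flush(() -> checks[0]++ < 3);
        System.out.println("iterations=" + iterations + ", inFlight=" + writer.inFlight());
    }
}
```

Running `main` prints `iterations=3, inFlight=0`: the first iteration consumes the completion mail, and the remaining two re-check shouldBlock() without blocking. One consequence of this design is that, with nothing in flight, the loop re-evaluates shouldBlock() back to back (a busy wait) until tokens become available.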



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
