[
https://issues.apache.org/jira/browse/FLINK-38096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18057734#comment-18057734
]
Taranpreet Kaur commented on FLINK-38096:
-----------------------------------------
Hi [~michalmisiewicz],
Could you please provide a complete code example to replicate the issue? I
would like to work on this one. In the meantime, I have created an example of
my own; please verify whether it is a valid scenario.
I have attached the code example: [^AsyncSinkHangDemo.java]
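In essence, the scenario I would like to verify is a rate-limiting strategy
that enforces a per-interval token budget and can therefore block even when
nothing is in flight. A minimal sketch of such a strategy (class and field
names are mine, assuming the RateLimitingStrategy interface from
flink-connector-base; this is not the attached demo):
{code:java}
import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy;
import org.apache.flink.connector.base.sink.writer.strategy.RequestInfo;
import org.apache.flink.connector.base.sink.writer.strategy.ResultInfo;

/** Sketch: blocks new requests once the per-interval token budget is spent. */
public class IntervalBudgetRateLimitingStrategy implements RateLimitingStrategy {

    private final int maxInFlightRequests;
    private final long tokensPerInterval;
    private final long intervalMillis;

    private int inFlightRequests;
    private long tokensLeft;
    private long intervalStart;

    public IntervalBudgetRateLimitingStrategy(
            int maxInFlightRequests, long tokensPerInterval, long intervalMillis) {
        this.maxInFlightRequests = maxInFlightRequests;
        this.tokensPerInterval = tokensPerInterval;
        this.intervalMillis = intervalMillis;
        this.tokensLeft = tokensPerInterval;
        this.intervalStart = System.currentTimeMillis();
    }

    @Override
    public void registerInFlightRequest(RequestInfo requestInfo) {
        inFlightRequests++;
        tokensLeft -= requestInfo.getBatchSize();
    }

    @Override
    public void registerCompletedRequest(ResultInfo resultInfo) {
        inFlightRequests--;
    }

    @Override
    public boolean shouldBlock(RequestInfo requestInfo) {
        refillIfIntervalElapsed();
        // Blocks when the interval budget is exhausted, even if nothing is in
        // flight: the condition under which flush() can hang on mailboxExecutor.yield().
        return inFlightRequests >= maxInFlightRequests
                || requestInfo.getBatchSize() > tokensLeft;
    }

    @Override
    public int getMaxBatchSize() {
        return (int) Math.min(Integer.MAX_VALUE, tokensPerInterval);
    }

    private void refillIfIntervalElapsed() {
        long now = System.currentTimeMillis();
        if (now - intervalStart >= intervalMillis) {
            tokensLeft = tokensPerInterval;
            intervalStart = now;
        }
    }
}{code}
Plugged into an AsyncSinkWriter, flush() would block inside
mailboxExecutor.yield() as soon as the budget is exhausted while no requests
are in flight.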
> AsyncSinkWriter may hang when configured with a custom rate limiting strategy
> -----------------------------------------------------------------------------
>
> Key: FLINK-38096
> URL: https://issues.apache.org/jira/browse/FLINK-38096
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Common
> Affects Versions: 2.0.0, 1.20.0, 1.20.1, 1.20.2
> Reporter: Michał Misiewicz
> Priority: Major
> Attachments: AsyncSinkHangDemo.java, DummyAsyncSink.java,
> TokenBucketProvider.java, TokenBucketRateLimitingStrategy.java
>
>
> [AsyncSinkWriter|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java]
> may hang when using a custom rate-limiting strategy that can block new
> requests even when no other requests are in flight. This issue occurs when
> implementing time-based rate limits, such as restricting the number of API
> requests per interval.
> Given the following RateLimitingStrategy implementation based on
> [bucket4j|https://github.com/bucket4j/bucket4j]:
> {code:scala}
> package some.pipeline.ratelimit
> import com.typesafe.scalalogging.LazyLogging
> import io.github.bucket4j.Bucket
> import org.apache.flink.connector.base.sink.writer.strategy.{
>   RateLimitingStrategy,
>   RequestInfo,
>   ResultInfo
> }
> import java.io.Serializable
> class TokenBucketRateLimitingStrategy(
>     maxInFlightRequests: Int,
>     tokensPerSecond: Long,
>     tokensPerMinute: Long
> ) extends RateLimitingStrategy
>     with LazyLogging
>     with Serializable {
>   @transient
>   private var currentInFlightRequests = 0
>   @transient
>   private lazy val bucket: Bucket = TokenBucketProvider.getInstance(
>     "TokenBucketRateLimitingStrategy",
>     tokensPerSecond,
>     tokensPerMinute
>   )
>   override def shouldBlock(requestInfo: RequestInfo): Boolean = {
>     currentInFlightRequests >= maxInFlightRequests ||
>     areTokensNotAvailable(requestInfo)
>   }
>   private def areTokensNotAvailable(requestInfo: RequestInfo): Boolean = {
>     val batchSize = requestInfo.getBatchSize
>     if (batchSize <= 0) {
>       logger.debug(s"Received request with invalid batch size: $batchSize, allowing to proceed")
>       return false
>     }
>     !bucket.estimateAbilityToConsume(batchSize).canBeConsumed
>   }
>   ...
> } {code}
>
> AsyncSinkWriter may hang on
> [mailboxExecutor.yield()|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L356]
> when all tokens are already consumed and no requests are in flight:
> {code:java}
> private void flush() throws InterruptedException {
>     RequestInfo requestInfo = createRequestInfo();
>     while (rateLimitingStrategy.shouldBlock(requestInfo)) {
>         mailboxExecutor.yield();
>         requestInfo = createRequestInfo();
>     }
>     ...
> }{code}
>
> Because mailboxExecutor.yield() blocks until another mail arrives, and with no
> requests in flight there is no completion callback that would enqueue one, the
> writer never re-evaluates shouldBlock() even after the bucket refills. When
> the rate limit activates, Flink therefore hangs during checkpointing, leading
> to job failures from checkpoint timeouts.
> Solution:
> Ensure mailboxExecutor.yield() is only called when in-flight requests are
> present. Replacing it with the private function
> [yieldIfThereExistsInFlightRequests()|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L443]
> provides a direct solution to this problem:
> {code:java}
> private void flush() throws InterruptedException {
>     RequestInfo requestInfo = createRequestInfo();
>     while (rateLimitingStrategy.shouldBlock(requestInfo)) {
>         yieldIfThereExistsInFlightRequests();
>         requestInfo = createRequestInfo();
>     }
>     ...
> } {code}
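> For reference, the linked helper yields only while requests are in flight;
> paraphrased from the linked source (the exact body may differ):
> {code:java}
> private void yieldIfThereExistsInFlightRequests() throws InterruptedException {
>     if (inFlightRequestsCount > 0) {
>         mailboxExecutor.yield();
>     }
> }{code}
> With no in-flight requests it returns immediately, so the loop in flush()
> keeps re-evaluating shouldBlock() and can proceed once tokens become available
> again, instead of blocking indefinitely inside yield().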
--
This message was sent by Atlassian Jira
(v8.20.10#820010)