[ https://issues.apache.org/jira/browse/FLINK-38096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Michał Misiewicz updated FLINK-38096:
-------------------------------------
    Description: 
[AsyncSinkWriter|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java] may hang when using a custom rate-limiting strategy that can block new requests even when no other requests are in flight. This situation arises when implementing rate limits such as a maximum number of API requests per time interval.

Given the following RateLimitingStrategy implementation based on [bucket4j|https://github.com/bucket4j/bucket4j]:
{code:scala}
package some.pipeline.ratelimit

import com.typesafe.scalalogging.LazyLogging
import io.github.bucket4j.Bucket
import org.apache.flink.connector.base.sink.writer.strategy.{
  RateLimitingStrategy,
  RequestInfo,
  ResultInfo
}

import java.io.Serializable

class TokenBucketRateLimitingStrategy(
    maxInFlightRequests: Int,
    tokensPerSecond: Long,
    tokensPerMinute: Long
) extends RateLimitingStrategy
    with LazyLogging
    with Serializable {

  @transient
  private var currentInFlightRequests = 0

  @transient
  private lazy val bucket: Bucket = TokenBucketProvider.getInstance(
    "TokenBucketRateLimitingStrategy",
    tokensPerSecond,
    tokensPerMinute
  )

  override def shouldBlock(requestInfo: RequestInfo): Boolean = {
    currentInFlightRequests >= maxInFlightRequests ||
    areTokensNotAvailable(requestInfo)
  }

  private def areTokensNotAvailable(requestInfo: RequestInfo): Boolean = {
    val batchSize = requestInfo.getBatchSize
    if (batchSize <= 0) {
      logger.debug(s"Received request with invalid batch size: $batchSize, allowing to proceed")
      return false
    }
    !bucket.estimateAbilityToConsume(batchSize).canBeConsumed
  }

  ...
}
{code}

AsyncSinkWriter may hang on [mailboxExecutor.yield()|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L356] when all tokens are already consumed and no requests are in flight:
{code:java}
private void flush() throws InterruptedException {
    RequestInfo requestInfo = createRequestInfo();
    while (rateLimitingStrategy.shouldBlock(requestInfo)) {
        mailboxExecutor.yield();
        requestInfo = createRequestInfo();
    }
    ...
}
{code}

mailboxExecutor.yield() blocks until another mailbox action becomes available, and with no requests in flight there is no completion callback that could enqueue one, so shouldBlock() is never re-evaluated even after the token bucket has refilled. As a result, once the rate limit kicks in, the job hangs during checkpointing and eventually fails with checkpoint timeouts.

Solution:
Ensure mailboxExecutor.yield() is only called when in-flight requests are present. Replacing the call with the existing private method [yieldIfThereExistsInFlightRequests()|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L443] fixes the problem directly:
{code:java}
private void flush() throws InterruptedException {
    RequestInfo requestInfo = createRequestInfo();
    while (rateLimitingStrategy.shouldBlock(requestInfo)) {
        yieldIfThereExistsInFlightRequests();
        requestInfo = createRequestInfo();
    }
    ...
}
{code}
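For reference, TokenBucketProvider in the strategy above is an application-side helper, not a Flink class. A minimal sketch of such a helper, assuming bucket4j's Bucket.builder() API with one per-second and one per-minute bandwidth limit (the implementation below is an illustration, not the exact helper used in the failing job):
{code:scala}
package some.pipeline.ratelimit

import io.github.bucket4j.{Bandwidth, Bucket}

import java.time.Duration
import java.util.concurrent.ConcurrentHashMap

// Assumed helper implementation: builds and caches one shared bucket4j Bucket
// per name, with a per-second and a per-minute bandwidth limit.
object TokenBucketProvider {

  private val buckets = new ConcurrentHashMap[String, Bucket]()

  def getInstance(name: String, tokensPerSecond: Long, tokensPerMinute: Long): Bucket =
    buckets.computeIfAbsent(
      name,
      _ =>
        Bucket
          .builder()
          .addLimit(Bandwidth.simple(tokensPerSecond, Duration.ofSeconds(1)))
          .addLimit(Bandwidth.simple(tokensPerMinute, Duration.ofMinutes(1)))
          .build()
    )
}
{code}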
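A custom RateLimitingStrategy like this reaches AsyncSinkWriter through AsyncSinkWriterConfiguration, which a concrete sink builds in createWriter(...) and passes to the writer's constructor. A rough sketch of that wiring (placeholder values; the setter names are those of the AsyncSinkWriterConfiguration builder in flink-connector-base and may vary between releases):
{code:scala}
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration

// Rough sketch of the wiring with placeholder limits; once configured this way,
// the custom shouldBlock() is the one evaluated in the flush() loop shown above.
val writerConfiguration: AsyncSinkWriterConfiguration =
  AsyncSinkWriterConfiguration
    .builder()
    .setMaxBatchSize(50)
    .setMaxBatchSizeInBytes(1024 * 1024)
    .setMaxInFlightRequests(4)
    .setMaxBufferedRequests(1000)
    .setMaxTimeInBufferMS(5000)
    .setMaxRecordSizeInBytes(256 * 1024)
    .setRateLimitingStrategy(
      new TokenBucketRateLimitingStrategy(
        maxInFlightRequests = 4,
        tokensPerSecond = 100L,
        tokensPerMinute = 2000L
      )
    )
    .build()
{code}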
> AsyncSinkWriter may hang when configured with a custom rate limiting strategy
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-38096
>                 URL: https://issues.apache.org/jira/browse/FLINK-38096
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common
>    Affects Versions: 2.0.0, 1.20.0, 1.20.1, 1.20.2
>            Reporter: Michał Misiewicz
>            Priority: Major
--
This message was sent by Atlassian Jira
(v8.20.10#820010)