This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2af65df0127db5890cfc3d3f382e2ba7eca44771
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Sep 9 16:14:54 2021 +0200

    [hotfix][connectors] Deduplicate config default in FutureCompletingBlockingQueue.
---
 .../reader/synchronization/FutureCompletingBlockingQueue.java | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
index 9977bcd..a341c52 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.base.source.reader.synchronization;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import javax.annotation.concurrent.GuardedBy;
@@ -80,9 +81,6 @@ public class FutureCompletingBlockingQueue<T> {
      */
     public static final CompletableFuture<Void> AVAILABLE = getAvailableFuture();
 
-    /** The default capacity for the queue. */
-    private static final int DEFAULT_CAPACITY = 2;
-
     // ------------------------------------------------------------------------
 
     /** The maximum capacity of the queue. */
@@ -109,7 +107,7 @@ public class FutureCompletingBlockingQueue<T> {
     private ConditionAndFlag[] putConditionAndFlags;
 
     public FutureCompletingBlockingQueue() {
-        this(DEFAULT_CAPACITY);
+        this(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY.defaultValue());
     }
 
     public FutureCompletingBlockingQueue(int capacity) {