This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2af65df0127db5890cfc3d3f382e2ba7eca44771
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Sep 9 16:14:54 2021 +0200

    [hotfix][connectors] Deduplicate config default in 
FutureCompletingBlockingQueue.
---
 .../reader/synchronization/FutureCompletingBlockingQueue.java       | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
index 9977bcd..a341c52 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
@@ -20,6 +20,7 @@ package 
org.apache.flink.connector.base.source.reader.synchronization;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import javax.annotation.concurrent.GuardedBy;
@@ -80,9 +81,6 @@ public class FutureCompletingBlockingQueue<T> {
      */
     public static final CompletableFuture<Void> AVAILABLE = 
getAvailableFuture();
 
-    /** The default capacity for the queue. */
-    private static final int DEFAULT_CAPACITY = 2;
-
     // ------------------------------------------------------------------------
 
     /** The maximum capacity of the queue. */
@@ -109,7 +107,7 @@ public class FutureCompletingBlockingQueue<T> {
     private ConditionAndFlag[] putConditionAndFlags;
 
     public FutureCompletingBlockingQueue() {
-        this(DEFAULT_CAPACITY);
+        this(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY.defaultValue());
     }
 
     public FutureCompletingBlockingQueue(int capacity) {

Reply via email to