This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1379455457d5c18a5db39324ae71c5df1c134013
Author: fangliang <[email protected]>
AuthorDate: Mon Jul 6 15:21:22 2020 +0800

    [FLINK-17761][connector/common] Add a constructor taking capacity as a parameter for FutureCompletingBlockingQueue
    
    This closes #12566
---
 .../FutureCompletingBlockingQueue.java             | 10 +++++
 .../FutureCompletingBlockingQueueTest.java         | 44 ++++++++++++++++++++++
 2 files changed, 54 insertions(+)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
index 6a6dfac..de51af1 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
@@ -36,9 +36,19 @@ import java.util.concurrent.TimeUnit;
  * @param <T> the type of the elements in the queue.
  */
 public class FutureCompletingBlockingQueue<T> extends LinkedBlockingQueue<T> {
+
        private final FutureNotifier futureNotifier;
+       /**
+        * The capacity passed to {@link LinkedBlockingQueue} when the caller does not specify one.
+        */
+       private static final Integer DEFAULT_CAPACITY = 10000;
 
        public FutureCompletingBlockingQueue(FutureNotifier futureNotifier) {
+               this(futureNotifier, DEFAULT_CAPACITY);
+       }
+
+       public FutureCompletingBlockingQueue(FutureNotifier futureNotifier, int 
capacity) {
+               super(capacity);
                this.futureNotifier = futureNotifier;
        }
 
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
new file mode 100644
index 0000000..ad74f2a
--- /dev/null
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.reader.synchronization;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * The unit test for {@link FutureCompletingBlockingQueue}.
+ */
+public class FutureCompletingBlockingQueueTest {
+
+
+       private static final Integer DEFAULT_CAPACITY = 10000;
+       private static final Integer SPECIFIED_CAPACITY = 20000;
+
+       @Test
+       public void testFutureCompletingBlockingQueueConstructor() {
+               FutureNotifier notifier = new FutureNotifier();
+               FutureCompletingBlockingQueue<Object> 
defaultCapacityFutureCompletingBlockingQueue = new 
FutureCompletingBlockingQueue<>(notifier);
+               FutureCompletingBlockingQueue<Object> 
specifiedCapacityFutureCompletingBlockingQueue = new 
FutureCompletingBlockingQueue<>(notifier, SPECIFIED_CAPACITY);
+               // A new, empty queue built without an explicit capacity must report DEFAULT_CAPACITY (10000) as its remaining capacity
+               
assertEquals(defaultCapacityFutureCompletingBlockingQueue.remainingCapacity(), 
(int) DEFAULT_CAPACITY);
+               // A new, empty queue built with an explicit capacity must report SPECIFIED_CAPACITY as its remaining capacity
+               
assertEquals(specifiedCapacityFutureCompletingBlockingQueue.remainingCapacity(),
 (int) SPECIFIED_CAPACITY);
+       }
+}

Reply via email to