This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1379455457d5c18a5db39324ae71c5df1c134013 Author: fangliang <[email protected]> AuthorDate: Mon Jul 6 15:21:22 2020 +0800 [FLINK-17761][connector/common] Add a constructor taking capacity as a parameter for FutureCompletingBlockingQueue This closes #12566 --- .../FutureCompletingBlockingQueue.java | 10 +++++ .../FutureCompletingBlockingQueueTest.java | 44 ++++++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java index 6a6dfac..de51af1 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java @@ -36,9 +36,19 @@ import java.util.concurrent.TimeUnit; * @param <T> the type of the elements in the queue. */ public class FutureCompletingBlockingQueue<T> extends LinkedBlockingQueue<T> { + private final FutureNotifier futureNotifier; + /** + * The default capacity for {@link LinkedBlockingQueue}. 
+ */ + private static final Integer DEFAULT_CAPACITY = 10000; public FutureCompletingBlockingQueue(FutureNotifier futureNotifier) { + this(futureNotifier, DEFAULT_CAPACITY); + } + + public FutureCompletingBlockingQueue(FutureNotifier futureNotifier, int capacity) { + super(capacity); this.futureNotifier = futureNotifier; } diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java new file mode 100644 index 0000000..ad74f2a --- /dev/null +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.reader.synchronization; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * The unit test for {@link FutureCompletingBlockingQueue}. 
+ */
+public class FutureCompletingBlockingQueueTest {
+
+	private static final int DEFAULT_CAPACITY = 10000;
+	private static final int SPECIFIED_CAPACITY = 20000;
+
+	/** Checks that both constructors create a queue bounded by the expected capacity. */
+	@Test
+	public void testFutureCompletingBlockingQueueConstructor() {
+		FutureNotifier notifier = new FutureNotifier();
+		FutureCompletingBlockingQueue<Object> defaultCapacityQueue = new FutureCompletingBlockingQueue<>(notifier);
+		FutureCompletingBlockingQueue<Object> specifiedCapacityQueue = new FutureCompletingBlockingQueue<>(notifier, SPECIFIED_CAPACITY);
+		// Nothing has been added yet, so the remaining capacity is the full bound (expected value goes first).
+		assertEquals(DEFAULT_CAPACITY, defaultCapacityQueue.remainingCapacity());
+		// The two-argument constructor must use the capacity it was given.
+		assertEquals(SPECIFIED_CAPACITY, specifiedCapacityQueue.remainingCapacity());
+	}
+}
