[FLINK-3384] [kafka] Add ClosableQueue for message exchanges between Kafka 
Threads


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fd324ea7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fd324ea7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fd324ea7

Branch: refs/heads/master
Commit: fd324ea72979cc3d4202ffa3ea174ec4cc9d153b
Parents: 50bd65a
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Feb 10 14:51:10 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 10 22:15:32 2016 +0100

----------------------------------------------------------------------
 .../kafka/internals/ClosableBlockingQueue.java  | 502 +++++++++++++++
 .../internals/ClosableBlockingQueueTest.java    | 603 +++++++++++++++++++
 2 files changed, 1105 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fd324ea7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
new file mode 100644
index 0000000..856c2ad
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A special form of blocking queue with two additions:
+ * <ol>
+ *     <li>The queue can be closed atomically when empty. Adding elements
+ *         after the queue is closed fails. This allows queue consumers to
+ *         atomically discover that no elements are available and mark
+ *         themselves as shut down.</li>
+ *     <li>The queue allows polling batches of elements in one polling
+ *         call.</li>
+ * </ol>
+ *
+ * <p>The queue has no capacity restriction and is safe for multiple
+ * producers and consumers.
+ *
+ * <p>Note: Null elements are prohibited.
+ *
+ * @param <E> The type of elements in the queue.
+ */
+public class ClosableBlockingQueue<E> {
+
+       /** The lock used to make queue accesses and open checks atomic */
+       private final ReentrantLock lock;
+       
+       /** The condition on which blocking get-calls wait if the queue is empty */
+       private final Condition nonEmpty;
+       
+       /** The deque of elements */
+       private final ArrayDeque<E> elements;
+       
+       /** Flag marking the status of the queue */
+       private volatile boolean open;
+       
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a new empty, open queue that initially reserves space for
+        * ten elements. Delegates to {@link #ClosableBlockingQueue(int)}.
+        */
+       public ClosableBlockingQueue() {
+               this(10);
+       }
+
+       /**
+        * Creates a new empty, open queue whose backing deque is pre-sized
+        * for at least the given number of elements. The queue can still
+        * grow if more elements are added than the reserved space.
+        *
+        * @param initialSize The number of elements to reserve space for.
+        */
+       public ClosableBlockingQueue(int initialSize) {
+               this.open = true;
+               this.elements = new ArrayDeque<>(initialSize);
+
+               // fair lock; the wait condition is bound to this same lock
+               this.lock = new ReentrantLock(true);
+               this.nonEmpty = this.lock.newCondition();
+       }
+
+       /**
+        * Creates a new open queue that contains the given elements, in the
+        * iteration order of the given collection.
+        *
+        * @param initialElements The elements to initially add to the queue.
+        *                        Must not be null and must not contain null.
+        * @throws NullPointerException Thrown, if the collection or one of
+        *                              its elements is null.
+        */
+       public ClosableBlockingQueue(Collection<? extends E> initialElements) {
+               // fail with a descriptive message instead of a bare NPE
+               this(requireNonNull(initialElements,
+                               "initialElements must not be null").size());
+               // ArrayDeque rejects null elements with a NullPointerException
+               this.elements.addAll(initialElements);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Size and status
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Gets the number of elements currently in the queue.
+        *
+        * <p>The count is read while holding the queue lock, because the
+        * backing {@link ArrayDeque} is not thread-safe and must not be read
+        * while another thread is adding or removing elements.
+        *
+        * @return The number of elements currently in the queue.
+        */
+       public int size() {
+               lock.lock();
+               try {
+                       return elements.size();
+               } finally {
+                       lock.unlock();
+               }
+       }
+
+       /**
+        * Checks whether the queue is empty (has no elements), by comparing
+        * {@link #size()} against zero.
+        *
+        * @return True, if the queue is empty; false, if it is non-empty.
+        */
+       public boolean isEmpty() {
+               return size() == 0;
+       }
+
+       /**
+        * Checks whether the queue is currently open, meaning elements can
+        * be added and polled. This only reads the status flag and does not
+        * acquire the queue lock.
+        *
+        * @return True, if the queue is open; false, if it is closed.
+        */
+       public boolean isOpen() {
+               return open;
+       }
+       
+       /**
+        * Tries to close the queue. Closing the queue only succeeds when no 
elements are
+        * in the queue when this method is called. Checking whether the queue 
is empty, and
+        * marking the queue as closed is one atomic operation.
+        *
+        * @return True, if the queue is closed, false if the queue remains 
open.
+        */
+       public boolean close() {
+               lock.lock();
+               try {
+                       if (open) {
+                               if (elements.isEmpty()) {
+                                       open = false;
+                                       nonEmpty.signalAll();
+                                       return true;
+                               } else {
+                                       return false;
+                               }
+                       }
+                       else {
+                               // already closed
+                               return true;
+                       }
+               } finally {
+                       lock.unlock();
+               }
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  Adding / Removing elements
+       // 
------------------------------------------------------------------------
+       
+       /**
+        * Appends the element to the queue if, and only if, the queue is
+        * still open. The open check and the insertion form one atomic
+        * operation.
+        *
+        * <p>In contrast to {@link #add(Object)}, a closed queue is reported
+        * through the return value rather than through an exception.
+        *
+        * @param element The element to add, must not be null.
+        * @return True, if the element was added; false, if the queue was
+        *         closed.
+        */
+       public boolean addIfOpen(E element) {
+               requireNonNull(element);
+
+               lock.lock();
+               try {
+                       if (!open) {
+                               return false;
+                       }
+
+                       elements.addLast(element);
+                       // the queue was empty before: wake up waiting getters
+                       if (elements.size() == 1) {
+                               nonEmpty.signalAll();
+                       }
+                       return true;
+               } finally {
+                       lock.unlock();
+               }
+       }
+
+       /**
+        * Appends the element to the queue, failing with an exception if
+        * the queue has been closed. The open check and the insertion form
+        * one atomic operation.
+        *
+        * @param element The element to add, must not be null.
+        * @throws IllegalStateException Thrown, if the queue is closed.
+        */
+       public void add(E element) throws IllegalStateException {
+               requireNonNull(element);
+
+               lock.lock();
+               try {
+                       if (!open) {
+                               throw new IllegalStateException("queue is closed");
+                       }
+
+                       elements.addLast(element);
+                       // the queue was empty before: wake up waiting getters
+                       if (elements.size() == 1) {
+                               nonEmpty.signalAll();
+                       }
+               } finally {
+                       lock.unlock();
+               }
+       }
+
+       /**
+        * Looks at the head of the queue without removing it. Returns null
+        * if the queue currently holds no elements.
+        *
+        * <p>A closed queue causes an {@code IllegalStateException}. The
+        * open check and the read of the head element form one atomic
+        * operation.
+        *
+        * <p>This method never blocks.
+        *
+        * @return The queue's next element, or null, if the queue is empty.
+        * @throws IllegalStateException Thrown, if the queue is closed.
+        */
+       public E peek() {
+               lock.lock();
+               try {
+                       if (!open) {
+                               throw new IllegalStateException("queue is closed");
+                       }
+                       // peekFirst() yields null for an empty deque
+                       return elements.peekFirst();
+               } finally {
+                       lock.unlock();
+               }
+       }
+
+       /**
+        * Removes and returns the head of the queue, if the queue is
+        * non-empty. Returns null if the queue currently holds no elements.
+        *
+        * <p>A closed queue causes an {@code IllegalStateException}. The
+        * open check and the removal of the head element form one atomic
+        * operation.
+        *
+        * <p>This method never blocks.
+        *
+        * @return The queue's next element, or null, if the queue is empty.
+        * @throws IllegalStateException Thrown, if the queue is closed.
+        */
+       public E poll() {
+               lock.lock();
+               try {
+                       if (!open) {
+                               throw new IllegalStateException("queue is closed");
+                       }
+                       // pollFirst() yields null for an empty deque
+                       return elements.pollFirst();
+               } finally {
+                       lock.unlock();
+               }
+       }
+
+       /**
+        * Drains the queue: removes all elements currently in the queue and
+        * returns them as a list in queue order. Returns null (not an empty
+        * list) if the queue currently holds no elements.
+        *
+        * <p>A closed queue causes an {@code IllegalStateException}. The
+        * open check and the removal of the elements form one atomic
+        * operation.
+        *
+        * <p>This method never blocks.
+        *
+        * @return All of the queue's elements, or null, if the queue is
+        *         empty.
+        * @throws IllegalStateException Thrown, if the queue is closed.
+        */
+       public List<E> pollBatch() {
+               lock.lock();
+               try {
+                       if (!open) {
+                               throw new IllegalStateException("queue is closed");
+                       }
+                       if (elements.isEmpty()) {
+                               return null;
+                       }
+
+                       final ArrayList<E> drained = new ArrayList<>(elements);
+                       elements.clear();
+                       return drained;
+               } finally {
+                       lock.unlock();
+               }
+       }
+
+       /**
+        * Removes and returns the head of the queue, waiting without a time
+        * limit while the queue is open and empty.
+        *
+        * <p>A closed queue causes an {@code IllegalStateException}, also
+        * when the queue gets closed while this method is waiting. The open
+        * check and the removal of the element form one atomic operation.
+        *
+        * @return The next element in the queue, never null.
+        *
+        * @throws IllegalStateException Thrown, if the queue is closed.
+        * @throws InterruptedException Thrown, if the thread is interrupted
+        *                              while waiting for an element to be
+        *                              added.
+        */
+       public E getElementBlocking() throws InterruptedException {
+               lock.lock();
+               try {
+                       while (open) {
+                               if (!elements.isEmpty()) {
+                                       return elements.removeFirst();
+                               }
+                               // releases the lock while waiting for a signal
+                               nonEmpty.await();
+                       }
+                       throw new IllegalStateException("queue is closed");
+               } finally {
+                       lock.unlock();
+               }
+       }
+
+       /**
+        * Returns the next element in the queue. If the queue is empty, this
+        * method waits at most the given time until an element becomes
+        * available. If no element is available after that time, the method
+        * returns null.
+        *
+        * <p>The remaining wait time is tracked through
+        * {@link Condition#awaitNanos(long)} rather than recomputed from
+        * {@code System.currentTimeMillis()}, so adjustments of the system
+        * clock neither shorten nor prolong the wait, and very large
+        * timeouts cannot overflow.
+        *
+        * <p>The method throws an {@code IllegalStateException} if the queue
+        * is closed. Checking whether the queue is open and removing the
+        * next element is one atomic operation.
+        *
+        * @param timeoutMillis The number of milliseconds to block, at most.
+        *                      A value of zero means waiting without a time
+        *                      limit.
+        * @return The next element in the queue, or null, if the timeout
+        *         expires before an element is available.
+        *
+        * @throws IllegalArgumentException Thrown, if the timeout is
+        *                                  negative.
+        * @throws IllegalStateException Thrown, if the queue is closed.
+        * @throws InterruptedException Thrown, if the thread is interrupted
+        *                              while waiting for an element to be
+        *                              added.
+        */
+       public E getElementBlocking(long timeoutMillis)
+                       throws InterruptedException {
+               if (timeoutMillis == 0L) {
+                       // wait forever case
+                       return getElementBlocking();
+               } else if (timeoutMillis < 0L) {
+                       throw new IllegalArgumentException("invalid timeout");
+               }
+
+               // toNanos() saturates at Long.MAX_VALUE instead of overflowing
+               long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
+
+               lock.lock();
+               try {
+                       while (open && elements.isEmpty() && remainingNanos > 0L) {
+                               // returns the time left; <= 0 once timed out
+                               remainingNanos = nonEmpty.awaitNanos(remainingNanos);
+                       }
+
+                       if (!open) {
+                               throw new IllegalStateException("queue is closed");
+                       }
+                       else if (elements.isEmpty()) {
+                               return null;
+                       } else {
+                               return elements.removeFirst();
+                       }
+               } finally {
+                       lock.unlock();
+               }
+       }
+
+       /**
+        * Drains the queue, waiting without a time limit while the queue is
+        * open and empty. All elements present once the wait ends are
+        * removed and returned in queue order.
+        *
+        * <p>This method always returns a list with at least one element.
+        *
+        * <p>A closed queue causes an {@code IllegalStateException}, also
+        * when the queue gets closed while this method is waiting. The open
+        * check and the removal of the elements form one atomic operation.
+        *
+        * @return A list with all elements in the queue, always at least one
+        *         element.
+        *
+        * @throws IllegalStateException Thrown, if the queue is closed.
+        * @throws InterruptedException Thrown, if the thread is interrupted
+        *                              while waiting for an element to be
+        *                              added.
+        */
+       public List<E> getBatchBlocking() throws InterruptedException {
+               lock.lock();
+               try {
+                       while (open) {
+                               if (!elements.isEmpty()) {
+                                       final ArrayList<E> drained = new ArrayList<>(elements);
+                                       elements.clear();
+                                       return drained;
+                               }
+                               // releases the lock while waiting for a signal
+                               nonEmpty.await();
+                       }
+                       throw new IllegalStateException("queue is closed");
+               } finally {
+                       lock.unlock();
+               }
+       }
+
+       /**
+        * Gets all the elements found in the queue, or blocks until at least
+        * one element was added. This method is similar to
+        * {@link #getBatchBlocking()}, but takes a number of milliseconds
+        * that the method will maximally wait before returning.
+        *
+        * <p>This method never returns null, but an empty list, if the queue
+        * is empty when the method is called and the request times out
+        * before an element was added.
+        *
+        * <p>The remaining wait time is tracked through
+        * {@link Condition#awaitNanos(long)} rather than recomputed from
+        * {@code System.currentTimeMillis()}, so adjustments of the system
+        * clock neither shorten nor prolong the wait, and very large
+        * timeouts cannot overflow.
+        *
+        * <p>The method throws an {@code IllegalStateException} if the queue
+        * is closed. Checking whether the queue is open and removing the
+        * elements is one atomic operation.
+        *
+        * @param timeoutMillis The number of milliseconds to wait, at most.
+        *                      A value of zero means waiting without a time
+        *                      limit.
+        * @return A list with all elements in the queue, possibly an empty
+        *         list.
+        *
+        * @throws IllegalArgumentException Thrown, if the timeout is
+        *                                  negative.
+        * @throws IllegalStateException Thrown, if the queue is closed.
+        * @throws InterruptedException Thrown, if the thread is interrupted
+        *                              while waiting for an element to be
+        *                              added.
+        */
+       public List<E> getBatchBlocking(long timeoutMillis)
+                       throws InterruptedException {
+               if (timeoutMillis == 0L) {
+                       // wait forever case
+                       return getBatchBlocking();
+               } else if (timeoutMillis < 0L) {
+                       throw new IllegalArgumentException("invalid timeout");
+               }
+
+               // toNanos() saturates at Long.MAX_VALUE instead of overflowing
+               long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
+
+               lock.lock();
+               try {
+                       while (open && elements.isEmpty() && remainingNanos > 0L) {
+                               // returns the time left; <= 0 once timed out
+                               remainingNanos = nonEmpty.awaitNanos(remainingNanos);
+                       }
+
+                       if (!open) {
+                               throw new IllegalStateException("queue is closed");
+                       }
+                       else if (elements.isEmpty()) {
+                               return Collections.emptyList();
+                       }
+                       else {
+                               ArrayList<E> result = new ArrayList<>(elements);
+                               elements.clear();
+                               return result;
+                       }
+               } finally {
+                       lock.unlock();
+               }
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  Standard Utilities
+       // 
------------------------------------------------------------------------
+       
+       /**
+        * Computes a hash code from the queue's elements in queue order.
+        * The open/closed status does not contribute to the hash code.
+        *
+        * <p>The elements are copied while holding the queue lock, and the
+        * hash is computed from that copy, so concurrent additions and
+        * removals cannot disturb the iteration.
+        *
+        * @return The hash code of the queue's current elements.
+        */
+       @Override
+       public int hashCode() {
+               final Object[] snapshot;
+               lock.lock();
+               try {
+                       snapshot = elements.toArray();
+               } finally {
+                       lock.unlock();
+               }
+
+               // hash outside the lock; no element code runs under the lock
+               int hashCode = 17;
+               for (Object element : snapshot) {
+                       hashCode = 31 * hashCode + element.hashCode();
+               }
+               return hashCode;
+       }
+
+       /**
+        * Compares this queue with another object. Two queues are equal if
+        * both are exactly of class {@code ClosableBlockingQueue} and hold
+        * equal elements in the same order. The open/closed status is not
+        * part of the comparison.
+        *
+        * <p>Each queue's elements are copied while holding that queue's
+        * lock, and the copies are compared afterwards, so concurrent
+        * additions and removals cannot disturb the comparison.
+        *
+        * @param obj The object to compare with.
+        * @return True, if the other object is a queue with equal elements
+        *         in the same order; false otherwise.
+        */
+       @Override
+       public boolean equals(Object obj) {
+               if (obj == this) {
+                       return true;
+               }
+               if (obj == null || obj.getClass() != ClosableBlockingQueue.class) {
+                       return false;
+               }
+
+               final ClosableBlockingQueue<?> that = (ClosableBlockingQueue<?>) obj;
+
+               // Take the two locks one after the other, never both at once,
+               // so that concurrent a.equals(b) and b.equals(a) cannot deadlock.
+               final List<Object> thisElements;
+               this.lock.lock();
+               try {
+                       thisElements = new ArrayList<Object>(this.elements);
+               } finally {
+                       this.lock.unlock();
+               }
+
+               final List<Object> thatElements;
+               that.lock.lock();
+               try {
+                       thatElements = new ArrayList<Object>(that.elements);
+               } finally {
+                       that.lock.unlock();
+               }
+
+               // List.equals compares sizes and then elements in order
+               return thisElements.equals(thatElements);
+       }
+
+       /**
+        * Returns a string listing the queue's elements in queue order, in
+        * the bracketed, comma-separated format of the Java collections.
+        *
+        * <p>The elements are copied while holding the queue lock, and the
+        * string is built from that copy, so concurrent additions and
+        * removals cannot disturb the iteration.
+        *
+        * @return A string representation of the queue's current elements.
+        */
+       @Override
+       public String toString() {
+               final List<E> snapshot;
+               lock.lock();
+               try {
+                       snapshot = new ArrayList<E>(elements);
+               } finally {
+                       lock.unlock();
+               }
+               return snapshot.toString();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd324ea7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java
new file mode 100644
index 0000000..6298c92
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java
@@ -0,0 +1,603 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.*;
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+
+public class ClosableBlockingQueueTest {
+
+       // 
------------------------------------------------------------------------
+       //  single-threaded unit tests
+       // 
------------------------------------------------------------------------
+       
+       @Test
+       public void testCreateQueueHashCodeEquals() {
+               try {
+                       ClosableBlockingQueue<String> queue1 = new 
ClosableBlockingQueue<>();
+                       ClosableBlockingQueue<String> queue2 = new 
ClosableBlockingQueue<>(22);
+
+                       assertTrue(queue1.isOpen());
+                       assertTrue(queue2.isOpen());
+                       assertTrue(queue1.isEmpty());
+                       assertTrue(queue2.isEmpty());
+                       assertEquals(0, queue1.size());
+                       assertEquals(0, queue2.size());
+                       
+                       assertTrue(queue1.hashCode() == queue2.hashCode());
+                       //noinspection EqualsWithItself
+                       assertTrue(queue1.equals(queue1));
+                       //noinspection EqualsWithItself
+                       assertTrue(queue2.equals(queue2));
+                       assertTrue(queue1.equals(queue2));
+                       
+                       assertNotNull(queue1.toString());
+                       assertNotNull(queue2.toString());
+
+                       List<String> elements = new ArrayList<>();
+                       elements.add("a");
+                       elements.add("b");
+                       elements.add("c");
+
+                       ClosableBlockingQueue<String> queue3 = new 
ClosableBlockingQueue<>(elements);
+                       ClosableBlockingQueue<String> queue4 = new 
ClosableBlockingQueue<>(asList("a", "b", "c"));
+
+                       assertTrue(queue3.isOpen());
+                       assertTrue(queue4.isOpen());
+                       assertFalse(queue3.isEmpty());
+                       assertFalse(queue4.isEmpty());
+                       assertEquals(3, queue3.size());
+                       assertEquals(3, queue4.size());
+
+                       assertTrue(queue3.hashCode() == queue4.hashCode());
+                       //noinspection EqualsWithItself
+                       assertTrue(queue3.equals(queue3));
+                       //noinspection EqualsWithItself
+                       assertTrue(queue4.equals(queue4));
+                       assertTrue(queue3.equals(queue4));
+                       
+                       assertNotNull(queue3.toString());
+                       assertNotNull(queue4.toString());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testCloseEmptyQueue() {
+               try {
+                       ClosableBlockingQueue<String> queue = new 
ClosableBlockingQueue<>();
+                       assertTrue(queue.isOpen());
+                       assertTrue(queue.close());
+                       assertFalse(queue.isOpen());
+                       
+                       assertFalse(queue.addIfOpen("element"));
+                       assertTrue(queue.isEmpty());
+                       
+                       try {
+                               queue.add("some element");
+                               fail("should cause an exception");
+                       } catch (IllegalStateException ignored) {
+                               // expected
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testCloseNonEmptyQueue() {
+               try {
+                       ClosableBlockingQueue<Integer> queue = new 
ClosableBlockingQueue<>(asList(1, 2, 3));
+                       assertTrue(queue.isOpen());
+                       
+                       assertFalse(queue.close());
+                       assertFalse(queue.close());
+                       
+                       queue.poll();
+
+                       assertFalse(queue.close());
+                       assertFalse(queue.close());
+                       
+                       queue.pollBatch();
+
+                       assertTrue(queue.close());
+                       assertFalse(queue.isOpen());
+
+                       assertFalse(queue.addIfOpen(42));
+                       assertTrue(queue.isEmpty());
+
+                       try {
+                               queue.add(99);
+                               fail("should cause an exception");
+                       } catch (IllegalStateException ignored) {
+                               // expected
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testPeekAndPoll() { // single-threaded check of peek()/poll() on an empty, filled, drained and closed queue
+               try {
+                       ClosableBlockingQueue<String> queue = new 
ClosableBlockingQueue<>();
+                       
+                       assertNull(queue.peek()); // fresh queue: peek() and poll() are both expected to return null
+                       assertNull(queue.peek());
+                       assertNull(queue.poll());
+                       assertNull(queue.poll());
+                       
+                       assertEquals(0, queue.size());
+                       
+                       queue.add("a");
+                       queue.add("b");
+                       queue.add("c");
+
+                       assertEquals(3, queue.size());
+                       
+                       assertEquals("a", queue.peek()); // repeated peek() must keep returning the same head element
+                       assertEquals("a", queue.peek());
+                       assertEquals("a", queue.peek());
+
+                       assertEquals(3, queue.size()); // peeking must not have removed anything
+                       
+                       assertEquals("a", queue.poll()); // poll() is expected to hand elements out in insertion order
+                       assertEquals("b", queue.poll());
+
+                       assertEquals(1, queue.size());
+                       
+                       assertEquals("c", queue.peek());
+                       assertEquals("c", queue.peek());
+
+                       assertEquals("c", queue.poll());
+
+                       assertEquals(0, queue.size()); // drained queue must behave like the initial empty one
+                       assertNull(queue.poll());
+                       assertNull(queue.peek());
+                       assertNull(queue.peek());
+                       
+                       assertTrue(queue.close()); // close() is expected to return true on the (now empty) queue
+                       
+                       try {
+                               queue.peek(); // after close, peek() must throw IllegalStateException
+                               fail("should cause an exception");
+                       } catch (IllegalStateException ignored) {
+                               // expected
+                       }
+
+                       try {
+                               queue.poll(); // after close, poll() must throw IllegalStateException
+                               fail("should cause an exception");
+                       } catch (IllegalStateException ignored) {
+                               // expected
+                       }
+               }
+               catch (Exception e) { // any other exception fails the test with its message
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testPollBatch() { // single-threaded check of pollBatch() on an empty, filled and closed queue
+               try {
+                       ClosableBlockingQueue<String> queue = new 
ClosableBlockingQueue<>();
+
+                       assertNull(queue.pollBatch()); // empty queue: pollBatch() is expected to return null, not an empty list
+                       
+                       queue.add("a");
+                       queue.add("b");
+                       
+                       assertEquals(asList("a", "b"), queue.pollBatch()); // one call must return all queued elements, in insertion order
+                       assertNull(queue.pollBatch()); // ... and leave the queue empty
+                       
+                       queue.add("c");
+
+                       assertEquals(singletonList("c"), queue.pollBatch());
+                       assertNull(queue.pollBatch());
+
+                       assertTrue(queue.close()); // close() is expected to return true on the empty queue
+
+                       try {
+                               queue.pollBatch(); // after close, pollBatch() must throw IllegalStateException
+                               fail("should cause an exception");
+                       } catch (IllegalStateException ignored) {
+                               // expected
+                       }
+               }
+               catch (Exception e) { // any other exception fails the test with its message
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testGetElementBlocking() { // single-threaded check of getElementBlocking() with and without an argument
+               try {
+                       ClosableBlockingQueue<String> queue = new 
ClosableBlockingQueue<>();
+
+                       assertNull(queue.getElementBlocking(1)); // empty queue: the call with an argument (presumably a timeout - confirm unit in ClosableBlockingQueue) must return null
+                       assertNull(queue.getElementBlocking(3));
+                       assertNull(queue.getElementBlocking(2));
+
+                       assertEquals(0, queue.size());
+
+                       queue.add("a");
+                       queue.add("b");
+                       queue.add("c");
+                       queue.add("d");
+                       queue.add("e");
+                       queue.add("f");
+
+                       assertEquals(6, queue.size());
+
+                       assertEquals("a", queue.getElementBlocking(99)); // with elements available, both variants must return the head in insertion order
+                       assertEquals("b", queue.getElementBlocking());
+
+                       assertEquals(4, queue.size()); // each successful call must have removed exactly one element
+
+                       assertEquals("c", queue.getElementBlocking(0)); // an argument of 0 must still return an already available element
+                       assertEquals("d", queue.getElementBlocking(1000000));
+                       assertEquals("e", queue.getElementBlocking());
+                       assertEquals("f", queue.getElementBlocking(1786598));
+
+                       assertEquals(0, queue.size());
+
+                       assertNull(queue.getElementBlocking(1)); // drained queue must behave like the initial empty one
+                       assertNull(queue.getElementBlocking(3));
+                       assertNull(queue.getElementBlocking(2));
+
+                       assertTrue(queue.close()); // close() is expected to return true on the empty queue
+
+                       try {
+                               queue.getElementBlocking(); // after close, the no-argument variant must throw instead of blocking
+                               fail("should cause an exception");
+                       } catch (IllegalStateException ignored) {
+                               // expected
+                       }
+
+                       try {
+                               queue.getElementBlocking(1000000000L); // after close, the variant with an argument must throw as well
+                               fail("should cause an exception");
+                       } catch (IllegalStateException ignored) {
+                               // expected
+                       }
+               }
+               catch (Exception e) { // any other exception fails the test with its message
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testGetBatchBlocking() { // single-threaded check of getBatchBlocking() with and without an argument
+               try {
+                       ClosableBlockingQueue<String> queue = new 
ClosableBlockingQueue<>();
+
+                       assertEquals(emptyList(), queue.getBatchBlocking(1)); // empty queue: the call with an argument (presumably a timeout - confirm) must return an empty list, not null
+                       assertEquals(emptyList(), queue.getBatchBlocking(3));
+                       assertEquals(emptyList(), queue.getBatchBlocking(2));
+
+                       queue.add("a");
+                       queue.add("b");
+
+                       assertEquals(asList("a", "b"), 
queue.getBatchBlocking(900000009)); // with elements available, the whole content must be returned in insertion order
+
+                       queue.add("c");
+                       queue.add("d");
+
+                       assertEquals(asList("c", "d"), 
queue.getBatchBlocking());
+
+                       assertEquals(emptyList(), queue.getBatchBlocking(2)); // the previous call must have drained the queue
+
+                       queue.add("e");
+
+                       assertEquals(singletonList("e"), 
queue.getBatchBlocking(0)); // an argument of 0 must still return an already available element
+
+                       queue.add("f");
+
+                       assertEquals(singletonList("f"), 
queue.getBatchBlocking(1000000000));
+
+                       assertEquals(0, queue.size());
+
+                       assertEquals(emptyList(), queue.getBatchBlocking(1)); // drained queue must behave like the initial empty one
+                       assertEquals(emptyList(), queue.getBatchBlocking(3));
+                       assertEquals(emptyList(), queue.getBatchBlocking(2));
+
+                       assertTrue(queue.close()); // close() is expected to return true on the empty queue
+
+                       try {
+                               queue.getBatchBlocking(); // after close, the no-argument variant must throw instead of blocking
+                               fail("should cause an exception");
+                       } catch (IllegalStateException ignored) {
+                               // expected
+                       }
+
+                       try {
+                               queue.getBatchBlocking(1000000000L); // after close, the variant with an argument must throw as well
+                               fail("should cause an exception");
+                       } catch (IllegalStateException ignored) {
+                               // expected
+                       }
+               }
+               catch (Exception e) { // any other exception fails the test with its message
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  multi-threaded tests
+       // 
------------------------------------------------------------------------
+       
+       @Test
+       public void notifyOnClose() { // each blocking getter, started in its own thread on an empty queue, must exit with IllegalStateException once the queue is closed
+               try {
+                       final long oneYear = 365L * 24 * 60 * 60 * 1000; // 365 days in milliseconds; far longer than the test runs
+                       
+                       // test "getBatchBlocking()"
+                       final ClosableBlockingQueue<String> queue1 = new 
ClosableBlockingQueue<>();
+                       QueueCall call1 = new QueueCall() {
+                               @Override
+                               public void call() throws Exception {
+                                       queue1.getBatchBlocking();
+                               }
+                       };
+                       testCallExitsOnClose(call1, queue1);
+
+                       // test "getBatchBlocking(timeout)"
+                       final ClosableBlockingQueue<String> queue2 = new 
ClosableBlockingQueue<>();
+                       QueueCall call2 = new QueueCall() {
+                               @Override
+                               public void call() throws Exception {
+                                       queue2.getBatchBlocking(oneYear);
+                               }
+                       };
+                       testCallExitsOnClose(call2, queue2);
+
+                       // test "getElementBlocking()"
+                       final ClosableBlockingQueue<String> queue3 = new 
ClosableBlockingQueue<>();
+                       QueueCall call3 = new QueueCall() {
+                               @Override
+                               public void call() throws Exception {
+                                       queue3.getElementBlocking();
+                               }
+                       };
+                       testCallExitsOnClose(call3, queue3);
+
+                       // test "getElementBlocking(timeout)"
+                       final ClosableBlockingQueue<String> queue4 = new 
ClosableBlockingQueue<>();
+                       QueueCall call4 = new QueueCall() {
+                               @Override
+                               public void call() throws Exception {
+                                       queue4.getElementBlocking(oneYear);
+                               }
+                       };
+                       testCallExitsOnClose(call4, queue4);
+               }
+               catch (Exception e) { // any unexpected exception fails the test with its message
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+       @Test
+       public void testMultiThreadedAddGet() { // a pusher thread adds 0..numElements-1 while a poller thread drains the queue via every retrieval method and checks the order
+               try {
+                       final ClosableBlockingQueue<Integer> queue = new 
ClosableBlockingQueue<>();
+                       final AtomicReference<Throwable> pushErrorRef = new 
AtomicReference<>(); // carries a failure of the pusher thread back to the test thread
+                       final AtomicReference<Throwable> pollErrorRef = new 
AtomicReference<>(); // carries a failure of the poller thread back to the test thread
+                       
+                       final int numElements = 2000;
+                       
+                       Thread pusher = new Thread("pusher") {
+
+                               @Override
+                               public void run() {
+                                       try {
+                                               final Random rnd = new Random();
+                                               for (int i = 0; i < 
numElements; i++) {
+                                                       queue.add(i);
+                                                       
+                                                       // sleep a bit, 
sometimes
+                                                       int sleepTime = 
rnd.nextInt(3);
+                                                       if (sleepTime > 1) {
+                                                               
Thread.sleep(sleepTime);
+                                                       }
+                                               }
+                                               
+                                               while (true) {
+                                                       if (queue.close()) { // retry until close() reports true - presumably it only succeeds once the poller has emptied the queue; confirm in ClosableBlockingQueue
+                                                               break;
+                                                       } else {
+                                                               Thread.sleep(5);
+                                                       }
+                                               }
+                                       } catch (Throwable t) {
+                                               pushErrorRef.set(t);
+                                       }
+                               }
+                       };
+                       pusher.start();
+
+                       Thread poller = new Thread("poller") {
+
+                               @SuppressWarnings("InfiniteLoopStatement")
+                               @Override
+                               public void run() {
+                                       try {
+                                               int count = 0; // number of elements consumed so far; also selects the retrieval method below
+                                               
+                                               try {
+                                                       final Random rnd = new 
Random();
+                                                       int nextExpected = 0; // elements must arrive as 0, 1, 2, ... without gaps
+                                                       
+                                                       while (true) { // left only via an exception (IllegalStateException once the queue is closed)
+                                                               int getMethod = 
count % 7; // remainders 5 and 6 both fall into the default branch
+                                                               switch 
(getMethod) {
+                                                                       case 0: 
{
+                                                                               
Integer next = queue.getElementBlocking(1); // may return null, see the null check below
+                                                                               
if (next != null) {
+                                                                               
        assertEquals(nextExpected, next.intValue());
+                                                                               
        nextExpected++;
+                                                                               
        count++;
+                                                                               
}
+                                                                               
break;
+                                                                       }
+                                                                       case 1: 
{
+                                                                               
List<Integer> nextList = queue.getBatchBlocking(); // result is iterated without a null check
+                                                                               
for (Integer next : nextList) {
+                                                                               
        assertNotNull(next);
+                                                                               
        assertEquals(nextExpected, next.intValue());
+                                                                               
        nextExpected++;
+                                                                               
        count++;
+                                                                               
}
+                                                                               
break;
+                                                                       }
+                                                                       case 2: 
{
+                                                                               
List<Integer> nextList = queue.getBatchBlocking(1);
+                                                                               
if (nextList != null) {
+                                                                               
        for (Integer next : nextList) {
+                                                                               
                assertNotNull(next);
+                                                                               
                assertEquals(nextExpected, next.intValue());
+                                                                               
                nextExpected++;
+                                                                               
                count++;
+                                                                               
        }
+                                                                               
}
+                                                                               
break;
+                                                                       }
+                                                                       case 3: 
{
+                                                                               
Integer next = queue.poll(); // may return null, see the null check below
+                                                                               
if (next != null) {
+                                                                               
        assertEquals(nextExpected, next.intValue());
+                                                                               
        nextExpected++;
+                                                                               
        count++;
+                                                                               
}
+                                                                               
break;
+                                                                       }
+                                                                       case 4: 
{
+                                                                               
List<Integer> nextList = queue.pollBatch(); // may return null, see the null check below
+                                                                               
if (nextList != null) {
+                                                                               
        for (Integer next : nextList) {
+                                                                               
                assertNotNull(next);
+                                                                               
                assertEquals(nextExpected, next.intValue());
+                                                                               
                nextExpected++;
+                                                                               
                count++;
+                                                                               
        }
+                                                                               
}
+                                                                               
break;
+                                                                       }
+                                                                       
default: {
+                                                                               
Integer next = queue.getElementBlocking(); // the result is asserted to be non-null
+                                                                               
assertNotNull(next);
+                                                                               
assertEquals(nextExpected, next.intValue());
+                                                                               
nextExpected++;
+                                                                               
count++;
+                                                                       }
+                                                               }
+                                                               
+                                                               // sleep a bit, 
sometimes
+                                                               int sleepTime = 
rnd.nextInt(3);
+                                                               if (sleepTime > 
1) {
+                                                                       
Thread.sleep(sleepTime);
+                                                               }
+                                                       }
+                                               } catch (IllegalStateException 
e) {
+                                                       // we get this once the 
queue is closed
+                                                       
assertEquals(numElements, count); // every pushed element must have been consumed by then
+                                               }
+                                       } catch (Throwable t) { // also catches AssertionErrors raised by the checks above
+                                               pollErrorRef.set(t);
+                                       }
+                               }
+                       };
+                       poller.start();
+                       
+                       pusher.join(); // both joins wait without a timeout
+                       poller.join();
+                       
+                       if (pushErrorRef.get() != null) { // re-raise worker failures on the test thread so JUnit sees them
+                               Throwable t = pushErrorRef.get();
+                               t.printStackTrace();
+                               fail("Error in pusher: " + t.getMessage());
+                       }
+                       if (pollErrorRef.get() != null) {
+                               Throwable t = pollErrorRef.get();
+                               t.printStackTrace();
+                               fail("Error in poller: " + t.getMessage());
+                       }
+               }
+               catch (Exception e) { // e.g. InterruptedException from join()
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  Utils
+       // 
------------------------------------------------------------------------
+       
+       private static void testCallExitsOnClose( // runs the given call in its own thread, closes the queue, and asserts the call ended with an IllegalStateException
+                       final QueueCall call, ClosableBlockingQueue<String> 
queue) throws Exception {
+               
+               final AtomicReference<Throwable> errorRef = new 
AtomicReference<>(); // receives whatever the call throws in the worker thread
+               
+               Runnable runnable = new Runnable() {
+                       @Override
+                       public void run() {
+                               try {
+                                       call.call();
+                               } catch (Throwable t) {
+                                       errorRef.set(t);
+                               }
+                       }
+               };
+
+               Thread thread = new Thread(runnable);
+               thread.start();
+               Thread.sleep(100); // presumably gives the worker time to enter the blocking call before close(); timing-based, not a guarantee
+               queue.close(); // return value intentionally not checked here
+               thread.join(); // waits without a timeout for the call to return
+
+               @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+               Throwable cause = errorRef.get();
+               assertTrue(cause instanceof IllegalStateException); // also fails when the call returned normally (cause is null)
+       }
+       
+       private interface QueueCall { // a single queue operation that may throw; lets testCallExitsOnClose run it in a worker thread
+               void call() throws Exception;
+       }
+}

Reply via email to