Repository: bookkeeper Updated Branches: refs/heads/master eeaddeda6 -> e2894a9e9
BOOKKEEPER-1066: Introduce GrowableArrayBlockingQueue In multiple places, (eg: journal, ordered executor, etc..), we are using `LinkedBlockingQueue` instances to pass objects between threads. The `LinkedBlockingQueue` differs from the `ArrayBlockingQueue` in that it doesn't require to define a max queue size, though, being implemented with a linked list, it requires to allocates list nodes each time an item is added. We can use a `GrowableArrayBlockingQueue` that behaves in the same way as the `LinkedBlockingQueue`, but it's implemented with an array that can be resized when the queue reaches the capacity. Author: Matteo Merli <[email protected]> Reviewers: Enrico Olivelli <[email protected]> Closes #153 from merlimat/growable-blocking-queue Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/e2894a9e Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/e2894a9e Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/e2894a9e Branch: refs/heads/master Commit: e2894a9e9946b4cc4647c280094bcb2eb1a521ea Parents: eeadded Author: Matteo Merli <[email protected]> Authored: Fri May 12 15:17:36 2017 +0200 Committer: eolivelli <[email protected]> Committed: Fri May 12 15:17:36 2017 +0200 ---------------------------------------------------------------------- .../org/apache/bookkeeper/util/MathUtils.java | 4 + .../collections/GrowableArrayBlockingQueue.java | 359 +++++++++++++++++++ .../GrowableArrayBlockingQueueTest.java | 206 +++++++++++ 3 files changed, 569 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e2894a9e/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java index 6aa9073..1b3044d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java @@ -35,6 +35,10 @@ public class MathUtils { } + public static int findNextPositivePowerOfTwo(final int value) { + return 1 << (32 - Integer.numberOfLeadingZeros(value - 1)); + } + /** * Current time from some arbitrary time base in the past, counting in * milliseconds, and not affected by settimeofday or similar system clock http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e2894a9e/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/GrowableArrayBlockingQueue.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/GrowableArrayBlockingQueue.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/GrowableArrayBlockingQueue.java new file mode 100644 index 0000000..8f7dae7 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/GrowableArrayBlockingQueue.java @@ -0,0 +1,359 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.util.collections; + +import java.util.AbstractQueue; +import java.util.Collection; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.bookkeeper.util.MathUtils; + + +/** + * This implements a {@link BlockingQueue} backed by an array with no fixed capacity. + * + * When the capacity is reached, data will be moved to a bigger array. + * + */ +public class GrowableArrayBlockingQueue<T> extends AbstractQueue<T> implements BlockingQueue<T> { + + private final ReentrantLock headLock = new ReentrantLock(); + private final PaddedInt headIndex = new PaddedInt(); + private final PaddedInt tailIndex = new PaddedInt(); + private final ReentrantLock tailLock = new ReentrantLock(); + private final Condition isNotEmpty = headLock.newCondition(); + + private T[] data; + @SuppressWarnings("rawtypes") + private static final AtomicIntegerFieldUpdater<GrowableArrayBlockingQueue> SIZE_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(GrowableArrayBlockingQueue.class, "size"); + @SuppressWarnings("unused") + private volatile int size = 0; + + public GrowableArrayBlockingQueue() { + this(64); + } + + @SuppressWarnings("unchecked") + public GrowableArrayBlockingQueue(int initialCapacity) { + headIndex.value = 0; + tailIndex.value = 0; + + int capacity = MathUtils.findNextPositivePowerOfTwo(initialCapacity); + data = (T[]) new Object[capacity]; + } + + @Override + public T remove() { + T item = poll(); + if (item == null) { + throw new NoSuchElementException(); + } + + return item; + } + + @Override + public T poll() { + headLock.lock(); + try { + if (SIZE_UPDATER.get(this) > 0) { + T item = data[headIndex.value]; + headIndex.value = (headIndex.value + 1) & (data.length - 1); + SIZE_UPDATER.decrementAndGet(this); + return item; + } else { + return null; + } + } finally { + headLock.unlock(); + } + } + + @Override + public T element() { + T item = peek(); + if (item == null) { + throw new NoSuchElementException(); + } + + return item; + } + + @Override + public T peek() { + headLock.lock(); + try { + if (SIZE_UPDATER.get(this) > 0) { + return data[headIndex.value]; + } else { + return null; + } + } finally { + headLock.unlock(); + } + } + + @Override + public boolean offer(T e) { + // Queue is unbounded and it will never reject new items + put(e); + return true; + } + + @Override + public void put(T e) { + tailLock.lock(); + + boolean wasEmpty = false; + + try { + if (SIZE_UPDATER.get(this) == data.length) { + expandArray(); + } + + data[tailIndex.value] = e; + tailIndex.value = (tailIndex.value + 1) & (data.length - 1); + if (SIZE_UPDATER.getAndIncrement(this) == 0) { + wasEmpty = true; + } + } finally { + tailLock.unlock(); + } + + if (wasEmpty) { + headLock.lock(); + try { + isNotEmpty.signal(); + } finally { + headLock.unlock(); + } + } + } + + @Override + public boolean add(T e) { + put(e); + return true; + } + + @Override + public boolean offer(T e, long timeout, TimeUnit unit) { + // Queue is unbounded and it will never reject new items + put(e); + return true; + } + + @Override + public T take() throws InterruptedException { + headLock.lockInterruptibly(); + + try { + while (SIZE_UPDATER.get(this) == 0) { + isNotEmpty.await(); + } + + T item = data[headIndex.value]; + data[headIndex.value] = null; + headIndex.value = (headIndex.value + 1) & (data.length - 1); + if (SIZE_UPDATER.decrementAndGet(this) > 0) { + // There are still entries to consume + isNotEmpty.signal(); + } + return item; + } finally { + headLock.unlock(); + } + } + + @Override + public T poll(long timeout, TimeUnit unit) throws InterruptedException { + headLock.lockInterruptibly(); + + try { + long timeoutNanos = unit.toNanos(timeout); + while (SIZE_UPDATER.get(this) == 0) { + if (timeoutNanos <= 0) { + return null; + } + + timeoutNanos = isNotEmpty.awaitNanos(timeoutNanos); + } + + T item = data[headIndex.value]; + data[headIndex.value] = null; + headIndex.value = (headIndex.value + 1) & (data.length - 1); + if (SIZE_UPDATER.decrementAndGet(this) > 0) { + // There are still entries to consume + isNotEmpty.signal(); + } + return item; + } finally { + headLock.unlock(); + } + } + + @Override + public int remainingCapacity() { + return Integer.MAX_VALUE; + } + + @Override + public int drainTo(Collection<? super T> c) { + return drainTo(c, Integer.MAX_VALUE); + } + + @Override + public int drainTo(Collection<? super T> c, int maxElements) { + headLock.lock(); + + try { + int drainedItems = 0; + int size = SIZE_UPDATER.get(this); + + while (size > 0 && drainedItems < maxElements) { + T item = data[headIndex.value]; + data[headIndex.value] = null; + c.add(item); + + headIndex.value = (headIndex.value + 1) & (data.length - 1); + --size; + ++drainedItems; + } + + if (SIZE_UPDATER.addAndGet(this, -drainedItems) > 0) { + // There are still entries to consume + isNotEmpty.signal(); + } + + return drainedItems; + } finally { + headLock.unlock(); + } + } + + @Override + public void clear() { + headLock.lock(); + + try { + int size = SIZE_UPDATER.get(this); + + for (int i = 0; i < size; i++) { + data[headIndex.value] = null; + headIndex.value = (headIndex.value + 1) & (data.length - 1); + } + + if (SIZE_UPDATER.addAndGet(this, -size) > 0) { + // There are still entries to consume + isNotEmpty.signal(); + } + } finally { + headLock.unlock(); + } + } + + @Override + public int size() { + return SIZE_UPDATER.get(this); + } + + @Override + public Iterator<T> iterator() { + throw new UnsupportedOperationException(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + + tailLock.lock(); + headLock.lock(); + + try { + int headIndex = this.headIndex.value; + int size = SIZE_UPDATER.get(this); + + sb.append('['); + + for (int i = 0; i < size; i++) { + T item = data[headIndex]; + if (i > 0) { + sb.append(", "); + } + + sb.append(item); + + headIndex = (headIndex + 1) & (data.length - 1); + } + + sb.append(']'); + } finally { + headLock.unlock(); + tailLock.unlock(); + } + return sb.toString(); + } + + @SuppressWarnings("unchecked") + private void expandArray() { + // We already hold the tailLock + headLock.lock(); + + try { + int size = SIZE_UPDATER.get(this); + int newCapacity = data.length * 2; + T[] newData = (T[]) new Object[newCapacity]; + + int oldHeadIndex = headIndex.value; + int newTailIndex = 0; + + for (int i = 0; i < size; i++) { + newData[newTailIndex++] = data[oldHeadIndex]; + oldHeadIndex = (oldHeadIndex + 1) & (data.length - 1); + } + + data = newData; + headIndex.value = 0; + tailIndex.value = size; + } finally { + headLock.unlock(); + } + } + + final static class PaddedInt { + private int value; + + // Padding to avoid false sharing + public volatile int pi1 = 1; + public volatile long p1 = 1L, p2 = 2L, p3 = 3L, p4 = 4L, p5 = 5L, p6 = 6L; + + public long exposeToAvoidOptimization() { + return pi1 + p1 + p2 + p3 + p4 + p5 + p6; + } + } +} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e2894a9e/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/GrowableArrayBlockingQueueTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/GrowableArrayBlockingQueueTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/GrowableArrayBlockingQueueTest.java new file mode 100644 index 0000000..9fd7e84 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/GrowableArrayBlockingQueueTest.java @@ -0,0 +1,206 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.util.collections; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class GrowableArrayBlockingQueueTest { + + @Test + public void simple() throws Exception { + BlockingQueue<Integer> queue = new GrowableArrayBlockingQueue<>(4); + + assertEquals(null, queue.poll()); + + assertEquals(Integer.MAX_VALUE, queue.remainingCapacity()); + assertEquals("[]", queue.toString()); + + try { + queue.element(); + fail("Should have thrown exception"); + } catch (NoSuchElementException e) { + // Expected + } + + try { + queue.iterator(); + fail("Should have thrown exception"); + } catch (UnsupportedOperationException e) { + // Expected + } + + // Test index rollover + for (int i = 0; i < 100; i++) { + queue.add(i); + + assertEquals(i, queue.take().intValue()); + } + + queue.offer(1); + assertEquals("[1]", queue.toString()); + queue.offer(2); + assertEquals("[1, 2]", queue.toString()); + queue.offer(3); + assertEquals("[1, 2, 3]", queue.toString()); + queue.offer(4); + assertEquals("[1, 2, 3, 4]", queue.toString()); + + assertEquals(4, queue.size()); + + List<Integer> list = new ArrayList<>(); + queue.drainTo(list, 3); + + assertEquals(1, queue.size()); + assertEquals(Lists.newArrayList(1, 2, 3), list); + assertEquals("[4]", queue.toString()); + assertEquals(4, queue.peek().intValue()); + + assertEquals(4, queue.element().intValue()); + assertEquals(4, queue.remove().intValue()); + try { + queue.remove(); + fail("Should have thrown exception"); + } catch (NoSuchElementException e) { + // Expected + } + } + + @Test(timeout = 10000) + public void blockingTake() throws Exception { + BlockingQueue<Integer> queue = new GrowableArrayBlockingQueue<>(); + + CountDownLatch latch = new CountDownLatch(1); + + new Thread(() -> { + try { + int expected = 0; + + for (int i = 0; i < 100; i++) { + int n = queue.take(); + + assertEquals(expected++, n); + } + + latch.countDown(); + } catch (Exception e) { + e.printStackTrace(); + } + }).start(); + + int n = 0; + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + queue.put(n); + ++n; + } + + // Wait until all the entries are consumed + while (!queue.isEmpty()) { + Thread.sleep(1); + } + } + + latch.await(); + } + + @Test + public void growArray() throws Exception { + BlockingQueue<Integer> queue = new GrowableArrayBlockingQueue<>(4); + + assertEquals(null, queue.poll()); + + assertTrue(queue.offer(1)); + assertTrue(queue.offer(2)); + assertTrue(queue.offer(3)); + assertTrue(queue.offer(4)); + assertTrue(queue.offer(5)); + + assertEquals(5, queue.size()); + + queue.clear(); + assertEquals(0, queue.size()); + + assertTrue(queue.offer(1, 1, TimeUnit.SECONDS)); + assertTrue(queue.offer(2, 1, TimeUnit.SECONDS)); + assertTrue(queue.offer(3, 1, TimeUnit.SECONDS)); + assertEquals(3, queue.size()); + + List<Integer> list = new ArrayList<>(); + queue.drainTo(list); + assertEquals(0, queue.size()); + + assertEquals(Lists.newArrayList(1, 2, 3), list); + } + + @Test(timeout = 10000) + public void pollTimeout() throws Exception { + BlockingQueue<Integer> queue = new GrowableArrayBlockingQueue<>(4); + + assertEquals(null, queue.poll(1, TimeUnit.MILLISECONDS)); + + queue.put(1); + assertEquals(1, queue.poll(1, TimeUnit.MILLISECONDS).intValue()); + + // 0 timeout should not block + assertEquals(null, queue.poll(0, TimeUnit.HOURS)); + + queue.put(2); + queue.put(3); + assertEquals(2, queue.poll(1, TimeUnit.HOURS).intValue()); + assertEquals(3, queue.poll(1, TimeUnit.HOURS).intValue()); + } + + @Test(timeout = 10000) + public void pollTimeout2() throws Exception { + BlockingQueue<Integer> queue = new GrowableArrayBlockingQueue<>(); + + CountDownLatch latch = new CountDownLatch(1); + + new Thread(() -> { + try { + queue.poll(1, TimeUnit.HOURS); + + latch.countDown(); + } catch (Exception e) { + e.printStackTrace(); + } + }).start(); + + // Make sure background thread is waiting on poll + Thread.sleep(100); + queue.put(1); + + latch.await(); + } +}
