Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2488#discussion_r244982460 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java --- @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.server.PriorityAware; + +import java.lang.reflect.Array; +import java.util.AbstractCollection; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * This class's purpose is to hold the consumers, it models around multi getPriority (getPriority) varient of + * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent safe and non blocking. + * + * N.b. we could have made Level extend CopyOnWriteArrayList but due to the need to access the internal Array structure, + * which is privileged to package java.util.concurrent. As such much of Level is is taken from here. + * + * Modifications like in CopyOnWriteArrayList are single threaded via a single re-entrant lock. + * + * Iterators iterate over a snapshot of the internal array structure, so will not see mutations. + * + * There can only be one resettable iterable view, this is exposed at the top getPriority, + * and is intended for use in QueueImpl only. + * All other iterators are not reset-able and are created on calling iterator(). + * + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these. + * + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware, + * but intent is this is the QueueImpl:ConsumerHolder. + */ +public class QueueConsumersImpl<T extends PriorityAware> extends AbstractCollection<T> implements QueueConsumers<T> { + + private final QueueConsumersIterator<T> iterator = new QueueConsumersIterator<>(this, true); + + private volatile Level<T>[] levels; + private volatile int size; + private volatile T first; + + private void setArray(Level<T>[] array) { + this.levels = array; + } + + private Level<T>[] getArray() { + return levels; + } + + + public QueueConsumersImpl() { + levels = newLevelArrayInstance(0); + } + + @SuppressWarnings("unchecked") + private static <T> Level<T>[] newLevelArrayInstance(int length) { + return (Level<T>[]) Array.newInstance(Level.class, length); + } + + @Override + public int size() { + return size; + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + @Override + public Set<Integer> getPriorites() { + Level<T>[] levels = getArray(); + return Arrays.stream(levels).map(Level::level).collect(Collectors.toSet()); + } + + @Override + public Iterator<T> iterator() { + return new QueueConsumersIterator<>(this, false); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public T next() { + return iterator.next(); + } + + @Override + public QueueConsumers<T> reset() { + iterator.reset(); + return this; + } + + @Override + public void forEach(Consumer<? super T> action) { + Objects.requireNonNull(action); + Level<T>[] current = getArray(); + int len = current.length; + for (int i = 0; i < len; ++i) { + current[i].forEach(action); + } + } + + private Level<T> getLevel(int level, boolean createIfMissing) { + Level<T>[] current = getArray(); + int low = 0; + int high = current.length - 1; + + while (low <= high) { + int mid = (low + high) >>> 1; + Level<T> midVal = current[mid]; + + if (midVal.level() > level) + low = mid + 1; + else if (midVal.level() < level) + high = mid - 1; + else + return midVal; //key found + } + + if (createIfMissing) { + Level<T>[] newLevels = newLevelArrayInstance(current.length + 1); + if (low > 0) { + System.arraycopy(current, 0, newLevels, 0, low); + } + if (current.length - low > 0) { + System.arraycopy(current, low, newLevels, low + 1, current.length - low); + } + newLevels[low] = new Level<T>(level); + setArray(newLevels); + return newLevels[low]; + } + return null; + } + + @Override + public synchronized boolean add(T t) { + boolean result = addInternal(t); + calcSize(); + return result; + } + + private boolean addInternal(T t) { + if (t == null) return false; + Level<T> level = getLevel(t.getPriority(), true); + return level.add(t); + } + + @Override + public boolean remove(Object o) { + return o instanceof PriorityAware && remove((PriorityAware) o); + } + + public synchronized boolean remove(PriorityAware priorityAware) { + boolean result = removeInternal(priorityAware); + calcSize(); + return result; + } + + private boolean removeInternal(PriorityAware priorityAware) { + if ( priorityAware == null) return false; + Level<T> level = getLevel(priorityAware.getPriority(), false); + boolean result = level != null && level.remove(priorityAware); + if (level != null && level.size() == 0) { + removeLevel(level.level); + } + return result; + } + + private Level<T> removeLevel(int level) { + Level<T>[] current = getArray(); + int len = current.length; + int low = 0; + int high = len - 1; + + while (low <= high) { + int mid = (low + high) >>> 1; + Level<T> midVal = current[mid]; + + if (midVal.level() > level) + low = mid + 1; + else if (midVal.level() < level) + high = mid - 1; + else { + Level<T>[] newLevels = newLevelArrayInstance(len - 1); + System.arraycopy(current, 0, newLevels, 0, mid); + System.arraycopy(current, mid + 1, newLevels, mid, len - mid - 1); + setArray(newLevels); + return midVal; //key found + } + } + return null; + } + + @Override + public boolean containsAll(Collection<?> c) { + Objects.requireNonNull(c); + for (Object e : c) + if (!contains(e)) + return false; + return true; + } + + @Override + public synchronized boolean addAll(Collection<? extends T> c) { + Objects.requireNonNull(c); + boolean modified = false; + for (T e : c) + if (addInternal(e)) + modified = true; + calcSize(); + return modified; + } + + @Override + public synchronized boolean removeAll(Collection<?> c) { + Objects.requireNonNull(c); + boolean modified = false; + for (Object o : c) { + if (remove(o)) { + modified = true; + } + } + calcSize(); + return modified; + } + + @Override + public synchronized boolean retainAll(Collection<?> c) { + Objects.requireNonNull(c); + boolean modified = false; + Level<T>[] levels = getArray(); + for (Level<T> level : levels) { + if (level.retainAll(c)) { + modified = true; + } + } + calcSize(); + return modified; + } + + @Override + public synchronized void clear() { + Level<T>[] levels = getArray(); + for (Level<T> level : levels) { + level.clear(); + } + calcSize(); + } + + + + @Override + public boolean contains(Object o) { + return o instanceof PriorityAware && contains((PriorityAware) o); + } + + public boolean contains(PriorityAware priorityAware) { + if (priorityAware == null) return false; + Level<T> level = getLevel(priorityAware.getPriority(), false); + return level != null && level.contains(priorityAware); + } + + private void calcSize() { + Level<T>[] current = getArray(); + int size = 0; + for (Level<T> level : current) { + size += level.size(); + } + this.size = size; + } + + private static class QueueConsumersIterator<T extends PriorityAware> implements ResetableIterator<T> { + + private final QueueConsumersImpl<T> queueConsumers; + private final boolean resetable; + private Level<T>[] levels; + int level = -1; + private ResetableIterator<T> currentIterator; + + private QueueConsumersIterator(QueueConsumersImpl<T> queueConsumers, boolean resetable) { + this.queueConsumers = queueConsumers; + this.levels = queueConsumers.getArray(); + this.resetable = resetable; + + } + + @Override + public boolean hasNext() { + while (true) { + if (currentIterator != null) { + if (currentIterator.hasNext()) { + return true; + } + } + int nextLevel = level + 1; + if (levels != null && nextLevel < levels.length) { + moveToLevel(nextLevel); + } else { + return false; + } + } + } + + @Override + public T next() { + while (true) { + if (currentIterator != null) { + if (currentIterator.hasNext()) { + return currentIterator.next(); + } + } + int nextLevel = level + 1; + if (levels != null && nextLevel < levels.length) { + moveToLevel(nextLevel); + } else { + return null; + } + } + } + + private void moveToLevel(int level) { + Level<T> level0 = levels[level]; + if (resetable) { + currentIterator = level0.resetableIterator().reset(); + } else { + currentIterator = level0.iterator(); + } + this.level = level; + } + + @Override + public ResetableIterator<T> reset() { + if (!resetable) { + throw new IllegalStateException("Iterator is not resetable"); + } + levels = queueConsumers.getArray(); + level = -1; + currentIterator = null; + return this; + } + } + + /** + * This is represents a getPriority and is modeled on CopyOnArrayList. + * + * @param <E> + */ + private static class Level<E> { --- End diff -- I'm not a big fan of multiple layers of abstractions, but I admit that `Level` is sharing here a lot from `CopyOnWriteArratList`: we can't just reuse it somehow? It is due to the `setArray` call?
---