Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246513536 --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import org.apache.activemq.artemis.core.PriorityAware; + +import java.lang.reflect.Array; +import java.util.AbstractCollection; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.Objects; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** + * This class's purpose is to hold the the different collections used for each priority level. + * + * A supplier is required to provide the underlying collection needed when a new priority level is seen, + * and the end behaviour is that of the underlying collection, e.g. if set add will follow set's add semantics, + * if list, then list semantics. + * + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these. + * + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware. + */ +public class PriorityCollection<T extends PriorityAware> extends AbstractCollection<T> { + + private final Supplier<Collection<T>> supplier; + private volatile PriorityHolder<T>[] priorityHolders = newPrioritySetArrayInstance(0); + private volatile int size; + + private void setArray(PriorityHolder<T>[] priorityHolders) { + this.priorityHolders = priorityHolders; + } + + private PriorityHolder<T>[] getArray() { + return priorityHolders; + } + + + public PriorityCollection(Supplier<Collection<T>> supplier) { + this.supplier = supplier; + } + + @SuppressWarnings("unchecked") + private static <T> PriorityHolder<T>[] newPrioritySetArrayInstance(int length) { + return (PriorityHolder<T>[]) Array.newInstance(PriorityHolder.class, length); + } + + @Override + public int size() { + return size; + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + public Set<Integer> getPriorites() { + PriorityHolder<T>[] snapshot = getArray(); + return Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet()); + } + + @Override + public Iterator<T> iterator() { + Iterator<T>[] iterators = getIterators(); + return new MultiIterator<>(iterators); + } + + private Iterator<T>[] getIterators() { + PriorityHolder<T>[] snapshot = this.getArray(); + int size = snapshot.length; + Iterator<T>[] iterators = newIteratorArrayInstance(size); + for (int i = 0; i < size; i++) { + iterators[i] = snapshot[i].getValues().iterator(); + } + return iterators; + } + + @SuppressWarnings("unchecked") + private static <T> Iterator<T>[] newIteratorArrayInstance(int length) { + return (Iterator<T>[]) Array.newInstance(Iterator.class, length); + } + + public ResettableIterator<T> resettableIterator() { + return new MultiResettableIterator<T>(getResettableIterators()); + } + + private ResettableIterator<T>[] getResettableIterators() { + PriorityHolder<T>[] snapshot = this.getArray(); + int size = snapshot.length; + ResettableIterator<T>[] iterators = newResettableIteratorArrayInstance(size); + for (int i = 0; i < size; i++) { + iterators[i] = new ArrayResettableIterator<>(snapshot[i].getValues().toArray()); + } + return iterators; + } + + @SuppressWarnings("unchecked") + private static <T> ResettableIterator<T>[] newResettableIteratorArrayInstance(int length) { + return (ResettableIterator<T>[]) Array.newInstance(ResettableIterator.class, length); + } + + @Override + public void forEach(Consumer<? super T> action) { + Objects.requireNonNull(action); + PriorityHolder<T>[] current = getArray(); + int len = current.length; + for (int i = 0; i < len; ++i) { + current[i].getValues().forEach(action); + } + } + + private Collection<T> getCollection(int priority, boolean createIfMissing) { + PriorityHolder<T>[] current = getArray(); + int low = 0; + int high = current.length - 1; + + while (low <= high) { + int mid = (low + high) >>> 1; + PriorityHolder<T> midVal = current[mid]; + + if (midVal.getPriority() > priority) + low = mid + 1; + else if (midVal.getPriority() < priority) + high = mid - 1; + else + return midVal.getValues(); //key found + } + + if (createIfMissing) { + PriorityHolder<T>[] newArray = newPrioritySetArrayInstance(current.length + 1); + if (low > 0) { + System.arraycopy(current, 0, newArray, 0, low); + } + if (current.length - low > 0) { + System.arraycopy(current, low, newArray, low + 1, current.length - low); + } + newArray[low] = new PriorityHolder<T>(priority, supplier); + setArray(newArray); + return newArray[low].getValues(); + } + return null; + } + + @Override + public synchronized boolean add(T t) { --- End diff -- This needs to be concurrent safe on modifications
---