Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2488#discussion_r244979032 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java --- @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.server.PriorityAware; + +import java.lang.reflect.Array; +import java.util.AbstractCollection; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * This class's purpose is to hold the consumers, it models around multi getPriority (getPriority) varient of + * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent safe and non blocking. + * + * N.b. we could have made Level extend CopyOnWriteArrayList but due to the need to access the internal Array structure, + * which is privileged to package java.util.concurrent. As such much of Level is is taken from here. + * + * Modifications like in CopyOnWriteArrayList are single threaded via a single re-entrant lock. + * + * Iterators iterate over a snapshot of the internal array structure, so will not see mutations. + * + * There can only be one resettable iterable view, this is exposed at the top getPriority, + * and is intended for use in QueueImpl only. + * All other iterators are not reset-able and are created on calling iterator(). + * + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these. + * + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware, + * but intent is this is the QueueImpl:ConsumerHolder. + */ +public class QueueConsumersImpl<T extends PriorityAware> extends AbstractCollection<T> implements QueueConsumers<T> { + + private final QueueConsumersIterator<T> iterator = new QueueConsumersIterator<>(this, true); + + private volatile Level<T>[] levels; + private volatile int size; + private volatile T first; + + private void setArray(Level<T>[] array) { + this.levels = array; + } + + private Level<T>[] getArray() { + return levels; + } + + + public QueueConsumersImpl() { + levels = newLevelArrayInstance(0); + } + + @SuppressWarnings("unchecked") + private static <T> Level<T>[] newLevelArrayInstance(int length) { + return (Level<T>[]) Array.newInstance(Level.class, length); + } + + @Override + public int size() { + return size; + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + @Override + public Set<Integer> getPriorites() { + Level<T>[] levels = getArray(); + return Arrays.stream(levels).map(Level::level).collect(Collectors.toSet()); + } + + @Override + public Iterator<T> iterator() { + return new QueueConsumersIterator<>(this, false); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public T next() { + return iterator.next(); + } + + @Override + public QueueConsumers<T> reset() { + iterator.reset(); + return this; + } + + @Override + public void forEach(Consumer<? super T> action) { + Objects.requireNonNull(action); + Level<T>[] current = getArray(); + int len = current.length; + for (int i = 0; i < len; ++i) { + current[i].forEach(action); + } + } + + private Level<T> getLevel(int level, boolean createIfMissing) { + Level<T>[] current = getArray(); + int low = 0; + int high = current.length - 1; + + while (low <= high) { + int mid = (low + high) >>> 1; + Level<T> midVal = current[mid]; + + if (midVal.level() > level) + low = mid + 1; + else if (midVal.level() < level) + high = mid - 1; + else + return midVal; //key found + } + + if (createIfMissing) { + Level<T>[] newLevels = newLevelArrayInstance(current.length + 1); --- End diff -- will pack this one into a separate method (if possible)
---