Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246376178 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.PriorityAware; +import org.apache.activemq.artemis.utils.collections.PriorityCollection; +import org.apache.activemq.artemis.utils.collections.ResettableIterator; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Set; +import java.util.Spliterator; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Consumer; + +/** + * This class's purpose is to hold the consumers. + * + * CopyOnWriteArraySet is used as the underlying collection to the PriorityCollection, as it is concurrent safe, + * but also lock less for a read path, which is our HOT path. + * Also it was the underlying collection previously used in QueueImpl, before we abstracted it out to support priority consumers. + * + * There can only be one resettable iterable view, + * A new iterable view is created on modification, this is to keep the read HOT path performent, BUT + * the iterable view changes only after reset so changes in the underlying collection are only seen after a reset, + * + * All other iterators created by iterators() method are not reset-able and are created on delegating iterator(). + * + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware, + * but intent is this is the QueueImpl:ConsumerHolder. + */ +public class QueueConsumersImpl<T extends PriorityAware> implements QueueConsumers<T> { + + private final PriorityCollection<T> consumers = new PriorityCollection<>(CopyOnWriteArraySet::new); + private final Collection<T> unmodifiableConsumers = Collections.unmodifiableCollection(consumers); + private final AtomicReferenceFieldUpdater<QueueConsumersImpl, ResettableIterator> changedIteratorFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, ResettableIterator.class, "changedIterator"); + private volatile ResettableIterator<T> changedIterator; + private ResettableIterator<T> currentIterator = consumers.resettableIterator(); + + @Override + public Set<Integer> getPriorites() { + return consumers.getPriorites(); + } + + @Override + public boolean hasNext() { + return currentIterator.hasNext(); + } + + @Override + public T next() { + return currentIterator.next(); + } + + @Override + public QueueConsumers<T> reset() { + if (changedIterator != null) { + currentIterator = changedIterator; + changedIteratorFieldUpdater.compareAndSet(this, changedIterator, null); + } else { + currentIterator.reset(); --- End diff -- I suppose that `reset` is safe to be called just by one thread at time, if not, it would be complex because `currentIterator` could be changed indipendently
---