Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246376178
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java ---
    @@ -0,0 +1,124 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.core.server.impl;
    +
    +import org.apache.activemq.artemis.core.PriorityAware;
    +import org.apache.activemq.artemis.utils.collections.PriorityCollection;
    +import org.apache.activemq.artemis.utils.collections.ResettableIterator;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.Set;
    +import java.util.Spliterator;
    +import java.util.concurrent.CopyOnWriteArraySet;
    +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
    +import java.util.function.Consumer;
    +
    +/**
    + * This class's purpose is to hold the consumers.
    + *
    + * CopyOnWriteArraySet is used as the underlying collection to the PriorityCollection, as it is concurrent safe,
    + * but also lock less for a read path, which is our HOT path.
    + * Also it was the underlying collection previously used in QueueImpl, before we abstracted it out to support priority consumers.
    + *
    + * There can only be one resettable iterable view,
    + * A new iterable view is created on modification, this is to keep the read HOT path performent, BUT
    + * the iterable view changes only after reset so changes in the underlying collection are only seen after a reset,
    + *
    + * All other iterators created by iterators() method are not reset-able and are created on delegating iterator().
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware,
    + *         but intent is this is the QueueImpl:ConsumerHolder.
    + */
    +public class QueueConsumersImpl<T extends PriorityAware> implements QueueConsumers<T> {
    +
    +   private final PriorityCollection<T> consumers = new PriorityCollection<>(CopyOnWriteArraySet::new);
    +   private final Collection<T> unmodifiableConsumers = Collections.unmodifiableCollection(consumers);
    +   private final AtomicReferenceFieldUpdater<QueueConsumersImpl, ResettableIterator> changedIteratorFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, ResettableIterator.class, "changedIterator");
    +   private volatile ResettableIterator<T> changedIterator;
    +   private ResettableIterator<T> currentIterator = consumers.resettableIterator();
    +
    +   @Override
    +   public Set<Integer> getPriorites() {
    +      return consumers.getPriorites();
    +   }
    +
    +   @Override
    +   public boolean hasNext() {
    +      return currentIterator.hasNext();
    +   }
    +
    +   @Override
    +   public T next() {
    +      return currentIterator.next();
    +   }
    +
    +   @Override
    +   public QueueConsumers<T> reset() {
    +      if (changedIterator != null) {
    +         currentIterator = changedIterator;
    +         changedIteratorFieldUpdater.compareAndSet(this, changedIterator, null);
    +      } else {
    +         currentIterator.reset();
    --- End diff --
    
    I suppose that `reset` is only safe to call from one thread at a time; if not, it gets complicated, because `currentIterator` could be changed independently.
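
    To make the concern concrete, here is a minimal sketch of the single-consumer assumption. It is not the PR's code: `ResetSketch` is a hypothetical stand-in, it uses a plain `CopyOnWriteArrayList` and `AtomicReference` instead of `PriorityCollection` and `AtomicReferenceFieldUpdater`, and it builds a fresh iterator where the PR calls `currentIterator.reset()`. The point is only that `currentIterator` is a plain field, so `hasNext()`, `next()` and `reset()` must all stay on one thread, while modifications may come from anywhere.

    ```java
    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.atomic.AtomicReference;

    // Hypothetical, simplified stand-in for QueueConsumersImpl (assumption, not the PR's class).
    public class ResetSketch<T> {

       private final List<T> items = new CopyOnWriteArrayList<>();

       // May be written by any thread that modifies the collection.
       private final AtomicReference<Iterator<T>> changedIterator = new AtomicReference<>();

       // Plain (non-volatile) field: only the single consumer thread may read or write it.
       private Iterator<T> currentIterator = items.iterator();

       public void add(T item) {
          items.add(item);
          // Publish a new snapshot view; the consumer picks it up on its next reset().
          changedIterator.set(items.iterator());
       }

       public boolean hasNext() {
          return currentIterator.hasNext();
       }

       public T next() {
          return currentIterator.next();
       }

       public ResetSketch<T> reset() {
          // Read and clear the pending view in one atomic step, so the value that is
          // installed is the same value that is cleared.
          Iterator<T> pending = changedIterator.getAndSet(null);
          // No pending view: start over on the current contents of the list.
          currentIterator = (pending != null) ? pending : items.iterator();
          return this;
       }
    }
    ```

    If two threads called `reset()` here at the same time, both would write `currentIterator` without any synchronization, which is the case I am asking about.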

