Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2490#discussion_r246513536
  
    --- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java
 ---
    @@ -0,0 +1,319 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.utils.collections;
    +
    +import org.apache.activemq.artemis.core.PriorityAware;
    +
    +import java.lang.reflect.Array;
    +import java.util.AbstractCollection;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Iterator;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.function.Consumer;
    +import java.util.function.Supplier;
    +import java.util.stream.Collectors;
    +
    +/**
    + * This class's purpose is to hold the the different collections used for 
each priority level.
    + *
    + * A supplier is required to provide the underlying collection needed when 
a new priority level is seen,
    + * and the end behaviour is that of the underlying collection, e.g. if set 
add will follow set's add semantics,
    + * if list, then list semantics.
    + *
    + * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
    + *
    + * @param <T> The type this class may hold, this is generic as can be 
anything that extends PriorityAware.
    + */
    +public class PriorityCollection<T extends PriorityAware> extends 
AbstractCollection<T> {
    +
    +   private final Supplier<Collection<T>> supplier;
    +   private volatile PriorityHolder<T>[] priorityHolders = 
newPrioritySetArrayInstance(0);
    +   private volatile int size;
    +
    +   private void setArray(PriorityHolder<T>[] priorityHolders) {
    +      this.priorityHolders = priorityHolders;
    +   }
    +
    +   private PriorityHolder<T>[] getArray() {
    +      return priorityHolders;
    +   }
    +
    +
    +   public PriorityCollection(Supplier<Collection<T>> supplier) {
    +      this.supplier = supplier;
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> PriorityHolder<T>[] newPrioritySetArrayInstance(int 
length) {
    +      return (PriorityHolder<T>[]) Array.newInstance(PriorityHolder.class, 
length);
    +   }
    +
    +   @Override
    +   public int size() {
    +      return size;
    +   }
    +
    +   @Override
    +   public boolean isEmpty() {
    +      return size() == 0;
    +   }
    +
    +   public Set<Integer> getPriorites() {
    +      PriorityHolder<T>[] snapshot = getArray();
    +      return 
Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet());
    +   }
    +
    +   @Override
    +   public Iterator<T> iterator() {
    +      Iterator<T>[] iterators = getIterators();
    +      return new MultiIterator<>(iterators);
    +   }
    +
    +   private Iterator<T>[] getIterators() {
    +      PriorityHolder<T>[] snapshot = this.getArray();
    +      int size = snapshot.length;
    +      Iterator<T>[] iterators = newIteratorArrayInstance(size);
    +      for (int i = 0; i < size; i++) {
    +         iterators[i] = snapshot[i].getValues().iterator();
    +      }
    +      return iterators;
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> Iterator<T>[] newIteratorArrayInstance(int length) {
    +      return (Iterator<T>[]) Array.newInstance(Iterator.class, length);
    +   }
    +
    +   public ResettableIterator<T> resettableIterator() {
    +      return new MultiResettableIterator<T>(getResettableIterators());
    +   }
    +
    +   private ResettableIterator<T>[] getResettableIterators() {
    +      PriorityHolder<T>[] snapshot = this.getArray();
    +      int size = snapshot.length;
    +      ResettableIterator<T>[] iterators = 
newResettableIteratorArrayInstance(size);
    +      for (int i = 0; i < size; i++) {
    +         iterators[i] = new 
ArrayResettableIterator<>(snapshot[i].getValues().toArray());
    +      }
    +      return iterators;
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> ResettableIterator<T>[] 
newResettableIteratorArrayInstance(int length) {
    +      return (ResettableIterator<T>[]) 
Array.newInstance(ResettableIterator.class, length);
    +   }
    +
    +   @Override
    +   public void forEach(Consumer<? super T> action) {
    +      Objects.requireNonNull(action);
    +      PriorityHolder<T>[] current = getArray();
    +      int len = current.length;
    +      for (int i = 0; i < len; ++i) {
    +         current[i].getValues().forEach(action);
    +      }
    +   }
    +
    +   private Collection<T> getCollection(int priority, boolean 
createIfMissing) {
    +      PriorityHolder<T>[] current = getArray();
    +      int low = 0;
    +      int high = current.length - 1;
    +
    +      while (low <= high) {
    +         int mid = (low + high) >>> 1;
    +         PriorityHolder<T> midVal = current[mid];
    +
    +         if (midVal.getPriority() > priority)
    +            low = mid + 1;
    +         else if (midVal.getPriority() < priority)
    +            high = mid - 1;
    +         else
    +            return midVal.getValues(); //key found
    +      }
    +
    +      if (createIfMissing) {
    +         PriorityHolder<T>[] newArray = 
newPrioritySetArrayInstance(current.length + 1);
    +         if (low > 0) {
    +            System.arraycopy(current, 0, newArray, 0, low);
    +         }
    +         if (current.length - low > 0) {
    +            System.arraycopy(current, low, newArray, low + 1, 
current.length - low);
    +         }
    +         newArray[low] = new PriorityHolder<T>(priority, supplier);
    +         setArray(newArray);
    +         return newArray[low].getValues();
    +      }
    +      return null;
    +   }
    +
    +   @Override
    +   public synchronized boolean add(T t) {
    --- End diff --
    
    This needs to be concurrent safe on modifications


---

Reply via email to