This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/fluss.git
commit d1ae5b59aa92f96589a686cdb1c89ca23e00d881 Author: Jark Wu <[email protected]> AuthorDate: Wed Dec 24 01:14:23 2025 +0800 [common] Add IntObjectHashMap and IntObjectMap implementations to replace dependency on eclipse-collections by arrow --- .../apache/arrow/vector/util/IntObjectHashMap.java | 730 +++++++++++++++++++++ .../org/apache/arrow/vector/util/IntObjectMap.java | 89 +++ .../arrow/vector/util/MapWithOrdinalImpl.java | 248 +++++++ .../arrow/vector/util/MultiMapWithOrdinal.java | 239 +++++++ fluss-test-coverage/pom.xml | 2 + 5 files changed, 1308 insertions(+) diff --git a/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/vector/util/IntObjectHashMap.java b/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/vector/util/IntObjectHashMap.java new file mode 100644 index 000000000..c9ecf04d1 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/vector/util/IntObjectHashMap.java @@ -0,0 +1,730 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.fluss.shaded.arrow.org.apache.arrow.vector.util;
+
+import java.util.AbstractCollection;
+import java.util.AbstractSet;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+// TODO: This is a temporary workaround to resolve Arrow 15's dependency on eclipse-collections,
+// which carries a Category-B license (see https://github.com/apache/arrow/issues/40896).
+// Without these override classes, ArrowReaderWriterTest fails with a ClassNotFoundException.
+// This workaround should be removed once we upgrade to Arrow 16 or later. Alternatively, these
+// override classes could be moved to `fluss-shaded-arrow` to isolate the dependency.
+
+/**
+ * A vendored specialized copy of Netty's IntObjectHashMap for use within Arrow. Avoids requiring
+ * Netty in the Arrow core just for this one class.
+ *
+ * <p>Collisions are resolved by open addressing with linear probing; the backing arrays always
+ * have a power-of-two length so indices can be wrapped with a bit mask. Caller-supplied {@code
+ * null} values are stored as an internal placeholder, so a {@code null} slot in the value array
+ * always means "free".
+ *
+ * @param <V> The value type stored in the map.
+ */
+class IntObjectHashMap<V> implements IntObjectMap<V> {
+
+    /** Default initial capacity. Used if not specified in the constructor */
+    public static final int DEFAULT_CAPACITY = 8;
+
+    /** Default load factor. Used if not specified in the constructor */
+    public static final float DEFAULT_LOAD_FACTOR = 0.5f;
+
+    /**
+     * Largest supported capacity (2^30): the largest power of two an {@code int} can hold, so it
+     * can never be doubled without overflowing.
+     */
+    private static final int MAX_CAPACITY = 0x40000000;
+
+    /**
+     * Placeholder for null values, so we can use the actual null to mean available. (Better than
+     * using a placeholder for available: less references for GC processing.)
+     */
+    private static final Object NULL_VALUE = new Object();
+
+    /** The maximum number of elements allowed without allocating more space. */
+    private int maxSize;
+
+    /** The load factor for the map. Used to calculate {@link #maxSize}. */
+    private final float loadFactor;
+
+    /** Keys by slot; a slot's key is only meaningful while its value slot is non-null. */
+    private int[] keys;
+    /** Internal values by slot; {@code null} marks a free slot. */
+    private V[] values;
+    /** Number of occupied slots. */
+    private int size;
+    /** {@code capacity - 1}; masks an int into a valid slot index. */
+    private int mask;
+
+    private final Set<Integer> keySet = new KeySet();
+    private final Set<Entry<Integer, V>> entrySet = new EntrySet();
+    private final Iterable<PrimitiveEntry<V>> entries =
+            new Iterable<PrimitiveEntry<V>>() {
+                @Override
+                public Iterator<PrimitiveEntry<V>> iterator() {
+                    return new PrimitiveIterator();
+                }
+            };
+
+    public IntObjectHashMap() {
+        this(DEFAULT_CAPACITY, DEFAULT_LOAD_FACTOR);
+    }
+
+    public IntObjectHashMap(int initialCapacity) {
+        this(initialCapacity, DEFAULT_LOAD_FACTOR);
+    }
+
+    /**
+     * Creates a map whose capacity is {@code initialCapacity} rounded up to a power of two (clamped
+     * to [1, 2^30]).
+     *
+     * @throws IllegalArgumentException if {@code loadFactor} is not in (0, 1].
+     */
+    public IntObjectHashMap(int initialCapacity, float loadFactor) {
+        if (loadFactor <= 0.0f || loadFactor > 1.0f) {
+            // Cannot exceed 1 because we can never store more than capacity elements;
+            // using a bigger loadFactor would trigger rehashing before the desired load is reached.
+            throw new IllegalArgumentException("loadFactor must be > 0 and <= 1");
+        }
+
+        this.loadFactor = loadFactor;
+
+        // Adjust the initial capacity if necessary.
+        int capacity = safeFindNextPositivePowerOfTwo(initialCapacity);
+        mask = capacity - 1;
+
+        // Allocate the arrays.
+        keys = new int[capacity];
+        @SuppressWarnings({"unchecked", "SuspiciousArrayCast"})
+        V[] temp = (V[]) new Object[capacity];
+        values = temp;
+
+        // Initialize the maximum size value.
+        maxSize = calcMaxSize(capacity);
+    }
+
+    /** Converts a stored value back to the caller's view ({@link #NULL_VALUE} becomes null). */
+    private static <T> T toExternal(T value) {
+        assert value != null : "null is not a legitimate internal value. Concurrent Modification?";
+        return value == NULL_VALUE ? null : value;
+    }
+
+    /** Converts a caller value to its stored form (null becomes {@link #NULL_VALUE}). */
+    @SuppressWarnings("unchecked")
+    private static <T> T toInternal(T value) {
+        return value == null ? (T) NULL_VALUE : value;
+    }
+
+    @Override
+    public V get(int key) {
+        int index = indexOf(key);
+        return index == -1 ? null : toExternal(values[index]);
+    }
+
+    @Override
+    public V put(int key, V value) {
+        int startIndex = hashIndex(key);
+        int index = startIndex;
+
+        for (; ; ) {
+            if (values[index] == null) {
+                // Found empty slot, use it.
+                keys[index] = key;
+                values[index] = toInternal(value);
+                growSize();
+                return null;
+            }
+            if (keys[index] == key) {
+                // Found existing entry with this key, just replace the value.
+                V previousValue = values[index];
+                values[index] = toInternal(value);
+                return toExternal(previousValue);
+            }
+
+            // Conflict, keep probing ...
+            if ((index = probeNext(index)) == startIndex) {
+                // Can only happen if the map was full at MAX_ARRAY_SIZE and couldn't grow.
+                throw new IllegalStateException("Unable to insert");
+            }
+        }
+    }
+
+    @Override
+    public void putAll(Map<? extends Integer, ? extends V> sourceMap) {
+        if (sourceMap instanceof IntObjectHashMap) {
+            // Optimization - iterate through the arrays.
+            @SuppressWarnings("unchecked")
+            IntObjectHashMap<V> source = (IntObjectHashMap<V>) sourceMap;
+            for (int i = 0; i < source.values.length; ++i) {
+                V sourceValue = source.values[i];
+                if (sourceValue != null) {
+                    put(source.keys[i], sourceValue);
+                }
+            }
+            return;
+        }
+
+        // Otherwise, just add each entry.
+        for (Entry<? extends Integer, ? extends V> entry : sourceMap.entrySet()) {
+            put(entry.getKey(), entry.getValue());
+        }
+    }
+
+    @Override
+    public V remove(int key) {
+        int index = indexOf(key);
+        if (index == -1) {
+            return null;
+        }
+
+        V prev = values[index];
+        removeAt(index);
+        return toExternal(prev);
+    }
+
+    @Override
+    public int size() {
+        return size;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return size == 0;
+    }
+
+    @Override
+    public void clear() {
+        Arrays.fill(keys, (int) 0);
+        Arrays.fill(values, null);
+        size = 0;
+    }
+
+    @Override
+    public boolean containsKey(int key) {
+        return indexOf(key) >= 0;
+    }
+
+    @Override
+    public boolean containsValue(Object value) {
+        @SuppressWarnings("unchecked")
+        V v1 = toInternal((V) value);
+        for (V v2 : values) {
+            // The map supports null values; this will be matched as NULL_VALUE.equals(NULL_VALUE).
+            if (v2 != null && v2.equals(v1)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public Iterable<PrimitiveEntry<V>> entries() {
+        return entries;
+    }
+
+    @Override
+    public Collection<V> values() {
+        return new AbstractCollection<V>() {
+            @Override
+            public Iterator<V> iterator() {
+                return new Iterator<V>() {
+                    final PrimitiveIterator iter = new PrimitiveIterator();
+
+                    @Override
+                    public boolean hasNext() {
+                        return iter.hasNext();
+                    }
+
+                    @Override
+                    public V next() {
+                        return iter.next().value();
+                    }
+
+                    @Override
+                    public void remove() {
+                        iter.remove();
+                    }
+                };
+            }
+
+            @Override
+            public int size() {
+                return size;
+            }
+        };
+    }
+
+    @Override
+    public int hashCode() {
+        // Hashcode is based on all non-zero, valid keys. We have to scan the whole keys
+        // array, which may have different lengths for two maps of same size(), so the
+        // capacity cannot be used as input for hashing but the size can.
+        int hash = size;
+        for (int key : keys) {
+            // 0 can be a valid key or unused slot, but won't impact the hashcode in either case.
+            // This way we can use a cheap loop without conditionals, or hard-to-unroll operations,
+            // or the devastatingly bad memory locality of visiting value objects.
+            // Also, it's important to use a hash function that does not depend on the ordering
+            // of terms, only their values; since the map is an unordered collection and
+            // entries can end up in different positions in different maps that have the same
+            // elements, but with different history of puts/removes, due to conflicts.
+            hash ^= hashCode(key);
+        }
+        return hash;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof IntObjectMap)) {
+            return false;
+        }
+        @SuppressWarnings("rawtypes")
+        IntObjectMap other = (IntObjectMap) obj;
+        if (size != other.size()) {
+            return false;
+        }
+        for (int i = 0; i < values.length; ++i) {
+            V value = values[i];
+            if (value != null) {
+                int key = keys[i];
+                Object otherValue = other.get(key);
+                if (value == NULL_VALUE) {
+                    if (otherValue != null) {
+                        return false;
+                    }
+                } else if (!value.equals(otherValue)) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public boolean containsKey(Object key) {
+        return containsKey(objectToKey(key));
+    }
+
+    @Override
+    public V get(Object key) {
+        return get(objectToKey(key));
+    }
+
+    @Override
+    public V put(Integer key, V value) {
+        return put(objectToKey(key), value);
+    }
+
+    @Override
+    public V remove(Object key) {
+        return remove(objectToKey(key));
+    }
+
+    @Override
+    public Set<Integer> keySet() {
+        return keySet;
+    }
+
+    @Override
+    public Set<Entry<Integer, V>> entrySet() {
+        return entrySet;
+    }
+
+    /**
+     * Unboxes a key passed through the {@link Map} interface. A non-{@link Integer} key fails
+     * with {@link ClassCastException}; a {@code null} key fails with {@link NullPointerException}.
+     */
+    private int objectToKey(Object key) {
+        return (int) (Integer) key;
+    }
+
+    /**
+     * Locates the index for the given key. This method probes linearly (one slot at a time, see
+     * {@code probeNext}) from the key's home slot, stopping at the first free slot.
+     *
+     * @param key the key for an entry in the map.
+     * @return the index where the key was found, or {@code -1} if no entry is found for that key.
+     */
+    private int indexOf(int key) {
+        int startIndex = hashIndex(key);
+        int index = startIndex;
+
+        for (; ; ) {
+            if (values[index] == null) {
+                // It's available, so no chance that this value exists anywhere in the map.
+                return -1;
+            }
+            if (key == keys[index]) {
+                return index;
+            }
+
+            // Conflict, keep probing ...
+            if ((index = probeNext(index)) == startIndex) {
+                return -1;
+            }
+        }
+    }
+
+    /** Returns the hashed index for the given key. */
+    private int hashIndex(int key) {
+        // The array lengths are always a power of two, so we can use a bitmask to stay inside the
+        // array bounds.
+        return hashCode(key) & mask;
+    }
+
+    /** Returns the hash code for the key. */
+    private static int hashCode(int key) {
+        return key;
+    }
+
+    /** Get the next sequential index after {@code index} and wraps if necessary. */
+    private int probeNext(int index) {
+        // The array lengths are always a power of two, so we can use a bitmask to stay inside the
+        // array bounds.
+        return (index + 1) & mask;
+    }
+
+    /**
+     * Grows the map size after an insertion. If necessary, performs a rehash of the map.
+     *
+     * @throws IllegalStateException if the load limit is exceeded while the capacity is already
+     *     {@link #MAX_CAPACITY} and cannot be doubled.
+     */
+    private void growSize() {
+        size++;
+
+        if (size > maxSize) {
+            // Capacities are powers of two, so a length can never equal Integer.MAX_VALUE; the
+            // real limit is 2^30, whose doubling would overflow to a negative array size.
+            if (keys.length >= MAX_CAPACITY) {
+                throw new IllegalStateException("Max capacity reached at size=" + size);
+            }
+
+            // Double the capacity.
+            rehash(keys.length << 1);
+        }
+    }
+
+    /**
+     * Removes entry at the given index position. Also performs opportunistic, incremental rehashing
+     * if necessary to not break conflict chains.
+     *
+     * @param index the index position of the element to remove.
+     * @return {@code true} if the next item was moved back. {@code false} otherwise.
+     */
+    private boolean removeAt(final int index) {
+        --size;
+        // Clearing the key is not strictly necessary (for GC like in a regular collection),
+        // but recommended for security. The memory location is still fresh in the cache anyway.
+        keys[index] = 0;
+        values[index] = null;
+
+        // In the interval from index to the next available entry, the arrays may have entries
+        // that are displaced from their base position due to prior conflicts. Iterate these
+        // entries and move them back if possible, optimizing future lookups.
+        // Knuth Section 6.4 Algorithm R, also used by the JDK's IdentityHashMap.
+
+        int nextFree = index;
+        int i = probeNext(index);
+        for (V value = values[i]; value != null; value = values[i = probeNext(i)]) {
+            int key = keys[i];
+            int bucket = hashIndex(key);
+            // The entry at i may move into the hole at nextFree only if its home bucket does not
+            // lie (cyclically) in (nextFree, i]; otherwise it would end up before its home bucket
+            // and lookups starting there would no longer reach it.
+            if (i < bucket && (bucket <= nextFree || nextFree <= i)
+                    || bucket <= nextFree && nextFree <= i) {
+                // Move the displaced entry "back" to the first available position.
+                keys[nextFree] = key;
+                values[nextFree] = value;
+                // The slot it came from becomes the next hole to fill.
+                keys[i] = 0;
+                values[i] = null;
+                nextFree = i;
+            }
+        }
+        return nextFree != index;
+    }
+
+    /** Calculates the maximum size allowed before rehashing. */
+    private int calcMaxSize(int capacity) {
+        // Clip the upper bound so that there will always be at least one available slot.
+        int upperBound = capacity - 1;
+        return Math.min(upperBound, (int) (capacity * loadFactor));
+    }
+
+    /**
+     * Rehashes the map for the given capacity.
+     *
+     * @param newCapacity the new capacity for the map.
+     */
+    private void rehash(int newCapacity) {
+        int[] oldKeys = keys;
+        V[] oldVals = values;
+
+        keys = new int[newCapacity];
+        @SuppressWarnings({"unchecked", "SuspiciousArrayCast"})
+        V[] temp = (V[]) new Object[newCapacity];
+        values = temp;
+
+        maxSize = calcMaxSize(newCapacity);
+        mask = newCapacity - 1;
+
+        // Insert to the new arrays.
+        for (int i = 0; i < oldVals.length; ++i) {
+            V oldVal = oldVals[i];
+            if (oldVal != null) {
+                // Inlined put(), but much simpler: we don't need to worry about
+                // duplicated keys, growing/rehashing, or failing to insert.
+                int oldKey = oldKeys[i];
+                int index = hashIndex(oldKey);
+
+                for (; ; ) {
+                    if (values[index] == null) {
+                        keys[index] = oldKey;
+                        values[index] = oldVal;
+                        break;
+                    }
+
+                    // Conflict, keep probing. Can wrap around, but never reaches startIndex again.
+                    index = probeNext(index);
+                }
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        if (isEmpty()) {
+            return "{}";
+        }
+        StringBuilder sb = new StringBuilder(4 * size);
+        sb.append('{');
+        boolean first = true;
+        for (int i = 0; i < values.length; ++i) {
+            V value = values[i];
+            if (value != null) {
+                if (!first) {
+                    sb.append(", ");
+                }
+                sb.append(keyToString(keys[i]))
+                        .append('=')
+                        .append(value == this ? "(this Map)" : toExternal(value));
+                first = false;
+            }
+        }
+        return sb.append('}').toString();
+    }
+
+    /**
+     * Helper method called by {@link #toString()} in order to convert a single map key into a
+     * string. This is protected to allow subclasses to override the appearance of a given key.
+     */
+    protected String keyToString(int key) {
+        return Integer.toString(key);
+    }
+
+    /** Set implementation for iterating over the entries of the map. */
+    private final class EntrySet extends AbstractSet<Entry<Integer, V>> {
+        @Override
+        public Iterator<Entry<Integer, V>> iterator() {
+            return new MapIterator();
+        }
+
+        @Override
+        public int size() {
+            return IntObjectHashMap.this.size();
+        }
+    }
+
+    /** Set implementation for iterating over the keys. */
+    private final class KeySet extends AbstractSet<Integer> {
+        @Override
+        public int size() {
+            return IntObjectHashMap.this.size();
+        }
+
+        @Override
+        public boolean contains(Object o) {
+            return IntObjectHashMap.this.containsKey(o);
+        }
+
+        @Override
+        public boolean remove(Object o) {
+            // Look the slot up directly: the map's remove(Object) returns null both when the key
+            // is absent and when it was mapped to null, so its result cannot report removal.
+            int index = indexOf(objectToKey(o));
+            if (index == -1) {
+                return false;
+            }
+            removeAt(index);
+            return true;
+        }
+
+        @Override
+        public boolean retainAll(Collection<?> retainedKeys) {
+            boolean changed = false;
+            for (Iterator<PrimitiveEntry<V>> iter = entries().iterator(); iter.hasNext(); ) {
+                PrimitiveEntry<V> entry = iter.next();
+                if (!retainedKeys.contains(entry.key())) {
+                    changed = true;
+                    iter.remove();
+                }
+            }
+            return changed;
+        }
+
+        @Override
+        public void clear() {
+            IntObjectHashMap.this.clear();
+        }
+
+        @Override
+        public Iterator<Integer> iterator() {
+            return new Iterator<Integer>() {
+                private final Iterator<Entry<Integer, V>> iter = entrySet.iterator();
+
+                @Override
+                public boolean hasNext() {
+                    return iter.hasNext();
+                }
+
+                @Override
+                public Integer next() {
+                    return iter.next().getKey();
+                }
+
+                @Override
+                public void remove() {
+                    iter.remove();
+                }
+            };
+        }
+    }
+
+    /**
+     * Iterator over primitive entries. Entry key/values are overwritten by each call to {@link
+     * #next()}.
+     */
+    private final class PrimitiveIterator
+            implements Iterator<PrimitiveEntry<V>>, PrimitiveEntry<V> {
+        /** Slot returned by the last next(), or -1 once it has been removed. */
+        private int prevIndex = -1;
+        /** Next occupied slot to return; -1 before the first scan, values.length when done. */
+        private int nextIndex = -1;
+        /** Slot this object currently exposes through the PrimitiveEntry methods. */
+        private int entryIndex = -1;
+
+        /** Advances nextIndex to the next occupied slot, or to values.length. */
+        private void scanNext() {
+            while (++nextIndex != values.length && values[nextIndex] == null) {}
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (nextIndex == -1) {
+                scanNext();
+            }
+            return nextIndex != values.length;
+        }
+
+        @Override
+        public PrimitiveEntry<V> next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+
+            prevIndex = nextIndex;
+            scanNext();
+
+            // Always return the same Entry object, just change its index each time.
+            entryIndex = prevIndex;
+            return this;
+        }
+
+        @Override
+        public void remove() {
+            if (prevIndex == -1) {
+                throw new IllegalStateException("next must be called before each remove.");
+            }
+            if (removeAt(prevIndex)) {
+                // removeAt may move elements "back" in the array if they have been displaced
+                // because their spot in the array was occupied when they were inserted. If this
+                // occurs then the nextIndex is now invalid and should instead point to the
+                // prevIndex which now holds an element which was "moved back".
+                // NOTE(review): if the displaced run wraps past the end of the array, the entry
+                // moved into prevIndex can come from a low slot that was already returned, so it
+                // would be returned a second time.
+                nextIndex = prevIndex;
+            }
+            prevIndex = -1;
+        }
+
+        // Entry implementation. Since this implementation uses a single Entry, we coalesce that
+        // into the Iterator object (potentially making loop optimization much easier).
+
+        @Override
+        public int key() {
+            return keys[entryIndex];
+        }
+
+        @Override
+        public V value() {
+            return toExternal(values[entryIndex]);
+        }
+
+        @Override
+        public void setValue(V value) {
+            values[entryIndex] = toInternal(value);
+        }
+    }
+
+    /** Iterator used by the {@link Map} interface. */
+    private final class MapIterator implements Iterator<Entry<Integer, V>> {
+        private final PrimitiveIterator iter = new PrimitiveIterator();
+
+        @Override
+        public boolean hasNext() {
+            return iter.hasNext();
+        }
+
+        @Override
+        public Entry<Integer, V> next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+
+            iter.next();
+
+            return new MapEntry(iter.entryIndex);
+        }
+
+        @Override
+        public void remove() {
+            iter.remove();
+        }
+    }
+
+    /** A single entry in the map. */
+    final class MapEntry implements Entry<Integer, V> {
+        private final int entryIndex;
+
+        MapEntry(int entryIndex) {
+            this.entryIndex = entryIndex;
+        }
+
+        @Override
+        public Integer getKey() {
+            verifyExists();
+            return keys[entryIndex];
+        }
+
+        @Override
+        public V getValue() {
+            verifyExists();
+            return toExternal(values[entryIndex]);
+        }
+
+        @Override
+        public V setValue(V value) {
+            verifyExists();
+            V prevValue = toExternal(values[entryIndex]);
+            values[entryIndex] = toInternal(value);
+            return prevValue;
+        }
+
+        /** Fails if the slot this entry points at has been freed. */
+        private void verifyExists() {
+            if (values[entryIndex] == null) {
+                throw new IllegalStateException("The map entry has been removed");
+            }
+        }
+    }
+
+    /**
+     * Rounds {@code value} up to a power of two, clamping non-positive inputs to 1 and large
+     * inputs to {@link #MAX_CAPACITY}.
+     */
+    static int safeFindNextPositivePowerOfTwo(final int value) {
+        return value <= 0
+                ? 1
+                : value >= MAX_CAPACITY ? MAX_CAPACITY : findNextPositivePowerOfTwo(value);
+    }
+
+    /** Rounds {@code value} up to a power of two; the caller keeps it below 2^30. */
+    static int findNextPositivePowerOfTwo(final int value) {
+        assert value > Integer.MIN_VALUE && value < MAX_CAPACITY;
+        return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
+    }
+}
diff --git a/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/vector/util/IntObjectMap.java b/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/vector/util/IntObjectMap.java
new file mode 100644
index 000000000..60f61787c
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/vector/util/IntObjectMap.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.shaded.arrow.org.apache.arrow.vector.util;
+
+import java.util.Iterator;
+import java.util.Map;
+
+// TODO: This is a temporary workaround to resolve Arrow 15's dependency on eclipse-collections,
+// which carries a Category-B license (see https://github.com/apache/arrow/issues/40896).
+// Without these override classes, ArrowReaderWriterTest fails with a ClassNotFoundException.
+// This workaround should be removed once we upgrade to Arrow 16 or later. Alternatively, these
+// override classes could be moved to `fluss-shaded-arrow` to isolate the dependency.
+/**
+ * {@link Map} from {@code int} keys to values, vendored from Netty's IntObjectMap so that the
+ * Arrow core does not need a Netty dependency for this single type.
+ *
+ * <p>Besides the inherited boxed-{@link Integer} operations, it declares overloads that take the
+ * key as a primitive {@code int}, plus {@link #entries()} for iterating primitive entries.
+ *
+ * @param <V> the value type stored in the map.
+ */
+interface IntObjectMap<V> extends Map<Integer, V> {
+
+    /**
+     * Reports whether {@code key} currently has a mapping.
+     *
+     * @param key the primitive key to look for.
+     * @return {@code true} when an entry for {@code key} exists.
+     */
+    boolean containsKey(int key);
+
+    /**
+     * Looks up the value associated with {@code key}.
+     *
+     * @param key the primitive key to look up.
+     * @return the associated value, or {@code null} when the map has no entry for {@code key}.
+     */
+    V get(int key);
+
+    /**
+     * Associates {@code value} with {@code key}, replacing any existing association.
+     *
+     * @param key the primitive key of the entry.
+     * @param value the value to store.
+     * @return the value previously associated with {@code key}, or {@code null} if there was none.
+     */
+    V put(int key, V value);
+
+    /**
+     * Deletes the entry for {@code key}, if present.
+     *
+     * @param key the primitive key whose entry should be dropped.
+     * @return the value that was associated with {@code key}, or {@code null} if there was none.
+     */
+    V remove(int key);
+
+    /**
+     * Exposes the entries as {@link PrimitiveEntry} objects. Implementations may hand out the same
+     * mutable {@link PrimitiveEntry} instance on every step of the {@link Iterator}, so callers
+     * must read the key and value before advancing and must not keep references to entries.
+     */
+    Iterable<PrimitiveEntry<V>> entries();
+
+    /**
+     * An entry as seen through {@link #entries()}, exposing its key as a primitive {@code int}.
+     *
+     * @param <V> the value type stored in the map.
+     */
+    interface PrimitiveEntry<V> {
+        /** Returns this entry's key. */
+        int key();
+
+        /** Returns this entry's value. */
+        V value();
+
+        /** Replaces this entry's value. */
+        void setValue(V value);
+    }
+}
diff --git a/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/vector/util/MapWithOrdinalImpl.java b/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/vector/util/MapWithOrdinalImpl.java
new file mode 100644
index 000000000..528655d7b
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/vector/util/MapWithOrdinalImpl.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.shaded.arrow.org.apache.arrow.vector.util;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+// TODO: This is a temporary workaround to resolve Arrow 15's dependency on eclipse-collections,
+// which carries a Category-B license (see https://github.com/apache/arrow/issues/40896).
+// Without these override classes, ArrowReaderWriterTest fails with a ClassNotFoundException.
+// This workaround should be removed once we upgrade to Arrow 16 or later. Alternatively, these
+// override classes could be moved to `fluss-shaded-arrow` to isolate the dependency.
+
+/**
+ * An implementation of map that supports constant time look-up by a generic key or an ordinal.
+ *
+ * <p>This class extends the functionality of a regular {@link Map} with ordinal lookup support.
+ * Upon insertion an unused ordinal is assigned to the inserted (key, value) tuple. Upon update the
+ * same ordinal id is re-used while value is replaced. Upon deletion of an existing item, its
+ * corresponding ordinal is recycled: the item holding the highest ordinal takes it over.
+ *
+ * <p>For any instance with N items, this implementation guarantees that ordinals are in the range
+ * of [0, N). However, the ordinal assignment is dynamic and may change after an insertion or
+ * deletion. Consumers of this class are responsible for explicitly checking the ordinal
+ * corresponding to a key via {@link MapWithOrdinalImpl#getOrdinal(Object)} before attempting to
+ * execute a lookup with an ordinal.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class MapWithOrdinalImpl<K, V> implements MapWithOrdinal<K, V> {
+
+    /** Key to (ordinal, value); a {@link LinkedHashMap}, so keys keep their insertion order. */
+    private final Map<K, Map.Entry<Integer, V>> primary = new LinkedHashMap<>();
+
+    /** Ordinal to value; holds one entry per key, for ordinals {@code 0 .. size() - 1}. */
+    private final IntObjectHashMap<V> secondary = new IntObjectHashMap<>();
+
+    /** Single-valued {@link Map} view that keeps {@code primary} and {@code secondary} in sync. */
+    private final Map<K, V> delegate =
+            new Map<K, V>() {
+                @Override
+                public boolean isEmpty() {
+                    return size() == 0;
+                }
+
+                @Override
+                public int size() {
+                    return primary.size();
+                }
+
+                @Override
+                public boolean containsKey(Object key) {
+                    return primary.containsKey(key);
+                }
+
+                @Override
+                public boolean containsValue(Object value) {
+                    // primary stores (ordinal, value) pairs, which never equal a bare value; the
+                    // values themselves live in secondary.
+                    return secondary.containsValue(value);
+                }
+
+                @Override
+                public V get(Object key) {
+                    Entry<Integer, V> pair = primary.get(key);
+                    if (pair != null) {
+                        return pair.getValue();
+                    }
+                    return null;
+                }
+
+                @Override
+                public V put(K key, V value) {
+                    final Entry<Integer, V> oldPair = primary.get(key);
+                    // if key exists try replacing otherwise, assign a new ordinal identifier
+                    final int ordinal = oldPair == null ? primary.size() : oldPair.getKey();
+                    primary.put(key, new AbstractMap.SimpleImmutableEntry<>(ordinal, value));
+                    secondary.put(ordinal, value);
+                    return oldPair == null ? null : oldPair.getValue();
+                }
+
+                @Override
+                public V remove(Object key) {
+                    final Entry<Integer, V> removedPair = primary.remove(key);
+                    if (removedPair == null) {
+                        return null;
+                    }
+                    final int freedOrdinal = removedPair.getKey();
+                    // secondary still contains the removed entry, so its ordinals are
+                    // 0 .. size() - 1 and the highest one is size() - 1.
+                    final int lastOrdinal = secondary.size() - 1;
+                    if (freedOrdinal != lastOrdinal) {
+                        // Keep ordinals dense: the key owning the last ordinal takes over the
+                        // freed one. No ordinal-to-key index exists, so find it by scanning.
+                        final V lastValue = secondary.get(lastOrdinal);
+                        secondary.put(freedOrdinal, lastValue);
+                        for (Entry<K, Entry<Integer, V>> entry : primary.entrySet()) {
+                            if (entry.getValue().getKey() == lastOrdinal) {
+                                // setValue is not a structural change, so iteration stays valid
+                                // and the key keeps its insertion position.
+                                entry.setValue(
+                                        new AbstractMap.SimpleImmutableEntry<>(
+                                                freedOrdinal, lastValue));
+                                break;
+                            }
+                        }
+                    }
+                    secondary.remove(lastOrdinal);
+                    return removedPair.getValue();
+                }
+
+                @Override
+                public void putAll(Map<? extends K, ? extends V> m) {
+                    throw new UnsupportedOperationException();
+                }
+
+                @Override
+                public void clear() {
+                    primary.clear();
+                    secondary.clear();
+                }
+
+                @Override
+                public Set<K> keySet() {
+                    return primary.keySet();
+                }
+
+                @Override
+                public Collection<V> values() {
+                    return secondary.values();
+                }
+
+                /** Returns a snapshot copy; it is not backed by this map. */
+                @Override
+                public Set<Entry<K, V>> entrySet() {
+                    return primary.entrySet().stream()
+                            .map(
+                                    entry ->
+                                            new AbstractMap.SimpleImmutableEntry<>(
+                                                    entry.getKey(), entry.getValue().getValue()))
+                            .collect(Collectors.toSet());
+                }
+            };
+
+    /**
+     * Returns the value corresponding to the given ordinal.
+     *
+     * @param id ordinal value for lookup
+     * @return an instance of V, or {@code null} if no entry holds that ordinal
+     */
+    @Override
+    public V getByOrdinal(int id) {
+        return secondary.get(id);
+    }
+
+    /**
+     * Returns the ordinal corresponding to the given key.
+     *
+     * @param key key for ordinal lookup
+     * @return ordinal value corresponding to key if it exists or -1
+     */
+    @Override
+    public int getOrdinal(K key) {
+        Map.Entry<Integer, V> pair = primary.get(key);
+        if (pair != null) {
+            return pair.getKey();
+        }
+        return -1;
+    }
+
+    @Override
+    public int size() {
+        return delegate.size();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return delegate.isEmpty();
+    }
+
+    /**
+     * Returns the values stored for {@code key}: a one-element list holding its value, or {@code
+     * null} if the key is absent.
+     */
+    @Override
+    public Collection<V> getAll(K key) {
+        if (delegate.containsKey(key)) {
+            List<V> list = new ArrayList<>(1);
+            list.add(get(key));
+            return list;
+        }
+        return null;
+    }
+
+    @Override
+    public V get(K key) {
+        return delegate.get(key);
+    }
+
+    /**
+     * Inserts the tuple (key, value) into the map extending the semantics of {@link Map#put} with
+     * automatic ordinal assignment. A new ordinal is assigned if key does not exist. Otherwise the
+     * same ordinal is re-used but the value is replaced.
+     *
+     * <p>This map holds a single value per key, so an existing value is always replaced and
+     * {@code overwrite} is ignored.
+     *
+     * @return {@code true} if {@code key} was previously mapped to a non-null value
+     * @see java.util.Map#put
+     */
+    @Override
+    public boolean put(K key, V value, boolean overwrite) {
+        return delegate.put(key, value) != null;
+    }
+
+    @Override
+    public Collection<V> values() {
+        return delegate.values();
+    }
+
+    /**
+     * Not supported by this single-valued map: always returns {@code false} and leaves the map
+     * unchanged. Use {@link #removeAll(Object)} to remove a key.
+     */
+    @Override
+    public boolean remove(K key, V value) {
+        return false;
+    }
+
+    @Override
+    public boolean containsKey(Object key) {
+        return delegate.containsKey(key);
+    }
+
+    /**
+     * Removes the element corresponding to the key if exists extending the semantics of {@link
+     * java.util.Map#remove} with ordinal re-cycling. The ordinal of the removed key is handed to
+     * the element that held the highest ordinal, so ordinals stay within [0, size()). It is
+     * important that consumer checks the ordinal value via {@link
+     * MapWithOrdinalImpl#getOrdinal(Object)} before attempting to look-up by ordinal.
+     *
+     * @return {@code true} if {@code key} was mapped to a non-null value that has been removed
+     * @see java.util.Map#remove
+     */
+    @Override
+    public boolean removeAll(K key) {
+        return delegate.remove(key) != null;
+    }
+
+    @Override
+    public void clear() {
+        delegate.clear();
+    }
+
+    /** Returns the live key set, in insertion order. */
+    @Override
+    public Set<K> keys() {
+        return delegate.keySet();
+    }
+}
diff --git a/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/vector/util/MultiMapWithOrdinal.java b/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/vector/util/MultiMapWithOrdinal.java
new file mode 100644
index 000000000..964a1eef9
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/vector/util/MultiMapWithOrdinal.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.fluss.shaded.arrow.org.apache.arrow.vector.util; + +import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +// TODO: This is a temporary workaround to resolve Arrow 15's dependency on eclipse-collections, +// which carries a Category-B license (see https://github.com/apache/arrow/issues/40896). +// Without these override classes, ArrowReaderWriterTest fails with a ClassNotFoundException. +// This workaround should be removed once we upgrade to Arrow 16 or later. Alternatively, these +// override classes could be moved to `fluss-shaded-arrow` to isolate the dependency. + +/** + * An implementation of a multimap that supports constant time look-up by a generic key or an + * ordinal. + * + * <p>This class extends the functionality a regular {@link Map} with ordinal lookup support. Upon + * insertion an unused ordinal is assigned to the inserted (key, value) tuple. Upon update the same + * ordinal id is re-used while value is replaced. Upon deletion of an existing item, its + * corresponding ordinal is recycled and could be used by another item. + * + * <p>For any instance with N items, this implementation guarantees that ordinals are in the range + * of [0, N). However, the ordinal assignment is dynamic and may change after an insertion or + * deletion. 
Consumers of this class are responsible for explicitly checking the ordinal + * corresponding to a key via {@link MultiMapWithOrdinal#getOrdinal(Object)} before attempting to + * execute a lookup with an ordinal. + * + * @param <K> key type + * @param <V> value type + */ +public class MultiMapWithOrdinal<K, V> implements MapWithOrdinal<K, V> { + + private final Map<K, Set<Integer>> keyToOrdinal = new LinkedHashMap<>(); + private final IntObjectHashMap<V> ordinalToValue = new IntObjectHashMap<>(); + + /** + * Returns the value corresponding to the given ordinal. + * + * @param id ordinal value for lookup + * @return an instance of V + */ + @Override + public V getByOrdinal(int id) { + return ordinalToValue.get(id); + } + + /** + * Returns the ordinal corresponding to the given key. + * + * @param key key for ordinal lookup + * @return ordinal value corresponding to key if it exists or -1 + */ + @Override + public int getOrdinal(K key) { + Set<Integer> pair = getOrdinals(key); + if (!pair.isEmpty()) { + return pair.iterator().next(); + } + return -1; + } + + private Set<Integer> getOrdinals(K key) { + return keyToOrdinal.getOrDefault(key, new HashSet<>()); + } + + @Override + public int size() { + return ordinalToValue.size(); + } + + @Override + public boolean isEmpty() { + return ordinalToValue.isEmpty(); + } + + /** get set of values for key. */ + @Override + public V get(K key) { + Set<Integer> ordinals = keyToOrdinal.get(key); + if (ordinals == null) { + return null; + } + return ordinals.stream().map(ordinalToValue::get).collect(Collectors.toList()).get(0); + } + + /** get set of values for key. */ + @Override + public Collection<V> getAll(K key) { + Set<Integer> ordinals = keyToOrdinal.get(key); + if (ordinals == null) { + return null; + } + return ordinals.stream().map(ordinalToValue::get).collect(Collectors.toList()); + } + + /** + * Inserts the tuple (key, value) into the multimap with automatic ordinal assignment. 
+ * + * <p>A new ordinal is assigned if key/value pair does not exists. + * + * <p>If overwrite is true the existing key will be overwritten with value else value will be + * appended to the multimap. + */ + @Override + public boolean put(K key, V value, boolean overwrite) { + if (overwrite) { + removeAll(key); + } + Set<Integer> ordinalSet = getOrdinals(key); + int nextOrdinal = ordinalToValue.size(); + ordinalToValue.put(nextOrdinal, value); + boolean changed = ordinalSet.add(nextOrdinal); + keyToOrdinal.put(key, ordinalSet); + return changed; + } + + @Override + public Collection<V> values() { + return ordinalToValue.values(); + } + + @Override + public boolean containsKey(K key) { + return keyToOrdinal.containsKey(key); + } + + /** + * Removes the element corresponding to the key/value if exists with ordinal re-cycling. + * + * <p>The ordinal corresponding to the given key may be re-assigned to another tuple. It is + * important that consumer checks the ordinal value via {@link + * MultiMapWithOrdinal#getOrdinal(Object)} before attempting to look-up by ordinal. + * + * <p>If the multimap is changed return true. 
+ */ + @Override + public synchronized boolean remove(K key, V value) { + Set<Integer> removalSet = getOrdinals(key); + if (removalSet.isEmpty()) { + return false; + } + Optional<V> removeValue = + removalSet.stream().map(ordinalToValue::get).filter(value::equals).findFirst(); + if (!removeValue.isPresent()) { + return false; + } + int removalOrdinal = removeKv(removalSet, key, value); + int lastOrdinal = ordinalToValue.size(); + if (lastOrdinal != removalOrdinal) { // we didn't remove the last ordinal + swapOrdinal(lastOrdinal, removalOrdinal); + } + return true; + } + + private void swapOrdinal(int lastOrdinal, int removalOrdinal) { + V swapOrdinalValue = ordinalToValue.remove(lastOrdinal); + ordinalToValue.put(removalOrdinal, swapOrdinalValue); + K swapOrdinalKey = + keyToOrdinal.entrySet().stream() + .filter(kv -> kv.getValue().stream().anyMatch(o -> o == lastOrdinal)) + .map(Map.Entry::getKey) + .findFirst() + .orElseThrow( + () -> + new IllegalStateException( + "MultimapWithOrdinal in bad state")); + ordinalToValue.put(removalOrdinal, swapOrdinalValue); + Set<Integer> swapSet = getOrdinals(swapOrdinalKey); + swapSet.remove(lastOrdinal); + swapSet.add(removalOrdinal); + keyToOrdinal.put(swapOrdinalKey, swapSet); + } + + private int removeKv(Set<Integer> removalSet, K key, V value) { + Integer removalOrdinal = + removalSet.stream() + .filter(i -> ordinalToValue.get(i).equals(value)) + .findFirst() + .orElseThrow( + () -> + new IllegalStateException( + "MultimapWithOrdinal in bad state")); + ordinalToValue.remove(removalOrdinal); + removalSet.remove(removalOrdinal); + if (removalSet.isEmpty()) { + keyToOrdinal.remove(key); + } else { + keyToOrdinal.put(key, removalSet); + } + return removalOrdinal; + } + + /** remove all entries of key. 
*/ + @Override + public synchronized boolean removeAll(K key) { + Collection<V> values = this.getAll(key); + if (values == null) { + return false; + } + for (V v : values) { + this.remove(key, v); + } + return true; + } + + @Override + public void clear() { + ordinalToValue.clear(); + keyToOrdinal.clear(); + } + + @Override + public Set<K> keys() { + return keyToOrdinal.keySet(); + } +} diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml index 46c34da57..f82aa3321 100644 --- a/fluss-test-coverage/pom.xml +++ b/fluss-test-coverage/pom.xml @@ -325,6 +325,8 @@ </exclude> <exclude>org.apache.fluss.security.auth.ServerAuthenticator</exclude> <exclude>org.apache.fluss.config.cluster.AlterConfig</exclude> + <!-- can be removed once we upgrade arrow to 16.0+ --> + <exclude>org.apache.fluss.shaded.arrow.org.apache.arrow.vector.util.*</exclude> <!-- start exclude for flink-connector --> <exclude>org.apache.fluss.flink.utils.*</exclude> <exclude>org.apache.fluss.flink.source.*
