This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-shaded.git


The following commit(s) were added to refs/heads/main by this push:
     new 04180c7  [arrow] Add IntObjectHashMap and IntObjectMap implementations 
to replace dependency on eclipse-collections by arrow (#6)
04180c7 is described below

commit 04180c7fe083f728010731e686c392f3f724b240
Author: Jark Wu <[email protected]>
AuthorDate: Sun Jan 18 15:14:23 2026 +0800

    [arrow] Add IntObjectHashMap and IntObjectMap implementations to replace 
dependency on eclipse-collections by arrow (#6)
---
 .../fluss-shaded-arrow-15/pom.xml                  |   2 +-
 .../apache/arrow/vector/util/IntObjectHashMap.java | 729 +++++++++++++++++++++
 .../org/apache/arrow/vector/util/IntObjectMap.java |  88 +++
 .../arrow/vector/util/MapWithOrdinalImpl.java      | 247 +++++++
 .../arrow/vector/util/MultiMapWithOrdinal.java     | 238 +++++++
 .../src/main/resources/META-INF/NOTICE             |   8 +-
 pom.xml                                            |  13 +
 7 files changed, 1320 insertions(+), 5 deletions(-)

diff --git a/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/pom.xml 
b/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/pom.xml
index d60f32b..f6fb9d4 100644
--- a/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/pom.xml
+++ b/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/pom.xml
@@ -36,7 +36,7 @@
     <properties>
         <!-- Arrow 15 mistaken introduced org.eclipse.collections as a 
dependency, which has
         license legal problem. See 
https://github.com/apache/arrow/issues/40896 -->
-        <arrow.version>15.0.0</arrow.version>
+        <arrow.version>15.0.2</arrow.version>
     </properties>
 
     <dependencies>
diff --git 
a/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/src/main/java/org/apache/arrow/vector/util/IntObjectHashMap.java
 
b/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/src/main/java/org/apache/arrow/vector/util/IntObjectHashMap.java
new file mode 100644
index 0000000..59b1256
--- /dev/null
+++ 
b/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/src/main/java/org/apache/arrow/vector/util/IntObjectHashMap.java
@@ -0,0 +1,729 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.arrow.vector.util;
+
+import java.util.AbstractCollection;
+import java.util.AbstractSet;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+// TODO: This is a temporary workaround to resolve Arrow 15's dependency on eclipse-collections,
+//  which carries a Category-B license (see https://github.com/apache/arrow/issues/40896).
+//  Without these override classes, Fluss ArrowReaderWriterTest fails with a ClassNotFoundException.
+//  This workaround should be removed once we upgrade to Arrow 16 or later.
+
+/**
+ * A vendored specialized copy of Netty's IntObjectHashMap for use within 
Arrow. Avoids requiring
+ * Netty in the Arrow core just for this one class.
+ *
+ * @param <V> The value type stored in the map.
+ */
+class IntObjectHashMap<V> implements IntObjectMap<V> {
+
+    /** Default initial capacity. Used if not specified in the constructor */
+    public static final int DEFAULT_CAPACITY = 8;
+
+    /** Default load factor. Used if not specified in the constructor */
+    public static final float DEFAULT_LOAD_FACTOR = 0.5f;
+
+    /**
+     * Placeholder for null values, so we can use the actual null to mean 
available. (Better than
+     * using a placeholder for available: less references for GC processing.)
+     */
+    private static final Object NULL_VALUE = new Object();
+
+    /** The maximum number of elements allowed without allocating more space. 
*/
+    private int maxSize;
+
+    /** The load factor for the map. Used to calculate {@link #maxSize}. */
+    private final float loadFactor;
+
+    private int[] keys;
+    private V[] values;
+    private int size;
+    private int mask;
+
+    private final Set<Integer> keySet = new KeySet();
+    private final Set<Entry<Integer, V>> entrySet = new EntrySet();
+    private final Iterable<PrimitiveEntry<V>> entries =
+            new Iterable<PrimitiveEntry<V>>() {
+                @Override
+                public Iterator<PrimitiveEntry<V>> iterator() {
+                    return new PrimitiveIterator();
+                }
+            };
+
+    public IntObjectHashMap() {
+        this(DEFAULT_CAPACITY, DEFAULT_LOAD_FACTOR);
+    }
+
+    public IntObjectHashMap(int initialCapacity) {
+        this(initialCapacity, DEFAULT_LOAD_FACTOR);
+    }
+
+    public IntObjectHashMap(int initialCapacity, float loadFactor) {
+        if (loadFactor <= 0.0f || loadFactor > 1.0f) {
+            // Cannot exceed 1 because we can never store more than capacity 
elements;
+            // using a bigger loadFactor would trigger rehashing before the 
desired load is reached.
+            throw new IllegalArgumentException("loadFactor must be > 0 and <= 
1");
+        }
+
+        this.loadFactor = loadFactor;
+
+        // Adjust the initial capacity if necessary.
+        int capacity = safeFindNextPositivePowerOfTwo(initialCapacity);
+        mask = capacity - 1;
+
+        // Allocate the arrays.
+        keys = new int[capacity];
+        @SuppressWarnings({"unchecked", "SuspiciousArrayCast"})
+        V[] temp = (V[]) new Object[capacity];
+        values = temp;
+
+        // Initialize the maximum size value.
+        maxSize = calcMaxSize(capacity);
+    }
+
+    private static <T> T toExternal(T value) {
+        assert value != null : "null is not a legitimate internal value. 
Concurrent Modification?";
+        return value == NULL_VALUE ? null : value;
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> T toInternal(T value) {
+        return value == null ? (T) NULL_VALUE : value;
+    }
+
+    @Override
+    public V get(int key) {
+        int index = indexOf(key);
+        return index == -1 ? null : toExternal(values[index]);
+    }
+
+    @Override
+    public V put(int key, V value) {
+        int startIndex = hashIndex(key);
+        int index = startIndex;
+
+        for (; ; ) {
+            if (values[index] == null) {
+                // Found empty slot, use it.
+                keys[index] = key;
+                values[index] = toInternal(value);
+                growSize();
+                return null;
+            }
+            if (keys[index] == key) {
+                // Found existing entry with this key, just replace the value.
+                V previousValue = values[index];
+                values[index] = toInternal(value);
+                return toExternal(previousValue);
+            }
+
+            // Conflict, keep probing ...
+            if ((index = probeNext(index)) == startIndex) {
+                // Can only happen if the map was full at MAX_ARRAY_SIZE and 
couldn't grow.
+                throw new IllegalStateException("Unable to insert");
+            }
+        }
+    }
+
+    /**
+     * Copies every mapping of {@code sourceMap} into this map, overwriting existing keys.
+     *
+     * <p>When the source is another {@code IntObjectHashMap}, its backing arrays are walked
+     * directly; otherwise the mappings are taken from the source's entry set.
+     *
+     * @param sourceMap the map whose mappings are copied into this one.
+     */
+    @Override
+    public void putAll(Map<? extends Integer, ? extends V> sourceMap) {
+        if (!(sourceMap instanceof IntObjectHashMap)) {
+            // Generic path: go through the boxed entry view of the source.
+            for (Entry<? extends Integer, ? extends V> mapping : sourceMap.entrySet()) {
+                put(mapping.getKey(), mapping.getValue());
+            }
+            return;
+        }
+
+        // Fast path: read the other map's slots directly. A non-null slot value marks an
+        // occupied slot; it is already in internal form, which put() leaves unchanged.
+        @SuppressWarnings("unchecked")
+        IntObjectHashMap<V> that = (IntObjectHashMap<V>) sourceMap;
+        for (int slot = 0; slot < that.values.length; ++slot) {
+            V stored = that.values[slot];
+            if (stored == null) {
+                continue;
+            }
+            put(that.keys[slot], stored);
+        }
+    }
+
+    @Override
+    public V remove(int key) {
+        int index = indexOf(key);
+        if (index == -1) {
+            return null;
+        }
+
+        V prev = values[index];
+        removeAt(index);
+        return toExternal(prev);
+    }
+
+    @Override
+    public int size() {
+        return size;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return size == 0;
+    }
+
+    @Override
+    public void clear() {
+        Arrays.fill(keys, (int) 0);
+        Arrays.fill(values, null);
+        size = 0;
+    }
+
+    @Override
+    public boolean containsKey(int key) {
+        return indexOf(key) >= 0;
+    }
+
+    /**
+     * Reports whether at least one key is mapped to {@code value}.
+     *
+     * @param value the value to look for; {@code null} is converted to the internal null
+     *     placeholder before comparison.
+     * @return {@code true} if some occupied slot holds a value equal to {@code value}.
+     */
+    @Override
+    public boolean containsValue(Object value) {
+        @SuppressWarnings("unchecked")
+        final V needle = toInternal((V) value);
+        final V[] slots = values;
+        for (int slot = 0; slot < slots.length; ++slot) {
+            final V candidate = slots[slot];
+            if (candidate == null) {
+                // Unused slot.
+                continue;
+            }
+            // A stored null is represented by NULL_VALUE, which only equals itself.
+            if (candidate.equals(needle)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public Iterable<PrimitiveEntry<V>> entries() {
+        return entries;
+    }
+
+    @Override
+    public Collection<V> values() {
+        return new AbstractCollection<V>() {
+            @Override
+            public Iterator<V> iterator() {
+                return new Iterator<V>() {
+                    final PrimitiveIterator iter = new PrimitiveIterator();
+
+                    @Override
+                    public boolean hasNext() {
+                        return iter.hasNext();
+                    }
+
+                    @Override
+                    public V next() {
+                        return iter.next().value();
+                    }
+
+                    @Override
+                    public void remove() {
+                        iter.remove();
+                    }
+                };
+            }
+
+            @Override
+            public int size() {
+                return size;
+            }
+        };
+    }
+
+    @Override
+    public int hashCode() {
+        // Hashcode is based on all non-zero, valid keys. We have to scan the 
whole keys
+        // array, which may have different lengths for two maps of same 
size(), so the
+        // capacity cannot be used as input for hashing but the size can.
+        int hash = size;
+        for (int key : keys) {
+            // 0 can be a valid key or unused slot, but won't impact the 
hashcode in either case.
+            // This way we can use a cheap loop without conditionals, or 
hard-to-unroll operations,
+            // or the devastatingly bad memory locality of visiting value 
objects.
+            // Also, it's important to use a hash function that does not 
depend on the ordering
+            // of terms, only their values; since the map is an unordered 
collection and
+            // entries can end up in different positions in different maps 
that have the same
+            // elements, but with different history of puts/removes, due to 
conflicts.
+            hash ^= hashCode(key);
+        }
+        return hash;
+    }
+
+    /**
+     * Compares this map with another {@link IntObjectMap} for equality.
+     *
+     * <p>Two maps are equal when they have the same size and every key of this map is present
+     * in the other map with an equal value. A key mapped to {@code null} here must also be
+     * present, and mapped to {@code null}, in the other map.
+     *
+     * @param obj the object to compare with.
+     * @return {@code true} if {@code obj} is an {@link IntObjectMap} with the same mappings.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof IntObjectMap)) {
+            return false;
+        }
+        @SuppressWarnings("rawtypes")
+        IntObjectMap other = (IntObjectMap) obj;
+        if (size != other.size()) {
+            return false;
+        }
+        for (int i = 0; i < values.length; ++i) {
+            V value = values[i];
+            if (value == null) {
+                // Unused slot.
+                continue;
+            }
+            int key = keys[i];
+            Object otherValue = other.get(key);
+            if (value == NULL_VALUE) {
+                // get() returns null both for an absent key and for a key mapped to null, so
+                // a null result alone does not prove that the other map contains this key.
+                if (otherValue != null || !other.containsKey(key)) {
+                    return false;
+                }
+            } else if (!value.equals(otherValue)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public boolean containsKey(Object key) {
+        return containsKey(objectToKey(key));
+    }
+
+    @Override
+    public V get(Object key) {
+        return get(objectToKey(key));
+    }
+
+    @Override
+    public V put(Integer key, V value) {
+        return put(objectToKey(key), value);
+    }
+
+    @Override
+    public V remove(Object key) {
+        return remove(objectToKey(key));
+    }
+
+    @Override
+    public Set<Integer> keySet() {
+        return keySet;
+    }
+
+    @Override
+    public Set<Entry<Integer, V>> entrySet() {
+        return entrySet;
+    }
+
+    private int objectToKey(Object key) {
+        return (int) (Integer) key;
+    }
+
+    /**
+     * Locates the index for the given key. This method probes using double 
hashing.
+     *
+     * @param key the key for an entry in the map.
+     * @return the index where the key was found, or {@code -1} if no entry is 
found for that key.
+     */
+    private int indexOf(int key) {
+        int startIndex = hashIndex(key);
+        int index = startIndex;
+
+        for (; ; ) {
+            if (values[index] == null) {
+                // It's available, so no chance that this value exists 
anywhere in the map.
+                return -1;
+            }
+            if (key == keys[index]) {
+                return index;
+            }
+
+            // Conflict, keep probing ...
+            if ((index = probeNext(index)) == startIndex) {
+                return -1;
+            }
+        }
+    }
+
+    /** Returns the hashed index for the given key. */
+    private int hashIndex(int key) {
+        // The array lengths are always a power of two, so we can use a 
bitmask to stay inside the
+        // array bounds.
+        return hashCode(key) & mask;
+    }
+
+    /** Returns the hash code for the key. */
+    private static int hashCode(int key) {
+        return key;
+    }
+
+    /** Get the next sequential index after {@code index} and wraps if 
necessary. */
+    private int probeNext(int index) {
+        // The array lengths are always a power of two, so we can use a 
bitmask to stay inside the
+        // array bounds.
+        return (index + 1) & mask;
+    }
+
+    /**
+     * Grows the map size after an insertion. If necessary, performs a rehash of the map into
+     * arrays of twice the current capacity.
+     *
+     * @throws IllegalStateException if the size exceeds {@link #maxSize} while the arrays are
+     *     already at the largest supported capacity (2^30 slots) and cannot be doubled.
+     */
+    private void growSize() {
+        size++;
+
+        if (size > maxSize) {
+            // The capacity is always a power of two and never exceeds 0x40000000, so it can
+            // never equal Integer.MAX_VALUE. Doubling 0x40000000 would overflow to a negative
+            // int and make rehash() fail with NegativeArraySizeException, so stop here.
+            if (keys.length >= 0x40000000) {
+                throw new IllegalStateException("Max capacity reached at size=" + size);
+            }
+
+            // Double the capacity.
+            rehash(keys.length << 1);
+        }
+    }
+
+    /**
+     * Removes the entry stored at {@code index} and compacts the probe chain that follows it.
+     *
+     * <p>After the slot is cleared, the occupied slots following it (wrapping at the end of the
+     * array) are scanned up to the next empty slot. An entry at position {@code i} whose home
+     * slot is {@code bucket} is moved into the current free slot only when that free slot lies
+     * in the cyclic range {@code [bucket, i]}, i.e. on the entry's own probe path: the first
+     * half of the condition covers a path that wraps around the end of the array
+     * ({@code i < bucket}), the second half the non-wrapping case. The vacated position then
+     * becomes the new free slot.
+     *
+     * @param index the index position of the element to remove.
+     * @return {@code true} if at least one following entry was moved back, in which case the
+     *     slot at {@code index} is occupied again; {@code false} otherwise.
+     */
+    private boolean removeAt(final int index) {
+        --size;
+        // Clearing the key is not strictly necessary (for GC like in a 
regular collection),
+        // but recommended for security. The memory location is still fresh in 
the cache anyway.
+        keys[index] = 0;
+        values[index] = null;
+
+        // In the interval from index to the next available entry, the arrays 
may have entries
+        // that are displaced from their base position due to prior conflicts. 
Iterate these
+        // entries and move them back if possible, optimizing future lookups.
+        // Knuth Section 6.4 Algorithm R, also used by the JDK's 
IdentityHashMap.
+
+        int nextFree = index;
+        int i = probeNext(index);
+        for (V value = values[i]; value != null; value = values[i = 
probeNext(i)]) {
+            int key = keys[i];
+            int bucket = hashIndex(key);
+            if (i < bucket && (bucket <= nextFree || nextFree <= i)
+                    || bucket <= nextFree && nextFree <= i) {
+                // Move the displaced entry "back" to the first available 
position.
+                keys[nextFree] = key;
+                values[nextFree] = value;
+                // Vacate the old slot; it becomes the next free position to fill.
+                keys[i] = 0;
+                values[i] = null;
+                nextFree = i;
+            }
+        }
+        return nextFree != index;
+    }
+
+    /** Calculates the maximum size allowed before rehashing. */
+    private int calcMaxSize(int capacity) {
+        // Clip the upper bound so that there will always be at least one 
available slot.
+        int upperBound = capacity - 1;
+        return Math.min(upperBound, (int) (capacity * loadFactor));
+    }
+
+    /**
+     * Rehashes the map for the given capacity.
+     *
+     * @param newCapacity the new capacity for the map.
+     */
+    private void rehash(int newCapacity) {
+        int[] oldKeys = keys;
+        V[] oldVals = values;
+
+        keys = new int[newCapacity];
+        @SuppressWarnings({"unchecked", "SuspiciousArrayCast"})
+        V[] temp = (V[]) new Object[newCapacity];
+        values = temp;
+
+        maxSize = calcMaxSize(newCapacity);
+        mask = newCapacity - 1;
+
+        // Insert to the new arrays.
+        for (int i = 0; i < oldVals.length; ++i) {
+            V oldVal = oldVals[i];
+            if (oldVal != null) {
+                // Inlined put(), but much simpler: we don't need to worry 
about
+                // duplicated keys, growing/rehashing, or failing to insert.
+                int oldKey = oldKeys[i];
+                int index = hashIndex(oldKey);
+
+                for (; ; ) {
+                    if (values[index] == null) {
+                        keys[index] = oldKey;
+                        values[index] = oldVal;
+                        break;
+                    }
+
+                    // Conflict, keep probing. Can wrap around, but never 
reaches startIndex again.
+                    index = probeNext(index);
+                }
+            }
+        }
+    }
+
+    /**
+     * Renders the map as {@code {key=value, key=value}} in backing-array order, or {@code {}}
+     * when the map is empty. A value that is this map itself is printed as {@code (this Map)}.
+     */
+    @Override
+    public String toString() {
+        if (isEmpty()) {
+            return "{}";
+        }
+        final StringBuilder out = new StringBuilder(4 * size);
+        out.append('{');
+        String separator = "";
+        for (int slot = 0; slot < values.length; ++slot) {
+            final V stored = values[slot];
+            if (stored == null) {
+                // Unused slot.
+                continue;
+            }
+            out.append(separator);
+            out.append(keyToString(keys[slot]));
+            out.append('=');
+            final Object shown;
+            if (stored == this) {
+                // Avoid infinite recursion when the map contains itself as a value.
+                shown = "(this Map)";
+            } else {
+                shown = toExternal(stored);
+            }
+            out.append(shown);
+            separator = ", ";
+        }
+        out.append('}');
+        return out.toString();
+    }
+
+    /**
+     * Helper method called by {@link #toString()} in order to convert a 
single map key into a
+     * string. This is protected to allow subclasses to override the 
appearance of a given key.
+     */
+    protected String keyToString(int key) {
+        return Integer.toString(key);
+    }
+
+    /** Set implementation for iterating over the entries of the map. */
+    private final class EntrySet extends AbstractSet<Entry<Integer, V>> {
+        @Override
+        public Iterator<Entry<Integer, V>> iterator() {
+            return new MapIterator();
+        }
+
+        @Override
+        public int size() {
+            return IntObjectHashMap.this.size();
+        }
+    }
+
+    /** Set implementation for iterating over the keys. */
+    private final class KeySet extends AbstractSet<Integer> {
+        @Override
+        public int size() {
+            return IntObjectHashMap.this.size();
+        }
+
+        @Override
+        public boolean contains(Object o) {
+            return IntObjectHashMap.this.containsKey(o);
+        }
+
+        /**
+         * Removes the given key, and its mapping, from the backing map.
+         *
+         * @param o the key to remove; it is cast to {@link Integer} by the backing map.
+         * @return {@code true} if the key was present, including when it was mapped to
+         *     {@code null}; {@code false} if the key was absent.
+         */
+        @Override
+        public boolean remove(Object o) {
+            // The map permits null values, so the value returned by remove() cannot tell
+            // "key absent" apart from "key mapped to null"; check membership first.
+            if (!IntObjectHashMap.this.containsKey(o)) {
+                return false;
+            }
+            IntObjectHashMap.this.remove(o);
+            return true;
+        }
+
+        @Override
+        public boolean retainAll(Collection<?> retainedKeys) {
+            boolean changed = false;
+            for (Iterator<PrimitiveEntry<V>> iter = entries().iterator(); 
iter.hasNext(); ) {
+                PrimitiveEntry<V> entry = iter.next();
+                if (!retainedKeys.contains(entry.key())) {
+                    changed = true;
+                    iter.remove();
+                }
+            }
+            return changed;
+        }
+
+        @Override
+        public void clear() {
+            IntObjectHashMap.this.clear();
+        }
+
+        @Override
+        public Iterator<Integer> iterator() {
+            return new Iterator<Integer>() {
+                private final Iterator<Entry<Integer, V>> iter = 
entrySet.iterator();
+
+                @Override
+                public boolean hasNext() {
+                    return iter.hasNext();
+                }
+
+                @Override
+                public Integer next() {
+                    return iter.next().getKey();
+                }
+
+                @Override
+                public void remove() {
+                    iter.remove();
+                }
+            };
+        }
+    }
+
+    /**
+     * Iterator over primitive entries. Entry key/values are overwritten by 
each call to {@link
+     * #next()}.
+     */
+    private final class PrimitiveIterator
+            implements Iterator<PrimitiveEntry<V>>, PrimitiveEntry<V> {
+        private int prevIndex = -1;
+        private int nextIndex = -1;
+        private int entryIndex = -1;
+
+        private void scanNext() {
+            while (++nextIndex != values.length && values[nextIndex] == null) 
{}
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (nextIndex == -1) {
+                scanNext();
+            }
+            return nextIndex != values.length;
+        }
+
+        @Override
+        public PrimitiveEntry<V> next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+
+            prevIndex = nextIndex;
+            scanNext();
+
+            // Always return the same Entry object, just change its index each 
time.
+            entryIndex = prevIndex;
+            return this;
+        }
+
+        @Override
+        public void remove() {
+            if (prevIndex == -1) {
+                throw new IllegalStateException("next must be called before 
each remove.");
+            }
+            if (removeAt(prevIndex)) {
+                // removeAt may move elements "back" in the array if they have 
been displaced
+                // because their spot in the
+                // array was occupied when they were inserted. If this occurs 
then the nextIndex is
+                // now invalid and
+                // should instead point to the prevIndex which now holds an 
element which was "moved
+                // back".
+                nextIndex = prevIndex;
+            }
+            prevIndex = -1;
+        }
+
+        // Entry implementation. Since this implementation uses a single 
Entry, we coalesce that
+        // into the Iterator object (potentially making loop optimization much 
easier).
+
+        @Override
+        public int key() {
+            return keys[entryIndex];
+        }
+
+        @Override
+        public V value() {
+            return toExternal(values[entryIndex]);
+        }
+
+        @Override
+        public void setValue(V value) {
+            values[entryIndex] = toInternal(value);
+        }
+    }
+
+    /** Iterator used by the {@link Map} interface. */
+    private final class MapIterator implements Iterator<Entry<Integer, V>> {
+        private final PrimitiveIterator iter = new PrimitiveIterator();
+
+        @Override
+        public boolean hasNext() {
+            return iter.hasNext();
+        }
+
+        @Override
+        public Entry<Integer, V> next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+
+            iter.next();
+
+            return new MapEntry(iter.entryIndex);
+        }
+
+        @Override
+        public void remove() {
+            iter.remove();
+        }
+    }
+
+    /** A single entry in the map. */
+    final class MapEntry implements Entry<Integer, V> {
+        private final int entryIndex;
+
+        MapEntry(int entryIndex) {
+            this.entryIndex = entryIndex;
+        }
+
+        @Override
+        public Integer getKey() {
+            verifyExists();
+            return keys[entryIndex];
+        }
+
+        @Override
+        public V getValue() {
+            verifyExists();
+            return toExternal(values[entryIndex]);
+        }
+
+        @Override
+        public V setValue(V value) {
+            verifyExists();
+            V prevValue = toExternal(values[entryIndex]);
+            values[entryIndex] = toInternal(value);
+            return prevValue;
+        }
+
+        private void verifyExists() {
+            if (values[entryIndex] == null) {
+                throw new IllegalStateException("The map entry has been 
removed");
+            }
+        }
+    }
+
+    /**
+     * Clamping variant of {@link #findNextPositivePowerOfTwo(int)}.
+     *
+     * @param value the requested capacity.
+     * @return {@code 1} for non-positive input, {@code 0x40000000} for input at or above that
+     *     value, and the result of {@link #findNextPositivePowerOfTwo(int)} otherwise.
+     */
+    static int safeFindNextPositivePowerOfTwo(final int value) {
+        if (value <= 0) {
+            return 1;
+        }
+        if (value >= 0x40000000) {
+            return 0x40000000;
+        }
+        return findNextPositivePowerOfTwo(value);
+    }
+
+    static int findNextPositivePowerOfTwo(final int value) {
+        assert value > Integer.MIN_VALUE && value < 0x40000000;
+        return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
+    }
+}
diff --git 
a/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/src/main/java/org/apache/arrow/vector/util/IntObjectMap.java
 
b/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/src/main/java/org/apache/arrow/vector/util/IntObjectMap.java
new file mode 100644
index 0000000..1dfbe15
--- /dev/null
+++ 
b/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/src/main/java/org/apache/arrow/vector/util/IntObjectMap.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.arrow.vector.util;
+
+import java.util.Iterator;
+import java.util.Map;
+
+// TODO: This is a temporary workaround to resolve Arrow 15's dependency on eclipse-collections,
+//  which carries a Category-B license (see https://github.com/apache/arrow/issues/40896).
+//  Without these override classes, Fluss ArrowReaderWriterTest fails with a ClassNotFoundException.
+//  This workaround should be removed once we upgrade to Arrow 16 or later.
+/**
+ * A vendored specialized copy of Netty's IntObjectMap for use within Arrow. Avoids requiring
+ * Netty in the Arrow core just for this one class.
+ *
+ * <p>Extends {@link Map} keyed by {@link Integer} and adds overloads that take the key as a
+ * primitive {@code int}, plus a primitive-entry view via {@link #entries()}.
+ *
+ * @param <V> the value type stored in the map.
+ */
+interface IntObjectMap<V> extends Map<Integer, V> {
+
+    /**
+     * A primitive entry in the map, provided by the iterator from {@link #entries()}.
+     *
+     * @param <V> the value type stored in the map.
+     */
+    interface PrimitiveEntry<V> {
+        /** Gets the key for this entry. */
+        int key();
+
+        /** Gets the value for this entry. */
+        V value();
+
+        /**
+         * Sets the value for this entry.
+         *
+         * @param value the new value to associate with this entry's key.
+         */
+        void setValue(V value);
+    }
+
+    /**
+     * Gets the value in the map with the specified key.
+     *
+     * @param key the key whose associated value is to be returned.
+     * @return the value or {@code null} if the key was not found in the map.
+     */
+    V get(int key);
+
+    /**
+     * Puts the given entry into the map.
+     *
+     * @param key the key of the entry.
+     * @param value the value of the entry.
+     * @return the previous value for this key or {@code null} if there was no previous mapping.
+     */
+    V put(int key, V value);
+
+    /**
+     * Removes the entry with the specified key.
+     *
+     * @param key the key for the entry to be removed from this map.
+     * @return the previous value for the key, or {@code null} if there was no mapping.
+     */
+    V remove(int key);
+
+    /**
+     * Gets an iterable to traverse over the primitive entries contained in this map. As an
+     * optimization, the {@link PrimitiveEntry}s returned by the {@link Iterator} may change as
+     * the {@link Iterator} progresses. The caller should not rely on {@link PrimitiveEntry}
+     * key/value stability.
+     *
+     * @return an iterable over the map's entries with primitive {@code int} keys.
+     */
+    Iterable<PrimitiveEntry<V>> entries();
+
+    /**
+     * Indicates whether or not this map contains a value for the specified key.
+     *
+     * @param key the key to look up.
+     * @return {@code true} if the map has a mapping for {@code key}.
+     */
+    boolean containsKey(int key);
+}
diff --git 
a/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/src/main/java/org/apache/arrow/vector/util/MapWithOrdinalImpl.java
 
b/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/src/main/java/org/apache/arrow/vector/util/MapWithOrdinalImpl.java
new file mode 100644
index 0000000..dc63689
--- /dev/null
+++ 
b/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/src/main/java/org/apache/arrow/vector/util/MapWithOrdinalImpl.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.arrow.vector.util;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+// TODO: This is a temporary workaround to resolve Arrow 15's dependency on eclipse-collections,
+//  which carries a Category-B license (see https://github.com/apache/arrow/issues/40896).
+//  Without these override classes, Fluss ArrowReaderWriterTest fails with a
+//  ClassNotFoundException.
+//  This workaround should be removed once we upgrade to Arrow 16 or later.
+
+
+/**
+ * An implementation of map that supports constant time look-up by a generic key or an ordinal.
+ *
+ * <p>This class extends the functionality of a regular {@link Map} with ordinal lookup support.
+ * Upon insertion an unused ordinal is assigned to the inserted (key, value) tuple. Upon update the
+ * same ordinal id is re-used while value is replaced. Upon deletion of an existing item, its
+ * corresponding ordinal is recycled and could be used by another item.
+ *
+ * <p>For any instance with N items, this implementation guarantees that ordinals are in the range
+ * of [0, N). However, the ordinal assignment is dynamic and may change after an insertion or
+ * deletion. Consumers of this class are responsible for explicitly checking the ordinal
+ * corresponding to a key via {@link MapWithOrdinalImpl#getOrdinal(Object)} before attempting to
+ * execute a lookup with an ordinal.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class MapWithOrdinalImpl<K, V> implements MapWithOrdinal<K, V> {
+
+    /** Maps every key to its (ordinal, value) pair, in insertion order. */
+    private final Map<K, Map.Entry<Integer, V>> primary = new LinkedHashMap<>();
+
+    /** Maps every assigned ordinal to the value stored under it. */
+    private final IntObjectHashMap<V> secondary = new IntObjectHashMap<>();
+
+    /** Internal {@link Map} view that keeps {@link #primary} and {@link #secondary} in sync. */
+    private final Map<K, V> delegate =
+            new Map<K, V>() {
+                @Override
+                public boolean isEmpty() {
+                    return size() == 0;
+                }
+
+                @Override
+                public int size() {
+                    return primary.size();
+                }
+
+                @Override
+                public boolean containsKey(Object key) {
+                    return primary.containsKey(key);
+                }
+
+                @Override
+                public boolean containsValue(Object value) {
+                    // primary stores (ordinal, value) pairs, so the wrapped values have to be
+                    // compared; comparing the pairs themselves can never match a plain value.
+                    for (Entry<Integer, V> pair : primary.values()) {
+                        final V candidate = pair.getValue();
+                        if (value == null ? candidate == null : value.equals(candidate)) {
+                            return true;
+                        }
+                    }
+                    return false;
+                }
+
+                @Override
+                public V get(Object key) {
+                    Entry<Integer, V> pair = primary.get(key);
+                    if (pair != null) {
+                        return pair.getValue();
+                    }
+                    return null;
+                }
+
+                @Override
+                public V put(K key, V value) {
+                    final Entry<Integer, V> oldPair = primary.get(key);
+                    // if key exists try replacing otherwise, assign a new ordinal identifier
+                    final int ordinal = oldPair == null ? primary.size() : oldPair.getKey();
+                    primary.put(key, new AbstractMap.SimpleImmutableEntry<>(ordinal, value));
+                    secondary.put(ordinal, value);
+                    return oldPair == null ? null : oldPair.getValue();
+                }
+
+                @Override
+                public V remove(Object key) {
+                    final Entry<Integer, V> oldPair = primary.remove(key);
+                    if (oldPair == null) {
+                        return null;
+                    }
+                    final int freedOrdinal = oldPair.getKey();
+                    // Ordinals are dense in [0, N). One key is already gone from primary, so the
+                    // highest ordinal that was in use equals the new size.
+                    final int lastOrdinal = primary.size();
+                    final V last = secondary.remove(lastOrdinal);
+                    if (freedOrdinal != lastOrdinal) {
+                        // Move the last element into the freed slot so ordinals stay dense ...
+                        secondary.put(freedOrdinal, last);
+                        // ... and re-point the key that owned the last ordinal.
+                        for (Entry<K, Entry<Integer, V>> owner : primary.entrySet()) {
+                            if (owner.getValue().getKey() == lastOrdinal) {
+                                owner.setValue(
+                                        new AbstractMap.SimpleImmutableEntry<>(freedOrdinal, last));
+                                break;
+                            }
+                        }
+                    }
+                    return oldPair.getValue();
+                }
+
+                @Override
+                public void putAll(Map<? extends K, ? extends V> m) {
+                    throw new UnsupportedOperationException();
+                }
+
+                @Override
+                public void clear() {
+                    primary.clear();
+                    secondary.clear();
+                }
+
+                @Override
+                public Set<K> keySet() {
+                    return primary.keySet();
+                }
+
+                @Override
+                public Collection<V> values() {
+                    return secondary.values();
+                }
+
+                @Override
+                public Set<Entry<K, V>> entrySet() {
+                    return primary.entrySet().stream()
+                            .map(
+                                    entry ->
+                                            new AbstractMap.SimpleImmutableEntry<>(
+                                                    entry.getKey(), entry.getValue().getValue()))
+                            .collect(Collectors.toSet());
+                }
+            };
+
+    /**
+     * Returns the value corresponding to the given ordinal.
+     *
+     * @param id ordinal value for lookup
+     * @return an instance of V
+     */
+    @Override
+    public V getByOrdinal(int id) {
+        return secondary.get(id);
+    }
+
+    /**
+     * Returns the ordinal corresponding to the given key.
+     *
+     * @param key key for ordinal lookup
+     * @return ordinal value corresponding to key if it exists or -1
+     */
+    @Override
+    public int getOrdinal(K key) {
+        Map.Entry<Integer, V> pair = primary.get(key);
+        if (pair != null) {
+            return pair.getKey();
+        }
+        return -1;
+    }
+
+    /** Returns the number of keys currently mapped. */
+    @Override
+    public int size() {
+        return delegate.size();
+    }
+
+    /** Returns {@code true} if no key is currently mapped. */
+    @Override
+    public boolean isEmpty() {
+        return delegate.isEmpty();
+    }
+
+    /**
+     * Returns the value mapped to the key wrapped in a single-element collection.
+     *
+     * @param key key to look up
+     * @return a one-element list holding the value, or {@code null} if the key is not mapped
+     */
+    @Override
+    public Collection<V> getAll(K key) {
+        if (delegate.containsKey(key)) {
+            List<V> list = new ArrayList<>(1);
+            list.add(get(key));
+            return list;
+        }
+        return null;
+    }
+
+    /**
+     * Returns the value mapped to the key.
+     *
+     * @param key key to look up
+     * @return the mapped value, or {@code null} if the key is not mapped
+     */
+    @Override
+    public V get(K key) {
+        return delegate.get(key);
+    }
+
+    /**
+     * Inserts the tuple (key, value) into the map extending the semantics of {@link Map#put} with
+     * automatic ordinal assignment. A new ordinal is assigned if key does not exist. Otherwise the
+     * same ordinal is re-used but the value is replaced.
+     *
+     * @param key key to insert or update
+     * @param value value to associate with the key
+     * @param overwrite not consulted by this implementation; an existing value is always replaced
+     * @return {@code true} if the key was previously mapped to a non-null value
+     * @see Map#put
+     */
+    @Override
+    public boolean put(K key, V value, boolean overwrite) {
+        return delegate.put(key, value) != null;
+    }
+
+    /** Returns the values held by this map, as exposed by the ordinal-to-value map. */
+    @Override
+    public Collection<V> values() {
+        return delegate.values();
+    }
+
+    /**
+     * Not implemented by this single-value map: the map is left untouched.
+     *
+     * @return always {@code false}
+     */
+    @Override
+    public boolean remove(K key, V value) {
+        return false;
+    }
+
+    /** Returns {@code true} if the given key is currently mapped. */
+    @Override
+    public boolean containsKey(Object key) {
+        return delegate.containsKey(key);
+    }
+
+    /**
+     * Removes the element corresponding to the key if exists extending the semantics of {@link
+     * Map#remove} with ordinal re-cycling. The ordinal corresponding to the given key may be
+     * re-assigned to another tuple. It is important that consumer checks the ordinal value via
+     * {@link MapWithOrdinalImpl#getOrdinal(Object)} before attempting to look-up by ordinal.
+     *
+     * @param key key to remove
+     * @return {@code true} if the key was mapped to a non-null value
+     * @see Map#remove
+     */
+    @Override
+    public boolean removeAll(K key) {
+        return delegate.remove(key) != null;
+    }
+
+    /** Removes every mapping and releases all ordinals. */
+    @Override
+    public void clear() {
+        delegate.clear();
+    }
+
+    /** Returns the keys of this map in insertion order. */
+    @Override
+    public Set<K> keys() {
+        return delegate.keySet();
+    }
+}
diff --git 
a/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/src/main/java/org/apache/arrow/vector/util/MultiMapWithOrdinal.java
 
b/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/src/main/java/org/apache/arrow/vector/util/MultiMapWithOrdinal.java
new file mode 100644
index 0000000..ff77c80
--- /dev/null
+++ 
b/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/src/main/java/org/apache/arrow/vector/util/MultiMapWithOrdinal.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.arrow.vector.util;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+// TODO: This is a temporary workaround to resolve Arrow 15's dependency on eclipse-collections,
+//  which carries a Category-B license (see https://github.com/apache/arrow/issues/40896).
+//  Without these override classes, Fluss ArrowReaderWriterTest fails with a
+//  ClassNotFoundException.
+//  This workaround should be removed once we upgrade to Arrow 16 or later.
+
+
+/**
+ * A multimap that can be queried either by a generic key or by an ordinal.
+ *
+ * <p>Every inserted (key, value) tuple receives the next unused ordinal. When a tuple is removed,
+ * the tuple holding the highest ordinal is moved into the freed slot, so the ordinal of an
+ * unrelated tuple may change as a result of a removal.
+ *
+ * <p>For any instance with N items the ordinals in use are in the range of [0, N). Because the
+ * assignment is dynamic, consumers are responsible for re-checking the ordinal of a key via {@link
+ * MultiMapWithOrdinal#getOrdinal(Object)} before attempting a lookup by ordinal.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class MultiMapWithOrdinal<K, V> implements MapWithOrdinal<K, V> {
+
+    /** Ordinals owned by each key, keys kept in insertion order. */
+    private final Map<K, Set<Integer>> keyToOrdinal = new LinkedHashMap<>();
+
+    /** Value stored under each ordinal. */
+    private final IntObjectHashMap<V> ordinalToValue = new IntObjectHashMap<>();
+
+    /**
+     * Looks a value up by its ordinal.
+     *
+     * @param id ordinal value for lookup
+     * @return the value stored under the ordinal
+     */
+    @Override
+    public V getByOrdinal(int id) {
+        return ordinalToValue.get(id);
+    }
+
+    /**
+     * Looks up one ordinal owned by the given key.
+     *
+     * @param key key for ordinal lookup
+     * @return an ordinal owned by the key, or -1 if the key owns none
+     */
+    @Override
+    public int getOrdinal(K key) {
+        final Set<Integer> owned = getOrdinals(key);
+        return owned.isEmpty() ? -1 : owned.iterator().next();
+    }
+
+    /** Returns the ordinal set registered for the key, or a fresh empty set if there is none. */
+    private Set<Integer> getOrdinals(K key) {
+        final Set<Integer> fallback = new HashSet<>();
+        return keyToOrdinal.getOrDefault(key, fallback);
+    }
+
+    /** Returns the number of stored values. */
+    @Override
+    public int size() {
+        return ordinalToValue.size();
+    }
+
+    /** Returns {@code true} if no value is stored. */
+    @Override
+    public boolean isEmpty() {
+        return ordinalToValue.isEmpty();
+    }
+
+    /**
+     * Returns the first value registered for the key.
+     *
+     * @param key key to look up
+     * @return the first value of the key, or {@code null} if the key is unknown
+     */
+    @Override
+    public V get(K key) {
+        final Set<Integer> owned = keyToOrdinal.get(key);
+        return owned == null
+                ? null
+                : owned.stream().map(ordinalToValue::get).collect(Collectors.toList()).get(0);
+    }
+
+    /**
+     * Returns every value registered for the key.
+     *
+     * @param key key to look up
+     * @return the values of the key, or {@code null} if the key is unknown
+     */
+    @Override
+    public Collection<V> getAll(K key) {
+        final Set<Integer> owned = keyToOrdinal.get(key);
+        return owned == null
+                ? null
+                : owned.stream().map(ordinalToValue::get).collect(Collectors.toList());
+    }
+
+    /**
+     * Inserts the tuple (key, value) into the multimap with automatic ordinal assignment.
+     *
+     * <p>The tuple always receives a new ordinal. If {@code overwrite} is true, every value
+     * previously registered for the key is removed first; otherwise the value is appended to the
+     * values of the key.
+     *
+     * @return {@code true} if the new ordinal was added to the ordinal set of the key
+     */
+    @Override
+    public boolean put(K key, V value, boolean overwrite) {
+        if (overwrite) {
+            removeAll(key);
+        }
+        final Set<Integer> owned = getOrdinals(key);
+        final int assigned = ordinalToValue.size();
+        ordinalToValue.put(assigned, value);
+        final boolean added = owned.add(assigned);
+        keyToOrdinal.put(key, owned);
+        return added;
+    }
+
+    /** Returns all stored values. */
+    @Override
+    public Collection<V> values() {
+        return ordinalToValue.values();
+    }
+
+    /** Returns {@code true} if at least one value is registered for the key. */
+    @Override
+    public boolean containsKey(K key) {
+        return keyToOrdinal.containsKey(key);
+    }
+
+    /**
+     * Removes the tuple (key, value) if it exists, recycling its ordinal.
+     *
+     * <p>The freed ordinal may be re-assigned to another tuple. It is important that consumers
+     * check the ordinal value via {@link MultiMapWithOrdinal#getOrdinal(Object)} before attempting
+     * to look-up by ordinal.
+     *
+     * @return {@code true} if the multimap was changed
+     */
+    @Override
+    public synchronized boolean remove(K key, V value) {
+        final Set<Integer> owned = getOrdinals(key);
+        if (owned.isEmpty()) {
+            return false;
+        }
+        final Optional<V> match =
+                owned.stream().map(ordinalToValue::get).filter(value::equals).findFirst();
+        if (!match.isPresent()) {
+            return false;
+        }
+        final int freed = removeKv(owned, key, value);
+        // One value is gone already, so the highest ordinal still in use equals the new size.
+        final int last = ordinalToValue.size();
+        if (last != freed) {
+            swapOrdinal(last, freed);
+        }
+        return true;
+    }
+
+    /** Moves the value stored under {@code lastOrdinal} into the freed {@code removalOrdinal}. */
+    private void swapOrdinal(int lastOrdinal, int removalOrdinal) {
+        final V moved = ordinalToValue.remove(lastOrdinal);
+        ordinalToValue.put(removalOrdinal, moved);
+        final K owner =
+                keyToOrdinal.entrySet().stream()
+                        .filter(kv -> kv.getValue().contains(lastOrdinal))
+                        .map(Map.Entry::getKey)
+                        .findFirst()
+                        .orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "MultimapWithOrdinal in bad state"));
+        final Set<Integer> ownerOrdinals = getOrdinals(owner);
+        ownerOrdinals.remove(lastOrdinal);
+        ownerOrdinals.add(removalOrdinal);
+        keyToOrdinal.put(owner, ownerOrdinals);
+    }
+
+    /**
+     * Drops the first ordinal of {@code removalSet} whose value equals {@code value}, and drops
+     * the key itself once it owns no ordinal any more.
+     *
+     * @return the ordinal that was freed
+     */
+    private int removeKv(Set<Integer> removalSet, K key, V value) {
+        Integer target = null;
+        for (Integer ordinal : removalSet) {
+            if (ordinalToValue.get(ordinal).equals(value)) {
+                target = ordinal;
+                break;
+            }
+        }
+        if (target == null) {
+            throw new IllegalStateException("MultimapWithOrdinal in bad state");
+        }
+        ordinalToValue.remove(target);
+        removalSet.remove(target);
+        if (removalSet.isEmpty()) {
+            keyToOrdinal.remove(key);
+        } else {
+            keyToOrdinal.put(key, removalSet);
+        }
+        return target;
+    }
+
+    /**
+     * Removes every value registered for the key.
+     *
+     * @return {@code true} if the key was known, {@code false} otherwise
+     */
+    @Override
+    public synchronized boolean removeAll(K key) {
+        final Collection<V> registered = this.getAll(key);
+        if (registered == null) {
+            return false;
+        }
+        registered.forEach(v -> this.remove(key, v));
+        return true;
+    }
+
+    /** Removes every tuple and releases all ordinals. */
+    @Override
+    public void clear() {
+        ordinalToValue.clear();
+        keyToOrdinal.clear();
+    }
+
+    /** Returns the keys of this multimap in insertion order. */
+    @Override
+    public Set<K> keys() {
+        return keyToOrdinal.keySet();
+    }
+}
diff --git 
a/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/src/main/resources/META-INF/NOTICE
 
b/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/src/main/resources/META-INF/NOTICE
index 176cd9c..c94901f 100644
--- 
a/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/src/main/resources/META-INF/NOTICE
+++ 
b/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/src/main/resources/META-INF/NOTICE
@@ -6,8 +6,8 @@ The Apache Software Foundation (http://www.apache.org/).
 
 This project bundles the following dependencies under the Apache Software 
License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
 
-- org.apache.arrow:arrow-vector:15.0.0
-- org.apache.arrow:arrow-format:15.0.0
-- org.apache.arrow:arrow-memory-core:15.0.0
-- org.apache.arrow:arrow-memory-netty:15.0.0
+- org.apache.arrow:arrow-vector:15.0.2
+- org.apache.arrow:arrow-format:15.0.2
+- org.apache.arrow:arrow-memory-core:15.0.2
+- org.apache.arrow:arrow-memory-netty:15.0.2
 - com.google.flatbuffers:flatbuffers-java:23.5.26
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index b76a9a0..38472ef 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,6 +62,7 @@
     </pluginRepositories>
 
     <properties>
+        <target.java.version>1.8</target.java.version>
         <shading.prefix>org.apache.fluss.shaded</shading.prefix>
         <netty.version>4.1.130.Final</netty.version>
         <jackson.version>2.15.3</jackson.version>
@@ -173,6 +174,18 @@
     <build>
         <pluginManagement>
             <plugins>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-compiler-plugin</artifactId>
+                    <version>3.8.0</version>
+                    <configuration>
+                        <source>${target.java.version}</source>
+                        <target>${target.java.version}</target>
+                        <!-- The semantics of this option are reversed, see 
MCOMPILER-209. -->
+                        
<useIncrementalCompilation>false</useIncrementalCompilation>
+                    </configuration>
+                </plugin>
+
                 <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-shade-plugin</artifactId>

Reply via email to