This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-shaded.git
The following commit(s) were added to refs/heads/main by this push:
new 04180c7 [arrow] Add IntObjectHashMap and IntObjectMap implementations
to replace dependency on eclipse-collections by arrow (#6)
04180c7 is described below
commit 04180c7fe083f728010731e686c392f3f724b240
Author: Jark Wu <[email protected]>
AuthorDate: Sun Jan 18 15:14:23 2026 +0800
[arrow] Add IntObjectHashMap and IntObjectMap implementations to replace
dependency on eclipse-collections by arrow (#6)
---
.../fluss-shaded-arrow-15/pom.xml | 2 +-
.../apache/arrow/vector/util/IntObjectHashMap.java | 729 +++++++++++++++++++++
.../org/apache/arrow/vector/util/IntObjectMap.java | 88 +++
.../arrow/vector/util/MapWithOrdinalImpl.java | 247 +++++++
.../arrow/vector/util/MultiMapWithOrdinal.java | 238 +++++++
.../src/main/resources/META-INF/NOTICE | 8 +-
pom.xml | 13 +
7 files changed, 1320 insertions(+), 5 deletions(-)
diff --git a/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/pom.xml
b/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/pom.xml
index d60f32b..f6fb9d4 100644
--- a/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/pom.xml
+++ b/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/pom.xml
@@ -36,7 +36,7 @@
<properties>
<!-- Arrow 15 mistaken introduced org.eclipse.collections as a
dependency, which has
license legal problem. See
https://github.com/apache/arrow/issues/40896 -->
- <arrow.version>15.0.0</arrow.version>
+ <arrow.version>15.0.2</arrow.version>
</properties>
<dependencies>
diff --git
a/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/src/main/java/org/apache/arrow/vector/util/IntObjectHashMap.java
b/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/src/main/java/org/apache/arrow/vector/util/IntObjectHashMap.java
new file mode 100644
index 0000000..59b1256
--- /dev/null
+++
b/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/src/main/java/org/apache/arrow/vector/util/IntObjectHashMap.java
@@ -0,0 +1,729 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.arrow.vector.util;
+
+import java.util.AbstractCollection;
+import java.util.AbstractSet;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+// TODO: This is a temporary workaround to resolve Arrow 15's dependency on eclipse-collections,
+// which carries a Category-B license (see https://github.com/apache/arrow/issues/40896).
+// Without these override classes, Fluss ArrowReaderWriterTest fails with a ClassNotFoundException.
+// This workaround should be removed once we upgrade to Arrow 16 or later.
+
+/**
+ * A vendored specialized copy of Netty's IntObjectHashMap for use within Arrow. Avoids requiring
+ * Netty in the Arrow core just for this one class.
+ *
+ * @param <V> The value type stored in the map.
+ */
+class IntObjectHashMap<V> implements IntObjectMap<V> {
+
+ /** Default initial capacity. Used if not specified in the constructor */
+ public static final int DEFAULT_CAPACITY = 8;
+
+ /** Default load factor. Used if not specified in the constructor */
+ public static final float DEFAULT_LOAD_FACTOR = 0.5f;
+
+ /**
+ * Placeholder for null values, so we can use the actual null to mean
available. (Better than
+ * using a placeholder for available: less references for GC processing.)
+ */
+ private static final Object NULL_VALUE = new Object();
+
+ /** The maximum number of elements allowed without allocating more space.
*/
+ private int maxSize;
+
+ /** The load factor for the map. Used to calculate {@link #maxSize}. */
+ private final float loadFactor;
+
+ private int[] keys;
+ private V[] values;
+ private int size;
+ private int mask;
+
+ private final Set<Integer> keySet = new KeySet();
+ private final Set<Entry<Integer, V>> entrySet = new EntrySet();
+ private final Iterable<PrimitiveEntry<V>> entries =
+ new Iterable<PrimitiveEntry<V>>() {
+ @Override
+ public Iterator<PrimitiveEntry<V>> iterator() {
+ return new PrimitiveIterator();
+ }
+ };
+
+ public IntObjectHashMap() {
+ this(DEFAULT_CAPACITY, DEFAULT_LOAD_FACTOR);
+ }
+
+ public IntObjectHashMap(int initialCapacity) {
+ this(initialCapacity, DEFAULT_LOAD_FACTOR);
+ }
+
+ public IntObjectHashMap(int initialCapacity, float loadFactor) {
+ if (loadFactor <= 0.0f || loadFactor > 1.0f) {
+ // Cannot exceed 1 because we can never store more than capacity
elements;
+ // using a bigger loadFactor would trigger rehashing before the
desired load is reached.
+ throw new IllegalArgumentException("loadFactor must be > 0 and <=
1");
+ }
+
+ this.loadFactor = loadFactor;
+
+ // Adjust the initial capacity if necessary.
+ int capacity = safeFindNextPositivePowerOfTwo(initialCapacity);
+ mask = capacity - 1;
+
+ // Allocate the arrays.
+ keys = new int[capacity];
+ @SuppressWarnings({"unchecked", "SuspiciousArrayCast"})
+ V[] temp = (V[]) new Object[capacity];
+ values = temp;
+
+ // Initialize the maximum size value.
+ maxSize = calcMaxSize(capacity);
+ }
+
+ private static <T> T toExternal(T value) {
+ assert value != null : "null is not a legitimate internal value.
Concurrent Modification?";
+ return value == NULL_VALUE ? null : value;
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> T toInternal(T value) {
+ return value == null ? (T) NULL_VALUE : value;
+ }
+
+ @Override
+ public V get(int key) {
+ int index = indexOf(key);
+ return index == -1 ? null : toExternal(values[index]);
+ }
+
+ @Override
+ public V put(int key, V value) {
+ int startIndex = hashIndex(key);
+ int index = startIndex;
+
+ for (; ; ) {
+ if (values[index] == null) {
+ // Found empty slot, use it.
+ keys[index] = key;
+ values[index] = toInternal(value);
+ growSize();
+ return null;
+ }
+ if (keys[index] == key) {
+ // Found existing entry with this key, just replace the value.
+ V previousValue = values[index];
+ values[index] = toInternal(value);
+ return toExternal(previousValue);
+ }
+
+ // Conflict, keep probing ...
+ if ((index = probeNext(index)) == startIndex) {
+ // Can only happen if the map was full at MAX_ARRAY_SIZE and
couldn't grow.
+ throw new IllegalStateException("Unable to insert");
+ }
+ }
+ }
+
+ @Override
+ public void putAll(Map<? extends Integer, ? extends V> sourceMap) {
+ if (sourceMap instanceof IntObjectHashMap) {
+ // Optimization - iterate through the arrays.
+ @SuppressWarnings("unchecked")
+ IntObjectHashMap<V> source = (IntObjectHashMap<V>) sourceMap;
+ for (int i = 0; i < source.values.length; ++i) {
+ V sourceValue = source.values[i];
+ if (sourceValue != null) {
+ put(source.keys[i], sourceValue);
+ }
+ }
+ return;
+ }
+
+ // Otherwise, just add each entry.
+ for (Entry<? extends Integer, ? extends V> entry :
sourceMap.entrySet()) {
+ put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ @Override
+ public V remove(int key) {
+ int index = indexOf(key);
+ if (index == -1) {
+ return null;
+ }
+
+ V prev = values[index];
+ removeAt(index);
+ return toExternal(prev);
+ }
+
+ @Override
+ public int size() {
+ return size;
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return size == 0;
+ }
+
+ @Override
+ public void clear() {
+ Arrays.fill(keys, (int) 0);
+ Arrays.fill(values, null);
+ size = 0;
+ }
+
+ @Override
+ public boolean containsKey(int key) {
+ return indexOf(key) >= 0;
+ }
+
+ @Override
+ public boolean containsValue(Object value) {
+ @SuppressWarnings("unchecked")
+ V v1 = toInternal((V) value);
+ for (V v2 : values) {
+ // The map supports null values; this will be matched as
NULL_VALUE.equals(NULL_VALUE).
+ if (v2 != null && v2.equals(v1)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public Iterable<PrimitiveEntry<V>> entries() {
+ return entries;
+ }
+
+ @Override
+ public Collection<V> values() {
+ return new AbstractCollection<V>() {
+ @Override
+ public Iterator<V> iterator() {
+ return new Iterator<V>() {
+ final PrimitiveIterator iter = new PrimitiveIterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public V next() {
+ return iter.next().value();
+ }
+
+ @Override
+ public void remove() {
+ iter.remove();
+ }
+ };
+ }
+
+ @Override
+ public int size() {
+ return size;
+ }
+ };
+ }
+
+ @Override
+ public int hashCode() {
+ // Hashcode is based on all non-zero, valid keys. We have to scan the
whole keys
+ // array, which may have different lengths for two maps of same
size(), so the
+ // capacity cannot be used as input for hashing but the size can.
+ int hash = size;
+ for (int key : keys) {
+ // 0 can be a valid key or unused slot, but won't impact the
hashcode in either case.
+ // This way we can use a cheap loop without conditionals, or
hard-to-unroll operations,
+ // or the devastatingly bad memory locality of visiting value
objects.
+ // Also, it's important to use a hash function that does not
depend on the ordering
+ // of terms, only their values; since the map is an unordered
collection and
+ // entries can end up in different positions in different maps
that have the same
+ // elements, but with different history of puts/removes, due to
conflicts.
+ hash ^= hashCode(key);
+ }
+ return hash;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof IntObjectMap)) {
+ return false;
+ }
+ @SuppressWarnings("rawtypes")
+ IntObjectMap other = (IntObjectMap) obj;
+ if (size != other.size()) {
+ return false;
+ }
+ for (int i = 0; i < values.length; ++i) {
+ V value = values[i];
+ if (value != null) {
+ int key = keys[i];
+ Object otherValue = other.get(key);
+ if (value == NULL_VALUE) {
+ if (otherValue != null) {
+ return false;
+ }
+ } else if (!value.equals(otherValue)) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public boolean containsKey(Object key) {
+ return containsKey(objectToKey(key));
+ }
+
+ @Override
+ public V get(Object key) {
+ return get(objectToKey(key));
+ }
+
+ @Override
+ public V put(Integer key, V value) {
+ return put(objectToKey(key), value);
+ }
+
+ @Override
+ public V remove(Object key) {
+ return remove(objectToKey(key));
+ }
+
+ @Override
+ public Set<Integer> keySet() {
+ return keySet;
+ }
+
+ @Override
+ public Set<Entry<Integer, V>> entrySet() {
+ return entrySet;
+ }
+
+ private int objectToKey(Object key) {
+ return (int) (Integer) key;
+ }
+
+ /**
+ * Locates the index for the given key. This method probes using double
hashing.
+ *
+ * @param key the key for an entry in the map.
+ * @return the index where the key was found, or {@code -1} if no entry is
found for that key.
+ */
+ private int indexOf(int key) {
+ int startIndex = hashIndex(key);
+ int index = startIndex;
+
+ for (; ; ) {
+ if (values[index] == null) {
+ // It's available, so no chance that this value exists
anywhere in the map.
+ return -1;
+ }
+ if (key == keys[index]) {
+ return index;
+ }
+
+ // Conflict, keep probing ...
+ if ((index = probeNext(index)) == startIndex) {
+ return -1;
+ }
+ }
+ }
+
+ /** Returns the hashed index for the given key. */
+ private int hashIndex(int key) {
+ // The array lengths are always a power of two, so we can use a
bitmask to stay inside the
+ // array bounds.
+ return hashCode(key) & mask;
+ }
+
+ /** Returns the hash code for the key. */
+ private static int hashCode(int key) {
+ return key;
+ }
+
+ /** Get the next sequential index after {@code index} and wraps if
necessary. */
+ private int probeNext(int index) {
+ // The array lengths are always a power of two, so we can use a
bitmask to stay inside the
+ // array bounds.
+ return (index + 1) & mask;
+ }
+
+    /**
+     * Records one insertion and, once {@code size} exceeds {@code maxSize}, rehashes the map
+     * into arrays of twice the current capacity.
+     *
+     * @throws IllegalStateException if the backing arrays are already at the largest
+     *     power-of-two length and therefore cannot be doubled.
+     */
+    private void growSize() {
+        size++;
+
+        if (size > maxSize) {
+            // The capacity is always a power of two, so it can never equal Integer.MAX_VALUE;
+            // the largest reachable capacity is 2^30 (0x40000000). Doubling that overflows to
+            // a negative number, which would make rehash() fail with a
+            // NegativeArraySizeException instead of the exception intended here.
+            if (keys.length >= 0x40000000) {
+                throw new IllegalStateException("Max capacity reached at size=" + size);
+            }
+
+            // Double the capacity.
+            rehash(keys.length << 1);
+        }
+    }
+
+ /**
+ * Removes entry at the given index position. Also performs opportunistic,
incremental rehashing
+ * if necessary to not break conflict chains.
+ *
+ * @param index the index position of the element to remove.
+ * @return {@code true} if the next item was moved back. {@code false}
otherwise.
+ */
+ private boolean removeAt(final int index) {
+ --size;
+ // Clearing the key is not strictly necessary (for GC like in a
regular collection),
+ // but recommended for security. The memory location is still fresh in
the cache anyway.
+ keys[index] = 0;
+ values[index] = null;
+
+ // In the interval from index to the next available entry, the arrays
may have entries
+ // that are displaced from their base position due to prior conflicts.
Iterate these
+ // entries and move them back if possible, optimizing future lookups.
+ // Knuth Section 6.4 Algorithm R, also used by the JDK's
IdentityHashMap.
+
+ int nextFree = index;
+ int i = probeNext(index);
+ for (V value = values[i]; value != null; value = values[i =
probeNext(i)]) {
+ int key = keys[i];
+ int bucket = hashIndex(key);
+ if (i < bucket && (bucket <= nextFree || nextFree <= i)
+ || bucket <= nextFree && nextFree <= i) {
+ // Move the displaced entry "back" to the first available
position.
+ keys[nextFree] = key;
+ values[nextFree] = value;
+ // Put the first entry after the displaced entry
+ keys[i] = 0;
+ values[i] = null;
+ nextFree = i;
+ }
+ }
+ return nextFree != index;
+ }
+
+ /** Calculates the maximum size allowed before rehashing. */
+ private int calcMaxSize(int capacity) {
+ // Clip the upper bound so that there will always be at least one
available slot.
+ int upperBound = capacity - 1;
+ return Math.min(upperBound, (int) (capacity * loadFactor));
+ }
+
+ /**
+ * Rehashes the map for the given capacity.
+ *
+ * @param newCapacity the new capacity for the map.
+ */
+ private void rehash(int newCapacity) {
+ int[] oldKeys = keys;
+ V[] oldVals = values;
+
+ keys = new int[newCapacity];
+ @SuppressWarnings({"unchecked", "SuspiciousArrayCast"})
+ V[] temp = (V[]) new Object[newCapacity];
+ values = temp;
+
+ maxSize = calcMaxSize(newCapacity);
+ mask = newCapacity - 1;
+
+ // Insert to the new arrays.
+ for (int i = 0; i < oldVals.length; ++i) {
+ V oldVal = oldVals[i];
+ if (oldVal != null) {
+ // Inlined put(), but much simpler: we don't need to worry
about
+ // duplicated keys, growing/rehashing, or failing to insert.
+ int oldKey = oldKeys[i];
+ int index = hashIndex(oldKey);
+
+ for (; ; ) {
+ if (values[index] == null) {
+ keys[index] = oldKey;
+ values[index] = oldVal;
+ break;
+ }
+
+ // Conflict, keep probing. Can wrap around, but never
reaches startIndex again.
+ index = probeNext(index);
+ }
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ if (isEmpty()) {
+ return "{}";
+ }
+ StringBuilder sb = new StringBuilder(4 * size);
+ sb.append('{');
+ boolean first = true;
+ for (int i = 0; i < values.length; ++i) {
+ V value = values[i];
+ if (value != null) {
+ if (!first) {
+ sb.append(", ");
+ }
+ sb.append(keyToString(keys[i]))
+ .append('=')
+ .append(value == this ? "(this Map)" :
toExternal(value));
+ first = false;
+ }
+ }
+ return sb.append('}').toString();
+ }
+
+ /**
+ * Helper method called by {@link #toString()} in order to convert a
single map key into a
+ * string. This is protected to allow subclasses to override the
appearance of a given key.
+ */
+ protected String keyToString(int key) {
+ return Integer.toString(key);
+ }
+
+ /** Set implementation for iterating over the entries of the map. */
+ private final class EntrySet extends AbstractSet<Entry<Integer, V>> {
+ @Override
+ public Iterator<Entry<Integer, V>> iterator() {
+ return new MapIterator();
+ }
+
+ @Override
+ public int size() {
+ return IntObjectHashMap.this.size();
+ }
+ }
+
+ /** Set implementation for iterating over the keys. */
+ private final class KeySet extends AbstractSet<Integer> {
+ @Override
+ public int size() {
+ return IntObjectHashMap.this.size();
+ }
+
+ @Override
+ public boolean contains(Object o) {
+ return IntObjectHashMap.this.containsKey(o);
+ }
+
+        /**
+         * Removes the given key from the backing map.
+         *
+         * @param o the key to remove; the backing map casts it to {@link Integer}.
+         * @return {@code true} if the key was present, even when it was mapped to {@code null}.
+         */
+        @Override
+        public boolean remove(Object o) {
+            // The map allows null values, so the value returned by remove() cannot tell
+            // "key absent" apart from "key mapped to null"; check for presence first.
+            if (!IntObjectHashMap.this.containsKey(o)) {
+                return false;
+            }
+            IntObjectHashMap.this.remove(o);
+            return true;
+        }
+
+ @Override
+ public boolean retainAll(Collection<?> retainedKeys) {
+ boolean changed = false;
+ for (Iterator<PrimitiveEntry<V>> iter = entries().iterator();
iter.hasNext(); ) {
+ PrimitiveEntry<V> entry = iter.next();
+ if (!retainedKeys.contains(entry.key())) {
+ changed = true;
+ iter.remove();
+ }
+ }
+ return changed;
+ }
+
+ @Override
+ public void clear() {
+ IntObjectHashMap.this.clear();
+ }
+
+ @Override
+ public Iterator<Integer> iterator() {
+ return new Iterator<Integer>() {
+ private final Iterator<Entry<Integer, V>> iter =
entrySet.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public Integer next() {
+ return iter.next().getKey();
+ }
+
+ @Override
+ public void remove() {
+ iter.remove();
+ }
+ };
+ }
+ }
+
+ /**
+ * Iterator over primitive entries. Entry key/values are overwritten by
each call to {@link
+ * #next()}.
+ */
+ private final class PrimitiveIterator
+ implements Iterator<PrimitiveEntry<V>>, PrimitiveEntry<V> {
+ private int prevIndex = -1;
+ private int nextIndex = -1;
+ private int entryIndex = -1;
+
+ private void scanNext() {
+ while (++nextIndex != values.length && values[nextIndex] == null)
{}
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (nextIndex == -1) {
+ scanNext();
+ }
+ return nextIndex != values.length;
+ }
+
+ @Override
+ public PrimitiveEntry<V> next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ prevIndex = nextIndex;
+ scanNext();
+
+ // Always return the same Entry object, just change its index each
time.
+ entryIndex = prevIndex;
+ return this;
+ }
+
+ @Override
+ public void remove() {
+ if (prevIndex == -1) {
+ throw new IllegalStateException("next must be called before
each remove.");
+ }
+ if (removeAt(prevIndex)) {
+ // removeAt may move elements "back" in the array if they have
been displaced
+ // because their spot in the
+ // array was occupied when they were inserted. If this occurs
then the nextIndex is
+ // now invalid and
+ // should instead point to the prevIndex which now holds an
element which was "moved
+ // back".
+ nextIndex = prevIndex;
+ }
+ prevIndex = -1;
+ }
+
+ // Entry implementation. Since this implementation uses a single
Entry, we coalesce that
+ // into the Iterator object (potentially making loop optimization much
easier).
+
+ @Override
+ public int key() {
+ return keys[entryIndex];
+ }
+
+ @Override
+ public V value() {
+ return toExternal(values[entryIndex]);
+ }
+
+ @Override
+ public void setValue(V value) {
+ values[entryIndex] = toInternal(value);
+ }
+ }
+
+ /** Iterator used by the {@link Map} interface. */
+ private final class MapIterator implements Iterator<Entry<Integer, V>> {
+ private final PrimitiveIterator iter = new PrimitiveIterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public Entry<Integer, V> next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ iter.next();
+
+ return new MapEntry(iter.entryIndex);
+ }
+
+ @Override
+ public void remove() {
+ iter.remove();
+ }
+ }
+
+ /** A single entry in the map. */
+ final class MapEntry implements Entry<Integer, V> {
+ private final int entryIndex;
+
+ MapEntry(int entryIndex) {
+ this.entryIndex = entryIndex;
+ }
+
+ @Override
+ public Integer getKey() {
+ verifyExists();
+ return keys[entryIndex];
+ }
+
+ @Override
+ public V getValue() {
+ verifyExists();
+ return toExternal(values[entryIndex]);
+ }
+
+ @Override
+ public V setValue(V value) {
+ verifyExists();
+ V prevValue = toExternal(values[entryIndex]);
+ values[entryIndex] = toInternal(value);
+ return prevValue;
+ }
+
+ private void verifyExists() {
+ if (values[entryIndex] == null) {
+ throw new IllegalStateException("The map entry has been
removed");
+ }
+ }
+ }
+
+ static int safeFindNextPositivePowerOfTwo(final int value) {
+ return value <= 0
+ ? 1
+ : value >= 0x40000000 ? 0x40000000 :
findNextPositivePowerOfTwo(value);
+ }
+
+ static int findNextPositivePowerOfTwo(final int value) {
+ assert value > Integer.MIN_VALUE && value < 0x40000000;
+ return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
+ }
+}
diff --git
a/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/src/main/java/org/apache/arrow/vector/util/IntObjectMap.java
b/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/src/main/java/org/apache/arrow/vector/util/IntObjectMap.java
new file mode 100644
index 0000000..1dfbe15
--- /dev/null
+++
b/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/src/main/java/org/apache/arrow/vector/util/IntObjectMap.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.arrow.vector.util;
+
+import java.util.Iterator;
+import java.util.Map;
+
+// TODO: This is a temporary workaround to resolve Arrow 15's dependency on eclipse-collections,
+// which carries a Category-B license (see https://github.com/apache/arrow/issues/40896).
+// Without these override classes, Fluss ArrowReaderWriterTest fails with a ClassNotFoundException.
+// This workaround should be removed once we upgrade to Arrow 16 or later.
+/**
+ * A vendored specialized copy of Netty's IntObjectMap for use within Arrow.
Avoids requiring Netty
+ * in the Arrow core just for this one class.
+ *
+ * @param <V> the value type stored in the map.
+ */
+interface IntObjectMap<V> extends Map<Integer, V> {
+
+ /**
+ * A primitive entry in the map, provided by the iterator from {@link
#entries()}.
+ *
+ * @param <V> the value type stored in the map.
+ */
+ interface PrimitiveEntry<V> {
+ /** Gets the key for this entry. */
+ int key();
+
+ /** Gets the value for this entry. */
+ V value();
+
+ /** Sets the value for this entry. */
+ void setValue(V value);
+ }
+
+ /**
+ * Gets the value in the map with the specified key.
+ *
+ * @param key the key whose associated value is to be returned.
+ * @return the value or {@code null} if the key was not found in the map.
+ */
+ V get(int key);
+
+ /**
+ * Puts the given entry into the map.
+ *
+ * @param key the key of the entry.
+ * @param value the value of the entry.
+ * @return the previous value for this key or {@code null} if there was no
previous mapping.
+ */
+ V put(int key, V value);
+
+ /**
+ * Removes the entry with the specified key.
+ *
+ * @param key the key for the entry to be removed from this map.
+ * @return the previous value for the key, or {@code null} if there was no
mapping.
+ */
+ V remove(int key);
+
+ /**
+ * Gets an iterable to traverse over the primitive entries contained in
this map. As an
+ * optimization, the {@link PrimitiveEntry}s returned by the {@link
Iterator} may change as the
+ * {@link Iterator} progresses. The caller should not rely on {@link
PrimitiveEntry} key/value
+ * stability.
+ */
+ Iterable<PrimitiveEntry<V>> entries();
+
+ /** Indicates whether or not this map contains a value for the specified
key. */
+ boolean containsKey(int key);
+}
diff --git
a/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/src/main/java/org/apache/arrow/vector/util/MapWithOrdinalImpl.java
b/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/src/main/java/org/apache/arrow/vector/util/MapWithOrdinalImpl.java
new file mode 100644
index 0000000..dc63689
--- /dev/null
+++
b/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/src/main/java/org/apache/arrow/vector/util/MapWithOrdinalImpl.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.arrow.vector.util;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+// TODO: This is a temporary workaround to resolve Arrow 15's dependency on eclipse-collections,
+// which carries a Category-B license (see https://github.com/apache/arrow/issues/40896).
+// Without these override classes, Fluss ArrowReaderWriterTest fails with a ClassNotFoundException.
+// This workaround should be removed once we upgrade to Arrow 16 or later.
+
+/**
+ * An implementation of map that supports constant time look-up by a generic key or an ordinal.
+ *
+ * <p>This class extends the functionality of a regular {@link Map} with ordinal lookup support. Upon
+ * insertion an unused ordinal is assigned to the inserted (key, value) tuple. Upon update the same
+ * ordinal id is re-used while value is replaced. Upon deletion of an existing item, its
+ * corresponding ordinal is recycled and could be used by another item.
+ *
+ * <p>For any instance with N items, this implementation guarantees that ordinals are in the range
+ * of [0, N). However, the ordinal assignment is dynamic and may change after an insertion or
+ * deletion. Consumers of this class are responsible for explicitly checking the ordinal
+ * corresponding to a key via {@link MapWithOrdinalImpl#getOrdinal(Object)} before attempting to
+ * execute a lookup with an ordinal.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class MapWithOrdinalImpl<K, V> implements MapWithOrdinal<K, V> {
+
+ private final Map<K, Map.Entry<Integer, V>> primary = new
LinkedHashMap<>();
+ private final IntObjectHashMap<V> secondary = new IntObjectHashMap<>();
+
+ private final Map<K, V> delegate =
+ new Map<K, V>() {
+ @Override
+ public boolean isEmpty() {
+ return size() == 0;
+ }
+
+ @Override
+ public int size() {
+ return primary.size();
+ }
+
+ @Override
+ public boolean containsKey(Object key) {
+ return primary.containsKey(key);
+ }
+
+ @Override
+ public boolean containsValue(Object value) {
+ return primary.containsValue(value);
+ }
+
+ @Override
+ public V get(Object key) {
+ Entry<Integer, V> pair = primary.get(key);
+ if (pair != null) {
+ return pair.getValue();
+ }
+ return null;
+ }
+
+ @Override
+ public V put(K key, V value) {
+ final Entry<Integer, V> oldPair = primary.get(key);
+ // if key exists try replacing otherwise, assign a new
ordinal identifier
+ final int ordinal = oldPair == null ? primary.size() :
oldPair.getKey();
+ primary.put(key, new
AbstractMap.SimpleImmutableEntry<>(ordinal, value));
+ secondary.put(ordinal, value);
+ return oldPair == null ? null : oldPair.getValue();
+ }
+
+                /**
+                 * Removes {@code key} and keeps the remaining ordinals dense.
+                 *
+                 * <p>If the removed entry did not hold the highest ordinal, the entry that does
+                 * is re-assigned the freed ordinal, so the ordinals still cover
+                 * {@code [0, size())}. Locating that entry scans {@code primary}, so removal is
+                 * linear in the number of entries.
+                 *
+                 * @param key the key whose mapping is to be removed.
+                 * @return the value previously mapped to {@code key}, or {@code null} if there
+                 *     was no mapping.
+                 */
+                @Override
+                public V remove(Object key) {
+                    final Entry<Integer, V> oldPair = primary.remove(key);
+                    if (oldPair == null) {
+                        return null;
+                    }
+
+                    final int freedOrdinal = oldPair.getKey();
+                    // put() hands out ordinals 0..N-1, so after the removal above the highest
+                    // ordinal still in use equals the new size of primary.
+                    final int lastOrdinal = primary.size();
+                    if (freedOrdinal != lastOrdinal) {
+                        // Move the entry holding the highest ordinal into the freed slot.
+                        for (Entry<K, Entry<Integer, V>> entry : primary.entrySet()) {
+                            if (entry.getValue().getKey() == lastOrdinal) {
+                                final V lastValue = entry.getValue().getValue();
+                                // Replacing the value of an existing entry is not a structural
+                                // modification, so it is safe while iterating.
+                                entry.setValue(
+                                        new AbstractMap.SimpleImmutableEntry<>(
+                                                freedOrdinal, lastValue));
+                                secondary.put(freedOrdinal, lastValue);
+                                break;
+                            }
+                        }
+                    }
+                    // The highest ordinal is now unused; drop it so secondary matches primary.
+                    secondary.remove(lastOrdinal);
+                    return oldPair.getValue();
+                }
+
+ @Override
+ public void putAll(Map<? extends K, ? extends V> m) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void clear() {
+ primary.clear();
+ secondary.clear();
+ }
+
+ @Override
+ public Set<K> keySet() {
+ return primary.keySet();
+ }
+
+ @Override
+ public Collection<V> values() {
+ return secondary.values();
+ }
+
+ @Override
+ public Set<Entry<K, V>> entrySet() {
+ return primary.entrySet().stream()
+ .map(
+ entry ->
+ new
AbstractMap.SimpleImmutableEntry<>(
+ entry.getKey(),
entry.getValue().getValue()))
+ .collect(Collectors.toSet());
+ }
+ };
+
+ /**
+ * Returns the value corresponding to the given ordinal.
+ *
+ * @param id ordinal value for lookup
+ * @return an instance of V
+ */
+ @Override
+ public V getByOrdinal(int id) {
+ return secondary.get(id);
+ }
+
+ /**
+ * Returns the ordinal corresponding to the given key.
+ *
+ * @param key key for ordinal lookup
+ * @return ordinal value corresponding to key if it exists or -1
+ */
+ @Override
+ public int getOrdinal(K key) {
+ Map.Entry<Integer, V> pair = primary.get(key);
+ if (pair != null) {
+ return pair.getKey();
+ }
+ return -1;
+ }
+
+ @Override
+ public int size() {
+ return delegate.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return delegate.isEmpty();
+ }
+
+ @Override
+ public Collection<V> getAll(K key) {
+ if (delegate.containsKey(key)) {
+ List<V> list = new ArrayList<>(1);
+ list.add(get(key));
+ return list;
+ }
+ return null;
+ }
+
+ @Override
+ public V get(K key) {
+ return delegate.get(key);
+ }
+
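+ /**
+ * Inserts the tuple (key, value) into the map extending the semantics of {@link Map#put} with
+ * automatic ordinal assignment. A new ordinal is assigned if key does not exist. Otherwise the
+ * same ordinal is re-used but the value is replaced.
+ *
+ * <p>NOTE(review): the {@code overwrite} argument is not read by this method; an existing
+ * mapping is always replaced. Confirm this matches the {@code MapWithOrdinal} contract.
+ *
+ * @see Map#put
+ */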
+ @Override
+ public boolean put(K key, V value, boolean overwrite) {
+ return delegate.put(key, value) != null;
+ }
+
+ @Override
+ public Collection<V> values() {
+ return delegate.values();
+ }
+
+ @Override
+ public boolean remove(K key, V value) {
+ return false;
+ }
+
+ @Override
+ public boolean containsKey(Object key) {
+ return delegate.containsKey(key);
+ }
+
+ /**
+ * Removes the element corresponding to the key if exists extending the
semantics of {@link
+ * Map#remove} with ordinal re-cycling. The ordinal corresponding to the
given key may
+ * be re-assigned to another tuple. It is important that consumer checks
the ordinal value via
+ * {@link MapWithOrdinalImpl#getOrdinal(Object)} before attempting to
look-up by ordinal.
+ *
+ * @see Map#remove
+ */
+ @Override
+ public boolean removeAll(K key) {
+ return delegate.remove(key) != null;
+ }
+
+ @Override
+ public void clear() {
+ delegate.clear();
+ }
+
+ @Override
+ public Set<K> keys() {
+ return delegate.keySet();
+ }
+}
diff --git
a/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/src/main/java/org/apache/arrow/vector/util/MultiMapWithOrdinal.java
b/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/src/main/java/org/apache/arrow/vector/util/MultiMapWithOrdinal.java
new file mode 100644
index 0000000..ff77c80
--- /dev/null
+++
b/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/src/main/java/org/apache/arrow/vector/util/MultiMapWithOrdinal.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.arrow.vector.util;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+// TODO: This is a temporary workaround to resolve Arrow 15's dependency on
+// eclipse-collections, which carries a Category-B license (see
+// https://github.com/apache/arrow/issues/40896).
+// Without these override classes, Fluss ArrowReaderWriterTest fails with a
+// ClassNotFoundException.
+// This workaround should be removed once we upgrade to Arrow 16 or later.
+
+/**
+ * An implementation of a multimap that supports look-up by a generic key or by an ordinal.
+ *
+ * <p>This class extends the functionality of a regular {@link Map} with ordinal lookup support.
+ * Upon insertion the next unused ordinal (the current size) is assigned to the inserted
+ * (key, value) tuple. Upon deletion of an existing item, its ordinal is recycled: the item
+ * holding the highest ordinal is moved into the freed slot.
+ *
+ * <p>For any instance with N items, this implementation guarantees that ordinals are in the
+ * range of [0, N). However, the ordinal assignment is dynamic and may change after an insertion
+ * or deletion. Consumers of this class are responsible for explicitly checking the ordinal
+ * corresponding to a key via {@link MultiMapWithOrdinal#getOrdinal(Object)} before attempting to
+ * execute a lookup with an ordinal.
+ *
+ * <p>Only {@link #remove(Object, Object)} and {@link #removeAll(Object)} are synchronized; the
+ * remaining methods are not.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class MultiMapWithOrdinal<K, V> implements MapWithOrdinal<K, V> {
+
+    /** Ordinals owned by each key. A key is dropped as soon as its last ordinal is removed. */
+    private final Map<K, Set<Integer>> keyToOrdinal = new LinkedHashMap<>();
+
+    /** Value stored under each ordinal. */
+    private final IntObjectHashMap<V> ordinalToValue = new IntObjectHashMap<>();
+
+    /**
+     * Returns the value corresponding to the given ordinal.
+     *
+     * @param id ordinal value for lookup
+     * @return an instance of V
+     */
+    @Override
+    public V getByOrdinal(int id) {
+        return ordinalToValue.get(id);
+    }
+
+    /**
+     * Returns an ordinal corresponding to the given key.
+     *
+     * @param key key for ordinal lookup
+     * @return the first ordinal in the key's ordinal set if the key exists, or -1
+     */
+    @Override
+    public int getOrdinal(K key) {
+        Set<Integer> ordinals = keyToOrdinal.get(key);
+        if (ordinals == null || ordinals.isEmpty()) {
+            return -1;
+        }
+        return ordinals.iterator().next();
+    }
+
+    @Override
+    public int size() {
+        return ordinalToValue.size();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return ordinalToValue.isEmpty();
+    }
+
+    /**
+     * Returns a single value for the key: the one stored under the first ordinal in the key's
+     * ordinal set (the same ordinal {@link #getOrdinal(Object)} reports).
+     *
+     * @param key key to look up
+     * @return the value, or {@code null} if the key is absent
+     */
+    @Override
+    public V get(K key) {
+        Set<Integer> ordinals = keyToOrdinal.get(key);
+        if (ordinals == null || ordinals.isEmpty()) {
+            return null;
+        }
+        // Read the first ordinal directly instead of materialising every value in a list.
+        int firstOrdinal = ordinals.iterator().next();
+        return ordinalToValue.get(firstOrdinal);
+    }
+
+    /**
+     * Returns all values stored for the key.
+     *
+     * @param key key to look up
+     * @return a new list with one entry per ordinal of the key, or {@code null} if the key is
+     *     absent
+     */
+    @Override
+    public Collection<V> getAll(K key) {
+        Set<Integer> ordinals = keyToOrdinal.get(key);
+        if (ordinals == null) {
+            return null;
+        }
+        return ordinals.stream().map(ordinalToValue::get).collect(Collectors.toList());
+    }
+
+    /**
+     * Inserts the tuple (key, value) into the multimap with automatic ordinal assignment.
+     *
+     * <p>The new tuple always receives the ordinal equal to the size of the map at insertion
+     * time.
+     *
+     * <p>If overwrite is true, all existing values of the key are removed first; otherwise the
+     * value is appended to the values already stored for the key.
+     *
+     * @return {@code true} if the key's ordinal set changed
+     */
+    @Override
+    public boolean put(K key, V value, boolean overwrite) {
+        if (overwrite) {
+            removeAll(key);
+        }
+        int nextOrdinal = ordinalToValue.size();
+        ordinalToValue.put(nextOrdinal, value);
+        return keyToOrdinal.computeIfAbsent(key, k -> new HashSet<>()).add(nextOrdinal);
+    }
+
+    @Override
+    public Collection<V> values() {
+        return ordinalToValue.values();
+    }
+
+    @Override
+    public boolean containsKey(K key) {
+        return keyToOrdinal.containsKey(key);
+    }
+
+    /**
+     * Removes the element corresponding to the key/value if it exists, with ordinal re-cycling.
+     *
+     * <p>The ordinal corresponding to the given key may be re-assigned to another tuple. It is
+     * important that the consumer checks the ordinal value via {@link
+     * MultiMapWithOrdinal#getOrdinal(Object)} before attempting to look up by ordinal.
+     *
+     * <p>A {@code null} value matches only a stored {@code null}.
+     *
+     * @return {@code true} if the multimap was changed
+     */
+    @Override
+    public synchronized boolean remove(K key, V value) {
+        Set<Integer> ordinals = keyToOrdinal.get(key);
+        if (ordinals == null) {
+            return false;
+        }
+        // Locate the ordinal once; it is both the "does it exist" check and the removal target.
+        Optional<Integer> match =
+                ordinals.stream().filter(ordinal -> holdsValue(ordinal, value)).findFirst();
+        if (!match.isPresent()) {
+            return false;
+        }
+        int removalOrdinal = match.get();
+        removeKv(ordinals, key, removalOrdinal);
+        // After the removal the size equals the highest ordinal that was in use.
+        int lastOrdinal = ordinalToValue.size();
+        if (lastOrdinal != removalOrdinal) { // we didn't remove the last ordinal
+            swapOrdinal(lastOrdinal, removalOrdinal);
+        }
+        return true;
+    }
+
+    /** Null-safe check of whether the value stored under {@code ordinal} equals {@code value}. */
+    private boolean holdsValue(int ordinal, V value) {
+        V stored = ordinalToValue.get(ordinal);
+        return value == null ? stored == null : value.equals(stored);
+    }
+
+    /** Moves the tuple stored under {@code lastOrdinal} into the freed {@code removalOrdinal}. */
+    private void swapOrdinal(int lastOrdinal, int removalOrdinal) {
+        // Resolve the owning key's ordinal set before mutating anything, so a corrupt map is
+        // reported without being modified further.
+        Set<Integer> ownerOrdinals =
+                keyToOrdinal.values().stream()
+                        .filter(ordinals -> ordinals.contains(lastOrdinal))
+                        .findFirst()
+                        .orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "MultimapWithOrdinal in bad state"));
+        V movedValue = ordinalToValue.remove(lastOrdinal);
+        ordinalToValue.put(removalOrdinal, movedValue);
+        ownerOrdinals.remove(Integer.valueOf(lastOrdinal));
+        ownerOrdinals.add(removalOrdinal);
+    }
+
+    /** Drops {@code removalOrdinal} from both maps, removing the key once it owns no ordinals. */
+    private void removeKv(Set<Integer> ordinals, K key, int removalOrdinal) {
+        ordinalToValue.remove(removalOrdinal);
+        ordinals.remove(Integer.valueOf(removalOrdinal));
+        if (ordinals.isEmpty()) {
+            keyToOrdinal.remove(key);
+        }
+    }
+
+    /**
+     * Removes all entries of the key.
+     *
+     * @return {@code true} if the key was present
+     */
+    @Override
+    public synchronized boolean removeAll(K key) {
+        // getAll returns a fresh list, so removing while iterating over it is safe.
+        Collection<V> values = this.getAll(key);
+        if (values == null) {
+            return false;
+        }
+        for (V v : values) {
+            this.remove(key, v);
+        }
+        return true;
+    }
+
+    @Override
+    public void clear() {
+        ordinalToValue.clear();
+        keyToOrdinal.clear();
+    }
+
+    /** Returns the key set of the underlying key map, in key insertion order. */
+    @Override
+    public Set<K> keys() {
+        return keyToOrdinal.keySet();
+    }
+}
diff --git
a/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/src/main/resources/META-INF/NOTICE
b/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/src/main/resources/META-INF/NOTICE
index 176cd9c..c94901f 100644
---
a/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/src/main/resources/META-INF/NOTICE
+++
b/fluss-shaded-arrow-parent/fluss-shaded-arrow-15/src/main/resources/META-INF/NOTICE
@@ -6,8 +6,8 @@ The Apache Software Foundation (http://www.apache.org/).
This project bundles the following dependencies under the Apache Software
License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
-- org.apache.arrow:arrow-vector:15.0.0
-- org.apache.arrow:arrow-format:15.0.0
-- org.apache.arrow:arrow-memory-core:15.0.0
-- org.apache.arrow:arrow-memory-netty:15.0.0
+- org.apache.arrow:arrow-vector:15.0.2
+- org.apache.arrow:arrow-format:15.0.2
+- org.apache.arrow:arrow-memory-core:15.0.2
+- org.apache.arrow:arrow-memory-netty:15.0.2
- com.google.flatbuffers:flatbuffers-java:23.5.26
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index b76a9a0..38472ef 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,6 +62,7 @@
</pluginRepositories>
<properties>
+ <target.java.version>1.8</target.java.version>
<shading.prefix>org.apache.fluss.shaded</shading.prefix>
<netty.version>4.1.130.Final</netty.version>
<jackson.version>2.15.3</jackson.version>
@@ -173,6 +174,18 @@
<build>
<pluginManagement>
<plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.8.0</version>
+ <configuration>
+ <source>${target.java.version}</source>
+ <target>${target.java.version}</target>
+ <!-- The semantics of this option are reversed, see
MCOMPILER-209. -->
+
<useIncrementalCompilation>false</useIncrementalCompilation>
+ </configuration>
+ </plugin>
+
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>