Repository: hadoop Updated Branches: refs/heads/HDFS-7836 2214dab60 -> 087611208
HDFS-7844. Create an off-heap hash table implementation (cmccabe) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/08761120 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/08761120 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/08761120 Branch: refs/heads/HDFS-7836 Commit: 0876112086d3995ce67f8f2dd26ef1fab563713a Parents: 2214dab Author: Colin Patrick Mccabe <[email protected]> Authored: Mon Mar 9 15:43:59 2015 -0700 Committer: Colin Patrick Mccabe <[email protected]> Committed: Mon Mar 9 15:43:59 2015 -0700 ---------------------------------------------------------------------- .../hadoop/fs/CommonConfigurationKeys.java | 3 + .../util/offheap/ByteArrayMemoryManager.java | 272 ++++++++ .../hadoop/util/offheap/MemoryManager.java | 107 ++++ .../util/offheap/NativeMemoryManager.java | 143 +++++ .../hadoop/util/offheap/ProbingHashTable.java | 636 +++++++++++++++++++ .../hadoop/util/offheap/TestMemoryManager.java | 202 ++++++ .../util/offheap/TestProbingHashTable.java | 392 ++++++++++++ 7 files changed, 1755 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/08761120/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index 442dc7d..ece58e3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -286,4 +286,7 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { public static final String NFS_EXPORTS_ALLOWED_HOSTS_SEPARATOR = ";"; public static final String NFS_EXPORTS_ALLOWED_HOSTS_KEY = "nfs.exports.allowed.hosts"; public static final String NFS_EXPORTS_ALLOWED_HOSTS_KEY_DEFAULT = "* rw"; + + public static final String HADOOP_MEMORY_MANAGER_KEY = "hadoop.memory.manager"; + public static final String HADOOP_MEMORY_MANAGER_DEFAULT = "org.apache.hadoop.util.offheap.NativeMemoryManager"; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/08761120/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/offheap/ByteArrayMemoryManager.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/offheap/ByteArrayMemoryManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/offheap/ByteArrayMemoryManager.java new file mode 100644 index 0000000..57c7c76 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/offheap/ByteArrayMemoryManager.java @@ -0,0 +1,272 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util.offheap; + +import java.io.IOException; +import java.lang.Long; +import java.lang.RuntimeException; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.TreeMap; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ByteArrayMemoryManager is a memory manager which keeps all memory on the Java + * heap. It is useful for testing, since it peforms validation of all memory + * accesses and writes. It also can be used if sun.misc.Unsafe is not + * available, although its performance will be less than that of the off-heap + * code. + */ +@Private +@Unstable +public class ByteArrayMemoryManager implements MemoryManager { + static final Logger LOG = + LoggerFactory.getLogger(ByteArrayMemoryManager.class); + + private final static long MAX_ADDRESS = 0x3fffffffffffffffL; + + private final TreeMap<Long, byte[]> buffers = new TreeMap<Long, byte[]>(); + + private long curAddress = 1000; + + private final String name; + + public ByteArrayMemoryManager(String name) { + this.name = name; + LOG.debug("Created {}.", this); + } + + @Override + public synchronized void close() throws IOException { + Iterator<Entry<Long, byte[]>> iter = buffers.entrySet().iterator(); + if (iter.hasNext()) { + StringBuilder bld = new StringBuilder(); + Entry<Long, byte[]> entry = iter.next(); + bld.append(entryToString(entry)); + int numPrinted = 1; + while (iter.hasNext()) { + if (numPrinted >= 10) { + bld.append("..."); + break; + } + bld.append(", ").append(entryToString(entry)); + numPrinted++; + } + throw new RuntimeException("There are still unfreed buffers. " + + bld.toString()); + } + LOG.debug("Closed {}.", this); + } + + private static String entryToString(Entry<Long, byte[]> entry) { + StringBuilder bld = new StringBuilder(); + bld.append("Entry(base=0x").append(Long.toHexString(entry.getKey())). + append(", len=0x").append(Long.toHexString(entry.getValue().length)). + append(")"); + return bld.toString(); + } + + @Override + public synchronized long allocate(long size) { + if (curAddress + size > MAX_ADDRESS) { + throw new RuntimeException("Cannot allocate any more memory."); + } + if (size > 0x7fffffff) { + throw new RuntimeException("Attempted to allocate " + size + + " bytes, but we cannot allocate a Java byte array with " + + "more than 2^^31 entries."); + } + long addr = curAddress; + curAddress += size; + byte val[] = new byte[(int)size]; + buffers.put(Long.valueOf(addr), val); + LOG.trace("Allocated Entry(base=0x{}, len=0x{})", + Long.toHexString(addr), Long.toHexString(val.length)); + return addr; + } + + @Override + public synchronized long allocateZeroed(long size) { + // Java byte arrays are always zeroed on construction. + return allocate(size); + } + + @Override + public synchronized void free(long addr) { + byte val[] = buffers.remove(Long.valueOf(addr)); + if (val == null) { + LOG.error("Attempted to free unallocated address 0x{}", + Long.toHexString(addr)); + } else { + LOG.trace("Freed Entry(base=0x{}, len=0x{})", + Long.toHexString(addr), Long.toHexString(val.length)); + } + } + + private synchronized Entry<Long, byte[]> getEntry(long addr, String op) { + Entry<Long, byte[]> entry = buffers.floorEntry(Long.valueOf(addr)); + if (entry == null) { + throw new RuntimeException(op + " unallocated address 0x" + + Long.toHexString(addr)); + } + return entry; + } + + @Override + public synchronized byte getByte(long addr) { + Entry<Long, byte[]> entry = getEntry(addr, "Accessed"); + long off = addr - entry.getKey(); + byte arr[] = entry.getValue(); + if (off + 1 > arr.length) { + throw new RuntimeException("Attempted to read unallocated memory " + + "at 0x" + Long.toHexString(addr) + ". Closest lower allocated area " + + "is " + entryToString(entry)); + } + int i = (int)off; + return arr[i]; + } + + @Override + public void putByte(long addr, byte val) { + Entry<Long, byte[]> entry = getEntry(addr, "Wrote to"); + long off = addr - entry.getKey(); + byte arr[] = entry.getValue(); + if (off + 1 > arr.length) { + throw new RuntimeException("Attempted to write to unallocated memory " + + "at 0x" + Long.toHexString(addr) + ". Closest lower allocated area " + + "is " + entryToString(entry)); + } + int i = (int)off; + arr[i] = val; + } + + @Override + public synchronized short getShort(long addr) { + Entry<Long, byte[]> entry = getEntry(addr, "Accessed"); + long off = addr - entry.getKey(); + byte arr[] = entry.getValue(); + if (off + 2 > arr.length) { + throw new RuntimeException("Attempted to read unallocated memory " + + "at 0x" + Long.toHexString(addr) + ". Closest lower allocated " + + "area is " + entryToString(entry)); + } + int i = (int)off; + return (short)((arr[i + 0] & 0xff) << 8 | + (arr[i + 1] & 0xff)); + } + + @Override + public void putShort(long addr, short val) { + Entry<Long, byte[]> entry = getEntry(addr, "Wrote to"); + long off = addr - entry.getKey(); + byte arr[] = entry.getValue(); + if (off + 2 > arr.length) { + throw new RuntimeException("Attempted to write to unallocated memory " + + "at 0x" + Long.toHexString(addr) + ". Closest lower allocated " + + "area is " + entryToString(entry)); + } + int i = (int)off; + arr[i + 0] = (byte)((val >> 8) & 0xff); + arr[i + 1] = (byte)(val & 0xff); + } + + @Override + public int getInt(long addr) { + Entry<Long, byte[]> entry = getEntry(addr, "Accessed"); + long off = addr - entry.getKey(); + byte arr[] = entry.getValue(); + if (off + 4 > arr.length) { + throw new RuntimeException("Attempted to read unallocated memory " + + "at 0x" + Long.toHexString(addr) + ". Closest lower allocated " + + "area is " + entryToString(entry)); + } + int i = (int)off; + return arr[i + 0] << 24 | + (arr[i + 1] & 0xff) << 16 | + (arr[i + 2] & 0xff) << 8 | + (arr[i + 3] & 0xff); + } + + @Override + public void putInt(long addr, int val) { + Entry<Long, byte[]> entry = getEntry(addr, "Wrote to"); + long off = addr - entry.getKey(); + byte arr[] = entry.getValue(); + if (off + 4 > arr.length) { + throw new RuntimeException("Attempted to write to unallocated memory " + + "at 0x" + Long.toHexString(addr) + ". Closest lower allocated " + + "area is " + entryToString(entry)); + } + int i = (int)off; + arr[i + 0] = (byte)((val >> 24) & 0xff); + arr[i + 1] = (byte)((val >> 16) & 0xff); + arr[i + 2] = (byte)((val >> 8) & 0xff); + arr[i + 3] = (byte)(val & 0xff); + } + + @Override + public long getLong(long addr) { + Entry<Long, byte[]> entry = getEntry(addr, "Accessed"); + long off = addr - entry.getKey(); + byte arr[] = entry.getValue(); + if (off + 8 > arr.length) { + throw new RuntimeException("Attempted to read unallocated memory " + + "at 0x" + Long.toHexString(addr) + ". Closest lower allocated " + + "area is " + entryToString(entry)); + } + int i = (int)off; + return (arr[i + 0] & 0xffL) << 56 | + (arr[i + 1] & 0xffL) << 48 | + (arr[i + 2] & 0xffL) << 40 | + (arr[i + 3] & 0xffL) << 32 | + (arr[i + 4] & 0xffL) << 24 | + (arr[i + 5] & 0xffL) << 16 | + (arr[i + 6] & 0xffL) << 8 | + (arr[i + 7] & 0xffL); + } + + @Override + public void putLong(long addr, long val) { + Entry<Long, byte[]> entry = getEntry(addr, "Wrote to"); + long off = addr - entry.getKey(); + byte arr[] = entry.getValue(); + if (off + 8 > arr.length) { + throw new RuntimeException("Attempted to write to unallocated memory " + + "at 0x" + Long.toHexString(addr) + ". Closest lower allocated " + + "area is " + entryToString(entry)); + } + int i = (int)off; + arr[i + 0] = (byte)((val >> 56) & 0xff); + arr[i + 1] = (byte)((val >> 48) & 0xff); + arr[i + 2] = (byte)((val >> 40) & 0xff); + arr[i + 3] = (byte)((val >> 32) & 0xff); + arr[i + 4] = (byte)((val >> 24) & 0xff); + arr[i + 5] = (byte)((val >> 16) & 0xff); + arr[i + 6] = (byte)((val >> 8) & 0xff); + arr[i + 7] = (byte)(val & 0xff); + } + + @Override + public String toString() { + return "ByteArrayMemoryManager(" + name + ")"; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/08761120/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/offheap/MemoryManager.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/offheap/MemoryManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/offheap/MemoryManager.java new file mode 100644 index 0000000..24c67fc --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/offheap/MemoryManager.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util.offheap; + +import java.io.Closeable; +import java.lang.Class; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Allocates memory which may be off-heap. + * + * MemoryManager objects are thread-safe. They can be used by multiple threads + * at once without additional synchronization. + */ +@Private +@Unstable +public interface MemoryManager extends Closeable { + /** + * Allocate a memory region. Will never return 0. + */ + long allocate(long size); + + /** + * Allocate a zeroed memory region. Will never return 0. + */ + long allocateZeroed(long size); + + /** + * Free memory. + */ + void free(long addr); + + byte getByte(long addr); + + void putByte(long addr, byte val); + + short getShort(long addr); + + void putShort(long addr, short val); + + int getInt(long addr); + + void putInt(long addr, int val); + + long getLong(long addr); + + void putLong(long addr, long val); + + String toString(); + + public static class Factory { + private static final Logger LOG = LoggerFactory.getLogger(Factory.class); + + /** + * Create a MemoryManager from a Configuration. + * + * @param conf The Configuration + * + * @return The MemoryManager. + */ + public static MemoryManager create(String name, Configuration conf) { + String memoryManagerKey = conf.get( + CommonConfigurationKeys.HADOOP_MEMORY_MANAGER_KEY, + CommonConfigurationKeys.HADOOP_MEMORY_MANAGER_DEFAULT); + if (memoryManagerKey == null) { + memoryManagerKey = NativeMemoryManager.class.getCanonicalName(); + } + Class<? extends MemoryManager> clazz = + (Class<? extends MemoryManager>)conf. + getClassByNameOrNull(memoryManagerKey); + if (clazz == null) { + LOG.error("Unable to locate {}: falling back on {}.", + memoryManagerKey, ByteArrayMemoryManager.class.getCanonicalName()); + } else if (clazz != ByteArrayMemoryManager.class) { + try { + return clazz.getConstructor(String.class).newInstance(name); + } catch (Throwable t) { + LOG.error("Unable to create {}. Falling back on {}", memoryManagerKey, + ByteArrayMemoryManager.class.getCanonicalName(), t); + } + } + return new ByteArrayMemoryManager(name); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/08761120/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/offheap/NativeMemoryManager.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/offheap/NativeMemoryManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/offheap/NativeMemoryManager.java new file mode 100644 index 0000000..17a77e6 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/offheap/NativeMemoryManager.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util.offheap; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.Throwable; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import sun.misc.Unsafe; + +/** + * NativeMemoryManager is a memory manager which uses sun.misc.Unsafe to + * allocate memory off-heap. This memory will be allocated using the current + * platform's equivalent of malloc(). + */ +@Private +@Unstable +public class NativeMemoryManager implements MemoryManager { + static final Logger LOG = + LoggerFactory.getLogger(NativeMemoryManager.class); + + private final static Unsafe unsafe; + + private final static String loadingFailureReason; + + static { + Unsafe myUnsafe = null; + String myLoadingFailureReason = null; + try { + Field f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + myUnsafe = (Unsafe)f.get(null); + } catch (Throwable e) { + myLoadingFailureReason = e.getMessage(); + } finally { + unsafe = myUnsafe; + loadingFailureReason = myLoadingFailureReason; + } + } + + private final String name; + + public static boolean isAvailable() { + return loadingFailureReason == null; + } + + public NativeMemoryManager(String name) { + if (loadingFailureReason != null) { + LOG.error("Failed to load sun.misc.Unsafe: " + loadingFailureReason); + throw new RuntimeException("Failed to load sun.misc.Unsafe: " + + loadingFailureReason); + } + this.name = name; + LOG.debug("Created {}.", this); + } + + @Override + public void close() throws IOException { + // Nothing to do + LOG.debug("Closed {}.", this); + } + + @Override + public long allocate(long size) { + return unsafe.allocateMemory(size); + } + + @Override + public long allocateZeroed(long size) { + long addr = unsafe.allocateMemory(size); + unsafe.setMemory(addr, size, (byte)0); + return addr; + } + + @Override + public void free(long addr) { + unsafe.freeMemory(addr); + } + + @Override + public byte getByte(long addr) { + return unsafe.getByte(null, addr); + } + + @Override + public void putByte(long addr, byte val) { + unsafe.putByte(null, addr, val); + } + + @Override + public short getShort(long addr) { + return unsafe.getShort(null, addr); + } + + @Override + public void putShort(long addr, short val) { + unsafe.putShort(addr, val); + } + + @Override + public int getInt(long addr) { + return unsafe.getInt(null, addr); + } + + @Override + public void putInt(long addr, int val) { + unsafe.putInt(null, addr, val); + } + + @Override + public long getLong(long addr) { + return unsafe.getLong(null, addr); + } + + @Override + public void putLong(long addr, long val) { + unsafe.putLong(null, addr, val); + } + + @Override + public String toString() { + return "NativeMemoryManager(" + name + ")"; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/08761120/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/offheap/ProbingHashTable.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/offheap/ProbingHashTable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/offheap/ProbingHashTable.java new file mode 100644 index 0000000..d0ac2f8 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/offheap/ProbingHashTable.java @@ -0,0 +1,636 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util.offheap; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Iterator; +import java.util.NoSuchElementException; + +import com.google.common.base.Preconditions; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A hash table which can be off-heaped and uses probing.<p/> + * + * Not thread-safe. Requires external synchronization.<p/> + * + * Each entry must be stored in a slot which takes a fixed number of bytes. We + * assume that slots which are zeroed are empty.<p/> + * + * This hash table does not implement the Java collection interface, because we + * want to avoid some of the limitations of that interface. For example, we + * want to be able to have more than 2^^32 entries and to be able to use a hash + * function which is wider than 32 bits.<p/> + * + * This hash table uses linear probing rather than separate chaining to handle + * hash collisions. When we hit a collision when inserting, we put the new + * element into the next open slot.<p/> + * + * When the hash table gets more than a certain percent full, we double the size + * of the table. This requires moving all existing entries.<p/> + */ +@Public +@Unstable +public class ProbingHashTable<K extends ProbingHashTable.Key, + E extends ProbingHashTable.Entry<K>> implements Closeable { + static final Logger LOG = + LoggerFactory.getLogger(ProbingHashTable.class); + + /** + * Adapts a given entry class to work with this hash table.<p/> + * + * Specifically, the Adaptor handles storing elements into slots, + * retrieving them, clearing slots, and getting the hash code for entries.<p/> + */ + public interface Adaptor<E> { + /** + * Get the slot size to use for this hash table. + * + * @return How many bytes each slot is in the hash table. + */ + int getSlotSize(); + + /** + * Load an entry from memory. + * + * @param addr The address to use to create the entry. + * + * @return null if the slot was empty; the entry, otherwise. + */ + E load(long addr); + + /** + * Store an entry to memory. + * + * @param e The element to store to memory. + * @param addr The address to store the element to. + */ + void store(E e, long addr); + + /** + * Clear a slot. + * + * @param addr The address to clear. + */ + void clear(long addr); + } + + public interface Key { + /** + * Get a 64-bit hash code for this key. + */ + long longHash(); + + /** + * Determine if this key equals another key. + */ + boolean equals(Object other); + + /** + * Get a human-readable representation of this key. + */ + String toString(); + } + + /** + * An entry in the ProbingHashTable. + */ + public interface Entry<K extends Key> { + /** + * Get the key for this entry. + */ + K getKey(); + } + + /** + * The minimum size to allow. + */ + private static long MIN_SIZE = 4; + + /** + * The name of this hash table. + */ + private final String name; + + /** + * The memory manager for this hash table. + */ + private final MemoryManager mman; + + /** + * The size of each slot in bytes. + */ + private final int slotSize; + + /** + * The adaptor to use. + */ + private final Adaptor<E> adaptor; + + /** + * The base address of the hash table. + */ + private long base; + + /** + * The current number of slots in the hash table. + */ + private long numSlots; + + /** + * The current number of entries in the hash table. + */ + private long numEntries; + + /** + * The maximum load factor for this hash table. + */ + private float maxLoadFactor; + + /** + * The number of entries we should double at. + */ + private long expansionThreshold; + + public static long roundUpToPowerOf2(long i) { + long r = 1; + while (r < i) { + r = r << 1; + } + return r; + } + + /** + * Create a new ProbingHashTable. + * + * @param name The name of the ProbingHashTable. + * @param mman The memory manager to use. + * @param adaptor The entry factory to use. + * @param initialSize The initial size of the hash table (in number of + * slots, not elements.) Will be rounded up to a + * power of 2. + * @param maxLoadFactor The maximum load factor to allow before doubling + * the hash table size. + */ + public ProbingHashTable(String name, MemoryManager mman, Adaptor<E> adaptor, + long initialSize, float maxLoadFactor) { + this.name = name; + this.mman = mman; + this.slotSize = adaptor.getSlotSize(); + this.adaptor = adaptor; + if (initialSize < MIN_SIZE) { + initialSize = MIN_SIZE; + } + this.numSlots = roundUpToPowerOf2((long)(initialSize / maxLoadFactor)); + long allocLen = numSlots * slotSize; + this.base = mman.allocateZeroed(allocLen); + this.numEntries = 0; + this.maxLoadFactor = maxLoadFactor; + this.expansionThreshold = (long)(numSlots * maxLoadFactor); + LOG.debug("Created ProbingHashTable(name={}, mman={}, slotSize={}, " + + "adaptor={}, numSlots={}, base=0x{}, allocLen=0x{}," + + "maxLoadFactor={}, expansionThreshold={})", + name, mman.toString(), slotSize, + adaptor.getClass().getCanonicalName(), numSlots, + Long.toHexString(base), Long.toHexString(allocLen), maxLoadFactor, + expansionThreshold); + Preconditions.checkArgument(maxLoadFactor > 0.0f); + Preconditions.checkArgument(maxLoadFactor < 1.0f); + } + + /** + * Frees the memory associated with this hash table and does error checking. + */ + public void close() throws IOException { + ProbingHashTableIterator iter = iterator(); + if (iter.hasNext()) { + StringBuilder bld = new StringBuilder(); + K k = iter.next(); + bld.append(k.toString()); + int numPrinted = 1; + while (iter.hasNext()) { + if (numPrinted >= 10) { + bld.append("..."); + break; + } + bld.append(", ").append(iter.next().toString()); + numPrinted++; + } + throw new RuntimeException("Attempted to close the hash table " + + " before all entries were removed. There are still " + numEntries + + " entries remaining, including " + bld.toString()); + } + free(); + } + + /** + * Frees the memory associated with this hash table. + */ + void free() throws IOException { + if (this.base != 0) { + LOG.debug("Freeing {}.", this); + mman.free(this.base); + this.base = 0; + } + } + + protected void finalize() throws Throwable { + try { + if (this.base != 0) { + LOG.error("Hash table {} was never closed.", this); + free(); + } + } finally { + super.finalize(); + } + } + + private long getSlot(K key, long nSlots) { + long hash = key.longHash(); + if (hash < 0) { + hash = -hash; + } + return hash % nSlots; + } + + private E getInternal(K key, boolean remove) { + long originalSlot = getSlot(key, numSlots); + long slot = originalSlot; + long addr; + E target = null; + K targetKey = null; + while (true) { + addr = this.base + (slot * slotSize); + target = adaptor.load(addr); + if (target == null) { + // By the compactness invariant, we're done. See below for more + // discussion. + LOG.trace("{}: getInternal(key={}, remove={}) found nothing.", + this, key, remove); + return null; + } + targetKey = target.getKey(); + if (targetKey.equals(key)) { + break; + } + slot++; + if (slot == numSlots) { + slot = 0; + } + if (slot == originalSlot) { + LOG.trace("{}: getInternal(key={}, remove={}) found nothing", + this, key, remove); + return null; + } + } + if (remove) { + adaptor.clear(addr); + numEntries--; + maintainCompactness(slot); + } + LOG.trace("{}: getInternal(key={}, remove={}) found {}", + this, key, remove, targetKey); + return target; + } + + /** + * Maintain the compactness invariant.<p/> + * + * In order to avoid doing a full array search when looking for an element + * that may not be in the hash table, we maintain a compactness invariant. + * The compactness invariant states that if we start at slot N and continue + * searching until we hit an empty slot, we will have searched all the + * possible places where the element could be. We maintain the compactness + * invariant by doing a little bit of extra work each time we delete an entry. + * Specifically, we search forwards from the deleted entry, moving any keys + * that need to be moved to maintain the invariant. We can stop searching + * when we hit an empty slot.<p/> + * + * Although maintaining the compactness invariant is O(N) in the worst case, + * it should be O(1) in the average case. This is because the hash table is + * half empty at all times. Assuming good hash dispersion, on average every + * other slot should be empty. Therefore, the average number of entries we + * move here should be less than 1.<p/> + */ + private void maintainCompactness(long startSlot) { + long slot = startSlot; + while (true) { + slot++; + if (slot == numSlots) { + slot = 0; + } + if (slot == startSlot) { + return; + } + long addr = this.base + (slot * slotSize); + E e = adaptor.load(addr); + if (e == null) { + return; + } + E prevE = putInternal(e, false); + if (prevE != null) { + if (LOG.isTraceEnabled()) { + LOG.trace("{}: {} was already in the right place.", + this, e.getKey()); + } + } else { + // The put didn't actually add anything, it just moved something. + // So decrement numEntries to its previous value. + numEntries--; + adaptor.clear(addr); + if (LOG.isTraceEnabled()) { + LOG.trace("{}: moved {} to the right place.", + this, e.getKey()); + } + } + } + } + + public E remove(K key) { + return getInternal(key, true); + } + + public E get(K key) { + return getInternal(key, false); + } + + private void expandTable(long newNumSlots) { + LOG.info("{}: Expanding table from {} slots to {}...", + this, numSlots, newNumSlots); + long newBase = mman.allocateZeroed(newNumSlots * slotSize); + long oldNumSlots = this.numSlots; + long oldExpansionThreshold = this.expansionThreshold; + long oldBase = this.base; + long oldNumEntries = this.numEntries; + try { + // Switch the hash table over to using the new memory region. + long entriesRemaining = oldNumEntries; + this.numSlots = newNumSlots; + this.expansionThreshold = (long)(newNumSlots * maxLoadFactor); + this.base = newBase; + this.numEntries = 0; + + for (long slot = 0; slot < oldNumSlots; slot++) { + long addr = oldBase + (slot * slotSize); + E e = adaptor.load(addr); + if (e != null) { + E prevEntry = putInternal(e, false); + if (prevEntry != null) { + LOG.error("{}: Unexpected duplicate encountered when resizing " + + "hash table: entry {} duplicates {}.", this, + e.getKey(), prevEntry.getKey() + ); + } + entriesRemaining--; + } + } + if (entriesRemaining != 0) { + LOG.error("{}: Unexpectedly failed to locate {} entries that we " + + "thought we needed to move when resizing the hash table.", + this, entriesRemaining + ); + } + LOG.info("{}: Finished expanding hash table from {} slots to {}. " + + "Moved {} keys. Freed old memory base 0x{}. Using new memory " + + "base 0x{}.", this, oldNumSlots, numSlots, numEntries, + Long.toHexString(oldBase), Long.toHexString(newBase)); + } catch (Throwable t) { + // In general we should never get here, since the functions used + // above should not throw exceptions. But it's nice to be safe. + LOG.error("{}: expanding failed! Restoring old memory region.", this, t); + + // Switch back to using the old memory region. + this.numSlots = oldNumSlots; + this.expansionThreshold = oldExpansionThreshold; + this.base = oldBase; + this.numEntries = oldNumEntries; + mman.free(newBase); + throw new RuntimeException("Failed to expand " + this, t); + } + mman.free(oldBase); + } + + /** + * Expand the hash table if it would need to expand to hold another key. + */ + private void expandTableIfNeeded() { + if (numEntries > expansionThreshold) { + expandTable(numSlots * 2L); + } + } + + /** + * Put the entry into the hash table if there is no entry in the hash table + * which is equivalent. + * + * @param putEntry The entry to add if absent. + * @param overwrite If true, we will overwrite the entry which is equal + * to putEntry (if there is one.) If false, we will + * simply return that entry, but not overwrite it. + * + * @return The previous entry in the hash table that was equal + * to the one we wanted to insert. null if there + * was no such entry. + */ + private E putInternal(E putEntry, boolean overwrite) { + long slot = getSlot(putEntry.getKey(), numSlots); + K putKey = putEntry.getKey(); + + while (true) { + long addr = this.base + (slot * slotSize); + E e = adaptor.load(addr); + if (e == null) { + adaptor.store(putEntry, addr); + numEntries++; + if (LOG.isTraceEnabled()) { + LOG.trace("{}: stored {} into slot {} (addr 0x{})", + this, putKey, slot, Long.toHexString(addr)); + } + return null; + } + K k = e.getKey(); + if (k.equals(putKey)) { + if (!overwrite) { + if (LOG.isTraceEnabled()) { + LOG.trace("{}: could not store {} because we found an " + + "equivalent key {} in slot {} (addr 0x{})", + this, putKey, k, slot, Long.toHexString(addr)); + } + return e; + } + // Overwrite the existing entry. + adaptor.store(putEntry, addr); + if (LOG.isTraceEnabled()) { + LOG.trace("{}: stored {} by overwriting the equivalent key {} " + + "in slot {} (addr 0x{})", this, putKey, k, slot, + Long.toHexString(addr)); + } + return e; + } + slot++; + if (slot == numSlots) { + slot = 0; + } + } + } + + /** + * Put the entry into the hash table if there is no entry in the hash table + * which is equivalent. + * + * @param putEntry The entry to add. + * + * @return Null if the element was inserted. + * Otherwise, returns the previous element that compares + * to be the same as the one we unsuccessfully tried to + * add. + */ + public E putIfAbsent(E putEntry) { + expandTableIfNeeded(); // call this first in case it fails (very unlikely) + return putInternal(putEntry, false); + } + + /** + * Put an entry into the hash table, overwriting any existing element + * which is equivalent. + * + * @param putEntry The entry to add. + * + * @return null if there was no element in the table which was + * equivalent... the existing element which was + * equivalent, otherwise. The existing element will + * be removed. + */ + public E put(E putEntry) { + expandTableIfNeeded(); // call this first in case it fails (very unlikely) + return putInternal(putEntry, true); + } + + /** + * Returns the current number of slots in the hash table. + */ + public long numSlots() { + return numSlots; + } + + /** + * Returns the size of the table. + */ + public long size() { + return numEntries; + } + + /** + * Returns true if the table is empty. + */ + public boolean isEmpty() { + return numEntries == 0; + } + + /** + * An iterator for the ProbingHashTable.<p/> + * + * Since ProbingHashTable has no internal synchronization, you are responsible + * for ensuring that there are no concurrent write operations on the hash + * table while an iterator function is being called. The easiest way to do + * this is with external locking.<p/> + * + * You can still perform write operations after creating this iterator + * without invalidating the iterator object. There are a few caveats:<p/> + * 1. Keys inserted after the iterator was created may or may not be + * returned by the iterator.<p/> + * 2. If the hash table is enlarged due to adding more keys, this iterator + * may return keys more than once, and return some keys not at all.<p/> + */ + private class ProbingHashTableIterator implements Iterator<K> { + private long slotId = 0; + private K curKey; + + private boolean refillCurKey() { + while (slotId < ProbingHashTable.this.numSlots) { + long addr = base + (slotId * slotSize); + E e = adaptor.load(addr); + slotId++; + if (e != null) { + curKey = e.getKey(); + if (LOG.isTraceEnabled()) { + LOG.trace("{}: iterator found another key {} at slot {} " + + "(address 0x{})", ProbingHashTable.this.toString(), curKey, + (slotId - 1), Long.toHexString(addr)); + } + return true; + } + } + LOG.trace("{}: no more keys to iterate over after reading all {} " + + "slots.", ProbingHashTable.this.toString(), slotId); + // Set slotId to Long.MAX_VALUE so that even if the hash table enlarges + // in the future, this iterator will continue to be at the end. + slotId = Long.MAX_VALUE; + return false; + } + + @Override + public boolean hasNext() { + if (curKey != null) { + return true; + } + return refillCurKey(); + } + + @Override + public K next() { + if (curKey == null) { + if (!refillCurKey()) { + throw new IllegalStateException(); + } + } + K key = curKey; + curKey = null; + return key; + } + + @Override + public void remove() { + if (curKey == null) { + throw new IllegalStateException(); + } + K key = curKey; + curKey = null; + if (ProbingHashTable.this.remove(key) == null) { + throw new NoSuchElementException("No such element as " + + key.toString()); + } + } + } + + public ProbingHashTableIterator iterator() { + return new ProbingHashTableIterator(); + } + + @Override + public String toString() { + return "ProbingHashTable(" + name + ")"; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/08761120/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/offheap/TestMemoryManager.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/offheap/TestMemoryManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/offheap/TestMemoryManager.java new file mode 100644 index 0000000..6ffd37f --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/offheap/TestMemoryManager.java @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util.offheap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestMemoryManager { + private static final Logger LOG = + LoggerFactory.getLogger(TestMemoryManager.class); + + @Test(timeout=60000) + public void testAllocateAndFreeOnHeap() throws Exception { + ByteArrayMemoryManager mman = new ByteArrayMemoryManager("test"); + testAllocateAndFree(mman); + mman.close(); + } + + @Test(timeout=60000) + public void testAllocateAndFreeOffHeap() throws Exception { + Assume.assumeTrue(NativeMemoryManager.isAvailable()); + NativeMemoryManager mman = new NativeMemoryManager("test"); + testAllocateAndFree(mman); + mman.close(); + } + + private void testAllocateAndFree(MemoryManager mman) throws Exception { + long addr = mman.allocate(100); + Assert.assertTrue("Expected addr to be non-zero.", addr != 0); + mman.free(addr); + } + + @Test(timeout=60000) + public void testGetAndSetOnHeap() throws Exception { + ByteArrayMemoryManager mman = new ByteArrayMemoryManager("test"); + testGetAndSet(mman); + mman.close(); + } + + @Test(timeout=60000) + public void testGetAndSetOffHeap() throws Exception { + Assume.assumeTrue(NativeMemoryManager.isAvailable()); + NativeMemoryManager mman = new NativeMemoryManager("test"); + testGetAndSet(mman); + mman.close(); + } + + private void testGetAndSet(MemoryManager mman) throws Exception { + LOG.info("testingGetAndSet with " + mman.getClass().getCanonicalName()); + long byteAddr = mman.allocateZeroed(1); + Assert.assertTrue("Expected addr to be non-zero.", byteAddr != 0); + byte b = mman.getByte(byteAddr); + Assert.assertEquals((byte)0, b); + mman.putByte(byteAddr, (byte) 42); + b = mman.getByte(byteAddr); + Assert.assertEquals((byte)42, b); + + long intAddr = mman.allocateZeroed(4); + Assert.assertTrue("Expected addr to be non-zero.", intAddr != 0); + int i = mman.getInt(intAddr); + Assert.assertEquals(0, i); + mman.putInt(intAddr, 0xfea01234); + i = mman.getInt(intAddr); + Assert.assertEquals(0xfea01234, i); + + long shortAddr = mman.allocateZeroed(2); + Assert.assertTrue("Expected addr to be non-zero.", shortAddr != 0); + short s = mman.getShort(shortAddr); + Assert.assertEquals(0, s); + mman.putShort(shortAddr, (short) 0xeecc); + s = mman.getShort(shortAddr); + Assert.assertEquals((short)0xeecc, s); + + long longAddr = mman.allocateZeroed(8); + Assert.assertTrue("Expected addr to be non-zero.", longAddr != 0); + long l = mman.getLong(longAddr); + Assert.assertEquals(0, l); + long testVal = 0x3ea0123400112233L; + LOG.info("longAddr = " + longAddr + ", testVal = " + testVal); + mman.putLong(longAddr, testVal); + l = mman.getLong(longAddr); + LOG.info("got back " + l + " from " + longAddr); + Assert.assertEquals(testVal, l); + + mman.free(byteAddr); + mman.free(intAddr); + mman.free(shortAddr); + mman.free(longAddr); + } + + @Test(timeout=60000) + public void testCatchInvalidPuts() throws Exception { + ByteArrayMemoryManager mman = new ByteArrayMemoryManager("test"); + long addr = mman.allocate(1); + mman.putByte(addr, (byte)1); // should succeed + try { + mman.putInt(addr, 0xdeadbeef); + Assert.fail("expected to catch invalid put"); + } catch (RuntimeException e) { + } + try { + mman.putByte(addr + 1, (byte) 1); + Assert.fail("expected to catch invalid put"); + } catch (RuntimeException e) { + } + try { + mman.putLong(addr, 11111111111L); + Assert.fail("expected to catch invalid put"); + } catch (RuntimeException e) { + } + mman.free(addr); + try { + mman.putByte(addr, (byte)1); + Assert.fail("expected to catch invalid put"); + } catch (RuntimeException e) { + } + try { + mman.putShort(addr, (short) 101); + Assert.fail("expected to catch invalid put"); + } catch (RuntimeException e) { + } + try { + mman.putInt(addr + 1, 0xfaceface); + Assert.fail("expected to catch invalid put"); + } catch (RuntimeException e) { + } + try { + mman.putLong(addr, 0xf00L); + Assert.fail("expected to catch invalid put"); + } catch (RuntimeException e) { + } + mman.close(); + } + + private void testMemoryManagerCreate( + String className, String createdClassName) throws Exception { + Configuration conf = new Configuration(); + conf.set(CommonConfigurationKeys.HADOOP_MEMORY_MANAGER_KEY, className); + MemoryManager mman = MemoryManager.Factory.create("test", conf); + Assert.assertNotNull(mman); + Assert.assertEquals(createdClassName, mman.getClass().getCanonicalName()); + mman.close(); + } + + @Test(timeout=60000) + public void testByteBufferMemoryManagerCreate() throws Exception { + testMemoryManagerCreate( + "org.apache.hadoop.util.offheap.ByteArrayMemoryManager", + "org.apache.hadoop.util.offheap.ByteArrayMemoryManager"); + } + + @Test(timeout=60000) + public void testNativeMemoryManagerCreate() throws Exception { + Assume.assumeTrue(NativeMemoryManager.isAvailable()); + testMemoryManagerCreate( + "org.apache.hadoop.util.offheap.NativeMemoryManager", + "org.apache.hadoop.util.offheap.NativeMemoryManager"); + } + + @Test(timeout=60000) + public void testDefaultMemoryManagerCreate() throws Exception { + testMemoryManagerCreate( + "org.apache.hadoop.util.offheap.NonExistentMemoryManager", + "org.apache.hadoop.util.offheap.ByteArrayMemoryManager"); + } + + @Test(timeout=60000) + public void testByteBufferMemoryDirtyClose() throws Exception { + ByteArrayMemoryManager mman = new ByteArrayMemoryManager("test"); + long addr = mman.allocate(1); + try { + mman.close(); + Assert.fail("expected close to fail since we did not free all " + + "allocations first."); + } catch (RuntimeException e) { + GenericTestUtils.assertExceptionContains("There are still unfreed " + + "buffers", e); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/08761120/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/offheap/TestProbingHashTable.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/offheap/TestProbingHashTable.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/offheap/TestProbingHashTable.java new file mode 100644 index 0000000..bb68f35 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/offheap/TestProbingHashTable.java @@ -0,0 +1,392 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util.offheap; + +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Iterator; + +public class TestProbingHashTable { + private static final Logger LOG = + LoggerFactory.getLogger(TestProbingHashTable.class); + + @Before + public void before() { + GenericTestUtils.setLogLevel(ProbingHashTable.LOG, Level.ALL); + GenericTestUtils.setLogLevel(NativeMemoryManager.LOG, Level.ALL); + GenericTestUtils.setLogLevel(ByteArrayMemoryManager.LOG, Level.ALL); + } + + private static class TestBlockId implements ProbingHashTable.Key { + private static final HashFunction hashFunction = Hashing.goodFastHash(64); + + private final long id; + + TestBlockId(long id) { + this.id = id; + } + + @Override + public long longHash() { + return hashFunction.newHasher().putLong(id).hash().asLong(); + } + + @Override + public boolean equals(Object o) { + if (o.getClass() != this.getClass()) { + return false; + } + return id == ((TestBlockId)o).id; + } + + @Override + public String toString() { + return "TestBlockId(0x" + Long.toHexString(id) + ")"; + } + + // Just for use with java.util.HashMap + @Override + public int hashCode() { + return (int)(id & 0xfffffff) ^ (int)((id >> 32) & 0xffffffff); + } + } + + private static class TestBlockInfo + implements ProbingHashTable.Entry<TestBlockId>, Closeable { + /** + * The memory manager to use. + */ + private MemoryManager mman; + + /** + * The address of the block reference. + */ + private long addr; + + private static final long BLOCK_ID_OFF = 0; + + private static final long TOTAL_LEN = 8; + + static TestBlockInfo allocZeroed(MemoryManager mman) { + long addr = mman.allocateZeroed(TOTAL_LEN); + return new TestBlockInfo(mman, addr); + } + + TestBlockInfo(MemoryManager mman, long addr) { + this.mman = mman; + this.addr = addr; + } + + public long getBlockId() { + return mman.getLong(addr + BLOCK_ID_OFF); + } + + public void setBlockId(long blockId) { + mman.putLong(addr + BLOCK_ID_OFF, blockId); + } + + public void close() throws IOException { + mman.free(addr); + } + + @Override + public boolean equals(Object o) { + if (o.getClass() != TestBlockInfo.class) { + return false; + } + TestBlockInfo other = (TestBlockInfo)o; + return (other.getBlockId() == getBlockId()); + } + + @Override + public TestBlockId getKey() { + return new TestBlockId(getBlockId()); + } + } + + private static class TestBlockInfoAdaptor + implements ProbingHashTable.Adaptor<TestBlockInfo> { + private final MemoryManager mman; + + TestBlockInfoAdaptor(MemoryManager mman) { + this.mman = mman; + } + + @Override + public int getSlotSize() { + return 8; + } + + @Override + public TestBlockInfo load(long addr) { + long infoAddr = mman.getLong(addr); + if (infoAddr == 0) { + return null; + } + return new TestBlockInfo(mman, infoAddr); + } + + @Override + public void store(TestBlockInfo info, long addr) { + mman.putLong(addr, info.addr); + } + + @Override + public void clear(long addr) { + mman.putLong(addr, 0L); + } + } + + private void testAllocateAndFree(MemoryManager mman) throws Exception { + TestBlockInfoAdaptor adaptor = new TestBlockInfoAdaptor(mman); + ProbingHashTable<TestBlockId, TestBlockInfo> htable = + new ProbingHashTable<TestBlockId, TestBlockInfo>( + "testAllocateAndFreeTable", mman, adaptor, 100, 0.5f); + // should have been rounded up to 256 + Assert.assertEquals(256, htable.numSlots()); + htable.close(); + } + + @Test(timeout=60000) + public void testAllocateAndFreeOnHeap() throws Exception { + ByteArrayMemoryManager mman = new ByteArrayMemoryManager("test"); + testAllocateAndFree(mman); + mman.close(); + } + + @Test(timeout=60000) + public void testAllocateAndFreeOffHeap() throws Exception { + Assume.assumeTrue(NativeMemoryManager.isAvailable()); + NativeMemoryManager mman = new NativeMemoryManager("test"); + testAllocateAndFree(mman); + mman.close(); + } + + private static TestBlockInfo[] createBlockInfos(MemoryManager mman, + int initialBlockId, int numBlocks) { + TestBlockInfo infos[] = new TestBlockInfo[numBlocks]; + boolean success = false; + try { + for (int i = 0; i < numBlocks; i++) { + infos[i] = TestBlockInfo.allocZeroed(mman); + infos[i].setBlockId(initialBlockId + i); + LOG.info("allocated infos[{}] with id {}", i, infos[i].getBlockId()); + } + success = true; + return infos; + } finally { + if (!success) { + freeBlockInfos(infos); + } + } + } + + private static void freeBlockInfos(TestBlockInfo[] infos) { + if (infos != null) { + for (int i = 0; i < infos.length; i++) { + if (infos[i] != null) { + IOUtils.cleanup(null, infos[i]); + } + } + } + } + + private void testAddRemove(MemoryManager mman) throws Exception { + TestBlockInfoAdaptor adaptor = new TestBlockInfoAdaptor(mman); + ProbingHashTable<TestBlockId, TestBlockInfo> htable = + new ProbingHashTable<TestBlockId, TestBlockInfo>( + "testAddRemoveTable", mman, adaptor, 10, 0.5f); + TestBlockInfo infos[] = null; + Assert.assertEquals(32, htable.numSlots()); + Assert.assertTrue(htable.isEmpty()); + Assert.assertEquals(0, htable.size()); + infos = createBlockInfos(mman, 1, 6); + for (int i = 0; i < infos.length; i++) { + LOG.info("Putting {} into {}", infos[i].getKey(), htable); + TestBlockInfo prev = htable.putIfAbsent(infos[i]); + Assert.assertEquals(null, prev); + } + Assert.assertFalse(htable.isEmpty()); + Assert.assertEquals(infos.length, htable.size()); + + // Test that we can iterate over all elements in the hash table. + Iterator<TestBlockId> iter = htable.iterator(); + Assert.assertNotNull(iter); + HashSet<TestBlockId> contents = new HashSet<TestBlockId>(); + for (TestBlockInfo info : infos) { + contents.add(info.getKey()); + } + for (int i = 0; i < infos.length; i++) { + Assert.assertTrue(iter.hasNext()); + TestBlockId blockId = iter.next(); + Assert.assertTrue("Iterator returned " + blockId + ", which was " + + "not inserted into the HashTable.", contents.remove(blockId)); + } + Assert.assertFalse(iter.hasNext()); + Assert.assertEquals("Did not find " + contents.size() + " entries " + + "from the hash table during iteration.", 0, contents.size()); + + for (int i = 0; i < infos.length; i++) { + LOG.info("Removing {} from {}", infos[i].getKey(), htable); + TestBlockInfo prev = htable.remove(infos[i].getKey()); + Assert.assertNotNull("unable to remove " + infos[i].getKey() + + " from the ProbingHashTable.", prev); + } + Assert.assertTrue(htable.isEmpty()); + freeBlockInfos(infos); + htable.close(); + } + + @Test(timeout=60000) + public void testAddRemoveOnHeap() throws Exception { + ByteArrayMemoryManager mman = new ByteArrayMemoryManager("test"); + testAddRemove(mman); + mman.close(); + } + + @Test(timeout=60000) + public void testAddRemoveOffHeap() throws Exception { + Assume.assumeTrue(NativeMemoryManager.isAvailable()); + NativeMemoryManager mman = new NativeMemoryManager("test"); + testAddRemove(mman); + mman.close(); + } + + private void testEnlargeHashTable(MemoryManager mman) throws Exception { + TestBlockInfoAdaptor adaptor = new TestBlockInfoAdaptor(mman); + ProbingHashTable<TestBlockId, TestBlockInfo> htable = + new ProbingHashTable<TestBlockId, TestBlockInfo>( + "testEnlargeHashTable", mman, adaptor, 4, 0.5f); + TestBlockInfo infos[] = null; + Assert.assertEquals(8, htable.numSlots()); + Assert.assertTrue(htable.isEmpty()); + Assert.assertEquals(0, htable.size()); + infos = createBlockInfos(mman, 1, 33); + for (int i = 0; i < 4; i++) { + LOG.info("Putting {} into {}", infos[i].getKey(), htable); + TestBlockInfo prev = htable.putIfAbsent(infos[i]); + Assert.assertEquals(null, prev); + } + Assert.assertEquals(8, htable.numSlots()); + Assert.assertFalse(htable.isEmpty()); + Assert.assertEquals(4, htable.size()); + for (int i = 4; i < 8; i++) { + LOG.info("Putting {} into {}", infos[i].getKey(), htable); + TestBlockInfo prev = htable.putIfAbsent(infos[i]); + Assert.assertEquals(null, prev); + } + Assert.assertEquals(16, htable.numSlots()); + Assert.assertFalse(htable.isEmpty()); + Assert.assertEquals(8, htable.size()); + + for (int i = 8; i < 16; i++) { + LOG.info("Putting {} into {}", infos[i].getKey(), htable); + TestBlockInfo prev = htable.putIfAbsent(infos[i]); + Assert.assertEquals(null, prev); + } + Assert.assertEquals(32, htable.numSlots()); + Assert.assertFalse(htable.isEmpty()); + Assert.assertEquals(16, htable.size()); + + for (int i = 16; i < infos.length; i++) { + LOG.info("Putting {} into {}", infos[i].getKey(), htable); + TestBlockInfo prev = htable.putIfAbsent(infos[i]); + Assert.assertEquals(null, prev); + } + Assert.assertEquals(64, htable.numSlots()); + Assert.assertFalse(htable.isEmpty()); + Assert.assertEquals(33, htable.size()); + + // Delete every other element + for (int i = 0; i < infos.length; i+=2) { + LOG.info("Removing {} from {}", infos[i].getKey(), htable); + TestBlockInfo prev = htable.remove(infos[i].getKey()); + Assert.assertNotNull("unable to remove " + infos[i].getKey() + + " from the ProbingHashTable.", prev); + } + Assert.assertEquals(64, htable.numSlots()); + Assert.assertFalse(htable.isEmpty()); + Assert.assertEquals(16, htable.size()); + + // Test that we can iterate over all remaining elements in the hash set. + Iterator<TestBlockId> iter = htable.iterator(); + Assert.assertNotNull(iter); + HashSet<TestBlockId> contents = new HashSet<TestBlockId>(); + for (int i = 1; i < infos.length; i+=2) { + contents.add(infos[i].getKey()); + } + for (int i = 1; i < infos.length; i+=2) { + Assert.assertTrue(iter.hasNext()); + TestBlockId blockId = iter.next(); + Assert.assertTrue("Iterator returned " + blockId + ", which was " + + "not inserted into the HashTable.", contents.remove(blockId)); + } + Assert.assertFalse(iter.hasNext()); + Assert.assertEquals("Did not find " + contents.size() + " entries " + + "from the hash table during iteration.", 0, contents.size()); + + // Delete remaining elements + for (int i = 1; i < infos.length; i+=2) { + LOG.info("Removing {} from {}", infos[i].getKey(), htable); + TestBlockInfo prev = htable.remove(infos[i].getKey()); + Assert.assertNotNull("unable to remove " + infos[i].getKey() + + " from the ProbingHashTable.", prev); + } + Assert.assertEquals(64, htable.numSlots()); + Assert.assertTrue(htable.isEmpty()); + Assert.assertEquals(0, htable.size()); + + iter = htable.iterator(); + Assert.assertNotNull(iter); + Assert.assertFalse(iter.hasNext()); + + freeBlockInfos(infos); + htable.close(); + } + + @Test(timeout=60000) + public void testEnlargeHashTableOnHeap() throws Exception { + ByteArrayMemoryManager mman = new ByteArrayMemoryManager("test"); + testEnlargeHashTable(mman); + mman.close(); + } + + @Test(timeout=60000) + public void testEnlargeHashTableOffHeap() throws Exception { + Assume.assumeTrue(NativeMemoryManager.isAvailable()); + NativeMemoryManager mman = new NativeMemoryManager("test"); + testEnlargeHashTable(mman); + mman.close(); + } +}
