HIVE-20513: Vectorization: Improve Fast Vector MapJoin Bytes Hash Tables (Matt 
McCline, reviewed by Zoltan Haindrich)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ff98a30a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ff98a30a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ff98a30a

Branch: refs/heads/master
Commit: ff98a30ab49c4eafe53974e03c9dd205c14ffee7
Parents: 494b771
Author: Matt McCline <[email protected]>
Authored: Mon Sep 10 04:24:35 2018 -0500
Committer: Matt McCline <[email protected]>
Committed: Mon Sep 10 04:24:35 2018 -0500

----------------------------------------------------------------------
 .../fast/VectorMapJoinFastBytesHashKeyRef.java  | 178 ++++++
 .../fast/VectorMapJoinFastBytesHashMap.java     | 141 +++--
 .../VectorMapJoinFastBytesHashMapStore.java     | 559 +++++++++++++++++++
 .../VectorMapJoinFastBytesHashMultiSet.java     | 132 ++++-
 ...VectorMapJoinFastBytesHashMultiSetStore.java | 280 ++++++++++
 .../fast/VectorMapJoinFastBytesHashSet.java     | 124 +++-
 .../VectorMapJoinFastBytesHashSetStore.java     | 219 ++++++++
 .../fast/VectorMapJoinFastBytesHashTable.java   | 148 ++---
 .../hive/ql/optimizer/ConvertJoinMapJoin.java   |   6 +-
 .../fast/TestVectorMapJoinFastBytesHashMap.java |   3 +
 .../fast/TestVectorMapJoinFastLongHashMap.java  |   3 +
 .../clientpositive/bucket_map_join_tez2.q       |   2 +-
 .../test/queries/clientpositive/tez_smb_main.q  |   3 +-
 .../results/clientpositive/llap/orc_llap.q.out  |  59 +-
 .../apache/hadoop/hive/serde2/WriteBuffers.java |  53 ++
 15 files changed, 1661 insertions(+), 249 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/ff98a30a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashKeyRef.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashKeyRef.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashKeyRef.java
new file mode 100644
index 0000000..dbfe518
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashKeyRef.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import org.apache.hadoop.hive.serde2.WriteBuffers;
+// import com.google.common.base.Preconditions;
+
+public class VectorMapJoinFastBytesHashKeyRef {
+
+  public static boolean equalKey(long refWord, byte[] keyBytes, int keyStart, 
int keyLength,
+      WriteBuffers writeBuffers, WriteBuffers.Position readPos) {
+
+    // Preconditions.checkState((refWord & KeyRef.IsInvalidFlag.flagOnMask) == 
0);
+
+    final long absoluteOffset = KeyRef.getAbsoluteOffset(refWord);
+
+    writeBuffers.setReadPoint(absoluteOffset, readPos);
+
+    int actualKeyLength = KeyRef.getSmallKeyLength(refWord);
+    boolean isKeyLengthSmall = (actualKeyLength != 
KeyRef.SmallKeyLength.allBitsOn);
+    if (!isKeyLengthSmall) {
+
+      // And, if current value is big we must read it.
+      actualKeyLength = writeBuffers.readVInt(readPos);
+    }
+
+    if (actualKeyLength != keyLength) {
+      return false;
+    }
+
+    // Our reading was positioned to the key.
+    if (!writeBuffers.isEqual(keyBytes, keyStart, readPos, keyLength)) {
+      return false;
+    }
+
+    return true;
+  }
+
+  public static int calculateHashCode(long refWord, WriteBuffers writeBuffers,
+      WriteBuffers.Position readPos) {
+
+    // Preconditions.checkState((refWord & KeyRef.IsInvalidFlag.flagOnMask) == 
0);
+
+    final long absoluteOffset = KeyRef.getAbsoluteOffset(refWord);
+
+    int actualKeyLength = KeyRef.getSmallKeyLength(refWord);
+    boolean isKeyLengthSmall = (actualKeyLength != 
KeyRef.SmallKeyLength.allBitsOn);
+    final long keyAbsoluteOffset;
+    if (!isKeyLengthSmall) {
+
+      // Position after next relative offset (fixed length) to the key.
+      writeBuffers.setReadPoint(absoluteOffset, readPos);
+
+      // And, if current value is big we must read it.
+      actualKeyLength = writeBuffers.readVInt(readPos);
+      keyAbsoluteOffset = absoluteOffset + actualKeyLength;
+    } else {
+      keyAbsoluteOffset = absoluteOffset;
+    }
+
+    return writeBuffers.unsafeHashCode(keyAbsoluteOffset, actualKeyLength);
+  }
+
+  public static final class KeyRef {
+
+    // Lowest field.
+    public static final class PartialHashCode {
+      public static final int bitLength = 15;
+      public static final long allBitsOn = (1L << bitLength) - 1;
+      public static final long bitMask = allBitsOn;
+
+      // Choose the high bits of the hash code KNOWING it was calculated as an 
int.
+      //
+      // We want the partial hash code to be different than the
+      // lower bits used for our hash table slot calculations.
+      public static final int intChooseBitShift = Integer.SIZE - bitLength;
+    }
+
+    public static long getPartialHashCode(long refWord) {
+      // No shift needed since this is the lowest field.
+      return refWord & PartialHashCode.bitMask;
+    }
+
+    // Can make the 64 bit reference non-zero if this is non-zero.  E.g. for 
hash map and
+    // hash multi-set, the offset is to the first key which is always preceded 
by a 5 byte next
+    // relative value offset or 4 byte count.
+    public static final class AbsoluteOffset {
+      public static final int bitLength = 39;
+      public static final int byteLength = (bitLength + Byte.SIZE -1) / 
Byte.SIZE;
+      public static final long allBitsOn = (1L << bitLength) - 1;
+      public static final int bitShift = PartialHashCode.bitLength;
+      public static final long bitMask = ((long) allBitsOn) << bitShift;
+
+      // Make it a power of 2.
+      public static final long maxSize = 1L << (bitLength - 2);
+    }
+
+    public static long getAbsoluteOffset(long refWord) {
+      return (refWord & KeyRef.AbsoluteOffset.bitMask) >> 
AbsoluteOffset.bitShift;
+    }
+
+    // When this field equals SmallKeyLength.allBitsOn, the key length is 
serialized at the
+    // beginning of the key.
+    public static final class SmallKeyLength {
+      public static final int bitLength = 8;
+      public static final int allBitsOn = (1 << bitLength) - 1;
+      public static final int threshold = allBitsOn;
+      public static final int bitShift = AbsoluteOffset.bitShift + 
AbsoluteOffset.bitLength;
+      public static final long bitMask = ((long) allBitsOn) << bitShift;
+      public static final long allBitsOnBitShifted = ((long) allBitsOn) << 
bitShift;
+    }
+
+    public static int getSmallKeyLength(long refWord) {
+      return (int) ((refWord & SmallKeyLength.bitMask) >> 
SmallKeyLength.bitShift);
+    }
+
+    public static final class IsSingleFlag {
+      public static final int bitShift = SmallKeyLength.bitShift + 
SmallKeyLength.bitLength;
+      public static final long flagOnMask = 1L << bitShift;
+      public static final long flagOffMask = ~flagOnMask;
+    }
+
+    public static boolean getIsSingleFlag(long refWord) {
+      return (refWord & IsSingleFlag.flagOnMask) != 0;
+    }
+
+    // This bit should not be on for valid value references.  We use -1 for a 
no value marker.
+    public static final class IsInvalidFlag {
+      public static final int bitShift = 63;
+      public static final long flagOnMask = 1L << bitShift;
+    }
+
+    public static boolean getIsInvalidFlag(long refWord) {
+      return (refWord & IsInvalidFlag.flagOnMask) != 0;
+    }
+  }
+
+
+  /**
+   * Extract partial hash code from the full hash code.
+   *
+   * Choose the high bits of the hash code KNOWING it was calculated as an int.
+   *
+   * We want the partial hash code to be different than the
+   * lower bits used for our hash table slot calculations.
+   *
+   * @param hashCode
+   * @return
+   */
+  public static long extractPartialHashCode(long hashCode) {
+    return (hashCode >>> KeyRef.PartialHashCode.intChooseBitShift) & 
KeyRef.PartialHashCode.bitMask;
+  }
+
+  /**
+   * Get partial hash code from the reference word.
+   * @param hashCode
+   * @return
+   */
+  public static long getPartialHashCodeFromRefWord(long refWord) {
+    return KeyRef.getPartialHashCode(refWord);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/ff98a30a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
index 32e0395..5969460 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
@@ -37,78 +37,141 @@ public abstract class VectorMapJoinFastBytesHashMap
 
   private static final Logger LOG = 
LoggerFactory.getLogger(VectorMapJoinFastBytesHashMap.class);
 
-  private VectorMapJoinFastValueStore valueStore;
+  private VectorMapJoinFastBytesHashMapStore hashMapStore;
 
   protected BytesWritable testValueBytesWritable;
 
   @Override
   public VectorMapJoinHashMapResult createHashMapResult() {
-    return new VectorMapJoinFastValueStore.HashMapResult();
+    return new VectorMapJoinFastBytesHashMapStore.HashMapResult();
   }
 
-  @Override
-  public void assignSlot(int slot, byte[] keyBytes, int keyStart, int 
keyLength,
-          long hashCode, boolean isNewKey, BytesWritable currentValue) {
+  public void add(byte[] keyBytes, int keyStart, int keyLength, BytesWritable 
currentValue) {
+
+    if (resizeThreshold <= keysAssigned) {
+      expandAndRehash();
+    }
+
+    long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
+    int intHashCode = (int) hashCode;
+    int slot = (intHashCode & logicalHashBucketMask);
+    long probeSlot = slot;
+    int i = 0;
+    boolean isNewKey;
+    long refWord;
+    final long partialHashCode =
+        VectorMapJoinFastBytesHashKeyRef.extractPartialHashCode(hashCode);
+    while (true) {
+      refWord = slots[slot];
+      if (refWord == 0) {
+        isNewKey = true;
+        break;
+      }
+      if 
(VectorMapJoinFastBytesHashKeyRef.getPartialHashCodeFromRefWord(refWord) ==
+              partialHashCode &&
+          VectorMapJoinFastBytesHashKeyRef.equalKey(
+                  refWord, keyBytes, keyStart, keyLength, writeBuffers, 
unsafeReadPos)) {
+        isNewKey = false;
+        break;
+      }
+      ++metricPutConflict;
+      // Some other key (collision) - keep probing.
+      probeSlot += (++i);
+      slot = (int) (probeSlot & logicalHashBucketMask);
+    }
+
+    if (largestNumberOfSteps < i) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Probed " + i + " slots (the longest so far) to find space");
+      }
+      largestNumberOfSteps = i;
+      // debugDumpKeyProbe(keyOffset, keyLength, hashCode, slot);
+    }
 
     byte[] valueBytes = currentValue.getBytes();
     int valueLength = currentValue.getLength();
 
-    int tripleIndex = 3 * slot;
     if (isNewKey) {
-      // First entry.
-      slotTriples[tripleIndex] = keyStore.add(keyBytes, keyStart, keyLength);
-      slotTriples[tripleIndex + 1] = hashCode;
-      slotTriples[tripleIndex + 2] = valueStore.addFirst(valueBytes, 0, 
valueLength);
-      // LOG.debug("VectorMapJoinFastBytesHashMap add first keyRefWord " + 
Long.toHexString(slotTriples[tripleIndex]) + " hashCode " + 
Long.toHexString(slotTriples[tripleIndex + 1]) + " valueRefWord " + 
Long.toHexString(slotTriples[tripleIndex + 2]));
+      slots[slot] =
+          hashMapStore.addFirst(
+              partialHashCode, keyBytes, keyStart, keyLength, valueBytes, 0, 
valueLength);
+      keysAssigned++;
     } else {
-      // Add another value.
-      // LOG.debug("VectorMapJoinFastBytesHashMap add more keyRefWord " + 
Long.toHexString(slotTriples[tripleIndex]) + " hashCode " + 
Long.toHexString(slotTriples[tripleIndex + 1]) + " valueRefWord " + 
Long.toHexString(slotTriples[tripleIndex + 2]));
-      slotTriples[tripleIndex + 2] = 
valueStore.addMore(slotTriples[tripleIndex + 2], valueBytes, 0, valueLength);
-      // LOG.debug("VectorMapJoinFastBytesHashMap add more new valueRefWord " 
+ Long.toHexString(slotTriples[tripleIndex + 2]));
+      final long newRefWord =
+          hashMapStore.addMore(
+              refWord, valueBytes, 0, valueLength, unsafeReadPos);
+      if (newRefWord != refWord) {
+        slots[slot] = newRefWord;
+      }
     }
   }
 
   @Override
-  public JoinUtil.JoinResult lookup(byte[] keyBytes, int keyStart, int 
keyLength, VectorMapJoinHashMapResult hashMapResult) {
-    VectorMapJoinFastValueStore.HashMapResult optimizedHashMapResult =
-        (VectorMapJoinFastValueStore.HashMapResult) hashMapResult;
+  public JoinUtil.JoinResult lookup(byte[] keyBytes, int keyStart, int 
keyLength,
+      VectorMapJoinHashMapResult hashMapResult) {
 
-    optimizedHashMapResult.forget();
+    VectorMapJoinFastBytesHashMapStore.HashMapResult fastHashMapResult =
+         (VectorMapJoinFastBytesHashMapStore.HashMapResult) hashMapResult;
 
-    long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
-    long valueRefWord = findReadSlot(keyBytes, keyStart, keyLength, hashCode, 
hashMapResult.getReadPos());
-    JoinUtil.JoinResult joinResult;
-    if (valueRefWord == -1) {
-      joinResult = JoinUtil.JoinResult.NOMATCH;
-    } else {
-      // LOG.debug("VectorMapJoinFastBytesHashMap lookup hashCode " + 
Long.toHexString(hashCode) + " valueRefWord " + Long.toHexString(valueRefWord) 
+ " (valueStore != null) " + (valueStore != null));
+    fastHashMapResult.forget();
 
-      optimizedHashMapResult.set(valueStore, valueRefWord);
+    long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
 
-      joinResult = JoinUtil.JoinResult.MATCH;
-    }
+    doHashMapMatch(
+        keyBytes, keyStart, keyLength, hashCode, fastHashMapResult);
 
-    optimizedHashMapResult.setJoinResult(joinResult);
+    return fastHashMapResult.joinResult();
+  }
 
-    return joinResult;
+  protected final void doHashMapMatch(
+      byte[] keyBytes, int keyStart, int keyLength, long hashCode,
+      VectorMapJoinFastBytesHashMapStore.HashMapResult fastHashMapResult) {
+
+    int intHashCode = (int) hashCode;
+    int slot = (intHashCode & logicalHashBucketMask);
+    long probeSlot = slot;
+    int i = 0;
+    final long partialHashCode =
+        VectorMapJoinFastBytesHashKeyRef.extractPartialHashCode(hashCode);
+    while (true) {
+      final long refWord = slots[slot];
+      if (refWord == 0) {
+
+        // Given that we do not delete, an empty slot means no match.
+        return;
+      } else if (
+          
VectorMapJoinFastBytesHashKeyRef.getPartialHashCodeFromRefWord(refWord) ==
+              partialHashCode) {
+
+        // Finally, verify the key bytes match and remember read positions, 
etc in
+        // fastHashMapResult.
+        fastHashMapResult.setKey(hashMapStore, refWord);
+        if (fastHashMapResult.equalKey(keyBytes, keyStart, keyLength)) {
+          fastHashMapResult.setMatch();
+          return;
+        }
+      }
+      // Some other key (collision) - keep probing.
+      probeSlot += (++i);
+      if (i > largestNumberOfSteps) {
+        // We know we never went that far when we were inserting.
+        return;
+      }
+      slot = (int) (probeSlot & logicalHashBucketMask);
+    }
   }
 
   public VectorMapJoinFastBytesHashMap(
       int initialCapacity, float loadFactor, int writeBuffersSize, long 
estimatedKeyCount) {
     super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
-
-    valueStore = new VectorMapJoinFastValueStore(writeBuffersSize);
-
-    // Share the same write buffers with our value store.
-    keyStore = new VectorMapJoinFastKeyStore(valueStore.writeBuffers());
+    hashMapStore = new VectorMapJoinFastBytesHashMapStore(writeBuffersSize);
+    writeBuffers = hashMapStore.getWriteBuffers();
   }
 
   @Override
   public long getEstimatedMemorySize() {
     long size = super.getEstimatedMemorySize();
-    size += valueStore.getEstimatedMemorySize();
-    // keyStore / valueStore back buffers are shared; so don't need:
-    // size += keyStore.getEstimatedMemorySize();
+    size += hashMapStore.getEstimatedMemorySize();
     return size;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ff98a30a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMapStore.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMapStore.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMapStore.java
new file mode 100644
index 0000000..dda4a85
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMapStore.java
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import org.apache.hadoop.hive.common.MemoryEstimate;
+import org.apache.hadoop.hive.ql.exec.JoinUtil.JoinResult;
+import 
org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastBytesHashKeyRef.KeyRef;
+import 
org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMapResult;
+import org.apache.hadoop.hive.serde2.WriteBuffers;
+import org.apache.hadoop.hive.serde2.WriteBuffers.ByteSegmentRef;
+import org.apache.hadoop.hive.serde2.WriteBuffers.Position;
+
+// import com.google.common.base.Preconditions;
+
+/*
+ * Used by VectorMapJoinFastBytesHashMap to store the key and values for a 
hash map with a bytes
+ * key.
+ */
+public class VectorMapJoinFastBytesHashMapStore implements MemoryEstimate {
+
+  private WriteBuffers writeBuffers;
+
+  /**
+   * A store for a key and a list of 1 or more arbitrary length values in 
memory.
+   *
+   * The memory is a "infinite" byte array as a WriteBuffers object.
+   *
+   * We give the client (e.g. hash map logic) a 64-bit key and value reference 
to keep that has
+   * the offset within the "infinite" byte array of the key.  The 64 bits 
includes about half
+   * of the upper hash code to help during matching.
+   *
+   * We optimize the common case when the key length is short and store that 
information in the
+   * 64 bit reference.
+   *
+   * When there are more than 1 value, the zero padding is overwritten with a 
relative offset to
+   * the next value.  The next value always includes the value length.
+   *
+   * Cases:
+   *
+   *  1) One element when key and is small (and stored in the reference word):
+   *
+   *    Key and Value Reference
+   *      |
+   *      | absoluteOffset
+   *      |
+   *      ---------------------------------
+   *                                       |
+   *                                       v
+   *       <5 0's for Next Relative Offset> <Key Bytes> <Value Length> <Value 
Bytes>
+   *                NEXT (NONE)                 KEY                        
VALUE
+   *
+   * NOTE: AbsoluteOffset.byteLength = 5
+   *
+   *  2) One element, general: shows optional big key length.
+   *
+   *   Key and Value Reference
+   *      |
+   *      | absoluteOffset
+   *      |
+   *      ---------------------------------
+   *                                       |
+   *                                       v
+   *      <5 0's for Next Relative Offset> [Big Key Length] <Key Bytes> <Value 
Length> <Value Bytes>
+   *                NEXT (NONE)                optional        KEY             
           VALUE
+   *
+   *  3) Two elements when key length is small and stored in reference word:
+   *
+   *    Key and Value Reference
+   *      |
+   *      | absoluteOffset
+   *      |
+   *      ------------------------------------
+   *                                         |
+   *                                         v
+   *      <Next Value Rel Offset as 5 bytes> <Key Bytes> <Value Bytes>
+   *         |     NEXT                         KEY         VALUE
+   *         |
+   *         | first record absolute offset + relative offset
+   *         |
+   *         --------
+   *                 |
+   *                 v
+   *                <5 0's Padding for Next Value Ref> <Value Length> <Value 
Bytes>
+   *                     NEXT (NONE)                                     VALUE
+   *
+   *  4) Three elements showing how first record updated to point to new value 
and
+   *     new value points to most recent (additional) value:
+   *
+   *    Key and Value Reference
+   *      |
+   *      | absoluteOffset
+   *      |
+   *      ------------------------------------
+   *                                         |
+   *                                         v
+   *      <Next Value Rel Offset as 5 bytes> <Key Bytes> <Value Bytes>
+   *         |     NEXT                         KEY         VALUE
+   *         |
+   *         | first record absolute offset + relative offset
+   *         |
+   *         |
+   *         |      <5 0's Padding for Next Value Ref> <Value Length> <Value 
Bytes>
+   *         |      ^    NEXT (NONE)                                    VALUE
+   *         |      |
+   *         |      ------
+   *         |            |
+   *         |            | new record absolute offset - (minus) relative 
offset
+   *         |            |
+   *          -----><Next Value Rel Offset as 5 bytes> <Value Length> <Value 
Bytes>
+   *                     NEXT                                            VALUE
+   *
+   *
+   *   5) Four elements showing how first record is again updated to point to 
new value and
+   *     new value points to most recent (additional) value:
+   *
+   *    Key and Value Reference
+   *      |
+   *      | absoluteOffset
+   *      |
+   *      ------------------------------------
+   *                                         |
+   *                                         v
+   *      <Next Value Rel Offset as 5 bytes> <Key Bytes> <Value Length> <Value 
Bytes>
+   *         |     NEXT                          KEY                      VALUE
+   *         |
+   *         | first record absolute offset + relative offset
+   *         |
+   *         |
+   *         |      <5 0's Padding for Next Value Ref> <Value Length> <Value 
Bytes>
+   *         |      ^    NEXT (NONE)                                     VALUE
+   *         |      |
+   *         |      ------
+   *         |            | record absolute offset - (minus) relative offset
+   *         |            |
+   *         |      <Next Value Rel Offset as 5 bytes> <Value Length> <Value 
Bytes>
+   *         |      ^       NEXT                                         VALUE
+   *         |      |
+   *         |      ------
+   *         |            |
+   *         |            | new record absolute offset - (minus) relative 
offset
+   *         |            |
+   *          -----><Next Value Rel Offset as 5 bytes> <Value Length> <Value 
Bytes>
+   *                        NEXT                                         VALUE
+   *
+   *
+   *  You get the idea.
+   */
+
+  public WriteBuffers getWriteBuffers() {
+    return writeBuffers;
+  }
+
+  /**
+   * A hash map result that can read values stored by the key and value store, 
one-by-one.
+   * It also has support routines for checking the hash code and key equality.
+   *
+   * It implements the standard map join hash map result interface.
+   *
+   */
+  public static class HashMapResult extends VectorMapJoinHashMapResult {
+
+    private VectorMapJoinFastBytesHashMapStore hashMapStore;
+
+    private int keyLength;
+
+    private boolean hasRows;
+    private long refWord;
+    private boolean isSingleRow;
+    private long absoluteOffset;
+    private long keyAbsoluteOffset;
+    private long firstValueAbsoluteOffset;
+
+    private int readIndex;
+    private boolean isNextEof;
+
+    long nextAbsoluteValueOffset;
+
+    private ByteSegmentRef byteSegmentRef;
+    private Position readPos;
+
+    public HashMapResult() {
+      super();
+      refWord = -1;
+      hasRows = false;
+      byteSegmentRef = new ByteSegmentRef();
+      readPos = new Position();
+    }
+
+    /**
+     * Setup for reading the key of an entry with the equalKey method.
+     * @param hashMapStore
+     * @param part1Word
+     * @param part2Word
+     */
+    public void setKey(VectorMapJoinFastBytesHashMapStore hashMapStore, long 
refWord) {
+
+      // Preconditions.checkState(!KeyRef.getIsInvalidFlag(refWord));
+
+      this.hashMapStore = hashMapStore;
+
+      this.refWord = refWord;
+
+      absoluteOffset = KeyRef.getAbsoluteOffset(refWord);
+
+      // Position after next relative offset (fixed length) to the key.
+      hashMapStore.writeBuffers.setReadPoint(absoluteOffset, readPos);
+
+      keyLength = KeyRef.getSmallKeyLength(refWord);
+      boolean isKeyLengthSmall = (keyLength != 
KeyRef.SmallKeyLength.allBitsOn);
+      if (isKeyLengthSmall) {
+
+        keyAbsoluteOffset = absoluteOffset;
+      } else {
+
+        // And, if current value is big we must read it.
+        keyLength = hashMapStore.writeBuffers.readVInt(readPos);
+        keyAbsoluteOffset = hashMapStore.writeBuffers.getReadPoint(readPos);
+      }
+
+      // NOTE: Reading is now positioned before the key bytes.
+    }
+
+    /**
+     * Compare a key with the key positioned with the setKey method.
+     * @param keyBytes
+     * @param keyStart
+     * @param keyLength
+     * @return
+     */
+    public boolean equalKey(byte[] keyBytes, int keyStart, int keyLength) {
+
+      if (this.keyLength != keyLength) {
+        return false;
+      }
+
+      // Our reading was positioned to the key.
+      if (!hashMapStore.writeBuffers.isEqual(keyBytes, keyStart, readPos, 
keyLength)) {
+        return false;
+      }
+
+      // NOTE: WriteBuffers.isEqual does not advance the read position...
+
+      return true;
+    }
+
+    /**
+     * Mark the key matched with equalKey as a match and set up for reading 
the values.
+     * Afterward, methods isSingleRow, cappedCount, first, next, etc may be 
called.
+     */
+    public void setMatch() {
+      hasRows = true;
+      isSingleRow = KeyRef.getIsSingleFlag(refWord);
+
+      // We must set the position since equalKey does not leave us positioned 
correctly.
+      hashMapStore.writeBuffers.setReadPoint(
+          keyAbsoluteOffset + keyLength, readPos);
+
+      // Save first value absolute offset...
+      firstValueAbsoluteOffset = 
hashMapStore.writeBuffers.getReadPoint(readPos);
+
+      // Position to beginning.
+      readIndex = 0;
+      isNextEof = false;
+      setJoinResult(JoinResult.MATCH);
+    }
+
+    @Override
+    public boolean hasRows() {
+      return hasRows;
+    }
+
+    @Override
+    public boolean isSingleRow() {
+      if (!hasRows) {
+        return false;
+      }
+
+      return isSingleRow;
+    }
+
+    @Override
+    public boolean isCappedCountAvailable() {
+      return true;
+    }
+
+    @Override
+    public int cappedCount() {
+
+      // The return values are capped to return ==0, ==1 and >= 2.
+      return hasRows ? (isSingleRow ? 1 : 2) : 0;
+    }
+
+    @Override
+    public ByteSegmentRef first() {
+      if (!hasRows) {
+        return null;
+      }
+
+      // Position to beginning.
+      readIndex = 0;
+      isNextEof = false;
+
+      return internalRead();
+    }
+
+    @Override
+    public ByteSegmentRef next() {
+      if (!hasRows || isNextEof) {
+        return null;
+      }
+
+      return internalRead();
+    }
+
+    public ByteSegmentRef internalRead() {
+
+      int nextValueLength;
+
+      if (readIndex == 0) {
+        if (isSingleRow) {
+          isNextEof = true;
+          nextAbsoluteValueOffset = -1;
+        } else {
+
+          // Read the next relative offset the last inserted value record.
+          final long referenceAbsoluteOffset =
+              absoluteOffset - KeyRef.AbsoluteOffset.byteLength;
+          hashMapStore.writeBuffers.setReadPoint(
+              referenceAbsoluteOffset, readPos);
+          long relativeNextValueOffset =
+              hashMapStore.writeBuffers.readNByteLong(
+                  KeyRef.AbsoluteOffset.byteLength, readPos);
+          // Preconditions.checkState(relativeNextValueOffset != 0);
+          isNextEof = false;
+
+          // Use positive relative offset from first record to last inserted 
value record.
+          nextAbsoluteValueOffset = referenceAbsoluteOffset + 
relativeNextValueOffset;
+        }
+
+        // Position past the key to first value.
+        hashMapStore.writeBuffers.setReadPoint(firstValueAbsoluteOffset, 
readPos);
+        nextValueLength = hashMapStore.writeBuffers.readVInt(readPos);
+      } else {
+
+        // Position to the next value record.
+        // Preconditions.checkState(nextAbsoluteValueOffset >= 0);
+        hashMapStore.writeBuffers.setReadPoint(nextAbsoluteValueOffset, 
readPos);
+
+        // Read the next relative offset.
+        long relativeNextValueOffset =
+            hashMapStore.writeBuffers.readNByteLong(
+                RelativeOffset.byteLength, readPos);
+        if (relativeNextValueOffset == 0) {
+          isNextEof = true;
+          nextAbsoluteValueOffset = -1;
+        } else {
+          isNextEof = false;
+
+          // The way we insert causes our chain to backwards from the last 
inserted value record...
+          nextAbsoluteValueOffset = nextAbsoluteValueOffset - 
relativeNextValueOffset;
+        }
+        nextValueLength = hashMapStore.writeBuffers.readVInt(readPos);
+
+        // Now positioned to the value.
+      }
+
+      // Capture a ByteSegmentRef to the current value position and length.
+      hashMapStore.writeBuffers.getByteSegmentRefToCurrent(
+          byteSegmentRef, nextValueLength, readPos);
+
+      readIndex++;
+      return byteSegmentRef;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("(" + super.toString() + ", ");
+      sb.append("cappedCount " + cappedCount() + ")");
+      return sb.toString();
+    }
+
+    /**
+     * Get detailed HashMap result position information to help diagnose 
exceptions.
+     */
+    @Override
+    public String getDetailedHashMapResultPositionString() {
+      StringBuilder sb = new StringBuilder();
+
+      sb.append("Read index ");
+      sb.append(readIndex);
+      if (isSingleRow) {
+        sb.append(" single row");
+      } else {
+        sb.append(" multiple rows ");
+      }
+
+      if (readIndex > 0) {
+        sb.append(" byteSegmentRef is byte[] of length ");
+        sb.append(byteSegmentRef.getBytes().length);
+        sb.append(" at offset ");
+        sb.append(byteSegmentRef.getOffset());
+        sb.append(" for length ");
+        sb.append(byteSegmentRef.getLength());
+        if (!isSingleRow) {
+          sb.append(" (isNextEof ");
+          sb.append(isNextEof);
+          sb.append(" nextAbsoluteValueOffset ");
+          sb.append(nextAbsoluteValueOffset);
+          sb.append(")");
+        }
+      }
+
+      return sb.toString();
+    }
+  }
+
+  private static final class RelativeOffset {
+    private static final int byteLength = KeyRef.AbsoluteOffset.byteLength;
+
+    // Relative offset zero padding.
+    private static final byte[] zeroPadding = new byte[] { 0,0,0,0,0 };
+  }
+
+  /**
+   * Two 64-bit long result is the key and value reference.
+   * @param partialHashCode
+   * @param keyBytes
+   * @param keyStart
+   * @param keyLength
+   * @param valueBytes
+   * @param valueStart
+   * @param valueLength
+   */
+  public long addFirst(long partialHashCode, byte[] keyBytes, int keyStart, 
int keyLength,
+      byte[] valueBytes, int valueStart, int valueLength) {
+
+    // Zero pad out bytes for fixed size next relative offset if more values 
are added later.
+    writeBuffers.write(RelativeOffset.zeroPadding);
+
+    // We require the absolute offset to be non-zero so the 64 key and value 
reference is non-zero.
+    // So, we make it the offset after the relative offset and to the key.
+    final long absoluteOffset = writeBuffers.getWritePoint();
+    // Preconditions.checkState(absoluteOffset > 0);
+
+    boolean isKeyLengthBig = (keyLength >= KeyRef.SmallKeyLength.threshold);
+    if (isKeyLengthBig) {
+      writeBuffers.writeVInt(keyLength);
+    }
+    writeBuffers.write(keyBytes, keyStart, keyLength);
+
+    writeBuffers.writeVInt(valueLength);
+    writeBuffers.write(valueBytes, valueStart, valueLength);
+
+    /*
+     * Form 64 bit key and value reference.
+     */
+    long refWord = partialHashCode;
+
+    refWord |= absoluteOffset << KeyRef.AbsoluteOffset.bitShift;
+
+    if (isKeyLengthBig) {
+      refWord |= KeyRef.SmallKeyLength.allBitsOnBitShifted;
+    } else {
+      refWord |= ((long) keyLength) << KeyRef.SmallKeyLength.bitShift;
+    }
+
+    refWord |= KeyRef.IsSingleFlag.flagOnMask;
+
+    // Preconditions.checkState(!KeyRef.getIsInvalidFlag(refWord));
+
+    return refWord;
+  }
+
+  /**
+   * @param refWord
+   * @param valueBytes
+   * @param valueStart
+   * @param valueLength
+   */
+  public long addMore(long refWord, byte[] valueBytes, int valueStart, int 
valueLength,
+      WriteBuffers.Position unsafeReadPos) {
+
+    // Preconditions.checkState(!KeyRef.getIsInvalidFlag(refWord));
+
+    /*
+     * Extract information from the reference word.
+     */
+    final long referenceAbsoluteOffset =
+        KeyRef.getAbsoluteOffset(refWord) - KeyRef.AbsoluteOffset.byteLength;
+
+    // Where the new value record will be written.
+    long nextAbsoluteValueOffset = writeBuffers.getWritePoint();
+
+    if (KeyRef.getIsSingleFlag(refWord)) {
+
+      // Mark reference as having more than 1 value.
+      refWord &= KeyRef.IsSingleFlag.flagOffMask;
+
+      // Write zeros to indicate no 3rd record.
+      writeBuffers.write(RelativeOffset.zeroPadding);
+    } else {
+
+      // To insert next value record above count 2:
+
+      // 1) Read next relative offset in first record (this is a positive 
relative offset) to
+      //    last inserted value record.
+      long oldPrevRelativeValueOffset =
+          writeBuffers.readNByteLong(
+              referenceAbsoluteOffset, RelativeOffset.byteLength, 
unsafeReadPos);
+
+      // 2) Relative offset is positive from first record to last inserted 
value record.
+      long prevAbsoluteValueOffset = referenceAbsoluteOffset + 
oldPrevRelativeValueOffset;
+
+      // 3) Since previous record is before the new one, subtract because we 
store relative offsets
+      //    as unsigned.
+      long newPrevRelativeValueOffset = nextAbsoluteValueOffset - 
prevAbsoluteValueOffset;
+      // Preconditions.checkState(newPrevRelativeValueOffset >= 0);
+      writeBuffers.writeFiveByteULong(newPrevRelativeValueOffset);
+    }
+
+    writeBuffers.writeVInt(valueLength);
+    writeBuffers.write(valueBytes, valueStart, valueLength);
+
+    // Overwrite relative offset in first record.
+    long newRelativeOffset = nextAbsoluteValueOffset - referenceAbsoluteOffset;
+    // Preconditions.checkState(newRelativeOffset >= 0);
+    writeBuffers.writeFiveByteULong(referenceAbsoluteOffset, 
newRelativeOffset);
+
+    return refWord;
+  }
+
+  public VectorMapJoinFastBytesHashMapStore(int writeBuffersSize) {
+    writeBuffers = new WriteBuffers(writeBuffersSize, 
KeyRef.AbsoluteOffset.maxSize);
+  }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    long size = 0;
+    size += writeBuffers == null ? 0 : writeBuffers.getEstimatedMemorySize();
+    return size;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/ff98a30a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
index 726fd29..849eeb4 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
@@ -42,26 +42,67 @@ public abstract class VectorMapJoinFastBytesHashMultiSet
 
   private static final Logger LOG = 
LoggerFactory.getLogger(VectorMapJoinFastBytesHashMultiSet.class);
 
+  private VectorMapJoinFastBytesHashMultiSetStore hashMultiSetStore;
+
   @Override
   public VectorMapJoinHashMultiSetResult createHashMultiSetResult() {
-    return new VectorMapJoinFastHashMultiSet.HashMultiSetResult();
+    return new VectorMapJoinFastBytesHashMultiSetStore.HashMultiSetResult();
   }
 
-  @Override
-  public void assignSlot(int slot, byte[] keyBytes, int keyStart, int 
keyLength,
-          long hashCode, boolean isNewKey, BytesWritable currentValue) {
+  public void add(byte[] keyBytes, int keyStart, int keyLength, BytesWritable 
currentValue) {
+
+    if (resizeThreshold <= keysAssigned) {
+      expandAndRehash();
+    }
+
+    long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
+    int intHashCode = (int) hashCode;
+    int slot = (intHashCode & logicalHashBucketMask);
+    long probeSlot = slot;
+    int i = 0;
+    boolean isNewKey;
+    long refWord;
+    final long partialHashCode =
+        VectorMapJoinFastBytesHashKeyRef.extractPartialHashCode(hashCode);
+    while (true) {
+      refWord = slots[slot];
+      if (refWord == 0) {
+        isNewKey = true;
+        break;
+      }
+      if 
(VectorMapJoinFastBytesHashKeyRef.getPartialHashCodeFromRefWord(refWord) ==
+              partialHashCode &&
+          VectorMapJoinFastBytesHashKeyRef.equalKey(
+              refWord, keyBytes, keyStart, keyLength, writeBuffers, 
unsafeReadPos)) {
+        isNewKey = false;
+        break;
+      }
+      ++metricPutConflict;
+      // Some other key (collision) - keep probing.
+      probeSlot += (++i);
+      slot = (int) (probeSlot & logicalHashBucketMask);
+    }
+
+    if (largestNumberOfSteps < i) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Probed " + i + " slots (the longest so far) to find space");
+      }
+      largestNumberOfSteps = i;
+      // debugDumpKeyProbe(keyOffset, keyLength, hashCode, slot);
+    }
 
-    int tripleIndex = 3 * slot;
     if (isNewKey) {
-      // First entry.
-      slotTriples[tripleIndex] = keyStore.add(keyBytes, keyStart, keyLength);
-      slotTriples[tripleIndex + 1] = hashCode;
-      slotTriples[tripleIndex + 2] = 1;    // Count.
-      // LOG.debug("VectorMapJoinFastBytesHashMap add first keyRefWord " + 
Long.toHexString(slotTriples[tripleIndex]) + " hashCode " + 
Long.toHexString(slotTriples[tripleIndex + 1]) + " valueRefWord " + 
Long.toHexString(slotTriples[tripleIndex + 2]));
+      slots[slot] =
+          hashMultiSetStore.addFirst(
+              partialHashCode, keyBytes, keyStart, keyLength);
+      keysAssigned++;
     } else {
-      // Add another value.
-      // LOG.debug("VectorMapJoinFastBytesHashMap add more keyRefWord " + 
Long.toHexString(slotTriples[tripleIndex]) + " hashCode " + 
Long.toHexString(slotTriples[tripleIndex + 1]) + " valueRefWord " + 
Long.toHexString(slotTriples[tripleIndex + 2]));
-      slotTriples[tripleIndex + 2]++;
+      final long newRefWord =
+          hashMultiSetStore.bumpCount(
+              refWord, unsafeReadPos);
+      if (newRefWord != refWord) {
+        slots[slot] = newRefWord;
+      }
     }
   }
 
@@ -69,37 +110,68 @@ public abstract class VectorMapJoinFastBytesHashMultiSet
   public JoinUtil.JoinResult contains(byte[] keyBytes, int keyStart, int 
keyLength,
           VectorMapJoinHashMultiSetResult hashMultiSetResult) {
 
-    VectorMapJoinFastHashMultiSet.HashMultiSetResult 
optimizedHashMultiSetResult =
-        (VectorMapJoinFastHashMultiSet.HashMultiSetResult) hashMultiSetResult;
+    VectorMapJoinFastBytesHashMultiSetStore.HashMultiSetResult 
fastHashMultiSetResult =
+        (VectorMapJoinFastBytesHashMultiSetStore.HashMultiSetResult) 
hashMultiSetResult;
 
-    optimizedHashMultiSetResult.forget();
+    fastHashMultiSetResult.forget();
 
     long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
-    long count = findReadSlot(keyBytes, keyStart, keyLength, hashCode, 
hashMultiSetResult.getReadPos());
-    JoinUtil.JoinResult joinResult;
-    if (count == -1) {
-      joinResult = JoinUtil.JoinResult.NOMATCH;
-    } else {
 
-      optimizedHashMultiSetResult.set(count);
-
-      joinResult = JoinUtil.JoinResult.MATCH;
-    }
+    doHashMultiSetContains(
+        keyBytes, keyStart, keyLength, hashCode, fastHashMultiSetResult);
 
-    optimizedHashMultiSetResult.setJoinResult(joinResult);
+    return fastHashMultiSetResult.joinResult();
+  }
 
-    return joinResult;
+  protected final void doHashMultiSetContains(
+      byte[] keyBytes, int keyStart, int keyLength, long hashCode,
+      VectorMapJoinFastBytesHashMultiSetStore.HashMultiSetResult 
fastHashMultiSetResult) {
+
+    int intHashCode = (int) hashCode;
+    int slot = (intHashCode & logicalHashBucketMask);
+    long probeSlot = slot;
+    int i = 0;
+    final long partialHashCode =
+        VectorMapJoinFastBytesHashKeyRef.extractPartialHashCode(hashCode);
+    while (true) {
+      final long refWord = slots[slot];
+      if (refWord == 0) {
+
+        // Given that we do not delete, an empty slot means no match.
+        return;
+      } else if (
+          
VectorMapJoinFastBytesHashKeyRef.getPartialHashCodeFromRefWord(refWord) ==
+              partialHashCode) {
+
+        // Finally, verify the key bytes match and remember the set membership 
count in
+        // fastHashMultiSetResult.
+        fastHashMultiSetResult.setKey(hashMultiSetStore, refWord);
+        if (fastHashMultiSetResult.equalKey(keyBytes, keyStart, keyLength)) {
+          fastHashMultiSetResult.setContains();
+          return;
+        }
+      }
+      // Some other key (collision) - keep probing.
+      probeSlot += (++i);
+      if (i > largestNumberOfSteps) {
+        // We know we never went that far when we were inserting.
+        return;
+      }
+      slot = (int) (probeSlot & logicalHashBucketMask);
+    }
   }
 
   public VectorMapJoinFastBytesHashMultiSet(
       int initialCapacity, float loadFactor, int writeBuffersSize, long 
estimatedKeyCount) {
     super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
-
-    keyStore = new VectorMapJoinFastKeyStore(writeBuffersSize);
+    hashMultiSetStore = new 
VectorMapJoinFastBytesHashMultiSetStore(writeBuffersSize);
+    writeBuffers = hashMultiSetStore.getWriteBuffers();
   }
 
   @Override
   public long getEstimatedMemorySize() {
-    return super.getEstimatedMemorySize() + keyStore.getEstimatedMemorySize();
+    long size = super.getEstimatedMemorySize();
+    size += hashMultiSetStore.getEstimatedMemorySize();
+    return size;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ff98a30a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSetStore.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSetStore.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSetStore.java
new file mode 100644
index 0000000..20fa03a
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSetStore.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import org.apache.hadoop.hive.common.MemoryEstimate;
+import org.apache.hadoop.hive.ql.exec.JoinUtil.JoinResult;
+import 
org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastBytesHashKeyRef.KeyRef;
+import 
org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMultiSetResult;
+import org.apache.hadoop.hive.serde2.WriteBuffers;
+import org.apache.hadoop.hive.serde2.WriteBuffers.Position;
+
+// import com.google.common.base.Preconditions;
+
+/*
+ * Used by VectorMapJoinFastBytesHashMultiSet to store the key and count for a 
hash multi-set with
+ * a bytes key.
+ */
+public class VectorMapJoinFastBytesHashMultiSetStore implements MemoryEstimate 
{
+
+  private WriteBuffers writeBuffers;
+
+  /**
+   * A store for a key and set membership count in memory.
+   *
+   * The memory is a "infinite" byte array as a WriteBuffers object.
+   *
+   * We give the client (e.g. hash multi-set logic) a 64-bit key and count 
reference to keep that
+   * has the offset within the "infinite" byte array of the key.  The 64 bits 
includes about half
+   * of the upper hash code to help during matching.
+   *
+   * We optimize the common case when the key length is short and store that 
information in the
+   * 64 bit reference.
+   *
+   * Cases:
+   *
+   *  1) One element when key and is small (and stored in the reference word):
+   *
+   *    Key and Value Reference
+   *      |
+   *      | absoluteOffset
+   *      |
+   *      --------------------------------------
+   *                                           |
+   *                                           v
+   *       <4 bytes's for set membership count> <Key Bytes>
+   *            COUNT                              KEY
+   *
+   * NOTE: MultiSetCount.byteLength = 4
+   *
+   *  2) One element, general: shows optional big key length.
+   *
+   *   Key and Value Reference
+   *      |
+   *      | absoluteOffset
+   *      |
+   *      -------------------------------------
+   *                                          |
+   *                                          v
+   *      <4 byte's for set membership count> [Big Key Length] <Key Bytes>
+   *                NEXT (NONE)                optional           KEY
+   */
+
+  public WriteBuffers getWriteBuffers() {
+    return writeBuffers;
+  }
+
+  /**
+   * A hash multi-set result that can read the set membership count for the 
key.
+   * It also has support routines for checking the hash code and key equality.
+   *
+   * It implements the standard map join hash multi-set result interface.
+   *
+   */
+  public static class HashMultiSetResult extends 
VectorMapJoinHashMultiSetResult {
+
+    private VectorMapJoinFastBytesHashMultiSetStore multiSetStore;
+
+    private int keyLength;
+    private boolean isSingleCount;
+
+    private long refWord;
+
+    private long absoluteOffset;
+
+    private Position readPos;
+
+    public HashMultiSetResult() {
+      super();
+      refWord = -1;
+      readPos = new Position();
+    }
+
+    /**
+     * Setup for reading the key of an entry with the equalKey method.
+     * @param multiSetStore
+     * @param refWord
+     */
+    public void setKey(VectorMapJoinFastBytesHashMultiSetStore multiSetStore, 
long refWord) {
+
+      // Preconditions.checkState(!KeyRef.getIsInvalidFlag(refWord));
+
+      this.multiSetStore = multiSetStore;
+
+      this.refWord = refWord;
+
+      absoluteOffset = KeyRef.getAbsoluteOffset(refWord);
+
+      // Position after next relative offset (fixed length) to the key.
+      multiSetStore.writeBuffers.setReadPoint(absoluteOffset, readPos);
+
+      keyLength = KeyRef.getSmallKeyLength(refWord);
+      boolean isKeyLengthSmall = (keyLength != 
KeyRef.SmallKeyLength.allBitsOn);
+      if (!isKeyLengthSmall) {
+
+        // And, if current value is big we must read it.
+        keyLength = multiSetStore.writeBuffers.readVInt(readPos);
+      }
+
+      // NOTE: Reading is now positioned before the key bytes.
+    }
+
+    /**
+     * Compare a key with the key positioned with the setKey method.
+     * @param keyBytes
+     * @param keyStart
+     * @param keyLength
+     * @return
+     */
+    public boolean equalKey(byte[] keyBytes, int keyStart, int keyLength) {
+
+      if (this.keyLength != keyLength) {
+        return false;
+      }
+
+      // Our reading was positioned to the key.
+      if (!multiSetStore.writeBuffers.isEqual(keyBytes, keyStart, readPos, 
keyLength)) {
+        return false;
+      }
+
+      // NOTE: WriteBuffers.isEqual does not advance the read position...
+
+      return true;
+    }
+
+    /**
+     * Mark the key matched with equalKey as a match and read the set 
membership count,
+     * if necessary.
+     */
+    public void setContains() {
+      isSingleCount = KeyRef.getIsSingleFlag(refWord);
+
+      if (isSingleCount) {
+        count = 1;
+      } else {
+        count =
+            multiSetStore.writeBuffers.readInt(
+                absoluteOffset - MultiSetCount.byteLength, readPos);
+      }
+      setJoinResult(JoinResult.MATCH);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("(" + super.toString() + ", ");
+      sb.append("count " + count + ")");
+      return sb.toString();
+    }
+  }
+
+  private static final class MultiSetCount {
+    private static final int byteLength = Integer.SIZE / Byte.SIZE;
+
+    // Relative offset zero padding.
+    private static final byte[] oneCount = new byte[] { 0,0,0,1 };
+  }
+
+  /**
+   * Two 64-bit long result is the key and value reference.
+   * @param partialHashCode
+   * @param keyBytes
+   * @param keyStart
+   * @param keyLength
+   */
+  public long addFirst(long partialHashCode, byte[] keyBytes, int keyStart, 
int keyLength) {
+
+    // Zero pad out bytes for fixed size next relative offset if more values 
are added later.
+    writeBuffers.write(MultiSetCount.oneCount);
+
+    // We require the absolute offset to be non-zero so the 64 key and value 
reference is non-zero.
+    // So, we make it the offset after the relative offset and to the key.
+    final long absoluteOffset = writeBuffers.getWritePoint();
+    // Preconditions.checkState(absoluteOffset > 0);
+
+    boolean isKeyLengthBig = (keyLength >= KeyRef.SmallKeyLength.threshold);
+    if (isKeyLengthBig) {
+      writeBuffers.writeVInt(keyLength);
+    }
+    writeBuffers.write(keyBytes, keyStart, keyLength);
+
+    /*
+     * Form 64 bit key and value reference.
+     */
+    long refWord = partialHashCode;
+
+    refWord |= absoluteOffset << KeyRef.AbsoluteOffset.bitShift;
+
+    if (isKeyLengthBig) {
+      refWord |= KeyRef.SmallKeyLength.allBitsOnBitShifted;
+    } else {
+      refWord |= ((long) keyLength) << KeyRef.SmallKeyLength.bitShift;
+    }
+
+    refWord |= KeyRef.IsSingleFlag.flagOnMask;
+
+    // Preconditions.checkState(!KeyRef.getIsInvalidFlag(refWord));
+
+    return refWord;
+  }
+
+  /**
+   * @param refWord
+   */
+  public long bumpCount(long refWord, WriteBuffers.Position unsafeReadPos) {
+
+    // Preconditions.checkState(!KeyRef.getIsInvalidFlag(refWord));
+
+    /*
+     * Extract information from the reference word.
+     */
+    final long countAbsoluteOffset =
+        KeyRef.getAbsoluteOffset(refWord) - MultiSetCount.byteLength;
+
+    final int currentCount =
+        writeBuffers.readInt(
+            countAbsoluteOffset, unsafeReadPos);
+
+    // Mark reference as having more than 1 as the count.
+    refWord &= KeyRef.IsSingleFlag.flagOffMask;
+
+    // Save current write position.
+    final long saveAbsoluteOffset = writeBuffers.getWritePoint();
+
+    writeBuffers.setWritePoint(countAbsoluteOffset);
+    writeBuffers.writeInt(
+        countAbsoluteOffset, currentCount + 1);
+
+    // Restore current write position.
+    writeBuffers.setWritePoint(saveAbsoluteOffset);
+
+    return refWord;
+  }
+
+  public VectorMapJoinFastBytesHashMultiSetStore(int writeBuffersSize) {
+    writeBuffers = new WriteBuffers(writeBuffersSize, 
KeyRef.AbsoluteOffset.maxSize);
+  }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    long size = 0;
+    size += writeBuffers == null ? 0 : writeBuffers.getEstimatedMemorySize();
+    return size;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/ff98a30a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
index 5d750a8..737b4d0 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
@@ -37,21 +37,63 @@ public abstract class VectorMapJoinFastBytesHashSet
 
   private static final Logger LOG = 
LoggerFactory.getLogger(VectorMapJoinFastBytesHashSet.class);
 
+  private VectorMapJoinFastBytesHashSetStore hashSetStore;
+
   @Override
   public VectorMapJoinHashSetResult createHashSetResult() {
-    return new VectorMapJoinFastHashSet.HashSetResult();
+    return new VectorMapJoinFastBytesHashSetStore.HashSetResult();
   }
 
-  @Override
-  public void assignSlot(int slot, byte[] keyBytes, int keyStart, int 
keyLength,
-          long hashCode, boolean isNewKey, BytesWritable currentValue) {
+  public void add(byte[] keyBytes, int keyStart, int keyLength, BytesWritable 
currentValue) {
+
+    if (resizeThreshold <= keysAssigned) {
+      expandAndRehash();
+    }
+
+    long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
+    int intHashCode = (int) hashCode;
+    int slot = (intHashCode & logicalHashBucketMask);
+    long probeSlot = slot;
+    int i = 0;
+    boolean isNewKey;
+    long refWord;
+    final long partialHashCode =
+        VectorMapJoinFastBytesHashKeyRef.extractPartialHashCode(hashCode);
+    while (true) {
+      refWord = slots[slot];
+      if (refWord == 0) {
+        isNewKey = true;
+        break;
+      }
+      if 
(VectorMapJoinFastBytesHashKeyRef.getPartialHashCodeFromRefWord(refWord) ==
+              partialHashCode &&
+          VectorMapJoinFastBytesHashKeyRef.equalKey(
+              refWord, keyBytes, keyStart, keyLength, writeBuffers, 
unsafeReadPos)) {
+        isNewKey = false;
+        break;
+      }
+      ++metricPutConflict;
+      // Some other key (collision) - keep probing.
+      probeSlot += (++i);
+      slot = (int) (probeSlot & logicalHashBucketMask);
+    }
+
+    if (largestNumberOfSteps < i) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Probed " + i + " slots (the longest so far) to find space");
+      }
+      largestNumberOfSteps = i;
+      // debugDumpKeyProbe(keyOffset, keyLength, hashCode, slot);
+    }
 
-    int tripleIndex = 3 * slot;
     if (isNewKey) {
-      // First entry.
-      slotTriples[tripleIndex] = keyStore.add(keyBytes, keyStart, keyLength);
-      slotTriples[tripleIndex + 1] = hashCode;
-      slotTriples[tripleIndex + 2] = 1;    // Existence
+      slots[slot] =
+          hashSetStore.add(
+              partialHashCode, keyBytes, keyStart, keyLength);
+      keysAssigned++;
+    } else {
+
+      // Key already exists -- do nothing.
     }
   }
 
@@ -59,34 +101,68 @@ public abstract class VectorMapJoinFastBytesHashSet
   public JoinUtil.JoinResult contains(byte[] keyBytes, int keyStart, int 
keyLength,
           VectorMapJoinHashSetResult hashSetResult) {
 
-    VectorMapJoinFastHashSet.HashSetResult optimizedHashSetResult =
-        (VectorMapJoinFastHashSet.HashSetResult) hashSetResult;
+    VectorMapJoinFastBytesHashSetStore.HashSetResult fastHashSetResult =
+        (VectorMapJoinFastBytesHashSetStore.HashSetResult) hashSetResult;
 
-    optimizedHashSetResult.forget();
+    fastHashSetResult.forget();
 
     long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
-    long existance = findReadSlot(keyBytes, keyStart, keyLength, hashCode, 
hashSetResult.getReadPos());
-    JoinUtil.JoinResult joinResult;
-    if (existance == -1) {
-      joinResult = JoinUtil.JoinResult.NOMATCH;
-    } else {
-      joinResult = JoinUtil.JoinResult.MATCH;
-    }
 
-    optimizedHashSetResult.setJoinResult(joinResult);
+    doHashSetContains(
+        keyBytes, keyStart, keyLength, hashCode, fastHashSetResult);
 
-    return joinResult;
+    return fastHashSetResult.joinResult();
+  }
+
+  protected final void doHashSetContains(
+      byte[] keyBytes, int keyStart, int keyLength, long hashCode,
+      VectorMapJoinFastBytesHashSetStore.HashSetResult fastHashSetResult) {
+
+    int intHashCode = (int) hashCode;
+    int slot = (intHashCode & logicalHashBucketMask);
+    long probeSlot = slot;
+    int i = 0;
+    final long partialHashCode =
+        VectorMapJoinFastBytesHashKeyRef.extractPartialHashCode(hashCode);
+    while (true) {
+      final long refWord = slots[slot];
+      if (refWord == 0) {
+
+        // Given that we do not delete, an empty slot means no match.
+        return;
+      } else if (
+          
VectorMapJoinFastBytesHashKeyRef.getPartialHashCodeFromRefWord(refWord) ==
+              partialHashCode) {
+
+        // Finally, verify the key bytes match and implicitly remember the set 
existence in
+        // fastHashSetResult.
+        fastHashSetResult.setKey(hashSetStore, refWord);
+        if (fastHashSetResult.equalKey(keyBytes, keyStart, keyLength)) {
+          fastHashSetResult.setContains();
+          return;
+        }
+      }
+      // Some other key (collision) - keep probing.
+      probeSlot += (++i);
+      if (i > largestNumberOfSteps) {
+        // We know we never went that far when we were inserting.
+        return;
+      }
+      slot = (int) (probeSlot & logicalHashBucketMask);
+    }
   }
 
   public VectorMapJoinFastBytesHashSet(
       int initialCapacity, float loadFactor, int writeBuffersSize, long 
estimatedKeyCount) {
     super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
-
-    keyStore = new VectorMapJoinFastKeyStore(writeBuffersSize);
+    hashSetStore = new VectorMapJoinFastBytesHashSetStore(writeBuffersSize);
+    writeBuffers = hashSetStore.getWriteBuffers();
   }
 
   @Override
   public long getEstimatedMemorySize() {
-    return super.getEstimatedMemorySize() + keyStore.getEstimatedMemorySize();
+    long size = super.getEstimatedMemorySize();
+    size += hashSetStore.getEstimatedMemorySize();
+    return size;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ff98a30a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSetStore.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSetStore.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSetStore.java
new file mode 100644
index 0000000..1a78688
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSetStore.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import org.apache.hadoop.hive.common.MemoryEstimate;
+import org.apache.hadoop.hive.ql.exec.JoinUtil.JoinResult;
+import 
org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastBytesHashKeyRef.KeyRef;
+import 
org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashSetResult;
+import org.apache.hadoop.hive.serde2.WriteBuffers;
+import org.apache.hadoop.hive.serde2.WriteBuffers.Position;
+
+// import com.google.common.base.Preconditions;
+
+/*
+ * Used by VectorMapJoinFastBytesHashSet to store the key and count for a hash 
set with
+ * a bytes key.
+ */
+public class VectorMapJoinFastBytesHashSetStore implements MemoryEstimate {
+
+  private WriteBuffers writeBuffers;
+
+  /**
+   * A store for a bytes key for a hash set in memory.
+   *
+   * The memory is a "infinite" byte array as a WriteBuffers object.
+   *
+   * We give the client (e.g. hash set logic) a 64-bit key and count reference 
to keep that
+   * has the offset within the "infinite" byte array of the key.  The 64 bits 
includes about half
+   * of the upper hash code to help during matching.
+   *
+   * We optimize the common case when the key length is short and store that 
information in the
+   * 64 bit reference.
+   *
+   * Cases:
+   *
+   *  1) One element when key and is small (and stored in the reference word):
+   *
+   *    Key and Value Reference
+   *      |
+   *      | absoluteOffset
+   *      |
+   *      |
+   *      v
+   *      <Key Bytes>
+   *        KEY
+   *
+   *  2) One element, general: shows optional big key length.
+   *
+   *   Key and Value Reference
+   *      |
+   *      | absoluteOffset
+   *      |
+   *      |
+   *      v
+   *      [Big Key Length] <Key Bytes>
+   *        optional           KEY
+   */
+
+  public WriteBuffers getWriteBuffers() {
+    return writeBuffers;
+  }
+
+  /**
+   * A hash set result for the key.
+   * It also has support routines for checking the hash code and key equality.
+   *
+   * It implements the standard map join hash set result interface.
+   *
+   */
+  public static class HashSetResult extends VectorMapJoinHashSetResult {
+
+    private VectorMapJoinFastBytesHashSetStore setStore;
+
+    private int keyLength;
+
+    private long absoluteOffset;
+
+    private Position readPos;
+
+    public HashSetResult() {
+      super();
+      readPos = new Position();
+    }
+
+    /**
+     * Setup for reading the key of an entry with the equalKey method.
+     * @param setStore
+     * @param refWord
+     */
+    public void setKey(VectorMapJoinFastBytesHashSetStore setStore, long 
refWord) {
+
+      // Preconditions.checkState(!KeyRef.getIsInvalidFlag(refWord));
+
+      this.setStore = setStore;
+
+      absoluteOffset = KeyRef.getAbsoluteOffset(refWord);
+
+      // Position after next relative offset (fixed length) to the key.
+      setStore.writeBuffers.setReadPoint(absoluteOffset, readPos);
+
+      keyLength = KeyRef.getSmallKeyLength(refWord);
+      boolean isKeyLengthSmall = (keyLength != 
KeyRef.SmallKeyLength.allBitsOn);
+      if (!isKeyLengthSmall) {
+
+        // And, if current value is big we must read it.
+        keyLength = setStore.writeBuffers.readVInt(readPos);
+      }
+
+      // NOTE: Reading is now positioned before the key bytes.
+    }
+
+    /**
+     * Compare a key with the key positioned with the setKey method.
+     * @param keyBytes
+     * @param keyStart
+     * @param keyLength
+     * @return
+     */
+    public boolean equalKey(byte[] keyBytes, int keyStart, int keyLength) {
+
+      if (this.keyLength != keyLength) {
+        return false;
+      }
+
+      // Our reading was positioned to the key.
+      if (!setStore.writeBuffers.isEqual(keyBytes, keyStart, readPos, 
keyLength)) {
+        return false;
+      }
+
+      // NOTE: WriteBuffers.isEqual does not advance the read position...
+
+      return true;
+    }
+
+    /**
+     * Mark the key matched with equalKey as a match and read the set 
membership count,
+     * if necessary.
+     */
+    public void setContains() {
+      setJoinResult(JoinResult.MATCH);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append(super.toString());
+      return sb.toString();
+    }
+  }
+
+  /**
+   * Two 64-bit long result is the key and value reference.
+   * @param partialHashCode
+   * @param keyBytes
+   * @param keyStart
+   * @param keyLength
+   */
+  public long add(long partialHashCode, byte[] keyBytes, int keyStart, int 
keyLength) {
+
+    // We require the absolute offset to be non-zero so the 64 key and value 
reference is non-zero.
+    // So, we make it the offset after the relative offset and to the key.
+    final long absoluteOffset = writeBuffers.getWritePoint();
+
+    // NOTE: In order to guarantee the reference word is non-zero, later we 
will set the
+    // NOTE: single flag.
+
+    boolean isKeyLengthBig = (keyLength >= KeyRef.SmallKeyLength.threshold);
+    if (isKeyLengthBig) {
+      writeBuffers.writeVInt(keyLength);
+    }
+    writeBuffers.write(keyBytes, keyStart, keyLength);
+
+    /*
+     * Form 64 bit key and value reference.
+     */
+    long refWord = partialHashCode;
+
+    refWord |= absoluteOffset << KeyRef.AbsoluteOffset.bitShift;
+
+    if (isKeyLengthBig) {
+      refWord |= KeyRef.SmallKeyLength.allBitsOnBitShifted;
+    } else {
+      refWord |= ((long) keyLength) << KeyRef.SmallKeyLength.bitShift;
+    }
+
+    refWord |= KeyRef.IsSingleFlag.flagOnMask;
+
+    // Preconditions.checkState(!KeyRef.getIsInvalidFlag(refWord));
+
+    return refWord;
+  }
+
+  public VectorMapJoinFastBytesHashSetStore(int writeBuffersSize) {
+    writeBuffers = new WriteBuffers(writeBuffersSize, 
KeyRef.AbsoluteOffset.maxSize);
+  }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    long size = 0;
+    size += writeBuffers == null ? 0 : writeBuffers.getEstimatedMemorySize();
+    return size;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/ff98a30a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
index f2b794f..223eec3 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
@@ -27,7 +27,6 @@ import 
org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinByte
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.WriteBuffers;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hive.common.util.HashCodeUtil;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -40,7 +39,9 @@ public abstract class VectorMapJoinFastBytesHashTable
 
   private static final Logger LOG = 
LoggerFactory.getLogger(VectorMapJoinFastBytesHashTable.class);
 
-  protected VectorMapJoinFastKeyStore keyStore;
+  protected WriteBuffers writeBuffers;
+
+  protected WriteBuffers.Position unsafeReadPos; // Thread-unsafe position 
used at write time.
 
   protected BytesWritable testKeyBytesWritable;
 
@@ -52,87 +53,36 @@ public abstract class VectorMapJoinFastBytesHashTable
     add(keyBytes, 0, keyLength, currentValue);
   }
 
-  protected abstract void assignSlot(int slot, byte[] keyBytes, int keyStart, 
int keyLength,
-          long hashCode, boolean isNewKey, BytesWritable currentValue);
-
-  public void add(byte[] keyBytes, int keyStart, int keyLength, BytesWritable 
currentValue) {
-
-    if (resizeThreshold <= keysAssigned) {
-      expandAndRehash();
-    }
-
-    long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
-    int intHashCode = (int) hashCode;
-    int slot = (intHashCode & logicalHashBucketMask);
-    long probeSlot = slot;
-    int i = 0;
-    boolean isNewKey;
-    while (true) {
-      int tripleIndex = 3 * slot;
-      if (slotTriples[tripleIndex] == 0) {
-        // LOG.debug("VectorMapJoinFastBytesHashMap findWriteSlot slot " + 
slot + " tripleIndex " + tripleIndex + " empty");
-        isNewKey = true;;
-        break;
-      }
-      if (hashCode == slotTriples[tripleIndex + 1] &&
-          keyStore.unsafeEqualKey(slotTriples[tripleIndex], keyBytes, 
keyStart, keyLength)) {
-        // LOG.debug("VectorMapJoinFastBytesHashMap findWriteSlot slot " + 
slot + " tripleIndex " + tripleIndex + " existing");
-        isNewKey = false;
-        break;
-      }
-      // TODO
-      ++metricPutConflict;
-      // Some other key (collision) - keep probing.
-      probeSlot += (++i);
-      slot = (int) (probeSlot & logicalHashBucketMask);
-    }
-
-    if (largestNumberOfSteps < i) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Probed " + i + " slots (the longest so far) to find space");
-      }
-      largestNumberOfSteps = i;
-      // debugDumpKeyProbe(keyOffset, keyLength, hashCode, slot);
-    }
+  public abstract void add(byte[] keyBytes, int keyStart, int keyLength,
+      BytesWritable currentValue);
 
-    assignSlot(slot, keyBytes, keyStart, keyLength, hashCode, isNewKey, 
currentValue);
+  protected void expandAndRehash() {
 
-    if (isNewKey) {
-      keysAssigned++;
-    }
-  }
-
-  private void expandAndRehash() {
-
-    // We allocate triples, so we cannot go above highest Integer power of 2 / 
6.
-    if (logicalHashBucketCount > ONE_SIXTH_LIMIT) {
-      throwExpandError(ONE_SIXTH_LIMIT, "Bytes");
+    // We cannot go above highest Integer power of 2.
+    if (logicalHashBucketCount > HIGHEST_INT_POWER_OF_2) {
+      throwExpandError(HIGHEST_INT_POWER_OF_2, "Bytes");
     }
     int newLogicalHashBucketCount = logicalHashBucketCount * 2;
     int newLogicalHashBucketMask = newLogicalHashBucketCount - 1;
     int newMetricPutConflict = 0;
     int newLargestNumberOfSteps = 0;
 
-    int newSlotTripleArraySize = newLogicalHashBucketCount * 3;
-    long[] newSlotTriples = new long[newSlotTripleArraySize];
+    long[] newSlots = new long[newLogicalHashBucketCount];
 
     for (int slot = 0; slot < logicalHashBucketCount; slot++) {
-      int tripleIndex = slot * 3;
-      long keyRef = slotTriples[tripleIndex];
-      if (keyRef != 0) {
-        long hashCode = slotTriples[tripleIndex + 1];
-        long valueRef = slotTriples[tripleIndex + 2];
+      final long refWord = slots[slot];
+      if (refWord != 0) {
+        final long hashCode =
+            VectorMapJoinFastBytesHashKeyRef.calculateHashCode(
+                refWord, writeBuffers, unsafeReadPos);
 
         // Copy to new slot table.
         int intHashCode = (int) hashCode;
         int newSlot = intHashCode & newLogicalHashBucketMask;
         long newProbeSlot = newSlot;
-        int newTripleIndex;
         int i = 0;
         while (true) {
-          newTripleIndex = newSlot * 3;
-          long newKeyRef = newSlotTriples[newTripleIndex];
-          if (newKeyRef == 0) {
+          if (newSlots[newSlot] == 0) {
             break;
           }
           ++newMetricPutConflict;
@@ -149,81 +99,47 @@ public abstract class VectorMapJoinFastBytesHashTable
           // debugDumpKeyProbe(keyOffset, keyLength, hashCode, slot);
         }
 
-        // Use old value reference word.
-        // LOG.debug("VectorMapJoinFastLongHashTable expandAndRehash key " + 
tableKey + " slot " + newSlot + " newPairIndex " + newPairIndex + " empty slot 
(i = " + i + ")");
-
-        newSlotTriples[newTripleIndex] = keyRef;
-        newSlotTriples[newTripleIndex + 1] = hashCode;
-        newSlotTriples[newTripleIndex + 2] = valueRef;
+        // Use old reference word.
+        newSlots[newSlot] = refWord;
       }
     }
 
-    slotTriples = newSlotTriples;
+    slots = newSlots;
     logicalHashBucketCount = newLogicalHashBucketCount;
     logicalHashBucketMask = newLogicalHashBucketMask;
     metricPutConflict = newMetricPutConflict;
     largestNumberOfSteps = newLargestNumberOfSteps;
     resizeThreshold = (int)(logicalHashBucketCount * loadFactor);
     metricExpands++;
-    // LOG.debug("VectorMapJoinFastLongHashTable expandAndRehash new 
logicalHashBucketCount " + logicalHashBucketCount + " resizeThreshold " + 
resizeThreshold + " metricExpands " + metricExpands);
-  }
-
-  protected final long findReadSlot(
-      byte[] keyBytes, int keyStart, int keyLength, long hashCode, 
WriteBuffers.Position readPos) {
-
-    int intHashCode = (int) hashCode;
-    int slot = (intHashCode & logicalHashBucketMask);
-    long probeSlot = slot;
-    int i = 0;
-    while (true) {
-      int tripleIndex = slot * 3;
-      // LOG.debug("VectorMapJoinFastBytesHashMap findReadSlot slot keyRefWord 
" + Long.toHexString(slotTriples[tripleIndex]) + " hashCode " + 
Long.toHexString(hashCode) + " entry hashCode " + 
Long.toHexString(slotTriples[tripleIndex + 1]) + " valueRefWord " + 
Long.toHexString(slotTriples[tripleIndex + 2]));
-      if (slotTriples[tripleIndex] == 0) {
-        // Given that we do not delete, an empty slot means no match.
-        return -1;
-      } else if (hashCode == slotTriples[tripleIndex + 1]) {
-        // Finally, verify the key bytes match.
-
-        if (keyStore.equalKey(slotTriples[tripleIndex], keyBytes, keyStart, 
keyLength, readPos)) {
-          return slotTriples[tripleIndex + 2];
-        }
-      }
-      // Some other key (collision) - keep probing.
-      probeSlot += (++i);
-      if (i > largestNumberOfSteps) {
-        // We know we never went that far when we were inserting.
-        return -1;
-      }
-      slot = (int)(probeSlot & logicalHashBucketMask);
-    }
   }
 
   /*
-   * The hash table slots.  For a bytes key hash table, each slot is 3 longs 
and the array is
-   * 3X sized.
-   *
-   * The slot triple is 1) a non-zero reference word to the key bytes, 2) the 
key hash code, and
-   * 3) a non-zero reference word to the first value bytes.
+   * The hash table slots for fast HashMap.
    */
-  protected long[] slotTriples;
+  protected long[] slots;
 
   private void allocateBucketArray() {
-    // We allocate triples, so we cannot go above highest Integer power of 2 / 
6.
-    if (logicalHashBucketCount > ONE_SIXTH_LIMIT) {
-      throwExpandError(ONE_SIXTH_LIMIT, "Bytes");
+
+    // We cannot go above highest Integer power of 2.
+    if (logicalHashBucketCount > HIGHEST_INT_POWER_OF_2) {
+      throwExpandError(HIGHEST_INT_POWER_OF_2, "Bytes");
     }
-    int slotTripleArraySize = 3 * logicalHashBucketCount;
-    slotTriples = new long[slotTripleArraySize];
+    slots = new long[logicalHashBucketCount];
   }
 
   public VectorMapJoinFastBytesHashTable(
         int initialCapacity, float loadFactor, int writeBuffersSize, long 
estimatedKeyCount) {
     super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
+    unsafeReadPos = new WriteBuffers.Position();
     allocateBucketArray();
   }
 
   @Override
   public long getEstimatedMemorySize() {
-    return super.getEstimatedMemorySize() + 
JavaDataModel.get().lengthForLongArrayOfSize(slotTriples.length);
+    long size = 0;
+    size += super.getEstimatedMemorySize();
+    size += unsafeReadPos == null ? 0 : unsafeReadPos.getEstimatedMemorySize();
+    size += JavaDataModel.get().lengthForLongArrayOfSize(slots.length);
+    return size;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ff98a30a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 27b6a87..cd952a2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -263,11 +263,9 @@ public class ConvertJoinMapJoin implements NodeProcessor {
   }
 
   public long computeOnlineDataSizeFast3(Statistics statistics) {
-    // The datastructure doing the actual storage during mapjoins has no per 
row orhead;
-    // but uses a 192 bit wide table
     return computeOnlineDataSizeGeneric(statistics,
-        0, // key is stored in a bytearray
-        3 * 8 // maintenance structure consists of 3 longs
+        5 + 4, // list header ; value length stored as vint
+        8 // maintenance structure consists of 1 long
     );
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ff98a30a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/TestVectorMapJoinFastBytesHashMap.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/TestVectorMapJoinFastBytesHashMap.java
 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/TestVectorMapJoinFastBytesHashMap.java
index 528daf2..9bf8bbc 100644
--- 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/TestVectorMapJoinFastBytesHashMap.java
+++ 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/TestVectorMapJoinFastBytesHashMap.java
@@ -28,6 +28,8 @@ import 
org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
 import 
org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.CheckFastHashTable.VerifyFastBytesHashMap;
 import 
org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMapResult;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+import org.junit.Ignore;
 import org.junit.Test;
 
 /*
@@ -299,6 +301,7 @@ public class TestVectorMapJoinFastBytesHashMap extends 
CommonFastHashTable {
     addAndVerifyMultipleKeyMultipleValue(keyCount, map, verifyTable);
   }
 
+  @Ignore
   @Test
   public void testOutOfBounds() throws Exception {
     random = new Random(42662);

http://git-wip-us.apache.org/repos/asf/hive/blob/ff98a30a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/TestVectorMapJoinFastLongHashMap.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/TestVectorMapJoinFastLongHashMap.java
 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/TestVectorMapJoinFastLongHashMap.java
index bc333e8..a21bdcf 100644
--- 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/TestVectorMapJoinFastLongHashMap.java
+++ 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/TestVectorMapJoinFastLongHashMap.java
@@ -28,6 +28,8 @@ import 
org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.CheckFastHashTable.Ver
 import 
org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastLongHashMap;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
+
+import org.junit.Ignore;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
@@ -276,6 +278,7 @@ public class TestVectorMapJoinFastLongHashMap extends 
CommonFastHashTable {
     addAndVerifyMultipleKeyMultipleValue(keyCount, map, verifyTable);
   }
 
+  @Ignore
   @Test
   public void testOutOfBounds() throws Exception {
     random = new Random(42662);

http://git-wip-us.apache.org/repos/asf/hive/blob/ff98a30a/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q 
b/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q
index 2189b96..4b2cad8 100644
--- a/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q
+++ b/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q
@@ -79,7 +79,7 @@ set hive.convert.join.bucket.mapjoin.tez = true;
 explain
 select a.key, b.key from (select key from tab_part_n11 where key > 1) a right 
outer join (select key from tab_part_n11 where key > 2) b on a.key = b.key;
 
-set hive.auto.convert.join.noconditionaltask.size=2800;
+set hive.auto.convert.join.noconditionaltask.size=2000;
 set hive.convert.join.bucket.mapjoin.tez = false;
 explain select a.key, b.key from (select distinct key from tab_n10) a join 
tab_n10 b on b.key = a.key;
 set hive.convert.join.bucket.mapjoin.tez = true;

http://git-wip-us.apache.org/repos/asf/hive/blob/ff98a30a/ql/src/test/queries/clientpositive/tez_smb_main.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/tez_smb_main.q 
b/ql/src/test/queries/clientpositive/tez_smb_main.q
index b88149b..3a75d61 100644
--- a/ql/src/test/queries/clientpositive/tez_smb_main.q
+++ b/ql/src/test/queries/clientpositive/tez_smb_main.q
@@ -69,8 +69,7 @@ from tab_n11 a join tab_part_n12 b on a.key = b.key;
 select count(*)
 from tab_n11 a join tab_part_n12 b on a.key = b.key;
 
-
-set hive.auto.convert.join.noconditionaltask.size=1400;
+set hive.auto.convert.join.noconditionaltask.size=800;
 set hive.mapjoin.hybridgrace.minwbsize=125;
 set hive.mapjoin.hybridgrace.minnumpartitions=4;
 set hive.llap.memory.oversubscription.max.executors.per.query=0;

http://git-wip-us.apache.org/repos/asf/hive/blob/ff98a30a/ql/src/test/results/clientpositive/llap/orc_llap.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/orc_llap.q.out 
b/ql/src/test/results/clientpositive/llap/orc_llap.q.out
index a639b68..f4f8278 100644
--- a/ql/src/test/results/clientpositive/llap/orc_llap.q.out
+++ b/ql/src/test/results/clientpositive/llap/orc_llap.q.out
@@ -1021,8 +1021,8 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+        Map 2 <- Map 1 (BROADCAST_EDGE)
+        Reducer 3 <- Map 2 (CUSTOM_SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -1046,7 +1046,7 @@ STAGE PLANS:
                         value expressions: _col2 (type: string)
             Execution mode: vectorized, llap
             LLAP IO: all inputs
-        Map 4 
+        Map 2 
             Map Operator Tree:
                 TableScan
                   alias: o2
@@ -1059,38 +1059,31 @@ STAGE PLANS:
                       expressions: csmallint (type: smallint), cstring2 (type: 
string)
                       outputColumnNames: _col0, _col2
                       Statistics: Num rows: 136968 Data size: 11042828 Basic 
stats: COMPLETE Column stats: COMPLETE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: smallint)
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: smallint)
-                        Statistics: Num rows: 136968 Data size: 11042828 Basic 
stats: COMPLETE Column stats: COMPLETE
-                        value expressions: _col2 (type: string)
+                      Map Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 _col0 (type: smallint)
+                          1 _col0 (type: smallint)
+                        outputColumnNames: _col2, _col5
+                        input vertices:
+                          0 Map 1
+                        Statistics: Num rows: 636522 Data size: 114343414 
Basic stats: COMPLETE Column stats: COMPLETE
+                        Select Operator
+                          expressions: hash(_col2,_col5) (type: int)
+                          outputColumnNames: _col0
+                          Statistics: Num rows: 636522 Data size: 114343414 
Basic stats: COMPLETE Column stats: COMPLETE
+                          Group By Operator
+                            aggregations: sum(_col0)
+                            mode: hash
+                            outputColumnNames: _col0
+                            Statistics: Num rows: 1 Data size: 8 Basic stats: 
COMPLETE Column stats: COMPLETE
+                            Reduce Output Operator
+                              sort order: 
+                              Statistics: Num rows: 1 Data size: 8 Basic 
stats: COMPLETE Column stats: COMPLETE
+                              value expressions: _col0 (type: bigint)
             Execution mode: vectorized, llap
             LLAP IO: all inputs
-        Reducer 2 
-            Execution mode: llap
-            Reduce Operator Tree:
-              Merge Join Operator
-                condition map:
-                     Inner Join 0 to 1
-                keys:
-                  0 _col0 (type: smallint)
-                  1 _col0 (type: smallint)
-                outputColumnNames: _col2, _col5
-                Statistics: Num rows: 636522 Data size: 114343414 Basic stats: 
COMPLETE Column stats: COMPLETE
-                Select Operator
-                  expressions: hash(_col2,_col5) (type: int)
-                  outputColumnNames: _col0
-                  Statistics: Num rows: 636522 Data size: 114343414 Basic 
stats: COMPLETE Column stats: COMPLETE
-                  Group By Operator
-                    aggregations: sum(_col0)
-                    mode: hash
-                    outputColumnNames: _col0
-                    Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE 
Column stats: COMPLETE
-                    Reduce Output Operator
-                      sort order: 
-                      Statistics: Num rows: 1 Data size: 8 Basic stats: 
COMPLETE Column stats: COMPLETE
-                      value expressions: _col0 (type: bigint)
         Reducer 3 
             Execution mode: vectorized, llap
             Reduce Operator Tree:

Reply via email to