Repository: hive Updated Branches: refs/heads/branch-2.0 98c559474 -> 74c46e846 refs/heads/branch-2.1 f7fdd4e88 -> 18c679722 refs/heads/master 4b6ac735f -> f008a38b4
HIVE-14408 : thread safety issue in fast hashtable (Sergey Shelukhin, reviewed by Gopal V) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f008a38b Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f008a38b Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f008a38b Branch: refs/heads/master Commit: f008a38b433fc287a233c7131b1d7e45f9465557 Parents: 4b6ac73 Author: Sergey Shelukhin <[email protected]> Authored: Wed Aug 3 12:33:35 2016 -0700 Committer: Sergey Shelukhin <[email protected]> Committed: Wed Aug 3 12:33:35 2016 -0700 ---------------------------------------------------------------------- .../persistence/BytesBytesMultiHashMap.java | 34 +++++++------- .../ql/exec/vector/VectorMapJoinOperator.java | 2 +- .../ql/exec/vector/VectorizedBatchUtil.java | 13 +++--- .../mapjoin/VectorMapJoinCommonOperator.java | 1 + .../VectorMapJoinGenerateResultOperator.java | 1 + .../fast/VectorMapJoinFastBytesHashMap.java | 2 +- .../VectorMapJoinFastBytesHashMultiSet.java | 2 +- .../fast/VectorMapJoinFastBytesHashSet.java | 2 +- .../fast/VectorMapJoinFastBytesHashTable.java | 8 ++-- .../fast/VectorMapJoinFastHashTable.java | 2 +- .../mapjoin/fast/VectorMapJoinFastKeyStore.java | 16 ++++--- .../fast/VectorMapJoinFastLongHashTable.java | 8 ++-- .../fast/VectorMapJoinFastTableContainer.java | 10 ++-- .../hashtable/VectorMapJoinHashTableResult.java | 8 ++++ .../apache/hadoop/hive/serde2/WriteBuffers.java | 49 +++++++++++--------- 15 files changed, 90 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f008a38b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java index 51acae0..dd88461 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java @@ -449,7 +449,7 @@ public final class BytesBytesMultiHashMap { kv.writeKey(writeBuffers); int keyLength = (int)(writeBuffers.getWritePoint() - keyOffset); - int hashCode = (keyHashCode == -1) ? writeBuffers.hashCode(keyOffset, keyLength) : keyHashCode; + int hashCode = (keyHashCode == -1) ? writeBuffers.unsafeHashCode(keyOffset, keyLength) : keyHashCode; int slot = findKeySlotToWrite(keyOffset, keyLength, hashCode); // LOG.info("Write hash code is " + Integer.toBinaryString(hashCode) + " - " + slot); @@ -683,8 +683,8 @@ public final class BytesBytesMultiHashMap { if (!compareHashBits(ref, hashCode)) { return false; // Hash bits in ref don't match. } - writeBuffers.setReadPoint(getFirstRecordLengthsOffset(ref, null)); - int valueLength = (int)writeBuffers.readVLong(), keyLength = (int)writeBuffers.readVLong(); + writeBuffers.setUnsafeReadPoint(getFirstRecordLengthsOffset(ref, null)); + int valueLength = (int)writeBuffers.unsafeReadVLong(), keyLength = (int)writeBuffers.unsafeReadVLong(); if (keyLength != cmpLength) { return false; } @@ -730,7 +730,7 @@ public final class BytesBytesMultiHashMap { private long getFirstRecordLengthsOffset(long ref, WriteBuffers.Position readPos) { long tailOffset = Ref.getOffset(ref); if (Ref.hasList(ref)) { - long relativeOffset = (readPos == null) ? writeBuffers.readNByteLong(tailOffset, 5) + long relativeOffset = (readPos == null) ? writeBuffers.unsafeReadNByteLong(tailOffset, 5) : writeBuffers.readNByteLong(tailOffset, 5, readPos); tailOffset += relativeOffset; } @@ -763,10 +763,10 @@ public final class BytesBytesMultiHashMap { // TODO: we could actually store a bit flag in ref indicating whether this is a hash // match or a probe, and in the former case use hash bits (for a first few resizes). // int hashCodeOrPart = oldSlot | Ref.getNthHashBit(oldRef, startingHashBitCount, newHashBitCount); - writeBuffers.setReadPoint(getFirstRecordLengthsOffset(oldRef, null)); + writeBuffers.setUnsafeReadPoint(getFirstRecordLengthsOffset(oldRef, null)); // Read the value and key length for the first record. - int hashCode = (int)writeBuffers.readNByteLong(Ref.getOffset(oldRef) - - writeBuffers.readVLong() - writeBuffers.readVLong() - 4, 4); + int hashCode = (int)writeBuffers.unsafeReadNByteLong(Ref.getOffset(oldRef) + - writeBuffers.unsafeReadVLong() - writeBuffers.unsafeReadVLong() - 4, 4); int probeSteps = relocateKeyRef(newRefs, oldRef, hashCode); maxSteps = Math.max(probeSteps, maxSteps); } @@ -785,16 +785,16 @@ public final class BytesBytesMultiHashMap { private long createOrGetListRecord(long ref) { if (Ref.hasList(ref)) { // LOG.info("Found list record at " + writeBuffers.getReadPoint()); - return writeBuffers.getReadPoint(); // Assumes we are here after key compare. + return writeBuffers.getUnsafeReadPoint(); // Assumes we are here after key compare. } long firstTailOffset = Ref.getOffset(ref); // LOG.info("First tail offset to create list record is " + firstTailOffset); // Determine the length of storage for value and key lengths of the first record. - writeBuffers.setReadPoint(firstTailOffset); - writeBuffers.skipVLong(); - writeBuffers.skipVLong(); - int lengthsLength = (int)(writeBuffers.getReadPoint() - firstTailOffset); + writeBuffers.setUnsafeReadPoint(firstTailOffset); + writeBuffers.unsafeSkipVLong(); + writeBuffers.unsafeSkipVLong(); + int lengthsLength = (int)(writeBuffers.getUnsafeReadPoint() - firstTailOffset); // Create the list record, copy first record value/key lengths there. writeBuffers.writeBytes(firstTailOffset, lengthsLength); @@ -816,7 +816,7 @@ public final class BytesBytesMultiHashMap { */ private void addRecordToList(long lrPtrOffset, long tailOffset) { // Now, insert this record into the list. - long prevHeadOffset = writeBuffers.readNByteLong(lrPtrOffset, 5); + long prevHeadOffset = writeBuffers.unsafeReadNByteLong(lrPtrOffset, 5); // LOG.info("Reading offset " + prevHeadOffset + " at " + lrPtrOffset); assert prevHeadOffset < tailOffset; // We replace an earlier element, must have lower offset. writeBuffers.writeFiveByteULong(lrPtrOffset, tailOffset); @@ -885,10 +885,10 @@ public final class BytesBytesMultiHashMap { ++examined; long recOffset = getFirstRecordLengthsOffset(ref, null); long tailOffset = Ref.getOffset(ref); - writeBuffers.setReadPoint(recOffset); - int valueLength = (int)writeBuffers.readVLong(), - keyLength = (int)writeBuffers.readVLong(); - long ptrOffset = writeBuffers.getReadPoint(); + writeBuffers.setUnsafeReadPoint(recOffset); + int valueLength = (int)writeBuffers.unsafeReadVLong(), + keyLength = (int)writeBuffers.unsafeReadVLong(); + long ptrOffset = writeBuffers.getUnsafeReadPoint(); if (Ref.hasList(ref)) { byteIntervals.put(recOffset, (int)(ptrOffset + 5 - recOffset)); } http://git-wip-us.apache.org/repos/asf/hive/blob/f008a38b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java index 3323df3..0cb6c8a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java @@ -154,7 +154,7 @@ public class VectorMapJoinOperator extends VectorMapJoinBaseOperator { // This is a vectorized aware evaluator ExprNodeEvaluator eval = new ExprNodeEvaluator<ExprNodeDesc>(desc) { - int columnIndex;; + int columnIndex; int writerIndex; public ExprNodeEvaluator initVectorExpr(int columnIndex, int writerIndex) { http://git-wip-us.apache.org/repos/asf/hive/blob/f008a38b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java index 9471e66..990e896 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java @@ -702,14 +702,15 @@ public class VectorizedBatchUtil { public static void debugDisplayOneRow(VectorizedRowBatch batch, int index, String prefix) { StringBuilder sb = new StringBuilder(); + LOG.info(debugFormatOneRow(batch, index, prefix, sb).toString()); + } + + public static StringBuilder debugFormatOneRow(VectorizedRowBatch batch, + int index, String prefix, StringBuilder sb) { sb.append(prefix + " row " + index + " "); for (int p = 0; p < batch.projectionSize; p++) { int column = batch.projectedColumns[p]; - if (p == column) { - sb.append("(col " + p + ") "); - } else { - sb.append("(proj col " + p + " col " + column + ") "); - } + sb.append("(" + p + "," + column + ") "); ColumnVector colVector = batch.cols[column]; if (colVector == null) { sb.append("(null ColumnVector)"); @@ -752,7 +753,7 @@ public class VectorizedBatchUtil { } sb.append(" "); } - LOG.info(sb.toString()); + return sb; } public static void debugDisplayBatch(VectorizedRowBatch batch, String prefix) { http://git-wip-us.apache.org/repos/asf/hive/blob/f008a38b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java index 8ad7ca4..24668f9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java @@ -636,6 +636,7 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem default: throw new RuntimeException("Unknown vector map join hash table implementation type " + hashTableImplementationType.name()); } + LOG.info("Using " + vectorMapJoinHashTable.getClass().getSimpleName() + " from " + this.getClass().getSimpleName()); } /* http://git-wip-us.apache.org/repos/asf/hive/blob/f008a38b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java index 6a3d64b..22eb07e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java @@ -507,6 +507,7 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC vectorMapJoinHashTable = VectorMapJoinOptimizedCreateHashTable.createHashTable(conf, smallTable); needHashTableSetup = true; + LOG.info("Created " + vectorMapJoinHashTable.getClass().getSimpleName() + " from " + this.getClass().getSimpleName()); if (isLogDebugEnabled) { LOG.debug(CLASS_NAME + " reloadHashTable!"); http://git-wip-us.apache.org/repos/asf/hive/blob/f008a38b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java index a4bc188..d878f65 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java @@ -81,7 +81,7 @@ public abstract class VectorMapJoinFastBytesHashMap optimizedHashMapResult.forget(); long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength); - long valueRefWord = findReadSlot(keyBytes, keyStart, keyLength, hashCode); + long valueRefWord = findReadSlot(keyBytes, keyStart, keyLength, hashCode, hashMapResult.getReadPos()); JoinUtil.JoinResult joinResult; if (valueRefWord == -1) { joinResult = JoinUtil.JoinResult.NOMATCH; http://git-wip-us.apache.org/repos/asf/hive/blob/f008a38b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java index aaf3497..b328efd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java @@ -75,7 +75,7 @@ public abstract class VectorMapJoinFastBytesHashMultiSet optimizedHashMultiSetResult.forget(); long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength); - long count = findReadSlot(keyBytes, keyStart, keyLength, hashCode); + long count = findReadSlot(keyBytes, keyStart, keyLength, hashCode, hashMultiSetResult.getReadPos()); JoinUtil.JoinResult joinResult; if (count == -1) { joinResult = JoinUtil.JoinResult.NOMATCH; http://git-wip-us.apache.org/repos/asf/hive/blob/f008a38b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java index 841183e..c9b23bf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java @@ -65,7 +65,7 @@ public abstract class VectorMapJoinFastBytesHashSet optimizedHashSetResult.forget(); long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength); - long existance = findReadSlot(keyBytes, keyStart, keyLength, hashCode); + long existance = findReadSlot(keyBytes, keyStart, keyLength, hashCode, hashSetResult.getReadPos()); JoinUtil.JoinResult joinResult; if (existance == -1) { joinResult = JoinUtil.JoinResult.NOMATCH; http://git-wip-us.apache.org/repos/asf/hive/blob/f008a38b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java index d6e107b..7987723 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java @@ -24,6 +24,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashTable; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.WriteBuffers; import org.apache.hadoop.io.BytesWritable; import org.apache.hive.common.util.HashCodeUtil; @@ -75,7 +76,7 @@ public abstract class VectorMapJoinFastBytesHashTable break; } if (hashCode == slotTriples[tripleIndex + 1] && - keyStore.equalKey(slotTriples[tripleIndex], keyBytes, keyStart, keyLength)) { + keyStore.unsafeEqualKey(slotTriples[tripleIndex], keyBytes, keyStart, keyLength)) { // LOG.debug("VectorMapJoinFastBytesHashMap findWriteSlot slot " + slot + " tripleIndex " + tripleIndex + " existing"); isNewKey = false; break; @@ -164,7 +165,8 @@ public abstract class VectorMapJoinFastBytesHashTable // LOG.debug("VectorMapJoinFastLongHashTable expandAndRehash new logicalHashBucketCount " + logicalHashBucketCount + " resizeThreshold " + resizeThreshold + " metricExpands " + metricExpands); } - protected long findReadSlot(byte[] keyBytes, int keyStart, int keyLength, long hashCode) { + protected final long findReadSlot( + byte[] keyBytes, int keyStart, int keyLength, long hashCode, WriteBuffers.Position readPos) { int intHashCode = (int) hashCode; int slot = (intHashCode & logicalHashBucketMask); @@ -176,7 +178,7 @@ public abstract class VectorMapJoinFastBytesHashTable if (slotTriples[tripleIndex] != 0 && hashCode == slotTriples[tripleIndex + 1]) { // Finally, verify the key bytes match. - if (keyStore.equalKey(slotTriples[tripleIndex], keyBytes, keyStart, keyLength)) { + if (keyStore.equalKey(slotTriples[tripleIndex], keyBytes, keyStart, keyLength, readPos)) { return slotTriples[tripleIndex + 2]; } } http://git-wip-us.apache.org/repos/asf/hive/blob/f008a38b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java index 099f38e..7df9eed 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java @@ -29,7 +29,7 @@ public abstract class VectorMapJoinFastHashTable implements VectorMapJoinHashTab protected int logicalHashBucketMask; protected float loadFactor; - protected int writeBuffersSize; + protected final int writeBuffersSize; protected int metricPutConflict; protected int largestNumberOfSteps; http://git-wip-us.apache.org/repos/asf/hive/blob/f008a38b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java index efdcd43..be51693 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java @@ -30,7 +30,7 @@ public class VectorMapJoinFastKeyStore { private WriteBuffers writeBuffers; - private WriteBuffers.Position readPos; + private WriteBuffers.Position unsafeReadPos; // Thread-unsafe position used at write time. /** * A store for arbitrary length keys in memory. @@ -115,7 +115,13 @@ public class VectorMapJoinFastKeyStore { return keyRefWord; } - public boolean equalKey(long keyRefWord, byte[] keyBytes, int keyStart, int keyLength) { + /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */ + public boolean unsafeEqualKey(long keyRefWord, byte[] keyBytes, int keyStart, int keyLength) { + return equalKey(keyRefWord, keyBytes, keyStart, keyLength, unsafeReadPos); + } + + public boolean equalKey(long keyRefWord, byte[] keyBytes, int keyStart, int keyLength, + WriteBuffers.Position readPos) { int storedKeyLengthLength = (int) ((keyRefWord & SmallKeyLength.bitMask) >> SmallKeyLength.bitShift); @@ -151,14 +157,12 @@ public class VectorMapJoinFastKeyStore { public VectorMapJoinFastKeyStore(int writeBuffersSize) { writeBuffers = new WriteBuffers(writeBuffersSize, AbsoluteKeyOffset.maxSize); - - readPos = new WriteBuffers.Position(); + unsafeReadPos = new WriteBuffers.Position(); } public VectorMapJoinFastKeyStore(WriteBuffers writeBuffers) { // TODO: Check if maximum size compatible with AbsoluteKeyOffset.maxSize. this.writeBuffers = writeBuffers; - - readPos = new WriteBuffers.Position(); + unsafeReadPos = new WriteBuffers.Position(); } } http://git-wip-us.apache.org/repos/asf/hive/blob/f008a38b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java index 78b55a1..5373aad 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java @@ -48,13 +48,13 @@ public abstract class VectorMapJoinFastLongHashTable private transient final boolean isLogDebugEnabled = LOG.isDebugEnabled(); - private HashTableKeyType hashTableKeyType; + private final HashTableKeyType hashTableKeyType; - private boolean isOuterJoin; + private final boolean isOuterJoin; - private BinarySortableDeserializeRead keyBinarySortableDeserializeRead; + private final BinarySortableDeserializeRead keyBinarySortableDeserializeRead; - private boolean useMinMax; + private final boolean useMinMax; private long min; private long max; http://git-wip-us.apache.org/repos/asf/hive/blob/f008a38b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java index 3b73f7d..9f3b107 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java @@ -58,7 +58,7 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai private final long keyCount; - private final VectorMapJoinFastHashTable VectorMapJoinFastHashTable; + private final VectorMapJoinFastHashTable vectorMapJoinFastHashTable; public VectorMapJoinFastTableContainer(MapJoinDesc desc, Configuration hconf, long keyCount) throws SerDeException { @@ -83,12 +83,12 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai // LOG.debug("VectorMapJoinFastTableContainer load newThreshold " + newThreshold); - VectorMapJoinFastHashTable = createHashTable(newThreshold); + vectorMapJoinFastHashTable = createHashTable(newThreshold); } @Override public VectorMapJoinHashTable vectorMapJoinHashTable() { - return VectorMapJoinFastHashTable; + return vectorMapJoinFastHashTable; } private VectorMapJoinFastHashTable createHashTable(int newThreshold) { @@ -178,7 +178,7 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai throws SerDeException, HiveException, IOException { // We are not using the key and value contexts, nor do we support a MapJoinKey. - VectorMapJoinFastHashTable.putRow((BytesWritable) currentKey, (BytesWritable) currentValue); + vectorMapJoinFastHashTable.putRow((BytesWritable) currentKey, (BytesWritable) currentValue); return null; } @@ -214,7 +214,7 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai @Override public int size() { - return VectorMapJoinFastHashTable.size(); + return vectorMapJoinFastHashTable.size(); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/f008a38b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java index ce598e3..b51d6fe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable; import org.apache.hadoop.hive.ql.exec.JoinUtil; +import org.apache.hadoop.hive.serde2.WriteBuffers; /* * Root abstract class for a hash table result. @@ -29,9 +30,12 @@ public abstract class VectorMapJoinHashTableResult { private int spillPartitionId; + private final WriteBuffers.Position readPos; + public VectorMapJoinHashTableResult() { joinResult = JoinUtil.JoinResult.NOMATCH; spillPartitionId = -1; + readPos = new WriteBuffers.Position(); } /** @@ -78,4 +82,8 @@ public abstract class VectorMapJoinHashTableResult { sb.append("joinResult " + joinResult.name()); return sb.toString(); } + + public WriteBuffers.Position getReadPos() { + return readPos; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/f008a38b/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java ---------------------------------------------------------------------- diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java b/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java index 5900428..a4ecd9f 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java @@ -21,11 +21,8 @@ package org.apache.hadoop.hive.serde2; import java.nio.ByteBuffer; import java.util.ArrayList; -import org.apache.hadoop.hive.serde2.ByteStream.Output; import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput; -import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe; import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; import org.apache.hadoop.io.WritableUtils; import org.apache.hive.common.util.HashCodeUtil; @@ -53,7 +50,7 @@ public final class WriteBuffers implements RandomAccessOutput { } Position writePos = new Position(); // Position where we'd write - Position defaultReadPos = new Position(); // Position where we'd read (by default). + Position unsafeReadPos = new Position(); // Position where we'd read (unsafely at write time). public WriteBuffers(int wbSize, long maxSize) { @@ -64,16 +61,18 @@ public final class WriteBuffers implements RandomAccessOutput { writePos.bufferIndex = -1; } - public int readVInt() { - return (int) readVLong(defaultReadPos); + /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */ + public int unsafeReadVInt() { + return (int) readVLong(unsafeReadPos); } public int readVInt(Position readPos) { return (int) readVLong(readPos); } - public long readVLong() { - return readVLong(defaultReadPos); + /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */ + public long unsafeReadVLong() { + return readVLong(unsafeReadPos); } public long readVLong(Position readPos) { @@ -97,8 +96,9 @@ public final class WriteBuffers implements RandomAccessOutput { return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i); } - public void skipVLong() { - skipVLong(defaultReadPos); + /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */ + public void unsafeSkipVLong() { + skipVLong(unsafeReadPos); } public void skipVLong(Position readPos) { @@ -117,8 +117,9 @@ public final class WriteBuffers implements RandomAccessOutput { } } - public void setReadPoint(long offset) { - setReadPoint(offset, defaultReadPos); + /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */ + public void setUnsafeReadPoint(long offset) { + setReadPoint(offset, unsafeReadPos); } public void setReadPoint(long offset, Position readPos) { @@ -127,8 +128,9 @@ public final class WriteBuffers implements RandomAccessOutput { readPos.offset = getOffset(offset); } - public int hashCode(long offset, int length) { - return hashCode(offset, length, defaultReadPos); + /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */ + public int unsafeHashCode(long offset, int length) { + return hashCode(offset, length, unsafeReadPos); } public int hashCode(long offset, int length, Position readPos) { @@ -352,7 +354,7 @@ public final class WriteBuffers implements RandomAccessOutput { private void clearState() { writePos.clear(); - defaultReadPos.clear(); + unsafeReadPos.clear(); } @@ -363,8 +365,9 @@ public final class WriteBuffers implements RandomAccessOutput { return ((long)writePos.bufferIndex << wbSizeLog2) + writePos.offset; } - public long getReadPoint() { - return getReadPoint(defaultReadPos); + /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */ + public long getUnsafeReadPoint() { + return getReadPoint(unsafeReadPos); } public long getReadPoint(Position readPos) { @@ -518,8 +521,9 @@ public final class WriteBuffers implements RandomAccessOutput { clearState(); } - public long readNByteLong(long offset, int bytes) { - return readNByteLong(offset, bytes, defaultReadPos); + /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */ + public long unsafeReadNByteLong(long offset, int bytes) { + return readNByteLong(offset, bytes, unsafeReadPos); } public long readNByteLong(long offset, int bytes, Position readPos) { @@ -561,7 +565,7 @@ public final class WriteBuffers implements RandomAccessOutput { } public int readInt(long offset) { - return (int)readNByteLong(offset, 4); + return (int)unsafeReadNByteLong(offset, 4); } @Override @@ -606,7 +610,8 @@ public final class WriteBuffers implements RandomAccessOutput { return writeBuffers.size() * (long) wbSize; } - public Position getReadPosition() { - return defaultReadPos; + /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */ + public Position getUnsafeReadPosition() { + return unsafeReadPos; } }
