Github user amansinha100 commented on a diff in the pull request:
https://github.com/apache/drill/pull/822#discussion_r117052757
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
---
@@ -544,86 +572,69 @@ private static int roundUpToPowerOf2(int number) {
return rounded;
}
- @Override
- public void put(int incomingRowIdx, IndexPointer htIdxHolder, int
retryCount) {
- put(incomingRowIdx, htIdxHolder);
+ public int getHashCode(int incomingRowIdx) throws SchemaChangeException {
+ return getHashBuild(incomingRowIdx);
}
- private PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder) {
+ /** put() uses the hash code (from gethashCode() above) to insert the
key(s) from the incoming
+ * row into the hash table. The code selects the bucket in the
startIndices, then the keys are
+ * placed into the chained list - by storing the key values into a
batch, and updating its
+ * "links" member. Last it modifies the index holder to the batch offset
so that the caller
+ * can store the remaining parts of the row into a matching batch
(outside the hash table).
+ * Returning
+ *
+ * @param incomingRowIdx - position of the incoming row
+ * @param htIdxHolder - to return batch + batch-offset (for caller to
manage a matching batch)
+ * @param hashCode - computed over the key(s) by calling getHashCode()
+ * @return Status - the key(s) was ADDED or was already PRESENT
+ */
+ @Override
+ public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int
hashCode) throws SchemaChangeException {
- int hash = getHashBuild(incomingRowIdx);
- int i = getBucketIndex(hash, numBuckets());
- int startIdx = startIndices.getAccessor().get(i);
+ int bucketIndex = getBucketIndex(hashCode, numBuckets());
+ int startIdx = startIndices.getAccessor().get(bucketIndex);
int currentIdx;
- int currentIdxWithinBatch;
- BatchHolder bh;
BatchHolder lastEntryBatch = null;
int lastEntryIdxWithinBatch = EMPTY_SLOT;
+ // if startIdx is non-empty, follow the hash chain links until we find
a matching
+ // key or reach the end of the chain (and remember the last link there)
+ for ( currentIdxHolder.value = startIdx;
+ currentIdxHolder.value != EMPTY_SLOT;
+ /* isKeyMatch() below also advances the currentIdxHolder to the
next link */) {
- if (startIdx == EMPTY_SLOT) {
- // this is the first entry in this bucket; find the first available
slot in the
- // container of keys and values
- currentIdx = freeIndex++;
- addBatchIfNeeded(currentIdx);
+ // remember the current link, which would be the last when the next
link is empty
+ lastEntryBatch = batchHolders.get((currentIdxHolder.value >>> 16) &
HashTable.BATCH_MASK);
+ lastEntryIdxWithinBatch = currentIdxHolder.value & BATCH_MASK;
- if (EXTRA_DEBUG) {
- logger.debug("Empty bucket index = {}. incomingRowIdx = {};
inserting new entry at currentIdx = {}.", i,
- incomingRowIdx, currentIdx);
+ if (lastEntryBatch.isKeyMatch(incomingRowIdx, currentIdxHolder,
false)) {
+ htIdxHolder.value = currentIdxHolder.value;
+ return PutStatus.KEY_PRESENT;
}
-
- insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch,
lastEntryIdxWithinBatch);
- // update the start index array
- startIndices.getMutator().setSafe(getBucketIndex(hash,
numBuckets()), currentIdx);
- htIdxHolder.value = currentIdx;
- return PutStatus.KEY_ADDED;
}
- currentIdx = startIdx;
- boolean found = false;
-
- bh = batchHolders.get((currentIdx >>> 16) & BATCH_MASK);
- currentIdxHolder.value = currentIdx;
-
- // if startIdx is non-empty, follow the hash chain links until we find
a matching
- // key or reach the end of the chain
- while (true) {
- currentIdxWithinBatch = currentIdxHolder.value & BATCH_MASK;
+ // no match was found, so insert a new entry
+ currentIdx = freeIndex++;
+ boolean addedBatch = addBatchIfNeeded(currentIdx);
- if (bh.isKeyMatch(incomingRowIdx, currentIdxHolder, false)) {
- htIdxHolder.value = currentIdxHolder.value;
- found = true;
- break;
- } else if (currentIdxHolder.value == EMPTY_SLOT) {
- lastEntryBatch = bh;
- lastEntryIdxWithinBatch = currentIdxWithinBatch;
- break;
- } else {
- bh = batchHolders.get((currentIdxHolder.value >>> 16) &
HashTable.BATCH_MASK);
- lastEntryBatch = bh;
- }
+ if (EXTRA_DEBUG) {
+ logger.debug("No match was found for incomingRowIdx = {}; inserting
new entry at currentIdx = {}.", incomingRowIdx, currentIdx);
}
- if (!found) {
- // no match was found, so insert a new entry
- currentIdx = freeIndex++;
- addBatchIfNeeded(currentIdx);
+ insertEntry(incomingRowIdx, currentIdx, hashCode, lastEntryBatch,
lastEntryIdxWithinBatch);
- if (EXTRA_DEBUG) {
- logger.debug("No match was found for incomingRowIdx = {};
inserting new entry at currentIdx = {}.", incomingRowIdx, currentIdx);
- }
-
- insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch,
lastEntryIdxWithinBatch);
- htIdxHolder.value = currentIdx;
- return PutStatus.KEY_ADDED;
+ // if there was no hash chain at this bucket, need to update the start
index array
+ if (startIdx == EMPTY_SLOT) {
+ startIndices.getMutator().setSafe(getBucketIndex(hashCode,
numBuckets()), currentIdx);
}
-
- return found ? PutStatus.KEY_PRESENT : PutStatus.KEY_ADDED;
+ htIdxHolder.value = currentIdx;
+ return addedBatch ? PutStatus.NEW_BATCH_ADDED :
+ ( freeIndex + 1 > batchHolders.size() * BATCH_SIZE ) ?
--- End diff --
Does KEY_ADDED_LAST indicate the last key in a single batch, or the last key
across all batches added so far? It seems to be the latter, but would it be
sufficient to check only whether the last batch is filled up, since the previous
batches would already have been filled?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---