[ 
https://issues.apache.org/jira/browse/DRILL-505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13971666#comment-13971666
 ] 

ASF GitHub Bot commented on DRILL-505:
--------------------------------------

Github user amansinha100 commented on a diff in the pull request:

    https://github.com/apache/incubator-drill/pull/49#discussion_r11695133
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
 ---
    @@ -0,0 +1,220 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.drill.exec.physical.impl.join;
    +
    +import java.util.ArrayList;
    +import java.util.BitSet;
    +import java.util.LinkedList;
    +import java.util.List;
    +
    +import io.netty.buffer.ByteBuf;
    +
    +import org.apache.drill.exec.exception.SchemaChangeException;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.record.selection.SelectionVector4;
    +import org.apache.drill.exec.physical.impl.common.HashTable;
    +
    +
    +/*
    + * Helper class for hash join. Keeps track of information about the build 
side batches.
    + *
    + * Hash join is a blocking operator, so we consume all the batches on the 
build side and
    + * store them in a hyper container. The way we can retrieve records from 
the hyper container
    + * is by providing the record index and batch index in the hyper 
container. When we invoke put()
    + * for a given row, hash table returns a global index. We store the 
current row's record index
    + * and batch index in this global index of the startIndices structure.
    + *
    + * Since there can be many rows with the same key on the build side, we 
store the first
    + * index in the startIndices list and the remaining are stored as a 
logical linked list using
    + * the 'links' field in the BuildInfo structures.
    + *
    + * Apart from the indexes into the hyper container, this class also stores 
information about
    + * which records of the build side had a matching record on the probe 
side. Stored in a bitvector
    + * keyMatchBitVector, it is used to retrieve all records that did not 
match a record on probe side
    + * for right outer and full outer joins
    + */
    +public class HashJoinHelper {
    +
    +    /* List of start indexes. Stores the record and batch index of the 
first record
    +     * with a give key.
    +     */
    +    public List<SelectionVector4> startIndices = new ArrayList<>();
    +
    +    // List of BuildInfo structures. Used to maintain auxiliary 
information about the build batches
    +    public List<BuildInfo> buildInfoList = new ArrayList<>();
    +
    +    // Fragment context
    +    public FragmentContext context;
    +
    +    // Constant to indicate index is empty.
    +    public static final int INDEX_EMPTY = -1;
    +
    +    public HashJoinHelper(FragmentContext context) {
    +        this.context = context;
    +    }
    +
    +    public void addStartIndexBatch() throws SchemaChangeException {
    +        startIndices.add(getNewSV4(HashTable.BATCH_SIZE));
    +    }
    +
    +    public class BuildInfo {
    +        // List of links. Logically it helps maintain a linked list of 
records with the same key value
    +        private SelectionVector4 links;
    +
    +        // List of bitvectors. Keeps track of records on the build side 
that matched a record on the probe side
    +        private BitSet keyMatchBitVector;
    +
    +        // number of records in this batch
    +        private int recordCount;
    +
    +        public BuildInfo(SelectionVector4 links, BitSet keyMatchBitVector, 
int recordCount) {
    +            this.links = links;
    +            this.keyMatchBitVector = keyMatchBitVector;
    +            this.recordCount = recordCount;
    +        }
    +
    +        public SelectionVector4 getLinks() {
    +            return links;
    +        }
    +
    +        public BitSet getKeyMatchBitVector() {
    +            return keyMatchBitVector;
    +        }
    +    }
    +
    +    public SelectionVector4 getNewSV4(int recordCount) throws 
SchemaChangeException {
    +
    +        ByteBuf vector = context.getAllocator().buffer((recordCount * 4));
    +
    +        SelectionVector4 sv4 = new SelectionVector4(vector, recordCount, 
recordCount);
    +
    +        // Initialize the vector
    +        for (int i = 0; i < recordCount; i++) {
    +            sv4.set(i, INDEX_EMPTY);
    +        }
    +
    +        return sv4;
    +    }
    +
    +    public void addNewBatch(int recordCount) throws SchemaChangeException {
    +        // Add a node to the list of BuildInfo's
    +        BuildInfo info = new BuildInfo(getNewSV4(recordCount), new 
BitSet(recordCount), recordCount);
    +        buildInfoList.add(info);
    +    }
    +
    +    public int getStartIndex(int keyIndex) {
    +        int batchIdx  = keyIndex / HashTable.BATCH_SIZE;
    +        int offsetIdx = keyIndex % HashTable.BATCH_SIZE;
    +
    +        assert batchIdx < startIndices.size();
    +
    +        SelectionVector4 sv4 = startIndices.get(batchIdx);
    +
    +        return sv4.get(offsetIdx);
    +    }
    +
    +    public int getNextIndex(int currentIdx) {
    +        // Get to the links field of the current index to get the next 
index
    +        int batchIdx = currentIdx >>> 16;
    +        int recordIdx = currentIdx & 65535;
    +
    +        assert batchIdx < buildInfoList.size();
    +
    +        // Get the corresponding BuildInfo node
    +        BuildInfo info = buildInfoList.get(batchIdx);
    +        return info.getLinks().get(recordIdx);
    +    }
    +
    +    public LinkedList<Integer> getNextUnmatchedIndex() {
    +        LinkedList<Integer> compositeIndexes = new LinkedList<>();
    --- End diff --
    
    In this and other places where you use LinkedList, do you really need it ?  
If you are only appending to the list, ArrayList should be sufficient and 
better performant, less memory footprint.   


> Implement hash join
> -------------------
>
>                 Key: DRILL-505
>                 URL: https://issues.apache.org/jira/browse/DRILL-505
>             Project: Apache Drill
>          Issue Type: New Feature
>            Reporter: Mehant Baid
>            Assignee: Mehant Baid
>         Attachments: DRILL-505.patch
>
>
> Implement the hash join operator. The first version of the hash join 
> algorithm is a simple in memory implementation only and will not spill to 
> disk 
> Scope is to support inner, left outer, right outer and full outer joins for 
> simple equality conditions.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to