GitHub user mehant commented on a diff in the pull request:

    https://github.com/apache/incubator-drill/pull/49#discussion_r11759876
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 ---
    @@ -0,0 +1,519 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.join;
    +
    +import org.eigenbase.rel.JoinRelType;
    +
    +import java.io.IOException;
    +import java.util.LinkedList;
    +
    +import com.sun.codemodel.JExpression;
    +import com.sun.codemodel.JVar;
    +import com.sun.codemodel.JExpr;
    +
    +import org.apache.drill.exec.compile.sig.GeneratorMapping;
    +import org.apache.drill.exec.compile.sig.MappingSet;
    +import org.apache.drill.exec.exception.ClassTransformationException;
    +import org.apache.drill.exec.exception.SchemaChangeException;
    +import org.apache.drill.exec.expr.ClassGenerator;
    +import org.apache.drill.exec.expr.CodeGenerator;
    +import org.apache.drill.exec.expr.TypeHelper;
    +import org.apache.drill.exec.expr.holders.IntHolder;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.physical.config.HashJoinPOP;
    +import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
    +import org.apache.drill.exec.physical.impl.common.HashTable;
    +import org.apache.drill.exec.physical.impl.common.HashTableConfig;
    +import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
    +import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch;
    +import org.apache.drill.exec.record.RecordBatch;
    +import org.apache.drill.exec.record.AbstractRecordBatch;
    +import org.apache.drill.exec.record.ExpandableHyperContainer;
    +import org.apache.drill.exec.record.BatchSchema;
    +import org.apache.drill.exec.record.VectorWrapper;
    +import org.apache.drill.exec.record.TypedFieldId;
    +import org.apache.drill.exec.vector.ValueVector;
    +import org.apache.drill.exec.vector.allocator.VectorAllocator;
    +
    +public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
    +    // Probe side record batch
    +    private final RecordBatch left;
    +
    +    // Build side record batch
    +    private final RecordBatch right;
    +
    +    // Join type, INNER, LEFT, RIGHT or OUTER
    +    private final JoinRelType joinType;
    +
    +    // hash table configuration, created in HashJoinPOP
    +    private HashTableConfig htConfig;
    +
    +    // Runtime generated class implementing HashJoin interface
    +    private HashJoin hashJoin = null;
    +
    +    /* Helper class
    +     * Maintains linked list of build side records with the same key
    +     * Keeps information about which build records have a corresponding
    +     * matching key in the probe side (for outer, right joins)
    +     */
    +    private HashJoinHelper hjHelper = null;
    +
    +    // Underlying hashtable used by the hash join
    +    private HashTable hashTable = null;
    +
    +    /* Hyper container to store all build side record batches.
    +     * Records are retrieved from this container when there is a matching 
record
    +     * on the probe side
    +     */
    +    private ExpandableHyperContainer hyperContainer;
    +
    +    // Number of records to process on the probe side
    +    private int recordsToProcess;
    +
    +    // Number of records processed on the probe side
    +    private int recordsProcessed;
    +
    +    // Number of records in the output container
    +    private int outputRecords;
    +
    +    // Current batch index on the build side
    +    private int buildBatchIndex = 0;
    +
    +    // Indicate if we should drain the next record from the probe side
    +    private boolean getNextRecord = true;
    +
    +    // Contains both batch idx and record idx of the matching record in 
the build side
    +    private int currentCompositeIdx = -1;
    +
    +    // Current state the hash join algorithm is in
    +    private HashJoinState hjState = HashJoinState.BUILD;
    +
    +    // For outer or right joins, this is a list of unmatched records that 
needs to be projected
    +    private LinkedList<Integer> unmatchedBuildIndexes = null;
    +
    +    // List of vector allocators
    +    private LinkedList<VectorAllocator> allocators = null;
    +
    +    // Schema of the build side
    +    private BatchSchema rightSchema = null;
    +
    +    // Generator mapping for the build side
    +    private static final GeneratorMapping PROJECT_BUILD = 
GeneratorMapping.create("doSetup"/* setup method */,
    +                                                                           
       "projectBuildRecord" /* eval method */,
    +                                                                           
       null /* reset */, null /* cleanup */);
    +
    +    // Generator mapping for the probe side
    +    private static final GeneratorMapping PROJECT_PROBE = 
GeneratorMapping.create("doSetup" /* setup method */,
    +                                                                           
       "projectProbeRecord" /* eval method */,
    +                                                                           
       null /* reset */, null /* cleanup */);
    +
    +    // Mapping set for the build side
    +    private final MappingSet projectBuildMapping = new 
MappingSet("buildIndex" /* read index */, "outIndex" /* write index */,
    +                                                                  
"buildBatch" /* read container */,
    +                                                                  
"outgoing" /* write container */,
    +                                                                  
PROJECT_BUILD, PROJECT_BUILD);
    +
    +    // Mapping set for the probe side
    +    private final MappingSet projectProbeMapping = new 
MappingSet("probeIndex" /* read index */, "outIndex" /* write index */,
    +                                                                  
"probeBatch" /* read container */,
    +                                                                  
"outgoing" /* write container */,
    +                                                                  
PROJECT_PROBE, PROJECT_PROBE);
    +
    +    // Possible states for the hash join
    +    public static enum HashJoinState {
    +        BUILD, // Build phase, go through the 'right' record batches and 
create the hash table
    +        PROBE, // Go through the 'left' record batches, probe the hash 
table project the records if the key matches
    +        PROJECT_RIGHT, // If its a RIGHT OUTER or FULL join go through the 
build records that didn't match a probe record and project them
    +        DONE // Done processing all records get out
    +    }
    +
    +    @Override
    +    public int getRecordCount() {
    +        return outputRecords;
    +    }
    +
    +
    +    @Override
    +    public IterOutcome next() {
    +
    +        IterOutcome leftUpstream = IterOutcome.NONE;
    +
    +        try {
    +
    +            if (hjState == HashJoinState.BUILD) {
    +
    +                // Initialize the hash join helper context
    +                hjHelper = new HashJoinHelper(context);
    +
    +                /* Build phase requires setting up the hash table. Hash 
table will
    +                 * materialize both the build and probe side expressions 
while
    +                 * creating the hash table. So we need to invoke next() on 
our probe batch
    +                 * as well, for the materialization to be successful. This 
batch will not be used
    +                 * till we complete the build phase.
    +                 */
    +                left.next();
    --- End diff --
    
    Yes, we call next() on the probe side exactly once before we complete the build phase.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and you would like it enabled, or if the feature is enabled but not
working, please contact infrastructure at [email protected] or file a JIRA
ticket with INFRA.
---

Reply via email to