Github user arina-ielchiieva commented on a diff in the pull request:
https://github.com/apache/drill/pull/794#discussion_r108382080
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
---
@@ -40,132 +41,133 @@
// Record count of the left batch currently being processed
private int leftRecordCount = 0;
- // List of record counts per batch in the hyper container
+ // List of record counts per batch in the hyper container
private List<Integer> rightCounts = null;
// Output batch
private NestedLoopJoinBatch outgoing = null;
- // Next right batch to process
- private int nextRightBatchToProcess = 0;
-
- // Next record in the current right batch to process
- private int nextRightRecordToProcess = 0;
-
- // Next record in the left batch to process
- private int nextLeftRecordToProcess = 0;
+ // Iteration status tracker
+ private IterationStatusTracker tracker = new IterationStatusTracker();
/**
* Method initializes necessary state and invokes the doSetup() to set
the
- * input and output value vector references
+ * input and output value vector references.
+ *
* @param context Fragment context
* @param left Current left input batch being processed
* @param rightContainer Hyper container
+ * @param rightCounts Counts for each right container
* @param outgoing Output batch
*/
- public void setupNestedLoopJoin(FragmentContext context, RecordBatch
left,
+ public void setupNestedLoopJoin(FragmentContext context,
+ RecordBatch left,
ExpandableHyperContainer rightContainer,
LinkedList<Integer> rightCounts,
NestedLoopJoinBatch outgoing) {
this.left = left;
- leftRecordCount = left.getRecordCount();
+ this.leftRecordCount = left.getRecordCount();
this.rightCounts = rightCounts;
this.outgoing = outgoing;
doSetup(context, rightContainer, left, outgoing);
}
/**
- * This method is the core of the nested loop join. For every record on
the right we go over
- * the left batch and produce the cross product output
+ * Main entry point for producing the output records. Thin wrapper
around populateOutgoingBatch(), this method
+ * controls which left batch we are processing and fetches the next left
input batch once we exhaust the current one.
+ *
+ * @param joinType join type (INNER ot LEFT)
+ * @return the number of records produced in the output batch
+ */
+ public int outputRecords(JoinRelType joinType) {
+ int outputIndex = 0;
+ while (leftRecordCount != 0) {
+ outputIndex = populateOutgoingBatch(joinType, outputIndex);
+ if (outputIndex >= NestedLoopJoinBatch.MAX_BATCH_SIZE) {
+ break;
+ }
+ // reset state and get next left batch
+ resetAndGetNextLeft();
+ }
+ return outputIndex;
+ }
+
+ /**
+ * This method is the core of the nested loop join.For each left batch
record looks for matching record
+ * from the list of right batches. Match is checked by calling {@link
#doEval(int, int, int)} method.
+ * If matching record is found both left and right records are written
into output batch,
+ * otherwise if join type is LEFT, than only left record is written,
right batch record values will be null.
+ *
+ * @param joinType join type (INNER or LEFT)
* @param outputIndex index to start emitting records at
* @return final outputIndex after producing records in the output batch
*/
- private int populateOutgoingBatch(int outputIndex) {
-
- // Total number of batches on the right side
- int totalRightBatches = rightCounts.size();
-
- // Total number of records on the left
- int localLeftRecordCount = leftRecordCount;
-
- /*
- * The below logic is the core of the NLJ. To have better performance
we copy the instance members into local
- * method variables, once we are done with the loop we need to update
the instance variables to reflect the new
- * state. To avoid code duplication of resetting the instance members
at every exit point in the loop we are using
- * 'goto'
- */
- int localNextRightBatchToProcess = nextRightBatchToProcess;
- int localNextRightRecordToProcess = nextRightRecordToProcess;
- int localNextLeftRecordToProcess = nextLeftRecordToProcess;
-
- outer: {
-
- for (; localNextRightBatchToProcess< totalRightBatches;
localNextRightBatchToProcess++) { // for every batch on the right
- int compositeIndexPart = localNextRightBatchToProcess << 16;
- int rightRecordCount =
rightCounts.get(localNextRightBatchToProcess);
-
- for (; localNextRightRecordToProcess < rightRecordCount;
localNextRightRecordToProcess++) { // for every record in this right batch
- for (; localNextLeftRecordToProcess < localLeftRecordCount;
localNextLeftRecordToProcess++) { // for every record in the left batch
-
+ private int populateOutgoingBatch(JoinRelType joinType, int outputIndex)
{
+ // copy index and match counters as local variables to speed up
processing
+ int nextRightBatchToProcess = tracker.getNextRightBatchToProcess();
+ int nextRightRecordToProcess = tracker.getNextRightRecordToProcess();
+ int nextLeftRecordToProcess = tracker.getNextLeftRecordToProcess();
+ boolean rightRecordMatched = tracker.isRightRecordMatched();
+
+ outer:
+ // for every record in the left batch
+ for (; nextLeftRecordToProcess < leftRecordCount;
nextLeftRecordToProcess++) {
+ // for every batch on the right
+ for (; nextRightBatchToProcess < rightCounts.size();
nextRightBatchToProcess++) {
+ int rightRecordCount = rightCounts.get(nextRightBatchToProcess);
--- End diff --
Here we keep track of right records within one right record batch. We have
list of right records counts `List<Integer> rightCounts`. This list is
populated with record counts from each right input record batch [1],
`getRecordCount()` returns int [2].
[1]
https://github.com/apache/drill/blob/2af709f43d01f341b2a52c6473ea49d6761fdc61/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java#L349
[2]
https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java#L229
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---