This is an automated email from the ASF dual-hosted git repository.
boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 6267185 Drill 6735: Implement Semi-Join for the Hash-Join operator
(#1522)
6267185 is described below
commit 6267185823c4c50ab31c029ee5b4d9df2fc94d03
Author: Boaz Ben-Zvi <[email protected]>
AuthorDate: Tue Nov 13 12:58:31 2018 -0800
Drill 6735: Implement Semi-Join for the Hash-Join operator (#1522)
---
.../drill/exec/physical/base/AbstractJoinPop.java | 7 ++-
.../drill/exec/physical/config/HashJoinPOP.java | 17 +++++-
.../drill/exec/physical/config/LateralJoinPOP.java | 2 +-
.../drill/exec/physical/config/MergeJoinPOP.java | 2 +-
.../exec/physical/config/NestedLoopJoinPOP.java | 2 +-
.../exec/physical/impl/common/HashPartition.java | 12 ++--
.../exec/physical/impl/join/HashJoinBatch.java | 13 +++-
.../impl/join/HashJoinHelperUnusedSizeImpl.java | 38 ++++++++++++
.../impl/join/HashJoinMemoryCalculatorImpl.java | 24 +++++---
.../exec/physical/impl/join/HashJoinProbe.java | 5 +-
.../physical/impl/join/HashJoinProbeTemplate.java | 71 +++++++++++-----------
.../apache/drill/exec/planner/RuleInstance.java | 1 +
.../drill/exec/planner/physical/HashJoinPrel.java | 2 +-
.../java-exec/src/main/resources/drill-module.conf | 2 +-
.../physical/impl/common/HashPartitionTest.java | 6 +-
.../impl/join/TestBuildSidePartitioningImpl.java | 18 +++---
.../join/TestHashJoinHelperSizeCalculatorImpl.java | 3 +
.../impl/join/TestPostBuildCalculationsImpl.java | 20 +++---
18 files changed, 163 insertions(+), 82 deletions(-)
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractJoinPop.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractJoinPop.java
index a624f5c..628dde7 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractJoinPop.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractJoinPop.java
@@ -33,16 +33,19 @@ public abstract class AbstractJoinPop extends AbstractBase {
protected final JoinRelType joinType;
+ protected final boolean semiJoin;
+
protected final LogicalExpression condition;
protected final List<JoinCondition> conditions;
public AbstractJoinPop(PhysicalOperator leftOp, PhysicalOperator rightOp,
- JoinRelType joinType, LogicalExpression joinCondition,
+ JoinRelType joinType, boolean semiJoin,
LogicalExpression joinCondition,
List<JoinCondition> joinConditions) {
left = leftOp;
right = rightOp;
this.joinType = joinType;
+ this.semiJoin = semiJoin;
condition = joinCondition;
conditions = joinConditions;
}
@@ -69,6 +72,8 @@ public abstract class AbstractJoinPop extends AbstractBase {
return joinType;
}
+ public boolean isSemiJoin() { return semiJoin; }
+
public LogicalExpression getCondition() { return condition; }
public List<JoinCondition> getConditions() {
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
index 4df9c38..35c187c 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
@@ -54,10 +54,11 @@ public class HashJoinPOP extends AbstractJoinPop {
public HashJoinPOP(@JsonProperty("left") PhysicalOperator left,
@JsonProperty("right") PhysicalOperator right,
@JsonProperty("conditions") List<JoinCondition>
conditions,
@JsonProperty("joinType") JoinRelType joinType,
+ @JsonProperty("semiJoin") boolean semiJoin,
@JsonProperty("runtimeFilterDef") RuntimeFilterDef
runtimeFilterDef,
@JsonProperty("isRowKeyJoin") boolean isRowKeyJoin,
@JsonProperty("joinControl") int joinControl) {
- super(left, right, joinType, null, conditions);
+ super(left, right, joinType, semiJoin,null, conditions);
Preconditions.checkArgument(joinType != null, "Join type is missing for
HashJoin Pop");
this.runtimeFilterDef = runtimeFilterDef;
this.isRowKeyJoin = isRowKeyJoin;
@@ -68,6 +69,16 @@ public class HashJoinPOP extends AbstractJoinPop {
@VisibleForTesting
public HashJoinPOP(PhysicalOperator left, PhysicalOperator right,
List<JoinCondition> conditions,
+ JoinRelType joinType,
+ RuntimeFilterDef runtimeFilterDef,
+ boolean isRowKeyJoin,
+ int joinControl){
+ this(left, right, conditions, joinType, false, runtimeFilterDef,
isRowKeyJoin, joinControl);
+ }
+
+ @VisibleForTesting
+ public HashJoinPOP(PhysicalOperator left, PhysicalOperator right,
+ List<JoinCondition> conditions,
JoinRelType joinType) {
this(left, right, conditions, joinType, null, false, JoinControl.DEFAULT);
}
@@ -84,7 +95,7 @@ public class HashJoinPOP extends AbstractJoinPop {
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.size() == 2);
- HashJoinPOP newHashJoin = new HashJoinPOP(children.get(0),
children.get(1), conditions, joinType, runtimeFilterDef,
+ HashJoinPOP newHashJoin = new HashJoinPOP(children.get(0),
children.get(1), conditions, joinType, semiJoin, runtimeFilterDef,
isRowKeyJoin, joinControl);
newHashJoin.setMaxAllocation(getMaxAllocation());
newHashJoin.setSubScanForRowKeyJoin(this.getSubScanForRowKeyJoin());
@@ -116,7 +127,7 @@ public class HashJoinPOP extends AbstractJoinPop {
for (JoinCondition c : conditions) {
flippedConditions.add(c.flip());
}
- return new HashJoinPOP(right, left, flippedConditions,
JoinRelType.LEFT, runtimeFilterDef, isRowKeyJoin, joinControl);
+ return new HashJoinPOP(right, left, flippedConditions,
JoinRelType.LEFT, semiJoin, runtimeFilterDef, isRowKeyJoin, joinControl);
} else {
return this;
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
index ff3e0b2..9e00f7c 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
@@ -51,7 +51,7 @@ public class LateralJoinPOP extends AbstractJoinPop {
@JsonProperty("joinType") JoinRelType joinType,
@JsonProperty("implicitRIDColumn") String implicitRIDColumn,
@JsonProperty("excludedColumns") List<SchemaPath> excludedColumns) {
- super(left, right, joinType, null, null);
+ super(left, right, joinType, false, null, null);
Preconditions.checkArgument(joinType != JoinRelType.FULL,
"Full outer join is currently not supported with Lateral Join");
Preconditions.checkArgument(joinType != JoinRelType.RIGHT,
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
index 51ebb51..eb1a31a 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
@@ -41,7 +41,7 @@ public class MergeJoinPOP extends AbstractJoinPop{
@JsonProperty("conditions") List<JoinCondition> conditions,
@JsonProperty("joinType") JoinRelType joinType
) {
- super(left, right, joinType, null, conditions);
+ super(left, right, joinType, false, null, conditions);
Preconditions.checkArgument(joinType != null, "Join type is missing!");
Preconditions.checkArgument(joinType != JoinRelType.FULL,
"Full outer join not currently supported with Merge Join");
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/NestedLoopJoinPOP.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/NestedLoopJoinPOP.java
index 79e33e6..5783b4e 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/NestedLoopJoinPOP.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/NestedLoopJoinPOP.java
@@ -40,7 +40,7 @@ public class NestedLoopJoinPOP extends AbstractJoinPop {
@JsonProperty("joinType") JoinRelType joinType,
@JsonProperty("condition") LogicalExpression condition
) {
- super(left, right, joinType, condition, null);
+ super(left, right, joinType, false, condition, null);
}
@Override
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
index 86b870d..275cf16 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
@@ -125,9 +125,10 @@ public class HashPartition implements
HashJoinMemoryCalculator.PartitionStat {
private long partitionInMemorySize;
private long numInMemoryRecords;
private boolean updatedRecordsPerBatch = false;
+ private boolean semiJoin;
public HashPartition(FragmentContext context, BufferAllocator allocator,
ChainedHashTable baseHashTable,
- RecordBatch buildBatch, RecordBatch probeBatch,
+ RecordBatch buildBatch, RecordBatch probeBatch, boolean
semiJoin,
int recordsPerBatch, SpillSet spillSet, int partNum,
int cycleNum, int numPartitions) {
this.allocator = allocator;
this.buildBatch = buildBatch;
@@ -137,6 +138,7 @@ public class HashPartition implements
HashJoinMemoryCalculator.PartitionStat {
this.partitionNum = partNum;
this.cycleNum = cycleNum;
this.numPartitions = numPartitions;
+ this.semiJoin = semiJoin;
try {
this.hashTable = baseHashTable.createAndSetupHashTable(null);
@@ -151,7 +153,7 @@ public class HashPartition implements
HashJoinMemoryCalculator.PartitionStat {
} catch (SchemaChangeException sce) {
throw new IllegalStateException("Unexpected Schema Change while creating
a hash table",sce);
}
- this.hjHelper = new HashJoinHelper(context, allocator);
+ this.hjHelper = semiJoin ? null : new HashJoinHelper(context, allocator);
tmpBatchesList = new ArrayList<>();
if ( numPartitions > 1 ) {
allocateNewCurrentBatchAndHV();
@@ -391,7 +393,7 @@ public class HashPartition implements
HashJoinMemoryCalculator.PartitionStat {
return Pair.of(compositeIndex, matchExists);
}
public int getNextIndex(int compositeIndex) {
- // in case of iner rows with duplicate keys, get the next one
+ // in case of inner rows with duplicate keys, get the next one
return hjHelper.getNextIndex(compositeIndex);
}
public boolean setRecordMatched(int compositeIndex) {
@@ -504,7 +506,7 @@ public class HashPartition implements
HashJoinMemoryCalculator.PartitionStat {
final int currentRecordCount = nextBatch.getRecordCount();
// For every incoming build batch, we create a matching helper batch
- hjHelper.addNewBatch(currentRecordCount);
+ if ( ! semiJoin ) { hjHelper.addNewBatch(currentRecordCount); }
// Holder contains the global index where the key is hashed into using
the hash table
final IndexPointer htIndex = new IndexPointer();
@@ -527,7 +529,7 @@ public class HashPartition implements
HashJoinMemoryCalculator.PartitionStat {
* the current record index and batch index. This will be used
* later when we probe and find a match.
*/
- hjHelper.setCurrentIndex(htIndex.value, curr /* buildBatchIndex */,
recInd);
+ if ( ! semiJoin ) { hjHelper.setCurrentIndex(htIndex.value, curr /*
buildBatchIndex */, recInd); }
}
containers.add(nextBatch);
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 2f17ff2..f1c6181 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -122,6 +122,7 @@ public class HashJoinBatch extends
AbstractBinaryRecordBatch<HashJoinPOP> implem
// Join type, INNER, LEFT, RIGHT or OUTER
private final JoinRelType joinType;
+ private boolean semiJoin;
private boolean joinIsLeftOrFull;
private boolean joinIsRightOrFull;
private boolean skipHashTableBuild; // when outer side is empty, and the
join is inner or left (see DRILL-6755)
@@ -486,7 +487,7 @@ public class HashJoinBatch extends
AbstractBinaryRecordBatch<HashJoinPOP> implem
final double hashTableDoublingFactor =
context.getOptions().getDouble(ExecConstants.HASHJOIN_HASH_DOUBLE_FACTOR_KEY);
final String hashTableCalculatorType =
context.getOptions().getString(ExecConstants.HASHJOIN_HASHTABLE_CALC_TYPE_KEY);
- return new HashJoinMemoryCalculatorImpl(safetyFactor,
fragmentationFactor, hashTableDoublingFactor, hashTableCalculatorType);
+ return new HashJoinMemoryCalculatorImpl(safetyFactor,
fragmentationFactor, hashTableDoublingFactor, hashTableCalculatorType,
semiJoin);
} else {
return new HashJoinMechanicalMemoryCalculator(maxBatchesInMemory);
}
@@ -566,6 +567,7 @@ public class HashJoinBatch extends
AbstractBinaryRecordBatch<HashJoinPOP> implem
hashJoinProbe.setupHashJoinProbe(probeBatch,
this,
joinType,
+ semiJoin,
leftUpstream,
partitions,
spilledState.getCycle(),
@@ -777,7 +779,7 @@ public class HashJoinBatch extends
AbstractBinaryRecordBatch<HashJoinPOP> implem
baseHashTable.updateIncoming(buildBatch, probeBatch); // in case we
process the spilled files
// Recreate the partitions every time build is initialized
for (int part = 0; part < numPartitions; part++ ) {
- partitions[part] = new HashPartition(context, allocator, baseHashTable,
buildBatch, probeBatch,
+ partitions[part] = new HashPartition(context, allocator, baseHashTable,
buildBatch, probeBatch, semiJoin,
RECORDS_PER_BATCH, spillSet, part, spilledState.getCycle(),
numPartitions);
}
@@ -998,6 +1000,10 @@ public class HashJoinBatch extends
AbstractBinaryRecordBatch<HashJoinPOP> implem
: read_right_HV_vector.getAccessor().get(ind); // get the hash
value from the HV column
int currPart = hashCode & spilledState.getPartitionMask();
hashCode >>>= spilledState.getBitsInMask();
+ // TODO: semi-join could skip join-key-duplicate build rows here (currently a no-op)
+ if ( semiJoin ) {
+
+ }
// Append the new inner row to the appropriate partition; spill
(that partition) if needed
partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind,
hashCode, buildCalc); // may spill if needed
}
@@ -1093,7 +1099,7 @@ public class HashJoinBatch extends
AbstractBinaryRecordBatch<HashJoinPOP> implem
private void setupOutputContainerSchema() {
- if (buildSchema != null) {
+ if (buildSchema != null && ! semiJoin ) {
for (final MaterializedField field : buildSchema) {
final MajorType inputType = field.getType();
final MajorType outputType;
@@ -1160,6 +1166,7 @@ public class HashJoinBatch extends
AbstractBinaryRecordBatch<HashJoinPOP> implem
this.buildBatch = right;
this.probeBatch = left;
joinType = popConfig.getJoinType();
+ semiJoin = popConfig.isSemiJoin();
joinIsLeftOrFull = joinType == JoinRelType.LEFT || joinType ==
JoinRelType.FULL;
joinIsRightOrFull = joinType == JoinRelType.RIGHT || joinType ==
JoinRelType.FULL;
conditions = popConfig.getConditions();
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperUnusedSizeImpl.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperUnusedSizeImpl.java
new file mode 100644
index 0000000..25a985b
--- /dev/null
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperUnusedSizeImpl.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+/**
+ * Helper-size calculator used when no HashJoinHelper is allocated (semi-join): always returns 0.
+ */
+public class HashJoinHelperUnusedSizeImpl implements HashJoinHelperSizeCalculator {
+ public static final HashJoinHelperUnusedSizeImpl INSTANCE = new HashJoinHelperUnusedSizeImpl();
+
+ private HashJoinHelperUnusedSizeImpl() {
+ // Stateless singleton: obtain it via INSTANCE
+ }
+
+ @Override
+ public long calculateSize(HashJoinMemoryCalculator.PartitionStat partitionStat, double fragmentationFactor) {
+ Preconditions.checkArgument(!partitionStat.isSpilled()); // callers must pass only non-spilled partitions
+
+ return 0;
+ }
+}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
index e663595..88f3ddc 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
@@ -39,6 +39,7 @@ public class HashJoinMemoryCalculatorImpl implements
HashJoinMemoryCalculator {
private final double fragmentationFactor;
private final double hashTableDoublingFactor;
private final String hashTableCalculatorType;
+ private final boolean semiJoin;
private boolean initialized = false;
private boolean doMemoryCalculation;
@@ -46,11 +47,13 @@ public class HashJoinMemoryCalculatorImpl implements
HashJoinMemoryCalculator {
public HashJoinMemoryCalculatorImpl(final double safetyFactor,
final double fragmentationFactor,
final double hashTableDoublingFactor,
- final String hashTableCalculatorType) {
+ final String hashTableCalculatorType,
+ boolean semiJoin) {
this.safetyFactor = safetyFactor;
this.fragmentationFactor = fragmentationFactor;
this.hashTableDoublingFactor = hashTableDoublingFactor;
this.hashTableCalculatorType = hashTableCalculatorType;
+ this.semiJoin = semiJoin;
}
public void initialize(boolean doMemoryCalculation) {
@@ -76,8 +79,8 @@ public class HashJoinMemoryCalculatorImpl implements
HashJoinMemoryCalculator {
return new BuildSidePartitioningImpl(
BatchSizePredictorImpl.Factory.INSTANCE,
hashTableSizeCalculator,
- HashJoinHelperSizeCalculatorImpl.INSTANCE,
- fragmentationFactor, safetyFactor);
+ semiJoin ? HashJoinHelperUnusedSizeImpl.INSTANCE :
HashJoinHelperSizeCalculatorImpl.INSTANCE,
+ fragmentationFactor, safetyFactor, semiJoin);
} else {
return new NoopBuildSidePartitioningImpl();
}
@@ -184,6 +187,7 @@ public class HashJoinMemoryCalculatorImpl implements
HashJoinMemoryCalculator {
private final HashJoinHelperSizeCalculator hashJoinHelperSizeCalculator;
private final double fragmentationFactor;
private final double safetyFactor;
+ private final boolean semiJoin;
private int maxBatchNumRecordsBuild;
private int maxBatchNumRecordsProbe;
@@ -217,12 +221,14 @@ public class HashJoinMemoryCalculatorImpl implements
HashJoinMemoryCalculator {
final HashTableSizeCalculator
hashTableSizeCalculator,
final HashJoinHelperSizeCalculator
hashJoinHelperSizeCalculator,
final double fragmentationFactor,
- final double safetyFactor) {
+ final double safetyFactor,
+ boolean semiJoin) {
this.batchSizePredictorFactory =
Preconditions.checkNotNull(batchSizePredictorFactory);
this.hashTableSizeCalculator =
Preconditions.checkNotNull(hashTableSizeCalculator);
this.hashJoinHelperSizeCalculator =
Preconditions.checkNotNull(hashJoinHelperSizeCalculator);
this.fragmentationFactor = fragmentationFactor;
this.safetyFactor = safetyFactor;
+ this.semiJoin = semiJoin;
}
@Override
@@ -470,7 +476,8 @@ public class HashJoinMemoryCalculatorImpl implements
HashJoinMemoryCalculator {
fragmentationFactor,
safetyFactor,
loadFactor,
- reserveHash);
+ reserveHash,
+ semiJoin);
}
@Override
@@ -575,6 +582,7 @@ public class HashJoinMemoryCalculatorImpl implements
HashJoinMemoryCalculator {
private final double safetyFactor;
private final double loadFactor;
private final boolean reserveHash;
+ private final boolean semiJoin;
private boolean initialized;
private long consumedMemory;
@@ -596,7 +604,8 @@ public class HashJoinMemoryCalculatorImpl implements
HashJoinMemoryCalculator {
final double fragmentationFactor,
final double safetyFactor,
final double loadFactor,
- final boolean reserveHash) {
+ final boolean reserveHash,
+ boolean semiJoin) {
this.firstCycle = firstCycle;
this.probeSizePredictor = Preconditions.checkNotNull(probeSizePredictor);
this.memoryAvailable = memoryAvailable;
@@ -609,6 +618,7 @@ public class HashJoinMemoryCalculatorImpl implements
HashJoinMemoryCalculator {
this.safetyFactor = safetyFactor;
this.loadFactor = loadFactor;
this.reserveHash = reserveHash;
+ this.semiJoin = semiJoin;
this.recordsPerPartitionBatchProbe = recordsPerPartitionBatchProbe;
this.computedProbeRecordsPerBatch = recordsPerPartitionBatchProbe;
}
@@ -774,7 +784,7 @@ public class HashJoinMemoryCalculatorImpl implements
HashJoinMemoryCalculator {
// Some of our probe side batches were spilled so we have to recursively
process the partitions.
return new HashJoinMemoryCalculatorImpl(
- safetyFactor, fragmentationFactor,
hashTableSizeCalculator.getDoublingFactor(), hashTableSizeCalculator.getType());
+ safetyFactor, fragmentationFactor,
hashTableSizeCalculator.getDoublingFactor(), hashTableSizeCalculator.getType(),
semiJoin);
}
@Override
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
index beddfa6..490eba4 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
@@ -39,7 +39,10 @@ public interface HashJoinProbe {
PROBE_PROJECT, PROJECT_RIGHT, DONE
}
- void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing,
JoinRelType joinRelType, RecordBatch.IterOutcome leftStartState,
HashPartition[] partitions, int cycleNum, VectorContainer container,
HashJoinBatch.HashJoinSpilledPartition[] spilledInners, boolean
buildSideIsEmpty, int numPartitions, int rightHVColPosition);
+ void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing,
JoinRelType joinRelType, boolean semiJoin,
+ RecordBatch.IterOutcome leftStartState,
HashPartition[] partitions, int cycleNum,
+ VectorContainer container,
HashJoinBatch.HashJoinSpilledPartition[] spilledInners,
+ boolean buildSideIsEmpty, int numPartitions, int
rightHVColPosition);
int probeAndProject() throws SchemaChangeException;
void changeToFinalProbeState();
void setTargetOutputCount(int targetOutputCount);
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index 57b2d5b..c549143 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -88,8 +88,9 @@ public abstract class HashJoinProbeTemplate implements
HashJoinProbe {
private int numPartitions = 1; // must be 2 to the power of bitsInMask
private int partitionMask = 0; // numPartitions - 1
private int bitsInMask = 0; // number of bits in the MASK
- private int rightHVColPosition;
+ private int numberOfBuildSideColumns;
private int targetOutputRecords;
+ private boolean semiJoin;
@Override
public void setTargetOutputCount(int targetOutputRecords) {
@@ -106,6 +107,7 @@ public abstract class HashJoinProbeTemplate implements
HashJoinProbe {
* @param probeBatch
* @param outgoing
* @param joinRelType
+ * @param semiJoin true for a semi-join: only the probe-side columns are output
* @param leftStartState
* @param partitions
* @param cycleNum
@@ -116,7 +118,10 @@ public abstract class HashJoinProbeTemplate implements
HashJoinProbe {
* @param rightHVColPosition
*/
@Override
- public void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch
outgoing, JoinRelType joinRelType, IterOutcome leftStartState, HashPartition[]
partitions, int cycleNum, VectorContainer container,
HashJoinBatch.HashJoinSpilledPartition[] spilledInners, boolean
buildSideIsEmpty, int numPartitions, int rightHVColPosition) {
+ public void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch
outgoing, JoinRelType joinRelType, boolean semiJoin,
+ IterOutcome leftStartState, HashPartition[]
partitions, int cycleNum,
+ VectorContainer container,
HashJoinBatch.HashJoinSpilledPartition[] spilledInners,
+ boolean buildSideIsEmpty, int numPartitions,
int rightHVColPosition) {
this.container = container;
this.spilledInners = spilledInners;
this.probeBatch = probeBatch;
@@ -127,7 +132,8 @@ public abstract class HashJoinProbeTemplate implements
HashJoinProbe {
this.cycleNum = cycleNum;
this.buildSideIsEmpty = buildSideIsEmpty;
this.numPartitions = numPartitions;
- this.rightHVColPosition = rightHVColPosition;
+ this.numberOfBuildSideColumns = semiJoin ? 0 : rightHVColPosition; //
position (0 based) of added column == #columns
+ this.semiJoin = semiJoin;
partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
@@ -165,35 +171,31 @@ public abstract class HashJoinProbeTemplate implements
HashJoinProbe {
* Append the given build side row into the outgoing container
* @param buildSrcContainer The container for the right/inner side
* @param buildSrcIndex build side index
- * @return The index for the last column (where the probe side would
continue copying)
*/
- private int appendBuild(VectorContainer buildSrcContainer, int
buildSrcIndex) {
- // "- 1" to skip the last "hash values" added column
- int lastColIndex = buildSrcContainer.getNumberOfColumns() - 1;
- for (int vectorIndex = 0; vectorIndex < lastColIndex; vectorIndex++) {
+ private void appendBuild(VectorContainer buildSrcContainer, int
buildSrcIndex) {
+ for (int vectorIndex = 0; vectorIndex < numberOfBuildSideColumns;
vectorIndex++) {
ValueVector destVector =
container.getValueVector(vectorIndex).getValueVector();
ValueVector srcVector =
buildSrcContainer.getValueVector(vectorIndex).getValueVector();
destVector.copyEntry(container.getRecordCount(), srcVector,
buildSrcIndex);
}
- return lastColIndex;
}
/**
* Append the given probe side row into the outgoing container, following
the build side part
* @param probeSrcContainer The container for the left/outer side
* @param probeSrcIndex probe side index
- * @param baseIndex The column index to start copying into (following the
build columns)
*/
- private void appendProbe(VectorContainer probeSrcContainer, int
probeSrcIndex, int baseIndex) {
- for (int vectorIndex = baseIndex; vectorIndex <
container.getNumberOfColumns(); vectorIndex++) {
+ private void appendProbe(VectorContainer probeSrcContainer, int
probeSrcIndex) {
+ for (int vectorIndex = numberOfBuildSideColumns; vectorIndex <
container.getNumberOfColumns(); vectorIndex++) {
ValueVector destVector =
container.getValueVector(vectorIndex).getValueVector();
- ValueVector srcVector = probeSrcContainer.getValueVector(vectorIndex -
baseIndex).getValueVector();
+ ValueVector srcVector = probeSrcContainer.getValueVector(vectorIndex -
numberOfBuildSideColumns).getValueVector();
destVector.copyEntry(container.getRecordCount(), srcVector,
probeSrcIndex);
}
}
/**
* A special version of the VectorContainer's appendRow for the HashJoin;
(following a probe) it
* copies the build and probe sides into the outgoing container. (It uses a
composite
- * index for the build side)
+ * index for the build side). If any of the build/probe source containers
is null, then that side
+ * is not appended (effectively outputting nulls for that side's columns).
* @param buildSrcContainers The containers list for the right/inner side
* @param compositeBuildSrcIndex Composite build index
* @param probeSrcContainer The single container for the left/outer side
@@ -202,29 +204,20 @@ public abstract class HashJoinProbeTemplate implements
HashJoinProbe {
*/
private int outputRow(ArrayList<VectorContainer> buildSrcContainers, int
compositeBuildSrcIndex,
VectorContainer probeSrcContainer, int probeSrcIndex) {
- int buildBatchIndex = compositeBuildSrcIndex >>> 16;
- int buildOffset = compositeBuildSrcIndex & 65535;
- int baseInd = 0;
- if ( buildSrcContainers != null ) { baseInd =
appendBuild(buildSrcContainers.get(buildBatchIndex), buildOffset); }
- if ( probeSrcContainer != null ) { appendProbe(probeSrcContainer,
probeSrcIndex, baseInd); }
+
+ if ( buildSrcContainers != null ) {
+ int buildBatchIndex = compositeBuildSrcIndex >>> 16;
+ int buildOffset = compositeBuildSrcIndex & 65535;
+ appendBuild(buildSrcContainers.get(buildBatchIndex), buildOffset);
+ }
+ if ( probeSrcContainer != null ) { appendProbe(probeSrcContainer,
probeSrcIndex); }
return container.incRecordCount();
}
/**
- * A customised version of the VectorContainer's appendRow for HashJoin -
used for Left
- * Outer Join when there is no build side match - hence need a base index in
- * this container's wrappers from where to start appending
- * @param probeSrcContainer
- * @param probeSrcIndex
- * @param baseInd - index of this container's wrapper to start at
- * @return
+ * After the "inner" probe phase, finish up a Right (or Full) Join by
projecting the unmatched rows of the build side
+ * @param currBuildPart Which partition
*/
- private int outputOuterRow(VectorContainer probeSrcContainer, int
probeSrcIndex, int baseInd) {
- appendProbe(probeSrcContainer, probeSrcIndex, baseInd);
- return container.incRecordCount();
- }
-
-
private void executeProjectRightPhase(int currBuildPart) {
while (outputRecords < targetOutputRecords && recordsProcessed <
recordsToProcess) {
outputRecords =
@@ -319,6 +312,16 @@ public abstract class HashJoinProbeTemplate implements
HashJoinProbe {
}
+ if ( semiJoin ) {
+ if ( probeIndex != -1 ) {
+ // output the probe side only
+ outputRecords =
+ outputRow(null, 0, probeBatch.getContainer(), recordsProcessed);
+ }
+ recordsProcessed++;
+ continue; // no build-side duplicates, go on to the next probe-side
row
+ }
+
if (probeIndex != -1) {
/* The current probe record has a key that matches. Get the index
@@ -366,8 +369,8 @@ public abstract class HashJoinProbeTemplate implements
HashJoinProbe {
// If we have a left outer join, project the outer side
if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
- outputRecords =
- outputOuterRow(probeBatch.getContainer(), recordsProcessed,
rightHVColPosition);
+ outputRecords = // output only the probe side (the build side
would be all nulls)
+ outputRow(null, 0, probeBatch.getContainer(), recordsProcessed);
}
recordsProcessed++;
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
index b14488c..86a03b5 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.planner;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.volcano.AbstractConverter;
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
index 6480f3d..25ceccd 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
@@ -165,7 +165,7 @@ public class HashJoinPrel extends JoinPrel {
buildJoinConditions(conditions, leftFields, rightFields, leftKeys,
rightKeys);
RuntimeFilterDef runtimeFilterDef = this.getRuntimeFilterDef();
- HashJoinPOP hjoin = new HashJoinPOP(leftPop, rightPop, conditions, jtype,
runtimeFilterDef, isRowKeyJoin, htControl);
+ HashJoinPOP hjoin = new HashJoinPOP(leftPop, rightPop, conditions, jtype,
isSemiJoin, runtimeFilterDef, isRowKeyJoin, htControl);
return creator.addMetadata(this, hjoin);
}
diff --git a/exec/java-exec/src/main/resources/drill-module.conf
b/exec/java-exec/src/main/resources/drill-module.conf
index f083c66..5981f2d 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -514,7 +514,7 @@ drill.exec.options: {
planner.enable_hash_single_key: true,
planner.enable_hashagg: true,
planner.enable_hashjoin: true,
- planner.enable_semijoin: false,
+ planner.enable_semijoin: true,
planner.enable_hashjoin_swap: true,
planner.enable_hep_opt: true,
planner.enable_hep_partition_pruning: true,
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
index e2f80d8..93475ac 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
@@ -110,8 +110,7 @@ public class HashPartitionTest {
context.getAllocator(),
baseHashTable,
buildBatch,
- probeBatch,
- 10,
+ probeBatch, false, 10,
spillSet,
0,
0,
@@ -210,8 +209,7 @@ public class HashPartitionTest {
context.getAllocator(),
baseHashTable,
buildBatch,
- probeBatch,
- 10,
+ probeBatch, false, 10,
spillSet,
0,
0,
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
index 8d0d36b..bff28a8 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
@@ -45,7 +45,7 @@ public class TestBuildSidePartitioningImpl {
new
HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT,
HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
HashJoinHelperSizeCalculatorImpl.INSTANCE,
fragmentationFactor,
- safetyFactor);
+ safetyFactor, false);
final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
final long accountedProbeBatchSize = firstCycle? 0: 10;
@@ -90,7 +90,7 @@ public class TestBuildSidePartitioningImpl {
new
HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT,
HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
HashJoinHelperSizeCalculatorImpl.INSTANCE,
fragmentationFactor,
- safetyFactor);
+ safetyFactor, false);
final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
@@ -134,7 +134,7 @@ public class TestBuildSidePartitioningImpl {
new
HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT,
HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
HashJoinHelperSizeCalculatorImpl.INSTANCE,
fragmentationFactor,
- safetyFactor);
+ safetyFactor, false);
final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
@@ -179,7 +179,7 @@ public class TestBuildSidePartitioningImpl {
new
HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT,
HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
HashJoinHelperSizeCalculatorImpl.INSTANCE,
fragmentationFactor,
- safetyFactor);
+ safetyFactor, false);
final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
@@ -225,7 +225,7 @@ public class TestBuildSidePartitioningImpl {
new
HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT,
HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
HashJoinHelperSizeCalculatorImpl.INSTANCE,
fragmentationFactor,
- safetyFactor);
+ safetyFactor, false);
final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
@@ -258,7 +258,7 @@ public class TestBuildSidePartitioningImpl {
new
HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT,
HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
HashJoinHelperSizeCalculatorImpl.INSTANCE,
fragmentationFactor,
- safetyFactor);
+ safetyFactor, false);
final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
@@ -302,7 +302,7 @@ public class TestBuildSidePartitioningImpl {
new
HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT,
HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
HashJoinHelperSizeCalculatorImpl.INSTANCE,
fragmentationFactor,
- safetyFactor);
+ safetyFactor, false);
final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
@@ -357,7 +357,7 @@ public class TestBuildSidePartitioningImpl {
new
HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT,
HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
HashJoinHelperSizeCalculatorImpl.INSTANCE,
fragmentationFactor,
- safetyFactor);
+ safetyFactor, false);
final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
@@ -407,7 +407,7 @@ public class TestBuildSidePartitioningImpl {
new
HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT,
HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
HashJoinHelperSizeCalculatorImpl.INSTANCE,
fragmentationFactor,
- safetyFactor);
+ safetyFactor, false);
final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinHelperSizeCalculatorImpl.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinHelperSizeCalculatorImpl.java
index b9ae58d..5f7a36a 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinHelperSizeCalculatorImpl.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinHelperSizeCalculatorImpl.java
@@ -40,5 +40,8 @@ public class TestHashJoinHelperSizeCalculatorImpl {
long actual =
HashJoinHelperSizeCalculatorImpl.INSTANCE.calculateSize(partitionStat, 1.0);
Assert.assertEquals(expected, actual);
+
+ long shouldBeZero =
HashJoinHelperUnusedSizeImpl.INSTANCE.calculateSize(partitionStat, 1.0);
+ Assert.assertEquals(0, shouldBeZero);
}
}
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java
index 89c56b1..0636cb7 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestPostBuildCalculationsImpl.java
@@ -137,7 +137,7 @@ public class TestPostBuildCalculationsImpl {
fragmentationFactor, // fragmentationFactor
safetyFactor, // safetyFactor
.75, // loadFactor
- false); // reserveHash
+ false, false); // reserveHash
calc.initialize(true);
}
@@ -181,7 +181,7 @@ public class TestPostBuildCalculationsImpl {
fragmentationFactor,
safetyFactor,
.75,
- true);
+ true, false);
calc.initialize(true);
@@ -241,7 +241,7 @@ public class TestPostBuildCalculationsImpl {
fragmentationFactor,
safetyFactor,
.75,
- false);
+ false, false);
calc.initialize(false);
@@ -307,7 +307,7 @@ public class TestPostBuildCalculationsImpl {
fragmentationFactor,
safetyFactor,
.75,
- false);
+ false, false);
calc.initialize(false);
@@ -373,7 +373,7 @@ public class TestPostBuildCalculationsImpl {
fragmentationFactor,
safetyFactor,
.75,
- false);
+ false, false);
calc.initialize(false);
@@ -445,7 +445,7 @@ public class TestPostBuildCalculationsImpl {
fragmentationFactor,
safetyFactor,
.75,
- true);
+ true, false);
calc.initialize(false);
@@ -509,7 +509,7 @@ public class TestPostBuildCalculationsImpl {
fragmentationFactor,
safetyFactor,
.75,
- false);
+ false, false);
calc.initialize(false);
@@ -583,7 +583,7 @@ public class TestPostBuildCalculationsImpl {
fragmentationFactor,
safetyFactor,
.75,
- false);
+ false, false);
calc.initialize(false);
@@ -650,7 +650,7 @@ public class TestPostBuildCalculationsImpl {
fragmentationFactor,
safetyFactor,
.75,
- false);
+ false, false);
calc.initialize(false);
Assert.assertFalse(calc.shouldSpill());
@@ -705,7 +705,7 @@ public class TestPostBuildCalculationsImpl {
fragmentationFactor,
safetyFactor,
.75,
- false);
+ false, false);
calc.initialize(false);
}