[ https://issues.apache.org/jira/browse/DRILL-4411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16497360#comment-16497360 ]
ASF GitHub Bot commented on DRILL-4411:
---------------------------------------
ilooner closed pull request #381: DRILL-4411: hash join should limit batch
based on size and number of records
URL: https://github.com/apache/drill/pull/381
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 3ea97c6581..c5941cd61a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -35,6 +35,7 @@
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
import org.apache.drill.exec.physical.config.HashJoinPOP;
@@ -106,6 +107,8 @@
// Schema of the build side
private BatchSchema rightSchema = null;
+ private BufferAllocator outputAllocator = null;
+
// Generator mapping for the build side
// Generator mapping for the build side : scalar
@@ -126,7 +129,6 @@
"doSetup" /* eval method */,
null /* reset */, null /* cleanup */);
-
// Mapping set for the build side
private final MappingSet projectBuildMapping =
new MappingSet("buildIndex" /* read index */, "outIndex" /* write index */, "buildBatch" /* read container */,
@@ -166,6 +168,10 @@ public int getRecordCount() {
return outputRecords;
}
+ public long getMemoryUsed() {
+ return outputAllocator.getAllocatedMemory();
+ }
+
@Override
protected void buildSchema() throws SchemaChangeException {
leftUpstream = next(left);
@@ -182,7 +188,11 @@ protected void buildSchema() throws SchemaChangeException {
}
// Initialize the hash join helper context
- hjHelper = new HashJoinHelper(context, oContext.getAllocator());
+ // Use a child allocator so we can track how large our output is getting.
+ BufferAllocator parentAllocator = this.oContext.getAllocator();
+ outputAllocator = parentAllocator.newChildAllocator("HashJoinOutputAllocator", 0, parentAllocator.getLimit());
+ container.setAllocator(outputAllocator);
+ hjHelper = new HashJoinHelper(context, outputAllocator);
try {
rightSchema = right.getSchema();
final VectorContainer vectors = new VectorContainer(oContext);
@@ -532,6 +542,13 @@ public void close() {
if (hashTable != null) {
hashTable.clear();
}
+
+ container.clear();
+
+ if (outputAllocator != null) {
+ outputAllocator.close();
+ }
+
super.close();
}
}
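
The heart of the HashJoinBatch change is the child allocator: every output
vector is charged to "HashJoinOutputAllocator", so getMemoryUsed() can read
the size of the output batch in isolation from the rest of the operator's
memory. The following self-contained sketch illustrates that accounting idea;
TrackingAllocator is a hypothetical stand-in for Drill's BufferAllocator,
not the real class.

public class ChildAllocatorSketch {

  // Hypothetical stand-in for Drill's BufferAllocator; the method names
  // mirror the real interface, but this is an illustration, not Drill code.
  static final class TrackingAllocator {
    private final String name;
    private final long limit;
    private long allocated;

    TrackingAllocator(String name, long limit) {
      this.name = name;
      this.limit = limit;
    }

    // Analogous to BufferAllocator#newChildAllocator: the child keeps its
    // own byte counter, so one output stream can be measured in isolation.
    TrackingAllocator newChildAllocator(String childName, long childLimit) {
      return new TrackingAllocator(childName, childLimit);
    }

    void allocate(long bytes) {
      if (allocated + bytes > limit) {
        throw new IllegalStateException(name + ": limit exceeded");
      }
      allocated += bytes;
    }

    // Analogous to BufferAllocator#getAllocatedMemory(), which the new
    // HashJoinBatch.getMemoryUsed() delegates to.
    long getAllocatedMemory() {
      return allocated;
    }
  }

  public static void main(String[] args) {
    TrackingAllocator fragment = new TrackingAllocator("fragment", 64L << 20);
    TrackingAllocator output =
        fragment.newChildAllocator("HashJoinOutputAllocator", 64L << 20);

    output.allocate(3L << 20); // value vectors for the batch being built
    System.out.println(output.getAllocatedMemory()); // 3145728: output bytes only
  }
}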
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index 5531bc7e29..3cb055214d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -47,7 +47,17 @@
private HashJoinBatch outgoingJoinBatch = null;
- private static final int TARGET_RECORDS_PER_BATCH = 4000;
+ // As long as the batch size is less than targetBatchSizeInBytes AND the number of records is less
+ // than targetRecordsPerBatch, we can still add records to this batch. If we exceed the number of
+ // records before hitting targetBatchSizeInBytes (e.g. we accumulate 4000 records and the batch is
+ // smaller than 10 MB), then nothing special happens. However, if we exceed targetBatchSizeInBytes,
+ // the rows are unusually large and we make the following adjustments:
+ // 1) Set targetRecordsPerBatch = MAX(1, currentNumRecords/2)
+ // 2) Set targetBatchSizeInBytes = current allocator size. Say the allocator is 12 MB after the
+ // latest record is added (> 10 MB, the current targetBatchSizeInBytes). Then, when we start the
+ // next batch, the allocator automatically allocates 12 MB (not the default value, which would
+ // have been smaller than 10 MB).
+ private int targetRecordsPerBatch = 4000;
+ private long targetBatchSizeInBytes = 10*1024*1024;
/* Helper class
* Maintains linked list of build side records with the same key
@@ -98,7 +108,9 @@ public void setupHashJoinProbe(FragmentContext context, VectorContainer buildBat
}
public void executeProjectRightPhase() {
- while (outputRecords < TARGET_RECORDS_PER_BATCH && recordsProcessed < recordsToProcess) {
+ while (outputRecords < targetRecordsPerBatch
+ && recordsProcessed < recordsToProcess
+ && outgoingJoinBatch.getMemoryUsed() < targetBatchSizeInBytes) {
projectBuildRecord(unmatchedBuildIndexes.get(recordsProcessed), outputRecords);
recordsProcessed++;
outputRecords++;
@@ -106,7 +118,10 @@ public void executeProjectRightPhase() {
}
public void executeProbePhase() throws SchemaChangeException {
- while (outputRecords < TARGET_RECORDS_PER_BATCH && probeState != ProbeState.DONE && probeState != ProbeState.PROJECT_RIGHT) {
+ while (outputRecords < targetRecordsPerBatch
+ && outgoingJoinBatch.getMemoryUsed() < targetBatchSizeInBytes
+ && probeState != ProbeState.DONE
+ && probeState != ProbeState.PROJECT_RIGHT) {
// Check if we have processed all records in this batch we need to invoke next
if (recordsProcessed == recordsToProcess) {
@@ -237,6 +252,10 @@ public int probeAndProject() throws SchemaChangeException, ClassTransformationEx
executeProjectRightPhase();
}
+ if (outgoingJoinBatch.getMemoryUsed() >= targetBatchSizeInBytes) {
+ targetRecordsPerBatch = Math.max(1, outputRecords/2);
+ targetBatchSizeInBytes = outgoingJoinBatch.getMemoryUsed()+1;
+ }
return outputRecords;
}
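
The adjustment at the end of probeAndProject() is the adaptive part: a batch
closes when either limit trips, and when the byte limit is the one that
tripped, the record target is halved and the byte target is lifted to just
above what was actually used. Below is a minimal, self-contained simulation
of that policy; the fixed 8 KB record size is a hypothetical stand-in for
projecting real rows, not Drill code.

public class AdaptiveBatchLimitSketch {

  public static void main(String[] args) {
    int targetRecordsPerBatch = 4000;                // same defaults as the patch
    long targetBatchSizeInBytes = 10L * 1024 * 1024;
    long bytesPerRecord = 8L * 1024;                 // assumed unusually wide rows

    for (int batch = 0; batch < 3; batch++) {
      int outputRecords = 0;
      long memoryUsed = 0;

      // Mirrors the loop conditions in executeProbePhase(): stop at
      // whichever limit trips first.
      while (outputRecords < targetRecordsPerBatch
          && memoryUsed < targetBatchSizeInBytes) {
        memoryUsed += bytesPerRecord;                // stands in for projecting a record
        outputRecords++;
      }

      System.out.printf("batch %d: %d records, %d bytes%n",
          batch, outputRecords, memoryUsed);

      // Mirrors the adjustment at the end of probeAndProject(): halve the
      // record target, lift the byte target to just above what was used.
      if (memoryUsed >= targetBatchSizeInBytes) {
        targetRecordsPerBatch = Math.max(1, outputRecords / 2);
        targetBatchSizeInBytes = memoryUsed + 1;
      }
    }
  }
}

With these numbers the first batch stops at the 10 MB byte limit after 1280
records; every later batch is capped at 640 records and stays well under the
byte limit, which is the intended behavior for unusually wide rows.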
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 663cf22b05..a6a4f4e8f7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -46,6 +46,7 @@
private int recordCount = -1;
private OperatorContext oContext;
private boolean schemaChanged = true; // Schema has changed since last built. Must rebuild schema
+ private BufferAllocator allocator;
public VectorContainer() {
this.oContext = null;
@@ -53,6 +54,11 @@ public VectorContainer() {
public VectorContainer( OperatorContext oContext) {
this.oContext = oContext;
+ allocator = this.oContext.getAllocator();
+ }
+
+ public void setAllocator(BufferAllocator allocator) {
+ this.allocator = allocator;
}
@Override
@@ -122,13 +128,13 @@ void transferOut(VectorContainer containerOut) {
if (id != null) {
vector = getValueAccessorById(id.getFieldIds()).getValueVector();
if (id.getFieldIds().length == 1 && clazz != null && !clazz.isAssignableFrom(vector.getClass())) {
- final ValueVector newVector = TypeHelper.getNewVector(field, this.oContext.getAllocator(), callBack);
+ final ValueVector newVector = TypeHelper.getNewVector(field, this.allocator, callBack);
replace(vector, newVector);
return (T) newVector;
}
} else {
- vector = TypeHelper.getNewVector(field, this.oContext.getAllocator(), callBack);
+ vector = TypeHelper.getNewVector(field, this.allocator, callBack);
add(vector);
}
return (T) vector;
@@ -199,7 +205,7 @@ public int compare(VectorWrapper<?> v1, VectorWrapper<?> v2) {
}
private void cloneAndTransfer(VectorWrapper<?> wrapper) {
- wrappers.add(wrapper.cloneAndTransfer(oContext.getAllocator()));
+ wrappers.add(wrapper.cloneAndTransfer(allocator));
}
public void addCollection(Iterable<ValueVector> vectors) {
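
The VectorContainer hunks are plain setter injection: the container still
defaults to the operator context's allocator, but an operator can install
its own child allocator before any vectors are created, as HashJoinBatch now
does in buildSchema(). A minimal sketch of the pattern, using hypothetical
stand-in types rather than Drill's classes:

public class AllocatorInjectionSketch {

  // Hypothetical stand-ins, not Drill's classes.
  static final class Allocator {
    final String name;
    Allocator(String name) { this.name = name; }
  }

  static final class Container {
    private Allocator allocator;

    Container(Allocator contextAllocator) {
      // Pre-patch behavior: every vector came from the operator context.
      this.allocator = contextAllocator;
    }

    void setAllocator(Allocator allocator) {
      // Post-patch hook: HashJoinBatch installs its output child allocator
      // here, so new vectors are charged to it instead of to the context.
      this.allocator = allocator;
    }

    Allocator allocatorInUse() { return allocator; }
  }

  public static void main(String[] args) {
    Container c = new Container(new Allocator("operator-context"));
    c.setAllocator(new Allocator("HashJoinOutputAllocator"));
    System.out.println(c.allocatorInUse().name); // HashJoinOutputAllocator
  }
}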
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> HashJoin should not only depend on number of records, but also on size
> ----------------------------------------------------------------------
>
> Key: DRILL-4411
> URL: https://issues.apache.org/jira/browse/DRILL-4411
> Project: Apache Drill
> Issue Type: Bug
> Components: Server
> Reporter: MinJi Kim
> Assignee: MinJi Kim
> Priority: Major
>
> In HashJoinProbeTemplate, each batch is limited to TARGET_RECORDS_PER_BATCH
> (4000). But we should not depend only on the number of records; we should
> also consider batch size, in case of extremely large records (e.g. 4000
> records of 8 KB each would make a 32 MB batch).
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)