This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 4d21ba11c89 HIVE-26655: VectorUDAFBloomFilterMerge should take care of safe batch handling when working in parallel (#4158) (Laszlo Bodor reviewed by Ayush Saxena)
4d21ba11c89 is described below
commit 4d21ba11c893038dbecd98c512a81ccf1d4fc6d0
Author: Bodor Laszlo <[email protected]>
AuthorDate: Mon Mar 27 15:21:51 2023 +0200
HIVE-26655: VectorUDAFBloomFilterMerge should take care of safe batch handling when working in parallel (#4158) (Laszlo Bodor reviewed by Ayush Saxena)
---
ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java | 10 ++++++++++
.../apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java | 10 +++++++++-
.../hadoop/hive/ql/exec/vector/VectorGroupByOperator.java | 12 ++++++++++++
.../expressions/aggregates/VectorAggregateExpression.java | 8 ++++++++
.../expressions/aggregates/VectorUDAFBloomFilterMerge.java | 13 ++++++++++---
5 files changed, 49 insertions(+), 4 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index b0525c126db..902d6afdc03 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -1555,4 +1555,14 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
c.replaceTabAlias(oldAlias, newAlias);
}
}
+
+ /**
+ * There are aggregate implementations where the contents of the batch are processed in an async way,
+ * (e.g. in executors/thread pool), in which cases we have to create a new batch every time. This leads to
+ * minor GC pressure, so this feature should be only used when you have the evidence that there is an expression
+ * which relies on this, and its implementation has benefits over the default, non-cloned approach.
+ */
+ public boolean batchNeedsClone() {
+ return false;
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
index 276d9bb7d49..5c30f2ed994 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
@@ -95,6 +95,7 @@ public class ReduceRecordSource implements RecordSource {
private VectorDeserializeRow<LazyBinaryDeserializeRead> valueLazyBinaryDeserializeToRow;
+ private VectorizedRowBatchCtx batchContext;
private VectorizedRowBatch batch;
// number of columns pertaining to keys in a vectorized row batch
@@ -178,6 +179,7 @@ public class ReduceRecordSource implements RecordSource {
rowObjectInspector =
Utilities.constructVectorizedReduceRowOI(keyStructInspector,
valueStructInspectors);
+ this.batchContext = batchContext;
batch = batchContext.createVectorizedRowBatch();
// Setup vectorized deserialization for the key and value.
@@ -421,6 +423,9 @@ public class ReduceRecordSource implements RecordSource {
private void processVectorGroup(BytesWritable keyWritable,
Iterable<Object> values, byte tag) throws HiveException, IOException
{
+ if (reducer.batchNeedsClone()) {
+ batch = batchContext.createVectorizedRowBatch();
+ }
Preconditions.checkState(batch.size == 0);
// Deserialize key into vector row columns.
@@ -496,7 +501,10 @@ public class ReduceRecordSource implements RecordSource {
}
reducer.process(batch, tag);
}
- batch.reset();
+ // reset only when we reuse the batch
+ if (!reducer.batchNeedsClone()) {
+ batch.reset();
+ }
} catch (Exception e) {
String rowString = null;
try {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index 0921e657b5d..c2dfaeb074e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -161,6 +161,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
// tracks overall access count in map agg buffer any given time.
private long totalAccessCount;
+ private boolean batchNeedsClone;
/**
* Interface for processing mode: global, hash, unsorted streaming, or group batch
@@ -1157,6 +1158,13 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
objectInspectors.add(objInsp);
}
+ for (VectorAggregateExpression aggregator : aggregators) {
+ if (aggregator.batchNeedsClone()) {
+ batchNeedsClone = true;
+ break;
+ }
+ }
+
keyWrappersBatch =
VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions);
aggregationBatchInfo = new VectorAggregationBufferBatch();
aggregationBatchInfo.compileAggregationBatchInfo(aggregators);
@@ -1433,4 +1441,8 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
public long getMaxMemory() {
return maxMemory;
}
+
+ public boolean batchNeedsClone() {
+ return batchNeedsClone;
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java
index 65a0df474cc..fa238ec5d6a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java
@@ -154,5 +154,13 @@ public abstract class VectorAggregateExpression implements Serializable {
*/
public void finish(AggregationBuffer aggregationBuffer, boolean aborted) {
}
+
+ /**
+ * This method should return true, when the particular VectorAggregateExpression must use a cloned batch
+ * while processing one, see Operator.batchNeedsClone() for further details.
+ */
+ public boolean batchNeedsClone() {
+ return false;
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java
index ef0e1196c9f..003f08ba731 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java
@@ -223,9 +223,9 @@ public class VectorUDAFBloomFilterMerge extends VectorAggregateExpression {
merge(currentBf);
} catch (InterruptedException e) {// Executor.shutdownNow() is called
if (!queue.isEmpty()){
- LOG.debug(
- "bloom filter merge was interrupted while processing and queue is still not empty"
- + ", this is fine in case of shutdownNow");
+ LOG.info(
+ "bloom filter merge was interrupted while processing and queue is still not empty (size: {})"
+ + ", this is fine in case of shutdownNow", queue.size());
}
if (aborted.get()) {
LOG.info("bloom filter merge was aborted, won't finish merging...");
@@ -602,4 +602,11 @@ public class VectorUDAFBloomFilterMerge extends VectorAggregateExpression {
Aggregation bfAgg = (Aggregation) agg;
outputColVector.setVal(batchIndex, bfAgg.bfBytes, 0, bfAgg.bfBytes.length);
}
+
+ /**
+ * Let's clone the batch when we're working in parallel, see HIVE-26655.
+ */
+ public boolean batchNeedsClone() {
+ return numThreads > 0;
+ }
}