This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d21ba11c89 HIVE-26655: VectorUDAFBloomFilterMerge should take care of safe batch handling when working in parallel (#4158) (Laszlo Bodor reviewed by Ayush Saxena)
4d21ba11c89 is described below

commit 4d21ba11c893038dbecd98c512a81ccf1d4fc6d0
Author: Bodor Laszlo <[email protected]>
AuthorDate: Mon Mar 27 15:21:51 2023 +0200

    HIVE-26655: VectorUDAFBloomFilterMerge should take care of safe batch handling when working in parallel (#4158) (Laszlo Bodor reviewed by Ayush Saxena)
---
 ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java    | 10 ++++++++++
 .../apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java  | 10 +++++++++-
 .../hadoop/hive/ql/exec/vector/VectorGroupByOperator.java   | 12 ++++++++++++
 .../expressions/aggregates/VectorAggregateExpression.java   |  8 ++++++++
 .../expressions/aggregates/VectorUDAFBloomFilterMerge.java  | 13 ++++++++++---
 5 files changed, 49 insertions(+), 4 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index b0525c126db..902d6afdc03 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -1555,4 +1555,14 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
       c.replaceTabAlias(oldAlias, newAlias);
     }
   }
+
+  /**
+   * There are aggregate implementations where the contents of the batch are processed in an async way,
+   * (e.g. in executors/thread pool), in which cases we have to create a new batch every time. This leads to
+   * minor GC pressure, so this feature should be only used when you have the evidence that there is an expression
+   * which relies on this, and its implementation has benefits over the default, non-cloned approach.
+   */
+  public boolean batchNeedsClone() {
+    return false;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
index 276d9bb7d49..5c30f2ed994 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
@@ -95,6 +95,7 @@ public class ReduceRecordSource implements RecordSource {
 
   private VectorDeserializeRow<LazyBinaryDeserializeRead> valueLazyBinaryDeserializeToRow;
 
+  private VectorizedRowBatchCtx batchContext;
   private VectorizedRowBatch batch;
 
   // number of columns pertaining to keys in a vectorized row batch
@@ -178,6 +179,7 @@ public class ReduceRecordSource implements RecordSource {
 
         rowObjectInspector = Utilities.constructVectorizedReduceRowOI(keyStructInspector,
             valueStructInspectors);
+        this.batchContext = batchContext;
         batch = batchContext.createVectorizedRowBatch();
 
         // Setup vectorized deserialization for the key and value.
@@ -421,6 +423,9 @@ public class ReduceRecordSource implements RecordSource {
   private void processVectorGroup(BytesWritable keyWritable,
           Iterable<Object> values, byte tag) throws HiveException, IOException {
 
+    if (reducer.batchNeedsClone()) {
+      batch = batchContext.createVectorizedRowBatch();
+    }
     Preconditions.checkState(batch.size == 0);
 
     // Deserialize key into vector row columns.
@@ -496,7 +501,10 @@ public class ReduceRecordSource implements RecordSource {
         }
         reducer.process(batch, tag);
       }
-      batch.reset();
+      // reset only when we reuse the batch
+      if (!reducer.batchNeedsClone()) {
+        batch.reset();
+      }
     } catch (Exception e) {
       String rowString = null;
       try {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index 0921e657b5d..c2dfaeb074e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -161,6 +161,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
 
   // tracks overall access count in map agg buffer any given time.
   private long totalAccessCount;
+  private boolean batchNeedsClone;
 
   /**
    * Interface for processing mode: global, hash, unsorted streaming, or group batch
@@ -1157,6 +1158,13 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
         objectInspectors.add(objInsp);
       }
 
+      for (VectorAggregateExpression aggregator : aggregators) {
+        if (aggregator.batchNeedsClone()) {
+          batchNeedsClone = true;
+          break;
+        }
+      }
+
       keyWrappersBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions);
       aggregationBatchInfo = new VectorAggregationBufferBatch();
       aggregationBatchInfo.compileAggregationBatchInfo(aggregators);
@@ -1433,4 +1441,8 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
   public long getMaxMemory() {
     return maxMemory;
   }
+
+  public boolean batchNeedsClone() {
+    return batchNeedsClone;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java
index 65a0df474cc..fa238ec5d6a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java
@@ -154,5 +154,13 @@ public abstract class VectorAggregateExpression  implements Serializable {
    */
   public void finish(AggregationBuffer aggregationBuffer, boolean aborted) {
   }
+
+  /**
+   * This method should return true, when the particular VectorAggregateExpression must use a cloned batch
+   * while processing one, see Operator.batchNeedsClone() for further details.
+   */
+  public boolean batchNeedsClone() {
+    return false;
+  }
 }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java
index ef0e1196c9f..003f08ba731 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java
@@ -223,9 +223,9 @@ public class VectorUDAFBloomFilterMerge extends VectorAggregateExpression {
           merge(currentBf);
         } catch (InterruptedException e) {// Executor.shutdownNow() is called
           if (!queue.isEmpty()){
-            LOG.debug(
-                "bloom filter merge was interrupted while processing and queue is still not empty"
-                    + ", this is fine in case of shutdownNow");
+            LOG.info(
+                "bloom filter merge was interrupted while processing and queue is still not empty (size: {})"
+                    + ", this is fine in case of shutdownNow", queue.size());
           }
           if (aborted.get()) {
             LOG.info("bloom filter merge was aborted, won't finish merging...");
@@ -602,4 +602,11 @@ public class VectorUDAFBloomFilterMerge extends VectorAggregateExpression {
     Aggregation bfAgg = (Aggregation) agg;
     outputColVector.setVal(batchIndex, bfAgg.bfBytes, 0, bfAgg.bfBytes.length);
   }
+
+  /**
+   * Let's clone the batch when we're working in parallel, see HIVE-26655.
+   */
+  public boolean batchNeedsClone() {
+    return numThreads > 0;
+  }
 }

Reply via email to