This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 27f9ac20675 [SPARK-44239][SQL] Free memory allocated by large vectors 
when vectors are reset
27f9ac20675 is described below

commit 27f9ac20675326d4e843387f4bb07415c4f0e8f5
Author: Kun Wan <wan...@apache.org>
AuthorDate: Wed Aug 30 22:36:59 2023 +0800

    [SPARK-44239][SQL] Free memory allocated by large vectors when vectors are 
reset
    
    ### What changes were proposed in this pull request?
    
    Add a memory reserve policy for WritableColumnVector:
    * If the vector capacity < VECTORIZED_HUGE_VECTOR_THRESHOLD, will reserve 
requested capacity * 2 memory.
    * If the vector capacity >= VECTORIZED_HUGE_VECTOR_THRESHOLD, will reserve 
requested capacity * VECTORIZED_HUGE_VECTOR_RESERVE_RATIO memory.
    * Free the WritableColumnVector memory if the vector capacity >= 
VECTORIZED_HUGE_VECTOR_THRESHOLD
    
    which will reuse the allocated array object for small column vectors and 
free the memory for huge column vectors.
    
    ### Why are the changes needed?
    
    When spark reads a data file into a WritableColumnVector, the memory 
allocated by the WritableColumnVectors is not freed until the 
VectorizedColumnReader completes.
    
    It will save memory allocation time by reusing the allocated array objects. 
But it also takes up too many unused memory after the current large vector 
batch has been read.
    
    Add a memory reserve policy for this scenario which will reuse the 
allocated array object for small column vectors and free the memory for huge 
column vectors.
    
    
![image](https://github.com/apache/spark/assets/3626747/a7a487bd-f184-4b24-bea0-75e530702887)
    
![image](https://github.com/apache/spark/assets/3626747/01d0268f-68e7-416f-b9b3-6c9d60919596)
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added UT
    
    Closes #41782 from wankunde/vector.
    
    Lead-authored-by: Kun Wan <wan...@apache.org>
    Co-authored-by: wankun <wan...@apache.org>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    | 24 +++++++++++++++
 .../execution/vectorized/OffHeapColumnVector.java  |  9 ++++--
 .../execution/vectorized/OnHeapColumnVector.java   |  9 ++++--
 .../execution/vectorized/WritableColumnVector.java | 35 ++++++++++++++++++++--
 .../execution/vectorized/ColumnVectorSuite.scala   | 32 +++++++++++++++++++-
 5 files changed, 100 insertions(+), 9 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index beaf61f0712..8c8b33921e3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -487,6 +487,26 @@ object SQLConf {
     .intConf
     .createWithDefault(10000)
 
+  val VECTORIZED_HUGE_VECTOR_RESERVE_RATIO =
+    buildConf("spark.sql.inMemoryColumnarStorage.hugeVectorReserveRatio")
+      .doc("When spark.sql.inMemoryColumnarStorage.hugeVectorThreshold <= 0 or 
the required " +
+        "memory is smaller than 
spark.sql.inMemoryColumnarStorage.hugeVectorThreshold, spark " +
+        "reserves required memory * 2 memory; otherwise, spark reserves " +
+        "required memory * this ratio memory, and will release this column 
vector memory before " +
+        "reading the next batch rows.")
+      .version("4.0.0")
+      .doubleConf
+      .createWithDefault(1.2)
+
+  val VECTORIZED_HUGE_VECTOR_THRESHOLD =
+    buildConf("spark.sql.inMemoryColumnarStorage.hugeVectorThreshold")
+      .doc("When the required memory is larger than this, spark reserves 
required memory * " +
+        s"${VECTORIZED_HUGE_VECTOR_RESERVE_RATIO.key} memory next time and 
release this column " +
+        s"vector memory before reading the next batch rows. -1 means disabling 
the optimization.")
+      .version("4.0.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(-1)
+
   val IN_MEMORY_PARTITION_PRUNING =
     buildConf("spark.sql.inMemoryColumnarStorage.partitionPruning")
       .internal()
@@ -4662,6 +4682,10 @@ class SQLConf extends Serializable with Logging with 
SqlApiConf {
 
   def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
 
+  def vectorizedHugeVectorThreshold: Int = 
getConf(VECTORIZED_HUGE_VECTOR_THRESHOLD).toInt
+
+  def vectorizedHugeVectorReserveRatio: Double = 
getConf(VECTORIZED_HUGE_VECTOR_RESERVE_RATIO)
+
   def cacheVectorizedReaderEnabled: Boolean = 
getConf(CACHE_VECTORIZED_READER_ENABLED)
 
   def defaultNumShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 5674a091f6d..bc2636caefd 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -84,9 +84,7 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
     return data;
   }
 
-  @Override
-  public void close() {
-    super.close();
+  protected void releaseMemory() {
     Platform.freeMemory(nulls);
     Platform.freeMemory(data);
     Platform.freeMemory(lengthData);
@@ -97,6 +95,11 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
     offsetData = 0;
   }
 
+  @Override
+  public void close() {
+    super.close();
+  }
+
   //
   // APIs dealing with nulls
   //
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 6e4a9c643e8..56a96907f0f 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -80,9 +80,7 @@ public final class OnHeapColumnVector extends 
WritableColumnVector {
     reset();
   }
 
-  @Override
-  public void close() {
-    super.close();
+  protected void releaseMemory() {
     nulls = null;
     byteData = null;
     shortData = null;
@@ -94,6 +92,11 @@ public final class OnHeapColumnVector extends 
WritableColumnVector {
     arrayOffsets = null;
   }
 
+  @Override
+  public void close() {
+    super.close();
+  }
+
   //
   // APIs dealing with nulls
   //
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index a8e4aad60c2..ac8da471f00 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -53,6 +53,8 @@ import org.apache.spark.unsafe.types.UTF8String;
 public abstract class WritableColumnVector extends ColumnVector {
   private final byte[] byte8 = new byte[8];
 
+  protected abstract void releaseMemory();
+
   /**
    * Resets this column for writing. The currently stored values are no longer 
accessible.
    */
@@ -69,6 +71,12 @@ public abstract class WritableColumnVector extends 
ColumnVector {
       putNotNulls(0, capacity);
       numNulls = 0;
     }
+
+    if (hugeVectorThreshold > 0 && capacity > hugeVectorThreshold) {
+      capacity = defaultCapacity;
+      releaseMemory();
+      reserveInternal(capacity);
+    }
   }
 
   @Override
@@ -85,6 +93,7 @@ public abstract class WritableColumnVector extends 
ColumnVector {
       dictionaryIds = null;
     }
     dictionary = null;
+    releaseMemory();
   }
 
   public void reserveAdditional(int additionalCapacity) {
@@ -95,7 +104,10 @@ public abstract class WritableColumnVector extends 
ColumnVector {
     if (requiredCapacity < 0) {
       throwUnsupportedException(requiredCapacity, null);
     } else if (requiredCapacity > capacity) {
-      int newCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L);
+      int newCapacity =
+          hugeVectorThreshold < 0 || requiredCapacity < hugeVectorThreshold ?
+              (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L) :
+              (int) Math.min(MAX_CAPACITY, requiredCapacity * 
hugeVectorReserveRatio);
       if (requiredCapacity <= newCapacity) {
         try {
           reserveInternal(newCapacity);
@@ -846,7 +858,14 @@ public abstract class WritableColumnVector extends 
ColumnVector {
   /**
    * Marks this column as being constant.
    */
-  public final void setIsConstant() { isConstant = true; }
+  public final void setIsConstant() {
+    if (childColumns != null) {
+      for (WritableColumnVector c : childColumns) {
+        c.setIsConstant();
+      }
+    }
+    isConstant = true;
+  }
 
   /**
    * Marks this column only contains null values.
@@ -867,12 +886,21 @@ public abstract class WritableColumnVector extends 
ColumnVector {
    */
   protected int capacity;
 
+  /**
+   * The default number of rows that can be stored in this column.
+   */
+  protected final int defaultCapacity;
+
   /**
    * Upper limit for the maximum capacity for this column.
    */
   @VisibleForTesting
   protected int MAX_CAPACITY = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
 
+  protected int hugeVectorThreshold;
+
+  protected double hugeVectorReserveRatio;
+
   /**
    * Number of nulls in this column. This is an optimization for the reader, 
to skip NULL checks.
    */
@@ -922,6 +950,9 @@ public abstract class WritableColumnVector extends 
ColumnVector {
   protected WritableColumnVector(int capacity, DataType dataType) {
     super(dataType);
     this.capacity = capacity;
+    this.defaultCapacity = capacity;
+    this.hugeVectorThreshold = SQLConf.get().vectorizedHugeVectorThreshold();
+    this.hugeVectorReserveRatio = 
SQLConf.get().vectorizedHugeVectorReserveRatio();
 
     if (isArray()) {
       DataType childType;
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
index 5e06eb729ea..b2b2729e90e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
@@ -19,13 +19,15 @@ package org.apache.spark.sql.execution.vectorized
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
+import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.execution.columnar.ColumnAccessor
 import org.apache.spark.sql.execution.columnar.compression.ColumnBuilderHelper
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnarArray
 import org.apache.spark.unsafe.types.UTF8String
 
-class ColumnVectorSuite extends SparkFunSuite {
+class ColumnVectorSuite extends SparkFunSuite with SQLHelper {
   private def withVector(
       vector: WritableColumnVector)(
       block: WritableColumnVector => Unit): Unit = {
@@ -588,6 +590,34 @@ class ColumnVectorSuite extends SparkFunSuite {
     }
   }
 
+  test("SPARK-44239: Test column vector reserve policy") {
+    withSQLConf(
+      SQLConf.VECTORIZED_HUGE_VECTOR_THRESHOLD.key -> "300",
+      SQLConf.VECTORIZED_HUGE_VECTOR_RESERVE_RATIO.key -> "1.2") {
+      val dataType = ByteType
+
+      Array(new OnHeapColumnVector(80, dataType),
+        new OffHeapColumnVector(80, dataType)).foreach { vector =>
+        try {
+          // The new capacity of small vector = request capacity * 2 and will 
not be reset
+          vector.appendBytes(100, 0)
+          assert(vector.capacity == 200)
+          vector.reset()
+          assert(vector.capacity == 200)
+
+          // The new capacity of huge vector = (request capacity - 
HUGE_VECTOR_THRESHOLD) * 1.2 +
+          // HUGE_VECTOR_THRESHOLD * 2 = 300 * 1.2 and will be reset.
+          vector.appendBytes(300, 0)
+          assert(vector.capacity == 360)
+          vector.reset()
+          assert(vector.capacity == 80)
+        } finally {
+          vector.close()
+        }
+      }
+    }
+  }
+
   DataTypeTestUtils.yearMonthIntervalTypes.foreach { dt =>
     val structType = new StructType().add(dt.typeName, dt)
     testVectors("ColumnarRow " + dt.typeName, 10, structType) { v =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to