This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 27f9ac20675 [SPARK-44239][SQL] Free memory allocated by large vectors when vectors are reset
27f9ac20675 is described below
commit 27f9ac20675326d4e843387f4bb07415c4f0e8f5
Author: Kun Wan <[email protected]>
AuthorDate: Wed Aug 30 22:36:59 2023 +0800
[SPARK-44239][SQL] Free memory allocated by large vectors when vectors are reset
### What changes were proposed in this pull request?
Add a memory reserve policy for WritableColumnVector:
* If the vector capacity < VECTORIZED_HUGE_VECTOR_THRESHOLD, reserve requested capacity * 2 memory.
* If the vector capacity >= VECTORIZED_HUGE_VECTOR_THRESHOLD, reserve requested capacity * VECTORIZED_HUGE_VECTOR_RESERVE_RATIO memory.
* Free the WritableColumnVector memory if the vector capacity >= VECTORIZED_HUGE_VECTOR_THRESHOLD.

This reuses the allocated array objects for small column vectors and frees the memory for huge column vectors; a short sketch of the policy follows below.
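To make the policy concrete, here is a minimal stand-alone sketch of the reserve/reset arithmetic (plain Scala, for illustration only; `ReservePolicySketch` and its helper methods are hypothetical names that mirror the `hugeVectorThreshold`, `hugeVectorReserveRatio`, and `defaultCapacity` fields added by this PR, not the actual `WritableColumnVector` code):

```scala
// Illustration only: mirrors the reserve/reset policy described above, not Spark's real classes.
object ReservePolicySketch {
  // Stand-in for ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH used as the capacity ceiling.
  val MaxCapacity: Int = Int.MaxValue - 15

  // Capacity chosen when `requiredCapacity` rows are requested.
  def newCapacity(requiredCapacity: Int, threshold: Int, reserveRatio: Double): Int =
    if (threshold < 0 || requiredCapacity < threshold) {
      // Small vector: keep the old behavior and double the requested capacity.
      math.min(MaxCapacity.toLong, requiredCapacity * 2L).toInt
    } else {
      // Huge vector: reserve only requiredCapacity * reserveRatio.
      math.min(MaxCapacity.toDouble, requiredCapacity * reserveRatio).toInt
    }

  // Capacity after reset(): huge vectors are released and shrink back to the default capacity.
  def capacityAfterReset(capacity: Int, threshold: Int, defaultCapacity: Int): Int =
    if (threshold > 0 && capacity > threshold) defaultCapacity else capacity

  def main(args: Array[String]): Unit = {
    // Values taken from the new unit test: threshold = 300, ratio = 1.2, default capacity = 80.
    println(newCapacity(100, 300, 1.2))        // 200 -> small vector is doubled
    println(capacityAfterReset(200, 300, 80))  // 200 -> small vector keeps its buffer
    println(newCapacity(300, 300, 1.2))        // 360 -> huge vector reserves 300 * 1.2
    println(capacityAfterReset(360, 300, 80))  // 80  -> huge vector is freed on reset
  }
}
```

With these example values, a request for 100 rows keeps a doubled buffer across resets, while a request for 300 rows reserves 360 rows and is released back to the default capacity of 80 on the next reset.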
### Why are the changes needed?
When Spark reads a data file into a WritableColumnVector, the memory allocated by the WritableColumnVector is not freed until the VectorizedColumnReader completes. Reusing the allocated array objects saves memory allocation time, but it also holds on to too much unused memory after the current large vector batch has been read.

The new memory reserve policy addresses this scenario: it reuses the allocated array objects for small column vectors and frees the memory for huge column vectors. An illustrative sketch of how the new settings could be applied follows below.
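For illustration, a sketch of how the new settings could be applied on a SparkSession (the two config keys come from this PR; the `16m` threshold, application name, and Parquet path are made-up example values, not recommendations):

```scala
import org.apache.spark.sql.SparkSession

object HugeVectorConfigExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("huge-vector-reserve-demo") // example name
      .master("local[*]")
      // Column vectors needing more than ~16 MB are released after each batch
      // (example value; the default of -1 keeps the optimization disabled).
      .config("spark.sql.inMemoryColumnarStorage.hugeVectorThreshold", "16m")
      // Huge vectors reserve requiredCapacity * 1.2 instead of requiredCapacity * 2.
      .config("spark.sql.inMemoryColumnarStorage.hugeVectorReserveRatio", "1.2")
      .getOrCreate()

    // Any vectorized scan (e.g. Parquet) picks these values up through SQLConf.
    val df = spark.read.parquet("/path/to/some/large.parquet") // hypothetical path
    println(df.count())

    spark.stop()
  }
}
```

Leaving spark.sql.inMemoryColumnarStorage.hugeVectorThreshold at its default of -1 keeps the existing behavior, so the change is opt-in.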


### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added UT
Closes #41782 from wankunde/vector.
Lead-authored-by: Kun Wan <[email protected]>
Co-authored-by: wankun <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 24 +++++++++++++++
.../execution/vectorized/OffHeapColumnVector.java | 9 ++++--
.../execution/vectorized/OnHeapColumnVector.java | 9 ++++--
.../execution/vectorized/WritableColumnVector.java | 35 ++++++++++++++++++++--
.../execution/vectorized/ColumnVectorSuite.scala | 32 +++++++++++++++++++-
5 files changed, 100 insertions(+), 9 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index beaf61f0712..8c8b33921e3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -487,6 +487,26 @@ object SQLConf {
     .intConf
     .createWithDefault(10000)
 
+  val VECTORIZED_HUGE_VECTOR_RESERVE_RATIO =
+    buildConf("spark.sql.inMemoryColumnarStorage.hugeVectorReserveRatio")
+      .doc("When spark.sql.inMemoryColumnarStorage.hugeVectorThreshold <= 0 or the required " +
+        "memory is smaller than spark.sql.inMemoryColumnarStorage.hugeVectorThreshold, spark " +
+        "reserves required memory * 2 memory; otherwise, spark reserves " +
+        "required memory * this ratio memory, and will release this column vector memory before " +
+        "reading the next batch rows.")
+      .version("4.0.0")
+      .doubleConf
+      .createWithDefault(1.2)
+
+  val VECTORIZED_HUGE_VECTOR_THRESHOLD =
+    buildConf("spark.sql.inMemoryColumnarStorage.hugeVectorThreshold")
+      .doc("When the required memory is larger than this, spark reserves required memory * " +
+        s"${VECTORIZED_HUGE_VECTOR_RESERVE_RATIO.key} memory next time and release this column " +
+        s"vector memory before reading the next batch rows. -1 means disabling the optimization.")
+      .version("4.0.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(-1)
+
   val IN_MEMORY_PARTITION_PRUNING =
     buildConf("spark.sql.inMemoryColumnarStorage.partitionPruning")
       .internal()
@@ -4662,6 +4682,10 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
 
   def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
 
+  def vectorizedHugeVectorThreshold: Int = getConf(VECTORIZED_HUGE_VECTOR_THRESHOLD).toInt
+
+  def vectorizedHugeVectorReserveRatio: Double = getConf(VECTORIZED_HUGE_VECTOR_RESERVE_RATIO)
+
   def cacheVectorizedReaderEnabled: Boolean = getConf(CACHE_VECTORIZED_READER_ENABLED)
 
   def defaultNumShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 5674a091f6d..bc2636caefd 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -84,9 +84,7 @@ public final class OffHeapColumnVector extends WritableColumnVector {
     return data;
   }
 
-  @Override
-  public void close() {
-    super.close();
+  protected void releaseMemory() {
     Platform.freeMemory(nulls);
     Platform.freeMemory(data);
     Platform.freeMemory(lengthData);
@@ -97,6 +95,11 @@ public final class OffHeapColumnVector extends WritableColumnVector {
     offsetData = 0;
   }
 
+  @Override
+  public void close() {
+    super.close();
+  }
+
   //
   // APIs dealing with nulls
   //
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 6e4a9c643e8..56a96907f0f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -80,9 +80,7 @@ public final class OnHeapColumnVector extends WritableColumnVector {
     reset();
   }
 
-  @Override
-  public void close() {
-    super.close();
+  protected void releaseMemory() {
     nulls = null;
     byteData = null;
     shortData = null;
@@ -94,6 +92,11 @@ public final class OnHeapColumnVector extends WritableColumnVector {
     arrayOffsets = null;
   }
 
+  @Override
+  public void close() {
+    super.close();
+  }
+
   //
   // APIs dealing with nulls
   //
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index a8e4aad60c2..ac8da471f00 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -53,6 +53,8 @@ import org.apache.spark.unsafe.types.UTF8String;
 public abstract class WritableColumnVector extends ColumnVector {
   private final byte[] byte8 = new byte[8];
 
+  protected abstract void releaseMemory();
+
   /**
    * Resets this column for writing. The currently stored values are no longer accessible.
    */
@@ -69,6 +71,12 @@ public abstract class WritableColumnVector extends ColumnVector {
       putNotNulls(0, capacity);
       numNulls = 0;
     }
+
+    if (hugeVectorThreshold > 0 && capacity > hugeVectorThreshold) {
+      capacity = defaultCapacity;
+      releaseMemory();
+      reserveInternal(capacity);
+    }
   }
 
   @Override
@@ -85,6 +93,7 @@ public abstract class WritableColumnVector extends ColumnVector {
       dictionaryIds = null;
     }
     dictionary = null;
+    releaseMemory();
   }
 
   public void reserveAdditional(int additionalCapacity) {
@@ -95,7 +104,10 @@ public abstract class WritableColumnVector extends ColumnVector {
     if (requiredCapacity < 0) {
       throwUnsupportedException(requiredCapacity, null);
     } else if (requiredCapacity > capacity) {
-      int newCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L);
+      int newCapacity =
+        hugeVectorThreshold < 0 || requiredCapacity < hugeVectorThreshold ?
+          (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L) :
+          (int) Math.min(MAX_CAPACITY, requiredCapacity * hugeVectorReserveRatio);
       if (requiredCapacity <= newCapacity) {
         try {
           reserveInternal(newCapacity);
@@ -846,7 +858,14 @@ public abstract class WritableColumnVector extends ColumnVector {
   /**
    * Marks this column as being constant.
    */
-  public final void setIsConstant() { isConstant = true; }
+  public final void setIsConstant() {
+    if (childColumns != null) {
+      for (WritableColumnVector c : childColumns) {
+        c.setIsConstant();
+      }
+    }
+    isConstant = true;
+  }
 
   /**
    * Marks this column only contains null values.
@@ -867,12 +886,21 @@ public abstract class WritableColumnVector extends ColumnVector {
    */
   protected int capacity;
 
+  /**
+   * The default number of rows that can be stored in this column.
+   */
+  protected final int defaultCapacity;
+
   /**
    * Upper limit for the maximum capacity for this column.
    */
   @VisibleForTesting
   protected int MAX_CAPACITY = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
 
+  protected int hugeVectorThreshold;
+
+  protected double hugeVectorReserveRatio;
+
   /**
    * Number of nulls in this column. This is an optimization for the reader, to skip NULL checks.
    */
@@ -922,6 +950,9 @@ public abstract class WritableColumnVector extends ColumnVector {
   protected WritableColumnVector(int capacity, DataType dataType) {
     super(dataType);
     this.capacity = capacity;
+    this.defaultCapacity = capacity;
+    this.hugeVectorThreshold = SQLConf.get().vectorizedHugeVectorThreshold();
+    this.hugeVectorReserveRatio = SQLConf.get().vectorizedHugeVectorReserveRatio();
 
     if (isArray()) {
       DataType childType;
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
index 5e06eb729ea..b2b2729e90e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
@@ -19,13 +19,15 @@ package org.apache.spark.sql.execution.vectorized
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
+import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.execution.columnar.ColumnAccessor
import org.apache.spark.sql.execution.columnar.compression.ColumnBuilderHelper
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarArray
import org.apache.spark.unsafe.types.UTF8String
-class ColumnVectorSuite extends SparkFunSuite {
+class ColumnVectorSuite extends SparkFunSuite with SQLHelper {
   private def withVector(
       vector: WritableColumnVector)(
       block: WritableColumnVector => Unit): Unit = {
@@ -588,6 +590,34 @@ class ColumnVectorSuite extends SparkFunSuite {
     }
   }
+ test("SPARK-44239: Test column vector reserve policy") {
+ withSQLConf(
+ SQLConf.VECTORIZED_HUGE_VECTOR_THRESHOLD.key -> "300",
+ SQLConf.VECTORIZED_HUGE_VECTOR_RESERVE_RATIO.key -> "1.2") {
+ val dataType = ByteType
+
+ Array(new OnHeapColumnVector(80, dataType),
+ new OffHeapColumnVector(80, dataType)).foreach { vector =>
+ try {
+ // The new capacity of small vector = request capacity * 2 and will
not be reset
+ vector.appendBytes(100, 0)
+ assert(vector.capacity == 200)
+ vector.reset()
+ assert(vector.capacity == 200)
+
+ // The new capacity of huge vector = (request capacity -
HUGE_VECTOR_THRESHOLD) * 1.2 +
+ // HUGE_VECTOR_THRESHOLD * 2 = 300 * 1.2 and will be reset.
+ vector.appendBytes(300, 0)
+ assert(vector.capacity == 360)
+ vector.reset()
+ assert(vector.capacity == 80)
+ } finally {
+ vector.close()
+ }
+ }
+ }
+ }
+
   DataTypeTestUtils.yearMonthIntervalTypes.foreach { dt =>
     val structType = new StructType().add(dt.typeName, dt)
     testVectors("ColumnarRow " + dt.typeName, 10, structType) { v =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]