This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 27f9ac20675 [SPARK-44239][SQL] Free memory allocated by large vectors when vectors are reset 27f9ac20675 is described below commit 27f9ac20675326d4e843387f4bb07415c4f0e8f5 Author: Kun Wan <wan...@apache.org> AuthorDate: Wed Aug 30 22:36:59 2023 +0800 [SPARK-44239][SQL] Free memory allocated by large vectors when vectors are reset ### What changes were proposed in this pull request? Add a memory reserve policy for WritableColumnVector: * If the vector capacity < VECTORIZED_HUGE_VECTOR_THRESHOLD, Spark reserves requested capacity * 2 memory. * If the vector capacity >= VECTORIZED_HUGE_VECTOR_THRESHOLD, Spark reserves requested capacity * VECTORIZED_HUGE_VECTOR_RESERVE_RATIO memory. * Free the WritableColumnVector memory when the vector is reset if the vector capacity >= VECTORIZED_HUGE_VECTOR_THRESHOLD. As a result, the allocated array objects are reused for small column vectors and the memory is freed for huge column vectors. ### Why are the changes needed? When Spark reads a data file into a WritableColumnVector, the memory allocated by the WritableColumnVectors is not freed until the VectorizedColumnReader completes. This saves memory allocation time by reusing the allocated array objects, but it also takes up too much unused memory after the current large vector batch has been read. Add a memory reserve policy for this scenario which reuses the allocated array objects for small column vectors and frees the memory for huge column vectors. ![image](https://github.com/apache/spark/assets/3626747/a7a487bd-f184-4b24-bea0-75e530702887) ![image](https://github.com/apache/spark/assets/3626747/01d0268f-68e7-416f-b9b3-6c9d60919596) ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added UT Closes #41782 from wankunde/vector. 
Lead-authored-by: Kun Wan <wan...@apache.org> Co-authored-by: wankun <wan...@apache.org> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../org/apache/spark/sql/internal/SQLConf.scala | 24 +++++++++++++++ .../execution/vectorized/OffHeapColumnVector.java | 9 ++++-- .../execution/vectorized/OnHeapColumnVector.java | 9 ++++-- .../execution/vectorized/WritableColumnVector.java | 35 ++++++++++++++++++++-- .../execution/vectorized/ColumnVectorSuite.scala | 32 +++++++++++++++++++- 5 files changed, 100 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index beaf61f0712..8c8b33921e3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -487,6 +487,26 @@ object SQLConf { .intConf .createWithDefault(10000) + val VECTORIZED_HUGE_VECTOR_RESERVE_RATIO = + buildConf("spark.sql.inMemoryColumnarStorage.hugeVectorReserveRatio") + .doc("When spark.sql.inMemoryColumnarStorage.hugeVectorThreshold <= 0 or the required " + + "memory is smaller than spark.sql.inMemoryColumnarStorage.hugeVectorThreshold, spark " + + "reserves required memory * 2 memory; otherwise, spark reserves " + + "required memory * this ratio memory, and will release this column vector memory before " + + "reading the next batch rows.") + .version("4.0.0") + .doubleConf + .createWithDefault(1.2) + + val VECTORIZED_HUGE_VECTOR_THRESHOLD = + buildConf("spark.sql.inMemoryColumnarStorage.hugeVectorThreshold") + .doc("When the required memory is larger than this, spark reserves required memory * " + + s"${VECTORIZED_HUGE_VECTOR_RESERVE_RATIO.key} memory next time and release this column " + + s"vector memory before reading the next batch rows. 
-1 means disabling the optimization.") + .version("4.0.0") + .bytesConf(ByteUnit.BYTE) + .createWithDefault(-1) + val IN_MEMORY_PARTITION_PRUNING = buildConf("spark.sql.inMemoryColumnarStorage.partitionPruning") .internal() @@ -4662,6 +4682,10 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE) + def vectorizedHugeVectorThreshold: Int = getConf(VECTORIZED_HUGE_VECTOR_THRESHOLD).toInt + + def vectorizedHugeVectorReserveRatio: Double = getConf(VECTORIZED_HUGE_VECTOR_RESERVE_RATIO) + def cacheVectorizedReaderEnabled: Boolean = getConf(CACHE_VECTORIZED_READER_ENABLED) def defaultNumShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index 5674a091f6d..bc2636caefd 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -84,9 +84,7 @@ public final class OffHeapColumnVector extends WritableColumnVector { return data; } - @Override - public void close() { - super.close(); + protected void releaseMemory() { Platform.freeMemory(nulls); Platform.freeMemory(data); Platform.freeMemory(lengthData); @@ -97,6 +95,11 @@ public final class OffHeapColumnVector extends WritableColumnVector { offsetData = 0; } + @Override + public void close() { + super.close(); + } + // // APIs dealing with nulls // diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java index 6e4a9c643e8..56a96907f0f 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java +++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java @@ -80,9 +80,7 @@ public final class OnHeapColumnVector extends WritableColumnVector { reset(); } - @Override - public void close() { - super.close(); + protected void releaseMemory() { nulls = null; byteData = null; shortData = null; @@ -94,6 +92,11 @@ public final class OnHeapColumnVector extends WritableColumnVector { arrayOffsets = null; } + @Override + public void close() { + super.close(); + } + // // APIs dealing with nulls // diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java index a8e4aad60c2..ac8da471f00 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java @@ -53,6 +53,8 @@ import org.apache.spark.unsafe.types.UTF8String; public abstract class WritableColumnVector extends ColumnVector { private final byte[] byte8 = new byte[8]; + protected abstract void releaseMemory(); + /** * Resets this column for writing. The currently stored values are no longer accessible. 
*/ @@ -69,6 +71,12 @@ public abstract class WritableColumnVector extends ColumnVector { putNotNulls(0, capacity); numNulls = 0; } + + if (hugeVectorThreshold > 0 && capacity > hugeVectorThreshold) { + capacity = defaultCapacity; + releaseMemory(); + reserveInternal(capacity); + } } @Override @@ -85,6 +93,7 @@ public abstract class WritableColumnVector extends ColumnVector { dictionaryIds = null; } dictionary = null; + releaseMemory(); } public void reserveAdditional(int additionalCapacity) { @@ -95,7 +104,10 @@ public abstract class WritableColumnVector extends ColumnVector { if (requiredCapacity < 0) { throwUnsupportedException(requiredCapacity, null); } else if (requiredCapacity > capacity) { - int newCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L); + int newCapacity = + hugeVectorThreshold < 0 || requiredCapacity < hugeVectorThreshold ? + (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L) : + (int) Math.min(MAX_CAPACITY, requiredCapacity * hugeVectorReserveRatio); if (requiredCapacity <= newCapacity) { try { reserveInternal(newCapacity); @@ -846,7 +858,14 @@ public abstract class WritableColumnVector extends ColumnVector { /** * Marks this column as being constant. */ - public final void setIsConstant() { isConstant = true; } + public final void setIsConstant() { + if (childColumns != null) { + for (WritableColumnVector c : childColumns) { + c.setIsConstant(); + } + } + isConstant = true; + } /** * Marks this column only contains null values. @@ -867,12 +886,21 @@ public abstract class WritableColumnVector extends ColumnVector { */ protected int capacity; + /** + * The default number of rows that can be stored in this column. + */ + protected final int defaultCapacity; + /** * Upper limit for the maximum capacity for this column. */ @VisibleForTesting protected int MAX_CAPACITY = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH; + protected int hugeVectorThreshold; + + protected double hugeVectorReserveRatio; + /** * Number of nulls in this column. 
This is an optimization for the reader, to skip NULL checks. */ @@ -922,6 +950,9 @@ public abstract class WritableColumnVector extends ColumnVector { protected WritableColumnVector(int capacity, DataType dataType) { super(dataType); this.capacity = capacity; + this.defaultCapacity = capacity; + this.hugeVectorThreshold = SQLConf.get().vectorizedHugeVectorThreshold(); + this.hugeVectorReserveRatio = SQLConf.get().vectorizedHugeVectorReserveRatio(); if (isArray()) { DataType childType; diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala index 5e06eb729ea..b2b2729e90e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala @@ -19,13 +19,15 @@ package org.apache.spark.sql.execution.vectorized import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow +import org.apache.spark.sql.catalyst.plans.SQLHelper import org.apache.spark.sql.execution.columnar.ColumnAccessor import org.apache.spark.sql.execution.columnar.compression.ColumnBuilderHelper +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnarArray import org.apache.spark.unsafe.types.UTF8String -class ColumnVectorSuite extends SparkFunSuite { +class ColumnVectorSuite extends SparkFunSuite with SQLHelper { private def withVector( vector: WritableColumnVector)( block: WritableColumnVector => Unit): Unit = { @@ -588,6 +590,34 @@ class ColumnVectorSuite extends SparkFunSuite { } } + test("SPARK-44239: Test column vector reserve policy") { + withSQLConf( + SQLConf.VECTORIZED_HUGE_VECTOR_THRESHOLD.key -> "300", + SQLConf.VECTORIZED_HUGE_VECTOR_RESERVE_RATIO.key -> "1.2") { + val dataType = ByteType + + Array(new 
OnHeapColumnVector(80, dataType), + new OffHeapColumnVector(80, dataType)).foreach { vector => + try { + // The new capacity of small vector = request capacity * 2 and will not be reset + vector.appendBytes(100, 0) + assert(vector.capacity == 200) + vector.reset() + assert(vector.capacity == 200) + + // The new capacity of huge vector = (request capacity - HUGE_VECTOR_THRESHOLD) * 1.2 + + // HUGE_VECTOR_THRESHOLD * 2 = 300 * 1.2 and will be reset. + vector.appendBytes(300, 0) + assert(vector.capacity == 360) + vector.reset() + assert(vector.capacity == 80) + } finally { + vector.close() + } + } + } + } + DataTypeTestUtils.yearMonthIntervalTypes.foreach { dt => val structType = new StructType().add(dt.typeName, dt) testVectors("ColumnarRow " + dt.typeName, 10, structType) { v => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org