This is an automated email from the ASF dual-hosted git repository.
arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 4a13b7a3c8 [SYSTEMDS-3577] Fix default storage memory fractions in
Spark
4a13b7a3c8 is described below
commit 4a13b7a3c8835eeeda32245b4067ef5a2ed8b363
Author: Arnab Phani <[email protected]>
AuthorDate: Sun May 28 13:25:15 2023 +0200
[SYSTEMDS-3577] Fix default storage memory fractions in Spark
This patch fixes a bug in analyzing Spark configurations for memory.
The current code wrongly assumes the storage fraction is relative to
the heap, instead of relative to the unified memory (execution + storage).
Closes #1835
---
.../context/SparkExecutionContext.java | 21 +++++++++++++--------
1 file changed, 13 insertions(+), 8 deletions(-)
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
index ce7b43972d..371ec3db51 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
@@ -1804,9 +1804,10 @@ public class SparkExecutionContext extends
ExecutionContext
*/
public static class SparkClusterConfig
{
- //broadcasts are stored in mem-and-disk in data space, this
config
- //defines the fraction of data space to be used as broadcast
budget
- private static final double BROADCAST_DATA_FRACTION = 0.35;
+ //broadcasts are stored in mem-and-disk in storage space, this
config
+ //defines the fraction of min storage space to be used as
broadcast budget
+ private static final double BROADCAST_DATA_FRACTION = 0.70;
+ private static final double BROADCAST_DATA_FRACTION_LEGACY =
0.35;
//forward private config from Spark's
UnifiedMemoryManager.scala (>1.6)
private static final long RESERVED_SYSTEM_MEMORY_BYTES = 300 *
1024 * 1024;
@@ -1894,7 +1895,7 @@ public class SparkExecutionContext extends
ExecutionContext
double dataFrac =
sconf.getDouble("spark.storage.memoryFraction", 0.6); //default 60%
_memDataMinFrac = dataFrac;
_memDataMaxFrac = dataFrac;
- _memBroadcastFrac = dataFrac * BROADCAST_DATA_FRACTION;
//default 18%
+ _memBroadcastFrac = dataFrac *
BROADCAST_DATA_FRACTION_LEGACY; //default 18%
//analyze spark degree of parallelism
analyzeSparkParallelismConfiguation(sconf);
@@ -1910,10 +1911,14 @@ public class SparkExecutionContext extends
ExecutionContext
- RESERVED_SYSTEM_MEMORY_BYTES;
//get data and shuffle memory ratios (defaults not
specified in job conf)
- _memDataMinFrac =
sconf.getDouble("spark.memory.storageFraction", 0.5); //default 50%
- _memDataMaxFrac =
sconf.getDouble("spark.memory.fraction", 0.6); //default 60%
- _memBroadcastFrac = _memDataMaxFrac *
BROADCAST_DATA_FRACTION; //default 21%
-
+ //first get the unified memory fraction (60%)
comprising execution and storage
+ double unifiedMem =
sconf.getDouble("spark.memory.fraction", 0.6);
+ //minimum default storage expressed as 50% of unified
memory (= 30% of heap)
+ _memDataMinFrac = unifiedMem *
sconf.getDouble("spark.memory.storageFraction", 0.5);
+ //storage memory can expand and take up the full
unified memory
+ _memDataMaxFrac = unifiedMem;
+ //Heuristic-based broadcast fraction (70% of min
storage = 21% of heap)
+ _memBroadcastFrac = _memDataMinFrac *
BROADCAST_DATA_FRACTION;
//analyze spark degree of parallelism
analyzeSparkParallelismConfiguation(sconf);
}