This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 49e2d1735 [VL] Gluten-it: --auto-cluster-resource to automatically set
up CPU cores and memory sizes for local cluster (#6655)
49e2d1735 is described below
commit 49e2d17359444cd64263bdd8ec30032abff3d617
Author: Hongze Zhang <[email protected]>
AuthorDate: Thu Aug 1 08:17:25 2024 +0800
[VL] Gluten-it: --auto-cluster-resource to automatically set up CPU cores
and memory sizes for local cluster (#6655)
---
.github/workflows/velox_docker.yml | 18 +-
.../org/apache/gluten/integration/BaseMixin.java | 11 +-
.../gluten/integration/command/DataGenMixin.java | 2 +-
.../gluten/integration/command/SparkRunModes.java | 310 ++++++++++++++++-----
.../org/apache/gluten/integration/Suite.scala | 3 -
.../integration/clickbench/ClickBenchSuite.scala | 2 -
.../apache/gluten/integration/ds/TpcdsSuite.scala | 2 -
.../apache/gluten/integration/h/TpchSuite.scala | 2 -
8 files changed, 261 insertions(+), 89 deletions(-)
diff --git a/.github/workflows/velox_docker.yml
b/.github/workflows/velox_docker.yml
index ca3317d12..913a55cbd 100644
--- a/.github/workflows/velox_docker.yml
+++ b/.github/workflows/velox_docker.yml
@@ -296,7 +296,7 @@ jobs:
cd tools/gluten-it \
&& GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \
--local --preset=velox --benchmark-type=ds --error-on-memleak
--queries=q67,q95 -s=30.0 --threads=12 --shuffle-partitions=72 --iterations=1 \
- --data-gen-strategy=skip -m=OffHeapExecutionMemory \
+ --data-gen=skip -m=OffHeapExecutionMemory \
-d=ISOLATION:OFF,spark.gluten.memory.isolation=false \
-d=OFFHEAP_SIZE:6g,spark.memory.offHeap.size=6g \
-d=OFFHEAP_SIZE:4g,spark.memory.offHeap.size=4g \
@@ -308,7 +308,7 @@ jobs:
cd tools/gluten-it \
&& GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \
--local --preset=velox --benchmark-type=ds --error-on-memleak
--queries=q67 -s=30.0 --threads=12 --shuffle-partitions=72 --iterations=1 \
- --data-gen-strategy=skip -m=OffHeapExecutionMemory \
+ --data-gen=skip -m=OffHeapExecutionMemory \
-d=ISOLATION:ON,spark.gluten.memory.isolation=true,spark.memory.storageFraction=0.1
\
-d=OFFHEAP_SIZE:6g,spark.memory.offHeap.size=6g \
-d=OFFHEAP_SIZE:4g,spark.memory.offHeap.size=4g \
@@ -319,7 +319,7 @@ jobs:
cd tools/gluten-it \
&& GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \
--local --preset=velox --benchmark-type=ds --error-on-memleak
--queries=q95 -s=30.0 --threads=12 --shuffle-partitions=72 --iterations=1 \
- --data-gen-strategy=skip -m=OffHeapExecutionMemory \
+ --data-gen=skip -m=OffHeapExecutionMemory \
-d=ISOLATION:ON,spark.gluten.memory.isolation=true,spark.memory.storageFraction=0.1
\
-d=OFFHEAP_SIZE:6g,spark.memory.offHeap.size=6g \
-d=OFFHEAP_SIZE:4g,spark.memory.offHeap.size=4g \
@@ -330,7 +330,7 @@ jobs:
cd tools/gluten-it \
&& GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \
--local --preset=velox --benchmark-type=ds --error-on-memleak
--queries=q23a,q23b -s=30.0 --threads=12 --shuffle-partitions=72 --iterations=1
\
- --data-gen-strategy=skip -m=OffHeapExecutionMemory \
+ --data-gen=skip -m=OffHeapExecutionMemory \
-d=ISOLATION:OFF,spark.gluten.memory.isolation=false \
-d=OFFHEAP_SIZE:2g,spark.memory.offHeap.size=2g \
-d=FLUSH_MODE:DISABLED,spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=100,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0
\
@@ -341,7 +341,7 @@ jobs:
cd tools/gluten-it \
&& GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \
--local --preset=velox --benchmark-type=ds --error-on-memleak
--queries=q23a,q23b -s=30.0 --threads=12 --shuffle-partitions=72 --iterations=1
\
- --data-gen-strategy=skip -m=OffHeapExecutionMemory \
+ --data-gen=skip -m=OffHeapExecutionMemory \
-d=ISOLATION:ON,spark.gluten.memory.isolation=true,spark.memory.storageFraction=0.1
\
-d=OFFHEAP_SIZE:2g,spark.memory.offHeap.size=2g \
-d=FLUSH_MODE:DISABLED,spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=100,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0
\
@@ -352,7 +352,7 @@ jobs:
cd tools/gluten-it \
&& GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \
--local --preset=velox --benchmark-type=ds --error-on-memleak
--queries=q97 -s=30.0 --threads=12 --shuffle-partitions=72 --iterations=1 \
- --data-gen-strategy=skip -m=OffHeapExecutionMemory \
+ --data-gen=skip -m=OffHeapExecutionMemory \
-d=ISOLATION:OFF,spark.gluten.memory.isolation=false \
-d=ISOLATION:ON,spark.gluten.memory.isolation=true,spark.memory.storageFraction=0.1
\
-d=OFFHEAP_SIZE:2g,spark.memory.offHeap.size=2g \
@@ -408,7 +408,7 @@ jobs:
cd tools/gluten-it \
&& GLUTEN_IT_JVM_ARGS=-Xmx6G sbin/gluten-it.sh queries \
--local --preset=velox --benchmark-type=ds --error-on-memleak
-s=30.0 --off-heap-size=8g --threads=12 --shuffle-partitions=72 --iterations=1
\
- --data-gen-strategy=skip --random-kill-tasks --no-session-reuse
+ --data-gen=skip --random-kill-tasks --no-session-reuse
# run-tpc-test-ubuntu-sf30:
# needs: build-native-lib-centos-7
@@ -457,10 +457,10 @@ jobs:
# cd tools/gluten-it \
# && GLUTEN_IT_JVM_ARGS=-Xmx6G sbin/gluten-it.sh queries-compare \
# --local --preset=velox --benchmark-type=h --error-on-memleak
-s=30.0 --off-heap-size=8g --threads=12 --shuffle-partitions=72 --iterations=1
\
- # --data-gen-strategy=skip --shard=${{ matrix.shard }} \
+ # --data-gen=skip --shard=${{ matrix.shard }} \
# && GLUTEN_IT_JVM_ARGS=-Xmx6G sbin/gluten-it.sh queries-compare \
# --local --preset=velox --benchmark-type=ds --error-on-memleak
-s=30.0 --off-heap-size=8g --threads=12 --shuffle-partitions=72 --iterations=1
\
- # --data-gen-strategy=skip --shard=${{ matrix.shard }}
+ # --data-gen=skip --shard=${{ matrix.shard }}
run-tpc-test-centos8-uniffle:
needs: build-native-lib-centos-7
diff --git
a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/BaseMixin.java
b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/BaseMixin.java
index 47aa0a0cb..08c55d78a 100644
---
a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/BaseMixin.java
+++
b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/BaseMixin.java
@@ -61,10 +61,7 @@ public class BaseMixin {
private int hsUiPort;
@CommandLine.ArgGroup(exclusive = true, multiplicity = "1")
- SparkRunModes.ModeEnumeration runModeEnumeration;
-
- @CommandLine.Option(names = {"--off-heap-size"}, description = "Off heap
memory size per executor", defaultValue = "6g")
- private String offHeapSize;
+ SparkRunModes.Mode.Enumeration runModeEnumeration;
@CommandLine.Option(names = {"--disable-aqe"}, description = "Disable Spark
SQL adaptive query execution", defaultValue = "false")
private boolean disableAqe;
@@ -133,19 +130,19 @@ public class BaseMixin {
case "h":
suite = new TpchSuite(runModeEnumeration.getSparkMasterUrl(), actions,
testConf,
baselineConf, extraSparkConfScala, level, errorOnMemLeak, dataDir,
enableUi,
- enableHsUi, hsUiPort, offHeapSize, disableAqe, disableBhj,
+ enableHsUi, hsUiPort, disableAqe, disableBhj,
disableWscg, shufflePartitions, scanPartitions);
break;
case "ds":
suite = new TpcdsSuite(runModeEnumeration.getSparkMasterUrl(),
actions, testConf,
baselineConf, extraSparkConfScala, level, errorOnMemLeak, dataDir,
enableUi,
- enableHsUi, hsUiPort, offHeapSize, disableAqe, disableBhj,
+ enableHsUi, hsUiPort, disableAqe, disableBhj,
disableWscg, shufflePartitions, scanPartitions);
break;
case "clickbench":
suite = new ClickBenchSuite(runModeEnumeration.getSparkMasterUrl(),
actions, testConf,
baselineConf, extraSparkConfScala, level, errorOnMemLeak, dataDir,
enableUi,
- enableHsUi, hsUiPort, offHeapSize, disableAqe, disableBhj,
+ enableHsUi, hsUiPort, disableAqe, disableBhj,
disableWscg, shufflePartitions, scanPartitions);
break;
default:
diff --git
a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/DataGenMixin.java
b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/DataGenMixin.java
index 3854d078e..fa51054f0 100644
---
a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/DataGenMixin.java
+++
b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/DataGenMixin.java
@@ -21,7 +21,7 @@ import org.apache.gluten.integration.action.DataGenOnly;
import picocli.CommandLine;
public class DataGenMixin {
- @CommandLine.Option(names = {"--data-gen-strategy"}, description = "The
strategy of data generation, accepted values: skip, once, always", defaultValue
= "always")
+ @CommandLine.Option(names = {"--data-gen"}, description = "The strategy of
data generation, accepted values: skip, once, always", defaultValue = "always")
private String dataGenStrategy;
@CommandLine.Option(names = {"-s", "--scale"}, description = "The scale
factor of sample TPC-H dataset", defaultValue = "0.1")
diff --git
a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/SparkRunModes.java
b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/SparkRunModes.java
index 56ef68db9..6750b90e9 100644
---
a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/SparkRunModes.java
+++
b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/SparkRunModes.java
@@ -16,64 +16,50 @@
*/
package org.apache.gluten.integration.command;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
import org.apache.spark.launcher.SparkLauncher;
import org.apache.spark.util.Utils;
import picocli.CommandLine;
+import javax.management.*;
import java.io.File;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
+import java.lang.management.ManagementFactory;
+import java.util.*;
import java.util.stream.Collectors;
public final class SparkRunModes {
+ private static <T> T findNonNull(T... objects) {
+ final List<T> nonNullObjects =
Arrays.stream(objects).filter(Objects::nonNull).collect(Collectors.toList());
+ Preconditions.checkState(nonNullObjects.size() == 1, "There are zero or
more than one non-null objects: " + nonNullObjects);
+ return nonNullObjects.get(0);
+ }
+
public interface Mode {
String getSparkMasterUrl();
- Map<String, String> extraSparkConf();
- }
- public static class ModeEnumeration implements Mode {
- @CommandLine.ArgGroup(exclusive = false)
- LocalMode localMode;
+ Map<String, String> extraSparkConf();
- @CommandLine.ArgGroup(exclusive = false)
- LocalClusterMode localClusterMode;
+ class Enumeration implements Mode {
+ @CommandLine.ArgGroup(exclusive = false)
+ LocalMode localMode;
- private Mode getActiveMode() {
- int enabledModeCount = 0;
- if (localMode != null) {
- enabledModeCount++;
- }
- if (localClusterMode != null) {
- enabledModeCount++;
- }
+ @CommandLine.ArgGroup(exclusive = false)
+ LocalClusterMode localClusterMode;
- if (enabledModeCount != 1) {
- throw new IllegalStateException("Only one single run mode can be
specified");
+ private Mode getActiveMode() {
+ return findNonNull(localMode, localClusterMode);
}
- if (localMode != null) {
- return localMode;
+ @Override
+ public String getSparkMasterUrl() {
+ return getActiveMode().getSparkMasterUrl();
}
- if (localClusterMode != null) {
- return localClusterMode;
+ @Override
+ public Map<String, String> extraSparkConf() {
+ return getActiveMode().extraSparkConf();
}
-
- throw new IllegalStateException("unreachable code");
- }
-
- @Override
- public String getSparkMasterUrl() {
- return getActiveMode().getSparkMasterUrl();
- }
-
- @Override
- public Map<String, String> extraSparkConf() {
- return getActiveMode().extraSparkConf();
}
}
@@ -84,6 +70,9 @@ public final class SparkRunModes {
@CommandLine.Option(names = {"--threads"}, description = "Local mode: Run
Spark locally with as many worker threads", defaultValue = "4")
private int localThreads;
+ @CommandLine.Option(names = {"--off-heap-size"}, description = "Local
mode: Total off-heap memory size", defaultValue = "6g")
+ private String offHeapSize;
+
@Override
public String getSparkMasterUrl() {
if (!enabled) {
@@ -95,15 +84,71 @@ public final class SparkRunModes {
@Override
public Map<String, String> extraSparkConf() {
- return Collections.emptyMap();
+ return ImmutableMap.<String, String>builder()
+ .put("spark.memory.offHeap.enabled", "true")
+ .put("spark.memory.offHeap.size", offHeapSize)
+ .build();
}
}
- private static class LocalClusterMode implements Mode {
- // We should transfer the jars to be tested in the integration testing to
executors
- public static final String[] EXTRA_JARS = new
String[]{"gluten-package-1.2.0-SNAPSHOT.jar"};
+ private interface ClusterResource {
+ int lcWorkers();
- @CommandLine.Option(names = {"--local-cluster"}, description = "Run in
Spark local cluster mode", required = true)
+ int lcWorkerCores();
+
+ long lcWorkerHeapMem(); // in MiB.
+
+ int lcExecutorCores();
+
+ long lcExecutorHeapMem(); // in MiB.
+
+ long lcExecutorOffHeapMem(); // in MiB.
+
+ class Enumeration implements ClusterResource {
+ @CommandLine.ArgGroup(exclusive = false)
+ ManualClusterResource manual;
+
+ @CommandLine.ArgGroup(exclusive = false)
+ AutoClusterResource auto;
+
+ private ClusterResource getActive() {
+ return findNonNull(manual, auto);
+ }
+
+ @Override
+ public int lcWorkers() {
+ return getActive().lcWorkers();
+ }
+
+ @Override
+ public int lcWorkerCores() {
+ return getActive().lcWorkerCores();
+ }
+
+ @Override
+ public long lcWorkerHeapMem() {
+ return getActive().lcWorkerHeapMem();
+ }
+
+ @Override
+ public int lcExecutorCores() {
+ return getActive().lcExecutorCores();
+ }
+
+ @Override
+ public long lcExecutorHeapMem() {
+ return getActive().lcExecutorHeapMem();
+ }
+
+ @Override
+ public long lcExecutorOffHeapMem() {
+ return getActive().lcExecutorOffHeapMem();
+ }
+ }
+ }
+
+ private static class ManualClusterResource implements ClusterResource {
+ @CommandLine.Option(names = {"--manual-cluster-resource"}, description =
"Local cluster mode: Manually configure cluster resource", required = true)
private boolean enabled;
@CommandLine.Option(names = {"--workers"}, description = "Local cluster
mode: Number of workers", defaultValue = "2")
@@ -112,14 +157,163 @@ public final class SparkRunModes {
@CommandLine.Option(names = {"--worker-cores"}, description = "Local
cluster mode: Number of cores per worker", defaultValue = "2")
private int lcWorkerCores;
- @CommandLine.Option(names = {"--worker-mem"}, description = "Local cluster
mode: Memory per worker", defaultValue = "4g")
- private String lcWorkerMem;
+ @CommandLine.Option(names = {"--worker-heap-size"}, description = "Local
cluster mode: Heap memory per worker", defaultValue = "4g")
+ private String lcWorkerHeapMem;
@CommandLine.Option(names = {"--executor-cores"}, description = "Local
cluster mode: Number of cores per executor", defaultValue = "1")
private int lcExecutorCores;
- @CommandLine.Option(names = {"--executor-mem"}, description = "Local
cluster mode: Memory per executor", defaultValue = "2g")
- private String lcExecutorMem;
+ @CommandLine.Option(names = {"--executor-heap-size"}, description = "Local
cluster mode: Heap memory per executor", defaultValue = "2g")
+ private String lcExecutorHeapMem;
+
+ @CommandLine.Option(names = {"--executor-off-heap-size"}, description =
"Local cluster mode: Off-heap memory per executor", defaultValue = "6g")
+ private String lcExecutorOffHeapMem;
+
+ @Override
+ public int lcWorkers() {
+ ensureEnabled();
+ return lcWorkers;
+ }
+
+ @Override
+ public int lcWorkerCores() {
+ ensureEnabled();
+ return lcWorkerCores;
+ }
+
+ @Override
+ public long lcWorkerHeapMem() {
+ ensureEnabled();
+ return Utils.byteStringAsMb(lcWorkerHeapMem);
+ }
+
+ @Override
+ public int lcExecutorCores() {
+ ensureEnabled();
+ return lcExecutorCores;
+ }
+
+ @Override
+ public long lcExecutorHeapMem() {
+ ensureEnabled();
+ return Utils.byteStringAsMb(lcExecutorHeapMem);
+ }
+
+ @Override
+ public long lcExecutorOffHeapMem() {
+ ensureEnabled();
+ return Utils.byteStringAsMb(lcExecutorOffHeapMem);
+ }
+
+ private void ensureEnabled() {
+ if (!enabled) {
+ throw new IllegalStateException("Manual cluster resource is not
enabled");
+ }
+ }
+ }
+
+ private static class AutoClusterResource implements ClusterResource {
+ @CommandLine.Option(names = {"--auto-cluster-resource"}, description =
"Local cluster mode: Automatically configure cluster resource", required = true)
+ private boolean enabled;
+
+ private final int lcWorkers;
+ private final int lcWorkerCores;
+ private final long lcWorkerHeapMem;
+ private final int lcExecutorCores;
+ private final long lcExecutorHeapMem;
+ private final long lcExecutorOffHeapMem;
+
+ public AutoClusterResource() {
+ final int totalCores = Runtime.getRuntime().availableProcessors();
+ final long totalMem = (long) (getTotalMem() * 0.8);
+ Preconditions.checkState(totalMem >= 64, "--auto-cluster-resource mode
requires for at least 64 MiB physical memory available. Current: " + totalMem);
+ Preconditions.checkState(totalCores >= 1, "--auto-cluster-resource mode
requires for at least 1 CPU core available. Current: " + totalCores);
+ if (totalCores % 2 == 1) {
+ // Platform has an odd number of CPU cores.
+ this.lcWorkers = 1;
+ this.lcWorkerCores = totalCores;
+ this.lcExecutorCores = 1;
+ } else {
+ // Platform has an even number of CPU cores.
+ this.lcWorkers = 2;
+ this.lcWorkerCores = totalCores / this.lcWorkers;
+ if (lcWorkerCores % 2 == 1) {
+ this.lcExecutorCores = 1;
+ } else {
+ this.lcExecutorCores = 2;
+ }
+ }
+ Preconditions.checkState(totalCores % this.lcExecutorCores == 0);
+ final int numExecutors = totalCores / this.lcExecutorCores;
+ Preconditions.checkState(this.lcWorkerCores % this.lcExecutorCores == 0);
+ final int numExecutorsPerWorker = this.lcWorkerCores /
this.lcExecutorCores;
+ final long executorMem = totalMem / numExecutors;
+ this.lcExecutorHeapMem = (long) (executorMem * 0.33);
+ this.lcExecutorOffHeapMem = (long) (executorMem * 0.67);
+ this.lcWorkerHeapMem = this.lcExecutorHeapMem * numExecutorsPerWorker;
+ System.out.printf("Automatically configured cluster resource settings:
%n" +
+ " lcWorkers: [%d]%n" +
+ " lcWorkerCores: [%d]%n" +
+ " lcWorkerHeapMem: [%dMiB]%n" +
+ " lcExecutorCores: [%d]%n" +
+ " lcExecutorHeapMem: [%dMiB]%n" +
+ " lcExecutorOffHeapMem: [%dMiB]%n",
+ lcWorkers,
+ lcWorkerCores,
+ lcWorkerHeapMem,
+ lcExecutorCores,
+ lcExecutorHeapMem,
+ lcExecutorOffHeapMem);
+ }
+
+ private static long getTotalMem() {
+ try {
+ final MBeanServer mBeanServer =
ManagementFactory.getPlatformMBeanServer();
+ final Object attribute = mBeanServer.getAttribute(new
ObjectName("java.lang", "type", "OperatingSystem"), "TotalPhysicalMemorySize");
+ final long totalMem = Long.parseLong(attribute.toString()) / 1024 /
1024;
+ return totalMem;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public int lcWorkers() {
+ return lcWorkers;
+ }
+
+ @Override
+ public int lcWorkerCores() {
+ return lcWorkerCores;
+ }
+
+ @Override
+ public long lcWorkerHeapMem() {
+ return lcWorkerHeapMem;
+ }
+
+ @Override
+ public int lcExecutorCores() {
+ return lcExecutorCores;
+ }
+
+ @Override
+ public long lcExecutorHeapMem() {
+ return lcExecutorHeapMem;
+ }
+
+ @Override
+ public long lcExecutorOffHeapMem() {
+ return lcExecutorOffHeapMem;
+ }
+ }
+
+ private static class LocalClusterMode implements Mode {
+ @CommandLine.Option(names = {"--local-cluster"}, description = "Run in
Spark local cluster mode", required = true)
+ private boolean enabled;
+
+ @CommandLine.ArgGroup(exclusive = true, multiplicity = "1")
+ ClusterResource.Enumeration resourceEnumeration;
@Override
public String getSparkMasterUrl() {
@@ -132,26 +326,16 @@ public final class SparkRunModes {
if (!System.getenv().containsKey("SPARK_SCALA_VERSION")) {
throw new IllegalArgumentException("SPARK_SCALA_VERSION not set!
Please set it first or use --local instead. Example: export
SPARK_SCALA_VERSION=2.12");
}
- return String.format("local-cluster[%d,%d,%d]", lcWorkers,
lcWorkerCores, Utils.byteStringAsMb(lcWorkerMem));
+ return String.format("local-cluster[%d,%d,%d]",
resourceEnumeration.lcWorkers(), resourceEnumeration.lcWorkerCores(),
resourceEnumeration.lcWorkerHeapMem());
}
@Override
public Map<String, String> extraSparkConf() {
- final Set<String> extraJarSet =
Arrays.stream(EXTRA_JARS).collect(Collectors.toSet());
- String classpath = System.getProperty("java.class.path");
- String[] classPathValues = classpath.split(File.pathSeparator);
- Optional<String> extraClassPath =
Arrays.stream(classPathValues).filter(classPath -> {
- File file = new File(classPath);
- return file.exists() && file.isFile() &&
extraJarSet.contains(file.getName());
- }).map(classPath -> {
- File file = new File(classPath);
- return file.getAbsolutePath();
- }).reduce((s1, s2) -> s1 + File.pathSeparator + s2);
-
final Map<String, String> extras = new HashMap<>();
- extras.put(SparkLauncher.EXECUTOR_CORES,
String.valueOf(lcExecutorCores));
- extras.put(SparkLauncher.EXECUTOR_MEMORY, lcExecutorMem);
- extraClassPath.ifPresent(path ->
extras.put("spark.executor.extraClassPath", path));
+ extras.put(SparkLauncher.EXECUTOR_CORES,
String.valueOf(resourceEnumeration.lcExecutorCores()));
+ extras.put(SparkLauncher.EXECUTOR_MEMORY, String.format("%dm",
resourceEnumeration.lcExecutorHeapMem()));
+ extras.put("spark.memory.offHeap.enabled", "true");
+ extras.put("spark.memory.offHeap.size", String.format("%dm",
resourceEnumeration.lcExecutorOffHeapMem()));
return extras;
}
}
diff --git
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
index 070c43e9b..64fc179ce 100644
---
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
+++
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
@@ -38,7 +38,6 @@ abstract class Suite(
private val enableUi: Boolean,
private val enableHsUi: Boolean,
private val hsUiPort: Int,
- private val offHeapSize: String,
private val disableAqe: Boolean,
private val disableBhj: Boolean,
private val disableWscg: Boolean,
@@ -75,8 +74,6 @@ abstract class Suite(
sessionSwitcher
.defaultConf()
.setWarningOnOverriding("spark.unsafe.exceptionOnMemoryLeak",
s"$errorOnMemLeak")
-
sessionSwitcher.defaultConf().setWarningOnOverriding("spark.memory.offHeap.enabled",
"true")
-
sessionSwitcher.defaultConf().setWarningOnOverriding("spark.memory.offHeap.size",
offHeapSize)
if (!enableUi) {
sessionSwitcher.defaultConf().setWarningOnOverriding("spark.ui.enabled",
"false")
diff --git
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchSuite.scala
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchSuite.scala
index f75431941..da0c6b8ce 100644
---
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchSuite.scala
+++
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchSuite.scala
@@ -41,7 +41,6 @@ class ClickBenchSuite(
val enableUi: Boolean,
val enableHsUi: Boolean,
val hsUiPort: Int,
- val offHeapSize: String,
val disableAqe: Boolean,
val disableBhj: Boolean,
val disableWscg: Boolean,
@@ -58,7 +57,6 @@ class ClickBenchSuite(
enableUi,
enableHsUi,
hsUiPort,
- offHeapSize,
disableAqe,
disableBhj,
disableWscg,
diff --git
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsSuite.scala
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsSuite.scala
index 190623614..9e6fa1edb 100644
---
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsSuite.scala
+++
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsSuite.scala
@@ -36,7 +36,6 @@ class TpcdsSuite(
val enableUi: Boolean,
val enableHsUi: Boolean,
val hsUiPort: Int,
- val offHeapSize: String,
val disableAqe: Boolean,
val disableBhj: Boolean,
val disableWscg: Boolean,
@@ -53,7 +52,6 @@ class TpcdsSuite(
enableUi,
enableHsUi,
hsUiPort,
- offHeapSize,
disableAqe,
disableBhj,
disableWscg,
diff --git
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchSuite.scala
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchSuite.scala
index 86fcaea0a..fc3ad1310 100644
---
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchSuite.scala
+++
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchSuite.scala
@@ -36,7 +36,6 @@ class TpchSuite(
val enableUi: Boolean,
val enableHsUi: Boolean,
val hsUiPort: Int,
- val offHeapSize: String,
val disableAqe: Boolean,
val disableBhj: Boolean,
val disableWscg: Boolean,
@@ -53,7 +52,6 @@ class TpchSuite(
enableUi,
enableHsUi,
hsUiPort,
- offHeapSize,
disableAqe,
disableBhj,
disableWscg,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]