This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 49e2d1735 [VL] Gluten-it: --auto-cluster-resource to automatically set 
up CPU cores and memory sizes for local cluster (#6655)
49e2d1735 is described below

commit 49e2d17359444cd64263bdd8ec30032abff3d617
Author: Hongze Zhang <[email protected]>
AuthorDate: Thu Aug 1 08:17:25 2024 +0800

    [VL] Gluten-it: --auto-cluster-resource to automatically set up CPU cores 
and memory sizes for local cluster (#6655)
---
 .github/workflows/velox_docker.yml                 |  18 +-
 .../org/apache/gluten/integration/BaseMixin.java   |  11 +-
 .../gluten/integration/command/DataGenMixin.java   |   2 +-
 .../gluten/integration/command/SparkRunModes.java  | 310 ++++++++++++++++-----
 .../org/apache/gluten/integration/Suite.scala      |   3 -
 .../integration/clickbench/ClickBenchSuite.scala   |   2 -
 .../apache/gluten/integration/ds/TpcdsSuite.scala  |   2 -
 .../apache/gluten/integration/h/TpchSuite.scala    |   2 -
 8 files changed, 261 insertions(+), 89 deletions(-)

diff --git a/.github/workflows/velox_docker.yml 
b/.github/workflows/velox_docker.yml
index ca3317d12..913a55cbd 100644
--- a/.github/workflows/velox_docker.yml
+++ b/.github/workflows/velox_docker.yml
@@ -296,7 +296,7 @@ jobs:
           cd tools/gluten-it \
           && GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \
             --local --preset=velox --benchmark-type=ds --error-on-memleak 
--queries=q67,q95 -s=30.0 --threads=12 --shuffle-partitions=72 --iterations=1 \
-            --data-gen-strategy=skip -m=OffHeapExecutionMemory \
+            --data-gen=skip -m=OffHeapExecutionMemory \
             -d=ISOLATION:OFF,spark.gluten.memory.isolation=false \
             -d=OFFHEAP_SIZE:6g,spark.memory.offHeap.size=6g \
             -d=OFFHEAP_SIZE:4g,spark.memory.offHeap.size=4g \
@@ -308,7 +308,7 @@ jobs:
           cd tools/gluten-it \
           && GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \
             --local --preset=velox --benchmark-type=ds --error-on-memleak 
--queries=q67 -s=30.0 --threads=12 --shuffle-partitions=72 --iterations=1 \
-            --data-gen-strategy=skip -m=OffHeapExecutionMemory \
+            --data-gen=skip -m=OffHeapExecutionMemory \
             
-d=ISOLATION:ON,spark.gluten.memory.isolation=true,spark.memory.storageFraction=0.1
 \
             -d=OFFHEAP_SIZE:6g,spark.memory.offHeap.size=6g \
             -d=OFFHEAP_SIZE:4g,spark.memory.offHeap.size=4g \
@@ -319,7 +319,7 @@ jobs:
           cd tools/gluten-it \
           && GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \
             --local --preset=velox --benchmark-type=ds --error-on-memleak 
--queries=q95 -s=30.0 --threads=12 --shuffle-partitions=72 --iterations=1 \
-            --data-gen-strategy=skip -m=OffHeapExecutionMemory \
+            --data-gen=skip -m=OffHeapExecutionMemory \
             
-d=ISOLATION:ON,spark.gluten.memory.isolation=true,spark.memory.storageFraction=0.1
 \
             -d=OFFHEAP_SIZE:6g,spark.memory.offHeap.size=6g \
             -d=OFFHEAP_SIZE:4g,spark.memory.offHeap.size=4g \
@@ -330,7 +330,7 @@ jobs:
           cd tools/gluten-it \
           && GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \
             --local --preset=velox --benchmark-type=ds --error-on-memleak 
--queries=q23a,q23b -s=30.0 --threads=12 --shuffle-partitions=72 --iterations=1 
\
-            --data-gen-strategy=skip -m=OffHeapExecutionMemory \
+            --data-gen=skip -m=OffHeapExecutionMemory \
             -d=ISOLATION:OFF,spark.gluten.memory.isolation=false \
             -d=OFFHEAP_SIZE:2g,spark.memory.offHeap.size=2g \
             
-d=FLUSH_MODE:DISABLED,spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=100,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0
 \
@@ -341,7 +341,7 @@ jobs:
           cd tools/gluten-it \
           && GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \
             --local --preset=velox --benchmark-type=ds --error-on-memleak 
--queries=q23a,q23b -s=30.0 --threads=12 --shuffle-partitions=72 --iterations=1 
\
-            --data-gen-strategy=skip -m=OffHeapExecutionMemory \
+            --data-gen=skip -m=OffHeapExecutionMemory \
             
-d=ISOLATION:ON,spark.gluten.memory.isolation=true,spark.memory.storageFraction=0.1
 \
             -d=OFFHEAP_SIZE:2g,spark.memory.offHeap.size=2g \
             
-d=FLUSH_MODE:DISABLED,spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=100,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0
 \
@@ -352,7 +352,7 @@ jobs:
           cd tools/gluten-it \
           && GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \
             --local --preset=velox --benchmark-type=ds --error-on-memleak 
--queries=q97 -s=30.0 --threads=12 --shuffle-partitions=72 --iterations=1 \
-            --data-gen-strategy=skip -m=OffHeapExecutionMemory \
+            --data-gen=skip -m=OffHeapExecutionMemory \
             -d=ISOLATION:OFF,spark.gluten.memory.isolation=false \
             
-d=ISOLATION:ON,spark.gluten.memory.isolation=true,spark.memory.storageFraction=0.1
 \
             -d=OFFHEAP_SIZE:2g,spark.memory.offHeap.size=2g \
@@ -408,7 +408,7 @@ jobs:
           cd tools/gluten-it \
           && GLUTEN_IT_JVM_ARGS=-Xmx6G sbin/gluten-it.sh queries \
             --local --preset=velox --benchmark-type=ds --error-on-memleak 
-s=30.0  --off-heap-size=8g --threads=12 --shuffle-partitions=72 --iterations=1 
\
-            --data-gen-strategy=skip  --random-kill-tasks --no-session-reuse
+            --data-gen=skip  --random-kill-tasks --no-session-reuse
 
   # run-tpc-test-ubuntu-sf30:
   #   needs: build-native-lib-centos-7
@@ -457,10 +457,10 @@ jobs:
   #         cd tools/gluten-it \
   #         && GLUTEN_IT_JVM_ARGS=-Xmx6G sbin/gluten-it.sh queries-compare \
   #           --local --preset=velox --benchmark-type=h --error-on-memleak 
-s=30.0  --off-heap-size=8g --threads=12 --shuffle-partitions=72 --iterations=1 
\
-  #           --data-gen-strategy=skip --shard=${{ matrix.shard }} \
+  #           --data-gen=skip --shard=${{ matrix.shard }} \
   #         && GLUTEN_IT_JVM_ARGS=-Xmx6G sbin/gluten-it.sh queries-compare \
   #           --local --preset=velox --benchmark-type=ds --error-on-memleak 
-s=30.0  --off-heap-size=8g --threads=12 --shuffle-partitions=72 --iterations=1 
\
-  #           --data-gen-strategy=skip --shard=${{ matrix.shard }}
+  #           --data-gen=skip --shard=${{ matrix.shard }}
 
   run-tpc-test-centos8-uniffle:
     needs: build-native-lib-centos-7
diff --git 
a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/BaseMixin.java
 
b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/BaseMixin.java
index 47aa0a0cb..08c55d78a 100644
--- 
a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/BaseMixin.java
+++ 
b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/BaseMixin.java
@@ -61,10 +61,7 @@ public class BaseMixin {
   private int hsUiPort;
 
   @CommandLine.ArgGroup(exclusive = true, multiplicity = "1")
-  SparkRunModes.ModeEnumeration runModeEnumeration;
-
-  @CommandLine.Option(names = {"--off-heap-size"}, description = "Off heap 
memory size per executor", defaultValue = "6g")
-  private String offHeapSize;
+  SparkRunModes.Mode.Enumeration runModeEnumeration;
 
   @CommandLine.Option(names = {"--disable-aqe"}, description = "Disable Spark 
SQL adaptive query execution", defaultValue = "false")
   private boolean disableAqe;
@@ -133,19 +130,19 @@ public class BaseMixin {
       case "h":
         suite = new TpchSuite(runModeEnumeration.getSparkMasterUrl(), actions, 
testConf,
             baselineConf, extraSparkConfScala, level, errorOnMemLeak, dataDir, 
enableUi,
-            enableHsUi, hsUiPort, offHeapSize, disableAqe, disableBhj,
+            enableHsUi, hsUiPort, disableAqe, disableBhj,
             disableWscg, shufflePartitions, scanPartitions);
         break;
       case "ds":
         suite = new TpcdsSuite(runModeEnumeration.getSparkMasterUrl(), 
actions, testConf,
             baselineConf, extraSparkConfScala, level, errorOnMemLeak, dataDir, 
enableUi,
-            enableHsUi, hsUiPort, offHeapSize, disableAqe, disableBhj,
+            enableHsUi, hsUiPort, disableAqe, disableBhj,
             disableWscg, shufflePartitions, scanPartitions);
         break;
       case "clickbench":
         suite = new ClickBenchSuite(runModeEnumeration.getSparkMasterUrl(), 
actions, testConf,
             baselineConf, extraSparkConfScala, level, errorOnMemLeak, dataDir, 
enableUi,
-            enableHsUi, hsUiPort, offHeapSize, disableAqe, disableBhj,
+            enableHsUi, hsUiPort, disableAqe, disableBhj,
             disableWscg, shufflePartitions, scanPartitions);
         break;
       default:
diff --git 
a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/DataGenMixin.java
 
b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/DataGenMixin.java
index 3854d078e..fa51054f0 100644
--- 
a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/DataGenMixin.java
+++ 
b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/DataGenMixin.java
@@ -21,7 +21,7 @@ import org.apache.gluten.integration.action.DataGenOnly;
 import picocli.CommandLine;
 
 public class DataGenMixin {
-  @CommandLine.Option(names = {"--data-gen-strategy"}, description = "The 
strategy of data generation, accepted values: skip, once, always", defaultValue 
= "always")
+  @CommandLine.Option(names = {"--data-gen"}, description = "The strategy of 
data generation, accepted values: skip, once, always", defaultValue = "always")
   private String dataGenStrategy;
 
   @CommandLine.Option(names = {"-s", "--scale"}, description = "The scale 
factor of sample TPC-H dataset", defaultValue = "0.1")
diff --git 
a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/SparkRunModes.java
 
b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/SparkRunModes.java
index 56ef68db9..6750b90e9 100644
--- 
a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/SparkRunModes.java
+++ 
b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/SparkRunModes.java
@@ -16,64 +16,50 @@
  */
 package org.apache.gluten.integration.command;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import org.apache.spark.launcher.SparkLauncher;
 import org.apache.spark.util.Utils;
 import picocli.CommandLine;
 
+import javax.management.*;
 import java.io.File;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
+import java.lang.management.ManagementFactory;
+import java.util.*;
 import java.util.stream.Collectors;
 
 public final class SparkRunModes {
+  private static <T> T findNonNull(T... objects) {
+    final List<T> nonNullObjects = 
Arrays.stream(objects).filter(Objects::nonNull).collect(Collectors.toList());
+    Preconditions.checkState(nonNullObjects.size() == 1, "There are zero or 
more than one non-null objects: " + nonNullObjects);
+    return nonNullObjects.get(0);
+  }
+
   public interface Mode {
     String getSparkMasterUrl();
-    Map<String, String> extraSparkConf();
-  }
 
-  public static class ModeEnumeration implements Mode {
-    @CommandLine.ArgGroup(exclusive = false)
-    LocalMode localMode;
+    Map<String, String> extraSparkConf();
 
-    @CommandLine.ArgGroup(exclusive = false)
-    LocalClusterMode localClusterMode;
+    class Enumeration implements Mode {
+      @CommandLine.ArgGroup(exclusive = false)
+      LocalMode localMode;
 
-    private Mode getActiveMode() {
-      int enabledModeCount = 0;
-      if (localMode != null) {
-        enabledModeCount++;
-      }
-      if (localClusterMode != null) {
-        enabledModeCount++;
-      }
+      @CommandLine.ArgGroup(exclusive = false)
+      LocalClusterMode localClusterMode;
 
-      if (enabledModeCount != 1) {
-        throw new IllegalStateException("Only one single run mode can be 
specified");
+      private Mode getActiveMode() {
+        return findNonNull(localMode, localClusterMode);
       }
 
-      if (localMode != null) {
-        return localMode;
+      @Override
+      public String getSparkMasterUrl() {
+        return getActiveMode().getSparkMasterUrl();
       }
 
-      if (localClusterMode != null) {
-        return localClusterMode;
+      @Override
+      public Map<String, String> extraSparkConf() {
+        return getActiveMode().extraSparkConf();
       }
-
-      throw new IllegalStateException("unreachable code");
-    }
-
-    @Override
-    public String getSparkMasterUrl() {
-      return getActiveMode().getSparkMasterUrl();
-    }
-
-    @Override
-    public Map<String, String> extraSparkConf() {
-      return getActiveMode().extraSparkConf();
     }
   }
 
@@ -84,6 +70,9 @@ public final class SparkRunModes {
     @CommandLine.Option(names = {"--threads"}, description = "Local mode: Run 
Spark locally with as many worker threads", defaultValue = "4")
     private int localThreads;
 
+    @CommandLine.Option(names = {"--off-heap-size"}, description = "Local 
mode: Total off-heap memory size", defaultValue = "6g")
+    private String offHeapSize;
+
     @Override
     public String getSparkMasterUrl() {
       if (!enabled) {
@@ -95,15 +84,71 @@ public final class SparkRunModes {
 
     @Override
     public Map<String, String> extraSparkConf() {
-      return Collections.emptyMap();
+      return ImmutableMap.<String, String>builder()
+          .put("spark.memory.offHeap.enabled", "true")
+          .put("spark.memory.offHeap.size", offHeapSize)
+          .build();
     }
   }
 
-  private static class LocalClusterMode implements Mode {
-    // We should transfer the jars to be tested in the integration testing to 
executors
-    public static final String[] EXTRA_JARS = new 
String[]{"gluten-package-1.2.0-SNAPSHOT.jar"};
+  private interface ClusterResource {
+    int lcWorkers();
 
-    @CommandLine.Option(names = {"--local-cluster"}, description = "Run in 
Spark local cluster mode", required = true)
+    int lcWorkerCores();
+
+    long lcWorkerHeapMem(); // in MiB.
+
+    int lcExecutorCores();
+
+    long lcExecutorHeapMem(); // in MiB.
+
+    long lcExecutorOffHeapMem(); // in MiB.
+
+    class Enumeration implements ClusterResource {
+      @CommandLine.ArgGroup(exclusive = false)
+      ManualClusterResource manual;
+
+      @CommandLine.ArgGroup(exclusive = false)
+      AutoClusterResource auto;
+
+      private ClusterResource getActive() {
+        return findNonNull(manual, auto);
+      }
+
+      @Override
+      public int lcWorkers() {
+        return getActive().lcWorkers();
+      }
+
+      @Override
+      public int lcWorkerCores() {
+        return getActive().lcWorkerCores();
+      }
+
+      @Override
+      public long lcWorkerHeapMem() {
+        return getActive().lcWorkerHeapMem();
+      }
+
+      @Override
+      public int lcExecutorCores() {
+        return getActive().lcExecutorCores();
+      }
+
+      @Override
+      public long lcExecutorHeapMem() {
+        return getActive().lcExecutorHeapMem();
+      }
+
+      @Override
+      public long lcExecutorOffHeapMem() {
+        return getActive().lcExecutorOffHeapMem();
+      }
+    }
+  }
+
+  private static class ManualClusterResource implements ClusterResource {
+    @CommandLine.Option(names = {"--manual-cluster-resource"}, description = 
"Local cluster mode: Manually configure cluster resource", required = true)
     private boolean enabled;
 
     @CommandLine.Option(names = {"--workers"}, description = "Local cluster 
mode: Number of workers", defaultValue = "2")
@@ -112,14 +157,163 @@ public final class SparkRunModes {
     @CommandLine.Option(names = {"--worker-cores"}, description = "Local 
cluster mode: Number of cores per worker", defaultValue = "2")
     private int lcWorkerCores;
 
-    @CommandLine.Option(names = {"--worker-mem"}, description = "Local cluster 
mode: Memory per worker", defaultValue = "4g")
-    private String lcWorkerMem;
+    @CommandLine.Option(names = {"--worker-heap-size"}, description = "Local 
cluster mode: Heap memory per worker", defaultValue = "4g")
+    private String lcWorkerHeapMem;
 
     @CommandLine.Option(names = {"--executor-cores"}, description = "Local 
cluster mode: Number of cores per executor", defaultValue = "1")
     private int lcExecutorCores;
 
-    @CommandLine.Option(names = {"--executor-mem"}, description = "Local 
cluster mode: Memory per executor", defaultValue = "2g")
-    private String lcExecutorMem;
+    @CommandLine.Option(names = {"--executor-heap-size"}, description = "Local 
cluster mode: Heap memory per executor", defaultValue = "2g")
+    private String lcExecutorHeapMem;
+
+    @CommandLine.Option(names = {"--executor-off-heap-size"}, description = 
"Local cluster mode: Off-heap memory per executor", defaultValue = "6g")
+    private String lcExecutorOffHeapMem;
+
+    @Override
+    public int lcWorkers() {
+      ensureEnabled();
+      return lcWorkers;
+    }
+
+    @Override
+    public int lcWorkerCores() {
+      ensureEnabled();
+      return lcWorkerCores;
+    }
+
+    @Override
+    public long lcWorkerHeapMem() {
+      ensureEnabled();
+      return Utils.byteStringAsMb(lcWorkerHeapMem);
+    }
+
+    @Override
+    public int lcExecutorCores() {
+      ensureEnabled();
+      return lcExecutorCores;
+    }
+
+    @Override
+    public long lcExecutorHeapMem() {
+      ensureEnabled();
+      return Utils.byteStringAsMb(lcExecutorHeapMem);
+    }
+
+    @Override
+    public long lcExecutorOffHeapMem() {
+      ensureEnabled();
+      return Utils.byteStringAsMb(lcExecutorOffHeapMem);
+    }
+
+    private void ensureEnabled() {
+      if (!enabled) {
+        throw new IllegalStateException("Manual cluster resource is not 
enabled");
+      }
+    }
+  }
+  
+  private static class AutoClusterResource implements ClusterResource {
+    @CommandLine.Option(names = {"--auto-cluster-resource"}, description = 
"Local cluster mode: Automatically configure cluster resource", required = true)
+    private boolean enabled;
+
+    private final int lcWorkers;
+    private final int lcWorkerCores;
+    private final long lcWorkerHeapMem;
+    private final int lcExecutorCores;
+    private final long lcExecutorHeapMem;
+    private final long lcExecutorOffHeapMem;
+
+    public AutoClusterResource() {
+      final int totalCores = Runtime.getRuntime().availableProcessors();
+      final long totalMem = (long) (getTotalMem() * 0.8);
+      Preconditions.checkState(totalMem >= 64, "--auto-cluster-resource mode 
requires for at least 64 MiB physical memory available. Current: " + totalMem);
+      Preconditions.checkState(totalCores >= 1, "--auto-cluster-resource mode 
requires for at least 1 CPU core available. Current: " + totalCores);
+      if (totalCores % 2 == 1) {
+        // Platform has an odd number of CPU cores.
+        this.lcWorkers = 1;
+        this.lcWorkerCores = totalCores;
+        this.lcExecutorCores = 1;
+      } else {
+        // Platform has an even number of CPU cores.
+        this.lcWorkers = 2;
+        this.lcWorkerCores = totalCores / this.lcWorkers;
+        if (lcWorkerCores % 2 == 1) {
+          this.lcExecutorCores = 1;
+        } else {
+          this.lcExecutorCores = 2;
+        }
+      }
+      Preconditions.checkState(totalCores % this.lcExecutorCores == 0);
+      final int numExecutors = totalCores / this.lcExecutorCores;
+      Preconditions.checkState(this.lcWorkerCores % this.lcExecutorCores == 0);
+      final int numExecutorsPerWorker = this.lcWorkerCores / 
this.lcExecutorCores;
+      final long executorMem = totalMem / numExecutors;
+      this.lcExecutorHeapMem = (long) (executorMem * 0.33);
+      this.lcExecutorOffHeapMem = (long) (executorMem * 0.67);
+      this.lcWorkerHeapMem = this.lcExecutorHeapMem * numExecutorsPerWorker;
+      System.out.printf("Automatically configured cluster resource settings: 
%n" +
+          "  lcWorkers: [%d]%n" +
+          "  lcWorkerCores: [%d]%n" +
+          "  lcWorkerHeapMem: [%dMiB]%n" +
+          "  lcExecutorCores: [%d]%n" +
+          "  lcExecutorHeapMem: [%dMiB]%n" +
+          "  lcExecutorOffHeapMem: [%dMiB]%n",
+          lcWorkers,
+          lcWorkerCores,
+          lcWorkerHeapMem,
+          lcExecutorCores,
+          lcExecutorHeapMem,
+          lcExecutorOffHeapMem);
+    }
+
+    private static long getTotalMem() {
+      try {
+        final MBeanServer mBeanServer = 
ManagementFactory.getPlatformMBeanServer();
+        final Object attribute = mBeanServer.getAttribute(new 
ObjectName("java.lang", "type", "OperatingSystem"), "TotalPhysicalMemorySize");
+        final long totalMem = Long.parseLong(attribute.toString()) / 1024 / 
1024;
+        return totalMem;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public int lcWorkers() {
+      return lcWorkers;
+    }
+
+    @Override
+    public int lcWorkerCores() {
+      return lcWorkerCores;
+    }
+
+    @Override
+    public long lcWorkerHeapMem() {
+      return lcWorkerHeapMem;
+    }
+
+    @Override
+    public int lcExecutorCores() {
+      return lcExecutorCores;
+    }
+
+    @Override
+    public long lcExecutorHeapMem() {
+      return lcExecutorHeapMem;
+    }
+
+    @Override
+    public long lcExecutorOffHeapMem() {
+      return lcExecutorOffHeapMem;
+    }
+  }
+
+  private static class LocalClusterMode implements Mode {
+    @CommandLine.Option(names = {"--local-cluster"}, description = "Run in 
Spark local cluster mode", required = true)
+    private boolean enabled;
+
+    @CommandLine.ArgGroup(exclusive = true, multiplicity = "1")
+    ClusterResource.Enumeration resourceEnumeration;
 
     @Override
     public String getSparkMasterUrl() {
@@ -132,26 +326,16 @@ public final class SparkRunModes {
       if (!System.getenv().containsKey("SPARK_SCALA_VERSION")) {
         throw new IllegalArgumentException("SPARK_SCALA_VERSION not set! 
Please set it first or use --local instead. Example: export 
SPARK_SCALA_VERSION=2.12");
       }
-      return String.format("local-cluster[%d,%d,%d]", lcWorkers, 
lcWorkerCores, Utils.byteStringAsMb(lcWorkerMem));
+      return String.format("local-cluster[%d,%d,%d]", 
resourceEnumeration.lcWorkers(), resourceEnumeration.lcWorkerCores(), 
resourceEnumeration.lcWorkerHeapMem());
     }
 
     @Override
     public Map<String, String> extraSparkConf() {
-      final Set<String> extraJarSet = 
Arrays.stream(EXTRA_JARS).collect(Collectors.toSet());
-      String classpath = System.getProperty("java.class.path");
-      String[] classPathValues = classpath.split(File.pathSeparator);
-      Optional<String> extraClassPath = 
Arrays.stream(classPathValues).filter(classPath -> {
-        File file = new File(classPath);
-        return file.exists() && file.isFile() && 
extraJarSet.contains(file.getName());
-      }).map(classPath -> {
-        File file = new File(classPath);
-        return file.getAbsolutePath();
-      }).reduce((s1, s2) -> s1 + File.pathSeparator + s2);
-
       final Map<String, String> extras = new HashMap<>();
-      extras.put(SparkLauncher.EXECUTOR_CORES, 
String.valueOf(lcExecutorCores));
-      extras.put(SparkLauncher.EXECUTOR_MEMORY, lcExecutorMem);
-      extraClassPath.ifPresent(path -> 
extras.put("spark.executor.extraClassPath", path));
+      extras.put(SparkLauncher.EXECUTOR_CORES, 
String.valueOf(resourceEnumeration.lcExecutorCores()));
+      extras.put(SparkLauncher.EXECUTOR_MEMORY, String.format("%dm", 
resourceEnumeration.lcExecutorHeapMem()));
+      extras.put("spark.memory.offHeap.enabled", "true");
+      extras.put("spark.memory.offHeap.size", String.format("%dm", 
resourceEnumeration.lcExecutorOffHeapMem()));
       return extras;
     }
   }
diff --git 
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
 
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
index 070c43e9b..64fc179ce 100644
--- 
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
+++ 
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
@@ -38,7 +38,6 @@ abstract class Suite(
     private val enableUi: Boolean,
     private val enableHsUi: Boolean,
     private val hsUiPort: Int,
-    private val offHeapSize: String,
     private val disableAqe: Boolean,
     private val disableBhj: Boolean,
     private val disableWscg: Boolean,
@@ -75,8 +74,6 @@ abstract class Suite(
   sessionSwitcher
     .defaultConf()
     .setWarningOnOverriding("spark.unsafe.exceptionOnMemoryLeak", 
s"$errorOnMemLeak")
-  
sessionSwitcher.defaultConf().setWarningOnOverriding("spark.memory.offHeap.enabled",
 "true")
-  
sessionSwitcher.defaultConf().setWarningOnOverriding("spark.memory.offHeap.size",
 offHeapSize)
 
   if (!enableUi) {
     sessionSwitcher.defaultConf().setWarningOnOverriding("spark.ui.enabled", 
"false")
diff --git 
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchSuite.scala
 
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchSuite.scala
index f75431941..da0c6b8ce 100644
--- 
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchSuite.scala
+++ 
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchSuite.scala
@@ -41,7 +41,6 @@ class ClickBenchSuite(
     val enableUi: Boolean,
     val enableHsUi: Boolean,
     val hsUiPort: Int,
-    val offHeapSize: String,
     val disableAqe: Boolean,
     val disableBhj: Boolean,
     val disableWscg: Boolean,
@@ -58,7 +57,6 @@ class ClickBenchSuite(
       enableUi,
       enableHsUi,
       hsUiPort,
-      offHeapSize,
       disableAqe,
       disableBhj,
       disableWscg,
diff --git 
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsSuite.scala
 
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsSuite.scala
index 190623614..9e6fa1edb 100644
--- 
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsSuite.scala
+++ 
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsSuite.scala
@@ -36,7 +36,6 @@ class TpcdsSuite(
     val enableUi: Boolean,
     val enableHsUi: Boolean,
     val hsUiPort: Int,
-    val offHeapSize: String,
     val disableAqe: Boolean,
     val disableBhj: Boolean,
     val disableWscg: Boolean,
@@ -53,7 +52,6 @@ class TpcdsSuite(
       enableUi,
       enableHsUi,
       hsUiPort,
-      offHeapSize,
       disableAqe,
       disableBhj,
       disableWscg,
diff --git 
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchSuite.scala
 
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchSuite.scala
index 86fcaea0a..fc3ad1310 100644
--- 
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchSuite.scala
+++ 
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchSuite.scala
@@ -36,7 +36,6 @@ class TpchSuite(
     val enableUi: Boolean,
     val enableHsUi: Boolean,
     val hsUiPort: Int,
-    val offHeapSize: String,
     val disableAqe: Boolean,
     val disableBhj: Boolean,
     val disableWscg: Boolean,
@@ -53,7 +52,6 @@ class TpchSuite(
       enableUi,
       enableHsUi,
       hsUiPort,
-      offHeapSize,
       disableAqe,
       disableBhj,
       disableWscg,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to