This is an automated email from the ASF dual-hosted git repository.
ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push:
new a1b66d53 [YUNIKORN-2875] E2E: Launch spark from official image to avoid client java dependency (#956)
a1b66d53 is described below
commit a1b66d53be946a2fb63a32ea694742073366f48a
Author: kaichiachen <[email protected]>
AuthorDate: Tue Mar 11 18:01:33 2025 -0500
[YUNIKORN-2875] E2E: Launch spark from official image to avoid client java dependency (#956)
Closes: #956
Signed-off-by: Craig Condit <[email protected]>
---
Makefile | 19 ++-----
.../spark_jobs_scheduling_test.go | 37 +++++--------
test/e2e/testdata/spark_jobs.sh | 62 +++++++++++-----------
3 files changed, 47 insertions(+), 71 deletions(-)
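In outline, the E2E suite no longer downloads a Spark distribution onto the test host; it starts a long-lived client pod from the official apache/spark image and drives spark-submit through kubectl exec, so no local Java or Spark install is needed. A minimal sketch of that pattern, with an illustrative namespace and service account (not the test's generated names):

    # start a client pod from the official Spark image and keep it alive
    kubectl run spark-client --image=apache/spark:3.5.5-java17 -n my-namespace \
      --overrides='{"spec": {"serviceAccountName": "my-svc-acc"}}' -- sleep infinity
    kubectl wait --for=condition=ready pod/spark-client -n my-namespace --timeout=300s
    # run spark-submit inside the pod instead of on the test host
    kubectl exec spark-client -n my-namespace -- /opt/spark/bin/spark-submit --version
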
diff --git a/Makefile b/Makefile
index f73963cb..6fde1a02 100644
--- a/Makefile
+++ b/Makefile
@@ -191,13 +191,11 @@ HELM_ARCHIVE_BASE=$(OS)-$(EXEC_ARCH)
export PATH := $(BASE_DIR)/$(HELM_PATH):$(PATH)
# spark
-export SPARK_VERSION=3.3.3
+export SPARK_VERSION=3.5.5-java17
# sometimes the image is not available with $SPARK_VERSION, the minor version must match
-export SPARK_PYTHON_VERSION=3.3.1
-export SPARK_HOME=$(BASE_DIR)$(TOOLS_DIR)/spark-v$(SPARK_VERSION)
-export SPARK_SUBMIT_CMD=$(SPARK_HOME)/bin/spark-submit
+export SPARK_PYTHON_VERSION=3.4.0
+export SPARK_IMAGE=apache/spark:$(SPARK_VERSION)
export SPARK_PYTHON_IMAGE=docker.io/apache/spark-py:v$(SPARK_PYTHON_VERSION)
-export PATH := $(SPARK_HOME):$(PATH)
# go-licenses
GO_LICENSES_VERSION=v1.6.0
@@ -270,7 +268,7 @@ print_helm_version:
# Install tools
.PHONY: tools
-tools: $(SHELLCHECK_BIN) $(GOLANGCI_LINT_BIN) $(KUBECTL_BIN) $(KIND_BIN) $(HELM_BIN) $(SPARK_SUBMIT_CMD) $(GO_LICENSES_BIN) $(GINKGO_BIN)
+tools: $(SHELLCHECK_BIN) $(GOLANGCI_LINT_BIN) $(KUBECTL_BIN) $(KIND_BIN) $(HELM_BIN) $(GO_LICENSES_BIN) $(GINKGO_BIN)
# Install shellcheck
$(SHELLCHECK_BIN):
@@ -309,15 +307,6 @@ $(HELM_BIN):
@curl -sSfL "https://get.helm.sh/$(HELM_ARCHIVE)" \
| tar -x -z --strip-components=1 -C "$(HELM_PATH)" "$(HELM_ARCHIVE_BASE)/helm"
-# Install spark
-$(SPARK_SUBMIT_CMD):
- @echo "installing spark v$(SPARK_VERSION)"
- @rm -rf "$(SPARK_HOME)" "$(SPARK_HOME).tmp"
- @mkdir -p "$(SPARK_HOME).tmp"
- @curl -sSfL "https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz" \
- | tar -x -z --strip-components=1 -C "$(SPARK_HOME).tmp"
- @mv -f "$(SPARK_HOME).tmp" "$(SPARK_HOME)"
-
# Install go-licenses
$(GO_LICENSES_BIN):
@echo "installing go-licenses $(GO_LICENSES_VERSION)"
diff --git a/test/e2e/spark_jobs_scheduling/spark_jobs_scheduling_test.go b/test/e2e/spark_jobs_scheduling/spark_jobs_scheduling_test.go
index 4bd2ad21..3c4a661b 100644
--- a/test/e2e/spark_jobs_scheduling/spark_jobs_scheduling_test.go
+++ b/test/e2e/spark_jobs_scheduling/spark_jobs_scheduling_test.go
@@ -21,7 +21,6 @@ package spark_jobs_scheduling
import (
"context"
"fmt"
- "net/url"
"os"
"os/exec"
"sort"
@@ -29,7 +28,6 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
- "k8s.io/client-go/rest"
"github.com/apache/yunikorn-core/pkg/webservice/dao"
tests "github.com/apache/yunikorn-k8shim/test/e2e"
@@ -46,16 +44,17 @@ var _ = Describe("", func() {
var exErr error
var sparkNS = "spark-" + common.RandSeq(10)
var svcAcc = "svc-acc-" + common.RandSeq(10)
- var config *rest.Config
- var masterURL string
var roleName = "spark-jobs-role-" + common.RandSeq(5)
var clusterEditRole = "edit"
- var sparkImage = os.Getenv("SPARK_PYTHON_IMAGE")
+ var sparkImage = os.Getenv("SPARK_IMAGE")
+ var sparkPyImage = os.Getenv("SPARK_PYTHON_IMAGE")
var sparkExecutorCount = 3
BeforeEach(func() {
By(fmt.Sprintf("Spark image is: %s", sparkImage))
Ω(sparkImage).NotTo(BeEmpty())
+ By(fmt.Sprintf("Spark_py image is: %s", sparkPyImage))
+ Ω(sparkPyImage).NotTo(BeEmpty())
kClient = k8s.KubeCtl{}
Ω(kClient.SetClient()).To(BeNil())
Ω(exErr).NotTo(HaveOccurred())
@@ -71,21 +70,6 @@ var _ = Describe("", func() {
By(fmt.Sprintf("Creating cluster role binding: %s for spark
jobs", roleName))
_, err = kClient.CreateClusterRoleBinding(roleName,
clusterEditRole, sparkNS, svcAcc)
Ω(err).NotTo(HaveOccurred())
-
- config, err = kClient.GetKubeConfig()
- Ω(err).NotTo(HaveOccurred())
-
- u, err := url.Parse(config.Host)
- Ω(err).NotTo(HaveOccurred())
- port := u.Port()
- if port == "" {
- port = "443"
- if u.Scheme == "http" {
- port = "80"
- }
- }
- masterURL = u.Scheme + "://" + u.Hostname() + ":" + port
- By(fmt.Sprintf("MasterURL info is %s ", masterURL))
})
It("Test_With_Spark_Jobs", func() {
@@ -93,8 +77,8 @@ var _ = Describe("", func() {
err := exec.Command(
"bash",
"../testdata/spark_jobs.sh",
- masterURL,
sparkImage,
+ sparkPyImage,
sparkNS,
svcAcc,
string(rune(sparkExecutorCount))).Run()
@@ -110,12 +94,15 @@ var _ = Describe("", func() {
err = restClient.WaitforQueueToAppear(configmanager.DefaultPartition, sparkQueueName, 120)
Ω(err).NotTo(HaveOccurred())
- By(fmt.Sprintf("Get apps from specific queue: %s", sparkNS))
+ By(fmt.Sprintf("Get apps from specific queue: %s", sparkQueueName))
var appsFromQueue []*dao.ApplicationDAOInfo
// Poll for apps to appear in the queue
err = wait.PollUntilContextTimeout(context.TODO(), time.Millisecond*100, time.Duration(120)*time.Second, false,
func(context.Context) (done bool, err error) {
- appsFromQueue, err = restClient.GetApps(configmanager.DefaultPartition, configmanager.RootQueue+"."+sparkNS)
- if len(appsFromQueue) == 3 {
+ appsFromQueue, err = restClient.GetApps(configmanager.DefaultPartition, sparkQueueName)
+ if err != nil {
+ return false, err
+ }
+ if len(appsFromQueue) == 4 {
return true, nil
}
return false, err
@@ -138,7 +125,7 @@ var _ = Describe("", func() {
By(fmt.Sprintf("Apps submitted are: %s", appIds))
// Verify that all the spark jobs are scheduled and are in running state.
- for _, id := range appIds {
+ for _, id := range appIds[1:] {
By(fmt.Sprintf("Verify driver pod for application %s
has been created.", id))
err = kClient.WaitForPodBySelector(sparkNS,
fmt.Sprintf("spark-app-selector=%s, spark-role=driver", id), 180*time.Second)
Ω(err).ShouldNot(HaveOccurred())
diff --git a/test/e2e/testdata/spark_jobs.sh b/test/e2e/testdata/spark_jobs.sh
index 04384523..ce09793f 100755
--- a/test/e2e/testdata/spark_jobs.sh
+++ b/test/e2e/testdata/spark_jobs.sh
@@ -22,42 +22,42 @@ if [[ $# -gt 8 ]]; then
exit 1
fi
-MASTER_URL=$1
-SPARK_IMAGE=$2
+SPARK_IMAGE=$1
+SPARK_PY_IMAGE=$2
NAMESPACE=$3
SVC_ACC=$4
EXEC_COUNT=3
END=${6:-3}
-if [[ -z "${SPARK_HOME}" ]]; then
- SPARK_SUBMIT_CMD="spark-submit"
-else
- SPARK_SUBMIT_CMD="${SPARK_HOME}/bin/spark-submit"
-fi
-
+kubectl run spark-client --image="$SPARK_IMAGE" -n "$NAMESPACE" --overrides="{\"spec\": {\"serviceAccountName\": \"$SVC_ACC\"}}" -- sleep infinity
+kubectl wait --for=condition=ready pod/spark-client -n "$NAMESPACE" --timeout=300s
+kubectl cp ../testdata/spark_pod_template.yaml spark-client:/tmp/spark_pod_template.yaml -n "$NAMESPACE"
+MASTER_URL=$(kubectl exec spark-client -n "$NAMESPACE" -- bash -c "echo \"https://\${KUBERNETES_SERVICE_HOST}:\${KUBERNETES_SERVICE_PORT}\"")
for i in $(seq 1 "$END"); do
- nohup "$SPARK_SUBMIT_CMD" \
- --master k8s://"$MASTER_URL" \
- --deploy-mode cluster \
- --name spark-yk-example-"$i" \
- --conf spark.executor.instances=$EXEC_COUNT \
- --conf spark.kubernetes.container.image="$SPARK_IMAGE" \
- --conf spark.kubernetes.authenticate.driver.serviceAccountName="$SVC_ACC" \
- --conf spark.pyspark.python=python3 \
- --conf spark.pyspark.driver.python=python3 \
- --conf spark.kubernetes.file.upload.path=/opt/spark/upload-temp \
- --conf spark.kubernetes.driver.podTemplateFile=../testdata/spark_pod_template.yaml \
- --conf spark.kubernetes.executor.podTemplateFile=../testdata/spark_pod_template.yaml \
- --conf spark.kubernetes.namespace="$NAMESPACE" \
- --conf spark.kubernetes.driver.limit.cores=0.5 \
- --conf spark.kubernetes.driver.request.cores=0.1 \
- --conf spark.driver.memory=500m \
- --conf spark.driver.memoryOverhead=500m \
- --conf spark.kubernetes.executor.limit.cores=0.5 \
- --conf spark.kubernetes.executor.request.cores=0.1 \
- --conf spark.executor.memory=500m \
- --conf spark.executor.memoryOverhead=500m \
- local:///opt/spark/examples/src/main/python/pi.py \
- 100 &
+ CMD="kubectl exec spark-client -n $NAMESPACE -- bash -c \
+ \"/opt/spark/bin/spark-submit \
+ --master k8s://$MASTER_URL \
+ --deploy-mode cluster \
+ --name spark-yk-example-$i \
+ --conf spark.executor.instances=$EXEC_COUNT \
+ --conf spark.kubernetes.container.image=$SPARK_PY_IMAGE \
+ --conf spark.kubernetes.authenticate.driver.serviceAccountName=$SVC_ACC \
+ --conf spark.pyspark.python=python3 \
+ --conf spark.pyspark.driver.python=python3 \
+ --conf spark.kubernetes.file.upload.path=/opt/spark/upload-temp \
+ --conf spark.kubernetes.driver.podTemplateFile=/tmp/spark_pod_template.yaml \
+ --conf spark.kubernetes.executor.podTemplateFile=/tmp/spark_pod_template.yaml \
+ --conf spark.kubernetes.namespace=$NAMESPACE \
+ --conf spark.kubernetes.driver.limit.cores=0.5 \
+ --conf spark.kubernetes.driver.request.cores=0.1 \
+ --conf spark.driver.memory=500m \
+ --conf spark.driver.memoryOverhead=500m \
+ --conf spark.kubernetes.executor.limit.cores=0.5 \
+ --conf spark.kubernetes.executor.request.cores=0.1 \
+ --conf spark.executor.memory=500m \
+ --conf spark.executor.memoryOverhead=500m \
+ local:///opt/spark/examples/src/main/python/pi.py \
+ 100\""
+ eval "$CMD" &
sleep 2
done;
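
For reference, the Go test now calls the script with the two images first, followed by namespace, service account and a count. A hand-run equivalent from one of the e2e test package directories might look like the following (values are illustrative, and the script resolves ../testdata/spark_pod_template.yaml relative to the working directory):

    bash ../testdata/spark_jobs.sh \
      apache/spark:3.5.5-java17 \
      docker.io/apache/spark-py:v3.4.0 \
      spark-test-ns spark-svc-acc 3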
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]