This is an automated email from the ASF dual-hosted git repository. wangyang pushed a commit to branch spark-operator in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 8be651e3f4dc066a8591d6f3d8dc009c6f87813f Author: xiaohzho <xiaoh...@cisco.com> AuthorDate: Wed Aug 23 14:26:47 2023 +0800 [Spark][Operator] Add Spark operator CRD --- .../plugin/task/spark/crd/ApplicationState.java | 36 ++ .../task/spark/crd/ApplicationStateType.java | 25 ++ .../spark/crd/BatchSchedulerConfiguration.java | 26 ++ .../plugin/task/spark/crd/Dependencies.java | 41 ++ .../plugin/task/spark/crd/DriverInfo.java | 87 ++++ .../plugin/task/spark/crd/DriverSpec.java | 224 ++++++++++ .../plugin/task/spark/crd/DynamicAllocation.java | 90 ++++ .../plugin/task/spark/crd/ExecutorSpec.java | 217 ++++++++++ .../plugin/task/spark/crd/MonitoringSpec.java | 87 ++++ .../plugin/task/spark/crd/NamePath.java | 7 + .../plugin/task/spark/crd/Port.java | 11 + .../plugin/task/spark/crd/PrometheusSpec.java | 73 ++++ .../plugin/task/spark/crd/RestartPolicy.java | 83 ++++ .../plugin/task/spark/crd/SecretInfo.java | 6 + .../plugin/task/spark/crd/SparkApplication.java | 30 ++ .../task/spark/crd/SparkApplicationList.java | 7 + .../task/spark/crd/SparkApplicationSpec.java | 481 +++++++++++++++++++++ .../task/spark/crd/SparkApplicationStatus.java | 151 +++++++ .../task/spark/crd/SparkUIConfiguration.java | 94 ++++ 19 files changed, 1776 insertions(+) diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/ApplicationState.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/ApplicationState.java new file mode 100644 index 0000000000..d13c0f0c06 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/ApplicationState.java @@ -0,0 +1,36 @@ +package org.apache.dolphinscheduler.plugin.task.spark.crd; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class ApplicationState { + + 
ApplicationStateType state; + + String errorMessage; + + + public ApplicationStateType getState() { + return state; + } + + public void setState(ApplicationStateType state) { + this.state = state; + } + + public String getErrorMessage() { + return errorMessage; + } + + public void setErrorMessage(String errorMessage) { + this.errorMessage = errorMessage; + } + + @Override + public String toString() { + return "ApplicationState{" + + "state=" + state + + ", errorMessage='" + errorMessage + '\'' + + '}'; + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/ApplicationStateType.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/ApplicationStateType.java new file mode 100644 index 0000000000..c35a30856c --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/ApplicationStateType.java @@ -0,0 +1,25 @@ +package org.apache.dolphinscheduler.plugin.task.spark.crd; + +public enum ApplicationStateType { + COMPLETED, + + FAILED, + + SUBMISSION_FAILED, + + FAILING, + + INVALIDATING, + + PENDING_RERUN, + + RUNNING, + + SUBMITTED, + + SUCCEEDING, + + FINISHED, + + UNKNOWN +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/BatchSchedulerConfiguration.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/BatchSchedulerConfiguration.java new file mode 100644 index 0000000000..e688ffa07c --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/BatchSchedulerConfiguration.java @@ -0,0 +1,26 @@ +package org.apache.dolphinscheduler.plugin.task.spark.crd; + +import 
io.fabric8.kubernetes.api.model.DefaultKubernetesResourceList; + +public class BatchSchedulerConfiguration { + + /** + * (Optional) Queue stands for the resource queue which the application belongs to, it’s being + * used in Volcano batch scheduler. + */ + String queue; + + /** + * (Optional) PriorityClassName stands for the name of k8s PriorityClass resource, it’s being used + * in Volcano batch scheduler. + */ + String priorityClassName; + + /** + * (Optional) Resources stands for the resource list custom request for. Usually it is used to + * define the lower-bound limit. If specified, volcano scheduler will consider it as the resources + * requested. + */ + DefaultKubernetesResourceList resources; + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/Dependencies.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/Dependencies.java new file mode 100644 index 0000000000..607c931697 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/Dependencies.java @@ -0,0 +1,41 @@ +package org.apache.dolphinscheduler.plugin.task.spark.crd; + +import java.util.List; + +public class Dependencies { + + /** + * (Optional) Jars is a list of JAR files the Spark application depends on. + */ + List<String> jars; + + /** + * (Optional) Files is a list of files the Spark application depends on. + */ + List<String> files; + + /** + * (Optional) PyFiles is a list of Python files the Spark application depends on. + */ + List<String> pyFiles; + + /** + * (Optional) Packages is a list of maven coordinates of jars to include on the driver and + * executor classpaths. This will search the local maven repo, then maven central and any + * additional remote repositories given by the “repositories” option. 
Each package should be of + * the form “groupId:artifactId:version”. + */ + List<String> packages; + + /** + * (Optional) ExcludePackages is a list of “groupId:artifactId”, to exclude while resolving the + * dependencies provided in Packages to avoid dependency conflicts. + */ + List<String> excludePackages; + + /** + * (Optional) Repositories is a list of additional remote repositories to search for the maven + * coordinate given with the “packages” option. + */ + List<String> repositories; +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/DriverInfo.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/DriverInfo.java new file mode 100644 index 0000000000..c915b67679 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/DriverInfo.java @@ -0,0 +1,87 @@ +package org.apache.dolphinscheduler.plugin.task.spark.crd; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class DriverInfo { + + String webUIServiceName; + + /** + * UI Details for the UI created via ClusterIP service accessible from within the cluster. + */ + int webUIPort; + + + String webUIAddress; + + /** + * Ingress Details if an ingress for the UI was created. 
+ */ + String webUIIngressName; + + + String webUIIngressAddress; + + String podName; + + @Override + public String toString() { + return "DriverInfo{" + + "webUIServiceName='" + webUIServiceName + '\'' + + ", webUIPort=" + webUIPort + + ", webUIAddress='" + webUIAddress + '\'' + + ", webUIIngressName='" + webUIIngressName + '\'' + + ", webUIIngressAddress='" + webUIIngressAddress + '\'' + + ", podName='" + podName + '\'' + + '}'; + } + + public String getWebUIServiceName() { + return webUIServiceName; + } + + public void setWebUIServiceName(String webUIServiceName) { + this.webUIServiceName = webUIServiceName; + } + + public int getWebUIPort() { + return webUIPort; + } + + public void setWebUIPort(int webUIPort) { + this.webUIPort = webUIPort; + } + + public String getWebUIAddress() { + return webUIAddress; + } + + public void setWebUIAddress(String webUIAddress) { + this.webUIAddress = webUIAddress; + } + + public String getWebUIIngressName() { + return webUIIngressName; + } + + public void setWebUIIngressName(String webUIIngressName) { + this.webUIIngressName = webUIIngressName; + } + + public String getWebUIIngressAddress() { + return webUIIngressAddress; + } + + public void setWebUIIngressAddress(String webUIIngressAddress) { + this.webUIIngressAddress = webUIIngressAddress; + } + + public String getPodName() { + return podName; + } + + public void setPodName(String podName) { + this.podName = podName; + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/DriverSpec.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/DriverSpec.java new file mode 100644 index 0000000000..8868bbb11d --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/DriverSpec.java @@ -0,0 +1,224 @@ +package 
org.apache.dolphinscheduler.plugin.task.spark.crd; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import io.fabric8.kubernetes.api.model.Affinity; +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.EnvFromSource; +import io.fabric8.kubernetes.api.model.EnvVar; +import io.fabric8.kubernetes.api.model.HostAlias; +import io.fabric8.kubernetes.api.model.Lifecycle; +import io.fabric8.kubernetes.api.model.PodDNSConfig; +import io.fabric8.kubernetes.api.model.Toleration; +import io.fabric8.kubernetes.api.model.VolumeMount; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +@JsonIgnoreProperties(ignoreUnknown = true) +@Getter +@Setter +@ToString +public class DriverSpec { + + + /** + * (Optional) PodName is the name of the driver pod that the user creates. This is used for the + * in-cluster client mode in which the user creates a client pod where the driver of the user + * application runs. It’s an error to set this field if Mode is not in-cluster-client. + */ + String podName; + + /** + * (Optional) CoreRequest is the physical CPU core request for the driver. Maps to + * spark.kubernetes.driver.request.cores that is available since Spark 3.0. + */ + String coreRequest; + + /** + * (Optional) JavaOptions is a String of extra JVM options to pass to the driver. For instance, GC + * settings or other logging. + */ + String javaOptions; + + /** + * (Optional) Lifecycle for running preStop or postStart commands + */ + Lifecycle lifecycle; + + /** + * (Optional) KubernetesMaster is the URL of the Kubernetes master used by the driver to manage + * executor pods and other Kubernetes resources. Default to https://kubernetes.default.svc. + */ + String kubernetesMaster; + + /** + * (Optional) ServiceAnnotations defines the annotations to be added to the Kubernetes headless + * service used by executors to connect to the driver. 
+ */ + Map<String, String> serviceAnnotations; + + /** + * (Optional) Ports settings for the pods, following the Kubernetes specifications. + */ + List<Port> ports; + + + /** + * (Optional) Cores maps to spark.driver.cores or spark.executor.cores for the driver and + * executors, respectively. + */ + Integer cores; + + /** + * CoreLimit specifies a hard limit on CPU cores for the pod. Optional + */ + String coreLimit; + + /** + * (Optional) Memory is the amount of memory to request for the pod. + */ + String memory; + + + /** + * (Optional) MemoryOverhead is the amount of off-heap memory to allocate in cluster mode, in MiB + * unless otherwise specified. + */ + String memoryOverhead; + + + /** + * (Optional) Image is the container image to use. Overrides Spec.Image if set. + */ + String image; + + + /** + * (Optional) ConfigMaps carries information of other ConfigMaps to add to the pod. + */ + List<NamePath> configMaps; + + + /** + * (Optional) Env carries the environment variables to add to the pod. + */ + List<EnvVar> env; + + /** + * (Optional) EnvVars carries the environment variables to add to the pod. Deprecated. Consider + * using env instead. + */ + Map<String, String> envVars; + + /** + * (Optional) EnvFrom is a list of sources to populate environment variables in the container. + */ + List<EnvFromSource> envFrom; + + + /** + * (Optional) Labels are the Kubernetes labels to be added to the pod. + */ + Map<String, String> labels; + + /** + * + */ + Map<String, String> annotations; + + /** + * (Optional) VolumeMounts specifies the volumes listed in “.spec.volumes” to mount into the main + * container’s filesystem. + */ + List<VolumeMount> volumeMounts; + + /** + * (Optional) Affinity specifies the affinity/anti-affinity settings for the pod. + */ + Affinity affinity; + + /** + * (Optional) Tolerations specifies the tolerations listed in “.spec.tolerations” to be applied to + * the pod. 
+ */ + List<Toleration> tolerations; + + /** + * (Optional) PodSecurityContext specifies the PodSecurityContext to apply. + */ + io.fabric8.kubernetes.api.model.PodSecurityContext podSecurityContext; + + /** + * (Optional) SecurityContext specifies the container’s SecurityContext to apply. + */ + io.fabric8.kubernetes.api.model.SecurityContext securityContext; + + /** + * (Optional) SchedulerName specifies the scheduler that will be used for scheduling + */ + String schedulerName; + + /** + * (Optional) Sidecars is a list of sidecar containers that run along side the main Spark + * container. + */ + List<Container> sidecars; + + /** + * (Optional) InitContainers is a list of init-containers that run to completion before the main + * Spark container. + */ + List<Container> initContainers; + + /** + * (Optional) HostNetwork indicates whether to request host networking for the pod or not. + */ + boolean hostNetwork; + + /** + * (Optional) NodeSelector is the Kubernetes node selector to be added to the driver and executor + * pods. This field is mutually exclusive with nodeSelector at SparkApplication level (which will + * be deprecated). + */ + Map<String, String> nodeSelector; + + /** + * (Optional) DnsConfig dns settings for the pod, following the Kubernetes specifications. + */ + PodDNSConfig dnsConfig; + + /** + * (Optional) Termination grace period seconds for the pod + */ + long terminationGracePeriodSeconds; + + /** + * (Optional) ServiceAccount is the name of the custom Kubernetes service account used by the + * pod. + */ + String serviceAccount; + + /** + * (Optional) HostAliases settings for the pod, following the Kubernetes specifications. + */ + List<HostAlias> hostAliases; + /** + * (Optional) ShareProcessNamespace settings for the pod, following the Kubernetes + * specifications. 
+ */ + boolean shareProcessNamespace; + + public DriverSpec() { + } + + public void addEnv(EnvVar var) { + if (this.env == null) { + this.env = new ArrayList<>(); + } + this.env.add(var); + } + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/DynamicAllocation.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/DynamicAllocation.java new file mode 100644 index 0000000000..33677e870f --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/DynamicAllocation.java @@ -0,0 +1,90 @@ +package org.apache.dolphinscheduler.plugin.task.spark.crd; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class DynamicAllocation { + + /** + * bool Enabled controls whether dynamic allocation is enabled or not. + */ + boolean enabled; + + /** + * int32 (Optional) long InitialExecutors is the initial number of executors to request. If + * .spec.executor.instances is also set, the initial number of executors is set to the bigger of + * that and this option. + */ + int initialExecutors; + + /** + * int32 (Optional) MinExecutors is the lower bound for the number of executors if dynamic + * allocation is enabled. + */ + int minExecutors; + + /** + * int32 (Optional) MaxExecutors is the upper bound for the number of executors if dynamic + * allocation is enabled. + */ + int maxExecutors; + + /** + * int64 (Optional) ShuffleTrackingTimeout controls the timeout in milliseconds for executors that + * are holding shuffle data if shuffle tracking is enabled (true by default if dynamic allocation + * is enabled). 
+ */ + long shuffleTrackingTimeout; + + public DynamicAllocation(int maxExecutors) { + this.maxExecutors = maxExecutors; + this.enabled = true; + this.initialExecutors = maxExecutors / 4; + this.minExecutors = 1; + this.shuffleTrackingTimeout = 300000; + } + + public DynamicAllocation() { + } + + public boolean isEnabled() { + return enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public int getInitialExecutors() { + return initialExecutors; + } + + public void setInitialExecutors(int initialExecutors) { + this.initialExecutors = initialExecutors; + } + + public int getMinExecutors() { + return minExecutors; + } + + public void setMinExecutors(int minExecutors) { + this.minExecutors = minExecutors; + } + + public int getMaxExecutors() { + return maxExecutors; + } + + public void setMaxExecutors(int maxExecutors) { + this.maxExecutors = maxExecutors; + } + + public long getShuffleTrackingTimeout() { + return shuffleTrackingTimeout; + } + + public void setShuffleTrackingTimeout(long shuffleTrackingTimeout) { + this.shuffleTrackingTimeout = shuffleTrackingTimeout; + } + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/ExecutorSpec.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/ExecutorSpec.java new file mode 100644 index 0000000000..4f78441ca9 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/ExecutorSpec.java @@ -0,0 +1,217 @@ +package org.apache.dolphinscheduler.plugin.task.spark.crd; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import io.fabric8.kubernetes.api.model.Affinity; +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.EnvFromSource; +import io.fabric8.kubernetes.api.model.EnvVar; +import 
io.fabric8.kubernetes.api.model.HostAlias; +import io.fabric8.kubernetes.api.model.PodDNSConfig; +import io.fabric8.kubernetes.api.model.Toleration; +import io.fabric8.kubernetes.api.model.VolumeMount; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +@JsonIgnoreProperties(ignoreUnknown = true) +@Getter +@Setter +@ToString +public class ExecutorSpec { + + /** + * (Optional) Instances is the number of executor instances. + */ + int instances; + + /** + * (Optional) CoreRequest is the physical CPU core request for the executors. Maps to + * spark.kubernetes.executor.request.cores that is available since Spark 2.4. + */ + String coreRequest; + + /** + * (Optional) JavaOptions is a String of extra JVM options to pass to the executors. For instance, + * GC settings or other logging. + */ + String javaOptions; + + /** + * (Optional) DeleteOnTermination specify whether executor pods should be deleted in case of + * failure or normal termination. Maps to spark.kubernetes.executor.deleteOnTermination that is + * available since Spark 3.0. + */ + boolean deleteOnTermination; + + /** + * (Optional) Ports settings for the pods, following the Kubernetes specifications. + */ + List<Port> ports; + + + /** + * (Optional) Cores maps to spark.driver.cores or spark.executor.cores for the driver and + * executors, respectively. + */ + Integer cores; + + /** + * CoreLimit specifies a hard limit on CPU cores for the pod. Optional + */ + String coreLimit; + + /** + * (Optional) Memory is the amount of memory to request for the pod. + */ + String memory; + + + /** + * (Optional) MemoryOverhead is the amount of off-heap memory to allocate in cluster mode, in MiB + * unless otherwise specified. + */ + String memoryOverhead; + + /** + * (Optional) This sets the Memory Overhead Factor that will allocate memory to non-JVM memory. 
+ * For JVM-based jobs this value will default to 0.10, for non-JVM jobs 0.40. Value of this field + * will be overridden by Spec.Driver.MemoryOverhead and Spec.Executor.MemoryOverhead if they are + * set. + */ + String memoryOverheadFactor = "0.20"; + + /** + * (Optional) Image is the container image to use. Overrides Spec.Image if set. + */ + String image; + + + /** + * (Optional) ConfigMaps carries information of other ConfigMaps to add to the pod. + */ + List<NamePath> configMaps; + + + /** + * (Optional) Env carries the environment variables to add to the pod. + */ + List<EnvVar> env; + + /** + * (Optional) EnvVars carries the environment variables to add to the pod. Deprecated. Consider + * using env instead. + */ + Map<String, String> envVars; + + /** + * (Optional) EnvFrom is a list of sources to populate environment variables in the container. + */ + List<EnvFromSource> envFrom; + + + /** + * (Optional) Labels are the Kubernetes labels to be added to the pod. + */ + Map<String, String> labels; + + /** + * + */ + Map<String, String> annotations; + + /** + * (Optional) VolumeMounts specifies the volumes listed in “.spec.volumes” to mount into the main + * container’s filesystem. + */ + List<VolumeMount> volumeMounts; + + /** + * (Optional) Affinity specifies the affinity/anti-affinity settings for the pod. + */ + Affinity affinity; + + /** + * (Optional) Tolerations specifies the tolerations listed in “.spec.tolerations” to be applied to + * the pod. + */ + List<Toleration> tolerations; + + /** + * (Optional) PodSecurityContext specifies the PodSecurityContext to apply. + */ + io.fabric8.kubernetes.api.model.PodSecurityContext podSecurityContext; + + /** + * (Optional) SecurityContext specifies the container’s SecurityContext to apply. 
+ */ + io.fabric8.kubernetes.api.model.SecurityContext securityContext; + + /** + * (Optional) SchedulerName specifies the scheduler that will be used for scheduling + */ + String schedulerName; + + /** + * (Optional) Sidecars is a list of sidecar containers that run along side the main Spark + * container. + */ + List<Container> sidecars; + + /** + * (Optional) InitContainers is a list of init-containers that run to completion before the main + * Spark container. + */ + List<Container> initContainers; + + /** + * (Optional) HostNetwork indicates whether to request host networking for the pod or not. + */ + boolean hostNetwork; + + /** + * (Optional) NodeSelector is the Kubernetes node selector to be added to the driver and executor + * pods. This field is mutually exclusive with nodeSelector at SparkApplication level (which will + * be deprecated). + */ + Map<String, String> nodeSelector; + + /** + * (Optional) DnsConfig dns settings for the pod, following the Kubernetes specifications. + */ + PodDNSConfig dnsConfig; + + /** + * (Optional) Termination grace period seconds for the pod + */ + long terminationGracePeriodSeconds; + + /** + * (Optional) ServiceAccount is the name of the custom Kubernetes service account used by the + * pod. + */ + String serviceAccount; + + /** + * (Optional) HostAliases settings for the pod, following the Kubernetes specifications. + */ + List<HostAlias> hostAliases; + /** + * (Optional) ShareProcessNamespace settings for the pod, following the Kubernetes + * specifications. 
+ */ + boolean shareProcessNamespace; + + public ExecutorSpec() { + } + + public void addEnv(EnvVar var) { + if (this.env == null) { + this.env = new ArrayList<>(); + } + this.env.add(var); + } + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/MonitoringSpec.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/MonitoringSpec.java new file mode 100644 index 0000000000..fffabdad34 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/MonitoringSpec.java @@ -0,0 +1,87 @@ +package org.apache.dolphinscheduler.plugin.task.spark.crd; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class MonitoringSpec { + + /** + * ExposeDriverMetrics specifies whether to expose metrics on the driver. + */ + boolean exposeDriverMetrics; + + /** + * ExposeExecutorMetrics specifies whether to expose metrics on the executors. + */ + boolean exposeExecutorMetrics; + + /** + * (Optional) MetricsProperties is the content of a custom metrics.properties for configuring the + * Spark metric system. If not specified, the content in spark-docker/conf/metrics.properties will + * be used. + */ + String metricsProperties; + + /** + * (Optional) MetricsPropertiesFile is the container local path of file metrics.properties for + * configuring the Spark metric system. If not specified, value + * /etc/metrics/conf/metrics.properties will be used. + */ + String metricsPropertiesFile; + + /** + * (Optional) Prometheus is for configuring the Prometheus JMX exporter. 
+ */ + PrometheusSpec prometheus; + + public boolean isExposeDriverMetrics() { + return exposeDriverMetrics; + } + + public void setExposeDriverMetrics(boolean exposeDriverMetrics) { + this.exposeDriverMetrics = exposeDriverMetrics; + } + + public boolean isExposeExecutorMetrics() { + return exposeExecutorMetrics; + } + + public void setExposeExecutorMetrics(boolean exposeExecutorMetrics) { + this.exposeExecutorMetrics = exposeExecutorMetrics; + } + + public String getMetricsProperties() { + return metricsProperties; + } + + public void setMetricsProperties(String metricsProperties) { + this.metricsProperties = metricsProperties; + } + + public String getMetricsPropertiesFile() { + return metricsPropertiesFile; + } + + public void setMetricsPropertiesFile(String metricsPropertiesFile) { + this.metricsPropertiesFile = metricsPropertiesFile; + } + + public PrometheusSpec getPrometheus() { + return prometheus; + } + + public void setPrometheus(PrometheusSpec prometheus) { + this.prometheus = prometheus; + } + + @Override + public String toString() { + return "MonitoringSpec{" + + "exposeDriverMetrics=" + exposeDriverMetrics + + ", exposeExecutorMetrics=" + exposeExecutorMetrics + + ", metricsProperties='" + metricsProperties + '\'' + + ", metricsPropertiesFile='" + metricsPropertiesFile + '\'' + + ", prometheus=" + prometheus + + '}'; + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/NamePath.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/NamePath.java new file mode 100644 index 0000000000..88a7feafb6 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/NamePath.java @@ -0,0 +1,7 @@ +package org.apache.dolphinscheduler.plugin.task.spark.crd; + +public class NamePath { + + String name; + String path; +} diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/Port.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/Port.java new file mode 100644 index 0000000000..79f8eb212f --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/Port.java @@ -0,0 +1,11 @@ +package org.apache.dolphinscheduler.plugin.task.spark.crd; + +public class Port { + + String name; + + String protocol; + + int containerPort; + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/PrometheusSpec.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/PrometheusSpec.java new file mode 100644 index 0000000000..3f69b51465 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/PrometheusSpec.java @@ -0,0 +1,73 @@ +package org.apache.dolphinscheduler.plugin.task.spark.crd; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class PrometheusSpec { + + /** + * JmxExporterJar is the path to the Prometheus JMX exporter jar in the container. + */ + String jmxExporterJar; + + + /** + * (Optional) Port is the port of the HTTP server run by the Prometheus JMX exporter. If not + * specified, 8090 will be used as the default. + */ + int port; + + /** + * (Optional) PortName is the port name of prometheus JMX exporter port. If not specified, + * jmx-exporter will be used as the default. + */ + String portName; + + /** + * (Optional) ConfigFile is the path to the custom Prometheus configuration file provided in the + * Spark image. 
ConfigFile takes precedence over Configuration, which is shown below. + */ + String configFile; + + /** + * (Optional) Configuration is the content of the Prometheus configuration needed by the + * Prometheus JMX exporter. If not specified, the content in spark-docker/conf/prometheus.yaml + * will be used. Configuration has no effect if ConfigFile is set. + */ + String configuration; + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public String getJmxExporterJar() { + return jmxExporterJar; + } + + public void setJmxExporterJar(String jmxExporterJar) { + this.jmxExporterJar = jmxExporterJar; + } + + public String getConfigFile() { + return configFile; + } + + public void setConfigFile(String configFile) { + this.configFile = configFile; + } + + @Override + public String toString() { + return "PrometheusSpec{" + + "jmxExporterJar='" + jmxExporterJar + '\'' + + ", port=" + port + + ", portName='" + portName + '\'' + + ", configFile='" + configFile + '\'' + + ", configuration='" + configuration + '\'' + + '}'; + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/RestartPolicy.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/RestartPolicy.java new file mode 100644 index 0000000000..6d5da14ada --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/RestartPolicy.java @@ -0,0 +1,83 @@ +package org.apache.dolphinscheduler.plugin.task.spark.crd; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class RestartPolicy { + + /** + * Type specifies the RestartPolicyType. 
can be: "Always" "Never" "OnFailure" + */ + String type; + + /** + * (Optional) OnSubmissionFailureRetries is the number of times to retry submitting an application + * before giving up. This is best effort and actual retry attempts can be >= the value specified + * due to caching. These are required if RestartPolicy is OnFailure. + */ + Integer onSubmissionFailureRetries; + + /** + * (Optional) OnFailureRetries the number of times to retry running an application before giving + * up. + */ + Integer onFailureRetries; + + /** + * (Optional) OnSubmissionFailureRetryInterval is the interval in seconds between retries on + * failed submissions. + */ + Long onSubmissionFailureRetryInterval; + + /** + * (Optional) OnFailureRetryInterval is the interval in seconds between retries on failed runs. + */ + Long onFailureRetryInterval; + + public RestartPolicy() { + } + + public RestartPolicy(String type) { + this.type = type; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public Integer getOnSubmissionFailureRetries() { + return onSubmissionFailureRetries; + } + + public void setOnSubmissionFailureRetries(Integer onSubmissionFailureRetries) { + this.onSubmissionFailureRetries = onSubmissionFailureRetries; + } + + public Integer getOnFailureRetries() { + return onFailureRetries; + } + + public void setOnFailureRetries(Integer onFailureRetries) { + this.onFailureRetries = onFailureRetries; + } + + public Long getOnSubmissionFailureRetryInterval() { + return onSubmissionFailureRetryInterval; + } + + public void setOnSubmissionFailureRetryInterval(long onSubmissionFailureRetryInterval) { + this.onSubmissionFailureRetryInterval = onSubmissionFailureRetryInterval; + } + + public Long getOnFailureRetryInterval() { + return onFailureRetryInterval; + } + + public void setOnFailureRetryInterval(long onFailureRetryInterval) { + this.onFailureRetryInterval = onFailureRetryInterval; + } +} diff --git 
// --- file: SecretInfo.java ---
package org.apache.dolphinscheduler.plugin.task.spark.crd;

// Placeholder type for secret information referenced by the Spark operator CRD.
// Intentionally empty in this commit; no fields are defined yet.
public class SecretInfo {

}

// --- file: SparkApplication.java ---
package org.apache.dolphinscheduler.plugin.task.spark.crd;

import io.fabric8.kubernetes.api.model.Namespaced;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.model.annotation.Group;
import io.fabric8.kubernetes.model.annotation.Kind;
import io.fabric8.kubernetes.model.annotation.Version;

/**
 * Fabric8 custom-resource mapping for the spark-operator "SparkApplication" CRD
 * (group sparkoperator.k8s.io, version v1beta2). Namespaced resource whose spec and
 * status are modeled by {@code SparkApplicationSpec} and {@code SparkApplicationStatus}.
 */
@Version(SparkApplication.apiVersion)
@Group(SparkApplication.group)
@Kind(SparkApplication.kind)
public class SparkApplication extends
        CustomResource<SparkApplicationSpec, SparkApplicationStatus> implements
        Namespaced {

    // Constants are referenced from the annotations above, so they must stay
    // compile-time String constants.
    public static final String apiVersion = "v1beta2";
    public static final String group = "sparkoperator.k8s.io";
    public static final String kind = "SparkApplication";

    public SparkApplication() {
    }

    @Override
    public String toString() {
        // Only the spec is printed; metadata and status are intentionally omitted here.
        return "SparkApplication{" +
                "spec=" + spec +
                '}';
    }

}

// --- file: SparkApplicationList.java ---
package org.apache.dolphinscheduler.plugin.task.spark.crd;

import io.fabric8.kubernetes.client.CustomResourceList;

// List wrapper type required by the fabric8 client for listing SparkApplication resources.
public class SparkApplicationList extends CustomResourceList<SparkApplication> {

}
package org.apache.dolphinscheduler.plugin.task.spark.crd;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import io.fabric8.kubernetes.api.model.Volume;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Spec section of the spark-operator "SparkApplication" CRD (sparkoperator.k8s.io/v1beta2).
 * Field names mirror the upstream API so the object serializes into a manifest that the
 * operator accepts.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class SparkApplicationSpec {

    /**
     * Type tells the type of the Spark application. can be
     * <p>
     * "Java" "Python" "R" "Scala"
     */
    String type;

    /**
     * SparkVersion is the version of Spark the application uses.
     */
    String sparkVersion;

    /**
     * Mode is the deployment mode of the Spark application. can be "client" "cluster"
     * "in-cluster-client"
     */
    String mode;

    /**
     * (Optional) ProxyUser specifies the user to impersonate when submitting the application.
     * It maps to the command-line flag "--proxy-user" in spark-submit.
     */
    String proxyUser;

    /**
     * (Optional) Image is the container image for the driver, executor, and init-container. Any
     * custom container images for the driver, executor, or init-container takes precedence over
     * this.
     */
    String image;

    /**
     * (Optional) ImagePullPolicy is the image pull policy for the driver, executor, and
     * init-container.
     */
    String imagePullPolicy;

    /**
     * (Optional) ImagePullSecrets is the list of image-pull secrets.
     */
    List<String> imagePullSecrets;

    /**
     * (Optional) MainClass is the fully-qualified main class of the Spark application. This only
     * applies to Java/Scala Spark applications.
     */
    String mainClass;

    /**
     * (Optional) MainFile is the path to a bundled JAR, Python, or R file of the application.
     */
    String mainApplicationFile;

    /**
     * (Optional) Arguments is a list of arguments to be passed to the application.
     */
    List<String> arguments;

    /**
     * (Optional) SparkConf carries user-specified Spark configuration properties as they would use
     * the "--conf" option in spark-submit.
     */
    Map<String, String> sparkConf;

    /**
     * (Optional) HadoopConf carries user-specified Hadoop configuration properties as they would use
     * the "--conf" option in spark-submit. The SparkApplication controller automatically adds
     * prefix "spark.hadoop." to Hadoop configuration properties.
     */
    Map<String, String> hadoopConf;

    /**
     * (Optional) SparkConfigMap carries the name of the ConfigMap containing Spark configuration
     * files such as log4j.properties. The controller will add environment variable SPARK_CONF_DIR to
     * the path where the ConfigMap is mounted to.
     */
    String sparkConfigMap;

    /**
     * (Optional) HadoopConfigMap carries the name of the ConfigMap containing Hadoop configuration
     * files such as core-site.xml. The controller will add environment variable HADOOP_CONF_DIR to
     * the path where the ConfigMap is mounted to.
     */
    String hadoopConfigMap;

    /**
     * (Optional) Volumes is the list of Kubernetes volumes that can be mounted by the driver and/or
     * executors.
     */
    List<Volume> volumes;

    /**
     * Driver is the driver specification.
     */
    DriverSpec driver;

    /**
     * Executor is the executor specification.
     */
    ExecutorSpec executor;

    /**
     * (Optional) Deps captures all possible types of dependencies of a Spark application.
     */
    Dependencies deps;

    /**
     * RestartPolicy defines the policy on if and in which conditions the controller should restart an
     * application.
     */
    RestartPolicy restartPolicy;

    /**
     * (Optional) NodeSelector is the Kubernetes node selector to be added to the driver and executor
     * pods. This field is mutually exclusive with nodeSelector at podSpec level (driver or executor).
     * This field will be deprecated in future versions (at SparkApplicationSpec level).
     */
    Map<String, String> nodeSelector;

    /**
     * (Optional) FailureRetries is the number of times to retry a failed application before giving
     * up. This is best effort and actual retry attempts can be >= the value specified.
     */
    Integer failureRetries;

    /**
     * (Optional) RetryInterval is the unit of intervals in seconds between submission retries.
     * NOTE(review): primitive long means an unset interval serializes as 0 rather than being
     * omitted — confirm the operator treats 0 as "not set".
     */
    long retryInterval;

    /**
     * (Optional) This sets the major Python version of the docker image used to run the driver and
     * executor containers. Can either be 2 or 3, default 2.
     */
    String pythonVersion;

    /**
     * (Optional) This sets the Memory Overhead Factor that will allocate memory to non-JVM memory.
     * For JVM-based jobs this value will default to 0.10, for non-JVM jobs 0.40. Value of this field
     * will be overridden by Spec.Driver.MemoryOverhead and Spec.Executor.MemoryOverhead if they are
     * set.
     */
    String memoryOverheadFactor;

    /**
     * (Optional) Monitoring configures how monitoring is handled.
     */
    MonitoringSpec monitoring;

    /**
     * (Optional) BatchScheduler configures which batch scheduler will be used for scheduling
     */
    String batchScheduler;

    /**
     * (Optional) TimeToLiveSeconds defines the Time-To-Live (TTL) duration in seconds for this
     * SparkApplication after its termination. The SparkApplication object will be garbage collected
     * if the current time is more than the TimeToLiveSeconds since its termination.
     */
    long timeToLiveSeconds;

    /**
     * (Optional) BatchSchedulerOptions provides fine-grained control on how to batch scheduling.
     */
    BatchSchedulerConfiguration batchSchedulerOptions;

    /**
     * (Optional) SparkUIOptions allows configuring the Service and the Ingress to expose the sparkUI
     */
    SparkUIConfiguration sparkUIOptions;

    /**
     * (Optional) DynamicAllocation configures dynamic allocation that becomes available for the
     * Kubernetes scheduler backend since Spark 3.0.
     */
    DynamicAllocation dynamicAllocation;

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getSparkVersion() {
        return sparkVersion;
    }

    public void setSparkVersion(String sparkVersion) {
        this.sparkVersion = sparkVersion;
    }

    public String getMode() {
        return mode;
    }

    public void setMode(String mode) {
        this.mode = mode;
    }

    public String getProxyUser() {
        return proxyUser;
    }

    public void setProxyUser(String proxyUser) {
        this.proxyUser = proxyUser;
    }

    public String getImage() {
        return image;
    }

    public void setImage(String image) {
        this.image = image;
    }

    public String getImagePullPolicy() {
        return imagePullPolicy;
    }

    public void setImagePullPolicy(String imagePullPolicy) {
        this.imagePullPolicy = imagePullPolicy;
    }

    public List<String> getImagePullSecrets() {
        return imagePullSecrets;
    }

    public void setImagePullSecrets(List<String> imagePullSecrets) {
        this.imagePullSecrets = imagePullSecrets;
    }

    public String getMainClass() {
        return mainClass;
    }

    public void setMainClass(String mainClass) {
        this.mainClass = mainClass;
    }

    public String getMainApplicationFile() {
        return mainApplicationFile;
    }

    public void setMainApplicationFile(String mainApplicationFile) {
        this.mainApplicationFile = mainApplicationFile;
    }

    public List<String> getArguments() {
        return arguments;
    }

    public void setArguments(List<String> arguments) {
        this.arguments = arguments;
    }

    public Map<String, String> getSparkConf() {
        return sparkConf;
    }

    /**
     * Replaces the whole sparkConf map. Added so the class follows the JavaBean convention and
     * Jackson can populate the field when a manifest is deserialized; {@link #addSparkConf(Map)}
     * merges rather than sets and is not recognized as a property setter.
     */
    public void setSparkConf(Map<String, String> sparkConf) {
        this.sparkConf = sparkConf;
    }

    /**
     * Merges the given entries into sparkConf, adopting the map wholesale when none exists yet.
     */
    public void addSparkConf(Map<String, String> sparkConf) {
        if (this.sparkConf == null) {
            this.sparkConf = sparkConf;
        } else {
            this.sparkConf.putAll(sparkConf);
        }
    }

    public Map<String, String> getHadoopConf() {
        return hadoopConf;
    }

    public void setHadoopConf(Map<String, String> hadoopConf) {
        this.hadoopConf = hadoopConf;
    }

    public String getSparkConfigMap() {
        return sparkConfigMap;
    }

    public void setSparkConfigMap(String sparkConfigMap) {
        this.sparkConfigMap = sparkConfigMap;
    }

    public String getHadoopConfigMap() {
        return hadoopConfigMap;
    }

    public void setHadoopConfigMap(String hadoopConfigMap) {
        this.hadoopConfigMap = hadoopConfigMap;
    }

    public List<Volume> getVolumes() {
        return volumes;
    }

    public void setVolumes(List<Volume> volumes) {
        this.volumes = volumes;
    }

    public DriverSpec getDriver() {
        return driver;
    }

    public void setDriver(DriverSpec driver) {
        this.driver = driver;
    }

    public ExecutorSpec getExecutor() {
        return executor;
    }

    public void setExecutor(ExecutorSpec executor) {
        this.executor = executor;
    }

    public Dependencies getDeps() {
        return deps;
    }

    public void setDeps(Dependencies deps) {
        this.deps = deps;
    }

    public RestartPolicy getRestartPolicy() {
        return restartPolicy;
    }

    public void setRestartPolicy(RestartPolicy restartPolicy) {
        this.restartPolicy = restartPolicy;
    }

    public Map<String, String> getNodeSelector() {
        return nodeSelector;
    }

    public void setNodeSelector(Map<String, String> nodeSelector) {
        this.nodeSelector = nodeSelector;
    }

    public Integer getFailureRetries() {
        return failureRetries;
    }

    public void setFailureRetries(Integer failureRetries) {
        this.failureRetries = failureRetries;
    }

    public long getRetryInterval() {
        return retryInterval;
    }

    public void setRetryInterval(long retryInterval) {
        this.retryInterval = retryInterval;
    }

    public String getPythonVersion() {
        return pythonVersion;
    }

    public void setPythonVersion(String pythonVersion) {
        this.pythonVersion = pythonVersion;
    }

    public String getMemoryOverheadFactor() {
        return memoryOverheadFactor;
    }

    public void setMemoryOverheadFactor(String memoryOverheadFactor) {
        this.memoryOverheadFactor = memoryOverheadFactor;
    }

    public MonitoringSpec getMonitoring() {
        return monitoring;
    }

    public void setMonitoring(MonitoringSpec monitoring) {
        this.monitoring = monitoring;
    }

    public String getBatchScheduler() {
        return batchScheduler;
    }

    public void setBatchScheduler(String batchScheduler) {
        this.batchScheduler = batchScheduler;
    }

    public long getTimeToLiveSeconds() {
        return timeToLiveSeconds;
    }

    public void setTimeToLiveSeconds(long timeToLiveSeconds) {
        this.timeToLiveSeconds = timeToLiveSeconds;
    }

    public BatchSchedulerConfiguration getBatchSchedulerOptions() {
        return batchSchedulerOptions;
    }

    public void setBatchSchedulerOptions(
                                        BatchSchedulerConfiguration batchSchedulerOptions) {
        this.batchSchedulerOptions = batchSchedulerOptions;
    }

    public SparkUIConfiguration getSparkUIOptions() {
        return sparkUIOptions;
    }

    public void setSparkUIOptions(
                                  SparkUIConfiguration sparkUIOptions) {
        this.sparkUIOptions = sparkUIOptions;
    }

    public DynamicAllocation getDynamicAllocation() {
        return dynamicAllocation;
    }

    public void setDynamicAllocation(
                                     DynamicAllocation dynamicAllocation) {
        this.dynamicAllocation = dynamicAllocation;
    }

    /**
     * Adds a single key/value pair to sparkConf, lazily creating the map on first use.
     */
    public void addConfToSparkConf(String name, String value) {
        if (this.sparkConf == null) {
            this.sparkConf = new HashMap<>();
        }
        sparkConf.put(name, value);
    }

    @Override
    public String toString() {
        return "SparkApplicationSpec{" +
                "type='" + type + '\'' +
                ", sparkVersion='" + sparkVersion + '\'' +
                ", mode='" + mode + '\'' +
                ", proxyUser='" + proxyUser + '\'' +
                ", image='" + image + '\'' +
                ", imagePullPolicy='" + imagePullPolicy + '\'' +
                ", imagePullSecrets=" + imagePullSecrets +
                ", mainClass='" + mainClass + '\'' +
                ", mainApplicationFile='" + mainApplicationFile + '\'' +
                ", arguments=" + arguments +
                ", sparkConf=" + sparkConf +
                ", hadoopConf=" + hadoopConf +
                ", sparkConfigMap='" + sparkConfigMap + '\'' +
                ", hadoopConfigMap='" + hadoopConfigMap + '\'' +
                ", volumes=" + volumes +
                ", driver=" + driver +
                ", executor=" + executor +
                ", deps=" + deps +
                ", restartPolicy=" + restartPolicy +
                ", nodeSelector=" + nodeSelector +
                ", failureRetries=" + failureRetries +
                ", retryInterval=" + retryInterval +
                ", pythonVersion='" + pythonVersion + '\'' +
                ", memoryOverheadFactor='" + memoryOverheadFactor + '\'' +
                ", monitoring=" + monitoring +
                ", batchScheduler='" + batchScheduler + '\'' +
                ", timeToLiveSeconds=" + timeToLiveSeconds +
                ", batchSchedulerOptions=" + batchSchedulerOptions +
                ", sparkUIOptions=" + sparkUIOptions +
                ", dynamicAllocation=" + dynamicAllocation +
                '}';
    }
}
package org.apache.dolphinscheduler.plugin.task.spark.crd;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import io.fabric8.kubernetes.api.model.KubernetesResource;
import java.util.Map;

/**
 * Status section of the spark-operator "SparkApplication" CRD (v1beta2), populated by the
 * operator and read back by the task plugin to track the application lifecycle.
 */
@JsonDeserialize()
@JsonIgnoreProperties(ignoreUnknown = true)
public class SparkApplicationStatus implements KubernetesResource {

    /**
     * SparkApplicationID is set by the spark-distribution(via spark.app.id config) on the driver and
     * executor pods
     */
    String sparkApplicationId;

    /**
     * SubmissionID is a unique ID of the current submission of the application.
     */
    String submissionID;

    /**
     * LastSubmissionAttemptTime is the time for the last application submission attempt.
     */
    String lastSubmissionAttemptTime;

    /**
     * TerminationTime is the time when the application terminated (runs to completion if it does).
     */
    String terminationTime;

    /**
     * DriverInfo has information about the driver.
     */
    DriverInfo driverInfo;

    /**
     * AppState tells the overall application state.
     */
    ApplicationState applicationState;

    /**
     * records the state of executors by executor Pod names.
     */
    Map<String, String> executorState;

    /**
     * ExecutionAttempts is the total number of attempts to run a submitted application to completion.
     * Incremented upon each attempted run of the application and reset upon invalidation.
     */
    int executionAttempts;

    /**
     * SubmissionAttempts is the total number of attempts to submit an application to run. Incremented
     * upon each attempted submission of the application and reset upon invalidation and rerun.
     */
    int submissionAttempts;

    @Override
    public String toString() {
        // executorState is a Map, so it is rendered without the single quotes used for
        // String-valued fields (the quoting was misleading in the original formatting).
        return "SparkApplicationStatus{" +
                "sparkApplicationId='" + sparkApplicationId + '\'' +
                ", submissionID='" + submissionID + '\'' +
                ", lastSubmissionAttemptTime='" + lastSubmissionAttemptTime + '\'' +
                ", terminationTime='" + terminationTime + '\'' +
                ", driverInfo=" + driverInfo +
                ", applicationState=" + applicationState +
                ", executorState=" + executorState +
                ", executionAttempts=" + executionAttempts +
                ", submissionAttempts=" + submissionAttempts +
                '}';
    }

    public String getSparkApplicationId() {
        return sparkApplicationId;
    }

    public void setSparkApplicationId(String sparkApplicationId) {
        this.sparkApplicationId = sparkApplicationId;
    }

    public String getSubmissionID() {
        return submissionID;
    }

    public void setSubmissionID(String submissionID) {
        this.submissionID = submissionID;
    }

    public String getLastSubmissionAttemptTime() {
        return lastSubmissionAttemptTime;
    }

    public void setLastSubmissionAttemptTime(String lastSubmissionAttemptTime) {
        this.lastSubmissionAttemptTime = lastSubmissionAttemptTime;
    }

    public String getTerminationTime() {
        return terminationTime;
    }

    public void setTerminationTime(String terminationTime) {
        this.terminationTime = terminationTime;
    }

    public DriverInfo getDriverInfo() {
        return driverInfo;
    }

    public void setDriverInfo(DriverInfo driverInfo) {
        this.driverInfo = driverInfo;
    }

    public ApplicationState getApplicationState() {
        return applicationState;
    }

    public void setApplicationState(
                                    ApplicationState applicationState) {
        this.applicationState = applicationState;
    }

    public Map<String, String> getExecutorState() {
        return executorState;
    }

    public void setExecutorState(Map<String, String> executorState) {
        this.executorState = executorState;
    }

    public int getExecutionAttempts() {
        return executionAttempts;
    }

    public void setExecutionAttempts(int executionAttempts) {
        this.executionAttempts = executionAttempts;
    }

    public int getSubmissionAttempts() {
        return submissionAttempts;
    }

    public void setSubmissionAttempts(int submissionAttempts) {
        this.submissionAttempts = submissionAttempts;
    }
}
package org.apache.dolphinscheduler.plugin.task.spark.crd;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import io.fabric8.kubernetes.api.model.networking.v1beta1.IngressTLS;
import java.util.List;
import java.util.Map;

/**
 * SparkUIConfiguration section of the SparkApplication CRD spec: configures the Service and
 * the Ingress that expose the Spark UI of a running application.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class SparkUIConfiguration {

    /**
     * (Optional) ServicePort allows configuring the port at service level that might be different
     * from the targetPort. TargetPort should be the same as the one defined in spark.ui.port
     */
    Integer servicePort;

    /**
     * (Optional) ServicePortName allows configuring the name of the service port. This may be useful
     * for sidecar proxies like Envoy injected by Istio which require specific ports names to treat
     * traffic as proper HTTP. Defaults to spark-driver-ui-port.
     */
    String servicePortName;

    /**
     * (Optional) ServiceType allows configuring the type of the service. Defaults to ClusterIP.
     */
    String serviceType;

    /**
     * (Optional) ServiceAnnotations is a map of key,value pairs of annotations that might be added to
     * the service object.
     */
    Map<String, String> serviceAnnotations;

    /**
     * (Optional) IngressAnnotations is a map of key,value pairs of annotations that might be added to
     * the ingress object. i.e. specify nginx as ingress.class
     */
    Map<String, String> ingressAnnotations;

    /**
     * (Optional) IngressTLS is the list of TLS configurations declared on the ingress object,
     * useful if we need to declare SSL certificates for the exposed UI.
     */
    List<IngressTLS> ingressTLS;

    public Integer getServicePort() {
        return servicePort;
    }

    public void setServicePort(Integer servicePort) {
        this.servicePort = servicePort;
    }

    public String getServicePortName() {
        return servicePortName;
    }

    public void setServicePortName(String servicePortName) {
        this.servicePortName = servicePortName;
    }

    public String getServiceType() {
        return serviceType;
    }

    public void setServiceType(String serviceType) {
        this.serviceType = serviceType;
    }

    public Map<String, String> getServiceAnnotations() {
        return serviceAnnotations;
    }

    public void setServiceAnnotations(Map<String, String> serviceAnnotations) {
        this.serviceAnnotations = serviceAnnotations;
    }

    public Map<String, String> getIngressAnnotations() {
        return ingressAnnotations;
    }

    public void setIngressAnnotations(Map<String, String> ingressAnnotations) {
        this.ingressAnnotations = ingressAnnotations;
    }

    public List<IngressTLS> getIngressTLS() {
        return ingressTLS;
    }

    public void setIngressTLS(
                              List<IngressTLS> ingressTLS) {
        this.ingressTLS = ingressTLS;
    }
}