lenoxzhao commented on code in PR #3952:
URL:
https://github.com/apache/incubator-streampark/pull/3952#discussion_r1713649280
##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java:
##########
@@ -291,39 +285,43 @@ public void start(SparkApplication appParam, boolean
auto) throws Exception {
Map<String, Object> extraParameter = new HashMap<>(0);
if (application.isSparkSqlJob()) {
- FlinkSql flinkSql =
flinkSqlService.getEffective(application.getId(), true);
+ SparkSql sparkSql =
sparkSqlService.getEffective(application.getId(), true);
// Get the sql of the replaced placeholder
- String realSql =
variableService.replaceVariable(application.getTeamId(), flinkSql.getSql());
- flinkSql.setSql(DeflaterUtils.zipString(realSql));
- extraParameter.put(ConfigKeys.KEY_FLINK_SQL(null),
flinkSql.getSql());
+ String realSql =
variableService.replaceVariable(application.getTeamId(), sparkSql.getSql());
+ sparkSql.setSql(DeflaterUtils.zipString(realSql));
+ extraParameter.put(ConfigKeys.KEY_SPARK_SQL(null),
sparkSql.getSql());
}
Tuple2<String, String> userJarAndAppConf =
getUserJarAndAppConf(sparkEnv, application);
- String flinkUserJar = userJarAndAppConf.f0;
+ String sparkUserJar = userJarAndAppConf.f0;
String appConf = userJarAndAppConf.f1;
BuildResult buildResult = buildPipeline.getBuildResult();
- if (SparkExecutionMode.YARN_CLUSTER ==
application.getSparkExecutionMode()
- || SparkExecutionMode.YARN_CLIENT ==
application.getSparkExecutionMode()) {
- buildResult = new ShadedBuildResponse(null, flinkUserJar, true);
- application.setJobManagerUrl(YarnUtils.getRMWebAppURL(true));
+ if
(SparkExecutionMode.isYarnMode(application.getSparkExecutionMode())) {
+ buildResult = new ShadedBuildResponse(null, sparkUserJar, true);
+ if (StringUtils.isNotBlank(application.getYarnQueueName())) {
+ extraParameter.put("yarnQueueName",
application.getYarnQueueName());
+ }
+ if (StringUtils.isNotBlank(application.getYarnQueueLabel())) {
+ extraParameter.put("yarnQueueLabel",
application.getYarnQueueLabel());
Review Comment:
These extra-parameter keys have already been extracted as constants in
`org.apache.streampark.common.conf.ConfigKeys`.
##########
streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala:
##########
@@ -119,24 +119,40 @@ object YarnClient extends SparkClientTrait {
.setDeployMode(submitRequest.executionMode match {
case SparkExecutionMode.YARN_CLIENT => "client"
case SparkExecutionMode.YARN_CLUSTER => "cluster"
- case _ => throw new IllegalArgumentException("[StreamPark][Spark]
Invalid spark on yarn deployMode, only support \"client\" and \"cluster\".")
+ case _ =>
+ throw new IllegalArgumentException("[StreamPark][Spark][YarnClient]
Invalid spark on yarn deployMode, only support \"client\" and \"cluster\".")
})
}
private def setSparkConfig(submitRequest: SubmitRequest, sparkLauncher:
SparkLauncher): Unit = {
logger.info("[StreamPark][Spark][YarnClient] set spark configuration.")
- // 1) set spark conf
- submitRequest.properties.foreach(prop => {
+ // 1) put yarn queue
+ if (SparkExecutionMode.isYarnMode(submitRequest.executionMode)) {
+ setYarnQueue(submitRequest)
+ }
+
+ // 2) set spark conf
+ submitRequest.appProperties.foreach(prop => {
val k = prop._1
val v = prop._2
logInfo(s"| $k : $v")
sparkLauncher.setConf(k, v)
})
- // 2) appArgs...
+ // 3) set spark args
+ submitRequest.appArgs.foreach(sparkLauncher.addAppArgs(_))
if (submitRequest.hasExtra("sql")) {
sparkLauncher.addAppArgs("--sql", submitRequest.getExtra("sql").toString)
}
}
+ protected def setYarnQueue(submitRequest: SubmitRequest): Unit = {
+ if (submitRequest.hasExtra("yarnQueueName")) {
+ submitRequest.appProperties.put("spark.yarn.queue",
submitRequest.getExtra("yarnQueueName").asInstanceOf[String])
+ }
+ if (submitRequest.hasExtra("yarnQueueLabel")) {
+ submitRequest.appProperties.put("spark.yarn.am.nodeLabelExpression",
submitRequest.getExtra("yarnQueueLabel").asInstanceOf[String])
+
submitRequest.appProperties.put("spark.yarn.executor.nodeLabelExpression",
submitRequest.getExtra("yarnQueueLabel").asInstanceOf[String])
Review Comment:
These Spark property keys have already been extracted as constants in
`org.apache.streampark.common.conf.ConfigKeys`.
##########
streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql:
##########
@@ -491,102 +491,173 @@ create table if not exists `t_yarn_queue` (
primary key (`id`)
);
+-- ----------------------------
+-- Table structure for t_flink_catalog
+-- ----------------------------
+create table if not exists t_flink_catalog (
+ `id` BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1 INCREMENT BY 1),
+ `team_id` bigint not null,
+ `user_id` bigint default null,
+ `catalog_type` varchar(255) not NULL,
+ `catalog_name` VARCHAR(255) NOT NULL,
+ `configuration` text,
+ `create_time` TIMESTAMP WITHOUT TIME ZONE DEFAULT NULL,
+ `update_time` TIMESTAMP WITHOUT TIME ZONE DEFAULT NULL,
+ CONSTRAINT uniq_catalog_name UNIQUE (`catalog_name`)
+);
+
-- ----------------------------
-- Table structure for t_spark_env
-- ----------------------------
create table if not exists `t_spark_env` (
- `id` bigint generated by default
as identity not null,
- `spark_name` varchar(128) not
null comment 'spark instance name',
- `spark_home` varchar(255) not null comment 'spark home path',
- `version` varchar(64) not null comment 'spark version',
- `scala_version` varchar(64) not null comment 'scala version of spark',
- `spark_conf` text not null comment 'spark-conf',
- `is_default` tinyint not null default 0 comment 'whether default version
or not',
- `description` varchar(255) default null comment 'description',
- `create_time` datetime not null default current_timestamp comment 'create
time',
- primary key(`id`),
- unique (`spark_name`)
- );
+ `id` bigint generated by default as identity not null,
+ `spark_name` varchar(128) not null comment 'spark instance name',
+ `spark_home` varchar(255) not null comment 'spark home path',
+ `version` varchar(64) not null comment 'spark version',
+ `scala_version` varchar(64) not null comment 'scala version of spark',
+ `spark_conf` text not null comment 'spark-conf',
+ `is_default` tinyint not null default 0 comment 'whether default version or
not',
+ `description` varchar(255) default null comment 'description',
+ `create_time` datetime not null default current_timestamp comment 'create
time',
+ primary key(`id`),
+ unique (`spark_name`)
+);
+
-- ----------------------------
-- Table structure for t_spark_app
-- ----------------------------
create table if not exists `t_spark_app` (
- `id` bigint generated by default
as identity not null,
- `team_id` bigint not null,
- `job_type` tinyint default null,
- `execution_mode` tinyint default
null,
- `resource_from` tinyint default
null,
- `project_id` bigint default null,
- `job_name` varchar(255) default
null,
- `module` varchar(255) default null,
- `jar` varchar(255) default null,
- `jar_check_sum` bigint default null,
- `main_class` varchar(255) default null,
- `args` text,
- `options` text,
- `hot_params` text ,
- `user_id` bigint default null,
- `app_id` varchar(64) default null,
- `app_type` tinyint default null,
- `duration` bigint default null,
- `job_id` varchar(64) default null,
- `job_manager_url` varchar(255) default null,
- `version_id` bigint default null,
- `cluster_id` varchar(45) default null,
- `k8s_name` varchar(63) default null,
- `k8s_namespace` varchar(63) default null,
- `spark_image` varchar(128) default null,
- `state` int default null,
- `restart_size` int default null,
- `restart_count` int default null,
- `cp_threshold` int default null,
- `cp_max_failure_interval` int default null,
- `cp_failure_rate_interval` int default null,
- `cp_failure_action` tinyint default null,
- `dynamic_properties` text ,
- `description` varchar(255) default null,
- `resolve_order` tinyint default null,
- `k8s_rest_exposed_type` tinyint default null,
- `jm_memory` int default null,
- `tm_memory` int default null,
- `total_task` int default null,
- `total_tm` int default null,
- `total_slot` int default null,
- `available_slot` int default null,
- `option_state` tinyint default null,
- `tracking` tinyint default null,
- `create_time` datetime not null default current_timestamp comment 'create
time',
- `modify_time` datetime not null default current_timestamp comment 'modify
time',
- `option_time` datetime default null,
- `release` tinyint default 1,
- `build` tinyint default 1,
- `start_time` datetime default null,
- `end_time` datetime default null,
- `alert_id` bigint default null,
- `k8s_pod_template` text ,
- `k8s_jm_pod_template` text ,
- `k8s_tm_pod_template` text ,
- `k8s_hadoop_integration` tinyint default 0,
- `spark_cluster_id` bigint default null,
- `ingress_template` text ,
- `default_mode_ingress` text ,
- `tags` varchar(500) default null,
- `hadoop_user` varchar(500) default null,
- primary key(`id`)
- );
-
- -- ----------------------------
- -- Table structure for t_flink_app
- -- ----------------------------
- create table if not exists t_flink_catalog (
- `id` BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1 INCREMENT BY 1),
- `team_id` bigint not null,
- `user_id` bigint default null,
- `catalog_type` varchar(255) not NULL,
- `catalog_name` VARCHAR(255) NOT NULL,
- `configuration` text,
- `create_time` TIMESTAMP WITHOUT TIME ZONE DEFAULT NULL,
- `update_time` TIMESTAMP WITHOUT TIME ZONE DEFAULT NULL,
- CONSTRAINT uniq_catalog_name UNIQUE (`catalog_name`)
- );
+ `id` bigint generated by default as identity not null,
+ `team_id` bigint not null,
+ `job_type` tinyint default null comment '(1)custom code(2)spark SQL',
+ `app_type` tinyint default null comment '(1)Apache Spark(2)StreamPark Spark',
+ `version_id` bigint default null comment 'spark version',
+ `app_name` varchar(255) default null comment 'spark.app.name',
+ `execution_mode` tinyint default null comment
'spark.submit.deployMode(1)cluster(2)client',
+ `resource_from` tinyint default null,
+ `project_id` bigint default null,
+ `module` varchar(255) default null,
+ `main_class` varchar(255) default null comment 'The entry point for your
application (e.g. org.apache.spark.examples.SparkPi)',
+ `jar` varchar(255) default null,
+ `jar_check_sum` bigint default null,
+ `app_properties` text comment 'Arbitrary Spark configuration property in
key=value format (e.g. spark.driver.cores=1)',
+ `app_args` text comment 'Arguments passed to the main method of your main
class',
+ `app_id` varchar(64) default null comment '(1)application_id on
yarn(2)driver_pod_name on k8s',
+ `yarn_queue` varchar(128) default null,
+ `k8s_master_url` varchar(128) default null,
+ `k8s_container_image` varchar(128) default null,
+ `k8s_image_pull_policy` tinyint default 1,
+ `k8s_service_account` varchar(64) default null,
+ `k8s_namespace` varchar(64) default null,
+ `hadoop_user` varchar(64) default null,
+ `restart_size` int default null,
+ `restart_count` int default null,
+ `state` int default null,
+ `options` text,
+ `option_state` tinyint default null,
+ `option_time` datetime default null,
+ `user_id` bigint default null,
+ `description` varchar(255) default null,
+ `tracking` tinyint default null,
+ `release` tinyint default 1,
+ `build` tinyint default 1,
+ `alert_id` bigint default null,
+ `create_time` datetime default null,
+ `modify_time` datetime default null,
+ `start_time` datetime default null,
+ `end_time` datetime default null,
+ `duration` bigint default null,
+ `tags` varchar(500) default null,
+ `driver_cores` varchar(64) default null,
+ `driver_memory` varchar(64) default null,
+ `executor_cores` varchar(64) default null,
+ `executor_memory` varchar(64) default null,
+ `executor_max_nums` varchar(64) default null,
+ `num_tasks` bigint default null,
+ `num_completed_tasks` bigint default null,
+ `num_stages` bigint default null,
+ `num_completed_stages` bigint default null,
+ `used_memory` bigint default null,
+ `used_v_cores` bigint default null,
+ primary key(`id`)
+);
+
+
+-- ----------------------------
+-- Table structure for t_spark_log
+-- ----------------------------
+create table if not exists `t_spark_log` (
+ `id` bigint generated by default as identity not null,
+ `app_id` bigint default null,
+ `spark_app_id` varchar(64) default null,
+ `track_url` varchar(255) default null,
+ `success` tinyint default null,
+ `exception` text ,
+ `option_time` datetime default null,
+ `option_name` tinyint default null,
+ `user_id` bigint default null comment 'operator user id',
+ primary key(`id`)
+);
+
+
+-- ----------------------------
+-- Table structure for t_spark_effective
+-- ----------------------------
+create table if not exists `t_spark_effective` (
+ `id` bigint generated by default as identity not null,
+ `app_id` bigint not null,
+ `target_type` tinyint not null comment '1) config 2) flink sql',
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]