This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.2 by this push:
new 6c9316942 [WIP][Improve] custom-code job support dependencies (#3291)
6c9316942 is described below
commit 6c9316942682c7b56a4e15aef01548df207d6904
Author: benjobs <[email protected]>
AuthorDate: Thu Oct 26 18:45:57 2023 +0800
[WIP][Improve] custom-code job support dependencies (#3291)
* [Improve] custom-code job support dependencies
* [Improve] build fatjar improvement
* [improve] extractArguments improvement
---------
Co-authored-by: benjobs <[email protected]>
---
.../streampark/common/util/PropertiesUtils.scala | 43 +++++++++++++-
.../common/util/PropertiesUtilsTestCase.scala | 66 ++--------------------
.../main/assembly/script/upgrade/mysql/2.1.1.sql | 26 +++++++++
.../main/assembly/script/upgrade/pgsql/2.1.1.sql | 19 +++++++
.../console/core/entity/Application.java | 19 ++-----
.../core/service/impl/AppBuildPipeServiceImpl.java | 36 +++++-------
.../core/service/impl/ApplicationServiceImpl.java | 33 ++++++++++-
.../src/main/resources/db/data-h2.sql | 2 +-
.../src/main/resources/db/schema-h2.sql | 1 +
.../resources/mapper/core/ApplicationMapper.xml | 1 +
.../Application/src/AppDarkModeToggle.vue | 4 +-
.../src/components/ContextMenu/src/ContextMenu.vue | 4 +-
.../src/components/Form/src/BasicForm.vue | 2 +-
.../src/components/Page/src/PageFooter.vue | 4 +-
.../src/hooks/web/useLockPage.ts | 9 +--
.../streampark-console-webapp/src/utils/props.ts | 2 +-
.../src/views/flink/app/Add.vue | 25 ++++----
.../src/views/flink/app/EditFlink.vue | 30 +++++++++-
.../src/views/flink/app/EditStreamPark.vue | 2 +-
.../flink/app/hooks/useCreateAndEditSchema.ts | 16 ++++--
.../src/views/flink/app/hooks/useCreateSchema.ts | 12 +---
.../views/flink/app/hooks/useEditFlinkSchema.ts | 8 +++
.../src/views/flink/app/hooks/useFlinkRender.tsx | 2 +-
.../flink/client/trait/FlinkClientTrait.scala | 59 +------------------
.../streampark/flink/packer/maven/MavenTool.scala | 43 +++++++++-----
.../flink/packer/pipeline/BuildRequest.scala | 1 -
.../pipeline/impl/FlinkRemoteBuildPipeline.scala | 35 +++++-------
27 files changed, 265 insertions(+), 239 deletions(-)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
index 544314801..2272cc5cf 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
@@ -30,7 +30,7 @@ import java.util.regex.Pattern
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable
-import scala.collection.mutable.{Map => MutableMap}
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
object PropertiesUtils extends Logger {
@@ -305,6 +305,47 @@ object PropertiesUtils extends Logger {
}
}
+  /**
+   * Splits a program-argument string into individual arguments on whitespace.
+   * A token starting with ' or " opens a quoted argument that runs to the next
+   * token ending with the same quote; the enclosing quotes are removed and the
+   * pieces are re-joined with single spaces. An unterminated quote consumes the
+   * rest of the input. Quotes that appear mid-token are not interpreted.
+   *
+   * @param args raw argument string, may be null or empty
+   * @return the parsed arguments; empty when args is null or blank
+   */
+  @Nonnull def extractArguments(args: String): List[String] = {
+    val programArgs = new ArrayBuffer[String]()
+    if (StringUtils.isNotEmpty(args)) {
+      // split() yields a leading "" when args starts with whitespace; drop it.
+      val tokens = args.split("\\s+").iterator.filter(_.nonEmpty)
+
+      // Reads one quoted argument whose opening token is `first`.
+      def readQuoted(first: String, quote: String): String = {
+        val parts = ArrayBuffer(first)
+        // A lone quote character only opens the argument, it cannot also close it.
+        var closed = first.length > quote.length && first.endsWith(quote)
+        while (!closed && tokens.hasNext) {
+          parts += tokens.next()
+          closed = parts.last.endsWith(quote)
+        }
+        val body = parts.mkString(" ").substring(quote.length)
+        if (closed) body.dropRight(quote.length) else body
+      }
+
+      while (tokens.hasNext) {
+        val token = tokens.next()
+        programArgs += (
+          if (token.startsWith("'")) readQuoted(token, "'")
+          else if (token.startsWith("\"")) readQuoted(token, "\"")
+          else token
+        )
+      }
+    }
+    programArgs.toList
+  }
+
@Nonnull def extractDynamicPropertiesAsJava(properties: String):
JavaMap[String, String] =
new JavaMap[String, String](extractDynamicProperties(properties).asJava)
diff --git
a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
index e04e0e3be..995cfd2ec 100644
---
a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
+++
b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
@@ -16,75 +16,19 @@
*/
package org.apache.streampark.common.util
-import org.apache.commons.lang3.StringUtils
import org.junit.jupiter.api.{Assertions, Test}
import scala.collection.mutable.ArrayBuffer
+import scala.language.postfixOps
class PropertiesUtilsTestCase {
@Test def testExtractProgramArgs(): Unit = {
- val argsStr = "--host localhost:8123\n\n\n" +
- "--sql \"\"\"insert into table_a select * from table_b\"\"\"\n" +
- "--c d\r\n" +
- "--x yyy"
+ val args =
+ "mysql-sync-database \n--database employees \n--mysql-conf
hostname=127.0.0.1 \n--mysql-conf port=3306 \n--mysql-conf username=root
\n--mysql-conf password=123456 \n--mysql-conf database-name=employees
\n--including-tables 'test|test.*' \n--sink-conf fenodes=127.0.0.1:8030
\n--sink-conf username=root \n--sink-conf password= \n--sink-conf
jdbc-url=jdbc:mysql://127.0.0.1:9030 \n--sink-conf
sink.label-prefix=label\n--table-conf replication_num=1 "
val programArgs = new ArrayBuffer[String]()
- if (StringUtils.isNotEmpty(argsStr)) {
- val multiLineChar = "\"\"\""
- val array = argsStr.split("\\s+")
- if (array.filter(_.startsWith(multiLineChar)).isEmpty) {
- array.foreach(programArgs +=)
- } else {
- val argsArray = new ArrayBuffer[String]()
- val tempBuffer = new ArrayBuffer[String]()
-
- def processElement(index: Int, multiLine: Boolean): Unit = {
-
- if (index == array.length) {
- if (tempBuffer.nonEmpty) {
- argsArray += tempBuffer.mkString(" ")
- }
- return
- }
-
- val next = index + 1
- val elem = array(index)
-
- if (elem.trim.nonEmpty) {
- if (!multiLine) {
- if (elem.startsWith(multiLineChar)) {
- tempBuffer += elem.drop(3)
- processElement(next, true)
- } else {
- argsArray += elem
- processElement(next, false)
- }
- } else {
- if (elem.endsWith(multiLineChar)) {
- tempBuffer += elem.dropRight(3)
- argsArray += tempBuffer.mkString(" ")
- tempBuffer.clear()
- processElement(next, false)
- } else {
- tempBuffer += elem
- processElement(next, multiLine)
- }
- }
- } else {
- tempBuffer += elem
- processElement(next, false)
- }
- }
-
- processElement(0, false)
- argsArray.foreach(x => programArgs += x.trim)
- }
- }
-
- Assertions.assertEquals("localhost:8123", programArgs(1))
- Assertions.assertEquals("insert into table_a select * from table_b",
programArgs(3))
- Assertions.assertEquals("d", programArgs(5))
- Assertions.assertEquals("yyy", programArgs(7))
+ programArgs ++= PropertiesUtils.extractArguments(args)
+ println(programArgs)
}
@Test def testDynamicProperties(): Unit = {
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.1.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.1.sql
new file mode 100644
index 000000000..2205a8fbb
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.1.sql
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use streampark;
+
+set names utf8mb4;
+set foreign_key_checks = 0;
+
+alter table `t_flink_app`
+ add column `dependency` text collate utf8mb4_general_ci default null after
`main_class`;
+
+set foreign_key_checks = 1;
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.1.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.1.sql
new file mode 100644
index 000000000..a87816bf4
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.1.sql
@@ -0,0 +1,19 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+alter table "public"."t_flink_app"
+ add column "dependency" text collate "pg_catalog"."default";
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 81f4ccc4c..568f7f2a5 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -160,6 +160,9 @@ public class Application implements Serializable {
private String mainClass;
+ @TableField(updateStrategy = FieldStrategy.IGNORED)
+ private String dependency;
+
private Date startTime;
@TableField(updateStrategy = FieldStrategy.IGNORED)
@@ -220,7 +223,6 @@ public class Application implements Serializable {
/** running job */
private transient JobsOverview.Task overview;
- private transient String dependency;
private transient Long sqlId;
private transient String flinkSql;
@@ -390,15 +392,6 @@ public class Application implements Serializable {
&& this.cpFailureAction != null;
}
- public boolean eqFlinkJob(Application other) {
- if (this.isFlinkSqlJob() && other.isFlinkSqlJob()) {
- if (this.getFlinkSql().trim().equals(other.getFlinkSql().trim())) {
- return this.getDependencyObject().eq(other.getDependencyObject());
- }
- }
- return false;
- }
-
/** Local compilation and packaging working directory */
@JsonIgnore
public String getDistHome() {
@@ -752,11 +745,11 @@ public class Application implements Serializable {
@Override
public String toString() {
- return groupId + ":" + artifactId + ":" + version + getClassifier(":");
+ return groupId + ":" + artifactId + ":" + version + getClassifier();
}
- private String getClassifier(String joiner) {
- return StringUtils.isEmpty(classifier) ? "" : joiner + classifier;
+ private String getClassifier() {
+ return StringUtils.isEmpty(classifier) ? "" : ":" + classifier;
}
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 6f8890bd2..f6fb7e641 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -184,32 +184,24 @@ public class AppBuildPipeServiceImpl
if (app.isCustomCodeJob()) {
// customCode upload jar to appHome...
- String appHome = app.getAppHome();
FsOperator fsOperator = app.getFsOperator();
- fsOperator.delete(appHome);
- if (app.isUploadJob()) {
+ if (app.isCICDJob()) {
+ String appHome = app.getAppHome();
+ fsOperator.mkCleanDirs(appHome);
+ fsOperator.upload(app.getDistHome(), appHome);
+ } else {
File localJar = new File(WebUtils.getAppTempDir(),
app.getJar());
// upload jar copy to appHome
String uploadJar = appUploads.concat("/").concat(app.getJar());
checkOrElseUploadJar(app.getFsOperator(), localJar, uploadJar,
appUploads);
- switch (app.getApplicationType()) {
- case STREAMPARK_FLINK:
- fsOperator.mkdirs(app.getAppLib());
- fsOperator.copy(uploadJar, app.getAppLib(), false, true);
- break;
- case APACHE_FLINK:
- fsOperator.mkdirs(appHome);
- fsOperator.copy(uploadJar, appHome, false, true);
- break;
- default:
- throw new IllegalArgumentException(
- "[StreamPark] unsupported ApplicationType of custom
code: "
- + app.getApplicationType());
+ if (app.getApplicationType() ==
ApplicationType.STREAMPARK_FLINK) {
+ fsOperator.mkdirs(app.getAppLib());
+ fsOperator.copy(uploadJar, app.getAppLib(), false, true);
}
- } else {
- fsOperator.upload(app.getDistHome(), appHome);
}
- } else {
+ }
+
+ if (app.isFlinkSqlJob() || app.isUploadJob()) {
if (!app.getDependencyObject().getJar().isEmpty()) {
String localUploads = Workspace.local().APP_UPLOADS();
// copy jar to local upload dir
@@ -335,7 +327,8 @@ public class AppBuildPipeServiceImpl
FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(app.getVersionId());
String flinkUserJar = retrieveFlinkUserJar(flinkEnv, app);
ExecutionMode executionMode = app.getExecutionModeEnum();
- String mainClass = ConfigConst.STREAMPARK_FLINKSQL_CLIENT_CLASS();
+ String mainClass =
+ app.isCustomCodeJob() ? app.getMainClass() :
ConfigConst.STREAMPARK_FLINKSQL_CLIENT_CLASS();
switch (executionMode) {
case YARN_APPLICATION:
String yarnProvidedPath = app.getAppLib();
@@ -364,7 +357,6 @@ public class AppBuildPipeServiceImpl
app.getLocalAppHome(),
mainClass,
flinkUserJar,
- app.isCustomCodeJob(),
app.getExecutionModeEnum(),
app.getDevelopmentMode(),
flinkEnv.getFlinkVersion(),
@@ -424,7 +416,7 @@ public class AppBuildPipeServiceImpl
case STREAMPARK_FLINK:
return String.format("%s/%s", app.getAppLib(),
app.getModule().concat(".jar"));
case APACHE_FLINK:
- return String.format("%s/%s", app.getAppHome(), app.getJar());
+ return String.format("%s/%s", WebUtils.getAppTempDir(),
app.getJar());
default:
throw new IllegalArgumentException(
"[StreamPark] unsupported ApplicationType of custom code: "
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 2242b897e..e23c12564 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -703,6 +703,17 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
public boolean create(Application appParam) {
ApiAlertException.throwIfNull(
appParam.getTeamId(), "The teamId can't be null. Create application
failed.");
+
+ if (appParam.isFlinkSqlJob()) {
+ appParam.setBuild(true);
+ } else {
+ if (appParam.isUploadJob()) {
+ appParam.setBuild(!appParam.getDependencyObject().isEmpty());
+ } else {
+ appParam.setBuild(false);
+ }
+ }
+
appParam.setUserId(commonService.getUserId());
appParam.setState(FlinkAppState.ADDED.getValue());
appParam.setRelease(ReleaseState.NEED_RELEASE.get());
@@ -736,6 +747,17 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
}
}
+  /**
+   * Inserts the application. For a Flink SQL job the row is written with a null
+   * dependency value, and the caller's original value is put back on the entity afterwards.
+   * NOTE(review): presumably SQL-job dependencies are persisted elsewhere - confirm.
+   *
+   * @param entity application to insert
+   * @return the result of the underlying save
+   */
+  @Override
+  public boolean save(Application entity) {
+    String dependency = entity.getDependency();
+    if (entity.isFlinkSqlJob()) {
+      entity.setDependency(null);
+    }
+    try {
+      return super.save(entity);
+    } finally {
+      // Restore even when the insert throws, so the caller's entity is never left blanked.
+      entity.setDependency(dependency);
+    }
+  }
+
private boolean existsByJobName(String jobName) {
return this.baseMapper.existsByJobName(jobName);
}
@@ -836,8 +858,17 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
String.format(ERROR_APP_QUEUE_HINT, appParam.getYarnQueue(),
appParam.getTeamId()));
application.setRelease(ReleaseState.NEED_RELEASE.get());
+
if (application.isUploadJob()) {
- if (!ObjectUtils.safeEquals(application.getJar(), appParam.getJar())) {
+ Application.Dependency thisDependency =
+ Application.Dependency.toDependency(appParam.getDependency());
+ Application.Dependency targetDependency =
+ Application.Dependency.toDependency(application.getDependency());
+
+ if (!thisDependency.eq(targetDependency)) {
+ application.setDependency(appParam.getDependency());
+ application.setBuild(true);
+ } else if (!ObjectUtils.safeEquals(application.getJar(),
appParam.getJar())) {
application.setBuild(true);
} else {
File jarFile = new File(WebUtils.getAppTempDir(), appParam.getJar());
diff --git
a/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
b/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
index 77494579a..7a450f129 100644
---
a/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
+++
b/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
@@ -25,7 +25,7 @@ insert into `t_team` values (100001, 'test', 'The test team',
now(), now());
-- ----------------------------
-- Records of t_flink_app
-- ----------------------------
-insert into `t_flink_app` values (100000, 100000, 2, 4, null, null, 'Flink SQL
Demo', null, null, null, null, null, null , null, 100000, null, 1, null, null,
null, null, null, null, null, '0', 0, null, null, null, null, null, null,
'Flink SQL Demo', 0, null, null, null, null, null, null, null, 0, 0, now(),
now(), null, 1, 1, null, null, null, null, null, null, 0, null, null, null,
'streampark,test');
+insert into `t_flink_app` values (100000, 100000, 2, 4, null, null, 'Flink SQL
Demo', null, null, null, null, null, null, null , null, 100000, null, 1, null,
null, null, null, null, null, null, '0', 0, null, null, null, null, null, null,
'Flink SQL Demo', 0, null, null, null, null, null, null, null, 0, 0, now(),
now(), null, 1, 1, null, null, null, null, null, null, 0, null, null, null,
'streampark,test');
-- ----------------------------
-- Records of t_flink_effective
diff --git
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
index 3f7025a83..d06f907e0 100644
---
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
+++
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
@@ -46,6 +46,7 @@ create table if not exists `t_flink_app` (
`jar` varchar(255) default null,
`jar_check_sum` bigint default null,
`main_class` varchar(255) default null,
+ `dependency` text ,
`args` text,
`options` text,
`hot_params` text ,
diff --git
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
index a6c97121c..9096158b5 100644
---
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
+++
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
@@ -40,6 +40,7 @@
<result column="tracking" jdbcType="INTEGER" property="tracking"/>
<result column="jar" jdbcType="VARCHAR" property="jar"/>
<result column="jar_check_sum" jdbcType="VARCHAR"
property="jarCheckSum"/>
+ <result column="dependency" jdbcType="LONGVARCHAR"
property="dependency"/>
<result column="main_class" jdbcType="VARCHAR" property="mainClass"/>
<result column="job_id" jdbcType="VARCHAR" property="jobId"/>
<result column="job_manager_url" jdbcType="VARCHAR"
property="jobManagerUrl"/>
diff --git
a/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue
b/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue
index abf6c47ea..40b5c4964 100644
---
a/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue
+++
b/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue
@@ -79,9 +79,7 @@
height: 18px;
background-color: #fff;
border-radius: 50%;
- transition:
- transform 0.5s,
- background-color 0.5s;
+ transition: transform 0.5s, background-color 0.5s;
will-change: transform;
}
diff --git
a/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue
b/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue
index 61cc99b7f..122b6e711 100644
---
a/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue
+++
b/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue
@@ -195,9 +195,7 @@
background-color: @component-background;
border: 1px solid rgb(0 0 0 / 8%);
border-radius: 0.25rem;
- box-shadow:
- 0 2px 2px 0 rgb(0 0 0 / 14%),
- 0 3px 1px -2px rgb(0 0 0 / 10%),
+ box-shadow: 0 2px 2px 0 rgb(0 0 0 / 14%), 0 3px 1px -2px rgb(0 0 0 / 10%),
0 1px 5px 0 rgb(0 0 0 / 6%);
background-clip: padding-box;
user-select: none;
diff --git
a/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue
b/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue
index ab4c10638..0169b4c86 100644
---
a/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue
+++
b/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue
@@ -129,7 +129,7 @@
});
const getBindValue = computed(
- () => ({ ...attrs, ...props, ...unref(getProps) }) as Recordable,
+ () => ({ ...attrs, ...props, ...unref(getProps) } as Recordable),
);
const getSchema = computed((): FormSchema[] => {
diff --git
a/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue
b/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue
index c499ccf65..153302646 100644
---
a/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue
+++
b/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue
@@ -55,9 +55,7 @@
line-height: 44px;
background-color: @component-background;
border-top: 1px solid @border-color-base;
- box-shadow:
- 0 -6px 16px -8px rgb(0 0 0 / 8%),
- 0 -9px 28px 0 rgb(0 0 0 / 5%),
+ box-shadow: 0 -6px 16px -8px rgb(0 0 0 / 8%), 0 -9px 28px 0 rgb(0 0 0 /
5%),
0 -12px 48px 16px rgb(0 0 0 / 3%);
transition: width 0.2s;
diff --git
a/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts
b/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts
index 47aa6eb5f..de0380811 100644
--- a/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts
+++ b/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts
@@ -48,12 +48,9 @@ export function useLockPage() {
}
clear();
- timeId = setTimeout(
- () => {
- lockPage();
- },
- lockTime * 60 * 1000,
- );
+ timeId = setTimeout(() => {
+ lockPage();
+ }, lockTime * 60 * 1000);
}
function lockPage(): void {
diff --git a/streampark-console/streampark-console-webapp/src/utils/props.ts
b/streampark-console/streampark-console-webapp/src/utils/props.ts
index 5828c8cf7..92fe2810b 100644
--- a/streampark-console/streampark-console-webapp/src/utils/props.ts
+++ b/streampark-console/streampark-console-webapp/src/utils/props.ts
@@ -191,7 +191,7 @@ export const buildProps = <
: never;
};
-export const definePropType = <T>(val: any) => ({ [wrapperKey]: val }) as
PropWrapper<T>;
+export const definePropType = <T>(val: any) => ({ [wrapperKey]: val } as
PropWrapper<T>);
export const keyOf = <T extends Object>(arr: T) => Object.keys(arr) as
Array<keyof T>;
export const mutable = <T extends readonly any[] | Record<string,
unknown>>(val: T) =>
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
b/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
index 541503f69..071231e0c 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
@@ -185,7 +185,7 @@
// common params...
const resourceFrom = values.resourceFrom;
if (resourceFrom) {
- if (resourceFrom === 'csv') {
+ if (resourceFrom === 'cvs') {
params['resourceFrom'] = ResourceFromEnum.CICD;
//streampark flink
if (values.appType == AppTypeEnum.STREAMPARK_FLINK) {
@@ -210,15 +210,15 @@
appType: AppTypeEnum.APACHE_FLINK,
jar: unref(uploadJar),
mainClass: values.mainClass,
+ dependency: await getDependency(),
});
handleCreateApp(params);
}
}
}
- /* flink sql mode */
- async function handleSubmitSQL(values: Recordable) {
+ async function getDependency() {
// Trigger a pom confirmation operation.
- await unref(dependencyRef)?.handleApplyPom();
+ unref(dependencyRef)?.handleApplyPom();
// common params...
const dependency: { pom?: string; jar?: string } = {};
const dependencyRecords = unref(dependencyRef)?.dependencyRecords;
@@ -233,14 +233,18 @@
jar: unref(uploadJars),
});
}
-
+ return dependency.pom === undefined && dependency.jar === undefined
+ ? null
+ : JSON.stringify(dependency);
+ }
+ /* flink sql mode */
+ async function handleSubmitSQL(values: Recordable) {
let config = values.configOverride;
- if (config != null && config !== undefined && config.trim() != '') {
+ if (config != null && config.trim() != '') {
config = encryptByBase64(config);
} else {
config = null;
}
-
handleCluster(values);
const params = {
jobType: JobTypeEnum.SQL,
@@ -248,10 +252,7 @@
appType: AppTypeEnum.STREAMPARK_FLINK,
config,
format: values.isSetConfig ? 1 : null,
- dependency:
- dependency.pom === undefined && dependency.jar === undefined
- ? null
- : JSON.stringify(dependency),
+ dependency: await getDependency(),
};
handleSubmitParams(params, values, k8sTemplate);
handleCreateApp(params);
@@ -285,7 +286,7 @@
const param = {};
for (const k in params) {
const v = params[k];
- if (v != null && v !== undefined) {
+ if (v != null) {
param[k] = v;
}
}
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
index 021354020..e75e0d544 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
@@ -39,6 +39,7 @@
import VariableReview from './components/VariableReview.vue';
import { useDrawer } from '/@/components/Drawer';
import { ExecModeEnum, ResourceFromEnum } from '/@/enums/flinkEnum';
+ import Dependency from '/@/views/flink/app/components/Dependency.vue';
const route = useRoute();
const { t } = useI18n();
@@ -52,6 +53,7 @@
const uploadJar = ref('');
const programArgRef = ref();
const podTemplateRef = ref();
+ const dependencyRef = ref();
const k8sTemplate = reactive({
podTemplate: '',
@@ -116,6 +118,7 @@
setFieldsValue(defaultParams);
app.args && programArgRef.value?.setContent(app.args);
setTimeout(() => {
+ unref(dependencyRef)?.setDefaultValue(JSON.parse(app.dependency ||
'{}'));
unref(podTemplateRef)?.handleChoicePodTemplate('ptVisual',
app.k8sPodTemplate);
unref(podTemplateRef)?.handleChoicePodTemplate('jmPtVisual',
app.k8sJmPodTemplate);
unref(podTemplateRef)?.handleChoicePodTemplate('tmPtVisual',
app.k8sTmPodTemplate);
@@ -142,15 +145,34 @@
/* Handling update parameters */
function handleAppUpdate(values: Recordable) {
+ // Trigger a pom confirmation operation.
+ unref(dependencyRef)?.handleApplyPom();
+ // common params...
+ const dependency: { pom?: string; jar?: string } = {};
+ const dependencyRecords = unref(dependencyRef)?.dependencyRecords;
+ const uploadJars = unref(dependencyRef)?.uploadJars;
+ if (unref(dependencyRecords) && unref(dependencyRecords).length > 0) {
+ Object.assign(dependency, {
+ pom: unref(dependencyRecords),
+ });
+ }
+ if (uploadJars && unref(uploadJars).length > 0) {
+ Object.assign(dependency, {
+ jar: unref(uploadJars),
+ });
+ }
submitLoading.value = true;
try {
const params = {
id: app.id,
jar: values.jar,
mainClass: values.mainClass,
+ dependency:
+ dependency.pom === undefined && dependency.jar === undefined
+ ? null
+ : JSON.stringify(dependency),
};
handleSubmitParams(params, values, k8sTemplate);
-
handleUpdateApp(params);
} catch (error) {
submitLoading.value = false;
@@ -212,13 +234,17 @@
<template #args="{ model }">
<ProgramArgs
ref="programArgRef"
- v-if="model.args != null && model.args != undefined"
+ v-if="model.args != null"
v-model:value="model.args"
:suggestions="suggestions"
@preview="(value) => openReviewDrawer(true, { value, suggestions })"
/>
</template>
+ <template #dependency="{ model, field }">
+ <Dependency ref="dependencyRef" v-model:value="model[field]"
:form-model="model" />
+ </template>
+
<template #formFooter>
<div class="flex items-center w-full justify-center">
<a-button @click="go('/flink/app')">
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
b/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
index 0d346aa10..9e307ea24 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
@@ -360,7 +360,7 @@
<template #args="{ model }">
<ProgramArgs
ref="programArgRef"
- v-if="model.args != null && model.args != undefined"
+ v-if="model.args != null"
v-model:value="model.args"
:suggestions="suggestions"
@preview="(value) => openReviewDrawer(true, { value, suggestions })"
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
index a5cad757c..752ad30e0 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
@@ -49,7 +49,13 @@ import { fetchFlinkEnv } from
'/@/api/flink/setting/flinkEnv';
import { FlinkEnv } from '/@/api/flink/setting/types/flinkEnv.type';
import { AlertSetting } from '/@/api/flink/setting/types/alert.type';
import { FlinkCluster } from '/@/api/flink/setting/types/flinkCluster.type';
-import { AppTypeEnum, ClusterStateEnum, ExecModeEnum, JobTypeEnum } from
'/@/enums/flinkEnum';
+import {
+ AppTypeEnum,
+ ClusterStateEnum,
+ ExecModeEnum,
+ JobTypeEnum,
+ ResourceFromEnum,
+} from '/@/enums/flinkEnum';
import { isK8sExecMode } from '../utils';
import { useI18n } from '/@/hooks/web/useI18n';
import { fetchCheckHadoop } from '/@/api/flink/setting';
@@ -79,7 +85,7 @@ export const useCreateAndEditSchema = (
const [registerConfDrawer, { openDrawer: openConfDrawer }] = useDrawer();
- /*
+ /*
!The original item is also unassigned
*/
function getConfigSchemas() {
@@ -126,9 +132,11 @@ export const useCreateAndEditSchema = (
slot: 'dependency',
ifShow: ({ values }) => {
if (edit?.appId) {
- return values.jobType == JobTypeEnum.SQL;
+ return values.jobType == JobTypeEnum.SQL
+ ? true
+ : values.resourceFrom == ResourceFromEnum.UPLOAD;
} else {
- return values?.jobType == 'sql';
+ return values?.jobType == 'sql' ? true : values?.resourceFrom != 'cvs';
}
},
},
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts
index 72b11762f..4ef2557b3 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts
@@ -63,14 +63,6 @@ export const useCreateSchema = (dependencyRef: Ref) => {
suggestions,
} = useCreateAndEditSchema(dependencyRef);
- // async function handleEditConfig(config: string) {
- // console.log('config', config);
- // const res = await fetchAppConf({ config });
- // const conf = decodeByBase64(res);
- // openConfDrawer(true, {
- // configOverride: conf,
- // });
- // }
function handleCheckConfig(_rule: RuleObject, value: StoreValue) {
if (value) {
const confType = getAppConfType(value);
@@ -97,7 +89,7 @@ export const useCreateSchema = (dependencyRef: Ref) => {
if (value === 'sql') {
formModel.tableEnv = 1;
} else {
- formModel.resourceFrom = 'csv';
+ formModel.resourceFrom = 'cvs';
}
},
};
@@ -109,7 +101,6 @@ export const useCreateSchema = (dependencyRef: Ref) => {
},
...getExecutionModeSchema.value,
...getFlinkClusterSchemas.value,
- ...getFlinkSqlSchema.value,
{
field: 'resourceFrom',
label: t('flink.app.resourceFrom'),
@@ -133,6 +124,7 @@ export const useCreateSchema = (dependencyRef: Ref) => {
ifShow: ({ values }) => values?.jobType !== 'sql' &&
values?.resourceFrom == 'upload',
rules: [{ required: true, message:
t('flink.app.addAppTips.mainClassIsRequiredMessage') }],
},
+ ...getFlinkSqlSchema.value,
{
field: 'project',
label: t('flink.app.project'),
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useEditFlinkSchema.ts
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useEditFlinkSchema.ts
index aef6f6f81..c7e072ad8 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useEditFlinkSchema.ts
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useEditFlinkSchema.ts
@@ -23,6 +23,8 @@ import { Alert } from 'ant-design-vue';
import { useRoute } from 'vue-router';
import { fetchMain } from '/@/api/flink/app/app';
import { ResourceFromEnum } from '/@/enums/flinkEnum';
+import { useI18n } from '/@/hooks/web/useI18n';
+const { t } = useI18n();
export const useEditFlinkSchema = (jars: Ref) => {
const flinkSql = ref();
@@ -115,6 +117,12 @@ export const useEditFlinkSchema = (jars: Ref) => {
},
rules: [{ required: true, message: 'Program Main is required' }],
},
+ {
+ field: 'dependency',
+ label: t('flink.app.dependency'),
+ component: 'Input',
+ slot: 'dependency',
+ },
...getFlinkFormOtherSchemas.value,
];
});
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx
index 4a103f81b..ba00e667a 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx
@@ -514,7 +514,7 @@ export const renderResourceFrom = (model: Recordable) => {
value={model.resourceFrom}
placeholder="Please select resource from"
>
- <Select.Option value="csv">
+ <Select.Option value="cvs">
<SvgIcon name="github" />
<span class="pl-10px">CICD</span>
<span class="gray">(build from CVS)</span>
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index aa91aed32..e7da5525e 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -20,7 +20,7 @@ package org.apache.streampark.flink.client.`trait`
import org.apache.streampark.common.conf.ConfigConst._
import org.apache.streampark.common.conf.Workspace
import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode,
ExecutionMode}
-import org.apache.streampark.common.util.{DeflaterUtils, Logger}
+import org.apache.streampark.common.util.{DeflaterUtils, Logger, PropertiesUtils}
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.core.FlinkClusterClient
import org.apache.streampark.flink.core.conf.FlinkRunOption
@@ -42,7 +42,6 @@ import org.apache.flink.util.Preconditions.checkNotNull
import java.io.File
import java.util.{Collections, List => JavaList, Map => JavaMap}
-import scala.annotation.tailrec
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -399,63 +398,9 @@ trait FlinkClientTrait extends Logger {
private[this] def extractProgramArgs(submitRequest: SubmitRequest):
JavaList[String] = {
val programArgs = new ArrayBuffer[String]()
- val args = submitRequest.args
-
- if (StringUtils.isNotEmpty(args)) {
- val multiChar = "\""
- val array = args.split("\\s+")
- if (!array.exists(_.startsWith(multiChar))) {
- array.foreach(programArgs +=)
- } else {
- val argsArray = new ArrayBuffer[String]()
- val tempBuffer = new ArrayBuffer[String]()
-
- @tailrec
- def processElement(index: Int, multi: Boolean): Unit = {
-
- if (index == array.length) {
- if (tempBuffer.nonEmpty) {
- argsArray += tempBuffer.mkString(" ")
- }
- return
- }
-
- val next = index + 1
- val elem = array(index).trim
-
- if (elem.isEmpty) {
- processElement(next, multi = false)
- } else {
- if (multi) {
- if (elem.endsWith(multiChar)) {
- tempBuffer += elem.dropRight(1)
- argsArray += tempBuffer.mkString(" ")
- tempBuffer.clear()
- processElement(next, multi = false)
- } else {
- tempBuffer += elem
- processElement(next, multi)
- }
- } else {
- val until = if (elem.endsWith(multiChar)) 1 else 0
- if (elem.startsWith(multiChar)) {
- tempBuffer += elem.drop(1).dropRight(until)
- processElement(next, multi = true)
- } else {
- argsArray += elem.dropRight(until)
- processElement(next, multi = false)
- }
- }
- }
- }
-
- processElement(0, multi = false)
- argsArray.foreach(x => programArgs += x)
- }
- }
+ programArgs ++= PropertiesUtils.extractArguments(submitRequest.args)
if (submitRequest.applicationType == ApplicationType.STREAMPARK_FLINK) {
-
programArgs += PARAM_KEY_FLINK_CONF += submitRequest.flinkYaml
programArgs += PARAM_KEY_APP_NAME +=
DeflaterUtils.zipString(submitRequest.effectiveAppName)
programArgs += PARAM_KEY_FLINK_PARALLELISM +=
getParallelism(submitRequest).toString
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
index 9c95748dd..5dab91892 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
@@ -46,7 +46,7 @@ import java.util
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
object MavenTool extends Logger {
@@ -76,9 +76,6 @@ object MavenTool extends Logger {
List(remoteRepository)
}
- private val isJarFile = (file: File) =>
- file.isFile && Try(Utils.checkJarFile(file.toURI.toURL)).isSuccess
-
/**
* Build a fat-jar with custom jar libraries.
*
@@ -102,16 +99,34 @@ object MavenTool extends Logger {
uberJar.delete()
// resolve all jarLibs
val jarSet = new util.HashSet[File]
- jarLibs
- .map(lib => new File(lib))
- .filter(_.exists)
- .foreach {
- case libFile if isJarFile(libFile) => jarSet.add(libFile)
- case libFile if libFile.isDirectory =>
- libFile.listFiles.filter(isJarFile).foreach(jarSet.add)
- case _ =>
- }
- logInfo(s"start shaded fat-jar: ${jarLibs.mkString(",")}")
+ jarLibs.foreach {
+ x =>
+ new File(x) match {
+ case jarFile if jarFile.exists() =>
+ if (jarFile.isFile) {
+ Try(Utils.checkJarFile(jarFile.toURI.toURL)) match {
+ case Success(_) => jarSet.add(jarFile)
+ case Failure(e) => logWarn(s"buildFatJar: error, ${e.getMessage}")
+ }
+ } else {
+ jarFile.listFiles.foreach(
+ jar => {
+ if (jar.isFile) {
+ Try(Utils.checkJarFile(jar.toURI.toURL)) match {
+ case Success(_) => jarSet.add(jar)
+ case Failure(e) =>
+ logWarn(
+ s"buildFatJar: directory [${jarFile.getAbsolutePath}], error: ${e.getMessage}")
+ }
+ }
+ })
+ }
+ case _ =>
+ }
+ }
+
+ logInfo(s"start shaded fat-jar: ${jarSet.mkString(",")}")
+
// shade jars
val shadeRequest = {
val req = new ShadeRequest
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
index 41e6e4f46..62f90935a 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
@@ -107,7 +107,6 @@ case class FlinkRemotePerJobBuildRequest(
workspace: String,
mainClass: String,
customFlinkUserJar: String,
- skipBuild: Boolean,
executionMode: ExecutionMode,
developmentMode: DevelopmentMode,
flinkVersion: FlinkVersion,
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkRemoteBuildPipeline.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkRemoteBuildPipeline.scala
index d6889caa2..86c523fdd 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkRemoteBuildPipeline.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkRemoteBuildPipeline.scala
@@ -31,28 +31,21 @@ class FlinkRemoteBuildPipeline(request:
FlinkRemotePerJobBuildRequest) extends B
/** The construction logic needs to be implemented by subclasses */
@throws[Throwable]
override protected def buildProcess(): ShadedBuildResponse = {
- // create workspace.
- // the sub workspace path like: APP_WORKSPACE/jobName
- if (request.skipBuild) {
- ShadedBuildResponse(request.workspace, request.customFlinkUserJar)
- } else {
- execStep(1) {
- LfsOperator.mkCleanDirs(request.workspace)
- logInfo(s"recreate building workspace: ${request.workspace}")
+ execStep(1) {
+ LfsOperator.mkCleanDirs(request.workspace)
+ logInfo(s"recreate building workspace: ${request.workspace}")
+ }.getOrElse(throw getError.exception)
+ // build flink job shaded jar
+ val shadedJar =
+ execStep(2) {
+ val output = MavenTool.buildFatJar(
+ request.mainClass,
+ request.providedLibs,
+ request.getShadedJarPath(request.workspace))
+ logInfo(s"output shaded flink job jar: ${output.getAbsolutePath}")
+ output
}.getOrElse(throw getError.exception)
- // build flink job shaded jar
- val shadedJar =
- execStep(2) {
- val output = MavenTool.buildFatJar(
- request.mainClass,
- request.providedLibs,
- request.getShadedJarPath(request.workspace))
- logInfo(s"output shaded flink job jar: ${output.getAbsolutePath}")
- output
- }.getOrElse(throw getError.exception)
- ShadedBuildResponse(request.workspace, shadedJar.getAbsolutePath)
- }
-
+ ShadedBuildResponse(request.workspace, shadedJar.getAbsolutePath)
}
}