This is an automated email from the ASF dual-hosted git repository.
kriszu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 279e2d696 [Improve] resource improvement (#2860)
279e2d696 is described below
commit 279e2d696fe96f0bc9cf709cbd8720e0fea78923
Author: benjobs <[email protected]>
AuthorDate: Sun Aug 6 13:14:58 2023 +0800
[Improve] resource improvement (#2860)
---
streampark-common/pom.xml | 2 +
.../streampark/common/conf/ConfigConst.scala | 12 +-
.../streampark/common/util/ClassLoaderUtils.scala | 15 +-
.../streampark-console-service/pom.xml | 9 +-
.../main/assembly/script/schema/mysql-schema.sql | 23 ++
.../main/assembly/script/upgrade/mysql/2.2.0.sql | 16 +-
.../console/base/domain/RestRequest.java | 2 +-
.../console/base/domain/RestResponse.java | 2 +-
.../console/base/domain/router/RouterMeta.java | 2 +-
.../console/base/domain/router/VueRouter.java | 2 +-
.../streampark/console/base/util/CommonUtils.java | 6 +-
.../streampark/console/base/util/JacksonUtils.java | 2 -
.../streampark/console/base/util/ObjectUtils.java | 12 +-
.../streampark/console/core/bean/Dependency.java | 22 +-
.../bean/FlinkConnectorResource.java} | 23 +-
.../apache/streampark/console/core/bean/Pom.java | 11 +-
.../core/controller/ApplicationController.java | 5 +-
.../core/controller/ResourceController.java | 17 +-
.../console/core/entity/Application.java | 85 +----
.../streampark/console/core/entity/FlinkSql.java | 8 +-
.../streampark/console/core/entity/Resource.java | 15 +-
.../streampark/console/core/entity/Variable.java | 2 +-
.../console/core/service/ApplicationService.java | 3 -
.../console/core/service/ResourceService.java | 9 +-
.../core/service/impl/AppBuildPipeServiceImpl.java | 5 +-
.../core/service/impl/ApplicationServiceImpl.java | 89 ++---
.../core/service/impl/ResourceServiceImpl.java | 376 ++++++++++++++++++---
.../console/system/authentication/JWTToken.java | 2 +-
.../console/system/entity/AccessToken.java | 2 +-
.../streampark/console/system/entity/Member.java | 2 +-
.../streampark/console/system/entity/Menu.java | 2 +-
.../streampark/console/system/entity/Role.java | 2 +-
.../streampark/console/system/entity/RoleMenu.java | 2 +-
.../streampark/console/system/entity/SysLog.java | 2 +-
.../streampark/console/system/entity/Team.java | 2 +-
.../streampark/console/system/entity/User.java | 2 +-
.../src/main/resources/db/schema-h2.sql | 3 +
.../main/resources/mapper/core/ResourceMapper.xml | 3 +
.../console/base/util/DependencyUtilsTest.java | 151 +++++++++
.../core/service/ApplicationServiceTest.java | 29 --
.../console/core/service/ResourceServiceTest.java | 61 ++++
.../src/api/flink/resource/index.ts | 18 +
.../src/api/flink/resource/model/resourceModel.ts | 1 +
.../src/locales/lang/en/flink/resource.ts | 13 +-
.../src/locales/lang/zh-CN/flink/resource.ts | 13 +-
.../src/views/flink/app/styles/Add.less | 21 +-
.../src/views/flink/resource/View.vue | 88 +++--
.../views/flink/resource/components/Resource.vue | 21 +-
.../flink/resource/components/ResourceDrawer.vue | 124 +++++--
.../src/views/flink/resource/useResourceRender.tsx | 37 +-
.../doris/bean/LoadStatusFailedException.java | 2 +-
.../streampark/flink/packer/maven/MavenTool.scala | 18 +-
.../impl/FlinkYarnApplicationBuildPipeline.scala | 4 +-
.../streampark/flink/packer/MavenToolSpec.scala | 1 +
54 files changed, 1012 insertions(+), 389 deletions(-)
diff --git a/streampark-common/pom.xml b/streampark-common/pom.xml
index 64694d942..32babf65c 100644
--- a/streampark-common/pom.xml
+++ b/streampark-common/pom.xml
@@ -115,6 +115,7 @@
<optional>true</optional>
</dependency>
+
<!--logback -->
<dependency>
<groupId>org.apache.streampark</groupId>
@@ -128,6 +129,7 @@
<version>${streampark.shaded.version}</version>
</dependency>
+
<!-- ZIO -->
<dependency>
<groupId>dev.zio</groupId>
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
index 9f39a914c..75c0b4e9c 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
@@ -80,18 +80,16 @@ object ConfigConst {
val KEY_SPARK_BATCH_DURATION = "spark.batch.duration"
// flink
- def KEY_APP_CONF(prefix: String = null): String = if (prefix == null) "conf"
else s"${prefix}conf"
+ def KEY_APP_CONF(prefix: String = null): String =
s"${Option(prefix).getOrElse("")}conf"
- def KEY_FLINK_CONF(prefix: String = null): String =
- if (prefix == null) "flink.conf" else s"${prefix}flink.conf"
+ def KEY_FLINK_CONF(prefix: String = null): String =
s"${Option(prefix).getOrElse("")}flink.conf"
- def KEY_APP_NAME(prefix: String = null): String =
- if (prefix == null) "app.name" else s"${prefix}app.name"
+ def KEY_APP_NAME(prefix: String = null): String =
s"${Option(prefix).getOrElse("")}app.name"
- def KEY_FLINK_SQL(prefix: String = null): String = if (prefix == null) "sql"
else s"${prefix}sql"
+ def KEY_FLINK_SQL(prefix: String = null): String =
s"${Option(prefix).getOrElse("")}sql"
def KEY_FLINK_PARALLELISM(prefix: String = null): String =
- if (prefix == null) "parallelism.default" else
s"${prefix}parallelism.default"
+ s"${Option(prefix).getOrElse("")}parallelism.default"
val KEY_FLINK_OPTION_PREFIX = "flink.option."
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala
index 620f680e7..eb3f3dadc 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala
@@ -16,9 +16,11 @@
*/
package org.apache.streampark.common.util
-import java.io.File
+import java.io.{File, IOException}
import java.net.{URL, URLClassLoader}
-import java.util.function.Supplier
+import java.util.function.{Consumer, Supplier}
+
+import scala.collection.mutable.ArrayBuffer
object ClassLoaderUtils extends Logger {
@@ -61,6 +63,15 @@ object ClassLoaderUtils extends Logger {
Thread.currentThread.setContextClassLoader(originalClassLoader)
}
}
+ @throws[IOException]
+ def cloneClassLoader(): ClassLoader = {
+ val urls = originalClassLoader.getResources(".")
+ val buffer = ArrayBuffer[URL]()
+ while (urls.hasMoreElements) {
+ buffer += urls.nextElement()
+ }
+ new URLClassLoader(buffer.toArray[URL], originalClassLoader)
+ }
def loadJar(jarFilePath: String): Unit = {
val jarFile = new File(jarFilePath)
diff --git a/streampark-console/streampark-console-service/pom.xml
b/streampark-console/streampark-console-service/pom.xml
index f88f57aa3..4202d7c96 100644
--- a/streampark-console/streampark-console-service/pom.xml
+++ b/streampark-console/streampark-console-service/pom.xml
@@ -380,6 +380,7 @@
<version>${project.version}</version>
</dependency>
+
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
@@ -436,11 +437,17 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
<dependency>
<groupId>dev.zio</groupId>
<artifactId>zio-logging_${scala.binary.version}</artifactId>
</dependency>
-
+
<dependency>
<groupId>dev.zio</groupId>
<artifactId>zio-streams_${scala.binary.version}</artifactId>
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
index 7783e6c66..52b885c7d 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
@@ -531,4 +531,27 @@ create table `t_yarn_queue` (
unique key `unq_team_id_queue_label` (`team_id`, `queue_label`) using btree
) engine = innodb default charset = utf8mb4 collate = utf8mb4_general_ci;
+
+
+-- ----------------------------
+-- Table of t_resource
+-- ----------------------------
+drop table if exists `t_resource`;
+create table if not exists `t_resource` (
+`id` bigint not null auto_increment primary key,
+`resource_name` varchar(128) not null comment 'The name of the resource',
+`resource_type` int not null comment '0:app 1:common 2:connector 3:format
4:udf',
+`resource_path` varchar(255) default null,
+`resource` text,
+`engine_type` int not null comment 'compute engine type, 0:apache flink
1:apache spark',
+`main_class` varchar(255) default null,
+`description` text default null comment 'More detailed description of
resource',
+`creator_id` bigint not null comment 'user id of creator',
+`connector_required_options` text default null,
+`connector_optional_options` text default null,
+`team_id` bigint not null comment 'team id',
+`create_time` datetime not null default current_timestamp comment 'create
time',
+`modify_time` datetime not null default current_timestamp on update
current_timestamp comment 'modify time'
+);
+
set foreign_key_checks = 1;
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
index 757ac6cd7..f428f9216 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
@@ -76,14 +76,14 @@ alter table `t_user` modify column `login_type` tinyint
default 0 comment 'login
-- ----------------------------
drop table if exists `t_flink_gateway`;
create table `t_flink_gateway` (
- `id` bigint not null auto_increment,
- `gateway_name` varchar(128) collate
utf8mb4_general_ci not null comment 'The name of the gateway',
- `description` text collate utf8mb4_general_ci
default null comment 'More detailed description of resource',
- `gateway_type` int not null comment 'The type
of the gateway',
- `address` varchar(150) default null comment
'url address of gateway endpoint',
- `create_time` datetime not null default
current_timestamp comment 'create time',
- `modify_time` datetime not null default
current_timestamp on update current_timestamp comment 'modify time',
- primary key (`id`) using btree
+`id` bigint not null auto_increment,
+`gateway_name` varchar(128) collate utf8mb4_general_ci not null comment 'The
name of the gateway',
+`description` text collate utf8mb4_general_ci default null comment 'More
detailed description of resource',
+`gateway_type` int not null comment 'The type of the gateway',
+`address` varchar(150) default null comment 'url address of gateway endpoint',
+`create_time` datetime not null default current_timestamp comment 'create
time',
+`modify_time` datetime not null default current_timestamp on update
current_timestamp comment 'modify time',
+primary key (`id`) using btree
) engine=innodb auto_increment=100000 default charset=utf8mb4
collate=utf8mb4_general_ci;
-- menu level 2
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/RestRequest.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/RestRequest.java
index 42fe5084a..753458404 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/RestRequest.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/RestRequest.java
@@ -27,7 +27,7 @@ import java.io.Serializable;
@Data
public class RestRequest implements Serializable {
- private static final long serialVersionUID = -4869594085374385813L;
+ private static final long serialVersionUID = 1L;
@Schema(example = "10", required = true)
private int pageSize = 10;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/RestResponse.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/RestResponse.java
index f15fdc410..e5a2610e3 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/RestResponse.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/RestResponse.java
@@ -24,7 +24,7 @@ public class RestResponse extends HashMap<String, Object> {
public static final String STATUS_SUCCESS = "success";
public static final String STATUS_FAIL = "error";
- private static final long serialVersionUID = -8713837118340960775L;
+ private static final long serialVersionUID = 1L;
public static RestResponse success(Object data) {
RestResponse resp = new RestResponse();
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/router/RouterMeta.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/router/RouterMeta.java
index 5753b6f64..1939e99db 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/router/RouterMeta.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/router/RouterMeta.java
@@ -29,7 +29,7 @@ import java.io.Serializable;
@JsonInclude(JsonInclude.Include.NON_NULL)
public class RouterMeta implements Serializable {
- private static final long serialVersionUID = 5499925008927195914L;
+ private static final long serialVersionUID = 1L;
private Boolean closeable;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/router/VueRouter.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/router/VueRouter.java
index 45688413c..da3dffd04 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/router/VueRouter.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/router/VueRouter.java
@@ -30,7 +30,7 @@ import java.util.List;
@JsonInclude(JsonInclude.Include.NON_NULL)
public class VueRouter<T> implements Serializable {
- private static final long serialVersionUID = -3327478146308500708L;
+ private static final long serialVersionUID = 1L;
@JsonIgnore private String id;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java
index 5b80ba623..04b90c621 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java
@@ -47,7 +47,7 @@ public final class CommonUtils implements Serializable {
private CommonUtils() {}
- private static final long serialVersionUID = 6458428317155311192L;
+ private static final long serialVersionUID = 1L;
private static final String OS = System.getProperty("os.name").toLowerCase();
@@ -199,7 +199,7 @@ public final class CommonUtils implements Serializable {
if (iterator != null) {
while (iterator.hasNext()) {
Object candidate = iterator.next();
- if (ObjectUtils.safeEquals(candidate, element)) {
+ if (ObjectUtils.equals(candidate, element)) {
return true;
}
}
@@ -218,7 +218,7 @@ public final class CommonUtils implements Serializable {
if (enumeration != null) {
while (enumeration.hasMoreElements()) {
Object candidate = enumeration.nextElement();
- if (ObjectUtils.safeEquals(candidate, element)) {
+ if (ObjectUtils.equals(candidate, element)) {
return true;
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java
index 2d99c7bbf..8fe562759 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java
@@ -19,7 +19,6 @@ package org.apache.streampark.console.base.util;
import org.apache.streampark.common.util.DateUtils;
-import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
@@ -41,7 +40,6 @@ public final class JacksonUtils {
MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
MAPPER.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
MAPPER.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
- MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL);
MAPPER.setDateFormat(new SimpleDateFormat(DateUtils.fullFormat()));
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/ObjectUtils.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/ObjectUtils.java
index f375dad52..c16d2a42f 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/ObjectUtils.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/ObjectUtils.java
@@ -104,7 +104,7 @@ public final class ObjectUtils {
return false;
}
for (Object arrayEle : array) {
- if (safeEquals(arrayEle, element)) {
+ if (equals(arrayEle, element)) {
return true;
}
}
@@ -238,7 +238,7 @@ public final class ObjectUtils {
* @return whether the given objects are equal
* @see Arrays#equals
*/
- public static boolean safeEquals(Object o1, Object o2) {
+ public static boolean equals(Object o1, Object o2) {
if (o1 == null || o2 == null) {
return false;
}
@@ -282,8 +282,8 @@ public final class ObjectUtils {
return false;
}
- public static boolean safeTrimEquals(Object o1, Object o2) {
- boolean equals = safeEquals(o1, o2);
+ public static boolean trimEquals(Object o1, Object o2) {
+ boolean equals = equals(o1, o2);
if (!equals) {
if (o1 != null && o2 != null) {
if (o1 instanceof String && o2 instanceof String) {
@@ -294,6 +294,10 @@ public final class ObjectUtils {
return equals;
}
+ public static boolean trimNoEquals(Object o1, Object o2) {
+ return !trimEquals(o1, o2);
+ }
+
/**
* Return as hash code for the given object; typically the value of <code>
* {@link Object#hashCode()}</code>. If the object is an array, this method
will delegate to any
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Dependency.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Dependency.java
index 6e15e62f5..413a2299a 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Dependency.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Dependency.java
@@ -52,7 +52,7 @@ public class Dependency {
return pom.isEmpty() && jar.isEmpty();
}
- public boolean eq(Dependency other) {
+ public boolean equals(Dependency other) {
if (other == null) {
return false;
}
@@ -76,20 +76,20 @@ public class Dependency {
}
public DependencyInfo toJarPackDeps() {
- List<Artifact> mvnArts =
- this.pom.stream()
- .map(
- pom ->
- new Artifact(
- pom.getGroupId(),
- pom.getArtifactId(),
- pom.getVersion(),
- pom.getClassifier()))
- .collect(Collectors.toList());
+ List<Artifact> mvnArts = toArtifact();
List<String> extJars =
this.jar.stream()
.map(jar -> Workspace.local().APP_UPLOADS() + "/" + jar)
.collect(Collectors.toList());
return new DependencyInfo(mvnArts, extJars);
}
+
+ public List<Artifact> toArtifact() {
+ return this.pom.stream()
+ .map(
+ pom ->
+ new Artifact(
+ pom.getGroupId(), pom.getArtifactId(), pom.getVersion(),
pom.getClassifier()))
+ .collect(Collectors.toList());
+ }
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/RoleMenu.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkConnectorResource.java
similarity index 63%
copy from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/RoleMenu.java
copy to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkConnectorResource.java
index e2f28c6b2..b6dc9db68 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/RoleMenu.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkConnectorResource.java
@@ -15,25 +15,16 @@
* limitations under the License.
*/
-package org.apache.streampark.console.system.entity;
+package org.apache.streampark.console.core.bean;
-import com.baomidou.mybatisplus.annotation.IdType;
-import com.baomidou.mybatisplus.annotation.TableId;
-import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
-import java.io.Serializable;
+import java.util.Map;
-@TableName("t_role_menu")
@Data
-public class RoleMenu implements Serializable {
-
- private static final long serialVersionUID = -7573904024872252113L;
-
- @TableId(type = IdType.AUTO)
- private Long id;
-
- private Long roleId;
-
- private Long menuId;
+public class FlinkConnectorResource {
+ private String className;
+ private String factoryIdentifier;
+ Map<String, String> requiredOptions;
+ Map<String, String> optionalOptions;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Pom.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Pom.java
index da10a529d..479195915 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Pom.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Pom.java
@@ -48,10 +48,11 @@ public class Pom {
@Override
public String toString() {
- return groupId + ":" + artifactId + ":" + version + getClassifier(":");
- }
-
- private String getClassifier(String joiner) {
- return StringUtils.isEmpty(classifier) ? "" : joiner + classifier;
+ return String.format(
+ "%s:%s:%s%s",
+ groupId,
+ artifactId,
+ version,
+ StringUtils.isEmpty(classifier) ? "" : ":".concat(classifier));
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
index 12486ef8a..0f9825f3f 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
@@ -36,6 +36,7 @@ import
org.apache.streampark.console.core.service.AppBuildPipeService;
import org.apache.streampark.console.core.service.ApplicationBackUpService;
import org.apache.streampark.console.core.service.ApplicationLogService;
import org.apache.streampark.console.core.service.ApplicationService;
+import org.apache.streampark.console.core.service.ResourceService;
import org.apache.streampark.flink.packer.pipeline.PipelineStatus;
import org.apache.shiro.authz.annotation.RequiresPermissions;
@@ -80,6 +81,8 @@ public class ApplicationController {
@Autowired private AppBuildPipeService appBuildPipeService;
+ @Autowired private ResourceService resourceService;
+
@Operation(summary = "Get application")
@ApiAccess
@PostMapping("get")
@@ -397,7 +400,7 @@ public class ApplicationController {
@PostMapping("upload")
@RequiresPermissions("app:create")
public RestResponse upload(MultipartFile file) throws Exception {
- String uploadPath = applicationService.upload(file);
+ String uploadPath = resourceService.upload(file);
return RestResponse.success(uploadPath);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ResourceController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ResourceController.java
index 27c95b555..a4608d134 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ResourceController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ResourceController.java
@@ -36,6 +36,7 @@ import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.multipart.MultipartFile;
import javax.validation.Valid;
@@ -53,11 +54,17 @@ public class ResourceController {
@Operation(summary = "add resource")
@PostMapping("add")
@RequiresPermissions("resource:add")
- public RestResponse addResource(@Valid Resource resource) {
+ public RestResponse addResource(@Valid Resource resource) throws Exception {
this.resourceService.addResource(resource);
return RestResponse.success();
}
+ @Operation(summary = "check resource")
+ @PostMapping("check")
+ public RestResponse checkResource(@Valid Resource resource) throws Exception
{
+ return this.resourceService.checkResource(resource);
+ }
+
@Operation(summary = "List resources")
@PostMapping("page")
public RestResponse page(RestRequest restRequest, Resource resource) {
@@ -87,4 +94,12 @@ public class ResourceController {
List<Resource> resourceList = resourceService.findByTeamId(teamId);
return RestResponse.success(resourceList);
}
+
+ @Operation(summary = "Upload the resource jar")
+ @PostMapping("upload")
+ @RequiresPermissions("resource:add")
+ public RestResponse upload(MultipartFile file) throws Exception {
+ String uploadPath = resourceService.upload(file);
+ return RestResponse.success(uploadPath);
+ }
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 4fc0cf147..7bbeffccb 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -27,7 +27,6 @@ import
org.apache.streampark.common.enums.FlinkK8sRestExposedType;
import org.apache.streampark.common.enums.StorageType;
import org.apache.streampark.common.fs.FsOperator;
import org.apache.streampark.console.base.util.JacksonUtils;
-import org.apache.streampark.console.base.util.ObjectUtils;
import org.apache.streampark.console.core.bean.AppControl;
import org.apache.streampark.console.core.bean.Dependency;
import org.apache.streampark.console.core.enums.FlinkAppState;
@@ -252,14 +251,6 @@ public class Application implements Serializable {
return ingressTemplate;
}
- public void setIngressTemplate(String ingressTemplate) {
- this.ingressTemplate = ingressTemplate;
- }
-
- public String getDefaultModeIngress() {
- return defaultModeIngress;
- }
-
public void setDefaultModeIngress(String defaultModeIngress) {
this.defaultModeIngress = defaultModeIngress;
}
@@ -357,11 +348,6 @@ public class Application implements Serializable {
return DevelopmentMode.of(jobType);
}
- @JsonIgnore
- public void setDevelopmentMode(DevelopmentMode mode) {
- this.jobType = mode.getValue();
- }
-
@JsonIgnore
public FlinkAppState getFlinkAppStateEnum() {
return FlinkAppState.of(state);
@@ -386,7 +372,7 @@ public class Application implements Serializable {
public boolean eqFlinkJob(Application other) {
if (this.isFlinkSqlJob() && other.isFlinkSqlJob()) {
if (this.getFlinkSql().trim().equals(other.getFlinkSql().trim())) {
- return this.getDependencyObject().eq(other.getDependencyObject());
+ return this.getDependencyObject().equals(other.getDependencyObject());
}
}
return false;
@@ -509,75 +495,6 @@ public class Application implements Serializable {
return false;
}
- /**
- * Parameter comparison, mainly to compare whether the parameters related to
Flink runtime have
- * changed
- */
- public boolean eqJobParam(Application other) {
- // 1) Resolve Order has it changed
- // 2) flink Version has it changed
- // 3) Execution Mode has it changed
- // 4) Parallelism has it changed
- // 5) Task Slots has it changed
- // 6) Options has it changed
- // 7) properties has it changed
- // 8) Program Args has it changed
- // 9) Flink Version has it changed
-
- if (!ObjectUtils.safeEquals(this.getVersionId(), other.getVersionId())) {
- return false;
- }
-
- if (!ObjectUtils.safeEquals(this.getResolveOrder(),
other.getResolveOrder())
- || !ObjectUtils.safeEquals(this.getExecutionMode(),
other.getExecutionMode())
- || !ObjectUtils.safeEquals(this.getK8sRestExposedType(),
other.getK8sRestExposedType())) {
- return false;
- }
-
- if (this.getOptions() != null) {
- if (other.getOptions() != null) {
- if (!this.getOptions().trim().equals(other.getOptions().trim())) {
- Map<String, Object> optMap = this.getOptionMap();
- Map<String, Object> otherMap = other.getOptionMap();
- if (optMap.size() != otherMap.size()) {
- return false;
- }
- for (Map.Entry<String, Object> entry : optMap.entrySet()) {
- if (!entry.getValue().equals(otherMap.get(entry.getKey()))) {
- return false;
- }
- }
- }
- } else {
- return false;
- }
- } else if (other.getOptions() != null) {
- return false;
- }
-
- if (this.getDynamicProperties() != null) {
- if (other.getDynamicProperties() != null) {
- if
(!this.getDynamicProperties().trim().equals(other.getDynamicProperties().trim()))
{
- return false;
- }
- } else {
- return false;
- }
- } else if (other.getDynamicProperties() != null) {
- return false;
- }
-
- if (this.getArgs() != null) {
- if (other.getArgs() != null) {
- return this.getArgs().trim().equals(other.getArgs().trim());
- } else {
- return false;
- }
- } else {
- return other.getArgs() == null;
- }
- }
-
@JsonIgnore
public StorageType getStorageType() {
return getStorageType(getExecutionMode());
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java
index cb3fc4f97..60fc07dbd 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java
@@ -18,7 +18,6 @@
package org.apache.streampark.console.core.entity;
import org.apache.streampark.common.util.DeflaterUtils;
-import org.apache.streampark.console.base.util.ObjectUtils;
import org.apache.streampark.console.core.bean.Dependency;
import org.apache.streampark.console.core.enums.ChangedType;
@@ -30,6 +29,7 @@ import lombok.Data;
import java.util.Base64;
import java.util.Date;
+import java.util.Objects;
@Data
@TableName("t_flink_sql")
@@ -89,10 +89,10 @@ public class FlinkSql {
// 2) determine if dependency has changed
Dependency thisDependency = Dependency.toDependency(this.getDependency());
Dependency targetDependency =
Dependency.toDependency(target.getDependency());
- boolean depDifference = !thisDependency.eq(targetDependency);
+ boolean depDifference = !thisDependency.equals(targetDependency);
// 3) determine if team resource has changed
- boolean teamResDifference =
- !ObjectUtils.safeEquals(this.teamResource, target.getTeamResource());
+ boolean teamResDifference = !Objects.equals(this.teamResource,
target.getTeamResource());
+
if (sqlDifference && depDifference && teamResDifference) {
return ChangedType.ALL;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Resource.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Resource.java
index ac5eea6a5..22a1fb3e0 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Resource.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Resource.java
@@ -35,13 +35,17 @@ import java.util.Date;
@TableName("t_resource")
public class Resource implements Serializable {
- private static final long serialVersionUID = -7720746591258904369L;
+ private static final long serialVersionUID = 1L;
@TableId(type = IdType.AUTO)
private Long id;
+ // resourceName unique
private String resourceName;
+ // resource path
+ private String resourcePath;
+
private String resource;
@Size(max = 100, message = "{noMoreThan}")
@@ -54,8 +58,15 @@ public class Resource implements Serializable {
private EngineType engineType;
+ // for flink app
private String mainClass;
+ // for flink connector
+ private String connectorRequiredOptions;
+
+ // for flink connector
+ private String connectorOptionalOptions;
+
/** user name of creator */
private transient String creatorName;
@@ -69,4 +80,6 @@ public class Resource implements Serializable {
private transient String sortField;
private transient String sortOrder;
+
+ private transient String connector;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Variable.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Variable.java
index 75571f53a..458c74d3d 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Variable.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Variable.java
@@ -35,7 +35,7 @@ import java.util.Date;
@TableName("t_variable")
public class Variable implements Serializable {
- private static final long serialVersionUID = -7720746591258904369L;
+ private static final long serialVersionUID = 1L;
@TableId(type = IdType.AUTO)
private Long id;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
index cba588c65..8e51c155f 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
@@ -25,7 +25,6 @@ import
org.apache.streampark.console.core.enums.AppExistsState;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.IService;
-import org.springframework.web.multipart.MultipartFile;
import java.io.IOException;
import java.io.Serializable;
@@ -75,8 +74,6 @@ public interface ApplicationService extends
IService<Application> {
Map<String, Serializable> dashboard(Long teamId);
- String upload(MultipartFile file) throws Exception;
-
/** set the latest to Effective, it will really become the current effective
*/
void toEffective(Application application);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java
index d59df4a59..4a4781911 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java
@@ -18,11 +18,14 @@
package org.apache.streampark.console.core.service;
import org.apache.streampark.console.base.domain.RestRequest;
+import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.core.entity.Resource;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.IService;
+import org.springframework.web.multipart.MultipartFile;
+import java.io.IOException;
import java.util.List;
public interface ResourceService extends IService<Resource> {
@@ -49,7 +52,7 @@ public interface ResourceService extends IService<Resource> {
*
* @param resource resource
*/
- void addResource(Resource resource);
+ void addResource(Resource resource) throws Exception;
/**
* @param teamId team id
@@ -87,4 +90,8 @@ public interface ResourceService extends IService<Resource> {
* @param targetUserId target user id
*/
void changeOwnership(Long userId, Long targetUserId);
+
+ String upload(MultipartFile file) throws IOException;
+
+ RestResponse checkResource(Resource resource) throws Exception;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 2a38308cb..b7c37eee6 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -108,6 +108,8 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import static org.apache.streampark.console.core.enums.Operation.RELEASE;
+
@Service
@Slf4j
@Transactional(propagation = Propagation.SUPPORTS, rollbackFor =
Exception.class)
@@ -170,8 +172,7 @@ public class AppBuildPipeServiceImpl
Application app = applicationService.getById(appId);
ApplicationLog applicationLog = new ApplicationLog();
- applicationLog.setOptionName(
- org.apache.streampark.console.core.enums.Operation.RELEASE.getValue());
+ applicationLog.setOptionName(RELEASE.getValue());
applicationLog.setAppId(app.getId());
applicationLog.setOptionTime(new Date());
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 7c9140dc1..ee2d996d5 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -96,7 +96,6 @@ import
org.apache.streampark.flink.packer.pipeline.BuildResult;
import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse;
import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.CoreOptions;
@@ -120,7 +119,6 @@ import
org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
-import org.springframework.web.multipart.MultipartFile;
import javax.annotation.Nonnull;
import javax.annotation.PostConstruct;
@@ -307,24 +305,6 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
return map;
}
- @Override
- public String upload(MultipartFile file) throws Exception {
- File temp = WebUtils.getAppTempDir();
- String fileName =
FilenameUtils.getName(Objects.requireNonNull(file.getOriginalFilename()));
- File saveFile = new File(temp, fileName);
- // delete when exists
- if (saveFile.exists()) {
- saveFile.delete();
- }
- // save file to temp dir
- try {
- file.transferTo(saveFile);
- } catch (Exception e) {
- throw new ApiDetailException(e);
- }
- return saveFile.getAbsolutePath();
- }
-
@Override
public void toEffective(Application application) {
// set latest to Effective
@@ -861,57 +841,60 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
String.format(ERROR_APP_QUEUE_HINT, appParam.getYarnQueue(),
appParam.getTeamId()));
application.setRelease(ReleaseState.NEED_RELEASE.get());
+
+ // 1) jar job jar file changed
if (application.isUploadJob()) {
- if (!ObjectUtils.safeEquals(application.getJar(), appParam.getJar())) {
+ if (!Objects.equals(application.getJar(), appParam.getJar())) {
application.setBuild(true);
} else {
File jarFile = new File(WebUtils.getAppTempDir(), appParam.getJar());
if (jarFile.exists()) {
- long checkSum = 0;
try {
- checkSum = FileUtils.checksumCRC32(jarFile);
+ long checkSum = FileUtils.checksumCRC32(jarFile);
+ if (!Objects.equals(checkSum, application.getJarCheckSum())) {
+ application.setBuild(true);
+ }
} catch (IOException e) {
log.error("Error in checksumCRC32 for {}.", jarFile);
throw new RuntimeException(e);
}
- if (!ObjectUtils.safeEquals(checkSum, application.getJarCheckSum()))
{
- application.setBuild(true);
- }
- }
- }
- }
-
- if (!application.getBuild()) {
- if (!application.getExecutionMode().equals(appParam.getExecutionMode()))
{
- if
(appParam.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION)
- ||
application.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION)) {
- application.setBuild(true);
}
}
}
- if (ExecutionMode.isKubernetesMode(appParam.getExecutionMode())) {
- if (!ObjectUtils.safeTrimEquals(
+ // 2) k8s podTemplate changed..
+ if (application.getBuild() &&
ExecutionMode.isKubernetesMode(appParam.getExecutionMode())) {
+ if (ObjectUtils.trimNoEquals(
application.getK8sRestExposedType(),
appParam.getK8sRestExposedType())
- || !ObjectUtils.safeTrimEquals(
+ || ObjectUtils.trimNoEquals(
application.getK8sJmPodTemplate(),
appParam.getK8sJmPodTemplate())
- || !ObjectUtils.safeTrimEquals(
+ || ObjectUtils.trimNoEquals(
application.getK8sTmPodTemplate(),
appParam.getK8sTmPodTemplate())
- || !ObjectUtils.safeTrimEquals(
+ || ObjectUtils.trimNoEquals(
application.getK8sPodTemplates(), appParam.getK8sPodTemplates())
- || !ObjectUtils.safeTrimEquals(
+ || ObjectUtils.trimNoEquals(
application.getK8sHadoopIntegration(),
appParam.getK8sHadoopIntegration())
- || !ObjectUtils.safeTrimEquals(application.getFlinkImage(),
appParam.getFlinkImage())) {
+ || ObjectUtils.trimNoEquals(application.getFlinkImage(),
appParam.getFlinkImage())) {
application.setBuild(true);
}
}
- // when flink version has changed, we should rebuild the application.
Otherwise, the shims jar
- // may be not suitable for the new flink version.
- if (!ObjectUtils.safeEquals(application.getVersionId(),
appParam.getVersionId())) {
+ // 3) flink version changed
+ if (!application.getBuild()
+ && !Objects.equals(application.getVersionId(),
appParam.getVersionId())) {
application.setBuild(true);
}
+ // 4) yarn application mode change
+ if (!application.getBuild()) {
+ if (!application.getExecutionMode().equals(appParam.getExecutionMode()))
{
+ if
(appParam.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION)
+ ||
application.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION)) {
+ application.setBuild(true);
+ }
+ }
+ }
+
appParam.setJobType(application.getJobType());
// changes to the following parameters need to be re-release to take effect
application.setJobName(appParam.getJobName());
@@ -959,15 +942,16 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
// Flink Sql job...
if (application.isFlinkSqlJob()) {
updateFlinkSqlJob(application, appParam);
+ return true;
+ }
+
+ if (application.isStreamParkJob()) {
+ configService.update(appParam, application.isRunning());
} else {
- if (application.isStreamParkJob()) {
- configService.update(appParam, application.isRunning());
- } else {
- application.setJar(appParam.getJar());
- application.setMainClass(appParam.getMainClass());
- }
+ application.setJar(appParam.getJar());
+ application.setMainClass(appParam.getMainClass());
}
- baseMapper.updateById(application);
+ this.updateById(application);
return true;
}
@@ -1039,6 +1023,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
}
}
}
+ this.updateById(application);
this.configService.update(appParam, application.isRunning());
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
index f702ad004..de3ec905c 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
@@ -19,11 +19,16 @@ package org.apache.streampark.console.core.service.impl;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.fs.FsOperator;
+import org.apache.streampark.common.util.Utils;
import org.apache.streampark.console.base.domain.RestRequest;
+import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.base.exception.ApiAlertException;
+import org.apache.streampark.console.base.exception.ApiDetailException;
import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
+import org.apache.streampark.console.base.util.JacksonUtils;
import org.apache.streampark.console.base.util.WebUtils;
import org.apache.streampark.console.core.bean.Dependency;
+import org.apache.streampark.console.core.bean.FlinkConnectorResource;
import org.apache.streampark.console.core.bean.Pom;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.FlinkSql;
@@ -34,25 +39,47 @@ import
org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.console.core.service.CommonService;
import org.apache.streampark.console.core.service.FlinkSqlService;
import org.apache.streampark.console.core.service.ResourceService;
+import org.apache.streampark.flink.packer.maven.Artifact;
+import org.apache.streampark.flink.packer.maven.MavenTool;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.factories.Factory;
+import org.apache.hadoop.shaded.org.apache.commons.codec.digest.DigestUtils;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
+import org.springframework.web.multipart.MultipartFile;
import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.Scanner;
+import java.util.ServiceLoader;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
import java.util.stream.Collectors;
@Slf4j
@@ -65,6 +92,8 @@ public class ResourceServiceImpl extends
ServiceImpl<ResourceMapper, Resource>
@Autowired private CommonService commonService;
@Autowired private FlinkSqlService flinkSqlService;
+ public ResourceServiceImpl() {}
+
@Override
public IPage<Resource> page(Resource resource, RestRequest restRequest) {
if (resource.getTeamId() == null) {
@@ -86,50 +115,53 @@ public class ResourceServiceImpl extends
ServiceImpl<ResourceMapper, Resource>
}
@Override
- public void addResource(Resource resource) {
+ public void addResource(Resource resource) throws Exception {
String resourceStr = resource.getResource();
ApiAlertException.throwIfNull(resourceStr, "Please add pom or jar
resource.");
- if (resource.getResourceType() == ResourceType.GROUP) {
- ApiAlertException.throwIfNull(
- resource.getResourceName(), "The name of resource group is
required.");
+ // check
+ Dependency dependency = Dependency.toDependency(resourceStr);
+ List<String> jars = dependency.getJar();
+ List<Pom> poms = dependency.getPom();
+
+ ApiAlertException.throwIfTrue(
+ jars.isEmpty() && poms.isEmpty(), "Please add pom or jar resource.");
+
+ ApiAlertException.throwIfTrue(
+ resource.getResourceType() == ResourceType.FLINK_APP && jars.isEmpty(),
+ "Please upload jar for Flink_App resource");
+
+ ApiAlertException.throwIfTrue(
+ jars.size() + poms.size() > 1, "Please do not add multi dependency at
one time.");
+
+ if (resource.getResourceType() != ResourceType.CONNECTOR) {
+ ApiAlertException.throwIfNull(resource.getResourceName(), "The
resourceName is required.");
} else {
- Dependency dependency = Dependency.toDependency(resourceStr);
- List<String> jars = dependency.getJar();
- List<Pom> poms = dependency.getPom();
-
- ApiAlertException.throwIfTrue(
- jars.isEmpty() && poms.isEmpty(), "Please add pom or jar resource.");
- ApiAlertException.throwIfTrue(
- jars.size() + poms.size() > 1, "Please do not add multi dependency
at one time.");
- ApiAlertException.throwIfTrue(
- resource.getResourceType() == ResourceType.FLINK_APP &&
jars.isEmpty(),
- "Please upload jar for Flink_App resource");
-
- Long teamId = resource.getTeamId();
- String resourceName = null;
-
- if (poms.isEmpty()) {
- resourceName = jars.get(0);
- ApiAlertException.throwIfTrue(
- this.findByResourceName(teamId, resourceName) != null,
- String.format("Sorry, the resource %s already exists.",
resourceName));
-
- // copy jar to team upload directory
- transferTeamResource(teamId, resourceName);
- } else {
- Pom pom = poms.get(0);
- resourceName =
- String.format("%s:%s:%s", pom.getGroupId(), pom.getArtifactId(),
pom.getVersion());
- if (StringUtils.isNotBlank(pom.getClassifier())) {
- resourceName = resourceName + ":" + pom.getClassifier();
- }
- ApiAlertException.throwIfTrue(
- this.findByResourceName(teamId, resourceName) != null,
- String.format("Sorry, the resource %s already exists.",
resourceName));
+ String connector = resource.getConnector();
+ ApiAlertException.throwIfTrue(connector == null, "the flink connector is
null.");
+ FlinkConnectorResource connectorResource =
+ JacksonUtils.read(connector, FlinkConnectorResource.class);
+ resource.setResourceName(connectorResource.getFactoryIdentifier());
+ if (connectorResource.getRequiredOptions() != null) {
+ resource.setConnectorRequiredOptions(
+ JacksonUtils.write(connectorResource.getRequiredOptions()));
}
+ if (connectorResource.getOptionalOptions() != null) {
+ resource.setConnectorOptionalOptions(
+ JacksonUtils.write(connectorResource.getOptionalOptions()));
+ }
+ }
- resource.setResourceName(resourceName);
+ ApiAlertException.throwIfTrue(
+ this.findByResourceName(resource.getTeamId(),
resource.getResourceName()) != null,
+ String.format("Sorry, the resource %s already exists.",
resource.getResourceName()));
+
+ if (!jars.isEmpty()) {
+ String resourcePath = jars.get(0);
+ resource.setResourcePath(resourcePath);
+ // copy jar to team upload directory
+ String upFile = resourcePath.split(":")[1];
+ transferTeamResource(resource.getTeamId(), upFile);
}
resource.setCreatorId(commonService.getUserId());
@@ -155,7 +187,12 @@ public class ResourceServiceImpl extends
ServiceImpl<ResourceMapper, Resource>
ApiAlertException.throwIfFalse(
resourceName.equals(findResource.getResourceName()),
"Please make sure the resource name is not changed.");
- transferTeamResource(findResource.getTeamId(), resourceName);
+
+ Dependency dependency = Dependency.toDependency(resource.getResource());
+ if (!dependency.getJar().isEmpty()) {
+ String jarFile = dependency.getJar().get(0).split(":")[1];
+ transferTeamResource(findResource.getTeamId(), jarFile);
+ }
}
findResource.setDescription(resource.getDescription());
@@ -199,15 +236,234 @@ public class ResourceServiceImpl extends
ServiceImpl<ResourceMapper, Resource>
this.baseMapper.update(null, updateWrapper);
}
- private void transferTeamResource(Long teamId, String resourceName) {
+ /**
+ * @param file
+ * @return
+ */
+ @Override
+ public String upload(MultipartFile file) throws IOException {
+ File temp = WebUtils.getAppTempDir();
+
+ String name = file.getOriginalFilename();
+ String suffix = name.substring(name.lastIndexOf("."));
+
+ String sha256Hex = DigestUtils.sha256Hex(file.getInputStream());
+ String fileName = sha256Hex.concat(suffix);
+
+ File saveFile = new File(temp, fileName);
+
+ if (!saveFile.exists()) {
+ // save file to temp dir
+ try {
+ file.transferTo(saveFile);
+ } catch (Exception e) {
+ throw new ApiDetailException(e);
+ }
+ }
+
+ return saveFile.getAbsolutePath();
+ }
+
+ @Override
+ public RestResponse checkResource(Resource resourceParam) throws
JsonProcessingException {
+ ResourceType type = resourceParam.getResourceType();
+ Map<String, Serializable> resp = new HashMap<>(0);
+ resp.put("state", 0);
+ switch (type) {
+ case FLINK_APP:
+ // check main.
+ File jarFile;
+ try {
+ jarFile = getResourceJar(resourceParam);
+ } catch (Exception e) {
+ // get jarFile error
+ resp.put("state", 1);
+ resp.put("exception", Utils.stringifyException(e));
+ return RestResponse.success().data(resp);
+ }
+ Manifest manifest = Utils.getJarManifest(jarFile);
+ String mainClass = manifest.getMainAttributes().getValue("Main-Class");
+
+ if (mainClass == null) {
+ // main class is null
+ resp.put("state", 2);
+ return RestResponse.success().data(resp);
+ }
+ return RestResponse.success().data(resp);
+ case CONNECTOR:
+ // 1) get connector id
+ FlinkConnectorResource connectorResource;
+
+ ApiAlertException.throwIfFalse(
+ ResourceType.CONNECTOR.equals(resourceParam.getResourceType()),
+ "getConnectorId method error, resource not flink connector.");
+
+ List<File> jars;
+ File connector = null;
+ List<String> factories;
+
+ Dependency dependency =
Dependency.toDependency(resourceParam.getResource());
+
+ // 1) get connector jar
+ if (!dependency.getPom().isEmpty()) {
+ Artifact artifact = dependency.toArtifact().get(0);
+ try {
+ jars = MavenTool.resolveArtifacts(artifact);
+ } catch (Exception e) {
+ // connector download is null
+ resp.put("state", 1);
+ resp.put("exception", Utils.stringifyException(e));
+ return RestResponse.success().data(resp);
+ }
+ String fileName = String.format("%s-%s.jar", artifact.artifactId(),
artifact.version());
+ Optional<File> file = jars.stream().filter(x ->
x.getName().equals(fileName)).findFirst();
+ if (file.isPresent()) {
+ connector = file.get();
+ }
+ } else {
+ // 2) jar
+ String jar = dependency.getJar().get(0).split(":")[1];
+ File file = new File(jar);
+ connector = file;
+ jars = Collections.singletonList(file);
+ }
+
+ // 2) parse connector Factory
+ try {
+ factories = getConnectorFactory(connector);
+ } catch (Exception e) {
+ // flink connector invalid
+ resp.put("state", 2);
+ resp.put("exception", Utils.stringifyException(e));
+ return RestResponse.success().data(resp);
+ }
+
+ // 3) get connector resource
+ connectorResource = getConnectorResource(jars, factories);
+ if (connectorResource == null) {
+ // connector is null
+ resp.put("state", 3);
+ return RestResponse.success().data(resp);
+ }
+
+ // 2) check connector exists
+ boolean exists =
+ existsFlinkConnector(resourceParam.getId(),
connectorResource.getFactoryIdentifier());
+ if (exists) {
+ resp.put("state", 4);
+ resp.put("name", connectorResource.getFactoryIdentifier());
+ return RestResponse.success(resp);
+ }
+
+ if (resourceParam.getId() != null) {
+ Resource resource = getById(resourceParam.getId());
+ if
(!resource.getResourceName().equals(connectorResource.getFactoryIdentifier())) {
+ resp.put("state", 5);
+ return RestResponse.success().data(resp);
+ }
+ }
+ resp.put("state", 0);
+ resp.put("connector", JacksonUtils.write(connectorResource));
+ return RestResponse.success().data(resp);
+ }
+ return RestResponse.success().data(resp);
+ }
+
+ private boolean existsFlinkConnector(Long id, String connectorId) {
+ LambdaQueryWrapper<Resource> lambdaQueryWrapper =
+ new LambdaQueryWrapper<Resource>().eq(Resource::getResourceName,
connectorId);
+ if (id != null) {
+ lambdaQueryWrapper.ne(Resource::getId, id);
+ }
+ return getBaseMapper().exists(lambdaQueryWrapper);
+ }
+
+ private FlinkConnectorResource getConnectorResource(List<File> jars,
List<String> factories) {
+ Class<Factory> className = Factory.class;
+ URL[] array =
+ jars.stream()
+ .map(
+ x -> {
+ try {
+ return x.toURI().toURL();
+ } catch (MalformedURLException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .toArray(URL[]::new);
+
+ try (URLClassLoader urlClassLoader = URLClassLoader.newInstance(array)) {
+ ServiceLoader<Factory> serviceLoader = ServiceLoader.load(className,
urlClassLoader);
+ for (Factory factory : serviceLoader) {
+ String factoryClassName = factory.getClass().getName();
+ if (factories.contains(factoryClassName)) {
+ FlinkConnectorResource connectorResource = new
FlinkConnectorResource();
+ try {
+ connectorResource.setClassName(factoryClassName);
+
connectorResource.setFactoryIdentifier(factory.factoryIdentifier());
+ } catch (Exception ignored) {
+ }
+
+ try {
+ Map<String, String> requiredOptions = new HashMap<>(0);
+ factory
+ .requiredOptions()
+ .forEach(x -> requiredOptions.put(x.key(),
getOptionDefaultValue(x)));
+ connectorResource.setRequiredOptions(requiredOptions);
+ } catch (Exception ignored) {
+
+ }
+
+ try {
+ Map<String, String> optionalOptions = new HashMap<>(0);
+ factory
+ .optionalOptions()
+ .forEach(x -> optionalOptions.put(x.key(),
getOptionDefaultValue(x)));
+ connectorResource.setOptionalOptions(optionalOptions);
+ } catch (Exception ignored) {
+ }
+ return connectorResource;
+ }
+ }
+ return null;
+ } catch (Exception e) {
+ log.error("getConnectorResource failed. " + e);
+ }
+ return null;
+ }
+
+ private File getResourceJar(Resource resource) throws Exception {
+ Dependency dependency = Dependency.toDependency(resource.getResource());
+ if (dependency.isEmpty()) {
+ return null;
+ }
+ if (!dependency.getJar().isEmpty()) {
+ String jar = dependency.getJar().get(0).split(":")[1];
+ return new File(jar);
+ } else {
+ Artifact artifact = dependency.toArtifact().get(0);
+ List<File> files = MavenTool.resolveArtifacts(artifact);
+ if (!files.isEmpty()) {
+ String fileName = String.format("%s-%s.jar", artifact.artifactId(),
artifact.version());
+ Optional<File> jarFile =
+ files.stream().filter(x ->
x.getName().equals(fileName)).findFirst();
+ if (jarFile.isPresent()) {
+ return jarFile.get();
+ }
+ }
+ return null;
+ }
+ }
+
+ private void transferTeamResource(Long teamId, String resourcePath) {
String teamUploads = String.format("%s/%d",
Workspace.local().APP_UPLOADS(), teamId);
if (!FsOperator.lfs().exists(teamUploads)) {
FsOperator.lfs().mkdirs(teamUploads);
}
- File localJar = new File(WebUtils.getAppTempDir(), resourceName);
- File teamUploadJar = new File(teamUploads, resourceName);
+ File localJar = new File(resourcePath);
+ File teamUploadJar = new File(teamUploads, localJar.getName());
ApiAlertException.throwIfFalse(
- localJar.exists(), "Missing file: " + resourceName + ", please upload
again");
+ localJar.exists(), "Missing file: " + resourcePath + ", please upload
again");
FsOperator.lfs()
.upload(localJar.getAbsolutePath(), teamUploadJar.getAbsolutePath(),
false, true);
}
@@ -246,4 +502,36 @@ public class ResourceServiceImpl extends
ServiceImpl<ResourceMapper, Resource>
return dependApplications;
}
+
+ private List<String> getConnectorFactory(File connector) throws Exception {
+ String configFile =
"META-INF/services/org.apache.flink.table.factories.Factory";
+ JarFile jarFile = new JarFile(connector);
+ JarEntry entry = jarFile.getJarEntry(configFile);
+ if (entry == null) {
+ throw new IllegalArgumentException("invalid flink connector");
+ }
+ List<String> factories = new ArrayList<>(0);
+ try (InputStream inputStream = jarFile.getInputStream(entry)) {
+ Scanner scanner = new Scanner(new InputStreamReader(inputStream));
+ while (scanner.hasNextLine()) {
+ String line = scanner.nextLine().trim();
+ if (line.length() > 0 && !line.startsWith("#")) {
+ factories.add(line);
+ }
+ }
+ scanner.close();
+ }
+ return factories;
+ }
+
+ private String getOptionDefaultValue(ConfigOption<?> option) {
+ if (!option.hasDefaultValue()) {
+ return null;
+ }
+ Object value = option.defaultValue();
+ if (value instanceof Duration) {
+ return value.toString().replace("PT", "").toLowerCase();
+ }
+ return value.toString();
+ }
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTToken.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTToken.java
index b035253f2..f04adad00 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTToken.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTToken.java
@@ -25,7 +25,7 @@ import lombok.Data;
@Data
public class JWTToken implements AuthenticationToken {
- private static final long serialVersionUID = 1282057025599826155L;
+ private static final long serialVersionUID = 1L;
private String token;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/AccessToken.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/AccessToken.java
index cc89f609d..901b61153 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/AccessToken.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/AccessToken.java
@@ -32,7 +32,7 @@ import java.util.Date;
@TableName("t_access_token")
public class AccessToken implements Serializable {
- private static final long serialVersionUID = 7187628714679791772L;
+ private static final long serialVersionUID = 1L;
public static final String DEFAULT_EXPIRE_TIME = "9999-01-01 00:00:00";
public static final String IS_API_TOKEN = "is_api_token";
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Member.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Member.java
index d3e7e029b..6c85a3b96 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Member.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Member.java
@@ -29,7 +29,7 @@ import java.util.Date;
@Data
public class Member implements Serializable {
- private static final long serialVersionUID = -3166012934498268403L;
+ private static final long serialVersionUID = 1L;
@TableId(type = IdType.AUTO)
private Long id;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Menu.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Menu.java
index 61533daa2..17d9e914e 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Menu.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Menu.java
@@ -32,7 +32,7 @@ import java.util.Date;
@TableName("t_menu")
public class Menu implements Serializable {
- private static final long serialVersionUID = 7187628714679791771L;
+ private static final long serialVersionUID = 1L;
public static final String TYPE_MENU = "0";
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Role.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Role.java
index 16d853b30..ed412d340 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Role.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Role.java
@@ -32,7 +32,7 @@ import java.util.Date;
@TableName("t_role")
public class Role implements Serializable {
- private static final long serialVersionUID = -1714476694755654924L;
+ private static final long serialVersionUID = 1L;
@TableId(type = IdType.AUTO)
private Long roleId;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/RoleMenu.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/RoleMenu.java
index e2f28c6b2..5658e9c2a 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/RoleMenu.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/RoleMenu.java
@@ -28,7 +28,7 @@ import java.io.Serializable;
@Data
public class RoleMenu implements Serializable {
- private static final long serialVersionUID = -7573904024872252113L;
+ private static final long serialVersionUID = 1L;
@TableId(type = IdType.AUTO)
private Long id;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/SysLog.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/SysLog.java
index cdfd51572..cdf029eda 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/SysLog.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/SysLog.java
@@ -29,7 +29,7 @@ import java.util.Date;
@TableName("t_log")
public class SysLog implements Serializable {
- private static final long serialVersionUID = -8878596941954995444L;
+ private static final long serialVersionUID = 1L;
@TableId(type = IdType.AUTO)
private Long id;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Team.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Team.java
index 415fb43e0..8d7bca18e 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Team.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Team.java
@@ -31,7 +31,7 @@ import java.util.Date;
@TableName("t_team")
public class Team implements Serializable {
- private static final long serialVersionUID = -1714476694755654924L;
+ private static final long serialVersionUID = 1L;
@TableId(type = IdType.AUTO)
private Long id;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/User.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/User.java
index 4b3fc3375..7a328a1f9 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/User.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/User.java
@@ -37,7 +37,7 @@ import java.util.Date;
@TableName("t_user")
public class User implements Serializable {
- private static final long serialVersionUID = -4852732617765810959L;
+ private static final long serialVersionUID = 1L;
/** user status */
public static final String STATUS_VALID = "1";
diff --git
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
index fcb05c25e..16b6503f1 100644
---
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
+++
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
@@ -220,11 +220,14 @@ create table if not exists `t_resource` (
`id` bigint generated by default as identity not null,
`resource_name` varchar(128) not null comment 'The name of the resource
file',
`resource_type` int not null comment '0:app 1:common 2:connector 3:format
4:udf',
+ `resource_path` varchar(255) default null,
`resource` text ,
`engine_type` int not null comment 'compute engine type, 0:apache flink
1:apache spark',
`main_class` varchar(255) default null,
`description` text default null comment 'More detailed description of
resource',
`creator_id` bigint not null comment 'user id of creator',
+ `connector_required_options` text default null,
+ `connector_optional_options` text default null,
`team_id` bigint not null comment 'team id',
`create_time` datetime not null default current_timestamp comment 'create
time',
`modify_time` datetime not null default current_timestamp on update
current_timestamp comment 'modify time',
diff --git
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ResourceMapper.xml
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ResourceMapper.xml
index c8020991a..21228ca97 100644
---
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ResourceMapper.xml
+++
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ResourceMapper.xml
@@ -20,11 +20,14 @@
<resultMap id="ResourceMap"
type="org.apache.streampark.console.core.entity.Resource">
<result column="id" jdbcType="BIGINT" property="id"/>
<result column="resource_name" jdbcType="VARCHAR"
property="resourceName"/>
+ <result column="resource_path" jdbcType="VARCHAR"
property="resourcePath"/>
<result column="description" jdbcType="VARCHAR"
property="description"/>
property="description"/>
<result column="resource_type" jdbcType="BIGINT"
property="resourceType"/>
<result column="resource" jdbcType="VARCHAR" property="resource"/>
<result column="engine_type" jdbcType="BIGINT" property="engineType"/>
<result column="creator_id" jdbcType="BIGINT" property="creatorId"/>
+ <result column="connector_required_options" jdbcType="LONGVARCHAR"
property="connectorRequiredOptions"/>
+ <result column="connector_optional_options" jdbcType="LONGVARCHAR"
property="connectorOptionalOptions"/>
<result column="team_id" jdbcType="BIGINT" property="teamId"/>
<result column="create_time" jdbcType="TIMESTAMP"
property="createTime"/>
<result column="modify_time" jdbcType="TIMESTAMP"
property="modifyTime"/>
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java
new file mode 100644
index 000000000..10dc40ccb
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.base.util;
+
+import org.apache.streampark.common.conf.CommonConfig;
+import org.apache.streampark.common.conf.InternalConfigHolder;
+import org.apache.streampark.console.core.bean.FlinkConnectorResource;
+import org.apache.streampark.flink.packer.maven.Artifact;
+import org.apache.streampark.flink.packer.maven.MavenTool;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.factories.Factory;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Scanner;
+import java.util.ServiceLoader;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
+
+@Slf4j
+class DependencyUtilsTest {
+
+ @Test
+ public void resolveFlinkConnector() throws Exception {
+
+ Artifact artifact = new Artifact("com.ververica",
"flink-connector-mysql-cdc", "2.4.1", null);
+
+ InternalConfigHolder.set(CommonConfig.STREAMPARK_WORKSPACE_LOCAL(),
"~/tmp");
+
+ List<File> files = MavenTool.resolveArtifacts(artifact);
+ if (files.isEmpty()) {
+ return;
+ }
+
+ String fileName = String.format("%s-%s.jar", artifact.artifactId(),
artifact.version());
+ Optional<File> jarFile = files.stream().filter(x ->
x.getName().equals(fileName)).findFirst();
+ File connector = jarFile.get();
+
+ List<String> factories = getConnectorFactory(connector);
+
+ Class<Factory> className = Factory.class;
+ URL[] array =
+ files.stream()
+ .map(
+ x -> {
+ try {
+ return x.toURI().toURL();
+ } catch (MalformedURLException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .toArray(URL[]::new);
+
+ URLClassLoader urlClassLoader = URLClassLoader.newInstance(array);
+ ServiceLoader<Factory> serviceLoader = ServiceLoader.load(className,
urlClassLoader);
+
+ List<FlinkConnectorResource> connectorResources = new ArrayList<>();
+ try {
+ for (Factory factory : serviceLoader) {
+ String factoryClassName = factory.getClass().getName();
+ if (factories.contains(factoryClassName)) {
+ FlinkConnectorResource connectorResource = new
FlinkConnectorResource();
+ connectorResource.setClassName(factoryClassName);
+ connectorResource.setFactoryIdentifier(factory.factoryIdentifier());
+ Map<String, String> requiredOptions = new HashMap<>(0);
+ factory
+ .requiredOptions()
+ .forEach(x -> requiredOptions.put(x.key(),
getOptionDefaultValue(x)));
+ connectorResource.setRequiredOptions(requiredOptions);
+
+ Map<String, String> optionalOptions = new HashMap<>(0);
+ factory
+ .optionalOptions()
+ .forEach(x -> optionalOptions.put(x.key(),
getOptionDefaultValue(x)));
+ connectorResource.setOptionalOptions(optionalOptions);
+
+ connectorResources.add(connectorResource);
+ }
+ }
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ urlClassLoader.close();
+ System.out.println(connectorResources);
+ }
+
+ private String getOptionDefaultValue(ConfigOption<?> option) {
+ if (!option.hasDefaultValue()) {
+ return null;
+ }
+ Object value = option.defaultValue();
+ if (value instanceof Duration) {
+ return value.toString().replace("PT", "").toLowerCase();
+ }
+ return value.toString();
+ }
+
+ @Test
+ public void testDuration() {
+ String s = "PT30H";
+ Duration duration = Duration.parse(s);
+ System.out.println(duration.getSeconds());
+ }
+
+ private List<String> getConnectorFactory(File connector) throws Exception {
+ String configFile =
"META-INF/services/org.apache.flink.table.factories.Factory";
+ JarFile jarFile = new JarFile(connector);
+ JarEntry entry = jarFile.getJarEntry(configFile);
+ List<String> factories = new ArrayList<>(0);
+ try (InputStream inputStream = jarFile.getInputStream(entry)) {
+ Scanner scanner = new Scanner(new InputStreamReader(inputStream));
+ while (scanner.hasNextLine()) {
+ String line = scanner.nextLine().trim();
+ if (line.length() > 0 && !line.startsWith("#")) {
+ factories.add(line);
+ }
+ }
+ scanner.close();
+ }
+ return factories;
+ }
+}
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceTest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceTest.java
index aae093e7f..ea5c34560 100644
---
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceTest.java
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceTest.java
@@ -23,22 +23,13 @@ import
org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.YarnQueue;
import org.apache.streampark.console.core.service.impl.ApplicationServiceImpl;
-import org.apache.hc.core5.http.ContentType;
-
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
-import org.h2.store.fs.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.mock.web.MockMultipartFile;
-import org.springframework.web.multipart.MultipartFile;
-import java.io.File;
-import java.io.FileInputStream;
-import java.nio.file.Path;
import java.util.Date;
import static org.assertj.core.api.Assertions.assertThat;
@@ -105,26 +96,6 @@ class ApplicationServiceTest extends SpringTestBase {
applicationService.start(application, false);
}
- @Test
- void testUpload(@TempDir Path tempDir) throws Exception {
- // specify the file path
- File fileToStoreUploadFile =
- new File(tempDir.toFile().getAbsolutePath() +
"/fileToStoreUploadFile");
- FileUtils.createFile(fileToStoreUploadFile.getAbsolutePath());
-
- File fileToUpload = new File(tempDir.toFile().getAbsolutePath() +
"/fileToUpload.jar");
- FileUtils.createFile(fileToUpload.getAbsolutePath());
- assertThat(fileToUpload).exists();
- MultipartFile mulFile =
- new MockMultipartFile(
- "test", // fileName (eg: streampark.jar)
- fileToUpload.getAbsolutePath(), // originalFilename (eg: path +
fileName =
- // /tmp/file/streampark.jar)
- ContentType.APPLICATION_OCTET_STREAM.toString(),
- new FileInputStream(fileToStoreUploadFile));
- applicationService.upload(mulFile);
- }
-
@Test
void testCheckQueueValidationIfNeeded() {
ApplicationServiceImpl applicationServiceImpl = (ApplicationServiceImpl)
applicationService;
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ResourceServiceTest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ResourceServiceTest.java
new file mode 100644
index 000000000..e6c00ec71
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ResourceServiceTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service;
+
+import org.apache.streampark.console.SpringTestBase;
+
+import org.apache.hc.core5.http.ContentType;
+
+import org.h2.store.fs.FileUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.mock.web.MockMultipartFile;
+import org.springframework.web.multipart.MultipartFile;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.nio.file.Path;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** org.apache.streampark.console.core.service.ResourceServiceTest. */
+class ResourceServiceTest extends SpringTestBase {
+
+ @Autowired private ResourceService resourceService;
+
+ @Test
+ void testUpload(@TempDir Path tempDir) throws Exception {
+ // specify the file path
+ File fileToStoreUploadFile =
+ new File(tempDir.toFile().getAbsolutePath() +
"/fileToStoreUploadFile");
+ FileUtils.createFile(fileToStoreUploadFile.getAbsolutePath());
+
+ File fileToUpload = new File(tempDir.toFile().getAbsolutePath() +
"/fileToUpload.jar");
+ FileUtils.createFile(fileToUpload.getAbsolutePath());
+ assertThat(fileToUpload).exists();
+ MultipartFile mulFile =
+ new MockMultipartFile(
+ "test", // fileName (eg: streampark.jar)
+ fileToUpload.getAbsolutePath(), // originalFilename (eg: path +
fileName =
+ // /tmp/file/streampark.jar)
+ ContentType.APPLICATION_OCTET_STREAM.toString(),
+ new FileInputStream(fileToStoreUploadFile));
+ resourceService.upload(mulFile);
+ }
+}
diff --git
a/streampark-console/streampark-console-webapp/src/api/flink/resource/index.ts
b/streampark-console/streampark-console-webapp/src/api/flink/resource/index.ts
index 1b279c7a6..a0d045284 100644
---
a/streampark-console/streampark-console-webapp/src/api/flink/resource/index.ts
+++
b/streampark-console/streampark-console-webapp/src/api/flink/resource/index.ts
@@ -23,13 +23,16 @@ import {
ResourceListRecord,
ResourceParam,
} from './model/resourceModel';
+import { ContentTypeEnum } from '/@/enums/httpEnum';
enum RESOURCE_API {
PAGE = '/resource/page',
POST = '/resource/add',
UPDATE = '/resource/update',
+ UPLOAD = '/resource/upload',
DELETE = '/resource/delete',
LIST = '/resource/list',
+ CHECK = '/resource/check',
}
/**
@@ -76,3 +79,18 @@ export function fetchResourceDelete(data:
ResourceDeleteParam): Promise<AxiosRes
export function fetchTeamResource(data: Recordable):
Promise<ResourceListRecord[]> {
return defHttp.post({ url: RESOURCE_API.LIST, data });
}
+
+export function checkResource(data: ResourceParam):
Promise<AxiosResponse<Result>> {
+ return defHttp.post({ url: RESOURCE_API.CHECK, data });
+}
+
+export function fetchUpload(params) {
+ return defHttp.post<string>({
+ url: RESOURCE_API.UPLOAD,
+ params,
+ headers: {
+ 'Content-Type': ContentTypeEnum.FORM_DATA,
+ },
+ timeout: 1000 * 60 * 10, // Uploading files timed out for 10 minutes
+ });
+}
diff --git
a/streampark-console/streampark-console-webapp/src/api/flink/resource/model/resourceModel.ts
b/streampark-console/streampark-console-webapp/src/api/flink/resource/model/resourceModel.ts
index 4de0f0f5e..3c720d38a 100644
---
a/streampark-console/streampark-console-webapp/src/api/flink/resource/model/resourceModel.ts
+++
b/streampark-console/streampark-console-webapp/src/api/flink/resource/model/resourceModel.ts
@@ -34,6 +34,7 @@ export interface ResourceListRecord {
export interface ResourceParam {
id?: string;
resourceName: string;
+ connector?: string;
engineType: string;
description: string;
}
diff --git
a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/resource.ts
b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/resource.ts
index 19ffd1618..7e031498c 100644
---
a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/resource.ts
+++
b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/resource.ts
@@ -21,24 +21,32 @@ export default {
deleteResource: 'Delete Resource',
deletePopConfirm: 'Are you sure delete this resource ?',
uploadResource: 'Upload Resource',
+ resourceName: 'Resource Name',
resourceType: 'Resource Type',
engineType: 'Engine Type',
resourceGroup: 'Resource Group',
groupName: 'Group Name',
+ resourceNamePlaceholder: 'Please input resource name',
engineTypePlaceholder: 'Please select compute engine type',
resourceGroupPlaceholder: 'Please choose resource',
groupNamePlaceholder: 'Please input the group name',
groupNameIsRequiredMessage: 'Group Name is required',
multiPomTip: 'Do not add multiple dependencies at one time',
addResourceTip: 'Please add a resource',
+ jarFileErrorTip: 'Jar file is null, please try again',
+ mainNullTip: 'Flink app invalid, main class is null',
+ connectorExistsTip: 'this connector already exists',
+ connectorInvalidTip: 'flink connector invalid, please check',
+ connectorInfoErrorTip: 'get flink connector information error.',
+ connectorModifyTip: 'this connector cannot be modified, because
factoryIdentifier has changed',
add: 'Add',
success: ' successful',
fail: ' failed',
table: {
title: 'Resource List',
resourceName: 'Resource Name',
- resourceNamePlaceholder: 'Please enter the resource name to search',
- descriptionPlaceholder: 'Please enter description to search',
+ resourceNamePlaceholder: 'Please enter the resource name',
+ descriptionPlaceholder: 'Please enter description',
createUser: 'Create User',
createTime: 'Create Time',
modifyTime: 'Modify Time',
@@ -46,6 +54,7 @@ export default {
},
form: {
descriptionMessage: 'exceeds maximum length limit of 100 characters',
+ resourceNameIsRequiredMessage: 'resource name is required',
engineTypeIsRequiredMessage: 'compute engine type is required',
resourceTypeIsRequiredMessage: 'resource type is required',
resourceTypePlaceholder: 'Please select resource type',
diff --git
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/resource.ts
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/resource.ts
index 023d0f36b..5b0dc99e5 100644
---
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/resource.ts
+++
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/resource.ts
@@ -21,16 +21,24 @@ export default {
deleteResource: '删除资源',
deletePopConfirm: '你确定要删除这个资源?',
uploadResource: '上传资源',
+ resourceName: '资源名称',
resourceType: '资源类型',
engineType: '计算引擎类型',
resourceGroup: '资源组',
groupName: '资源组名称',
resourceGroupPlaceholder: '请选择组资源',
+ resourceNamePlaceholder: '请输入资源名称',
groupNamePlaceholder: '请输入资源组名称',
groupNameIsRequiredMessage: '资源组名称必填',
engineTypePlaceholder: '请选择计算引擎类型',
multiPomTip: '不支持同时添加多个依赖',
addResourceTip: '请添加资源',
+ jarFileErrorTip: 'Jar 文件为空,请重试',
+ mainNullTip: 'Flink app 无效,主类为空',
+ connectorInvalidTip: '该连接器无效,请检查',
+ connectorExistsTip: '该连接器已经存在',
+ connectorModifyTip: '该连接器无法修改,factoryIdentifier 不能更改',
+ connectorInfoErrorTip: '获取该连接器信息出错',
add: '添加',
success: '成功',
fail: '失败',
@@ -46,8 +54,9 @@ export default {
},
form: {
descriptionMessage: '超过 100 个字符的最大长度限制',
- engineTypeIsRequiredMessage: '计算引擎类型必选',
- resourceTypeIsRequiredMessage: '资源类型必选',
+ resourceNameIsRequiredMessage: '资源名称为必填项',
+ engineTypeIsRequiredMessage: '计算引擎类型为必填项',
+ resourceTypeIsRequiredMessage: '资源类型为必填项',
resourceTypePlaceholder: '请选择资源类型',
exists: '资源已存在',
empty: '资源不能为空',
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/styles/Add.less
b/streampark-console/streampark-console-webapp/src/views/flink/app/styles/Add.less
index b6acf41cd..a45c73ed2 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/styles/Add.less
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/styles/Add.less
@@ -186,27 +186,26 @@
}
.dependency-box {
- margin-right: 10px;
margin-top: 15px;
- margin-bottom: -10px;
+ width: 100%;
border-radius: 5px;
- background-color: @background-color-base;
+ background-color: #e6f4ff;
+ border: 1px solid #91caff;
display: inline-block;
+ margin-bottom: -25px;
.dependency-item {
position: relative;
border: unset;
+ background-color: unset;
line-height: 35px;
padding: 0 6px;
width: unset;
float: left;
- margin: 2px 4px 0;
-
.ant-alert-close-icon {
position: relative;
left: 5px;
top: 6px;
- color: @background-color-base;
}
}
}
@@ -267,8 +266,14 @@
}
[data-theme='dark'] {
- .app_controller .icon-close {
- color: #ffffff73;
+ .app_controller {
+ .icon-close {
+ color: #ffffff73;
+ }
+ .dependency-box {
+ background-color: #111b26;
+ border: 1px solid #153450;
+ }
}
}
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/resource/View.vue
b/streampark-console/streampark-console-webapp/src/views/flink/resource/View.vue
index 0cae4e79e..bdb7ae75a 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/resource/View.vue
+++
b/streampark-console/streampark-console-webapp/src/views/flink/resource/View.vue
@@ -26,45 +26,48 @@
<template #resetBefore> 1111 </template>
<template #bodyCell="{ column, record }">
<template v-if="column.dataIndex === 'resourceType'">
- <Tag
- class="bold-tag"
- color="#52c41a"
- v-if="record.resourceType == ResourceTypeEnum.FLINK_APP"
- >
- FLINK_APP
+ <Tag color="processing" v-if="record.resourceType ===
ResourceTypeEnum.FLINK_APP">
+ <template #icon>
+ <img :src="flinkAppSvg" class="svg-icon" alt="Flink App" />
+ </template>
+ Flink App
</Tag>
- <Tag
- class="bold-tag"
- color="#2db7f5"
- v-if="record.resourceType == ResourceTypeEnum.NORMAL_JAR"
- >
- NORMAL_JAR
- </Tag>
- <Tag
- class="bold-tag"
- color="#108ee9"
- v-if="record.resourceType == ResourceTypeEnum.CONNECTOR"
- >
- CONNECTOR
+
+ <Tag color="processing" v-if="record.resourceType ===
ResourceTypeEnum.CONNECTOR">
+ <template #icon>
+ <img :src="connectorSvg" class="svg-icon" alt="Connector" />
+ </template>
+ Connector
</Tag>
- <Tag class="bold-tag" color="#79f379" v-if="record.resourceType ==
ResourceTypeEnum.UDXF">
+
+ <Tag color="processing" v-if="record.resourceType ===
ResourceTypeEnum.UDXF">
+ <template #icon>
+ <img :src="udxfSvg" class="svg-icon" alt="UDXF" />
+ </template>
UDXF
</Tag>
- <Tag
- class="bold-tag"
- color="#fcaa80"
- v-if="record.resourceType == ResourceTypeEnum.GROUP"
- >
+
+ <Tag color="processing" v-if="record.resourceType ===
ResourceTypeEnum.NORMAL_JAR">
+ <template #icon>
+ <img :src="normalJarSvg" class="svg-icon" alt="Normal jar" />
+ </template>
+ Normal Jar
+ </Tag>
+
+ <Tag color="processing" v-if="record.resourceType ===
ResourceTypeEnum.GROUP">
+ <template #icon>
+ <img :src="groupSvg" class="svg-icon" alt="GROUP" />
+ </template>
GROUP
</Tag>
</template>
<template v-if="column.dataIndex === 'engineType'">
- <Tag class="bold-tag" color="#e65270" v-if="record.engineType ==
EngineTypeEnum.FLINK">
- FLINK
- </Tag>
- <Tag class="bold-tag" color="#f5be07" v-if="record.engineType ==
EngineTypeEnum.SPARK">
- SPARK
- </Tag>
+ <span v-if="record.engineType === EngineTypeEnum.FLINK">
+ <SvgIcon name="flink" /> Apache Flink
+ </span>
+ <span v-if="record.engineType === EngineTypeEnum.SPARK">
+ <SvgIcon name="spark" /> Apache Spark
+ </span>
</template>
<template v-if="column.dataIndex === 'action'">
<TableAction
@@ -115,6 +118,13 @@
import { fetchResourceDelete, fetchResourceList, fetchTeamResource } from
'/@/api/flink/resource';
import { EngineTypeEnum, ResourceTypeEnum } from
'/@/views/flink/resource/resource.data';
import { Tag } from 'ant-design-vue';
+ import SvgIcon from '/@/components/Icon/src/SvgIcon.vue';
+
+ import flinkAppSvg from '/@/assets/icons/flink2.svg';
+ import connectorSvg from '/@/assets/icons/connector.svg';
+ import udxfSvg from '/@/assets/icons/fx.svg';
+ import normalJarSvg from '/@/assets/icons/jar.svg';
+ import groupSvg from '/@/assets/icons/group.svg';
const teamResource = ref<Array<any>>([]);
const [registerDrawer, { openDrawer }] = useDrawer();
@@ -176,7 +186,7 @@
});
if (data.status === 'success') {
createMessage.success(t('flink.resource.deleteResource') +
t('flink.resource.success'));
- reload();
+ await reload();
updateTeamResource();
} else {
createMessage.error(t('flink.resource.deleteResource') +
t('flink.resource.fail'));
@@ -202,3 +212,17 @@
updateTeamResource();
});
</script>
+
+<style lang="less" scoped>
+ .svg-icon {
+ display: inline-block;
+ width: 14px;
+ height: 14px;
+
+ .svg-connector {
+ svg path {
+ fill: #fff0f6 !important;
+ }
+ }
+ }
+</style>
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/resource/components/Resource.vue
b/streampark-console/streampark-console-webapp/src/views/flink/resource/components/Resource.vue
index 7c6db92e2..626180012 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/resource/components/Resource.vue
+++
b/streampark-console/streampark-console-webapp/src/views/flink/resource/components/Resource.vue
@@ -28,8 +28,10 @@
import { Icon } from '/@/components/Icon';
import { useMonaco } from '/@/hooks/web/useMonaco';
import { Tabs, Alert, Tag, Space } from 'ant-design-vue';
- import { fetchUpload } from '/@/api/flink/app/app';
+ import { fetchUpload } from '/@/api/flink/resource';
+
import UploadJobJar from '/@/views/flink/app/components/UploadJobJar.vue';
+ import { ResourceTypeEnum } from '/@/views/flink/resource/resource.data';
interface DependencyType {
artifactId: string;
@@ -136,9 +138,8 @@
loading.value = true;
const formData = new FormData();
formData.append('file', data.file);
- await fetchUpload(formData);
dependency.jar = {};
- dependency.jar[data.file.name] = data.file.name;
+ dependency.jar[data.file.name] = await fetchUpload(formData);
handleUpdateDependency();
} catch (error) {
console.error(error);
@@ -155,15 +156,15 @@
deps.push(dependency.pom[v]);
});
Object.keys(dependency.jar).forEach((v: string) => {
- jars.push(v);
+ jars.push(v + ':' + dependency.jar[v]);
});
-
dependencyRecords.value = deps;
uploadJars.value = jars;
}
function handleRemoveJar(jar: string) {
- delete dependency.jar[jar];
+ console.log(jar);
+ delete dependency.jar[jar.split(':')[0]];
handleUpdateDependency();
}
@@ -211,7 +212,7 @@
</script>
<template>
- <template v-if="props.formModel.resourceType == 'FLINK_APP'">
+ <template v-if="props.formModel.resourceType === ResourceTypeEnum.FLINK_APP">
<UploadJobJar :custom-request="handleCustomDepsRequest"
v-model:loading="loading" />
</template>
<template v-else>
@@ -237,7 +238,7 @@
<template #message>
<Space>
<Tag class="tag-dependency" color="#108ee9">JAR</Tag>
- {{ jar }}
+ {{ jar.split(':')[0] }}
<Icon
icon="ant-design:close-outlined"
class="icon-close cursor-pointer"
@@ -251,7 +252,9 @@
</template>
<style lang="less">
- @import url('/@/views/flink/app/styles/Add.less');
+ .dependency-box {
+ margin-top: 10px;
+ }
.apply-pom {
z-index: 99;
position: absolute;
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/resource/components/ResourceDrawer.vue
b/streampark-console/streampark-console-webapp/src/views/flink/resource/components/ResourceDrawer.vue
index 2d217d3fe..ba9c1e61c 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/resource/components/ResourceDrawer.vue
+++
b/streampark-console/streampark-console-webapp/src/views/flink/resource/components/ResourceDrawer.vue
@@ -41,18 +41,20 @@
<script lang="ts" setup>
import { ref, computed, unref } from 'vue';
- import { BasicForm, FormSchema, useForm } from '/@/components/Form';
+ import { BasicForm, useForm } from '/@/components/Form';
import { BasicDrawer, useDrawerInner } from '/@/components/Drawer';
import { Icon } from '/@/components/Icon';
import { useI18n } from '/@/hooks/web/useI18n';
import Resource from '/@/views/flink/resource/components/Resource.vue';
- import { fetchAddResource, fetchUpdateResource } from
'/@/api/flink/resource';
- import { EngineTypeEnum } from '/@/views/flink/resource/resource.data';
+ import { fetchAddResource, fetchUpdateResource, checkResource } from
'/@/api/flink/resource';
+ import { EngineTypeEnum, ResourceTypeEnum } from
'/@/views/flink/resource/resource.data';
import {
+ renderEngineType,
renderResourceType,
renderStreamParkResourceGroup,
} from '/@/views/flink/resource/useResourceRender';
import { useMessage } from '/@/hooks/web/useMessage';
+ import { exceptionPropWidth } from '/@/utils';
const emit = defineEmits(['success', 'register']);
@@ -70,7 +72,7 @@
const resourceId = ref<Nullable<number>>(null);
const resourceRef = ref();
- const getResourceFormSchema = computed((): FormSchema[] => {
+ const getResourceFormSchema = computed(() => {
return [
{
field: 'resourceType',
@@ -81,18 +83,22 @@
{ required: true, message:
t('flink.resource.form.resourceTypeIsRequiredMessage') },
],
},
+ {
+ field: 'resourceName',
+ label: t('flink.resource.resourceName'),
+ component: 'Input',
+ ifShow: ({ values }) => values?.resourceType !==
ResourceTypeEnum.CONNECTOR,
+ componentProps: { placeholder:
t('flink.resource.resourceNamePlaceholder') },
+ rules: [
+ { required: true, message:
t('flink.resource.form.resourceNameIsRequiredMessage') },
+ ],
+ },
{
field: 'engineType',
label: t('flink.resource.engineType'),
component: 'Select',
+ render: ({ model }) => renderEngineType({ model }),
defaultValue: EngineTypeEnum.FLINK,
- componentProps: {
- placeholder: t('flink.resource.engineTypePlaceholder'),
- options: [
- { label: 'apache flink', value: EngineTypeEnum.FLINK, disabled:
false },
- { label: 'apache spark', value: EngineTypeEnum.SPARK, disabled:
true },
- ],
- },
rules: [{ required: true, message:
t('flink.resource.form.engineTypeIsRequiredMessage') }],
},
{
@@ -100,7 +106,7 @@
label: t('flink.resource.groupName'),
component: 'Input',
componentProps: { placeholder:
t('flink.resource.groupNamePlaceholder') },
- ifShow: ({ values }) => values?.resourceType == 'GROUP',
+ ifShow: ({ values }) => values?.resourceType ===
ResourceTypeEnum.GROUP,
rules: [{ required: true, message:
t('flink.resource.groupNameIsRequiredMessage') }],
},
{
@@ -109,21 +115,21 @@
component: 'Select',
render: ({ model }) =>
renderStreamParkResourceGroup({ model, resources:
unref(props.teamResource) }),
- ifShow: ({ values }) => values?.resourceType == 'GROUP',
+ ifShow: ({ values }) => values?.resourceType ===
ResourceTypeEnum.GROUP,
},
{
field: 'dependency',
label: t('flink.resource.addResource'),
component: 'Input',
slot: 'resource',
- ifShow: ({ values }) => values?.resourceType !== 'GROUP',
+ ifShow: ({ values }) => values?.resourceType !==
ResourceTypeEnum.GROUP,
},
{
field: 'mainClass',
label: t('flink.app.mainClass'),
component: 'Input',
componentProps: { placeholder:
t('flink.app.addAppTips.mainClassPlaceholder') },
- ifShow: ({ values }) => values?.resourceType == 'FLINK_APP',
+ ifShow: ({ values }) => values?.resourceType ===
ResourceTypeEnum.FLINK_APP,
rules: [{ required: true, message:
t('flink.app.addAppTips.mainClassIsRequiredMessage') }],
},
{
@@ -148,17 +154,16 @@
const [registerDrawer, { setDrawerProps, closeDrawer }] = useDrawerInner(
async (data: Recordable) => {
unref(resourceRef)?.setDefaultValue({});
- resetFields();
+ await resetFields();
setDrawerProps({ confirmLoading: false });
isUpdate.value = !!data?.isUpdate;
if (unref(isUpdate)) {
resourceId.value = data.record.id;
- setFieldsValue(data.record);
-
- if (data.record?.resourceType == 'GROUP') {
- setFieldsValue({ resourceGroup: JSON.parse(data.record.resource ||
'[]') });
+ await setFieldsValue(data.record);
+ if (data.record?.resourceType === ResourceTypeEnum.GROUP) {
+ await setFieldsValue({ resourceGroup:
JSON.parse(data.record.resource || '[]') });
} else {
- setFieldsValue({ dependency: data.record.resource });
+ await setFieldsValue({ dependency: data.record.resource });
unref(resourceRef)?.setDefaultValue(JSON.parse(data.record.resource
|| '{}'));
}
}
@@ -172,17 +177,17 @@
// form submit
async function handleSubmit() {
try {
+ const id = resourceId.value;
+ resourceId.value = null;
const values = await validate();
let resourceJson = '';
-
- if (values.resourceType == 'GROUP') {
+ if (values.resourceType == ResourceTypeEnum.GROUP) {
resourceJson = JSON.stringify(values.resourceGroup);
} else {
const resource: { pom?: string; jar?: string } = {};
unref(resourceRef).handleApplyPom();
const dependencyRecords = unref(resourceRef)?.dependencyRecords;
const uploadJars = unref(resourceRef)?.uploadJars;
-
if (unref(dependencyRecords) && unref(dependencyRecords).length > 0) {
if (unref(dependencyRecords).length > 1) {
Swal.fire('Failed', t('flink.resource.multiPomTip'), 'error');
@@ -210,16 +215,69 @@
}
resourceJson = JSON.stringify(resource);
+ // check resource
+ const resp = await checkResource({
+ id: id,
+ resource: resourceJson,
+ ...values,
+ });
+ const state = resp['state'];
+ switch (state) {
+ case 1:
+ // download error
+ if (resource.pom?.length > 0) {
+ Swal.fire({
+ icon: 'error',
+ title: t('sys.api.errorTip'),
+ width: exceptionPropWidth(),
+ html: '<pre class="api-exception">' + resp['exception'] +
'</pre>',
+ focusConfirm: false,
+ });
+ } else {
+ Swal.fire('Failed', t('flink.resource.jarFileErrorTip'),
'error');
+ }
+ break;
+ case 2:
+ if (values.resourceType == ResourceTypeEnum.FLINK_APP) {
+ Swal.fire('Failed', t('flink.resource.mainNullTip'), 'error');
+ }
+ if (values.resourceType == ResourceTypeEnum.CONNECTOR) {
+ Swal.fire('Failed', t('flink.resource.connectorInvalidTip'),
'error');
+ }
+ break;
+ case 3:
+ Swal.fire(
+ 'Failed',
+ t('flink.resource.connectorInfoErrorTip').concat(':
').concat(resp['name']),
+ 'error',
+ );
+ break;
+ case 4:
+ Swal.fire('Failed', t('flink.resource.connectorExistsTip'),
'error');
+ break;
+ case 5:
+ Swal.fire('Failed', t('flink.resource.connectorModifyTip'),
'error');
+ break;
+ case 0:
+ const connector = resp['connector'] || null;
+ setDrawerProps({ confirmLoading: true });
+ await (isUpdate.value
+ ? fetchUpdateResource({
+ id: id,
+ resource: resourceJson,
+ connector: connector,
+ ...values,
+ })
+ : fetchAddResource({ resource: resourceJson, connector:
connector, ...values }));
+ unref(resourceRef)?.setDefaultValue({});
+ closeDrawer();
+ emit('success', isUpdate.value);
+ break;
+ default:
+ break;
+ }
+ await resetFields();
}
-
- setDrawerProps({ confirmLoading: true });
- await (isUpdate.value
- ? fetchUpdateResource({ id: resourceId.value, resource: resourceJson,
...values })
- : fetchAddResource({ resource: resourceJson, ...values }));
- unref(resourceRef)?.setDefaultValue({});
- resetFields();
- closeDrawer();
- emit('success', isUpdate.value);
} finally {
setDrawerProps({ confirmLoading: false });
}
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/resource/useResourceRender.tsx
b/streampark-console/streampark-console-webapp/src/views/flink/resource/useResourceRender.tsx
index 57b2c077b..1a35f1b22 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/resource/useResourceRender.tsx
+++
b/streampark-console/streampark-console-webapp/src/views/flink/resource/useResourceRender.tsx
@@ -16,8 +16,10 @@
*/
import { Select, Tag } from 'ant-design-vue';
import { useI18n } from '/@/hooks/web/useI18n';
-import { ResourceTypeEnum } from '/@/views/flink/resource/resource.data';
+import { EngineTypeEnum, ResourceTypeEnum } from
'/@/views/flink/resource/resource.data';
+
import flinkAppSvg from '/@/assets/icons/flink2.svg';
+import sparkSvg from '/@/assets/icons/spark.svg';
import connectorSvg from '/@/assets/icons/connector.svg';
import udxfSvg from '/@/assets/icons/fx.svg';
import normalJarSvg from '/@/assets/icons/jar.svg';
@@ -61,6 +63,39 @@ export const renderResourceType = ({ model }) => {
);
};
+/* render engine type label */
+export const renderEngineType = ({ model }) => {
+ const renderOptions = () => {
+ const options = [
+ { label: 'Apache Flink', value: EngineTypeEnum.FLINK, disabled: false,
src: flinkAppSvg },
+ { label: 'Apache Spark', value: EngineTypeEnum.SPARK, disabled: true,
src: sparkSvg },
+ ];
+ return options.map(({ label, value, disabled, src }) => {
+ return (
+ <Select.Option key={value} label={label} disabled={disabled}>
+ <div>
+ <img src={src} style="display: inline-block; width: 20px; height:
20px"></img>
+ <span style="vertical-align: middle; margin-left:
5px;">{label}</span>
+ </div>
+ </Select.Option>
+ );
+ });
+ };
+
+ return (
+ <div>
+ <Select
+ allow-clear
+ placeholder={t('flink.resource.engineTypePlaceholder')}
+ value={model.engineType}
+ onChange={(value) => (model.engineType = value)}
+ >
+ {renderOptions()}
+ </Select>
+ </div>
+ );
+};
+
export const renderStreamParkResourceGroup = ({ model, resources }) => {
const renderOptions = () => {
console.log('resources', resources);
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/bean/LoadStatusFailedException.java
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/bean/LoadStatusFailedException.java
index 77617b7a0..afb6b49ab 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/bean/LoadStatusFailedException.java
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/bean/LoadStatusFailedException.java
@@ -22,7 +22,7 @@ import java.util.Map;
public class LoadStatusFailedException extends IOException {
- static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
private final Map<String, Object> response;
private boolean reCreateLabel;
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
index 2b07cbc74..03eb83839 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
@@ -43,6 +43,7 @@ import javax.annotation.{Nonnull, Nullable}
import java.io.File
import java.util
+import java.util.{List => JavaList, Set => JavaSet}
import scala.collection.convert.ImplicitConversions._
import scala.collection.mutable.ArrayBuffer
@@ -161,6 +162,10 @@ object MavenTool extends Logger {
buildFatJar(mainClass, jarLibs ++ artFilePaths, outFatJarPath)
}
+ @throws[Exception]
+ def resolveArtifacts(mavenArtifact: Artifact): JavaList[File] =
resolveArtifacts(
+ Set(mavenArtifact))
+
/**
 * Resolve the collection of artifacts. Artifacts will be downloaded to
ConfigConst.MAVEN_LOCAL_DIR
 * if necessary. Notes: Only compile scope dependencies will be resolved.
@@ -171,13 +176,16 @@ object MavenTool extends Logger {
* jar File Object of resolved artifacts
*/
@throws[Exception]
- def resolveArtifacts(mavenArtifacts: Set[Artifact]): Set[File] = {
- if (mavenArtifacts == null) Set.empty[File];
+ def resolveArtifacts(mavenArtifacts: JavaSet[Artifact]): JavaList[File] = {
+ if (mavenArtifacts == null) List.empty[File]
else {
val (repoSystem, session) = getMavenEndpoint()
val artifacts = mavenArtifacts.map(
e => {
- new DefaultArtifact(e.groupId, e.artifactId, e.classifier, "jar",
e.version)
+ val artifact =
+ new DefaultArtifact(e.groupId, e.artifactId, e.classifier, "jar",
e.version)
+ artifact.getProperties
+ artifact
})
logInfo(s"start resolving dependencies: ${artifacts.mkString}")
@@ -201,7 +209,7 @@ object MavenTool extends Logger {
repoSystem
.resolveArtifacts(session, artReqs)
.map(_.getArtifact.getFile)
- .toSet
+ .toList
}
}
@@ -236,7 +244,7 @@ object MavenTool extends Logger {
(repoSystem, session)
}
- class ShadeFilter extends Filter {
+ private[this] class ShadeFilter extends Filter {
override def canFilter(jar: File): Boolean = true
override def isFiltered(name: String): Boolean = {
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
index 92c1d9199..b9167d437 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
@@ -28,6 +28,8 @@ import org.apache.commons.codec.digest.DigestUtils
import java.io.{File, FileInputStream, IOException}
+import scala.collection.convert.ImplicitConversions._
+
/** Building pipeline for flink yarn application mode */
class FlinkYarnApplicationBuildPipeline(request:
FlinkYarnApplicationBuildRequest)
extends BuildPipeline {
@@ -59,7 +61,7 @@ class FlinkYarnApplicationBuildPipeline(request:
FlinkYarnApplicationBuildReques
case DevelopmentMode.FLINK_SQL =>
val mavenArts =
MavenTool.resolveArtifacts(request.dependencyInfo.mavenArts)
mavenArts.map(_.getAbsolutePath) ++
request.dependencyInfo.extJarLibs
- case _ => Set[String]()
+ case _ => List[String]()
}
}.getOrElse(throw getError.exception)
diff --git
a/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/MavenToolSpec.scala
b/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/MavenToolSpec.scala
index 7b0803c45..2bd91b8a7 100644
---
a/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/MavenToolSpec.scala
+++
b/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/MavenToolSpec.scala
@@ -26,6 +26,7 @@ import org.scalatest.wordspec.AnyWordSpec
import java.io.File
import java.util.jar.JarFile
+import scala.collection.convert.ImplicitConversions._
import scala.language.postfixOps
class MavenToolSpec extends AnyWordSpec with BeforeAndAfterAll with Matchers {