This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch job-state
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/job-state by this push:
new 2249c5ebf [Improve] job maven dependency support exclusion
2249c5ebf is described below
commit 2249c5ebf1ee0f0e288a6ed22570062cca179f60
Author: benjobs <[email protected]>
AuthorDate: Sat Nov 11 20:17:28 2023 +0800
[Improve] job maven dependency support exclusion
---
.../console/core/bean/MavenDependency.java | 95 ++++++++++++++++
.../console/core/bean/MavenExclusion.java | 48 +++++----
.../streampark/console/core/bean/MavenPom.java | 82 ++++++++++++++
.../console/core/entity/Application.java | 119 ++-------------------
.../streampark/console/core/entity/FlinkSql.java | 7 +-
.../core/service/impl/AppBuildPipeServiceImpl.java | 20 ++--
.../core/service/impl/ApplicationServiceImpl.java | 7 +-
.../components/Table/src/components/HeaderCell.vue | 2 +-
.../src/views/flink/app/components/Dependency.vue | 9 +-
.../src/views/flink/app/utils/Pom.ts | 77 +++++--------
.../{DependencyInfo.scala => MavenArtifact.scala} | 10 +-
.../streampark/flink/packer/maven/MavenTool.scala | 8 +-
.../flink/packer/pipeline/BuildRequest.scala | 16 +--
.../impl/FlinkK8sApplicationBuildPipeline.scala | 2 +-
.../streampark/flink/packer/MavenToolSpec.scala | 4 +-
15 files changed, 278 insertions(+), 228 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenDependency.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenDependency.java
new file mode 100644
index 000000000..aad4d48ec
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenDependency.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.bean;
+
+import org.apache.streampark.common.conf.Workspace;
+import org.apache.streampark.common.util.FileUtils;
+import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.console.base.util.WebUtils;
+import org.apache.streampark.flink.packer.maven.Artifact;
+import org.apache.streampark.flink.packer.maven.MavenArtifact;
+
+import org.apache.commons.lang3.StringUtils;
+
+import lombok.Data;
+import lombok.SneakyThrows;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Data
+public class MavenDependency {
+
+ private Set<MavenPom> pom = Collections.emptySet();
+
+ private Set<String> jar = Collections.emptySet();
+
+ @SneakyThrows
+ public static MavenDependency of(String dependency) {
+ if (StringUtils.isNotBlank(dependency)) {
+ return JacksonUtils.read(dependency, MavenDependency.class);
+ }
+ return new MavenDependency();
+ }
+
+ public boolean eq(MavenDependency that) {
+ if (that == null) {
+ return false;
+ }
+
+ if (this.pom.size() != that.pom.size()
+ || this.jar.size() != that.jar.size()
+ || !this.pom.containsAll(that.pom)) {
+ return false;
+ }
+
+ File localJar = WebUtils.getAppTempDir();
+ File localUploads = new File(Workspace.local().APP_UPLOADS());
+ HashSet<String> otherJars = new HashSet<>(that.jar);
+ for (String jarName : jar) {
+ if (!otherJars.contains(jarName)
+ || !FileUtils.equals(new File(localJar, jarName), new
File(localUploads, jarName))) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ public MavenArtifact toMavenArtifact() {
+ List<Artifact> mvnArts =
+ this.pom.stream()
+ .map(
+ pom ->
+ new Artifact(
+ pom.getGroupId(),
+ pom.getArtifactId(),
+ pom.getVersion(),
+ pom.getClassifier()))
+ .collect(Collectors.toList());
+ List<String> extJars =
+ this.jar.stream()
+ .map(jar -> Workspace.local().APP_UPLOADS() + "/" + jar)
+ .collect(Collectors.toList());
+ return new MavenArtifact(mvnArts, extJars);
+ }
+}
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/DependencyInfo.scala
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenExclusion.java
similarity index 52%
copy from
streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/DependencyInfo.scala
copy to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenExclusion.java
index c80e416e6..9ab025f36 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/DependencyInfo.scala
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenExclusion.java
@@ -15,29 +15,41 @@
* limitations under the License.
*/
-package org.apache.streampark.flink.packer.maven
+package org.apache.streampark.console.core.bean;
-import java.util.{List => JavaList}
+import lombok.Data;
-import scala.collection.JavaConversions._
+import java.util.Objects;
-/**
- * @param mavenArts
- * collection of maven artifacts
- * @param extJarLibs
- * collection of jar lib paths, which elements can be a directory or file
path.
- */
-case class DependencyInfo(mavenArts: Set[Artifact] = Set(), extJarLibs:
Set[String] = Set()) {
+/** One Maven {@code <exclusion>} entry, identified by its group id and artifact id. */
+@Data
+public class MavenExclusion {
- def this(mavenArts: JavaList[Artifact], extJarLibs: JavaList[String]) {
- this(mavenArts.toSet, extJarLibs.toSet)
- }
+  private String groupId;
- def merge(jarLibs: Set[String]): DependencyInfo =
- if (jarLibs != null) DependencyInfo(mavenArts, extJarLibs ++ jarLibs) else
this.copy()
+  private String artifactId;
-}
+  /**
+   * Two exclusions are equal when both group id and artifact id match. Null fields are compared
+   * null-safely so that equality stays consistent with {@link #hashCode()}, which hashes the same
+   * two fields.
+   *
+   * @param o the object to compare with
+   * @return {@code true} when {@code o} is a {@code MavenExclusion} with the same coordinates
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
-object DependencyInfo {
- def empty: DependencyInfo = new DependencyInfo()
+    MavenExclusion that = (MavenExclusion) o;
+    return Objects.equals(this.groupId, that.groupId)
+        && Objects.equals(this.artifactId, that.artifactId);
+  }
+
+  /** Hashes the same fields that {@link #equals(Object)} compares. */
+  @Override
+  public int hashCode() {
+    return Objects.hash(groupId, artifactId);
+  }
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenPom.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenPom.java
new file mode 100644
index 000000000..85e641bdf
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenPom.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.bean;
+
+import org.apache.commons.lang3.StringUtils;
+
+import lombok.Data;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Maven coordinates of one job dependency together with the exclusions declared for it.
+ *
+ * <p>A blank classifier is treated as "no classifier" throughout: in {@link #artifactName()},
+ * {@link #getClassifier()}, {@link #equals(Object)} and {@link #hashCode()}.
+ */
+@Data
+public class MavenPom {
+  private String groupId;
+  private String artifactId;
+  private String version;
+  private String classifier;
+  private Set<MavenExclusion> exclusions = Collections.emptySet();
+
+  /**
+   * Compares coordinates and exclusions.
+   *
+   * <p>All fields are compared null-safely; a dependency without a classifier must not raise a
+   * {@code NullPointerException}. Exclusions are compared as sets in both directions so the
+   * relation is symmetric and a changed exclusion list is always detected.
+   *
+   * @param o the object to compare with
+   * @return {@code true} when {@code o} is a {@code MavenPom} with equal coordinates and an equal
+   *     exclusion set
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    MavenPom that = (MavenPom) o;
+    return Objects.equals(this.groupId, that.groupId)
+        && Objects.equals(this.artifactId, that.artifactId)
+        && Objects.equals(this.version, that.version)
+        && Objects.equals(this.classifierOrNull(), that.classifierOrNull())
+        && this.exclusionsOrEmpty().equals(that.exclusionsOrEmpty());
+  }
+
+  /**
+   * Builds the jar file name for this artifact.
+   *
+   * @return {@code artifactId-version.jar}, or {@code artifactId-version-classifier.jar} when a
+   *     non-blank classifier is set
+   */
+  public String artifactName() {
+    if (StringUtils.isBlank(classifier)) {
+      return String.format("%s-%s.jar", artifactId, version);
+    }
+    return String.format("%s-%s-%s.jar", artifactId, version, classifier);
+  }
+
+  /**
+   * Hashes the coordinates only. Exclusions are left out, which is still consistent with {@link
+   * #equals(Object)} because equal objects have equal coordinates.
+   */
+  @Override
+  public int hashCode() {
+    return Objects.hash(groupId, artifactId, version, classifierOrNull());
+  }
+
+  /** Renders {@code groupId:artifactId:version} followed by {@code :classifier} when present. */
+  @Override
+  public String toString() {
+    return groupId + ":" + artifactId + ":" + version + getClassifier();
+  }
+
+  /**
+   * Overrides the Lombok getter.
+   *
+   * @return an empty string when the classifier is blank, otherwise the classifier prefixed with
+   *     a colon
+   */
+  public String getClassifier() {
+    return StringUtils.isBlank(classifier) ? "" : ":" + classifier;
+  }
+
+  /** Raw classifier with blank values collapsed to {@code null} for comparison and hashing. */
+  private String classifierOrNull() {
+    return StringUtils.isBlank(classifier) ? null : classifier;
+  }
+
+  /** Exclusion set with {@code null} collapsed to an empty set. */
+  private Set<MavenExclusion> exclusionsOrEmpty() {
+    return exclusions == null ? Collections.emptySet() : exclusions;
+  }
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 3e24e98cb..d963c5477 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -26,20 +26,17 @@ import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.enums.FlinkK8sRestExposedType;
import org.apache.streampark.common.enums.StorageType;
import org.apache.streampark.common.fs.FsOperator;
-import org.apache.streampark.common.util.FileUtils;
-import org.apache.streampark.common.util.Utils;
import org.apache.streampark.console.base.util.JacksonUtils;
import org.apache.streampark.console.base.util.ObjectUtils;
-import org.apache.streampark.console.base.util.WebUtils;
import org.apache.streampark.console.core.bean.AppControl;
+import org.apache.streampark.console.core.bean.MavenDependency;
import org.apache.streampark.console.core.enums.FlinkAppState;
import org.apache.streampark.console.core.enums.ReleaseState;
import org.apache.streampark.console.core.enums.ResourceFrom;
import org.apache.streampark.console.core.metrics.flink.JobsOverview;
import org.apache.streampark.console.core.utils.YarnQueueLabelExpression;
import org.apache.streampark.flink.kubernetes.model.K8sPodTemplates;
-import org.apache.streampark.flink.packer.maven.Artifact;
-import org.apache.streampark.flink.packer.maven.DependencyInfo;
+import org.apache.streampark.flink.packer.maven.MavenArtifact;
import org.apache.commons.lang3.StringUtils;
@@ -49,24 +46,19 @@ import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.core.type.TypeReference;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nonnull;
-import java.io.File;
import java.io.Serializable;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
-import java.util.stream.Collectors;
import static org.apache.streampark.console.core.enums.FlinkAppState.of;
@@ -487,13 +479,13 @@ public class Application implements Serializable {
@JsonIgnore
@SneakyThrows
- public Dependency getDependencyObject() {
- return Dependency.toDependency(this.dependency);
+ public MavenDependency getMavenDependency() {
+ return MavenDependency.of(this.dependency);
}
@JsonIgnore
- public DependencyInfo getDependencyInfo() {
- return
Application.Dependency.toDependency(getDependency()).toJarPackDeps();
+ public MavenArtifact getMavenArtifact() {
+ return getMavenDependency().toMavenArtifact();
}
@JsonIgnore
@@ -650,65 +642,6 @@ public class Application implements Serializable {
return ExecutionMode.YARN_PER_JOB.equals(mode) ||
ExecutionMode.YARN_APPLICATION.equals(mode);
}
- @Data
- public static class Dependency {
- private List<Pom> pom = Collections.emptyList();
- private List<String> jar = Collections.emptyList();
-
- @SneakyThrows
- public static Dependency toDependency(String dependency) {
- if (Utils.notEmpty(dependency)) {
- return JacksonUtils.read(dependency, new TypeReference<Dependency>()
{});
- }
- return new Dependency();
- }
-
- public boolean isEmpty() {
- return pom.isEmpty() && jar.isEmpty();
- }
-
- public boolean eq(Dependency other) {
- if (other == null) {
- return false;
- }
- if (this.isEmpty() && other.isEmpty()) {
- return true;
- }
-
- if (this.pom.size() != other.pom.size() || this.jar.size() !=
other.jar.size()) {
- return false;
- }
- File localJar = WebUtils.getAppTempDir();
- File localUploads = new File(Workspace.local().APP_UPLOADS());
- HashSet<String> otherJars = new HashSet<>(other.jar);
- for (String jarName : jar) {
- if (!otherJars.contains(jarName)
- || !FileUtils.equals(new File(localJar, jarName), new
File(localUploads, jarName))) {
- return false;
- }
- }
- return new HashSet<>(pom).containsAll(other.pom);
- }
-
- public DependencyInfo toJarPackDeps() {
- List<Artifact> mvnArts =
- this.pom.stream()
- .map(
- pom ->
- new Artifact(
- pom.getGroupId(),
- pom.getArtifactId(),
- pom.getVersion(),
- pom.getClassifier()))
- .collect(Collectors.toList());
- List<String> extJars =
- this.jar.stream()
- .map(jar -> Workspace.local().APP_UPLOADS() + "/" + jar)
- .collect(Collectors.toList());
- return new DependencyInfo(mvnArts, extJars);
- }
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -724,44 +657,4 @@ public class Application implements Serializable {
public int hashCode() {
return Objects.hash(id);
}
-
- @Data
- public static class Pom {
- private String groupId;
- private String artifactId;
- private String version;
- private String classifier;
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- return this.toString().equals(o.toString());
- }
-
- public String artifactName() {
- if (StringUtils.isBlank(classifier)) {
- return String.format("%s-%s.jar", artifactId, version);
- }
- return String.format("%s-%s-%s.jar", artifactId, version, classifier);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(groupId, artifactId, version, classifier);
- }
-
- @Override
- public String toString() {
- return groupId + ":" + artifactId + ":" + version + getClassifier();
- }
-
- public String getClassifier() {
- return StringUtils.isEmpty(classifier) ? "" : ":" + classifier;
- }
- }
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java
index 57be322d7..d02348de3 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java
@@ -18,6 +18,7 @@
package org.apache.streampark.console.core.entity;
import org.apache.streampark.common.util.DeflaterUtils;
+import org.apache.streampark.console.core.bean.MavenDependency;
import org.apache.streampark.console.core.enums.ChangedType;
import com.baomidou.mybatisplus.annotation.IdType;
@@ -82,10 +83,8 @@ public class FlinkSql {
// 1) determine if sql statement has changed
boolean sqlDifference =
!this.getSql().trim().equals(target.getSql().trim());
// 2) determine if dependency has changed
- Application.Dependency thisDependency =
- Application.Dependency.toDependency(this.getDependency());
- Application.Dependency targetDependency =
- Application.Dependency.toDependency(target.getDependency());
+ MavenDependency thisDependency = MavenDependency.of(this.getDependency());
+ MavenDependency targetDependency =
MavenDependency.of(target.getDependency());
boolean depDifference = !thisDependency.eq(targetDependency);
if (sqlDifference && depDifference) {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 7118a97c4..5b71d0d57 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -307,7 +307,7 @@ public class AppBuildPipeServiceImpl
mainClass,
yarnProvidedPath,
app.getDevelopmentMode(),
- app.getDependencyInfo());
+ app.getMavenArtifact());
log.info("Submit params to building pipeline : {}", yarnAppRequest);
return FlinkYarnApplicationBuildPipeline.of(yarnAppRequest);
case YARN_PER_JOB:
@@ -322,7 +322,7 @@ public class AppBuildPipeServiceImpl
app.getExecutionModeEnum(),
app.getDevelopmentMode(),
flinkEnv.getFlinkVersion(),
- app.getDependencyInfo());
+ app.getMavenArtifact());
log.info("Submit params to building pipeline : {}", buildRequest);
return FlinkRemoteBuildPipeline.of(buildRequest);
case KUBERNETES_NATIVE_SESSION:
@@ -335,7 +335,7 @@ public class AppBuildPipeServiceImpl
app.getExecutionModeEnum(),
app.getDevelopmentMode(),
flinkEnv.getFlinkVersion(),
- app.getDependencyInfo(),
+ app.getMavenArtifact(),
app.getClusterId(),
app.getK8sNamespace());
log.info("Submit params to building pipeline : {}",
k8sSessionBuildRequest);
@@ -350,7 +350,7 @@ public class AppBuildPipeServiceImpl
app.getExecutionModeEnum(),
app.getDevelopmentMode(),
flinkEnv.getFlinkVersion(),
- app.getDependencyInfo(),
+ app.getMavenArtifact(),
app.getClusterId(),
app.getK8sNamespace(),
app.getFlinkImage(),
@@ -379,8 +379,8 @@ public class AppBuildPipeServiceImpl
FsOperator localFS = FsOperator.lfs();
// 1. copy jar to local upload dir
if (app.isFlinkSqlJob() || app.isUploadJob()) {
- if (!app.getDependencyObject().getJar().isEmpty()) {
- for (String jar : app.getDependencyObject().getJar()) {
+ if (!app.getMavenDependency().getJar().isEmpty()) {
+ for (String jar : app.getMavenDependency().getJar()) {
File localJar = new File(WebUtils.getAppTempDir(), jar);
File localUploadJar = new File(localUploadDIR, jar);
if (!localJar.exists() && !localUploadJar.exists()) {
@@ -420,14 +420,12 @@ public class AppBuildPipeServiceImpl
jars.add(libJar);
// 2). jar dependency
- app.getDependencyObject()
- .getJar()
- .forEach(jar -> jars.add(new File(localUploadDIR, jar)));
+ app.getMavenDependency().getJar().forEach(jar -> jars.add(new
File(localUploadDIR, jar)));
// 3). pom dependency
- if (!app.getDependencyObject().getPom().isEmpty()) {
+ if (!app.getMavenDependency().getPom().isEmpty()) {
Set<Artifact> artifacts =
- app.getDependencyObject().getPom().stream()
+ app.getMavenDependency().getPom().stream()
.filter(x -> !new File(localUploadDIR,
x.artifactName()).exists())
.map(
pom ->
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index e894bd626..a6d88e433 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -40,6 +40,7 @@ import
org.apache.streampark.console.base.mybatis.pager.MybatisPager;
import org.apache.streampark.console.base.util.CommonUtils;
import org.apache.streampark.console.base.util.ObjectUtils;
import org.apache.streampark.console.base.util.WebUtils;
+import org.apache.streampark.console.core.bean.MavenDependency;
import org.apache.streampark.console.core.entity.AppBuildPipeline;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.ApplicationConfig;
@@ -852,10 +853,8 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
application.setRelease(ReleaseState.NEED_RELEASE.get());
if (application.isUploadJob()) {
- Application.Dependency thisDependency =
- Application.Dependency.toDependency(appParam.getDependency());
- Application.Dependency targetDependency =
- Application.Dependency.toDependency(application.getDependency());
+ MavenDependency thisDependency =
MavenDependency.of(appParam.getDependency());
+ MavenDependency targetDependency =
MavenDependency.of(application.getDependency());
if (!thisDependency.eq(targetDependency)) {
application.setDependency(appParam.getDependency());
diff --git
a/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue
b/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue
index 36ab854c5..35c080269 100644
---
a/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue
+++
b/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue
@@ -22,7 +22,7 @@
props: {
column: {
type: Object as PropType<BasicColumn>,
- default: () => ({}) as BasicColumn,
+ default: () => ({} as BasicColumn),
},
},
setup(props) {
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/Dependency.vue
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/Dependency.vue
index bf6aa9543..300ff1390 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/Dependency.vue
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/Dependency.vue
@@ -125,26 +125,23 @@
mvnPom.classifier = classifier;
}
const id = getId(mvnPom);
- const pomExclusion = {};
+ const pomExclusion = [];
if (exclusion != null) {
const exclusions = exclusion.split('<exclusion>');
exclusions.forEach((e) => {
if (e != null && e.length > 0) {
const e_group = e.match(groupExp) ?
groupExp.exec(e)![1].trim() : null;
const e_artifact = e.match(artifactExp) ?
artifactExp.exec(e)![1].trim() : null;
- const id = e_group + '_' + e_artifact;
- pomExclusion[id] = {
+ pomExclusion.push({
groupId: e_group,
artifactId: e_artifact,
- };
+ });
}
});
}
mvnPom.exclusions = pomExclusion;
dependency.pom[id] = mvnPom;
}
- } else {
- console.error('dependency error...');
}
});
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/Pom.ts
b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/Pom.ts
index 87b8ab397..1e0291fa8 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/Pom.ts
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/Pom.ts
@@ -22,65 +22,40 @@ export function toPomString(pom) {
const classifier = pom.classifier;
const exclusions = pom.exclusions || [];
let exclusionString = '';
- let pomString = '';
+ let classifierString = '';
if (exclusions.length > 0) {
exclusions.forEach((item) => {
exclusionString +=
- ' <exclusion>\n' +
- ' <groupId>' +
+ ' <exclusion>\n' +
+ ' <groupId>' +
item.groupId +
'</groupId>\n' +
- ' <artifactId>' +
+ ' <artifactId>' +
item.artifactId +
'</artifactId>\n' +
- ' </exclusion>\n';
+ ' </exclusion>\n';
});
- pomString =
- ' <dependency>\n' +
- ' <groupId>' +
- groupId +
- '</groupId>\n' +
- ' <artifactId>' +
- artifactId +
- '</artifactId>\n' +
- ' <version>' +
- version +
- '</version>\n' +
- ' <exclusions>\n' +
- exclusionString +
- ' </exclusions>\n' +
- ' </dependency>';
- } else {
- if (classifier != null) {
- pomString =
- ' <dependency>\n' +
- ' <groupId>' +
- groupId +
- '</groupId>\n' +
- ' <artifactId>' +
- artifactId +
- '</artifactId>\n' +
- ' <version>' +
- version +
- '</version>\n' +
- ' <classifier>' +
- classifier +
- '</classifier>\n' +
- ' </dependency>';
- } else {
- pomString =
- ' <dependency>\n' +
- ' <groupId>' +
- groupId +
- '</groupId>\n' +
- ' <artifactId>' +
- artifactId +
- '</artifactId>\n' +
- ' <version>' +
- version +
- '</version>\n' +
- ' </dependency>';
- }
+ exclusionString = ' <exclusions>\n' + exclusionString + '
</exclusions>\n';
}
+
+ if (classifier != null) {
+ classifierString = ' <classifier>' + classifier + '</classifier>\n';
+ }
+
+ const pomString =
+ ' <dependency>\n' +
+ ' <groupId>' +
+ groupId +
+ '</groupId>\n' +
+ ' <artifactId>' +
+ artifactId +
+ '</artifactId>\n' +
+ ' <version>' +
+ version +
+ '</version>\n' +
+ classifierString +
+ exclusionString +
+ ' </dependency>';
+
return pomString;
}
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/DependencyInfo.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenArtifact.scala
similarity index 79%
rename from
streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/DependencyInfo.scala
rename to
streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenArtifact.scala
index c80e416e6..0b39f53d1 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/DependencyInfo.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenArtifact.scala
@@ -27,17 +27,17 @@ import scala.collection.JavaConversions._
* @param extJarLibs
* collection of jar lib paths, which elements can be a directory or file
path.
*/
-case class DependencyInfo(mavenArts: Set[Artifact] = Set(), extJarLibs:
Set[String] = Set()) {
+case class MavenArtifact(mavenArts: Set[Artifact] = Set(), extJarLibs:
Set[String] = Set()) {
def this(mavenArts: JavaList[Artifact], extJarLibs: JavaList[String]) {
this(mavenArts.toSet, extJarLibs.toSet)
}
- def merge(jarLibs: Set[String]): DependencyInfo =
- if (jarLibs != null) DependencyInfo(mavenArts, extJarLibs ++ jarLibs) else
this.copy()
+ def merge(jarLibs: Set[String]): MavenArtifact =
+ if (jarLibs != null) MavenArtifact(mavenArts, extJarLibs ++ jarLibs) else
this.copy()
}
-object DependencyInfo {
- def empty: DependencyInfo = new DependencyInfo()
+object MavenArtifact {
+ def empty: MavenArtifact = new MavenArtifact()
}
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
index 35e0b1a6d..bfd9de738 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
@@ -160,7 +160,7 @@ object MavenTool extends Logger {
/**
* Build a fat-jar with custom jar libraries and maven artifacts.
*
- * @param dependencyInfo
+ * @param artifact
* maven artifacts and jar libraries for building a fat-jar
* @param outFatJarPath
* output paths of fat-jar, like "/streampark/workspace/233/my-fat.jar"
@@ -168,10 +168,10 @@ object MavenTool extends Logger {
@throws[Exception]
def buildFatJar(
@Nullable mainClass: String,
- @Nonnull dependencyInfo: DependencyInfo,
+ @Nonnull artifact: MavenArtifact,
@Nonnull outFatJarPath: String): File = {
- val jarLibs = dependencyInfo.extJarLibs
- val arts = dependencyInfo.mavenArts
+ val jarLibs = artifact.extJarLibs
+ val arts = artifact.mavenArts
if (jarLibs.isEmpty && arts.isEmpty) {
throw new Exception(s"[StreamPark] streampark-packer: empty artifacts.")
}
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
index 5abc55b92..41c56aa6b 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
@@ -21,7 +21,7 @@ import org.apache.streampark.common.conf.{FlinkVersion,
Workspace}
import org.apache.streampark.common.enums.{DevelopmentMode, ExecutionMode}
import org.apache.streampark.flink.kubernetes.model.K8sPodTemplates
import org.apache.streampark.flink.packer.docker.DockerConf
-import org.apache.streampark.flink.packer.maven.DependencyInfo
+import org.apache.streampark.flink.packer.maven.MavenArtifact
import scala.collection.mutable.ArrayBuffer
@@ -44,17 +44,17 @@ sealed trait FlinkBuildParam extends BuildParam {
def flinkVersion: FlinkVersion
- def dependencyInfo: DependencyInfo
+ def dependency: MavenArtifact
def customFlinkUserJar: String
- lazy val providedLibs: DependencyInfo = {
+ lazy val providedLibs: MavenArtifact = {
val providedLibs =
ArrayBuffer(localWorkspace.APP_JARS, localWorkspace.APP_PLUGINS,
customFlinkUserJar)
if (developmentMode == DevelopmentMode.FLINK_SQL) {
providedLibs +=
s"${localWorkspace.APP_SHIMS}/flink-${flinkVersion.majorVersion}"
}
- dependencyInfo.merge(providedLibs.toSet)
+ dependency.merge(providedLibs.toSet)
}
def getShadedJarPath(rootWorkspace: String): String = {
@@ -79,7 +79,7 @@ case class FlinkK8sSessionBuildRequest(
executionMode: ExecutionMode,
developmentMode: DevelopmentMode,
flinkVersion: FlinkVersion,
- dependencyInfo: DependencyInfo,
+ dependency: MavenArtifact,
clusterId: String,
k8sNamespace: String)
extends FlinkK8sBuildParam
@@ -92,7 +92,7 @@ case class FlinkK8sApplicationBuildRequest(
executionMode: ExecutionMode,
developmentMode: DevelopmentMode,
flinkVersion: FlinkVersion,
- dependencyInfo: DependencyInfo,
+ dependency: MavenArtifact,
clusterId: String,
k8sNamespace: String,
flinkBaseImage: String,
@@ -110,7 +110,7 @@ case class FlinkRemotePerJobBuildRequest(
executionMode: ExecutionMode,
developmentMode: DevelopmentMode,
flinkVersion: FlinkVersion,
- dependencyInfo: DependencyInfo)
+ dependency: MavenArtifact)
extends FlinkBuildParam
case class FlinkYarnApplicationBuildRequest(
@@ -118,5 +118,5 @@ case class FlinkYarnApplicationBuildRequest(
mainClass: String,
yarnProvidedPath: String,
developmentMode: DevelopmentMode,
- dependencyInfo: DependencyInfo)
+ dependencyInfo: MavenArtifact)
extends BuildParam
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
index 72d47991e..060209ff9 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
@@ -91,7 +91,7 @@ class FlinkK8sApplicationBuildPipeline(request:
FlinkK8sApplicationBuildRequest)
execStep(3) {
val shadedJarOutputPath = request.getShadedJarPath(buildWorkspace)
val extJarLibs = request.developmentMode match {
- case DevelopmentMode.FLINK_SQL => request.dependencyInfo.extJarLibs
+ case DevelopmentMode.FLINK_SQL => request.dependency.extJarLibs
case DevelopmentMode.CUSTOM_CODE => Set[String]()
}
val shadedJar =
diff --git
a/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/MavenToolSpec.scala
b/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/MavenToolSpec.scala
index 7b0803c45..06831510c 100644
---
a/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/MavenToolSpec.scala
+++
b/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/MavenToolSpec.scala
@@ -16,7 +16,7 @@
*/
package org.apache.streampark.flink.packer
-import org.apache.streampark.flink.packer.maven.{Artifact, DependencyInfo,
MavenTool}
+import org.apache.streampark.flink.packer.maven.{Artifact, MavenArtifact,
MavenTool}
import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfterAll
@@ -87,7 +87,7 @@ class MavenToolSpec extends AnyWordSpec with
BeforeAndAfterAll with Matchers {
val fatJarPath = outputDir.concat("fat-3.jar")
val fatJar = MavenTool.buildFatJar(
null,
- DependencyInfo(
+ MavenArtifact(
Set(Artifact.of("org.apache.flink:flink-connector-kafka_2.11:1.13.0")),
Set(path("jars/commons-dbutils-1.7.jar"))),
fatJarPath)