This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.2 by this push:
new 97cc84c77 [Bug] deploy flink job on yarn, get state bug fixed. (#3329)
97cc84c77 is described below
commit 97cc84c77695814d311bd495921095d0c1405856
Author: benjobs <[email protected]>
AuthorDate: Sun Nov 12 00:17:59 2023 +0800
[Bug] deploy flink job on yarn, get state bug fixed. (#3329)
* [Improve] resolve maven artifacts improvement
* [Bug] extract programArgs bug fixed
* [Improve] job maven dependency support exclusion
* [Improve] maven dependency check improvement
---------
Co-authored-by: benjobs <[email protected]>
---
.../streampark/common/util/PropertiesUtils.scala | 13 ++-
.../apache/streampark/common/util/YarnUtils.scala | 11 +-
.../common/util/PropertiesUtilsTestCase.scala | 16 ++-
.../console/core/bean/MavenDependency.java | 101 +++++++++++++++++++
.../console/core/bean/MavenExclusion.java | 48 +++++----
.../streampark/console/core/bean/MavenPom.java | 93 +++++++++++++++++
.../console/core/entity/Application.java | 112 ++-------------------
.../streampark/console/core/entity/FlinkSql.java | 9 +-
.../core/service/impl/AppBuildPipeServiceImpl.java | 81 +++++++++------
.../core/service/impl/ApplicationServiceImpl.java | 9 +-
.../console/core/task/FlinkRESTAPIWatcher.java | 3 -
.../components/Table/src/components/HeaderCell.vue | 2 +-
.../src/views/flink/app/components/Dependency.vue | 18 ++--
.../flink/app/hooks/useCreateAndEditSchema.ts | 10 +-
.../src/views/flink/app/utils/Pom.ts | 77 +++++---------
.../flink/client/trait/FlinkClientTrait.scala | 47 +++++----
.../streampark/flink/packer/maven/Artifact.scala | 7 +-
.../{DependencyInfo.scala => MavenArtifact.scala} | 10 +-
.../streampark/flink/packer/maven/MavenTool.scala | 102 +++++++++++--------
.../flink/packer/pipeline/BuildRequest.scala | 16 +--
.../impl/FlinkK8sApplicationBuildPipeline.scala | 2 +-
.../streampark/flink/packer/MavenToolSpec.scala | 4 +-
22 files changed, 467 insertions(+), 324 deletions(-)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
index 3098daa5d..35facd89f 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
@@ -322,7 +322,18 @@ object PropertiesUtils extends Logger {
}
}
programArgs += value.substring(1, value.length - 1)
- case _ => programArgs += v
+ case _ =>
+ val regexp = "(.*)='(.*)'$"
+ if (v.matches(regexp)) {
+ programArgs += v.replaceAll(regexp, "$1=$2")
+ } else {
+ val regexp = "(.*)=\"(.*)\"$"
+ if (v.matches(regexp)) {
+ programArgs += v.replaceAll(regexp, "$1=$2")
+ } else {
+ programArgs += v
+ }
+ }
}
}
}
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
index 83db1d78c..d20b7bb6e 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
@@ -30,6 +30,7 @@ import org.apache.http.client.methods.HttpGet
import org.apache.http.client.protocol.HttpClientContext
import org.apache.http.impl.client.HttpClients
+import java.io.IOException
import java.net.InetAddress
import java.security.PrivilegedExceptionAction
import java.util
@@ -257,20 +258,19 @@ object YarnUtils extends Logger {
* url
* @return
*/
+ @throws[IOException]
def restRequest(url: String): String = {
if (url == null) return null
-
url match {
case u if u.matches("^http(|s)://.*") =>
Try(request(url)) match {
case Success(v) => v
case Failure(e) =>
if (hasYarnHttpKerberosAuth) {
- logError(s"yarnUtils authRestRequest error, url: $u, detail: $e")
+ throw new IOException(s"yarnUtils authRestRequest error, url:
$u, detail: $e")
} else {
- logError(s"yarnUtils restRequest error, url: $u, detail: $e")
+ throw new IOException(s"yarnUtils restRequest error, url: $u,
detail: $e")
}
- null
}
case _ =>
Try(request(s"${getRMWebAppURL()}/$url")) match {
@@ -281,8 +281,7 @@ object YarnUtils extends Logger {
} match {
case Success(v) => v
case Failure(e) =>
- logError(s"yarnUtils restRequest retry 5 times all failed.
detail: $e")
- null
+ throw new IOException(s"yarnUtils restRequest retry 5 times
all failed. detail: $e")
}
}
}
diff --git a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
index 995cfd2ec..827d0f11c 100644
--- a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
+++ b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
@@ -24,8 +24,20 @@ import scala.language.postfixOps
class PropertiesUtilsTestCase {
@Test def testExtractProgramArgs(): Unit = {
- val args =
- "mysql-sync-database \n--database employees \n--mysql-conf
hostname=127.0.0.1 \n--mysql-conf port=3306 \n--mysql-conf username=root
\n--mysql-conf password=123456 \n--mysql-conf database-name=employees
\n--including-tables 'test|test.*' \n--sink-conf fenodes=127.0.0.1:8030
\n--sink-conf username=root \n--sink-conf password= \n--sink-conf
jdbc-url=jdbc:mysql://127.0.0.1:9030 \n--sink-conf
sink.label-prefix=label\n--table-conf replication_num=1 "
+ val args = "mysql-sync-table \n" +
+ "--warehouse hdfs:///paimon \n" +
+ "--database test_db \n" +
+ "--table test_table \n" +
+ "--mysql-conf hostname=localhost \n" +
+ "--mysql-conf username=root \n" +
+ "--mysql-conf password=123456 \n" +
+ "--mysql-conf database-name='employees' \n" +
+ "--mysql-conf table-name='employees' \n" +
+ "--catalog-conf metastore=hive \n" +
+ "--catalog-conf uri=thrift://localhost:9083 \n" +
+ "--table-conf bucket=1 \n" +
+ "--table-conf changelog-producer=input \n" +
+ "--table-conf sink.parallelism=1"
val programArgs = new ArrayBuffer[String]()
programArgs ++= PropertiesUtils.extractArguments(args)
println(programArgs)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenDependency.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenDependency.java
new file mode 100644
index 000000000..ec6bb14a8
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenDependency.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.bean;
+
+import org.apache.streampark.common.conf.Workspace;
+import org.apache.streampark.common.util.FileUtils;
+import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.console.base.util.WebUtils;
+import org.apache.streampark.flink.packer.maven.Artifact;
+import org.apache.streampark.flink.packer.maven.MavenArtifact;
+
+import org.apache.commons.lang3.StringUtils;
+
+import lombok.Data;
+import lombok.SneakyThrows;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Data
+public class MavenDependency {
+
+ private Set<MavenPom> pom = Collections.emptySet();
+
+ private Set<String> jar = Collections.emptySet();
+
+ @SneakyThrows
+ public static MavenDependency of(String dependency) {
+ if (StringUtils.isNotBlank(dependency)) {
+ return JacksonUtils.read(dependency, MavenDependency.class);
+ }
+ return new MavenDependency();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (this == that) {
+ return true;
+ }
+
+ if (that == null || getClass() != that.getClass()) {
+ return false;
+ }
+
+ MavenDependency thatDep = (MavenDependency) that;
+
+ if (this.pom.size() != thatDep.pom.size()
+ || this.jar.size() != thatDep.jar.size()
+ || !this.pom.containsAll(thatDep.pom)) {
+ return false;
+ }
+
+ File localJar = WebUtils.getAppTempDir();
+ File localUploads = new File(Workspace.local().APP_UPLOADS());
+ for (String jarName : jar) {
+ if (!thatDep.jar.contains(jarName)
+ || !FileUtils.equals(new File(localJar, jarName), new
File(localUploads, jarName))) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ public MavenArtifact toMavenArtifact() {
+ List<Artifact> mvnArts =
+ this.pom.stream()
+ .map(
+ pom ->
+ new Artifact(
+ pom.getGroupId(),
+ pom.getArtifactId(),
+ pom.getVersion(),
+ pom.getClassifier(),
+ pom.toExclusionString()))
+ .collect(Collectors.toList());
+ List<String> extJars =
+ this.jar.stream()
+ .map(jar -> Workspace.local().APP_UPLOADS() + "/" + jar)
+ .collect(Collectors.toList());
+ return new MavenArtifact(mvnArts, extJars);
+ }
+}
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/DependencyInfo.scala b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenExclusion.java
similarity index 52%
copy from streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/DependencyInfo.scala
copy to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenExclusion.java
index c80e416e6..9ab025f36 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/DependencyInfo.scala
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenExclusion.java
@@ -15,29 +15,41 @@
* limitations under the License.
*/
-package org.apache.streampark.flink.packer.maven
+package org.apache.streampark.console.core.bean;
-import java.util.{List => JavaList}
+import lombok.Data;
-import scala.collection.JavaConversions._
+import java.util.Objects;
-/**
- * @param mavenArts
- * collection of maven artifacts
- * @param extJarLibs
- * collection of jar lib paths, which elements can be a directory or file
path.
- */
-case class DependencyInfo(mavenArts: Set[Artifact] = Set(), extJarLibs:
Set[String] = Set()) {
+@Data
+public class MavenExclusion {
- def this(mavenArts: JavaList[Artifact], extJarLibs: JavaList[String]) {
- this(mavenArts.toSet, extJarLibs.toSet)
- }
+ private String groupId;
- def merge(jarLibs: Set[String]): DependencyInfo =
- if (jarLibs != null) DependencyInfo(mavenArts, extJarLibs ++ jarLibs) else
this.copy()
+ private String artifactId;
-}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
-object DependencyInfo {
- def empty: DependencyInfo = new DependencyInfo()
+ MavenExclusion that = (MavenExclusion) o;
+ if (this.groupId == null || that.groupId == null) {
+ return false;
+ }
+ if (this.artifactId == null || that.artifactId == null) {
+ return false;
+ }
+ return this.groupId.equals(that.groupId) &&
this.artifactId.equals(that.artifactId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(groupId, artifactId);
+ }
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenPom.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenPom.java
new file mode 100644
index 000000000..21164061c
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenPom.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.bean;
+
+import org.apache.commons.lang3.StringUtils;
+
+import lombok.Data;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Data
+public class MavenPom {
+ private String groupId;
+ private String artifactId;
+ private String version;
+ private String classifier;
+ private Set<MavenExclusion> exclusions = Collections.emptySet();
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ MavenPom that = (MavenPom) o;
+
+ boolean basic =
+ this.groupId.equals(that.groupId)
+ && this.artifactId.equals(that.artifactId)
+ && this.version.equals(that.version);
+
+ boolean classify =
+ StringUtils.isAllBlank(this.classifier, that.classifier)
+ || this.classifier.equals(that.classifier);
+
+ if (basic && classify) {
+ Set<MavenExclusion> thisEx =
+ this.exclusions == null ? Collections.emptySet() : this.exclusions;
+ Set<MavenExclusion> thatEx =
+ that.exclusions == null ? Collections.emptySet() : that.exclusions;
+ return thisEx.size() == thatEx.size() && thisEx.containsAll(thatEx);
+ }
+ return false;
+ }
+
+ public String artifactName() {
+ if (StringUtils.isBlank(classifier)) {
+ return String.format("%s-%s.jar", artifactId, version);
+ }
+ return String.format("%s-%s-%s.jar", artifactId, version, classifier);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(groupId, artifactId, version, classifier);
+ }
+
+ @Override
+ public String toString() {
+ return groupId + ":" + artifactId + ":" + version + getClassifier();
+ }
+
+ public Set<String> toExclusionString() {
+ return this.exclusions.stream()
+ .map(x -> String.format("%s:%s", x.getGroupId(), x.getArtifactId()))
+ .collect(Collectors.toSet());
+ }
+
+ public String getClassifier() {
+ return StringUtils.isBlank(classifier) ? "" : ":" + classifier;
+ }
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 5b3aa4db2..d963c5477 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -26,20 +26,17 @@ import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.enums.FlinkK8sRestExposedType;
import org.apache.streampark.common.enums.StorageType;
import org.apache.streampark.common.fs.FsOperator;
-import org.apache.streampark.common.util.FileUtils;
-import org.apache.streampark.common.util.Utils;
import org.apache.streampark.console.base.util.JacksonUtils;
import org.apache.streampark.console.base.util.ObjectUtils;
-import org.apache.streampark.console.base.util.WebUtils;
import org.apache.streampark.console.core.bean.AppControl;
+import org.apache.streampark.console.core.bean.MavenDependency;
import org.apache.streampark.console.core.enums.FlinkAppState;
import org.apache.streampark.console.core.enums.ReleaseState;
import org.apache.streampark.console.core.enums.ResourceFrom;
import org.apache.streampark.console.core.metrics.flink.JobsOverview;
import org.apache.streampark.console.core.utils.YarnQueueLabelExpression;
import org.apache.streampark.flink.kubernetes.model.K8sPodTemplates;
-import org.apache.streampark.flink.packer.maven.Artifact;
-import org.apache.streampark.flink.packer.maven.DependencyInfo;
+import org.apache.streampark.flink.packer.maven.MavenArtifact;
import org.apache.commons.lang3.StringUtils;
@@ -49,24 +46,19 @@ import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.core.type.TypeReference;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nonnull;
-import java.io.File;
import java.io.Serializable;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
-import java.util.stream.Collectors;
import static org.apache.streampark.console.core.enums.FlinkAppState.of;
@@ -487,13 +479,13 @@ public class Application implements Serializable {
@JsonIgnore
@SneakyThrows
- public Dependency getDependencyObject() {
- return Dependency.toDependency(this.dependency);
+ public MavenDependency getMavenDependency() {
+ return MavenDependency.of(this.dependency);
}
@JsonIgnore
- public DependencyInfo getDependencyInfo() {
- return
Application.Dependency.toDependency(getDependency()).toJarPackDeps();
+ public MavenArtifact getMavenArtifact() {
+ return getMavenDependency().toMavenArtifact();
}
@JsonIgnore
@@ -650,65 +642,6 @@ public class Application implements Serializable {
return ExecutionMode.YARN_PER_JOB.equals(mode) ||
ExecutionMode.YARN_APPLICATION.equals(mode);
}
- @Data
- public static class Dependency {
- private List<Pom> pom = Collections.emptyList();
- private List<String> jar = Collections.emptyList();
-
- @SneakyThrows
- public static Dependency toDependency(String dependency) {
- if (Utils.notEmpty(dependency)) {
- return JacksonUtils.read(dependency, new TypeReference<Dependency>()
{});
- }
- return new Dependency();
- }
-
- public boolean isEmpty() {
- return pom.isEmpty() && jar.isEmpty();
- }
-
- public boolean eq(Dependency other) {
- if (other == null) {
- return false;
- }
- if (this.isEmpty() && other.isEmpty()) {
- return true;
- }
-
- if (this.pom.size() != other.pom.size() || this.jar.size() !=
other.jar.size()) {
- return false;
- }
- File localJar = WebUtils.getAppTempDir();
- File localUploads = new File(Workspace.local().APP_UPLOADS());
- HashSet<String> otherJars = new HashSet<>(other.jar);
- for (String jarName : jar) {
- if (!otherJars.contains(jarName)
- || !FileUtils.equals(new File(localJar, jarName), new
File(localUploads, jarName))) {
- return false;
- }
- }
- return new HashSet<>(pom).containsAll(other.pom);
- }
-
- public DependencyInfo toJarPackDeps() {
- List<Artifact> mvnArts =
- this.pom.stream()
- .map(
- pom ->
- new Artifact(
- pom.getGroupId(),
- pom.getArtifactId(),
- pom.getVersion(),
- pom.getClassifier()))
- .collect(Collectors.toList());
- List<String> extJars =
- this.jar.stream()
- .map(jar -> Workspace.local().APP_UPLOADS() + "/" + jar)
- .collect(Collectors.toList());
- return new DependencyInfo(mvnArts, extJars);
- }
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -724,37 +657,4 @@ public class Application implements Serializable {
public int hashCode() {
return Objects.hash(id);
}
-
- @Data
- public static class Pom {
- private String groupId;
- private String artifactId;
- private String version;
- private String classifier;
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- return this.toString().equals(o.toString());
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(groupId, artifactId, version, classifier);
- }
-
- @Override
- public String toString() {
- return groupId + ":" + artifactId + ":" + version + getClassifier();
- }
-
- private String getClassifier() {
- return StringUtils.isEmpty(classifier) ? "" : ":" + classifier;
- }
- }
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java
index 57be322d7..d944e14f8 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java
@@ -18,6 +18,7 @@
package org.apache.streampark.console.core.entity;
import org.apache.streampark.common.util.DeflaterUtils;
+import org.apache.streampark.console.core.bean.MavenDependency;
import org.apache.streampark.console.core.enums.ChangedType;
import com.baomidou.mybatisplus.annotation.IdType;
@@ -82,12 +83,10 @@ public class FlinkSql {
// 1) determine if sql statement has changed
boolean sqlDifference =
!this.getSql().trim().equals(target.getSql().trim());
// 2) determine if dependency has changed
- Application.Dependency thisDependency =
- Application.Dependency.toDependency(this.getDependency());
- Application.Dependency targetDependency =
- Application.Dependency.toDependency(target.getDependency());
+ MavenDependency thisDependency = MavenDependency.of(this.getDependency());
+ MavenDependency targetDependency =
MavenDependency.of(target.getDependency());
- boolean depDifference = !thisDependency.eq(targetDependency);
+ boolean depDifference = !thisDependency.equals(targetDependency);
if (sqlDifference && depDifference) {
return ChangedType.ALL;
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 07c66ff05..33afcf881 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -50,6 +50,7 @@ import
org.apache.streampark.console.core.service.MessageService;
import org.apache.streampark.console.core.service.SettingService;
import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
import org.apache.streampark.flink.packer.docker.DockerConf;
+import org.apache.streampark.flink.packer.maven.Artifact;
import org.apache.streampark.flink.packer.maven.MavenTool;
import org.apache.streampark.flink.packer.pipeline.BuildPipeline;
import org.apache.streampark.flink.packer.pipeline.BuildResult;
@@ -71,6 +72,7 @@ import
org.apache.streampark.flink.packer.pipeline.impl.FlinkK8sSessionBuildPipe
import
org.apache.streampark.flink.packer.pipeline.impl.FlinkRemoteBuildPipeline;
import
org.apache.streampark.flink.packer.pipeline.impl.FlinkYarnApplicationBuildPipeline;
+import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.collections.CollectionUtils;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@@ -86,11 +88,15 @@ import
org.springframework.transaction.annotation.Transactional;
import javax.annotation.Nonnull;
import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -164,7 +170,7 @@ public class AppBuildPipeServiceImpl
pipeline.registerWatcher(
new PipeWatcher() {
@Override
- public void onStart(PipeSnapshot snapshot) {
+ public void onStart(PipeSnapshot snapshot) throws Exception {
AppBuildPipeline buildPipeline =
AppBuildPipeline.fromPipeSnapshot(snapshot).setAppId(app.getId());
saveEntity(buildPipeline);
@@ -301,7 +307,7 @@ public class AppBuildPipeServiceImpl
mainClass,
yarnProvidedPath,
app.getDevelopmentMode(),
- app.getDependencyInfo());
+ app.getMavenArtifact());
log.info("Submit params to building pipeline : {}", yarnAppRequest);
return FlinkYarnApplicationBuildPipeline.of(yarnAppRequest);
case YARN_PER_JOB:
@@ -316,7 +322,7 @@ public class AppBuildPipeServiceImpl
app.getExecutionModeEnum(),
app.getDevelopmentMode(),
flinkEnv.getFlinkVersion(),
- app.getDependencyInfo());
+ app.getMavenArtifact());
log.info("Submit params to building pipeline : {}", buildRequest);
return FlinkRemoteBuildPipeline.of(buildRequest);
case KUBERNETES_NATIVE_SESSION:
@@ -329,7 +335,7 @@ public class AppBuildPipeServiceImpl
app.getExecutionModeEnum(),
app.getDevelopmentMode(),
flinkEnv.getFlinkVersion(),
- app.getDependencyInfo(),
+ app.getMavenArtifact(),
app.getClusterId(),
app.getK8sNamespace());
log.info("Submit params to building pipeline : {}",
k8sSessionBuildRequest);
@@ -344,7 +350,7 @@ public class AppBuildPipeServiceImpl
app.getExecutionModeEnum(),
app.getDevelopmentMode(),
flinkEnv.getFlinkVersion(),
- app.getDependencyInfo(),
+ app.getMavenArtifact(),
app.getClusterId(),
app.getK8sNamespace(),
app.getFlinkImage(),
@@ -364,7 +370,7 @@ public class AppBuildPipeServiceImpl
}
}
- private void prepareJars(Application app) {
+ private void prepareJars(Application app) throws IOException {
File localUploadDIR = new File(Workspace.local().APP_UPLOADS());
if (!localUploadDIR.exists()) {
localUploadDIR.mkdirs();
@@ -373,8 +379,8 @@ public class AppBuildPipeServiceImpl
FsOperator localFS = FsOperator.lfs();
// 1. copy jar to local upload dir
if (app.isFlinkSqlJob() || app.isUploadJob()) {
- if (!app.getDependencyObject().getJar().isEmpty()) {
- for (String jar : app.getDependencyObject().getJar()) {
+ if (!app.getMavenDependency().getJar().isEmpty()) {
+ for (String jar : app.getMavenDependency().getJar()) {
File localJar = new File(WebUtils.getAppTempDir(), jar);
File localUploadJar = new File(localUploadDIR, jar);
if (!localJar.exists() && !localUploadJar.exists()) {
@@ -398,21 +404,10 @@ public class AppBuildPipeServiceImpl
checkOrElseUploadJar(localFS, localJar, localUploadJar,
localUploadDIR);
// 2) copy jar to local $app_home/lib
- boolean cleanUpload = false;
File libJar = new File(app.getLocalAppLib(), app.getJar());
- if (!localFS.exists(app.getLocalAppLib())) {
- cleanUpload = true;
- } else {
- if (libJar.exists()) {
- if (!FileUtils.equals(localJar, libJar)) {
- cleanUpload = true;
- }
- } else {
- cleanUpload = true;
- }
- }
-
- if (cleanUpload) {
+ if (!localFS.exists(app.getLocalAppLib())
+ || !libJar.exists()
+ || !FileUtils.equals(localJar, libJar)) {
localFS.mkCleanDirs(app.getLocalAppLib());
localFS.upload(localUploadJar.getAbsolutePath(),
app.getLocalAppLib());
}
@@ -421,22 +416,48 @@ public class AppBuildPipeServiceImpl
if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
List<File> jars = new ArrayList<>(0);
- // 1) user jar
+ // 1). user jar
jars.add(libJar);
// 2). jar dependency
- app.getDependencyObject()
- .getJar()
- .forEach(jar -> jars.add(new File(localUploadDIR, jar)));
+ app.getMavenDependency().getJar().forEach(jar -> jars.add(new
File(localUploadDIR, jar)));
// 3). pom dependency
- if (!app.getDependencyInfo().mavenArts().isEmpty()) {
-
jars.addAll(MavenTool.resolveArtifactsAsJava(app.getDependencyInfo().mavenArts()));
+ if (!app.getMavenDependency().getPom().isEmpty()) {
+ Set<Artifact> artifacts =
+ app.getMavenDependency().getPom().stream()
+ .filter(x -> !new File(localUploadDIR,
x.artifactName()).exists())
+ .map(
+ pom ->
+ new Artifact(
+ pom.getGroupId(),
+ pom.getArtifactId(),
+ pom.getVersion(),
+ pom.getClassifier(),
+ pom.toExclusionString()))
+ .collect(Collectors.toSet());
+ Set<File> mavenArts = MavenTool.resolveArtifactsAsJava(artifacts);
+ jars.addAll(mavenArts);
+ }
+
+ // 4). local uploadDIR to hdfs uploadsDIR
+ String hdfsUploadDIR = Workspace.remote().APP_UPLOADS();
+ for (File jarFile : jars) {
+ String hdfsUploadPath = hdfsUploadDIR + "/" + jarFile.getName();
+ if (!fsOperator.exists(hdfsUploadPath)) {
+ fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
+ } else {
+ InputStream inputStream = Files.newInputStream(jarFile.toPath());
+ if
(!DigestUtils.md5Hex(inputStream).equals(fsOperator.fileMd5(hdfsUploadPath))) {
+ fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
+ }
+ }
}
+ // 5). copy jars to $hdfs_app_home/lib
fsOperator.mkCleanDirs(app.getAppLib());
- // 4). upload jars to appLibDIR
- jars.forEach(jar -> fsOperator.upload(jar.getAbsolutePath(),
app.getAppLib()));
+ jars.forEach(
+ jar -> fsOperator.copy(hdfsUploadDIR + "/" + jar.getName(),
app.getAppLib()));
}
} else {
String appHome = app.getAppHome();
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index e894bd626..99da52420 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -40,6 +40,7 @@ import
org.apache.streampark.console.base.mybatis.pager.MybatisPager;
import org.apache.streampark.console.base.util.CommonUtils;
import org.apache.streampark.console.base.util.ObjectUtils;
import org.apache.streampark.console.base.util.WebUtils;
+import org.apache.streampark.console.core.bean.MavenDependency;
import org.apache.streampark.console.core.entity.AppBuildPipeline;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.ApplicationConfig;
@@ -852,12 +853,10 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
application.setRelease(ReleaseState.NEED_RELEASE.get());
if (application.isUploadJob()) {
- Application.Dependency thisDependency =
- Application.Dependency.toDependency(appParam.getDependency());
- Application.Dependency targetDependency =
- Application.Dependency.toDependency(application.getDependency());
+ MavenDependency thisDependency =
MavenDependency.of(appParam.getDependency());
+ MavenDependency targetDependency =
MavenDependency.of(application.getDependency());
- if (!thisDependency.eq(targetDependency)) {
+ if (!thisDependency.equals(targetDependency)) {
application.setDependency(appParam.getDependency());
application.setBuild(true);
} else if (!ObjectUtils.safeEquals(application.getJar(),
appParam.getJar())) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
index b734a7a03..fab29a74e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
@@ -738,9 +738,6 @@ public class FlinkRESTAPIWatcher {
private <T> T yarnRestRequest(String url, Class<T> clazz) throws IOException
{
String result = YarnUtils.restRequest(url);
- if (null == result) {
- return null;
- }
return JacksonUtils.read(result, clazz);
}
diff --git
a/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue
b/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue
index 36ab854c5..35c080269 100644
---
a/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue
+++
b/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue
@@ -22,7 +22,7 @@
props: {
column: {
type: Object as PropType<BasicColumn>,
- default: () => ({}) as BasicColumn,
+ default: () => ({} as BasicColumn),
},
},
setup(props) {
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/Dependency.vue
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/Dependency.vue
index bf6aa9543..5c9686a45 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/Dependency.vue
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/Dependency.vue
@@ -33,6 +33,7 @@
import { fetchUpload } from '/@/api/flink/app/app';
import { fetchUploadJars } from '/@/api/flink/app/flinkHistory';
import UploadJobJar from './UploadJobJar.vue';
+ import { fetchFlinkEnv } from '/@/api/flink/setting/flinkEnv';
interface DependencyType {
artifactId: string;
@@ -89,10 +90,16 @@
Swal.fire('Failed', t('flink.app.dependencyError'), 'error');
return;
}
- const scalaVersion = props.flinkEnvs.find((v) => v.id ===
versionId)?.scalaVersion;
+
+ let flinkEnv = props.flinkEnvs || [];
+ if (props.flinkEnvs?.length == 0) {
+ flinkEnv = await fetchFlinkEnv();
+ }
+ const scalaVersion = flinkEnv.find((v) => v.id ===
versionId)?.scalaVersion;
if (props.value == null || props.value.trim() === '') {
return;
}
+
const groupExp = /<groupId>([\s\S]*?)<\/groupId>/;
const artifactExp = /<artifactId>([\s\S]*?)<\/artifactId>/;
const versionExp = /<version>([\s\S]*?)<\/version>/;
@@ -125,26 +132,23 @@
mvnPom.classifier = classifier;
}
const id = getId(mvnPom);
- const pomExclusion = {};
+ const pomExclusion = [];
if (exclusion != null) {
const exclusions = exclusion.split('<exclusion>');
exclusions.forEach((e) => {
if (e != null && e.length > 0) {
const e_group = e.match(groupExp) ?
groupExp.exec(e)![1].trim() : null;
const e_artifact = e.match(artifactExp) ?
artifactExp.exec(e)![1].trim() : null;
- const id = e_group + '_' + e_artifact;
- pomExclusion[id] = {
+ pomExclusion.push({
groupId: e_group,
artifactId: e_artifact,
- };
+ });
}
});
}
mvnPom.exclusions = pomExclusion;
dependency.pom[id] = mvnPom;
}
- } else {
- console.error('dependency error...');
}
});
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
index 752ad30e0..2a1a3b915 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
@@ -597,7 +597,13 @@ export const useCreateAndEditSchema = (
},
];
});
+
onMounted(async () => {
+ //get flinkEnv
+ fetchFlinkEnv().then((res) => {
+ flinkEnvs.value = res;
+ });
+
/* Get project data */
fetchSelect({}).then((res) => {
projectList.value = res;
@@ -608,10 +614,6 @@ export const useCreateAndEditSchema = (
alerts.value = res;
});
- //get flinkEnv
- fetchFlinkEnv().then((res) => {
- flinkEnvs.value = res;
- });
//get flinkCluster
fetchFlinkCluster().then((res) => {
flinkClusters.value = res;
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/Pom.ts
b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/Pom.ts
index 87b8ab397..1e0291fa8 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/Pom.ts
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/Pom.ts
@@ -22,65 +22,40 @@ export function toPomString(pom) {
const classifier = pom.classifier;
const exclusions = pom.exclusions || [];
let exclusionString = '';
- let pomString = '';
+ let classifierString = '';
if (exclusions.length > 0) {
exclusions.forEach((item) => {
exclusionString +=
- ' <exclusion>\n' +
- ' <groupId>' +
+ ' <exclusion>\n' +
+ ' <groupId>' +
item.groupId +
'</groupId>\n' +
- ' <artifactId>' +
+ ' <artifactId>' +
item.artifactId +
'</artifactId>\n' +
- ' </exclusion>\n';
+ ' </exclusion>\n';
});
- pomString =
- ' <dependency>\n' +
- ' <groupId>' +
- groupId +
- '</groupId>\n' +
- ' <artifactId>' +
- artifactId +
- '</artifactId>\n' +
- ' <version>' +
- version +
- '</version>\n' +
- ' <exclusions>\n' +
- exclusionString +
- ' </exclusions>\n' +
- ' </dependency>';
- } else {
- if (classifier != null) {
- pomString =
- ' <dependency>\n' +
- ' <groupId>' +
- groupId +
- '</groupId>\n' +
- ' <artifactId>' +
- artifactId +
- '</artifactId>\n' +
- ' <version>' +
- version +
- '</version>\n' +
- ' <classifier>' +
- classifier +
- '</classifier>\n' +
- ' </dependency>';
- } else {
- pomString =
- ' <dependency>\n' +
- ' <groupId>' +
- groupId +
- '</groupId>\n' +
- ' <artifactId>' +
- artifactId +
- '</artifactId>\n' +
- ' <version>' +
- version +
- '</version>\n' +
- ' </dependency>';
- }
+ exclusionString = ' <exclusions>\n' + exclusionString + '
</exclusions>\n';
}
+
+ if (classifier != null) {
+ classifierString = ' <classifier>' + classifier + '</classifier>\n';
+ }
+
+ const pomString =
+ ' <dependency>\n' +
+ ' <groupId>' +
+ groupId +
+ '</groupId>\n' +
+ ' <artifactId>' +
+ artifactId +
+ '</artifactId>\n' +
+ ' <version>' +
+ version +
+ '</version>\n' +
+ classifierString +
+ exclusionString +
+ ' </dependency>';
+
return pomString;
}
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 58dc7f82b..6a2e8c95c 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -207,28 +207,22 @@ trait FlinkClientTrait extends Logger {
jobGraphFunc(submitRequest, flinkConfig, jarFile)
} match {
case Failure(e) =>
- logWarn(
- s"""\n
- |[flink-submit] JobGraph Submit Plan failed, error detail:
-
|------------------------------------------------------------------
- |${Utils.stringifyException(e)}
-
|------------------------------------------------------------------
- |Now retry submit with RestAPI Plan ...
- |""".stripMargin
- )
Try(restApiFunc(submitRequest, flinkConfig, jarFile)) match {
case Success(r) => r
- case Failure(e) =>
- logError(
+ case Failure(e1) =>
+ throw new RuntimeException(
s"""\n
- |[flink-submit] RestAPI Submit failed, error detail:
+ |[flink-submit] Both JobGraph submit plan and Rest API submit
plan all failed!
+ |JobGraph Submit plan failed detail:
|------------------------------------------------------------------
|${Utils.stringifyException(e)}
|------------------------------------------------------------------
- |Both JobGraph submit plan and Rest API submit plan all
failed!
- |""".stripMargin
- )
- throw e
+ |
+ | RestAPI Submit plan failed detail:
+ |
------------------------------------------------------------------
+ |${Utils.stringifyException(e1)}
+
|------------------------------------------------------------------
+ |""".stripMargin)
}
case Success(v) => v
}
@@ -239,18 +233,23 @@ trait FlinkClientTrait extends Logger {
submitRequest: SubmitRequest,
jarFile: File): (PackagedProgram, JobGraph) = {
- val packageProgram = PackagedProgram.newBuilder
+ val pgkBuilder = PackagedProgram.newBuilder
.setJarFile(jarFile)
- .setUserClassPaths(
- Lists.newArrayList(submitRequest.flinkVersion.flinkLibs: _*)
- )
.setEntryPointClassName(
flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get())
.setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
- .setArguments(flinkConfig
- .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
- .orElse(Lists.newArrayList()): _*)
- .build()
+ .setArguments(
+ flinkConfig
+ .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
+ .orElse(Lists.newArrayList()): _*)
+ // userClassPath...
+ submitRequest.executionMode match {
+ case ExecutionMode.REMOTE | ExecutionMode.YARN_PER_JOB =>
+ pgkBuilder.setUserClassPaths(submitRequest.flinkVersion.flinkLibs)
+ case _ =>
+ }
+
+ val packageProgram = pgkBuilder.build()
val jobGraph = PackagedProgramUtils.createJobGraph(
packageProgram,
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/Artifact.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/Artifact.scala
index cff3c0398..1e8245228 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/Artifact.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/Artifact.scala
@@ -19,15 +19,17 @@ package org.apache.streampark.flink.packer.maven
import org.eclipse.aether.artifact.{Artifact => AetherArtifact}
+import java.util.{Collections, Set => JavaSet}
import java.util.regex.Pattern
case class Artifact(
groupId: String,
artifactId: String,
version: String,
- classifier: String = null) {
+ classifier: String = null,
+ extensions: JavaSet[String] = Collections.emptySet()) {
- def eq(artifact: AetherArtifact): Boolean = {
+ def filter(artifact: AetherArtifact): Boolean = {
artifact.getGroupId match {
case g if g == groupId =>
artifact.getArtifactId match {
@@ -37,7 +39,6 @@ case class Artifact(
case _ => false
}
}
-
}
object Artifact {
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/DependencyInfo.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenArtifact.scala
similarity index 79%
rename from
streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/DependencyInfo.scala
rename to
streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenArtifact.scala
index c80e416e6..0b39f53d1 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/DependencyInfo.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenArtifact.scala
@@ -27,17 +27,17 @@ import scala.collection.JavaConversions._
* @param extJarLibs
* collection of jar lib paths, which elements can be a directory or file
path.
*/
-case class DependencyInfo(mavenArts: Set[Artifact] = Set(), extJarLibs:
Set[String] = Set()) {
+case class MavenArtifact(mavenArts: Set[Artifact] = Set(), extJarLibs:
Set[String] = Set()) {
def this(mavenArts: JavaList[Artifact], extJarLibs: JavaList[String]) {
this(mavenArts.toSet, extJarLibs.toSet)
}
- def merge(jarLibs: Set[String]): DependencyInfo =
- if (jarLibs != null) DependencyInfo(mavenArts, extJarLibs ++ jarLibs) else
this.copy()
+ def merge(jarLibs: Set[String]): MavenArtifact =
+ if (jarLibs != null) MavenArtifact(mavenArts, extJarLibs ++ jarLibs) else
this.copy()
}
-object DependencyInfo {
- def empty: DependencyInfo = new DependencyInfo()
+object MavenArtifact {
+ def empty: MavenArtifact = new MavenArtifact()
}
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
index 05562a870..a56f41321 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
@@ -43,7 +43,6 @@ import javax.annotation.{Nonnull, Nullable}
import java.io.File
import java.util
-import java.util.{HashSet, Set => JavaSet}
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@@ -56,8 +55,10 @@ object MavenTool extends Logger {
private[this] val excludeArtifact = List(
Artifact.of("org.apache.flink:force-shading:*"),
+ Artifact.of("org.apache.flink:flink-shaded-force-shading:*"),
Artifact.of("com.google.code.findbugs:jsr305:*"),
- Artifact.of("org.apache.logging.log4j:*:*"))
+ Artifact.of("org.apache.logging.log4j:*:*")
+ )
private[this] def getRemoteRepos(): List[RemoteRepository] = {
val builder =
@@ -145,7 +146,7 @@ object MavenTool extends Logger {
}
req.setResourceTransformers(transformer.toList)
// issue: https://github.com/apache/incubator-streampark/issues/2350
- req.setFilters(List(new ShadeFilter))
+ req.setFilters(List(new ShadedFilter))
req.setRelocators(Lists.newArrayList())
req
}
@@ -159,7 +160,7 @@ object MavenTool extends Logger {
/**
* Build a fat-jar with custom jar librarties and maven artifacts.
*
- * @param dependencyInfo
+ * @param artifact
* maven artifacts and jar libraries for building a fat-jar
* @param outFatJarPath
* output paths of fat-jar, like "/streampark/workspace/233/my-fat.jar"
@@ -167,10 +168,10 @@ object MavenTool extends Logger {
@throws[Exception]
def buildFatJar(
@Nullable mainClass: String,
- @Nonnull dependencyInfo: DependencyInfo,
+ @Nonnull artifact: MavenArtifact,
@Nonnull outFatJarPath: String): File = {
- val jarLibs = dependencyInfo.extJarLibs
- val arts = dependencyInfo.mavenArts
+ val jarLibs = artifact.extJarLibs
+ val arts = artifact.mavenArts
if (jarLibs.isEmpty && arts.isEmpty) {
throw new Exception(s"[StreamPark] streampark-packer: empty artifacts.")
}
@@ -178,8 +179,8 @@ object MavenTool extends Logger {
buildFatJar(mainClass, jarLibs ++ artFilePaths, outFatJarPath)
}
- def resolveArtifactsAsJava(mavenArtifacts: Set[Artifact]): JavaSet[File] =
resolveArtifacts(
- mavenArtifacts).asJava
+ def resolveArtifactsAsJava(mavenArtifacts: util.Set[Artifact]):
util.Set[File] = resolveArtifacts(
+ mavenArtifacts.toSet).asJava
/**
* Resolve the collectoin of artifacts, Artifacts will be download to
ConfigConst.MAVEN_LOCAL_DIR
@@ -192,37 +193,55 @@ object MavenTool extends Logger {
*/
@throws[Exception]
def resolveArtifacts(mavenArtifacts: Set[Artifact]): Set[File] = {
- if (mavenArtifacts == null) Set.empty[File];
- else {
- val (repoSystem, session) = getMavenEndpoint()
- val artifacts = mavenArtifacts.map(
- e => {
- new DefaultArtifact(e.groupId, e.artifactId, e.classifier, "jar",
e.version)
- })
- logInfo(s"start resolving dependencies: ${artifacts.mkString}")
-
- val remoteRepos = getRemoteRepos()
- // read relevant artifact descriptor info
- // plz don't simplify the following lambda syntax to maintain the
readability of the code.
- val resolvedArtifacts = artifacts
- .map(artifact => new ArtifactDescriptorRequest(artifact, remoteRepos,
null))
- .map(artDescReq => repoSystem.readArtifactDescriptor(session,
artDescReq))
- .flatMap(_.getDependencies)
- .filter(_.getScope == "compile")
- .filter(x => !excludeArtifact.exists(_.eq(x.getArtifact)))
- .map(_.getArtifact)
-
- val mergedArtifacts = artifacts ++ resolvedArtifacts
- logInfo(s"resolved dependencies: ${mergedArtifacts.mkString}")
-
- // download artifacts
- val artReqs =
- mergedArtifacts.map(artifact => new ArtifactRequest(artifact,
remoteRepos, null))
- repoSystem
- .resolveArtifacts(session, artReqs)
- .map(_.getArtifact.getFile)
- .toSet
+ if (mavenArtifacts == null) {
+ return Set.empty[File]
}
+
+ val (repoSystem, session) = getMavenEndpoint()
+ val artifacts = mavenArtifacts.map(
+ e => {
+ new DefaultArtifact(e.groupId, e.artifactId, e.classifier, "jar",
e.version) -> e.extensions
+ })
+
+ logInfo(s"start resolving dependencies: ${artifacts.mkString}")
+
+ val remoteRepos = getRemoteRepos()
+ // read relevant artifact descriptor info
+ // plz don't simplify the following lambda syntax to maintain the
readability of the code.
+ val dependenciesArtifacts = artifacts
+ .map(artifact => new ArtifactDescriptorRequest(artifact._1, remoteRepos,
null) -> artifact._2)
+ .map(descReq => repoSystem.readArtifactDescriptor(session, descReq._1)
-> descReq._2)
+ .flatMap(
+ result =>
+ result._1.getDependencies
+ .filter(
+ dep => {
+ dep.getScope match {
+ case "compile" if
!excludeArtifact.exists(_.filter(dep.getArtifact)) =>
+ val ga =
s"${dep.getArtifact.getGroupId}:${dep.getArtifact.getArtifactId}"
+ val exclusion = result._2.contains(ga)
+ if (exclusion) {
+ val art = result._1.getArtifact
+ val name = s"${art.getGroupId}:${art.getArtifactId}"
+ logInfo(s"[MavenTool] $name dependencies exclusion $ga")
+ }
+ !exclusion
+ case _ => false
+ }
+ })
+ .map(_.getArtifact))
+
+ val mergedArtifacts = artifacts.map(_._1) ++ dependenciesArtifacts
+
+ logInfo(s"resolved dependencies: ${mergedArtifacts.mkString}")
+
+ // download artifacts
+ val artReqs =
+ mergedArtifacts.map(artifact => new ArtifactRequest(artifact,
remoteRepos, null))
+ repoSystem
+ .resolveArtifacts(session, artReqs)
+ .map(_.getArtifact.getFile)
+ .toSet
}
/** create composite maven endpoint */
@@ -256,13 +275,13 @@ object MavenTool extends Logger {
(repoSystem, session)
}
- class ShadeFilter extends Filter {
+ private class ShadedFilter extends Filter {
override def canFilter(jar: File): Boolean = true
override def isFiltered(name: String): Boolean = {
if (name.startsWith("META-INF/")) {
if (name.endsWith(".SF") || name.endsWith(".DSA") ||
name.endsWith(".RSA")) {
- logInfo(s"shade ignore file: $name")
+ logInfo(s"shaded ignore file: $name")
return true
}
}
@@ -271,5 +290,4 @@ object MavenTool extends Logger {
override def finished(): Unit = {}
}
-
}
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
index 5abc55b92..41c56aa6b 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
@@ -21,7 +21,7 @@ import org.apache.streampark.common.conf.{FlinkVersion,
Workspace}
import org.apache.streampark.common.enums.{DevelopmentMode, ExecutionMode}
import org.apache.streampark.flink.kubernetes.model.K8sPodTemplates
import org.apache.streampark.flink.packer.docker.DockerConf
-import org.apache.streampark.flink.packer.maven.DependencyInfo
+import org.apache.streampark.flink.packer.maven.MavenArtifact
import scala.collection.mutable.ArrayBuffer
@@ -44,17 +44,17 @@ sealed trait FlinkBuildParam extends BuildParam {
def flinkVersion: FlinkVersion
- def dependencyInfo: DependencyInfo
+ def dependency: MavenArtifact
def customFlinkUserJar: String
- lazy val providedLibs: DependencyInfo = {
+ lazy val providedLibs: MavenArtifact = {
val providedLibs =
ArrayBuffer(localWorkspace.APP_JARS, localWorkspace.APP_PLUGINS,
customFlinkUserJar)
if (developmentMode == DevelopmentMode.FLINK_SQL) {
providedLibs +=
s"${localWorkspace.APP_SHIMS}/flink-${flinkVersion.majorVersion}"
}
- dependencyInfo.merge(providedLibs.toSet)
+ dependency.merge(providedLibs.toSet)
}
def getShadedJarPath(rootWorkspace: String): String = {
@@ -79,7 +79,7 @@ case class FlinkK8sSessionBuildRequest(
executionMode: ExecutionMode,
developmentMode: DevelopmentMode,
flinkVersion: FlinkVersion,
- dependencyInfo: DependencyInfo,
+ dependency: MavenArtifact,
clusterId: String,
k8sNamespace: String)
extends FlinkK8sBuildParam
@@ -92,7 +92,7 @@ case class FlinkK8sApplicationBuildRequest(
executionMode: ExecutionMode,
developmentMode: DevelopmentMode,
flinkVersion: FlinkVersion,
- dependencyInfo: DependencyInfo,
+ dependency: MavenArtifact,
clusterId: String,
k8sNamespace: String,
flinkBaseImage: String,
@@ -110,7 +110,7 @@ case class FlinkRemotePerJobBuildRequest(
executionMode: ExecutionMode,
developmentMode: DevelopmentMode,
flinkVersion: FlinkVersion,
- dependencyInfo: DependencyInfo)
+ dependency: MavenArtifact)
extends FlinkBuildParam
case class FlinkYarnApplicationBuildRequest(
@@ -118,5 +118,5 @@ case class FlinkYarnApplicationBuildRequest(
mainClass: String,
yarnProvidedPath: String,
developmentMode: DevelopmentMode,
- dependencyInfo: DependencyInfo)
+ dependencyInfo: MavenArtifact)
extends BuildParam
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
index 72d47991e..060209ff9 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
@@ -91,7 +91,7 @@ class FlinkK8sApplicationBuildPipeline(request:
FlinkK8sApplicationBuildRequest)
execStep(3) {
val shadedJarOutputPath = request.getShadedJarPath(buildWorkspace)
val extJarLibs = request.developmentMode match {
- case DevelopmentMode.FLINK_SQL => request.dependencyInfo.extJarLibs
+ case DevelopmentMode.FLINK_SQL => request.dependency.extJarLibs
case DevelopmentMode.CUSTOM_CODE => Set[String]()
}
val shadedJar =
diff --git
a/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/MavenToolSpec.scala
b/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/MavenToolSpec.scala
index 7b0803c45..06831510c 100644
---
a/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/MavenToolSpec.scala
+++
b/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/MavenToolSpec.scala
@@ -16,7 +16,7 @@
*/
package org.apache.streampark.flink.packer
-import org.apache.streampark.flink.packer.maven.{Artifact, DependencyInfo,
MavenTool}
+import org.apache.streampark.flink.packer.maven.{Artifact, MavenArtifact,
MavenTool}
import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfterAll
@@ -87,7 +87,7 @@ class MavenToolSpec extends AnyWordSpec with
BeforeAndAfterAll with Matchers {
val fatJarPath = outputDir.concat("fat-3.jar")
val fatJar = MavenTool.buildFatJar(
null,
- DependencyInfo(
+ MavenArtifact(
Set(Artifact.of("org.apache.flink:flink-connector-kafka_2.11:1.13.0")),
Set(path("jars/commons-dbutils-1.7.jar"))),
fatJarPath)