This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch resource
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/resource by this push:
new 5df483834 flink connector resource improvement
5df483834 is described below
commit 5df4838342d19e51ebaa2c3269b427e19032a6d9
Author: benjobs <[email protected]>
AuthorDate: Wed Jul 26 00:28:49 2023 +0800
flink connector resource improvement
---
.../console/core/service/ResourceService.java | 2 +
.../core/service/impl/ResourceServiceImpl.java | 82 ++++++++++++++++++----
.../console/base/util/DependencyUtilsTest.java | 78 +++++---------------
.../src/main/resources/mysql.sql | 0
.../streampark/flink/packer/maven/MavenTool.scala | 16 +++--
.../impl/FlinkYarnApplicationBuildPipeline.scala | 4 +-
.../streampark/flink/packer/MavenToolSpec.scala | 1 +
7 files changed, 106 insertions(+), 77 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java
index 8bd05f1af..3bba1aece 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java
@@ -94,4 +94,6 @@ public interface ResourceService extends IService<Resource> {
String upload(MultipartFile file) throws IOException;
RestResponse checkResource(Resource resource);
+
+ List<String> getConnectorId(Resource resource);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
index 23e0817ce..8dab842b6 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
@@ -19,6 +19,7 @@ package org.apache.streampark.console.core.service.impl;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.fs.FsOperator;
+import org.apache.streampark.common.util.Utils;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.base.exception.ApiAlertException;
@@ -36,6 +37,8 @@ import
org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.console.core.service.CommonService;
import org.apache.streampark.console.core.service.FlinkSqlService;
import org.apache.streampark.console.core.service.ResourceService;
+import org.apache.streampark.flink.packer.maven.Artifact;
+import org.apache.streampark.flink.packer.maven.MavenTool;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -58,6 +61,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.jar.Manifest;
import java.util.stream.Collectors;
@Slf4j
@@ -240,25 +245,78 @@ public class ResourceServiceImpl extends
ServiceImpl<ResourceMapper, Resource>
switch (type) {
case FLINK_APP:
// check main.
- break;
+ File jarFile = getResourceJar(resource);
+ Manifest manifest = Utils.getJarManifest(jarFile);
+ String mainClass = manifest.getMainAttributes().getValue("Main-Class");
+ if (mainClass == null) {
+ // main class is null
+ return RestResponse.success().data(1);
+ }
+ // successful.
+ return RestResponse.success().data(0);
case CONNECTOR:
- Dependency dependency =
Dependency.toDependency(resource.getResource());
- if (!dependency.isEmpty()) {
- String connectorPath = null;
- if (!dependency.getJar().isEmpty()) {
- String jar = dependency.getJar().get(0);
- File localJar = new File(WebUtils.getAppTempDir(), jar);
- connectorPath = localJar.getAbsolutePath();
- } else {
- Pom pom = dependency.getPom().get(0);
- }
+ // 1) get connector id
+ List<String> connectorIds = getConnectorId(resource);
+ if (Utils.isEmpty(connectorIds)) {
+ // connector id is null
+ return RestResponse.success().data(1);
+ }
+ // 2) check connector exists
+ boolean exists = existsResourceByConnectorIds(connectorIds);
+ if (exists) {
+ return RestResponse.success(2);
}
- break;
+ return RestResponse.success().data(0);
+ }
+ return RestResponse.success().data(0);
+ }
+
+ private boolean existsResourceByConnectorIds(List<String> connectorIds) {
+ return false;
+ }
+
+ @Override
+ public List<String> getConnectorId(Resource resource) {
+ ApiAlertException.throwIfFalse(
+ !ResourceType.CONNECTOR.equals(resource.getResourceType()),
+ "getConnectorId method error, resource not flink connector.");
+ File connector = getResourceJar(resource);
+ if (connector != null) {
+ // TODO: parse the connector jar to extract the connectorId
}
return null;
}
+ private File getResourceJar(Resource resource) {
+ Dependency dependency = Dependency.toDependency(resource.getResource());
+ if (dependency.isEmpty()) {
+ return null;
+ }
+ if (!dependency.getJar().isEmpty()) {
+ String jar = dependency.getJar().get(0);
+ return new File(WebUtils.getAppTempDir(), jar);
+ } else {
+ Pom pom = dependency.getPom().get(0);
+ Artifact artifact =
+ new Artifact(pom.getGroupId(), pom.getArtifactId(),
pom.getVersion(), null);
+ try {
+ List<File> files = MavenTool.resolveArtifacts(artifact);
+ if (!files.isEmpty()) {
+ String fileName = String.format("%s-%s.jar", artifact.artifactId(),
artifact.version());
+ Optional<File> jarFile =
+ files.stream().filter(x ->
x.getName().equals(fileName)).findFirst();
+ if (jarFile.isPresent()) {
+ return jarFile.get();
+ }
+ }
+ } catch (Exception e) {
+ log.error("download flink connector error: " + e);
+ }
+ return null;
+ }
+ }
+
private void transferTeamResource(Long teamId, String resourceName) {
String teamUploads = String.format("%s/%d",
Workspace.local().APP_UPLOADS(), teamId);
if (!FsOperator.lfs().exists(teamUploads)) {
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java
index 0027828e3..58506d9e0 100644
---
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java
@@ -17,76 +17,34 @@
package org.apache.streampark.console.base.util;
-import org.apache.streampark.common.util.DependencyUtils;
-import org.apache.streampark.console.core.bean.Pom;
+import org.apache.streampark.common.conf.CommonConfig;
+import org.apache.streampark.common.conf.InternalConfigHolder;
+import org.apache.streampark.flink.packer.maven.Artifact;
+import org.apache.streampark.flink.packer.maven.MavenTool;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
-import java.util.ArrayList;
-import java.util.Collection;
+import java.io.File;
import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import scala.collection.JavaConversions;
+import java.util.Optional;
@Slf4j
class DependencyUtilsTest {
@Test
- void resolveMavenDependencies() {
- /**
- * <dependency> <groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
- * <version>${flink.version}</version> </dependency>
- *
- * <p><dependency> <groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
- * <version>${flink.version}</version> </dependency>
- */
- List<Pom> dependency = new ArrayList<>();
-
- Pom dept = new Pom();
- dept.setGroupId("org.apache.flink");
- dept.setArtifactId("flink-table-common");
- dept.setVersion("1.17.1");
- dependency.add(dept);
-
- StringBuilder builder = new StringBuilder();
- dependency.forEach(
- x -> {
- String info =
- String.format("%s:%s:%s,", x.getGroupId(), x.getArtifactId(),
x.getVersion());
- builder.append(info);
- });
- String packages = builder.deleteCharAt(builder.length() - 1).toString();
-
- Timer timer = new Timer();
- timer.schedule(
- new TimerTask() {
- @Override
- public void run() {
- log.info(">>>>> running....");
- }
- },
- 0,
- 3000);
-
- try {
- Collection<String> jars =
- JavaConversions.asJavaCollection(
- DependencyUtils.resolveMavenDependencies(
- packages,
- null,
- null,
- null,
- out -> {
- System.err.println("---------->" + out);
- }));
- System.out.println();
-
System.out.println("----------------------------------------------------------------");
- jars.forEach(System.out::println);
- } catch (Exception e) {
- System.out.println(e.getMessage());
+ void resolveMavenDependencies() throws Exception {
+ Artifact artifact = new Artifact("org.apache.flink", "flink-table-common",
"1.17.1", null);
+
+ InternalConfigHolder.set(CommonConfig.STREAMPARK_WORKSPACE_LOCAL(),
"~/workspace");
+ List<File> files = MavenTool.resolveArtifacts(artifact);
+ if (!files.isEmpty()) {
+ String fileName = String.format("%s-%s.jar", artifact.artifactId(),
artifact.version());
+ Optional<File> jarFile = files.stream().filter(x ->
x.getName().equals(fileName)).findFirst();
+ if (jarFile.isPresent()) {
+ String jar = jarFile.get().getAbsolutePath();
+ System.out.println(jar);
+ }
}
}
}
diff --git
a/streampark-flink/streampark-flink-catalog-mysql/src/main/resources/mysql.sql
b/streampark-flink/streampark-flink-catalog-mysql/src/main/resources/mysql.sql
new file mode 100644
index 000000000..e69de29bb
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
index 2b07cbc74..71d178f24 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
@@ -43,6 +43,7 @@ import javax.annotation.{Nonnull, Nullable}
import java.io.File
import java.util
+import java.util.{List => JavaList, Set => JavaSet}
import scala.collection.convert.ImplicitConversions._
import scala.collection.mutable.ArrayBuffer
@@ -161,6 +162,10 @@ object MavenTool extends Logger {
buildFatJar(mainClass, jarLibs ++ artFilePaths, outFatJarPath)
}
+ @throws[Exception]
+ def resolveArtifacts(mavenArtifact: Artifact): JavaList[File] =
resolveArtifacts(
+ Set(mavenArtifact))
+
/**
* Resolve the collection of artifacts. Artifacts will be downloaded to
ConfigConst.MAVEN_LOCAL_DIR
* if necessary. Note: only compile-scope dependencies will be resolved.
@@ -171,13 +176,16 @@ object MavenTool extends Logger {
* jar File Object of resolved artifacts
*/
@throws[Exception]
- def resolveArtifacts(mavenArtifacts: Set[Artifact]): Set[File] = {
- if (mavenArtifacts == null) Set.empty[File];
+ def resolveArtifacts(mavenArtifacts: JavaSet[Artifact]): JavaList[File] = {
+ if (mavenArtifacts == null) List.empty[File]
else {
val (repoSystem, session) = getMavenEndpoint()
val artifacts = mavenArtifacts.map(
e => {
- new DefaultArtifact(e.groupId, e.artifactId, e.classifier, "jar",
e.version)
+ val artifact =
+ new DefaultArtifact(e.groupId, e.artifactId, e.classifier, "jar",
e.version)
+ artifact.getProperties
+ artifact
})
logInfo(s"start resolving dependencies: ${artifacts.mkString}")
@@ -201,7 +209,7 @@ object MavenTool extends Logger {
repoSystem
.resolveArtifacts(session, artReqs)
.map(_.getArtifact.getFile)
- .toSet
+ .toList
}
}
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
index 92c1d9199..b9167d437 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
@@ -28,6 +28,8 @@ import org.apache.commons.codec.digest.DigestUtils
import java.io.{File, FileInputStream, IOException}
+import scala.collection.convert.ImplicitConversions._
+
/** Building pipeline for flink yarn application mode */
class FlinkYarnApplicationBuildPipeline(request:
FlinkYarnApplicationBuildRequest)
extends BuildPipeline {
@@ -59,7 +61,7 @@ class FlinkYarnApplicationBuildPipeline(request:
FlinkYarnApplicationBuildReques
case DevelopmentMode.FLINK_SQL =>
val mavenArts =
MavenTool.resolveArtifacts(request.dependencyInfo.mavenArts)
mavenArts.map(_.getAbsolutePath) ++
request.dependencyInfo.extJarLibs
- case _ => Set[String]()
+ case _ => List[String]()
}
}.getOrElse(throw getError.exception)
diff --git
a/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/MavenToolSpec.scala
b/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/MavenToolSpec.scala
index 7b0803c45..2bd91b8a7 100644
---
a/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/MavenToolSpec.scala
+++
b/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/MavenToolSpec.scala
@@ -26,6 +26,7 @@ import org.scalatest.wordspec.AnyWordSpec
import java.io.File
import java.util.jar.JarFile
+import scala.collection.convert.ImplicitConversions._
import scala.language.postfixOps
class MavenToolSpec extends AnyWordSpec with BeforeAndAfterAll with Matchers {