This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch resource
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/resource by this push:
     new 5df483834 flink connector resource improvement
5df483834 is described below

commit 5df4838342d19e51ebaa2c3269b427e19032a6d9
Author: benjobs <[email protected]>
AuthorDate: Wed Jul 26 00:28:49 2023 +0800

    flink connector resource improvement
---
 .../console/core/service/ResourceService.java      |  2 +
 .../core/service/impl/ResourceServiceImpl.java     | 82 ++++++++++++++++++----
 .../console/base/util/DependencyUtilsTest.java     | 78 +++++---------------
 .../src/main/resources/mysql.sql                   |  0
 .../streampark/flink/packer/maven/MavenTool.scala  | 16 +++--
 .../impl/FlinkYarnApplicationBuildPipeline.scala   |  4 +-
 .../streampark/flink/packer/MavenToolSpec.scala    |  1 +
 7 files changed, 106 insertions(+), 77 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java
index 8bd05f1af..3bba1aece 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java
@@ -94,4 +94,6 @@ public interface ResourceService extends IService<Resource> {
   String upload(MultipartFile file) throws IOException;
 
   RestResponse checkResource(Resource resource);
+
+  List<String> getConnectorId(Resource resource);
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
index 23e0817ce..8dab842b6 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
@@ -19,6 +19,7 @@ package org.apache.streampark.console.core.service.impl;
 
 import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.fs.FsOperator;
+import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.domain.RestResponse;
 import org.apache.streampark.console.base.exception.ApiAlertException;
@@ -36,6 +37,8 @@ import 
org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.CommonService;
 import org.apache.streampark.console.core.service.FlinkSqlService;
 import org.apache.streampark.console.core.service.ResourceService;
+import org.apache.streampark.flink.packer.maven.Artifact;
+import org.apache.streampark.flink.packer.maven.MavenTool;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -58,6 +61,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.jar.Manifest;
 import java.util.stream.Collectors;
 
 @Slf4j
@@ -240,25 +245,78 @@ public class ResourceServiceImpl extends 
ServiceImpl<ResourceMapper, Resource>
     switch (type) {
       case FLINK_APP:
         // check main.
-        break;
+        File jarFile = getResourceJar(resource);
+        Manifest manifest = Utils.getJarManifest(jarFile);
+        String mainClass = manifest.getMainAttributes().getValue("Main-Class");
+        if (mainClass == null) {
+          // main class is null
+          return RestResponse.success().data(1);
+        }
+        // successful.
+        return RestResponse.success().data(0);
       case CONNECTOR:
-        Dependency dependency = 
Dependency.toDependency(resource.getResource());
-        if (!dependency.isEmpty()) {
-          String connectorPath = null;
-          if (!dependency.getJar().isEmpty()) {
-            String jar = dependency.getJar().get(0);
-            File localJar = new File(WebUtils.getAppTempDir(), jar);
-            connectorPath = localJar.getAbsolutePath();
-          } else {
-            Pom pom = dependency.getPom().get(0);
-          }
+        // 1) get connector id
+        List<String> connectorIds = getConnectorId(resource);
+        if (Utils.isEmpty(connectorIds)) {
+          // connector id is null
+          return RestResponse.success().data(1);
+        }
+        // 2) check connector exists
+        boolean exists = existsResourceByConnectorIds(connectorIds);
+        if (exists) {
+          return RestResponse.success(2);
         }
-        break;
+        return RestResponse.success().data(0);
+    }
+    return RestResponse.success().data(0);
+  }
+
+  private boolean existsResourceByConnectorIds(List<String> connectorIds) {
+    return false;
+  }
+
+  /**
+   * Resolves the connector ids declared by a flink connector resource.
+   *
+   * <p>Connector-id parsing is not implemented yet (see the TODO below), so this currently returns
+   * {@code null} for every connector resource.
+   *
+   * @param resource the resource to inspect; its type must be {@code ResourceType.CONNECTOR}
+   * @return the connector ids, or {@code null} while parsing is unimplemented
+   * @throws ApiAlertException presumably, when the resource is not a flink connector
+   */
+  @Override
+  public List<String> getConnectorId(Resource resource) {
+    // The guard must hold for connector resources: a negated check here would reject exactly
+    // the resources this method exists for, contradicting the error message.
+    ApiAlertException.throwIfFalse(
+        ResourceType.CONNECTOR.equals(resource.getResourceType()),
+        "getConnectorId method error, resource not flink connector.");
+    File connector = getResourceJar(resource);
+    if (connector != null) {
+      // TODO parse connector get connectorId
+    }
 
     return null;
   }
 
+  /**
+   * Locates the jar file backing the given resource.
+   *
+   * <p>If the resource's dependency declares jars, the first one is looked up under the
+   * application temp directory (the file is not checked for existence). Otherwise the first pom
+   * coordinate is resolved through {@code MavenTool} and the file named
+   * {@code <artifactId>-<version>.jar} is picked from the resolved files.
+   *
+   * @param resource the resource whose dependency description is parsed
+   * @return the jar file, or {@code null} when the dependency is empty, resolution fails, or no
+   *     resolved file matches the expected name
+   */
+  private File getResourceJar(Resource resource) {
+    Dependency dependency = Dependency.toDependency(resource.getResource());
+    if (dependency.isEmpty()) {
+      return null;
+    }
+    if (!dependency.getJar().isEmpty()) {
+      String jar = dependency.getJar().get(0);
+      return new File(WebUtils.getAppTempDir(), jar);
+    }
+
+    Pom pom = dependency.getPom().get(0);
+    Artifact artifact =
+        new Artifact(pom.getGroupId(), pom.getArtifactId(), pom.getVersion(), null);
+    try {
+      // The resolved list may hold more than one file (presumably transitive dependencies),
+      // so select the artifact's own jar by its conventional file name.
+      String fileName = String.format("%s-%s.jar", artifact.artifactId(), artifact.version());
+      return MavenTool.resolveArtifacts(artifact).stream()
+          .filter(x -> x.getName().equals(fileName))
+          .findFirst()
+          .orElse(null);
+    } catch (Exception e) {
+      // Pass the exception as the throwable argument so the stack trace is logged; string
+      // concatenation would only record e.toString().
+      log.error("download flink connector error: ", e);
+    }
+    return null;
+  }
+
   private void transferTeamResource(Long teamId, String resourceName) {
     String teamUploads = String.format("%s/%d", 
Workspace.local().APP_UPLOADS(), teamId);
     if (!FsOperator.lfs().exists(teamUploads)) {
diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java
 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java
index 0027828e3..58506d9e0 100644
--- 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java
+++ 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java
@@ -17,76 +17,34 @@
 
 package org.apache.streampark.console.base.util;
 
-import org.apache.streampark.common.util.DependencyUtils;
-import org.apache.streampark.console.core.bean.Pom;
+import org.apache.streampark.common.conf.CommonConfig;
+import org.apache.streampark.common.conf.InternalConfigHolder;
+import org.apache.streampark.flink.packer.maven.Artifact;
+import org.apache.streampark.flink.packer.maven.MavenTool;
 
 import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.Test;
 
-import java.util.ArrayList;
-import java.util.Collection;
+import java.io.File;
 import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import scala.collection.JavaConversions;
+import java.util.Optional;
 
 @Slf4j
 class DependencyUtilsTest {
 
   @Test
-  void resolveMavenDependencies() {
-    /**
-     * <dependency> <groupId>org.apache.flink</groupId> 
<artifactId>flink-table-common</artifactId>
-     * <version>${flink.version}</version> </dependency>
-     *
-     * <p><dependency> <groupId>org.apache.flink</groupId> 
<artifactId>flink-java</artifactId>
-     * <version>${flink.version}</version> </dependency>
-     */
-    List<Pom> dependency = new ArrayList<>();
-
-    Pom dept = new Pom();
-    dept.setGroupId("org.apache.flink");
-    dept.setArtifactId("flink-table-common");
-    dept.setVersion("1.17.1");
-    dependency.add(dept);
-
-    StringBuilder builder = new StringBuilder();
-    dependency.forEach(
-        x -> {
-          String info =
-              String.format("%s:%s:%s,", x.getGroupId(), x.getArtifactId(), 
x.getVersion());
-          builder.append(info);
-        });
-    String packages = builder.deleteCharAt(builder.length() - 1).toString();
-
-    Timer timer = new Timer();
-    timer.schedule(
-        new TimerTask() {
-          @Override
-          public void run() {
-            log.info(">>>>> running....");
-          }
-        },
-        0,
-        3000);
-
-    try {
-      Collection<String> jars =
-          JavaConversions.asJavaCollection(
-              DependencyUtils.resolveMavenDependencies(
-                  packages,
-                  null,
-                  null,
-                  null,
-                  out -> {
-                    System.err.println("---------->" + out);
-                  }));
-      System.out.println();
-      
System.out.println("----------------------------------------------------------------");
-      jars.forEach(System.out::println);
-    } catch (Exception e) {
-      System.out.println(e.getMessage());
+  void resolveMavenDependencies() throws Exception {
+    Artifact artifact = new Artifact("org.apache.flink", "flink-table-common", 
"1.17.1", null);
+
+    InternalConfigHolder.set(CommonConfig.STREAMPARK_WORKSPACE_LOCAL(), 
"~/workspace");
+    List<File> files = MavenTool.resolveArtifacts(artifact);
+    if (!files.isEmpty()) {
+      String fileName = String.format("%s-%s.jar", artifact.artifactId(), 
artifact.version());
+      Optional<File> jarFile = files.stream().filter(x -> 
x.getName().equals(fileName)).findFirst();
+      if (jarFile.isPresent()) {
+        String jar = jarFile.get().getAbsolutePath();
+        System.out.println(jar);
+      }
     }
   }
 }
diff --git 
a/streampark-flink/streampark-flink-catalog-mysql/src/main/resources/mysql.sql 
b/streampark-flink/streampark-flink-catalog-mysql/src/main/resources/mysql.sql
new file mode 100644
index 000000000..e69de29bb
diff --git 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
index 2b07cbc74..71d178f24 100644
--- 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
+++ 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
@@ -43,6 +43,7 @@ import javax.annotation.{Nonnull, Nullable}
 
 import java.io.File
 import java.util
+import java.util.{List => JavaList, Set => JavaSet}
 
 import scala.collection.convert.ImplicitConversions._
 import scala.collection.mutable.ArrayBuffer
@@ -161,6 +162,10 @@ object MavenTool extends Logger {
     buildFatJar(mainClass, jarLibs ++ artFilePaths, outFatJarPath)
   }
 
+  @throws[Exception]
+  def resolveArtifacts(mavenArtifact: Artifact): JavaList[File] = 
resolveArtifacts(
+    Set(mavenArtifact))
+
   /**
    * Resolve the collection of artifacts. Artifacts will be downloaded to ConfigConst.MAVEN_LOCAL_DIR
    * if necessary. notes: Only compile scope dependencies will be resolved.
@@ -171,13 +176,16 @@ object MavenTool extends Logger {
    *   jar File Object of resolved artifacts
    */
   @throws[Exception]
-  def resolveArtifacts(mavenArtifacts: Set[Artifact]): Set[File] = {
-    if (mavenArtifacts == null) Set.empty[File];
+  def resolveArtifacts(mavenArtifacts: JavaSet[Artifact]): JavaList[File] = {
+    if (mavenArtifacts == null) List.empty[File]
     else {
       val (repoSystem, session) = getMavenEndpoint()
       val artifacts = mavenArtifacts.map(
         e => {
-          new DefaultArtifact(e.groupId, e.artifactId, e.classifier, "jar", 
e.version)
+          val artifact =
+            new DefaultArtifact(e.groupId, e.artifactId, e.classifier, "jar", 
e.version)
+          artifact.getProperties
+          artifact
         })
       logInfo(s"start resolving dependencies: ${artifacts.mkString}")
 
@@ -201,7 +209,7 @@ object MavenTool extends Logger {
       repoSystem
         .resolveArtifacts(session, artReqs)
         .map(_.getArtifact.getFile)
-        .toSet
+        .toList
     }
   }
 
diff --git 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
index 92c1d9199..b9167d437 100644
--- 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
+++ 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
@@ -28,6 +28,8 @@ import org.apache.commons.codec.digest.DigestUtils
 
 import java.io.{File, FileInputStream, IOException}
 
+import scala.collection.convert.ImplicitConversions._
+
 /** Building pipeline for flink yarn application mode */
 class FlinkYarnApplicationBuildPipeline(request: 
FlinkYarnApplicationBuildRequest)
   extends BuildPipeline {
@@ -59,7 +61,7 @@ class FlinkYarnApplicationBuildPipeline(request: 
FlinkYarnApplicationBuildReques
           case DevelopmentMode.FLINK_SQL =>
             val mavenArts = 
MavenTool.resolveArtifacts(request.dependencyInfo.mavenArts)
             mavenArts.map(_.getAbsolutePath) ++ 
request.dependencyInfo.extJarLibs
-          case _ => Set[String]()
+          case _ => List[String]()
         }
       }.getOrElse(throw getError.exception)
 
diff --git 
a/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/MavenToolSpec.scala
 
b/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/MavenToolSpec.scala
index 7b0803c45..2bd91b8a7 100644
--- 
a/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/MavenToolSpec.scala
+++ 
b/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/MavenToolSpec.scala
@@ -26,6 +26,7 @@ import org.scalatest.wordspec.AnyWordSpec
 import java.io.File
 import java.util.jar.JarFile
 
+import scala.collection.convert.ImplicitConversions._
 import scala.language.postfixOps
 
 class MavenToolSpec extends AnyWordSpec with BeforeAndAfterAll with Matchers {

Reply via email to