This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch resource
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/resource by this push:
     new 764a8c3d1 get flink connector id
764a8c3d1 is described below

commit 764a8c3d161483d98f47fbb1ccddd1cc6946911a
Author: benjobs <[email protected]>
AuthorDate: Wed Jul 26 09:29:30 2023 +0800

    get flink connector id
---
 .../streampark/common/util/ClassLoaderUtils.scala  | 15 ++++++-
 .../streampark-console-service/pom.xml             |  8 ++++
 .../console/core/service/ResourceService.java      |  2 +-
 .../core/service/impl/ResourceServiceImpl.java     | 47 +++++++++++++---------
 .../console/base/util/DependencyUtilsTest.java     | 26 ++++++++++--
 5 files changed, 72 insertions(+), 26 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala
index 620f680e7..eb3f3dadc 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala
@@ -16,9 +16,11 @@
  */
 package org.apache.streampark.common.util
 
-import java.io.File
+import java.io.{File, IOException}
 import java.net.{URL, URLClassLoader}
-import java.util.function.Supplier
+import java.util.function.{Consumer, Supplier}
+
+import scala.collection.mutable.ArrayBuffer
 
 object ClassLoaderUtils extends Logger {
 
@@ -61,6 +63,15 @@ object ClassLoaderUtils extends Logger {
       Thread.currentThread.setContextClassLoader(originalClassLoader)
     }
   }
+  @throws[IOException]
+  def cloneClassLoader(): ClassLoader = {
+    val urls = originalClassLoader.getResources(".")
+    val buffer = ArrayBuffer[URL]()
+    while (urls.hasMoreElements) {
+      buffer += urls.nextElement()
+    }
+    new URLClassLoader(buffer.toArray[URL], originalClassLoader)
+  }
 
   def loadJar(jarFilePath: String): Unit = {
     val jarFile = new File(jarFilePath)
diff --git a/streampark-console/streampark-console-service/pom.xml b/streampark-console/streampark-console-service/pom.xml
index bbc761000..b3f9bb908 100644
--- a/streampark-console/streampark-console-service/pom.xml
+++ b/streampark-console/streampark-console-service/pom.xml
@@ -383,6 +383,7 @@
             <version>${project.version}</version>
         </dependency>
 
+
         <dependency>
             <groupId>com.fasterxml.jackson.module</groupId>
             <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
@@ -450,6 +451,13 @@
             <artifactId>force-shading</artifactId>
             <scope>provided</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java
index 3bba1aece..be4dafdc8 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java
@@ -95,5 +95,5 @@ public interface ResourceService extends IService<Resource> {
 
   RestResponse checkResource(Resource resource);
 
-  List<String> getConnectorId(Resource resource);
+  List<String> getConnectorId(Resource resource) throws Exception;
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
index 8dab842b6..835db8b21 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
@@ -245,26 +245,38 @@ public class ResourceServiceImpl extends ServiceImpl<ResourceMapper, Resource>
     switch (type) {
       case FLINK_APP:
         // check main.
-        File jarFile = getResourceJar(resource);
+        File jarFile = null;
+        try {
+          jarFile = getResourceJar(resource);
+        } catch (Exception e) {
+          // get jarFile error
+          return RestResponse.success().data(1);
+        }
         Manifest manifest = Utils.getJarManifest(jarFile);
         String mainClass = manifest.getMainAttributes().getValue("Main-Class");
         if (mainClass == null) {
           // main class is null
-          return RestResponse.success().data(1);
+          return RestResponse.success().data(2);
         }
         // successful.
         return RestResponse.success().data(0);
       case CONNECTOR:
         // 1) get connector id
-        List<String> connectorIds = getConnectorId(resource);
+        List<String> connectorIds;
+        try {
+          connectorIds = getConnectorId(resource);
+        } catch (Exception e) {
+          return RestResponse.success().data(1);
+        }
+
         if (Utils.isEmpty(connectorIds)) {
           // connector id is null
-          return RestResponse.success().data(1);
+          return RestResponse.success().data(2);
         }
         // 2) check connector exists
         boolean exists = existsResourceByConnectorIds(connectorIds);
         if (exists) {
-          return RestResponse.success(2);
+          return RestResponse.success(3);
         }
         return RestResponse.success().data(0);
     }
@@ -276,19 +288,20 @@ public class ResourceServiceImpl extends ServiceImpl<ResourceMapper, Resource>
   }
 
   @Override
-  public List<String> getConnectorId(Resource resource) {
+  public List<String> getConnectorId(Resource resource) throws Exception {
     ApiAlertException.throwIfFalse(
         !ResourceType.CONNECTOR.equals(resource.getResourceType()),
         "getConnectorId method error, resource not flink connector.");
     File connector = getResourceJar(resource);
     if (connector != null) {
+      String spi = "META-INF/services/org.apache.flink.table.factories.Factory";
+
       // TODO parse connector get connectorId
     }
-
     return null;
   }
 
-  private File getResourceJar(Resource resource) {
+  private File getResourceJar(Resource resource) throws Exception {
     Dependency dependency = Dependency.toDependency(resource.getResource());
     if (dependency.isEmpty()) {
       return null;
@@ -300,18 +313,14 @@ public class ResourceServiceImpl extends ServiceImpl<ResourceMapper, Resource>
       Pom pom = dependency.getPom().get(0);
       Artifact artifact =
           new Artifact(pom.getGroupId(), pom.getArtifactId(), pom.getVersion(), null);
-      try {
-        List<File> files = MavenTool.resolveArtifacts(artifact);
-        if (!files.isEmpty()) {
-          String fileName = String.format("%s-%s.jar", artifact.artifactId(), artifact.version());
-          Optional<File> jarFile =
-              files.stream().filter(x -> x.getName().equals(fileName)).findFirst();
-          if (jarFile.isPresent()) {
-            return jarFile.get();
-          }
+      List<File> files = MavenTool.resolveArtifacts(artifact);
+      if (!files.isEmpty()) {
+        String fileName = String.format("%s-%s.jar", artifact.artifactId(), artifact.version());
+        Optional<File> jarFile =
+            files.stream().filter(x -> x.getName().equals(fileName)).findFirst();
+        if (jarFile.isPresent()) {
+          return jarFile.get();
         }
-      } catch (Exception e) {
-        log.error("download flink connector error: " + e);
       }
       return null;
     }
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java
index 58506d9e0..c70d5be79 100644
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java
@@ -19,31 +19,49 @@ package org.apache.streampark.console.base.util;
 
 import org.apache.streampark.common.conf.CommonConfig;
 import org.apache.streampark.common.conf.InternalConfigHolder;
+import org.apache.streampark.common.util.ClassLoaderUtils;
 import org.apache.streampark.flink.packer.maven.Artifact;
 import org.apache.streampark.flink.packer.maven.MavenTool;
 
+import org.apache.flink.table.factories.Factory;
+
 import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.util.List;
 import java.util.Optional;
+import java.util.ServiceLoader;
 
 @Slf4j
 class DependencyUtilsTest {
 
   @Test
-  void resolveMavenDependencies() throws Exception {
-    Artifact artifact = new Artifact("org.apache.flink", "flink-table-common", "1.17.1", null);
+  public void resolveFlinkConnector() throws Exception {
+    Artifact artifact = new Artifact("org.apache.flink", "flink-connector-kafka", "1.17.1", null);
+
+    InternalConfigHolder.set(
+        CommonConfig.STREAMPARK_WORKSPACE_LOCAL(), "/Users/benjobs/Desktop/streamx_workspace");
 
-    InternalConfigHolder.set(CommonConfig.STREAMPARK_WORKSPACE_LOCAL(), "~/workspace");
     List<File> files = MavenTool.resolveArtifacts(artifact);
     if (!files.isEmpty()) {
       String fileName = String.format("%s-%s.jar", artifact.artifactId(), artifact.version());
       Optional<File> jarFile = files.stream().filter(x -> x.getName().equals(fileName)).findFirst();
       if (jarFile.isPresent()) {
         String jar = jarFile.get().getAbsolutePath();
-        System.out.println(jar);
+        Class<Factory> className = Factory.class;
+        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+        // 动态加载jar到classpath,
+        // 优点:简单,
+        // 缺点: 会污染当前主线程,每上传一个新的connector都会加载到当前的主线程中,导致加载太多的 class, ...
+        ClassLoaderUtils.loadJar(jar);
+
+        ServiceLoader<Factory> serviceLoader = ServiceLoader.load(className, classLoader);
+        for (Factory factory : serviceLoader) {
+          String name = factory.getClass().getName();
+          String id = factory.factoryIdentifier();
+          System.out.println(id + "      :     " + name);
+        }
       }
     }
   }

Reply via email to