This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch resource
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/resource by this push:
     new cab165ec1 DependencyUtilsTest improvement
cab165ec1 is described below

commit cab165ec11df06e588f56722e4256bfa8678f9f1
Author: benjobs <[email protected]>
AuthorDate: Wed Jul 26 14:54:38 2023 +0800

    DependencyUtilsTest improvement
---
 .../console/base/util/DependencyUtilsTest.java     | 89 +++++++++++++++++-----
 1 file changed, 72 insertions(+), 17 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java
index c70d5be79..3570486d1 100644
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java
@@ -23,46 +23,101 @@ import org.apache.streampark.common.util.ClassLoaderUtils;
 import org.apache.streampark.flink.packer.maven.Artifact;
 import org.apache.streampark.flink.packer.maven.MavenTool;
 
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.table.factories.Factory;
 
 import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.Test;
 
 import java.io.File;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.Scanner;
 import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
 
 @Slf4j
 class DependencyUtilsTest {
 
   @Test
   public void resolveFlinkConnector() throws Exception {
-    Artifact artifact = new Artifact("org.apache.flink", "flink-connector-kafka", "1.17.1", null);
 
-    InternalConfigHolder.set(
-        CommonConfig.STREAMPARK_WORKSPACE_LOCAL(), "/Users/benjobs/Desktop/streamx_workspace");
+    Artifact artifact =
+        new Artifact("org.apache.flink", "flink-connector-hive_2.12", "1.17.1", null);
+
+    InternalConfigHolder.set(CommonConfig.STREAMPARK_WORKSPACE_LOCAL(), "~/tmp");
 
     List<File> files = MavenTool.resolveArtifacts(artifact);
+    if (!files.isEmpty()) {
+      Class<Factory> className = Factory.class;
+      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+      files.forEach(x -> ClassLoaderUtils.loadJar(x.getAbsolutePath()));
+      ServiceLoader<Factory> serviceLoader = ServiceLoader.load(className, classLoader);
+      for (Factory factory : serviceLoader) {
+        String name = factory.getClass().getName();
+        String id = factory.factoryIdentifier();
+        System.out.println("id: " + id + "          class :" + name);
+
+        Set<ConfigOption<?>> requiredOptions = factory.requiredOptions();
+        System.out.println(" ------------requiredOptions---------- ");
+        requiredOptions.forEach(
+            x -> System.out.println(x.key() + "         defValue: " + x.defaultValue()));
+
+        System.out.println(" ------------optionalOptions---------- ");
+        Set<ConfigOption<?>> options = factory.optionalOptions();
+        options.forEach(
+            x -> System.out.println(x.key() + "            defValue: " + x.defaultValue()));
+
+        System.out.println();
+      }
+    }
+  }
+
+  @Test
+  public void resolveFlinkConnector2() throws Exception {
+    Artifact artifact =
+        new Artifact("org.apache.flink", "flink-connector-elasticsearch6", "3.0.1-1.17", null);
+
+    InternalConfigHolder.set(CommonConfig.STREAMPARK_WORKSPACE_LOCAL(), "~/tmp");
+    List<File> files = MavenTool.resolveArtifacts(artifact);
+
+    String path = null;
     if (!files.isEmpty()) {
      String fileName = String.format("%s-%s.jar", artifact.artifactId(), artifact.version());
      Optional<File> jarFile = files.stream().filter(x -> x.getName().equals(fileName)).findFirst();
       if (jarFile.isPresent()) {
-        String jar = jarFile.get().getAbsolutePath();
-        Class<Factory> className = Factory.class;
-        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
-        // 动态加载jar到classpath,
-        // 优点:简单,
-        // 缺点: 会污染当前主线程,每上传一个新的connector都会加载到当前的主线程中,导致加载太多的 class, ...
-        ClassLoaderUtils.loadJar(jar);
-
-        ServiceLoader<Factory> serviceLoader = ServiceLoader.load(className, classLoader);
-        for (Factory factory : serviceLoader) {
-          String name = factory.getClass().getName();
-          String id = factory.factoryIdentifier();
-          System.out.println(id + "      :     " + name);
-        }
+        path = jarFile.get().getAbsolutePath();
+      }
+    }
+
+    String configFile = "META-INF/services/org.apache.flink.table.factories.Factory";
+
+    JarFile jarFile = new JarFile(path);
+    JarEntry entry = jarFile.getJarEntry(configFile);
+
+    List<String> factories = new ArrayList<>(0);
+    InputStream inputStream = jarFile.getInputStream(entry);
+    Scanner scanner = new Scanner(new InputStreamReader(inputStream));
+    while (scanner.hasNextLine()) {
+      String line = scanner.nextLine().trim();
+      if (line.length() > 0 && !line.startsWith("#")) {
+        factories.add(line);
       }
     }
+
+    factories.forEach(System.out::println);
+    for (String factory : factories) {
+      String packageName = factory.replace('.', '/') + ".class";
+      JarEntry classEntry = jarFile.getJarEntry(packageName);
+      InputStream in = jarFile.getInputStream(classEntry);
+
+      // TODO parse connector
+      System.out.println(in);
+    }
   }
 }

Reply via email to