This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch resource
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/resource by this push:
new cab165ec1 DependencyUtilsTest improvement
cab165ec1 is described below
commit cab165ec11df06e588f56722e4256bfa8678f9f1
Author: benjobs <[email protected]>
AuthorDate: Wed Jul 26 14:54:38 2023 +0800
DependencyUtilsTest improvement
---
.../console/base/util/DependencyUtilsTest.java | 89 +++++++++++++++++-----
1 file changed, 72 insertions(+), 17 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java
index c70d5be79..3570486d1 100644
---
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java
@@ -23,46 +23,101 @@ import org.apache.streampark.common.util.ClassLoaderUtils;
import org.apache.streampark.flink.packer.maven.Artifact;
import org.apache.streampark.flink.packer.maven.MavenTool;
+import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.factories.Factory;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import java.io.File;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+import java.util.Scanner;
import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
@Slf4j
class DependencyUtilsTest {
@Test
public void resolveFlinkConnector() throws Exception {
- Artifact artifact = new Artifact("org.apache.flink",
"flink-connector-kafka", "1.17.1", null);
- InternalConfigHolder.set(
- CommonConfig.STREAMPARK_WORKSPACE_LOCAL(),
"/Users/benjobs/Desktop/streamx_workspace");
+ Artifact artifact =
+ new Artifact("org.apache.flink", "flink-connector-hive_2.12",
"1.17.1", null);
+
+ InternalConfigHolder.set(CommonConfig.STREAMPARK_WORKSPACE_LOCAL(),
"~/tmp");
List<File> files = MavenTool.resolveArtifacts(artifact);
+ if (!files.isEmpty()) {
+ Class<Factory> className = Factory.class;
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ files.forEach(x -> ClassLoaderUtils.loadJar(x.getAbsolutePath()));
+ ServiceLoader<Factory> serviceLoader = ServiceLoader.load(className,
classLoader);
+ for (Factory factory : serviceLoader) {
+ String name = factory.getClass().getName();
+ String id = factory.factoryIdentifier();
+ System.out.println("id: " + id + " class :" + name);
+
+ Set<ConfigOption<?>> requiredOptions = factory.requiredOptions();
+ System.out.println(" ------------requiredOptions---------- ");
+ requiredOptions.forEach(
+ x -> System.out.println(x.key() + " defValue: " +
x.defaultValue()));
+
+ System.out.println(" ------------optionalOptions---------- ");
+ Set<ConfigOption<?>> options = factory.optionalOptions();
+ options.forEach(
+ x -> System.out.println(x.key() + " defValue: " +
x.defaultValue()));
+
+ System.out.println();
+ }
+ }
+ }
+
+ @Test
+ public void resolveFlinkConnector2() throws Exception {
+ Artifact artifact =
+ new Artifact("org.apache.flink", "flink-connector-elasticsearch6",
"3.0.1-1.17", null);
+
+ InternalConfigHolder.set(CommonConfig.STREAMPARK_WORKSPACE_LOCAL(),
"~/tmp");
+ List<File> files = MavenTool.resolveArtifacts(artifact);
+
+ String path = null;
if (!files.isEmpty()) {
String fileName = String.format("%s-%s.jar", artifact.artifactId(),
artifact.version());
Optional<File> jarFile = files.stream().filter(x ->
x.getName().equals(fileName)).findFirst();
if (jarFile.isPresent()) {
- String jar = jarFile.get().getAbsolutePath();
- Class<Factory> className = Factory.class;
- ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
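- // Dynamically load the jar onto the classpath.
- // Pro: simple.
- // Con: pollutes the current main thread; every newly uploaded connector is loaded into the current main thread, causing too many classes to be loaded, ...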
- ClassLoaderUtils.loadJar(jar);
-
- ServiceLoader<Factory> serviceLoader = ServiceLoader.load(className,
classLoader);
- for (Factory factory : serviceLoader) {
- String name = factory.getClass().getName();
- String id = factory.factoryIdentifier();
- System.out.println(id + " : " + name);
- }
+ path = jarFile.get().getAbsolutePath();
+ }
+ }
+
+ String configFile =
"META-INF/services/org.apache.flink.table.factories.Factory";
+
+ JarFile jarFile = new JarFile(path);
+ JarEntry entry = jarFile.getJarEntry(configFile);
+
+ List<String> factories = new ArrayList<>(0);
+ InputStream inputStream = jarFile.getInputStream(entry);
+ Scanner scanner = new Scanner(new InputStreamReader(inputStream));
+ while (scanner.hasNextLine()) {
+ String line = scanner.nextLine().trim();
+ if (line.length() > 0 && !line.startsWith("#")) {
+ factories.add(line);
}
}
+
+ factories.forEach(System.out::println);
+ for (String factory : factories) {
+ String packageName = factory.replace('.', '/') + ".class";
+ JarEntry classEntry = jarFile.getJarEntry(packageName);
+ InputStream in = jarFile.getInputStream(classEntry);
+
+ // TODO parse connector
+ System.out.println(in);
+ }
}
}