This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch resource
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/resource by this push:
new 764a8c3d1 get flink connector id
764a8c3d1 is described below
commit 764a8c3d161483d98f47fbb1ccddd1cc6946911a
Author: benjobs <[email protected]>
AuthorDate: Wed Jul 26 09:29:30 2023 +0800
get flink connector id
---
.../streampark/common/util/ClassLoaderUtils.scala | 15 ++++++-
.../streampark-console-service/pom.xml | 8 ++++
.../console/core/service/ResourceService.java | 2 +-
.../core/service/impl/ResourceServiceImpl.java | 47 +++++++++++++---------
.../console/base/util/DependencyUtilsTest.java | 26 ++++++++++--
5 files changed, 72 insertions(+), 26 deletions(-)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala
index 620f680e7..eb3f3dadc 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala
@@ -16,9 +16,11 @@
*/
package org.apache.streampark.common.util
-import java.io.File
+import java.io.{File, IOException}
import java.net.{URL, URLClassLoader}
-import java.util.function.Supplier
+import java.util.function.{Consumer, Supplier}
+
+import scala.collection.mutable.ArrayBuffer
object ClassLoaderUtils extends Logger {
@@ -61,6 +63,15 @@ object ClassLoaderUtils extends Logger {
Thread.currentThread.setContextClassLoader(originalClassLoader)
}
}
+ @throws[IOException]
+ def cloneClassLoader(): ClassLoader = {
+ val urls = originalClassLoader.getResources(".")
+ val buffer = ArrayBuffer[URL]()
+ while (urls.hasMoreElements) {
+ buffer += urls.nextElement()
+ }
+ new URLClassLoader(buffer.toArray[URL], originalClassLoader)
+ }
def loadJar(jarFilePath: String): Unit = {
val jarFile = new File(jarFilePath)
diff --git a/streampark-console/streampark-console-service/pom.xml
b/streampark-console/streampark-console-service/pom.xml
index bbc761000..b3f9bb908 100644
--- a/streampark-console/streampark-console-service/pom.xml
+++ b/streampark-console/streampark-console-service/pom.xml
@@ -383,6 +383,7 @@
<version>${project.version}</version>
</dependency>
+
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
@@ -450,6 +451,13 @@
<artifactId>force-shading</artifactId>
<scope>provided</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
</dependencies>
<build>
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java
index 3bba1aece..be4dafdc8 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java
@@ -95,5 +95,5 @@ public interface ResourceService extends IService<Resource> {
RestResponse checkResource(Resource resource);
- List<String> getConnectorId(Resource resource);
+ List<String> getConnectorId(Resource resource) throws Exception;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
index 8dab842b6..835db8b21 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
@@ -245,26 +245,38 @@ public class ResourceServiceImpl extends
ServiceImpl<ResourceMapper, Resource>
switch (type) {
case FLINK_APP:
// check main.
- File jarFile = getResourceJar(resource);
+ File jarFile = null;
+ try {
+ jarFile = getResourceJar(resource);
+ } catch (Exception e) {
+ // get jarFile error
+ return RestResponse.success().data(1);
+ }
Manifest manifest = Utils.getJarManifest(jarFile);
String mainClass = manifest.getMainAttributes().getValue("Main-Class");
if (mainClass == null) {
// main class is null
- return RestResponse.success().data(1);
+ return RestResponse.success().data(2);
}
// successful.
return RestResponse.success().data(0);
case CONNECTOR:
// 1) get connector id
- List<String> connectorIds = getConnectorId(resource);
+ List<String> connectorIds;
+ try {
+ connectorIds = getConnectorId(resource);
+ } catch (Exception e) {
+ return RestResponse.success().data(1);
+ }
+
if (Utils.isEmpty(connectorIds)) {
// connector id is null
- return RestResponse.success().data(1);
+ return RestResponse.success().data(2);
}
// 2) check connector exists
boolean exists = existsResourceByConnectorIds(connectorIds);
if (exists) {
- return RestResponse.success(2);
+ return RestResponse.success(3);
}
return RestResponse.success().data(0);
}
@@ -276,19 +288,20 @@ public class ResourceServiceImpl extends
ServiceImpl<ResourceMapper, Resource>
}
@Override
- public List<String> getConnectorId(Resource resource) {
+ public List<String> getConnectorId(Resource resource) throws Exception {
ApiAlertException.throwIfFalse(
!ResourceType.CONNECTOR.equals(resource.getResourceType()),
"getConnectorId method error, resource not flink connector.");
File connector = getResourceJar(resource);
if (connector != null) {
+ String spi =
"META-INF/services/org.apache.flink.table.factories.Factory";
+
// TODO parse connector get connectorId
}
-
return null;
}
- private File getResourceJar(Resource resource) {
+ private File getResourceJar(Resource resource) throws Exception {
Dependency dependency = Dependency.toDependency(resource.getResource());
if (dependency.isEmpty()) {
return null;
@@ -300,18 +313,14 @@ public class ResourceServiceImpl extends
ServiceImpl<ResourceMapper, Resource>
Pom pom = dependency.getPom().get(0);
Artifact artifact =
new Artifact(pom.getGroupId(), pom.getArtifactId(),
pom.getVersion(), null);
- try {
- List<File> files = MavenTool.resolveArtifacts(artifact);
- if (!files.isEmpty()) {
- String fileName = String.format("%s-%s.jar", artifact.artifactId(),
artifact.version());
- Optional<File> jarFile =
- files.stream().filter(x ->
x.getName().equals(fileName)).findFirst();
- if (jarFile.isPresent()) {
- return jarFile.get();
- }
+ List<File> files = MavenTool.resolveArtifacts(artifact);
+ if (!files.isEmpty()) {
+ String fileName = String.format("%s-%s.jar", artifact.artifactId(),
artifact.version());
+ Optional<File> jarFile =
+ files.stream().filter(x ->
x.getName().equals(fileName)).findFirst();
+ if (jarFile.isPresent()) {
+ return jarFile.get();
}
- } catch (Exception e) {
- log.error("download flink connector error: " + e);
}
return null;
}
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java
index 58506d9e0..c70d5be79 100644
---
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java
@@ -19,31 +19,49 @@ package org.apache.streampark.console.base.util;
import org.apache.streampark.common.conf.CommonConfig;
import org.apache.streampark.common.conf.InternalConfigHolder;
+import org.apache.streampark.common.util.ClassLoaderUtils;
import org.apache.streampark.flink.packer.maven.Artifact;
import org.apache.streampark.flink.packer.maven.MavenTool;
+import org.apache.flink.table.factories.Factory;
+
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import java.io.File;
import java.util.List;
import java.util.Optional;
+import java.util.ServiceLoader;
@Slf4j
class DependencyUtilsTest {
@Test
- void resolveMavenDependencies() throws Exception {
- Artifact artifact = new Artifact("org.apache.flink", "flink-table-common",
"1.17.1", null);
+ public void resolveFlinkConnector() throws Exception {
+ Artifact artifact = new Artifact("org.apache.flink",
"flink-connector-kafka", "1.17.1", null);
+
+ InternalConfigHolder.set(
+ CommonConfig.STREAMPARK_WORKSPACE_LOCAL(),
"/Users/benjobs/Desktop/streamx_workspace");
- InternalConfigHolder.set(CommonConfig.STREAMPARK_WORKSPACE_LOCAL(),
"~/workspace");
List<File> files = MavenTool.resolveArtifacts(artifact);
if (!files.isEmpty()) {
String fileName = String.format("%s-%s.jar", artifact.artifactId(),
artifact.version());
Optional<File> jarFile = files.stream().filter(x ->
x.getName().equals(fileName)).findFirst();
if (jarFile.isPresent()) {
String jar = jarFile.get().getAbsolutePath();
- System.out.println(jar);
+ Class<Factory> className = Factory.class;
+ ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
+ // 动态加载jar到classpath,
+ // 优点:简单,
+ // 缺点: 会污染当前主线程,每上传一个新的connector都会加载到当前的主线程中,导致加载太多的 class, ...
+ ClassLoaderUtils.loadJar(jar);
+
+ ServiceLoader<Factory> serviceLoader = ServiceLoader.load(className,
classLoader);
+ for (Factory factory : serviceLoader) {
+ String name = factory.getClass().getName();
+ String id = factory.factoryIdentifier();
+ System.out.println(id + " : " + name);
+ }
}
}
}