This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new a1a564f75 [improve] improve resource management logic (#3439)
a1a564f75 is described below
commit a1a564f7513667c10c3ca0b8fa0e0b2836b3d873
Author: monrg <[email protected]>
AuthorDate: Fri Dec 29 00:32:17 2023 +0800
[improve] improve resource management logic (#3439)
Co-authored-by: monrg <[email protected]>
---
.../streampark/common/util/ExceptionUtils.java | 14 ++
.../org/apache/streampark/common/util/Utils.scala | 8 +
.../impl/ApplicationInfoServiceImpl.java | 4 +-
.../core/service/impl/ResourceServiceImpl.java | 220 +++++++++------------
4 files changed, 118 insertions(+), 128 deletions(-)
diff --git
a/streampark-common/src/main/java/org/apache/streampark/common/util/ExceptionUtils.java
b/streampark-common/src/main/java/org/apache/streampark/common/util/ExceptionUtils.java
index 40deb57b5..22d93e52b 100644
---
a/streampark-common/src/main/java/org/apache/streampark/common/util/ExceptionUtils.java
+++
b/streampark-common/src/main/java/org/apache/streampark/common/util/ExceptionUtils.java
@@ -48,4 +48,18 @@ public class ExceptionUtils {
return e.getClass().getName() + " (error while printing stack trace)";
}
}
+
+ @FunctionalInterface
+ public interface WrapperRuntimeExceptionHandler<I, O> {
+ O handle(I input) throws Exception;
+ }
+
+ public static <I, O> O wrapRuntimeException(
+ I input, WrapperRuntimeExceptionHandler<I, O> handler) {
+ try {
+ return handler.handle(input);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
index f8e3a21e6..fcba212d7 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
@@ -96,6 +96,14 @@ object Utils extends Logger {
new JarInputStream(new BufferedInputStream(new
FileInputStream(jarFile))).getManifest
}
+ def getJarManClass(jarFile: File): String = {
+ val manifest = getJarManifest(jarFile)
+ manifest.getMainAttributes.getValue("Main-Class") match {
+ case null => manifest.getMainAttributes.getValue("program-class")
+ case v => v
+ }
+ }
+
def copyProperties(original: Properties, target: Properties): Unit =
original.foreach(x => target.put(x._1, x._2))
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
index 95883a07e..36a1d05cc 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
@@ -78,7 +78,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.jar.Manifest;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -498,8 +497,7 @@ public class ApplicationInfoServiceImpl extends
ServiceImpl<ApplicationMapper, A
project.getDistHome().getAbsolutePath().concat("/").concat(appParam.getModule());
jarFile = new File(modulePath, appParam.getJar());
}
- Manifest manifest = Utils.getJarManifest(jarFile);
- return manifest.getMainAttributes().getValue("Main-Class");
+ return Utils.getJarManClass(jarFile);
}
@Override
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
index b89e57a9a..b8df85f2b 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
@@ -45,10 +45,10 @@ import org.apache.streampark.flink.packer.maven.Artifact;
import org.apache.streampark.flink.packer.maven.MavenTool;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.factories.Factory;
-import org.apache.hadoop.shaded.org.apache.commons.codec.digest.DigestUtils;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
@@ -56,6 +56,7 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.ImmutableMap;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -68,7 +69,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Serializable;
-import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.time.Duration;
@@ -77,12 +77,12 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Scanner;
import java.util.ServiceLoader;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
-import java.util.jar.Manifest;
import java.util.stream.Collectors;
@Slf4j
@@ -95,7 +95,9 @@ public class ResourceServiceImpl extends
ServiceImpl<ResourceMapper, Resource>
public static final String EXCEPTION = "exception";
@Autowired private ApplicationManageService applicationManageService;
+
@Autowired private CommonService commonService;
+
@Autowired private FlinkSqlService flinkSqlService;
@Override
@@ -138,26 +140,28 @@ public class ResourceServiceImpl extends
ServiceImpl<ResourceMapper, Resource>
ApiAlertException.throwIfTrue(
jars.size() + poms.size() > 1, "Please do not add multi dependency at
one time.");
- if (resource.getResourceType() != ResourceTypeEnum.CONNECTOR) {
- ApiAlertException.throwIfNull(resource.getResourceName(), "The
resourceName is required.");
- } else {
+ if (resource.getResourceType() == ResourceTypeEnum.CONNECTOR) {
String connector = resource.getConnector();
ApiAlertException.throwIfTrue(connector == null, "the flink connector is
null.");
FlinkConnector connectorResource = JacksonUtils.read(connector,
FlinkConnector.class);
resource.setResourceName(connectorResource.getFactoryIdentifier());
- if (connectorResource.getRequiredOptions() != null) {
- resource.setConnectorRequiredOptions(
- JacksonUtils.write(connectorResource.getRequiredOptions()));
- }
- if (connectorResource.getOptionalOptions() != null) {
- resource.setConnectorOptionalOptions(
- JacksonUtils.write(connectorResource.getOptionalOptions()));
- }
+ Optional.ofNullable(connectorResource.getRequiredOptions())
+ .ifPresent(
+ v ->
+ resource.setConnectorRequiredOptions(
+ ExceptionUtils.wrapRuntimeException(v,
JacksonUtils::write)));
+ Optional.ofNullable(connectorResource.getOptionalOptions())
+ .ifPresent(
+ v ->
+ resource.setConnectorOptionalOptions(
+ ExceptionUtils.wrapRuntimeException(v,
JacksonUtils::write)));
+ } else {
+ ApiAlertException.throwIfNull(resource.getResourceName(), "The
resourceName is required.");
}
ApiAlertException.throwIfTrue(
this.findByResourceName(resource.getTeamId(),
resource.getResourceName()) != null,
- String.format("Sorry, the resource %s already exists.",
resource.getResourceName()));
+ String.format("the resource %s already exists, please check.",
resource.getResourceName()));
if (!jars.isEmpty()) {
String resourcePath = jars.get(0);
@@ -251,15 +255,8 @@ public class ResourceServiceImpl extends
ServiceImpl<ResourceMapper, Resource>
@Override
public String upload(MultipartFile file) throws IOException {
File temp = WebUtils.getAppTempDir();
-
- String name = file.getOriginalFilename();
- String suffix = name.substring(name.lastIndexOf("."));
-
- String sha256Hex = DigestUtils.sha256Hex(file.getInputStream());
- String fileName = sha256Hex.concat(suffix);
-
+ String fileName =
FilenameUtils.getName(Objects.requireNonNull(file.getOriginalFilename()));
File saveFile = new File(temp, fileName);
-
if (!saveFile.exists()) {
// save file to temp dir
try {
@@ -268,114 +265,93 @@ public class ResourceServiceImpl extends
ServiceImpl<ResourceMapper, Resource>
throw new ApiDetailException(e);
}
}
-
return saveFile.getAbsolutePath();
}
@Override
public RestResponse checkResource(Resource resourceParam) throws
JsonProcessingException {
ResourceTypeEnum type = resourceParam.getResourceType();
- Map<String, Serializable> resp = new HashMap<>(0);
- resp.put(STATE, 0);
switch (type) {
case FLINK_APP:
- // check main.
- File jarFile;
- try {
- jarFile = getResourceJar(resourceParam);
- } catch (Exception e) {
- // get jarFile error
- resp.put(STATE, 1);
- resp.put(EXCEPTION, ExceptionUtils.stringifyException(e));
- return RestResponse.success().data(resp);
- }
- if (jarFile.getName().endsWith(Constant.PYTHON_SUFFIX)) {
- return RestResponse.success().data(resp);
- }
- Manifest manifest = Utils.getJarManifest(jarFile);
- String mainClass = manifest.getMainAttributes().getValue("Main-Class");
-
- if (mainClass == null) {
- // main class is null
- resp.put(STATE, 2);
- return RestResponse.success().data(resp);
- }
- return RestResponse.success().data(resp);
+ return checkFlinkApp(resourceParam);
case CONNECTOR:
- // 1) get connector id
- FlinkConnector connectorResource;
-
- ApiAlertException.throwIfFalse(
- ResourceTypeEnum.CONNECTOR == resourceParam.getResourceType(),
- "getConnectorId method error, resource not flink connector.");
+ return checkConnector(resourceParam);
+ }
+ return RestResponse.success().data(ImmutableMap.of(STATE, 0));
+ }
- List<File> jars;
- File connector = null;
- List<String> factories;
+ private RestResponse checkConnector(Resource resourceParam) throws
JsonProcessingException {
+ // 1) get connector jar
+ FlinkConnector connectorResource;
+ List<File> jars;
+ File connector;
+ List<String> factories;
+ try {
+ File file = getResourceJar(resourceParam);
+ connector = file;
+ jars = Collections.singletonList(file);
+ } catch (Exception e) {
+ // get jarFile error
+ return buildExceptResponse(e, 1);
+ }
- Dependency dependency =
Dependency.toDependency(resourceParam.getResource());
+ // 2) parse connector Factory
+ try {
+ factories = getConnectorFactory(connector);
+ } catch (Exception e) {
+ // flink connector invalid
+ return buildExceptResponse(e, 2);
+ }
- // 1) get connector jar
- if (!dependency.getPom().isEmpty()) {
- Artifact artifact = dependency.toArtifact().get(0);
- try {
- jars = MavenTool.resolveArtifacts(artifact);
- } catch (Exception e) {
- // connector download is null
- resp.put(STATE, 1);
- resp.put(EXCEPTION, ExceptionUtils.stringifyException(e));
- return RestResponse.success().data(resp);
- }
- String fileName = String.format("%s-%s.jar", artifact.artifactId(),
artifact.version());
- Optional<File> file = jars.stream().filter(x ->
x.getName().equals(fileName)).findFirst();
- if (file.isPresent()) {
- connector = file.get();
- }
- } else {
- // 2) jar
- String jar = dependency.getJar().get(0).split(":")[1];
- File file = new File(jar);
- connector = file;
- jars = Collections.singletonList(file);
- }
+ // 3) get connector resource
+ connectorResource = getConnectorResource(jars, factories);
+ if (connectorResource == null) {
+ // connector is null
+ return buildExceptResponse(new RuntimeException("connector is null"), 3);
+ }
- // 2) parse connector Factory
- try {
- factories = getConnectorFactory(connector);
- } catch (Exception e) {
- // flink connector invalid
- resp.put(STATE, 2);
- resp.put(EXCEPTION, ExceptionUtils.stringifyException(e));
- return RestResponse.success().data(resp);
- }
+ // 2) check connector exists
+ boolean exists =
+ existsFlinkConnector(resourceParam.getId(),
connectorResource.getFactoryIdentifier());
+ if (exists) {
+ return buildExceptResponse(new RuntimeException("connector is already
exists"), 4);
+ }
- // 3) get connector resource
- connectorResource = getConnectorResource(jars, factories);
- if (connectorResource == null) {
- // connector is null
- resp.put(STATE, 3);
- return RestResponse.success().data(resp);
- }
+ if (resourceParam.getId() != null
+ && !(getById(resourceParam.getId())
+ .getResourceName()
+ .equals(connectorResource.getFactoryIdentifier()))) {
+ return buildExceptResponse(
+ new RuntimeException("resource name different with
FactoryIdentifier"), 5);
+ }
+ return RestResponse.success()
+ .data(ImmutableMap.of(STATE, 0, "connector",
JacksonUtils.write(connectorResource)));
+ }
- // 2) check connector exists
- boolean exists =
- existsFlinkConnector(resourceParam.getId(),
connectorResource.getFactoryIdentifier());
- if (exists) {
- resp.put(STATE, 4);
- resp.put("name", connectorResource.getFactoryIdentifier());
- return RestResponse.success(resp);
- }
+ private static RestResponse buildExceptResponse(Exception e, int code) {
+ return RestResponse.success()
+ .data(ImmutableMap.of(STATE, code, EXCEPTION,
ExceptionUtils.stringifyException(e)));
+ }
- if (resourceParam.getId() != null) {
- Resource resource = getById(resourceParam.getId());
- if
(!resource.getResourceName().equals(connectorResource.getFactoryIdentifier())) {
- resp.put(STATE, 5);
- return RestResponse.success().data(resp);
- }
- }
- resp.put(STATE, 0);
- resp.put("connector", JacksonUtils.write(connectorResource));
- return RestResponse.success().data(resp);
+ private RestResponse checkFlinkApp(Resource resourceParam) {
+ // check main.
+ File jarFile;
+ try {
+ jarFile = getResourceJar(resourceParam);
+ } catch (Exception e) {
+ // get jarFile error
+ return buildExceptResponse(e, 1);
+ }
+ ApiAlertException.throwIfTrue(jarFile == null, "flink app jar must
exist.");
+ Map<String, Serializable> resp = new HashMap<>(0);
+ resp.put(STATE, 0);
+ if (jarFile.getName().endsWith(Constant.PYTHON_SUFFIX)) {
+ return RestResponse.success().data(resp);
+ }
+ String mainClass = Utils.getJarManClass(jarFile);
+ if (mainClass == null) {
+ // main class is null
+ return buildExceptResponse(new RuntimeException("main class is null"),
2);
}
return RestResponse.success().data(resp);
}
@@ -394,13 +370,7 @@ public class ResourceServiceImpl extends
ServiceImpl<ResourceMapper, Resource>
URL[] array =
jars.stream()
.map(
- x -> {
- try {
- return x.toURI().toURL();
- } catch (MalformedURLException e) {
- throw new RuntimeException(e);
- }
- })
+ file -> ExceptionUtils.wrapRuntimeException(file, handle ->
handle.toURI().toURL()))
.toArray(URL[]::new);
try (URLClassLoader urlClassLoader = URLClassLoader.newInstance(array)) {
@@ -458,9 +428,9 @@ public class ResourceServiceImpl extends
ServiceImpl<ResourceMapper, Resource>
String fileName = String.format("%s-%s.jar", artifact.artifactId(),
artifact.version());
Optional<File> jarFile =
files.stream().filter(x ->
x.getName().equals(fileName)).findFirst();
- if (jarFile.isPresent()) {
- return jarFile.get();
- }
+ jarFile.ifPresent(
+ file -> transferTeamResource(resource.getTeamId(),
file.getAbsolutePath()));
+ return jarFile.orElse(null);
}
return null;
}