This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new a1a564f75 [improve] improve resource management logic (#3439)
a1a564f75 is described below

commit a1a564f7513667c10c3ca0b8fa0e0b2836b3d873
Author: monrg <[email protected]>
AuthorDate: Fri Dec 29 00:32:17 2023 +0800

    [improve] improve resource management logic (#3439)
    
    Co-authored-by: monrg <[email protected]>
---
 .../streampark/common/util/ExceptionUtils.java     |  14 ++
 .../org/apache/streampark/common/util/Utils.scala  |   8 +
 .../impl/ApplicationInfoServiceImpl.java           |   4 +-
 .../core/service/impl/ResourceServiceImpl.java     | 220 +++++++++------------
 4 files changed, 118 insertions(+), 128 deletions(-)

diff --git 
a/streampark-common/src/main/java/org/apache/streampark/common/util/ExceptionUtils.java
 
b/streampark-common/src/main/java/org/apache/streampark/common/util/ExceptionUtils.java
index 40deb57b5..22d93e52b 100644
--- 
a/streampark-common/src/main/java/org/apache/streampark/common/util/ExceptionUtils.java
+++ 
b/streampark-common/src/main/java/org/apache/streampark/common/util/ExceptionUtils.java
@@ -48,4 +48,18 @@ public class ExceptionUtils {
       return e.getClass().getName() + " (error while printing stack trace)";
     }
   }
+
+  @FunctionalInterface
+  public interface WrapperRuntimeExceptionHandler<I, O> {
+    O handle(I input) throws Exception;
+  }
+
+  public static <I, O> O wrapRuntimeException(
+      I input, WrapperRuntimeExceptionHandler<I, O> handler) {
+    try {
+      return handler.handle(input);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
 }
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
index f8e3a21e6..fcba212d7 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
@@ -96,6 +96,14 @@ object Utils extends Logger {
     new JarInputStream(new BufferedInputStream(new 
FileInputStream(jarFile))).getManifest
   }
 
+  def getJarManClass(jarFile: File): String = {
+    val manifest = getJarManifest(jarFile)
+    manifest.getMainAttributes.getValue("Main-Class") match {
+      case null => manifest.getMainAttributes.getValue("program-class")
+      case v => v
+    }
+  }
+
   def copyProperties(original: Properties, target: Properties): Unit =
     original.foreach(x => target.put(x._1, x._2))
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
index 95883a07e..36a1d05cc 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
@@ -78,7 +78,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.jar.Manifest;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -498,8 +497,7 @@ public class ApplicationInfoServiceImpl extends 
ServiceImpl<ApplicationMapper, A
           
project.getDistHome().getAbsolutePath().concat("/").concat(appParam.getModule());
       jarFile = new File(modulePath, appParam.getJar());
     }
-    Manifest manifest = Utils.getJarManifest(jarFile);
-    return manifest.getMainAttributes().getValue("Main-Class");
+    return Utils.getJarManClass(jarFile);
   }
 
   @Override
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
index b89e57a9a..b8df85f2b 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
@@ -45,10 +45,10 @@ import org.apache.streampark.flink.packer.maven.Artifact;
 import org.apache.streampark.flink.packer.maven.MavenTool;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.table.factories.Factory;
-import org.apache.hadoop.shaded.org.apache.commons.codec.digest.DigestUtils;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
@@ -56,6 +56,7 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.ImmutableMap;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
@@ -68,7 +69,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.Serializable;
-import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.time.Duration;
@@ -77,12 +77,12 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Scanner;
 import java.util.ServiceLoader;
 import java.util.jar.JarEntry;
 import java.util.jar.JarFile;
-import java.util.jar.Manifest;
 import java.util.stream.Collectors;
 
 @Slf4j
@@ -95,7 +95,9 @@ public class ResourceServiceImpl extends 
ServiceImpl<ResourceMapper, Resource>
   public static final String EXCEPTION = "exception";
 
   @Autowired private ApplicationManageService applicationManageService;
+
   @Autowired private CommonService commonService;
+
   @Autowired private FlinkSqlService flinkSqlService;
 
   @Override
@@ -138,26 +140,28 @@ public class ResourceServiceImpl extends 
ServiceImpl<ResourceMapper, Resource>
     ApiAlertException.throwIfTrue(
         jars.size() + poms.size() > 1, "Please do not add multi dependency at 
one time.");
 
-    if (resource.getResourceType() != ResourceTypeEnum.CONNECTOR) {
-      ApiAlertException.throwIfNull(resource.getResourceName(), "The 
resourceName is required.");
-    } else {
+    if (resource.getResourceType() == ResourceTypeEnum.CONNECTOR) {
       String connector = resource.getConnector();
       ApiAlertException.throwIfTrue(connector == null, "the flink connector is 
null.");
       FlinkConnector connectorResource = JacksonUtils.read(connector, 
FlinkConnector.class);
       resource.setResourceName(connectorResource.getFactoryIdentifier());
-      if (connectorResource.getRequiredOptions() != null) {
-        resource.setConnectorRequiredOptions(
-            JacksonUtils.write(connectorResource.getRequiredOptions()));
-      }
-      if (connectorResource.getOptionalOptions() != null) {
-        resource.setConnectorOptionalOptions(
-            JacksonUtils.write(connectorResource.getOptionalOptions()));
-      }
+      Optional.ofNullable(connectorResource.getRequiredOptions())
+          .ifPresent(
+              v ->
+                  resource.setConnectorRequiredOptions(
+                      ExceptionUtils.wrapRuntimeException(v, 
JacksonUtils::write)));
+      Optional.ofNullable(connectorResource.getOptionalOptions())
+          .ifPresent(
+              v ->
+                  resource.setConnectorOptionalOptions(
+                      ExceptionUtils.wrapRuntimeException(v, 
JacksonUtils::write)));
+    } else {
+      ApiAlertException.throwIfNull(resource.getResourceName(), "The 
resourceName is required.");
     }
 
     ApiAlertException.throwIfTrue(
         this.findByResourceName(resource.getTeamId(), 
resource.getResourceName()) != null,
-        String.format("Sorry, the resource %s already exists.", 
resource.getResourceName()));
+        String.format("the resource %s already exists, please check.", 
resource.getResourceName()));
 
     if (!jars.isEmpty()) {
       String resourcePath = jars.get(0);
@@ -251,15 +255,8 @@ public class ResourceServiceImpl extends 
ServiceImpl<ResourceMapper, Resource>
   @Override
   public String upload(MultipartFile file) throws IOException {
     File temp = WebUtils.getAppTempDir();
-
-    String name = file.getOriginalFilename();
-    String suffix = name.substring(name.lastIndexOf("."));
-
-    String sha256Hex = DigestUtils.sha256Hex(file.getInputStream());
-    String fileName = sha256Hex.concat(suffix);
-
+    String fileName = 
FilenameUtils.getName(Objects.requireNonNull(file.getOriginalFilename()));
     File saveFile = new File(temp, fileName);
-
     if (!saveFile.exists()) {
       // save file to temp dir
       try {
@@ -268,114 +265,93 @@ public class ResourceServiceImpl extends 
ServiceImpl<ResourceMapper, Resource>
         throw new ApiDetailException(e);
       }
     }
-
     return saveFile.getAbsolutePath();
   }
 
   @Override
   public RestResponse checkResource(Resource resourceParam) throws 
JsonProcessingException {
     ResourceTypeEnum type = resourceParam.getResourceType();
-    Map<String, Serializable> resp = new HashMap<>(0);
-    resp.put(STATE, 0);
     switch (type) {
       case FLINK_APP:
-        // check main.
-        File jarFile;
-        try {
-          jarFile = getResourceJar(resourceParam);
-        } catch (Exception e) {
-          // get jarFile error
-          resp.put(STATE, 1);
-          resp.put(EXCEPTION, ExceptionUtils.stringifyException(e));
-          return RestResponse.success().data(resp);
-        }
-        if (jarFile.getName().endsWith(Constant.PYTHON_SUFFIX)) {
-          return RestResponse.success().data(resp);
-        }
-        Manifest manifest = Utils.getJarManifest(jarFile);
-        String mainClass = manifest.getMainAttributes().getValue("Main-Class");
-
-        if (mainClass == null) {
-          // main class is null
-          resp.put(STATE, 2);
-          return RestResponse.success().data(resp);
-        }
-        return RestResponse.success().data(resp);
+        return checkFlinkApp(resourceParam);
       case CONNECTOR:
-        // 1) get connector id
-        FlinkConnector connectorResource;
-
-        ApiAlertException.throwIfFalse(
-            ResourceTypeEnum.CONNECTOR == resourceParam.getResourceType(),
-            "getConnectorId method error, resource not flink connector.");
+        return checkConnector(resourceParam);
+    }
+    return RestResponse.success().data(ImmutableMap.of(STATE, 0));
+  }
 
-        List<File> jars;
-        File connector = null;
-        List<String> factories;
+  private RestResponse checkConnector(Resource resourceParam) throws 
JsonProcessingException {
+    // 1) get connector jar
+    FlinkConnector connectorResource;
+    List<File> jars;
+    File connector;
+    List<String> factories;
+    try {
+      File file = getResourceJar(resourceParam);
+      connector = file;
+      jars = Collections.singletonList(file);
+    } catch (Exception e) {
+      // get jarFile error
+      return buildExceptResponse(e, 1);
+    }
 
-        Dependency dependency = 
Dependency.toDependency(resourceParam.getResource());
+    // 2) parse connector Factory
+    try {
+      factories = getConnectorFactory(connector);
+    } catch (Exception e) {
+      // flink connector invalid
+      return buildExceptResponse(e, 2);
+    }
 
-        // 1) get connector jar
-        if (!dependency.getPom().isEmpty()) {
-          Artifact artifact = dependency.toArtifact().get(0);
-          try {
-            jars = MavenTool.resolveArtifacts(artifact);
-          } catch (Exception e) {
-            // connector download is null
-            resp.put(STATE, 1);
-            resp.put(EXCEPTION, ExceptionUtils.stringifyException(e));
-            return RestResponse.success().data(resp);
-          }
-          String fileName = String.format("%s-%s.jar", artifact.artifactId(), 
artifact.version());
-          Optional<File> file = jars.stream().filter(x -> 
x.getName().equals(fileName)).findFirst();
-          if (file.isPresent()) {
-            connector = file.get();
-          }
-        } else {
-          // 2) jar
-          String jar = dependency.getJar().get(0).split(":")[1];
-          File file = new File(jar);
-          connector = file;
-          jars = Collections.singletonList(file);
-        }
+    // 3) get connector resource
+    connectorResource = getConnectorResource(jars, factories);
+    if (connectorResource == null) {
+      // connector is null
+      return buildExceptResponse(new RuntimeException("connector is null"), 3);
+    }
 
-        // 2) parse connector Factory
-        try {
-          factories = getConnectorFactory(connector);
-        } catch (Exception e) {
-          // flink connector invalid
-          resp.put(STATE, 2);
-          resp.put(EXCEPTION, ExceptionUtils.stringifyException(e));
-          return RestResponse.success().data(resp);
-        }
+    // 4) check connector exists
+    boolean exists =
+        existsFlinkConnector(resourceParam.getId(), 
connectorResource.getFactoryIdentifier());
+    if (exists) {
+      return buildExceptResponse(new RuntimeException("connector is already 
exists"), 4);
+    }
 
-        // 3) get connector resource
-        connectorResource = getConnectorResource(jars, factories);
-        if (connectorResource == null) {
-          // connector is null
-          resp.put(STATE, 3);
-          return RestResponse.success().data(resp);
-        }
+    if (resourceParam.getId() != null
+        && !(getById(resourceParam.getId())
+            .getResourceName()
+            .equals(connectorResource.getFactoryIdentifier()))) {
+      return buildExceptResponse(
+          new RuntimeException("resource name different with 
FactoryIdentifier"), 5);
+    }
+    return RestResponse.success()
+        .data(ImmutableMap.of(STATE, 0, "connector", 
JacksonUtils.write(connectorResource)));
+  }
 
-        // 2) check connector exists
-        boolean exists =
-            existsFlinkConnector(resourceParam.getId(), 
connectorResource.getFactoryIdentifier());
-        if (exists) {
-          resp.put(STATE, 4);
-          resp.put("name", connectorResource.getFactoryIdentifier());
-          return RestResponse.success(resp);
-        }
+  private static RestResponse buildExceptResponse(Exception e, int code) {
+    return RestResponse.success()
+        .data(ImmutableMap.of(STATE, code, EXCEPTION, 
ExceptionUtils.stringifyException(e)));
+  }
 
-        if (resourceParam.getId() != null) {
-          Resource resource = getById(resourceParam.getId());
-          if 
(!resource.getResourceName().equals(connectorResource.getFactoryIdentifier())) {
-            resp.put(STATE, 5);
-            return RestResponse.success().data(resp);
-          }
-        }
-        resp.put(STATE, 0);
-        resp.put("connector", JacksonUtils.write(connectorResource));
-        return RestResponse.success().data(resp);
+  private RestResponse checkFlinkApp(Resource resourceParam) {
+    // check main.
+    File jarFile;
+    try {
+      jarFile = getResourceJar(resourceParam);
+    } catch (Exception e) {
+      // get jarFile error
+      return buildExceptResponse(e, 1);
+    }
+    ApiAlertException.throwIfTrue(jarFile == null, "flink app jar must 
exist.");
+    Map<String, Serializable> resp = new HashMap<>(0);
+    resp.put(STATE, 0);
+    if (jarFile.getName().endsWith(Constant.PYTHON_SUFFIX)) {
+      return RestResponse.success().data(resp);
+    }
+    String mainClass = Utils.getJarManClass(jarFile);
+    if (mainClass == null) {
+      // main class is null
+      return buildExceptResponse(new RuntimeException("main class is null"), 
2);
     }
     return RestResponse.success().data(resp);
   }
@@ -394,13 +370,7 @@ public class ResourceServiceImpl extends 
ServiceImpl<ResourceMapper, Resource>
     URL[] array =
         jars.stream()
             .map(
-                x -> {
-                  try {
-                    return x.toURI().toURL();
-                  } catch (MalformedURLException e) {
-                    throw new RuntimeException(e);
-                  }
-                })
+                file -> ExceptionUtils.wrapRuntimeException(file, handle -> 
handle.toURI().toURL()))
             .toArray(URL[]::new);
 
     try (URLClassLoader urlClassLoader = URLClassLoader.newInstance(array)) {
@@ -458,9 +428,9 @@ public class ResourceServiceImpl extends 
ServiceImpl<ResourceMapper, Resource>
         String fileName = String.format("%s-%s.jar", artifact.artifactId(), 
artifact.version());
         Optional<File> jarFile =
             files.stream().filter(x -> 
x.getName().equals(fileName)).findFirst();
-        if (jarFile.isPresent()) {
-          return jarFile.get();
-        }
+        jarFile.ifPresent(
+            file -> transferTeamResource(resource.getTeamId(), 
file.getAbsolutePath()));
+        return jarFile.orElse(null);
       }
       return null;
     }

Reply via email to