This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 483517242 [INLONG-4464][Manager] Fix problems emerged from full link 
path tests (#4465)
483517242 is described below

commit 483517242e2f5e9b3fcc9785dca85f8c34e144df
Author: woofyzhao <[email protected]>
AuthorDate: Thu Jun 2 19:19:56 2022 +0800

    [INLONG-4464][Manager] Fix problems emerged from full link path tests 
(#4465)
---
 .../common/pojo/group/InlongGroupRequest.java      |  2 +-
 .../manager/plugin/flink/FlinkOperation.java       | 11 +++++--
 .../inlong/manager/plugin/flink/FlinkService.java  | 17 +++++++++--
 .../inlong/manager/plugin/flink/dto/FlinkInfo.java |  2 ++
 .../inlong/manager/plugin/util/FlinkUtils.java     | 34 ++++++++++++++++++----
 .../repository/DataProxyConfigRepository.java      | 12 +++++---
 .../service/sort/CreateSortConfigListenerV2.java   |  4 +++
 .../manager/service/sort/util/LoadNodeUtils.java   |  4 +--
 .../main/resources/sql/apache_inlong_manager.sql   |  2 +-
 .../manager-web/sql/apache_inlong_manager.sql      |  2 +-
 10 files changed, 71 insertions(+), 19 deletions(-)

diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupRequest.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupRequest.java
index a787d09f7..79e6c9754 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupRequest.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupRequest.java
@@ -73,7 +73,7 @@ public class InlongGroupRequest {
 
     @ApiModelProperty(value = "Whether to use lightweight mode, 0: false, 1: 
true")
     @Builder.Default
-    private Integer lightweight = 1;
+    private Integer lightweight = 0;
 
     @ApiModelProperty(value = "Inlong cluster tag, which links to 
inlong_cluster table")
     private String inlongClusterTag;
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
index 384202837..a22afb221 100644
--- 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
+++ 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
@@ -27,11 +27,11 @@ import 
org.apache.inlong.manager.plugin.flink.enums.TaskCommitType;
 import org.apache.inlong.manager.plugin.util.FlinkUtils;
 
 import java.io.File;
+import java.util.List;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.api.common.JobStatus.RUNNING;
-import static org.apache.inlong.manager.plugin.util.FlinkUtils.findFiles;
 
 /**
  * Flink task operation, such restart or stop flink job.
@@ -44,6 +44,8 @@ public class FlinkOperation {
     private static final String INLONG_MANAGER = "inlong-manager";
     private static final String INLONG_SORT = "inlong-sort";
     private static final String SORT_JAR_PATTERN = "^sort-dist.*jar$";
+    private static final String SORT_PLUGIN = "sort-plugin" + File.separator + 
"connectors";
+    private static final String CONNECTOR_JAR_PATTERN = 
"^sort-connector.*jar$";
 
     private final FlinkService flinkService;
 
@@ -105,10 +107,15 @@ public class FlinkOperation {
             throw new Exception(message);
         }
 
-        String jarPath = findFiles(basePath, SORT_JAR_PATTERN);
+        String jarPath = FlinkUtils.findFile(basePath, SORT_JAR_PATTERN);
         flinkInfo.setLocalJarPath(jarPath);
         log.info("get sort jar path success, path: {}", jarPath);
 
+        String pluginPath = startPath + SORT_PLUGIN;
+        List<String> connectorPaths = FlinkUtils.listFiles(pluginPath, 
CONNECTOR_JAR_PATTERN, -1);
+        flinkInfo.setConnectorJarPaths(connectorPaths);
+        log.info("get sort connector paths success, paths: {}", 
connectorPaths);
+
         if (FlinkUtils.writeConfigToFile(path, flinkInfo.getJobName(), 
dataflow)) {
             flinkInfo.setLocalConfPath(path + File.separator + 
flinkInfo.getJobName());
         } else {
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
index d180c6bc7..5a8161579 100644
--- 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
+++ 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
@@ -40,13 +40,17 @@ import 
org.apache.inlong.manager.plugin.flink.enums.Constants;
 import org.apache.inlong.manager.plugin.util.FlinkConfiguration;
 
 import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 /**
  * Flink service, such as save or get flink config info, etc.
@@ -191,13 +195,22 @@ public class FlinkService {
      */
     private String submitJobBySavepoint(FlinkInfo flinkInfo, 
SavepointRestoreSettings settings) throws Exception {
         String localJarPath = flinkInfo.getLocalJarPath();
-        File jarFile = new File(localJarPath);
-        String[] programArgs = genProgramArgsV2(flinkInfo, flinkConfig);
+        final File jarFile = new File(localJarPath);
+        final String[] programArgs = genProgramArgsV2(flinkInfo, flinkConfig);
+
+        List<URL> classPaths = flinkInfo.getConnectorJarPaths().stream().map(p 
-> {
+            try {
+                return new File(p).toURI().toURL();
+            } catch (MalformedURLException e) {
+                return null;
+            }
+        }).filter(Objects::nonNull).collect(Collectors.toList());
 
         PackagedProgram program = PackagedProgram.newBuilder()
                 .setConfiguration(configuration)
                 .setEntryPointClassName(Constants.ENTRYPOINT_CLASS)
                 .setJarFile(jarFile)
+                .setUserClassPaths(classPaths)
                 .setArguments(programArgs)
                 .setSavepointRestoreSettings(settings).build();
         JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, 
configuration, parallelism, false);
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
index 86d6d8ca1..badb339de 100644
--- 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
+++ 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
@@ -36,6 +36,8 @@ public class FlinkInfo {
 
     private String localJarPath;
 
+    private List<String> connectorJarPaths;
+
     private String localConfPath;
 
     private String sourceType;
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
index 54f601052..25052df49 100644
--- 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
+++ 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.plugin.util;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 
@@ -27,6 +28,7 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -77,20 +79,37 @@ public class FlinkUtils {
      *
      * @param baseDirName base directory name.
      * @param pattern pattern of file
-     * @return  sort-single-tenant jar path
+     * @return sort-single-tenant jar path
      */
-    public static String findFiles(String baseDirName, String pattern) {
+    public static String findFile(String baseDirName, String pattern) {
+        List<String> files = listFiles(baseDirName, pattern, 1);
+        if (CollectionUtils.isEmpty(files)) {
+            return null;
+        }
+        return files.get(0);
+    }
+
+    /**
+     * fetch target file path
+     *
+     * @param baseDirName base directory name.
+     * @param pattern pattern of file
+     * @return matched files
+     */
+    public static List<String> listFiles(String baseDirName, String pattern, 
int limit) {
+        List<String> result = new ArrayList<>();
+
         File baseDir = new File(baseDirName);
         if (!baseDir.exists() || !baseDir.isDirectory()) {
             log.error("baseDirName find fail :{}", baseDirName);
-            return null;
+            return result;
         }
         String tempName;
         File tempFile;
         File[] files = baseDir.listFiles();
         if (files == null || files.length == 0) {
             log.info("baseDirName is empty");
-            return null;
+            return result;
         }
         for (File file : files) {
             tempFile = file;
@@ -99,10 +118,13 @@ public class FlinkUtils {
             Matcher matcher = jarPathPattern.matcher(tempName);
             boolean matches = matcher.matches();
             if (matches) {
-                return tempFile.getAbsoluteFile().toString();
+                result.add(tempFile.getAbsoluteFile().toString());
+            }
+            if (limit > 0 && result.size() >= limit) {
+                return result;
             }
         }
-        return null;
+        return result;
     }
 
     /**
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
index 7bc2f4dca..0f5fadb4d 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.service.repository;
 
 import com.google.common.base.Splitter;
 import com.google.gson.Gson;
-
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.pojo.dataproxy.CacheClusterObject;
@@ -41,6 +40,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Repository;
 import org.springframework.transaction.annotation.Transactional;
 
+import javax.annotation.PostConstruct;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -50,8 +50,6 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 
-import javax.annotation.PostConstruct;
-
 /**
  * DataProxyConfigRepository
  */
@@ -60,9 +58,9 @@ public class DataProxyConfigRepository implements IRepository 
{
 
     public static final Splitter.MapSplitter MAP_SPLITTER = 
Splitter.on(SEPARATOR).trimResults()
             .withKeyValueSeparator(KEY_VALUE_SEPARATOR);
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(DataProxyConfigRepository.class);
     public static final String CACHE_CLUSTER_PRODUCER_TAG = "producer";
     public static final String CACHE_CLUSTER_CONSUMER_TAG = "consumer";
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(DataProxyConfigRepository.class);
     private static final Gson gson = new Gson();
 
     // key: proxyClusterName, value: jsonString
@@ -136,6 +134,9 @@ public class DataProxyConfigRepository implements 
IRepository {
     private Map<String, Map<String, List<CacheCluster>>> reloadCacheCluster() {
         Map<String, Map<String, List<CacheCluster>>> cacheClusterMap = new 
HashMap<>();
         for (CacheCluster cacheCluster : 
clusterSetMapper.selectCacheCluster()) {
+            if (StringUtils.isEmpty(cacheCluster.getExtTag())) {
+                continue;
+            }
             Map<String, String> tagMap = 
MAP_SPLITTER.split(cacheCluster.getExtTag());
             String producerTag = 
tagMap.getOrDefault(CACHE_CLUSTER_PRODUCER_TAG, Boolean.TRUE.toString());
             if (StringUtils.equalsIgnoreCase(producerTag, 
Boolean.TRUE.toString())) {
@@ -290,6 +291,7 @@ public class DataProxyConfigRepository implements 
IRepository {
 
     /**
      * isSubTag
+     *
      * @param wholeTagMap
      * @param subTagMap
      * @return
@@ -306,6 +308,7 @@ public class DataProxyConfigRepository implements 
IRepository {
 
     /**
      * getProxyMd5
+     *
      * @param clusterName
      * @return
      */
@@ -315,6 +318,7 @@ public class DataProxyConfigRepository implements 
IRepository {
 
     /**
      * getProxyConfigJson
+     *
      * @param clusterName
      * @return
      */
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
index 262a0e7e3..80678d19e 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Maps;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.MQType;
@@ -153,6 +154,9 @@ public class CreateSortConfigListenerV2 implements 
SortOperateListener {
                     pulsarSource.setPrimaryKey(((KafkaSource) 
sourceInfo).getPrimaryKey());
                 }
             }
+            if (StringUtils.isEmpty(pulsarSource.getSerializationType())) {
+                pulsarSource.setSerializationType(DataTypeEnum.CSV.getName());
+            }
             
pulsarSource.setScanStartupMode(PulsarScanStartupMode.EARLIEST.getValue());
             pulsarSource.setFieldList(streamInfo.getFieldList());
             sourceMap.computeIfAbsent(streamInfo.getInlongStreamId(), key -> 
Lists.newArrayList())
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
index fc81e9721..6307dd071 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
@@ -287,7 +287,7 @@ public class LoadNodeUtils {
         return new IcebergLoadNode(id, name, fields, fieldRelationShips, null, 
null, 1, properties,
                 dbName, tableName, null, null, uri, warehouse);
     }
-  
+
     /**
      * Create SqlServer load node based on SqlServerSink
      *
@@ -325,7 +325,7 @@ public class LoadNodeUtils {
                 schemaName,
                 tablename,
                 primaryKey
-                );
+        );
     }
 
     /**
diff --git 
a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql 
b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index e20437a39..ac133bd38 100644
--- 
a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++ 
b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -37,7 +37,7 @@ CREATE TABLE IF NOT EXISTS `inlong_group`
     `followers`              varchar(512)      DEFAULT NULL COMMENT 'Name of 
followers, separated by commas',
     `enable_zookeeper`       tinyint(2)        DEFAULT '0' COMMENT 'Whether to 
enable the zookeeper, 0-disable, 1-enable',
     `enable_create_resource` tinyint(2)        DEFAULT '1' COMMENT 'Whether to 
enable create resource? 0-disable, 1-enable',
-    `lightweight`            tinyint(2)        DEFAULT '1' COMMENT 'Whether to 
use lightweight mode, 0-false, 1-true',
+    `lightweight`            tinyint(2)        DEFAULT '0' COMMENT 'Whether to 
use lightweight mode, 0-false, 1-true',
     `inlong_cluster_tag`     varchar(128)      DEFAULT NULL COMMENT 'The 
cluster tag, which links to inlong_cluster table',
     `ext_params`             text              DEFAULT NULL COMMENT 'Extended 
params, will be saved as JSON string, such as queue_module, partition_num,',
     `status`                 int(4)            DEFAULT '100' COMMENT 'Inlong 
group status',
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql 
b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index fe68bba78..f0b9d3a44 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -41,7 +41,7 @@ CREATE TABLE IF NOT EXISTS `inlong_group`
     `max_length`             int(11)           DEFAULT '10240' COMMENT 'The 
maximum length of a single piece of data, unit: Byte',
     `enable_zookeeper`       tinyint(2)        DEFAULT '0' COMMENT 'Whether to 
enable the zookeeper, 0-disable, 1-enable',
     `enable_create_resource` tinyint(2)        DEFAULT '1' COMMENT 'Whether to 
enable create resource? 0-disable, 1-enable',
-    `lightweight`            tinyint(2)        DEFAULT '1' COMMENT 'Whether to 
use lightweight mode, 0-false, 1-true',
+    `lightweight`            tinyint(2)        DEFAULT '0' COMMENT 'Whether to 
use lightweight mode, 0-false, 1-true',
     `inlong_cluster_tag`     varchar(128)      DEFAULT NULL COMMENT 'The 
cluster tag, which links to inlong_cluster table',
     `ext_params`             text              DEFAULT NULL COMMENT 'Extended 
params, will be saved as JSON string, such as queue_module, partition_num',
     `in_charges`             varchar(512) NOT NULL COMMENT 'Name of 
responsible person, separated by commas',

Reply via email to