This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 483517242 [INLONG-4464][Manager] Fix problems emerged from full link
path tests (#4465)
483517242 is described below
commit 483517242e2f5e9b3fcc9785dca85f8c34e144df
Author: woofyzhao <[email protected]>
AuthorDate: Thu Jun 2 19:19:56 2022 +0800
[INLONG-4464][Manager] Fix problems emerged from full link path tests
(#4465)
---
.../common/pojo/group/InlongGroupRequest.java | 2 +-
.../manager/plugin/flink/FlinkOperation.java | 11 +++++--
.../inlong/manager/plugin/flink/FlinkService.java | 17 +++++++++--
.../inlong/manager/plugin/flink/dto/FlinkInfo.java | 2 ++
.../inlong/manager/plugin/util/FlinkUtils.java | 34 ++++++++++++++++++----
.../repository/DataProxyConfigRepository.java | 12 +++++---
.../service/sort/CreateSortConfigListenerV2.java | 4 +++
.../manager/service/sort/util/LoadNodeUtils.java | 4 +--
.../main/resources/sql/apache_inlong_manager.sql | 2 +-
.../manager-web/sql/apache_inlong_manager.sql | 2 +-
10 files changed, 71 insertions(+), 19 deletions(-)
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupRequest.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupRequest.java
index a787d09f7..79e6c9754 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupRequest.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupRequest.java
@@ -73,7 +73,7 @@ public class InlongGroupRequest {
@ApiModelProperty(value = "Whether to use lightweight mode, 0: false, 1:
true")
@Builder.Default
- private Integer lightweight = 1;
+ private Integer lightweight = 0;
@ApiModelProperty(value = "Inlong cluster tag, which links to
inlong_cluster table")
private String inlongClusterTag;
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
index 384202837..a22afb221 100644
---
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
+++
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
@@ -27,11 +27,11 @@ import
org.apache.inlong.manager.plugin.flink.enums.TaskCommitType;
import org.apache.inlong.manager.plugin.util.FlinkUtils;
import java.io.File;
+import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.api.common.JobStatus.RUNNING;
-import static org.apache.inlong.manager.plugin.util.FlinkUtils.findFiles;
/**
* Flink task operation, such restart or stop flink job.
@@ -44,6 +44,8 @@ public class FlinkOperation {
private static final String INLONG_MANAGER = "inlong-manager";
private static final String INLONG_SORT = "inlong-sort";
private static final String SORT_JAR_PATTERN = "^sort-dist.*jar$";
+ private static final String SORT_PLUGIN = "sort-plugin" + File.separator +
"connectors";
+ private static final String CONNECTOR_JAR_PATTERN =
"^sort-connector.*jar$";
private final FlinkService flinkService;
@@ -105,10 +107,15 @@ public class FlinkOperation {
throw new Exception(message);
}
- String jarPath = findFiles(basePath, SORT_JAR_PATTERN);
+ String jarPath = FlinkUtils.findFile(basePath, SORT_JAR_PATTERN);
flinkInfo.setLocalJarPath(jarPath);
log.info("get sort jar path success, path: {}", jarPath);
+ String pluginPath = startPath + SORT_PLUGIN;
+ List<String> connectorPaths = FlinkUtils.listFiles(pluginPath,
CONNECTOR_JAR_PATTERN, -1);
+ flinkInfo.setConnectorJarPaths(connectorPaths);
+ log.info("get sort connector paths success, paths: {}",
connectorPaths);
+
if (FlinkUtils.writeConfigToFile(path, flinkInfo.getJobName(),
dataflow)) {
flinkInfo.setLocalConfPath(path + File.separator +
flinkInfo.getJobName());
} else {
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
index d180c6bc7..5a8161579 100644
---
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
+++
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
@@ -40,13 +40,17 @@ import
org.apache.inlong.manager.plugin.flink.enums.Constants;
import org.apache.inlong.manager.plugin.util.FlinkConfiguration;
import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
/**
* Flink service, such as save or get flink config info, etc.
@@ -191,13 +195,22 @@ public class FlinkService {
*/
private String submitJobBySavepoint(FlinkInfo flinkInfo,
SavepointRestoreSettings settings) throws Exception {
String localJarPath = flinkInfo.getLocalJarPath();
- File jarFile = new File(localJarPath);
- String[] programArgs = genProgramArgsV2(flinkInfo, flinkConfig);
+ final File jarFile = new File(localJarPath);
+ final String[] programArgs = genProgramArgsV2(flinkInfo, flinkConfig);
+
+ List<URL> classPaths = flinkInfo.getConnectorJarPaths().stream().map(p
-> {
+ try {
+ return new File(p).toURI().toURL();
+ } catch (MalformedURLException e) {
+ return null;
+ }
+ }).filter(Objects::nonNull).collect(Collectors.toList());
PackagedProgram program = PackagedProgram.newBuilder()
.setConfiguration(configuration)
.setEntryPointClassName(Constants.ENTRYPOINT_CLASS)
.setJarFile(jarFile)
+ .setUserClassPaths(classPaths)
.setArguments(programArgs)
.setSavepointRestoreSettings(settings).build();
JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program,
configuration, parallelism, false);
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
index 86d6d8ca1..badb339de 100644
---
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
+++
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
@@ -36,6 +36,8 @@ public class FlinkInfo {
private String localJarPath;
+ private List<String> connectorJarPaths;
+
private String localConfPath;
private String sourceType;
diff --git
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
index 54f601052..25052df49 100644
---
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
+++
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.plugin.util;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
@@ -27,6 +28,7 @@ import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -77,20 +79,37 @@ public class FlinkUtils {
*
* @param baseDirName base directory name.
* @param pattern pattern of file
- * @return sort-single-tenant jar path
+ * @return sort-single-tenant jar path
*/
- public static String findFiles(String baseDirName, String pattern) {
+ public static String findFile(String baseDirName, String pattern) {
+ List<String> files = listFiles(baseDirName, pattern, 1);
+ if (CollectionUtils.isEmpty(files)) {
+ return null;
+ }
+ return files.get(0);
+ }
+
+ /**
+ * fetch target file path
+ *
+ * @param baseDirName base directory name.
+ * @param pattern pattern of file
+ * @return matched files
+ */
+ public static List<String> listFiles(String baseDirName, String pattern,
int limit) {
+ List<String> result = new ArrayList<>();
+
File baseDir = new File(baseDirName);
if (!baseDir.exists() || !baseDir.isDirectory()) {
log.error("baseDirName find fail :{}", baseDirName);
- return null;
+ return result;
}
String tempName;
File tempFile;
File[] files = baseDir.listFiles();
if (files == null || files.length == 0) {
log.info("baseDirName is empty");
- return null;
+ return result;
}
for (File file : files) {
tempFile = file;
@@ -99,10 +118,13 @@ public class FlinkUtils {
Matcher matcher = jarPathPattern.matcher(tempName);
boolean matches = matcher.matches();
if (matches) {
- return tempFile.getAbsoluteFile().toString();
+ result.add(tempFile.getAbsoluteFile().toString());
+ }
+ if (limit > 0 && result.size() >= limit) {
+ return result;
}
}
- return null;
+ return result;
}
/**
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
index 7bc2f4dca..0f5fadb4d 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.service.repository;
import com.google.common.base.Splitter;
import com.google.gson.Gson;
-
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.pojo.dataproxy.CacheClusterObject;
@@ -41,6 +40,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
+import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -50,8 +50,6 @@ import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
-import javax.annotation.PostConstruct;
-
/**
* DataProxyConfigRepository
*/
@@ -60,9 +58,9 @@ public class DataProxyConfigRepository implements IRepository
{
public static final Splitter.MapSplitter MAP_SPLITTER =
Splitter.on(SEPARATOR).trimResults()
.withKeyValueSeparator(KEY_VALUE_SEPARATOR);
- private static final Logger LOGGER =
LoggerFactory.getLogger(DataProxyConfigRepository.class);
public static final String CACHE_CLUSTER_PRODUCER_TAG = "producer";
public static final String CACHE_CLUSTER_CONSUMER_TAG = "consumer";
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DataProxyConfigRepository.class);
private static final Gson gson = new Gson();
// key: proxyClusterName, value: jsonString
@@ -136,6 +134,9 @@ public class DataProxyConfigRepository implements
IRepository {
private Map<String, Map<String, List<CacheCluster>>> reloadCacheCluster() {
Map<String, Map<String, List<CacheCluster>>> cacheClusterMap = new
HashMap<>();
for (CacheCluster cacheCluster :
clusterSetMapper.selectCacheCluster()) {
+ if (StringUtils.isEmpty(cacheCluster.getExtTag())) {
+ continue;
+ }
Map<String, String> tagMap =
MAP_SPLITTER.split(cacheCluster.getExtTag());
String producerTag =
tagMap.getOrDefault(CACHE_CLUSTER_PRODUCER_TAG, Boolean.TRUE.toString());
if (StringUtils.equalsIgnoreCase(producerTag,
Boolean.TRUE.toString())) {
@@ -290,6 +291,7 @@ public class DataProxyConfigRepository implements
IRepository {
/**
* isSubTag
+ *
* @param wholeTagMap
* @param subTagMap
* @return
@@ -306,6 +308,7 @@ public class DataProxyConfigRepository implements
IRepository {
/**
* getProxyMd5
+ *
* @param clusterName
* @return
*/
@@ -315,6 +318,7 @@ public class DataProxyConfigRepository implements
IRepository {
/**
* getProxyConfigJson
+ *
* @param clusterName
* @return
*/
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
index 262a0e7e3..80678d19e 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.MQType;
@@ -153,6 +154,9 @@ public class CreateSortConfigListenerV2 implements
SortOperateListener {
pulsarSource.setPrimaryKey(((KafkaSource)
sourceInfo).getPrimaryKey());
}
}
+ if (StringUtils.isEmpty(pulsarSource.getSerializationType())) {
+ pulsarSource.setSerializationType(DataTypeEnum.CSV.getName());
+ }
pulsarSource.setScanStartupMode(PulsarScanStartupMode.EARLIEST.getValue());
pulsarSource.setFieldList(streamInfo.getFieldList());
sourceMap.computeIfAbsent(streamInfo.getInlongStreamId(), key ->
Lists.newArrayList())
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
index fc81e9721..6307dd071 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
@@ -287,7 +287,7 @@ public class LoadNodeUtils {
return new IcebergLoadNode(id, name, fields, fieldRelationShips, null,
null, 1, properties,
dbName, tableName, null, null, uri, warehouse);
}
-
+
/**
* Create SqlServer load node based on SqlServerSink
*
@@ -325,7 +325,7 @@ public class LoadNodeUtils {
schemaName,
tablename,
primaryKey
- );
+ );
}
/**
diff --git
a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index e20437a39..ac133bd38 100644
---
a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++
b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -37,7 +37,7 @@ CREATE TABLE IF NOT EXISTS `inlong_group`
`followers` varchar(512) DEFAULT NULL COMMENT 'Name of
followers, separated by commas',
`enable_zookeeper` tinyint(2) DEFAULT '0' COMMENT 'Whether to
enable the zookeeper, 0-disable, 1-enable',
`enable_create_resource` tinyint(2) DEFAULT '1' COMMENT 'Whether to
enable create resource? 0-disable, 1-enable',
- `lightweight` tinyint(2) DEFAULT '1' COMMENT 'Whether to
use lightweight mode, 0-false, 1-true',
+ `lightweight` tinyint(2) DEFAULT '0' COMMENT 'Whether to
use lightweight mode, 0-false, 1-true',
`inlong_cluster_tag` varchar(128) DEFAULT NULL COMMENT 'The
cluster tag, which links to inlong_cluster table',
`ext_params` text DEFAULT NULL COMMENT 'Extended
params, will be saved as JSON string, such as queue_module, partition_num,',
`status` int(4) DEFAULT '100' COMMENT 'Inlong
group status',
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index fe68bba78..f0b9d3a44 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -41,7 +41,7 @@ CREATE TABLE IF NOT EXISTS `inlong_group`
`max_length` int(11) DEFAULT '10240' COMMENT 'The
maximum length of a single piece of data, unit: Byte',
`enable_zookeeper` tinyint(2) DEFAULT '0' COMMENT 'Whether to
enable the zookeeper, 0-disable, 1-enable',
`enable_create_resource` tinyint(2) DEFAULT '1' COMMENT 'Whether to
enable create resource? 0-disable, 1-enable',
- `lightweight` tinyint(2) DEFAULT '1' COMMENT 'Whether to
use lightweight mode, 0-false, 1-true',
+ `lightweight` tinyint(2) DEFAULT '0' COMMENT 'Whether to
use lightweight mode, 0-false, 1-true',
`inlong_cluster_tag` varchar(128) DEFAULT NULL COMMENT 'The
cluster tag, which links to inlong_cluster table',
`ext_params` text DEFAULT NULL COMMENT 'Extended
params, will be saved as JSON string, such as queue_module, partition_num',
`in_charges` varchar(512) NOT NULL COMMENT 'Name of
responsible person, separated by commas',