This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 1380d3e4d [INLONG-5335][Manager] Unserialize Sort config when getting
GroupInfo (#5336)
1380d3e4d is described below
commit 1380d3e4d9bb4a8d5765c40f65a3a5fe609bfcab
Author: kipshi <[email protected]>
AuthorDate: Wed Aug 3 15:36:15 2022 +0800
[INLONG-5335][Manager] Unserialize Sort config when getting GroupInfo
(#5336)
---
.../service/group/InlongGroupServiceImpl.java | 72 +++++++++++++++++++++-
1 file changed, 71 insertions(+), 1 deletion(-)
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index 678551596..ebec5d8d5 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service.group;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
@@ -24,12 +25,15 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.auth.Authentication.AuthType;
+import org.apache.inlong.manager.common.auth.SecretTokenAuthentication;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
@@ -47,6 +51,10 @@ import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
+import org.apache.inlong.manager.pojo.sort.BaseSortConf;
+import org.apache.inlong.manager.pojo.sort.BaseSortConf.SortType;
+import org.apache.inlong.manager.pojo.sort.FlinkSortConf;
+import org.apache.inlong.manager.pojo.sort.UserDefinedSortConf;
import org.apache.inlong.manager.pojo.source.StreamSource;
import org.apache.inlong.manager.service.source.SourceOperatorFactory;
import org.apache.inlong.manager.service.source.StreamSourceOperator;
@@ -104,7 +112,7 @@ public class InlongGroupServiceImpl implements
InlongGroupService {
}
// only the person in charges can update
- List<String> inCharges =
Arrays.asList(entity.getInCharges().split(","));
+ List<String> inCharges =
Arrays.asList(entity.getInCharges().split(InlongConstants.COMMA));
if (!inCharges.contains(operator)) {
LOGGER.error("user [{}] has no privilege for the inlong group",
operator);
throw new BusinessException(ErrorCodeEnum.GROUP_PERMISSION_DENIED);
@@ -165,6 +173,8 @@ public class InlongGroupServiceImpl implements
InlongGroupService {
List<InlongGroupExtEntity> extEntityList =
groupExtMapper.selectByGroupId(groupId);
List<InlongGroupExtInfo> extList =
CommonBeanUtils.copyListProperties(extEntityList, InlongGroupExtInfo::new);
groupInfo.setExtList(extList);
+ BaseSortConf sortConf = buildSortConfig(extList);
+ groupInfo.setSortConf(sortConf);
LOGGER.debug("success to get inlong group for groupId={}", groupId);
return groupInfo;
@@ -409,4 +419,64 @@ public class InlongGroupServiceImpl implements
InlongGroupService {
LOGGER.info("success to save or update inlong group ext for
groupId={}", groupId);
}
+ private BaseSortConf buildSortConfig(List<InlongGroupExtInfo> extInfos) {
+ Map<String, String> extMap = extInfos.stream()
+ .collect(Collectors.toMap(InlongGroupExtInfo::getKeyName,
InlongGroupExtInfo::getKeyValue));
+ String type = extMap.get(InlongConstants.SORT_TYPE);
+ if (StringUtils.isBlank(type)) {
+ return null;
+ }
+ SortType sortType = SortType.forType(type);
+ switch (sortType) {
+ case FLINK:
+ return createFlinkSortConfig(extMap);
+ case USER_DEFINED:
+ return createUserDefinedSortConfig(extMap);
+ default:
+ LOGGER.warn("Unsupported sort config for sortType:{}",
sortType);
+ return null;
+ }
+ }
+
+ private FlinkSortConf createFlinkSortConfig(Map<String, String> extMap) {
+ FlinkSortConf sortConf = new FlinkSortConf();
+ sortConf.setServiceUrl(extMap.get(InlongConstants.SORT_URL));
+ String properties = extMap.get(InlongConstants.SORT_PROPERTIES);
+ if (StringUtils.isNotBlank(properties)) {
+ sortConf.setProperties(JsonUtils.parseObject(properties,
+ new TypeReference<Map<String, String>>() {
+ }));
+ } else {
+ sortConf.setProperties(Maps.newHashMap());
+ }
+ String authenticationType =
extMap.get(InlongConstants.SORT_AUTHENTICATION_TYPE);
+ if (StringUtils.isNotBlank(authenticationType)) {
+ AuthType authType = AuthType.forType(authenticationType);
+ Preconditions.checkTrue(authType == AuthType.SECRET_AND_TOKEN,
+ "Only support SECRET_AND_TOKEN for flink sort auth");
+ String authentication =
extMap.get(InlongConstants.SORT_AUTHENTICATION);
+ Map<String, String> authProperties =
JsonUtils.parseObject(authentication,
+ new TypeReference<Map<String, String>>() {
+ });
+ SecretTokenAuthentication secretTokenAuthentication = new
SecretTokenAuthentication();
+ secretTokenAuthentication.configure(authProperties);
+ sortConf.setAuthentication(secretTokenAuthentication);
+ }
+ return sortConf;
+ }
+
+ private UserDefinedSortConf createUserDefinedSortConfig(Map<String,
String> extMap) {
+ UserDefinedSortConf sortConf = new UserDefinedSortConf();
+ String sortName = extMap.get(InlongConstants.SORT_NAME);
+ sortConf.setSortName(sortName);
+ String properties = extMap.get(InlongConstants.SORT_PROPERTIES);
+ if (StringUtils.isNotBlank(properties)) {
+ sortConf.setProperties(JsonUtils.parseObject(properties,
+ new TypeReference<Map<String, String>>() {
+ }));
+ } else {
+ sortConf.setProperties(Maps.newHashMap());
+ }
+ return sortConf;
+ }
}