This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 1380d3e4d [INLONG-5335][Manager] Unserialize Sort config when getting 
GroupInfo (#5336)
1380d3e4d is described below

commit 1380d3e4d9bb4a8d5765c40f65a3a5fe609bfcab
Author: kipshi <[email protected]>
AuthorDate: Wed Aug 3 15:36:15 2022 +0800

    [INLONG-5335][Manager] Unserialize Sort config when getting GroupInfo 
(#5336)
---
 .../service/group/InlongGroupServiceImpl.java      | 72 +++++++++++++++++++++-
 1 file changed, 71 insertions(+), 1 deletion(-)

diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index 678551596..ebec5d8d5 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service.group;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.github.pagehelper.Page;
 import com.github.pagehelper.PageHelper;
 import com.github.pagehelper.PageInfo;
@@ -24,12 +25,15 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.auth.Authentication.AuthType;
+import org.apache.inlong.manager.common.auth.SecretTokenAuthentication;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
 import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
@@ -47,6 +51,10 @@ import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
+import org.apache.inlong.manager.pojo.sort.BaseSortConf;
+import org.apache.inlong.manager.pojo.sort.BaseSortConf.SortType;
+import org.apache.inlong.manager.pojo.sort.FlinkSortConf;
+import org.apache.inlong.manager.pojo.sort.UserDefinedSortConf;
 import org.apache.inlong.manager.pojo.source.StreamSource;
 import org.apache.inlong.manager.service.source.SourceOperatorFactory;
 import org.apache.inlong.manager.service.source.StreamSourceOperator;
@@ -104,7 +112,7 @@ public class InlongGroupServiceImpl implements 
InlongGroupService {
         }
 
         // only the person in charges can update
-        List<String> inCharges = 
Arrays.asList(entity.getInCharges().split(","));
+        List<String> inCharges = 
Arrays.asList(entity.getInCharges().split(InlongConstants.COMMA));
         if (!inCharges.contains(operator)) {
             LOGGER.error("user [{}] has no privilege for the inlong group", 
operator);
             throw new BusinessException(ErrorCodeEnum.GROUP_PERMISSION_DENIED);
@@ -165,6 +173,8 @@ public class InlongGroupServiceImpl implements 
InlongGroupService {
         List<InlongGroupExtEntity> extEntityList = 
groupExtMapper.selectByGroupId(groupId);
         List<InlongGroupExtInfo> extList = 
CommonBeanUtils.copyListProperties(extEntityList, InlongGroupExtInfo::new);
         groupInfo.setExtList(extList);
+        BaseSortConf sortConf = buildSortConfig(extList);
+        groupInfo.setSortConf(sortConf);
 
         LOGGER.debug("success to get inlong group for groupId={}", groupId);
         return groupInfo;
@@ -409,4 +419,64 @@ public class InlongGroupServiceImpl implements 
InlongGroupService {
         LOGGER.info("success to save or update inlong group ext for 
groupId={}", groupId);
     }
 
+    /**
+     * Rebuild the sort configuration of an inlong group from its ext properties.
+     *
+     * @param extInfos ext key/value records of the group, may be null or empty
+     * @return the sort config for the stored sort type, or null when no sort type
+     *         is stored or the stored sort type is not handled here
+     */
+    private BaseSortConf buildSortConfig(List<InlongGroupExtInfo> extInfos) {
+        // Collectors.toMap rejects null values and duplicate keys; a plain put
+        // accepts both (last value wins), so a single incomplete or repeated
+        // ext record can no longer break reading the whole group.
+        Map<String, String> extMap = Maps.newHashMap();
+        if (extInfos != null) {
+            for (InlongGroupExtInfo extInfo : extInfos) {
+                extMap.put(extInfo.getKeyName(), extInfo.getKeyValue());
+            }
+        }
+        String type = extMap.get(InlongConstants.SORT_TYPE);
+        if (StringUtils.isBlank(type)) {
+            // no sort type saved for this group, nothing to rebuild
+            return null;
+        }
+        SortType sortType = SortType.forType(type);
+        switch (sortType) {
+            case FLINK:
+                return createFlinkSortConfig(extMap);
+            case USER_DEFINED:
+                return createUserDefinedSortConfig(extMap);
+            default:
+                LOGGER.warn("Unsupported sort config for sortType:{}", sortType);
+                return null;
+        }
+    }
+
+    /**
+     * Assemble a Flink sort config from the ext properties of an inlong group.
+     *
+     * @param extMap ext properties of the group, keyed by ext key name
+     * @return a Flink sort config carrying the service URL, the parsed properties
+     *         (empty map when none are stored) and, when an authentication type
+     *         is stored, a secret-token authentication
+     */
+    private FlinkSortConf createFlinkSortConfig(Map<String, String> extMap) {
+        FlinkSortConf flinkConf = new FlinkSortConf();
+        flinkConf.setServiceUrl(extMap.get(InlongConstants.SORT_URL));
+
+        // properties are stored as a JSON object string
+        String propsJson = extMap.get(InlongConstants.SORT_PROPERTIES);
+        Map<String, String> props;
+        if (StringUtils.isBlank(propsJson)) {
+            props = Maps.newHashMap();
+        } else {
+            props = JsonUtils.parseObject(propsJson,
+                    new TypeReference<Map<String, String>>() {
+                    });
+        }
+        flinkConf.setProperties(props);
+
+        // authentication is optional: without a stored type the config is done
+        String authTypeName = extMap.get(InlongConstants.SORT_AUTHENTICATION_TYPE);
+        if (StringUtils.isBlank(authTypeName)) {
+            return flinkConf;
+        }
+
+        AuthType resolvedType = AuthType.forType(authTypeName);
+        Preconditions.checkTrue(resolvedType == AuthType.SECRET_AND_TOKEN,
+                "Only support SECRET_AND_TOKEN for flink sort auth");
+        String authJson = extMap.get(InlongConstants.SORT_AUTHENTICATION);
+        Map<String, String> authProps = JsonUtils.parseObject(authJson,
+                new TypeReference<Map<String, String>>() {
+                });
+        SecretTokenAuthentication tokenAuth = new SecretTokenAuthentication();
+        tokenAuth.configure(authProps);
+        flinkConf.setAuthentication(tokenAuth);
+        return flinkConf;
+    }
+
+    /**
+     * Assemble a user-defined sort config from the ext properties of an inlong group.
+     *
+     * @param extMap ext properties of the group, keyed by ext key name
+     * @return a user-defined sort config carrying the sort name and the parsed
+     *         properties (empty map when none are stored)
+     */
+    private UserDefinedSortConf createUserDefinedSortConfig(Map<String, String> extMap) {
+        UserDefinedSortConf userConf = new UserDefinedSortConf();
+        userConf.setSortName(extMap.get(InlongConstants.SORT_NAME));
+
+        // properties are stored as a JSON object string
+        String propsJson = extMap.get(InlongConstants.SORT_PROPERTIES);
+        Map<String, String> props = StringUtils.isBlank(propsJson)
+                ? Maps.newHashMap()
+                : JsonUtils.parseObject(propsJson,
+                        new TypeReference<Map<String, String>>() {
+                        });
+        userConf.setProperties(props);
+        return userConf;
+    }
 }

Reply via email to