This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new febfb6bf3  [Feature] Reference variables as placeholders in program 
args and Flink SQL (#1866)
febfb6bf3 is described below

commit febfb6bf3ce52a746397adae5c895a1941ca6e86
Author: macksonmu <[email protected]>
AuthorDate: Fri Oct 21 23:55:00 2022 +0800

     [Feature] Reference variables as placeholders in program args and Flink 
SQL (#1866)
    
    * [Feature] Reference variables as placeholders in program args and Flink 
SQL
    
    Co-authored-by: mucj7 <[email protected]>
---
 .../core/controller/FlinkSqlController.java        |   7 +-
 .../core/controller/VariableController.java        |  21 +----
 .../streampark/console/core/entity/Variable.java   |   1 -
 .../console/core/mapper/ApplicationMapper.java     |   3 +
 .../console/core/mapper/FlinkSqlMapper.java        |   3 +
 .../console/core/service/ApplicationService.java   |   2 +
 .../console/core/service/FlinkSqlService.java      |   2 +
 .../console/core/service/VariableService.java      |   6 +-
 .../core/service/impl/ApplicationServiceImpl.java  |  20 ++++-
 .../core/service/impl/FlinkSqlServiceImpl.java     |   9 ++
 .../core/service/impl/VariableServiceImpl.java     | 100 ++++++++++++++++++++-
 .../main/resources/mapper/core/FlinkSqlMapper.xml  |   7 ++
 .../console/core/service/VariableServiceTest.java  |  86 ++++++++++++++++++
 .../src/views/system/variable/Add.vue              |   2 +-
 14 files changed, 243 insertions(+), 26 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkSqlController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkSqlController.java
index 0e4fe4005..2623ff2e9 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkSqlController.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkSqlController.java
@@ -23,6 +23,7 @@ import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.FlinkSql;
 import org.apache.streampark.console.core.service.FlinkSqlService;
 import org.apache.streampark.console.core.service.SqlCompleteService;
+import org.apache.streampark.console.core.service.VariableService;
 import org.apache.streampark.flink.core.FlinkSqlValidationResult;
 
 import io.swagger.annotations.Api;
@@ -47,11 +48,15 @@ public class FlinkSqlController {
     @Autowired
     private FlinkSqlService flinkSqlService;
 
+    @Autowired
+    private VariableService variableService;
+
     @Autowired
     private SqlCompleteService sqlComplete;
 
     @PostMapping("verify")
-    public RestResponse verify(String sql, Long versionId) {
+    public RestResponse verify(String sql, Long versionId, Long teamId) {
+        sql = variableService.replaceVariable(teamId, sql);
         FlinkSqlValidationResult flinkSqlValidationResult = 
flinkSqlService.verifySql(sql, versionId);
         if (!flinkSqlValidationResult.success()) {
             // record error type, such as error sql, reason and error 
start/end line
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/VariableController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/VariableController.java
index 3a647fef2..c86039554 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/VariableController.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/VariableController.java
@@ -17,7 +17,6 @@
 
 package org.apache.streampark.console.core.controller;
 
-import org.apache.streampark.console.base.domain.ResponseCode;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.domain.RestResponse;
 import org.apache.streampark.console.base.exception.ApiAlertException;
@@ -39,8 +38,6 @@ import org.springframework.web.bind.annotation.RestController;
 import javax.validation.Valid;
 import javax.validation.constraints.NotBlank;
 
-import java.util.regex.Pattern;
-
 @Slf4j
 @Validated
 @RestController
@@ -85,18 +82,13 @@ public class VariableController {
 
     @DeleteMapping("delete")
     @RequiresPermissions("variable:delete")
-    public RestResponse deleteVariables(@Valid Variable variable) throws 
Exception {
-        this.variableService.removeById(variable);
+    public RestResponse deleteVariable(@Valid Variable variable) throws 
Exception {
+        this.variableService.deleteVariable(variable);
         return RestResponse.success();
     }
 
     @PostMapping("check/code")
     public RestResponse checkVariableCode(@RequestParam Long teamId, 
@NotBlank(message = "{required}") String variableCode) {
-        try {
-            this.checkVariableCodeFormat(variableCode);
-        } catch (ApiAlertException e) {
-            return RestResponse.fail(e.getMessage(), 
ResponseCode.CODE_FAIL_ALERT);
-        }
         boolean result = this.variableService.findByVariableCode(teamId, 
variableCode) == null;
         return RestResponse.success(result);
     }
@@ -105,13 +97,4 @@ public class VariableController {
     public RestResponse selectVariables(@RequestParam Long teamId) {
         return 
RestResponse.success().data(this.variableService.findByTeamId(teamId));
     }
-
-    private void checkVariableCodeFormat(String variableCode) {
-        if (variableCode.length() < 3 || variableCode.length() > 50) {
-            throw new ApiAlertException("Sorry, variable code length should be 
no less than 3 and no more than 50 characters.");
-        }
-        if (!Pattern.matches(formatPattern, variableCode)) {
-            throw new ApiAlertException("Sorry, variable code can only contain 
letters, numbers, middle bars, bottom bars and dots, and the beginning can only 
be letters, For example, kafka_cluster.brokers-520");
-        }
-    }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Variable.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Variable.java
index d78636d3a..b115de667 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Variable.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Variable.java
@@ -42,7 +42,6 @@ public class Variable implements Serializable {
     private String variableCode;
 
     @NotBlank(message = "{required}")
-    @Size(max = 50, message = "{noMoreThan}")
     private String variableValue;
 
     @Size(max = 100, message = "{noMoreThan}")
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
index c16dcb0c5..5b0fe77bf 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
@@ -38,6 +38,9 @@ public interface ApplicationMapper extends 
BaseMapper<Application> {
     @Select("select * from t_flink_app where project_id=#{projectId}")
     List<Application> getByProjectId(@Param("projectId") Long projectId);
 
+    @Select("select * from t_flink_app where team_id=#{teamId}")
+    List<Application> getByTeamId(@Param("teamId") Long teamId);
+
     @Update("update t_flink_app set 
app_id=#{application.appId},job_id=#{application.jobId},state=14,end_time=null 
where id=#{application.id}")
     boolean mapping(@Param("application") Application appParam);
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkSqlMapper.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkSqlMapper.java
index 83d3ae380..450c6c383 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkSqlMapper.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkSqlMapper.java
@@ -23,6 +23,8 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper;
 import org.apache.ibatis.annotations.Param;
 import org.apache.ibatis.annotations.Select;
 
+import java.util.List;
+
 public interface FlinkSqlMapper extends BaseMapper<FlinkSql> {
     @Select("select s.* from t_flink_sql s inner join t_flink_effective e on 
s.id = e.target_id where e.target_type=2 and e.app_id=#{appId}")
     FlinkSql getEffective(@Param("appId") Long appId);
@@ -30,4 +32,5 @@ public interface FlinkSqlMapper extends BaseMapper<FlinkSql> {
     @Select("select max(`version`) as maxVersion from t_flink_sql where 
app_id=#{appId}")
     Integer getLatestVersion(@Param("appId") Long appId);
 
+    List<FlinkSql> getByTeamId(@Param("teamId") Long teamId);
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
index 34301eded..ff9e73cce 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
@@ -90,6 +90,8 @@ public interface ApplicationService extends 
IService<Application> {
 
     List<Application> getByProjectId(Long id);
 
+    List<Application> getByTeamId(Long teamId);
+
     boolean checkBuildAndUpdate(Application app);
 
     void forcedStop(Application app);
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkSqlService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkSqlService.java
index 888f8d84e..25c7087f4 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkSqlService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkSqlService.java
@@ -49,4 +49,6 @@ public interface FlinkSqlService extends IService<FlinkSql> {
     void rollback(Application application);
 
     FlinkSqlValidationResult verifySql(String sql, Long versionId);
+
+    List<FlinkSql> getByTeamId(Long teamId);
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/VariableService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/VariableService.java
index 292cfb3f0..0526377f0 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/VariableService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/VariableService.java
@@ -48,7 +48,11 @@ public interface VariableService extends IService<Variable> {
      *
      * @param variable variable
      */
-    void createVariable(Variable variable) throws Exception;
+    void createVariable(Variable variable);
+
+    void deleteVariable(Variable variable);
 
     Variable findByVariableCode(Long teamId, String variableCode);
+
+    String replaceVariable(Long teamId, String mixed);
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 8fc9cea5f..acaa79f10 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -76,6 +76,7 @@ import 
org.apache.streampark.console.core.service.FlinkSqlService;
 import org.apache.streampark.console.core.service.ProjectService;
 import org.apache.streampark.console.core.service.SavePointService;
 import org.apache.streampark.console.core.service.SettingService;
+import org.apache.streampark.console.core.service.VariableService;
 import org.apache.streampark.console.core.task.FlinkTrackingTask;
 import org.apache.streampark.flink.core.conf.ParameterCli;
 import org.apache.streampark.flink.kubernetes.IngressController;
@@ -204,6 +205,9 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     @Autowired
     private FlinkClusterService flinkClusterService;
 
+    @Autowired
+    private VariableService variableService;
+
     @PostConstruct
     public void resetOptionState() {
         this.baseMapper.resetOptionState();
@@ -868,6 +872,11 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
         return baseMapper.getByProjectId(id);
     }
 
+    @Override
+    public List<Application> getByTeamId(Long teamId) {
+        return baseMapper.getByTeamId(teamId);
+    }
+
     @Override
     @RefreshCache
     public boolean checkBuildAndUpdate(Application application) {
@@ -1315,7 +1324,10 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
         }
 
         if (application.isFlinkSqlJob()) {
-            FlinkSql flinkSql = 
flinkSqlService.getEffective(application.getId(), false);
+            FlinkSql flinkSql = 
flinkSqlService.getEffective(application.getId(), true);
+            // Get the sql of the replaced placeholder
+            String realSql = 
variableService.replaceVariable(application.getTeamId(), flinkSql.getSql());
+            flinkSql.setSql(DeflaterUtils.zipString(realSql));
             extraParameter.put(ConfigConst.KEY_FLINK_SQL(null), 
flinkSql.getSql());
         }
 
@@ -1364,6 +1376,10 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
                 }
             }
         }
+
+        // Get the args after placeholder replacement
+        String applicationArgs = 
variableService.replaceVariable(application.getTeamId(), application.getArgs());
+
         SubmitRequest submitRequest = new SubmitRequest(
             flinkEnv.getFlinkVersion(),
             flinkEnv.getFlinkConf(),
@@ -1377,7 +1393,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
             appParam.getFlameGraph() ? getFlameGraph(application) : null,
             application.getOptionMap(),
             dynamicOption,
-            application.getArgs(),
+            applicationArgs,
             buildResult,
             kubernetesSubmitParam,
             extraParameter
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
index e00738f8e..66e98df5a 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
@@ -30,6 +30,7 @@ import 
org.apache.streampark.console.core.service.ApplicationBackUpService;
 import org.apache.streampark.console.core.service.EffectiveService;
 import org.apache.streampark.console.core.service.FlinkEnvService;
 import org.apache.streampark.console.core.service.FlinkSqlService;
+import org.apache.streampark.console.core.service.VariableService;
 import org.apache.streampark.flink.core.FlinkSqlValidationResult;
 import org.apache.streampark.flink.proxy.FlinkShimsProxy;
 
@@ -60,6 +61,9 @@ public class FlinkSqlServiceImpl extends 
ServiceImpl<FlinkSqlMapper, FlinkSql> i
     @Autowired
     private FlinkEnvService flinkEnvService;
 
+    @Autowired
+    private VariableService variableService;
+
     @Override
     public FlinkSql getEffective(Long appId, boolean decode) {
         FlinkSql flinkSql = baseMapper.getEffective(appId);
@@ -200,4 +204,9 @@ public class FlinkSqlServiceImpl extends 
ServiceImpl<FlinkSqlMapper, FlinkSql> i
             return null;
         });
     }
+
+    @Override
+    public List<FlinkSql> getByTeamId(Long teamId) {
+        return this.baseMapper.getByTeamId(teamId);
+    }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java
index 10cb6f15b..89081e0ab 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java
@@ -17,13 +17,17 @@
 
 package org.apache.streampark.console.core.service.impl;
 
+import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.FlinkSql;
 import org.apache.streampark.console.core.entity.Variable;
 import org.apache.streampark.console.core.mapper.VariableMapper;
 import org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.CommonService;
+import org.apache.streampark.console.core.service.FlinkSqlService;
 import org.apache.streampark.console.core.service.VariableService;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@@ -31,27 +35,42 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 
 import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 @Slf4j
 @Service
 @Transactional(propagation = Propagation.SUPPORTS, readOnly = true, 
rollbackFor = Exception.class)
 public class VariableServiceImpl extends ServiceImpl<VariableMapper, Variable> 
implements VariableService {
 
+    private static final Pattern PLACEHOLDER_PATTERN = 
Pattern.compile("\\$\\{([A-Za-z])+([A-Za-z0-9._-])+\\}");
+
+    private static final String PLACEHOLDER_START = "${";
+
+    private static final String PLACEHOLDER_END = "}";
+
     @Autowired
     private ApplicationService applicationService;
 
+    @Autowired
+    private FlinkSqlService flinkSqlService;
+
     @Autowired
     private CommonService commonService;
 
     @Override
     @Transactional(rollbackFor = Exception.class)
-    public void createVariable(Variable variable) throws Exception {
+    public void createVariable(Variable variable) {
         if (this.findByVariableCode(variable.getTeamId(), 
variable.getVariableCode()) != null) {
             throw new ApiAlertException("Sorry, the variable code already 
exists.");
         }
@@ -59,6 +78,15 @@ public class VariableServiceImpl extends 
ServiceImpl<VariableMapper, Variable> i
         this.save(variable);
     }
 
+    @Override
+    @Transactional(rollbackFor = Exception.class)
+    public void deleteVariable(Variable variable) {
+        if (isDependByApplications(variable)) {
+            throw new ApiAlertException("Sorry, the variable is actually 
used.");
+        }
+        this.removeById(variable);
+    }
+
     @Override
     public IPage<Variable> page(Variable variable, RestRequest request) {
         if (variable.getTeamId() == null) {
@@ -79,4 +107,74 @@ public class VariableServiceImpl extends 
ServiceImpl<VariableMapper, Variable> i
     public List<Variable> findByTeamId(Long teamId) {
         return baseMapper.selectByTeamId(teamId);
     }
+
+    /**
+     * Replace variable with defined variable codes.
+     * @param teamId
+     * @param mixed Text with placeholders, e.g. "--cluster ${kafka.cluster}"
+     * @return
+     */
+    @Override
+    public String replaceVariable(Long teamId, String mixed) {
+        if (StringUtils.isEmpty(mixed)) {
+            return mixed;
+        }
+        List<Variable> variables = findByTeamId(teamId);
+        if (CollectionUtils.isEmpty(variables)) {
+            return mixed;
+        }
+        Map<String, String> variableMap = 
variables.stream().collect(Collectors.toMap(Variable::getVariableCode, 
Variable::getVariableValue));
+        String restore = mixed;
+        Matcher matcher = PLACEHOLDER_PATTERN.matcher(restore);
+        while (matcher.find()) {
+            String placeholder = matcher.group();
+            String variableCode = getCodeFromPlaceholder(placeholder);
+            String variableValue = variableMap.get(variableCode);
+            if (StringUtils.isNotEmpty(variableValue)) {
+                restore = restore.replace(placeholder, variableValue);
+            }
+        }
+        return restore;
+    }
+
+    private boolean isDependByApplications(Variable variable) {
+        // Detect whether the variable is dependent on the args of the 
application
+        List<Application> applications = 
applicationService.getByTeamId(variable.getTeamId());
+        if (applications != null) {
+            for (Application app : applications) {
+                if (isDepend(variable.getVariableCode(), app.getArgs())) {
+                    return true;
+                }
+            }
+        }
+
+        // Detect whether variables are dependent on all versions of flink sql
+        List<FlinkSql> flinkSqls = 
flinkSqlService.getByTeamId(variable.getTeamId());
+        if (flinkSqls != null) {
+            for (FlinkSql flinkSql : flinkSqls) {
+                if (isDepend(variable.getVariableCode(), 
DeflaterUtils.unzipString(flinkSql.getSql()))) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Determine whether variableCode is dependent on mixed.
+     * @param variableCode Variable code, e.g. "kafka.cluster"
+     * @param mixed Text with placeholders, e.g. "--cluster ${kafka.cluster}"
+     * @return If mixed can match the variableCode, return true, otherwise 
return false
+     */
+    private boolean isDepend(String variableCode, String mixed) {
+        if (StringUtils.isEmpty(mixed)) {
+            return false;
+        }
+        String placeholder = String.format("%s%s%s", PLACEHOLDER_START, 
variableCode, PLACEHOLDER_END);
+        return mixed.contains(placeholder);
+    }
+
+    private String getCodeFromPlaceholder(String placeholder) {
+        return placeholder.substring(PLACEHOLDER_START.length(), 
placeholder.length() - PLACEHOLDER_END.length());
+    }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkSqlMapper.xml
 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkSqlMapper.xml
index 0b991a355..5c394a3de 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkSqlMapper.xml
+++ 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkSqlMapper.xml
@@ -27,4 +27,11 @@
         <result column="dependency" jdbcType="VARCHAR" property="dependency"/>
         <result column="create_time" jdbcType="DATE" property="createTime"/>
     </resultMap>
+
+    <select id="getByTeamId" resultType="flinkSql" parameterType="Long">
+        select s.*
+        from t_flink_sql s, t_flink_app a
+        where s.app_id = a.id
+        and a.team_id = #{teamId}
+    </select>
 </mapper>
diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/VariableServiceTest.java
 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/VariableServiceTest.java
new file mode 100644
index 000000000..daf4191fb
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/VariableServiceTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service;
+
+import org.apache.streampark.console.StreamParkConsoleBootstrap;
+import org.apache.streampark.console.core.entity.Variable;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+
+/**
+ * org.apache.streampark.console.core.service.VariableServiceTest
+ */
+@ExtendWith(SpringExtension.class)
+@SpringBootTest(classes = StreamParkConsoleBootstrap.class, webEnvironment = 
SpringBootTest.WebEnvironment.RANDOM_PORT)
+public class VariableServiceTest {
+
+    @Autowired
+    private VariableService variableService;
+
+    /**
+     * Test whether the variable will be replaced normally
+     */
+    @Test
+    public void testNormalReplace() {
+        Long teamId = 100000L;
+        Long userId = 100000L;
+        String variableCode = "collect_kafka.brokers-520room";
+        String variableVariable = "broker1:port,broker3:port,broker3:port";
+        Variable variable = new Variable();
+        variable.setVariableCode(variableCode);
+        variable.setVariableValue(variableVariable);
+        variable.setDescription("420机房采集kafka broker, 集群规模50台.");
+        variable.setCreatorId(userId);
+        variable.setTeamId(teamId);
+        variableService.save(variable);
+        Variable findVariable = variableService.findByVariableCode(teamId, 
variableCode);
+        Assertions.assertTrue(findVariable != null);
+        String paramWithPlaceholders = "--kafka.brokers ${" + variableCode + 
"}";
+        String realParam = variableService.replaceVariable(teamId, 
paramWithPlaceholders);
+        Assertions.assertEquals(realParam, "--kafka.brokers " + 
variableVariable);
+    }
+
+    /**
+     * Test whether the variable cannot be replaced normally
+     */
+    @Test
+    public void testAbnormalReplace() {
+        Long teamId = 100000L;
+        Long userId = 100000L;
+        // It contains a non-normal character '#' which should not be matched
+        String variableCode = "collect_#kafkabrokers-520room";
+        String variableVariable = "broker1:port,broker3:port,broker3:port";
+        Variable variable = new Variable();
+        variable.setVariableCode(variableCode);
+        variable.setVariableValue(variableVariable);
+        variable.setDescription("420机房采集kafka broker, 集群规模50台.");
+        variable.setCreatorId(userId);
+        variable.setTeamId(teamId);
+        variableService.save(variable);
+        Variable findVariable = variableService.findByVariableCode(teamId, 
variableCode);
+        Assertions.assertTrue(findVariable != null);
+        String paramWithPlaceholders = "--kafka.brokers ${" + variableCode + 
"}";
+        String realParam = variableService.replaceVariable(teamId, 
paramWithPlaceholders);
+        Assertions.assertNotEquals("--kafka.brokers " + variableVariable, 
realParam);
+    }
+}
diff --git 
a/streampark-console/streampark-console-webapp/src/views/system/variable/Add.vue
 
b/streampark-console/streampark-console-webapp/src/views/system/variable/Add.vue
index d4700c14c..d838b629c 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/system/variable/Add.vue
+++ 
b/streampark-console/streampark-console-webapp/src/views/system/variable/Add.vue
@@ -47,7 +47,7 @@
         <a-textarea
           rows="4"
           placeholder="Please enter Variable Value"
-          v-decorator="['variableValue', {rules: [{ required: true, max: 100, 
message: 'please enter Variable Value' }]}]" />
+          v-decorator="['variableValue', {rules: [{ required: true, message: 
'please enter Variable Value' }]}]" />
       </a-form-item>
       <a-form-item
         label="Description"

Reply via email to