This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new bc1a6b4bd2 [Feature] Support config variable substitution with default
value (#7562)
bc1a6b4bd2 is described below
commit bc1a6b4bd217753e92ef73cf9b341d6a056944bb
Author: Jast <[email protected]>
AuthorDate: Tue Sep 10 21:00:53 2024 +0800
[Feature] Support config variable substitution with default value (#7562)
---
docs/en/concept/config.md | 69 +++---
docs/zh/concept/config.md | 55 +++--
.../seatunnel/api/sink/TablePlaceholder.java | 233 ++++-----------------
...eholder.java => TablePlaceholderProcessor.java} | 88 +++-----
.../api/table/factory/TableSinkFactoryContext.java | 4 +-
...est.java => TablePlaceholderProcessorTest.java} | 12 +-
.../seatunnel/common/utils/PlaceholderUtils.java | 51 +++++
seatunnel-core/seatunnel-core-starter/pom.xml | 7 +
.../core/starter/utils/ConfigBuilder.java | 106 +++++++++-
.../core/starter/utils/ConfigShadeTest.java | 68 ++++++
.../config_variables_with_default_value.conf | 52 +++++
...config_variables_with_reserved_placeholder.conf | 52 +++++
.../seatunnel/engine/e2e/UserVariableIT.java | 15 ++
...ke_to_console_with_default_value.variables.conf | 52 +++++
14 files changed, 555 insertions(+), 309 deletions(-)
diff --git a/docs/en/concept/config.md b/docs/en/concept/config.md
index 70fb551526..defc308646 100644
--- a/docs/en/concept/config.md
+++ b/docs/en/concept/config.md
@@ -205,8 +205,26 @@ sql = """ select * from "table" """
## Config Variable Substitution
-In config file we can define some variables and replace it in run time. **This
is only support `hocon` format file**.
+In a config file, we can define variables and replace them at runtime.
However, note that only HOCON format files are supported.
+### Usage of Variables:
+- `${varName}`: If the variable is not provided, an exception will be thrown.
+- `${varName:default}`: If the variable is not provided, the default value
will be used. If you set a default value, it should be enclosed in double
quotes.
+- `${varName:}`: If the variable is not provided, an empty string will be used.
+
+If you do not set the variable value through `-i`, you can also pass the value
by setting the system environment variables. Variable substitution supports
obtaining variable values through environment variables.
+For example, you can set the environment variable in the shell script as
follows:
+```shell
+export varName="value with space"
+```
+Then you can use the variable in the config file.
+
+If you set a variable without default value in the config file but do not pass
it during execution, an exception will be thrown. Example:
+```shell
+Caused by:
org.apache.seatunnel.core.starter.exception.CommandExecuteException: Variable
substitution error: ${resName}_table
+```
+
+### Example:
```hocon
env {
job.mode = "BATCH"
@@ -216,14 +234,14 @@ env {
source {
FakeSource {
- result_table_name = ${resName}
- row.num = ${rowNum}
+ result_table_name = "${resName:fake_test}_table"
+ row.num = "${rowNum:50}"
string.template = ${strTemplate}
int.template = [20, 21]
schema = {
fields {
- name = ${nameType}
- age = "int"
+ name = "${nameType:string}"
+ age = ${ageType}
}
}
}
@@ -231,9 +249,9 @@ source {
transform {
sql {
- source_table_name = "fake"
+ source_table_name = "${resName:fake_test}_table"
result_table_name = "sql"
- query = "select * from "${resName}" where name = '"${nameVal}"' "
+ query = "select * from ${resName:fake_test}_table where name =
'${nameVal}' "
}
}
@@ -245,26 +263,24 @@ sink {
password = ${password}
}
}
-
```
-In the above config, we define some variables, like `${rowNum}`, `${resName}`.
-We can replace those parameters with this shell command:
+In the configuration above, we have defined several variables like
`${rowNum}`, `${resName}`. We can replace these parameters using the following
shell command:
```shell
./bin/seatunnel.sh -c <this_config_file>
-i jobName='this_is_a_job_name'
--i resName=fake
--i rowNum=10
-i strTemplate=['abc','d~f','hi']
--i nameType=string
+-i ageType=int
-i nameVal=abc
-i username=seatunnel=2.3.1
-i password='$a^b%c.d~e0*9('
-m local
```
-Then the final submitted config is:
+In this case, `resName`, `rowNum`, and `nameType` are not set, so they will
take their default values.
+
+The final submitted configuration would be:
```hocon
env {
@@ -275,13 +291,13 @@ env {
source {
FakeSource {
- result_table_name = "fake"
- row.num = 10
- string.template = ["abc","d~f","h i"]
+ result_table_name = "fake_test_table"
+ row.num = 50
+ string.template = ['abc','d~f','hi']
int.template = [20, 21]
schema = {
fields {
- name = string
+ name = "string"
age = "int"
}
}
@@ -290,9 +306,9 @@ source {
transform {
sql {
- source_table_name = "fake"
+ source_table_name = "fake_test_table"
result_table_name = "sql"
- query = "select * from fake where name = 'abc' "
+ query = "select * from fake_test_table where name = 'abc' "
}
}
@@ -302,15 +318,16 @@ sink {
source_table_name = "sql"
username = "seatunnel=2.3.1"
password = "$a^b%c.d~e0*9("
- }
+ }
}
```
-Some Notes:
-- Quota with `'` if the value has special character such as `(`
-- If the replacement variables is in `"` or `'`, like `resName` and `nameVal`,
you need add `"`
-- The value can't have space `' '`, like `-i jobName='this is a job name' `,
this will be replaced to `job.name = "this"`
-- If you want to use dynamic parameters, you can use the following format: -i
date=$(date +"%Y%m%d").
+### Important Notes:
+- If a value contains special characters like `(`, enclose it in single quotes
(`'`).
+- If the substitution variable contains double or single quotes (e.g.,
`"resName"` or `"nameVal"`), you need to include them with the value.
+- The value cannot contain spaces (`' '`). For example, `-i jobName='this is a
job name'` will be replaced with `job.name = "this"`. You can use environment
variables to pass values with spaces.
+- For dynamic parameters, you can use the following format: `-i date=$(date
+"%Y%m%d")`.
+- Cannot use specified system reserved characters; they will not be replaced
by `-i`, such as: `${database_name}`, `${schema_name}`, `${table_name}`,
`${schema_full_name}`, `${table_full_name}`, `${primary_key}`, `${unique_key}`,
`${field_names}`. For details, please refer to [Sink Parameter
Placeholders](sink-options-placeholders.md).
## What's More
diff --git a/docs/zh/concept/config.md b/docs/zh/concept/config.md
index 7a6390969c..1456f91f29 100644
--- a/docs/zh/concept/config.md
+++ b/docs/zh/concept/config.md
@@ -192,6 +192,25 @@ sql = """ select * from "table" """
在配置文件中,我们可以定义一些变量并在运行时替换它们。但是注意仅支持 hocon 格式的文件。
+变量使用方法:
+ - `${varName}`,如果变量未传值,则抛出异常。
+ - `${varName:default}`,如果变量未传值,则使用默认值。如果设置默认值则变量需要写在双引号中。
+ - `${varName:}`,如果变量未传值,则使用空字符串。
+
+如果您不通过`-i`设置变量值,也可以通过设置系统的环境变量传值,变量替换支持通过环境变量获取变量值。
+例如,您可以在shell脚本中设置环境变量如下:
+```shell
+export varName="value with space"
+```
+然后您可以在配置文件中使用变量。
+
+如果您在配置文件中设置了没有默认值的变量,但在执行中未传递,则会抛出异常。
+如:
+```shell
+Caused by:
org.apache.seatunnel.core.starter.exception.CommandExecuteException: Variable
substitution error: ${resName}_table
+```
+
+具体样例:
```hocon
env {
job.mode = "BATCH"
@@ -201,14 +220,14 @@ env {
source {
FakeSource {
- result_table_name = ${resName}
- row.num = ${rowNum}
+ result_table_name = "${resName:fake_test}_table"
+ row.num = "${rowNum:50}"
string.template = ${strTemplate}
int.template = [20, 21]
schema = {
fields {
- name = ${nameType}
- age = "int"
+ name = "${nameType:string}"
+ age = ${ageType}
}
}
}
@@ -216,9 +235,9 @@ source {
transform {
sql {
- source_table_name = "fake"
+ source_table_name = "${resName:fake_test}_table"
result_table_name = "sql"
- query = "select * from "${resName}" where name = '"${nameVal}"' "
+ query = "select * from ${resName:fake_test}_table where name =
'${nameVal}' "
}
}
@@ -230,7 +249,6 @@ sink {
password = ${password}
}
}
-
```
在上述配置中,我们定义了一些变量,如 ${rowNum}、${resName}。
@@ -239,16 +257,17 @@ sink {
```shell
./bin/seatunnel.sh -c <this_config_file>
-i jobName='this_is_a_job_name'
--i resName=fake
--i rowNum=10
-i strTemplate=['abc','d~f','hi']
--i nameType=string
+-i ageType=int
-i nameVal=abc
-i username=seatunnel=2.3.1
-i password='$a^b%c.d~e0*9('
--e local
+-m local
```
+其中 `resName`,`rowNum`,`nameType` 我们未设置,他将获取默认值
+
+
然后最终提交的配置是:
```hocon
@@ -260,8 +279,8 @@ env {
source {
FakeSource {
- result_table_name = "fake"
- row.num = 10
+ result_table_name = "fake_test_table"
+ row.num = 50
string.template = ['abc','d~f','hi']
int.template = [20, 21]
schema = {
@@ -275,9 +294,9 @@ source {
transform {
sql {
- source_table_name = "fake"
+ source_table_name = "fake_test_table"
result_table_name = "sql"
- query = "select * from "fake" where name = 'abc' "
+ query = "select * from fake_test_table where name = 'abc' "
}
}
@@ -286,7 +305,7 @@ sink {
Console {
source_table_name = "sql"
username = "seatunnel=2.3.1"
- password = "$a^b%c.d~e0*9("
+ password = "$a^b%c.d~e0*9("
}
}
@@ -296,9 +315,9 @@ sink {
- 如果值包含特殊字符,如`(`,请使用`'`引号将其括起来。
- 如果替换变量包含`"`或`'`(如`"resName"`和`"nameVal"`),需要添加`"`。
-- 值不能包含空格`' '`。例如, `-i jobName='this is a job name'`将被替换为`job.name = "this"`。
+- 值不能包含空格`' '`。例如, `-i jobName='this is a job name'`将被替换为`job.name = "this"`。
你可以使用环境变量传递带有空格的值。
- 如果要使用动态参数,可以使用以下格式: `-i date=$(date +"%Y%m%d")`。
-
+-
不能使用指定系统保留字符,它将不会被`-i`替换,如:`${database_name}`、`${schema_name}`、`${table_name}`、`${schema_full_name}`、`${table_full_name}`、`${primary_key}`、`${unique_key}`、`${field_names}`。具体可参考[Sink参数占位符](sink-options-placeholders.md)
## 此外
如果你想了解更多关于格式配置的详细信息,请查看
[HOCON](https://github.com/lightbend/config/blob/main/HOCON.md)。
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/TablePlaceholder.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/TablePlaceholder.java
index f599e22135..2f78ce89e3 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/TablePlaceholder.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/TablePlaceholder.java
@@ -17,211 +17,48 @@
package org.apache.seatunnel.api.sink;
-import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.ConstraintKey;
-import org.apache.seatunnel.api.table.catalog.PrimaryKey;
-import org.apache.seatunnel.api.table.catalog.TableIdentifier;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
-
-import org.apache.commons.lang3.ObjectUtils;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
-public class TablePlaceholder {
- // Placeholder ${database_name} or ${database_name:default_value}
- public static final String REPLACE_DATABASE_NAME_KEY = "database_name";
- // Placeholder ${schema_name} or ${schema_name:default_value}
- public static final String REPLACE_SCHEMA_NAME_KEY = "schema_name";
- // Placeholder ${schema_full_name} or ${schema_full_name:default_value}
- public static final String REPLACE_SCHEMA_FULL_NAME_KEY =
"schema_full_name";
- // Placeholder ${table_name} or ${table_name:default_value}
- public static final String REPLACE_TABLE_NAME_KEY = "table_name";
- // Placeholder ${table_full_name} or ${table_full_name:default_value}
- public static final String REPLACE_TABLE_FULL_NAME_KEY = "table_full_name";
- // Placeholder ${primary_key} or ${primary_key:default_value}
- public static final String REPLACE_PRIMARY_KEY = "primary_key";
- // Placeholder ${unique_key} or ${unique_key:default_value}
- public static final String REPLACE_UNIQUE_KEY = "unique_key";
- // Placeholder ${field_names} or ${field_names:default_value}
- public static final String REPLACE_FIELD_NAMES_KEY = "field_names";
- public static final String NAME_DELIMITER = ".";
- public static final String FIELD_DELIMITER = ",";
-
- private static String replacePlaceholders(String input, String
placeholderName, String value) {
- return replacePlaceholders(input, placeholderName, value, null);
- }
-
- private static String replacePlaceholders(
- String input, String placeholderName, String value, String
defaultValue) {
- String placeholderRegex = "\\$\\{" + Pattern.quote(placeholderName) +
"(:[^}]*)?\\}";
- Pattern pattern = Pattern.compile(placeholderRegex);
- Matcher matcher = pattern.matcher(input);
-
- StringBuffer result = new StringBuffer();
- while (matcher.find()) {
- String replacement =
- value != null && !value.isEmpty()
- ? value
- : (matcher.group(1) != null
- ? matcher.group(1).substring(1).trim()
- : defaultValue);
- if (replacement == null) {
- continue;
- }
- matcher.appendReplacement(result,
Matcher.quoteReplacement(replacement));
- }
- matcher.appendTail(result);
- return result.toString();
- }
-
- private static String replaceTableIdentifier(
- String placeholder, TableIdentifier identifier, String
defaultValue) {
- placeholder =
- replacePlaceholders(
- placeholder,
- REPLACE_DATABASE_NAME_KEY,
- identifier.getDatabaseName(),
- defaultValue);
- placeholder =
- replacePlaceholders(
- placeholder,
- REPLACE_SCHEMA_NAME_KEY,
- identifier.getSchemaName(),
- defaultValue);
- placeholder =
- replacePlaceholders(
- placeholder,
- REPLACE_TABLE_NAME_KEY,
- identifier.getTableName(),
- defaultValue);
-
- List<String> fullPath = new ArrayList<>();
- if (identifier.getDatabaseName() != null) {
- fullPath.add(identifier.getDatabaseName());
- }
- if (identifier.getSchemaName() != null) {
- fullPath.add(identifier.getSchemaName());
+import java.util.HashSet;
+import java.util.Set;
+
+public enum TablePlaceholder {
+
+ // Placeholder ${database_name} or${database_name:default_value}
+ REPLACE_DATABASE_NAME_KEY("database_name"),
+ // Placeholder ${schema_name} or${schema_name:default_value}
+ REPLACE_SCHEMA_NAME_KEY("schema_name"),
+ // Placeholder ${schema_full_name} or${schema_full_name:default_value}
+ REPLACE_SCHEMA_FULL_NAME_KEY("schema_full_name"),
+ // Placeholder ${table_name} or${table_name:default_value}
+ REPLACE_TABLE_NAME_KEY("table_name"),
+ // Placeholder ${table_full_name} or${table_full_name:default_value}
+ REPLACE_TABLE_FULL_NAME_KEY("table_full_name"),
+ // Placeholder ${primary_key} or${primary_key:default_value}
+ REPLACE_PRIMARY_KEY("primary_key"),
+ // Placeholder ${unique_key} or${unique_key:default_value}
+ REPLACE_UNIQUE_KEY("unique_key"),
+ // Placeholder ${field_names} or${field_names:default_value}
+ REPLACE_FIELD_NAMES_KEY("field_names");
+
+ private static Set<String> PLACEHOLDER_KEYS = new HashSet<>();
+
+ static {
+ // O(1) complexity, using static to load all system placeholders
+ for (TablePlaceholder placeholder : TablePlaceholder.values()) {
+ PLACEHOLDER_KEYS.add(placeholder.getPlaceholder());
}
- if (!fullPath.isEmpty()) {
- placeholder =
- replacePlaceholders(
- placeholder,
- REPLACE_SCHEMA_FULL_NAME_KEY,
- String.join(NAME_DELIMITER, fullPath),
- defaultValue);
- }
-
- if (identifier.getTableName() != null) {
- fullPath.add(identifier.getTableName());
- }
- if (!fullPath.isEmpty()) {
- placeholder =
- replacePlaceholders(
- placeholder,
- REPLACE_TABLE_FULL_NAME_KEY,
- String.join(NAME_DELIMITER, fullPath),
- defaultValue);
- }
- return placeholder;
- }
-
- public static String replaceTableIdentifier(String placeholder,
TableIdentifier identifier) {
- return replaceTableIdentifier(placeholder, identifier, "");
}
- public static String replaceTablePrimaryKey(String placeholder, PrimaryKey
primaryKey) {
- if (primaryKey != null && !primaryKey.getColumnNames().isEmpty()) {
- String pkFieldsString = String.join(FIELD_DELIMITER,
primaryKey.getColumnNames());
- return replacePlaceholders(placeholder, REPLACE_PRIMARY_KEY,
pkFieldsString);
- }
- return placeholder;
- }
-
- public static String replaceTableUniqueKey(
- String placeholder, List<ConstraintKey> constraintKeys) {
- Optional<String> ukFieldsString =
- constraintKeys.stream()
- .filter(
- e ->
- e.getConstraintType()
-
.equals(ConstraintKey.ConstraintType.UNIQUE_KEY))
- .findFirst()
- .map(
- e ->
- e.getColumnNames().stream()
- .map(f -> f.getColumnName())
-
.collect(Collectors.joining(FIELD_DELIMITER)));
- if (ukFieldsString.isPresent()) {
- return replacePlaceholders(placeholder, REPLACE_UNIQUE_KEY,
ukFieldsString.get());
- }
- return placeholder;
- }
+ private final String key;
- public static String replaceTableFieldNames(String placeholder,
TableSchema schema) {
- return replacePlaceholders(
- placeholder,
- REPLACE_FIELD_NAMES_KEY,
- String.join(FIELD_DELIMITER, schema.getFieldNames()));
+ TablePlaceholder(String placeholder) {
+ this.key = placeholder;
}
- public static ReadonlyConfig replaceTablePlaceholder(
- ReadonlyConfig config, CatalogTable table) {
- return replaceTablePlaceholder(config, table, Collections.emptyList());
+ public String getPlaceholder() {
+ return key;
}
- public static ReadonlyConfig replaceTablePlaceholder(
- ReadonlyConfig config, CatalogTable table, Collection<String>
excludeKeys) {
- Map<String, Object> copyOnWriteData =
ObjectUtils.clone(config.getSourceMap());
- for (String key : copyOnWriteData.keySet()) {
- if (excludeKeys.contains(key)) {
- continue;
- }
- Object value = copyOnWriteData.get(key);
- if (value != null) {
- if (value instanceof String) {
- String strValue = (String) value;
- strValue = replaceTableIdentifier(strValue,
table.getTableId());
- strValue =
- replaceTablePrimaryKey(
- strValue,
table.getTableSchema().getPrimaryKey());
- strValue =
- replaceTableUniqueKey(
- strValue,
table.getTableSchema().getConstraintKeys());
- strValue = replaceTableFieldNames(strValue,
table.getTableSchema());
- copyOnWriteData.put(key, strValue);
- } else if (value instanceof List) {
- List listValue = (List) value;
- if (listValue.size() == 1 && listValue.get(0) instanceof
String) {
- String strValue = (String) listValue.get(0);
- if (strValue.equals("${" + REPLACE_PRIMARY_KEY + "}"))
{
- strValue =
- replaceTablePrimaryKey(
- strValue,
table.getTableSchema().getPrimaryKey());
- listValue =
Arrays.asList(strValue.split(FIELD_DELIMITER));
- } else if (strValue.equals("${" + REPLACE_UNIQUE_KEY +
"}")) {
- strValue =
- replaceTableUniqueKey(
- strValue,
table.getTableSchema().getConstraintKeys());
- listValue =
Arrays.asList(strValue.split(FIELD_DELIMITER));
- } else if (strValue.equals("${" +
REPLACE_FIELD_NAMES_KEY + "}")) {
- strValue = replaceTableFieldNames(strValue,
table.getTableSchema());
- listValue =
Arrays.asList(strValue.split(FIELD_DELIMITER));
- }
- copyOnWriteData.put(key, listValue);
- }
- }
- }
- }
- return ReadonlyConfig.fromMap(copyOnWriteData);
+ public static boolean isSystemPlaceholder(String str) {
+ return PLACEHOLDER_KEYS.contains(str);
}
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/TablePlaceholder.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/TablePlaceholderProcessor.java
similarity index 70%
copy from
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/TablePlaceholder.java
copy to
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/TablePlaceholderProcessor.java
index f599e22135..4b7f9df3af 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/TablePlaceholder.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/TablePlaceholderProcessor.java
@@ -33,75 +33,34 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import java.util.stream.Collectors;
-public class TablePlaceholder {
- // Placeholder ${database_name} or ${database_name:default_value}
- public static final String REPLACE_DATABASE_NAME_KEY = "database_name";
- // Placeholder ${schema_name} or ${schema_name:default_value}
- public static final String REPLACE_SCHEMA_NAME_KEY = "schema_name";
- // Placeholder ${schema_full_name} or ${schema_full_name:default_value}
- public static final String REPLACE_SCHEMA_FULL_NAME_KEY =
"schema_full_name";
- // Placeholder ${table_name} or ${table_name:default_value}
- public static final String REPLACE_TABLE_NAME_KEY = "table_name";
- // Placeholder ${table_full_name} or ${table_full_name:default_value}
- public static final String REPLACE_TABLE_FULL_NAME_KEY = "table_full_name";
- // Placeholder ${primary_key} or ${primary_key:default_value}
- public static final String REPLACE_PRIMARY_KEY = "primary_key";
- // Placeholder ${unique_key} or ${unique_key:default_value}
- public static final String REPLACE_UNIQUE_KEY = "unique_key";
- // Placeholder ${field_names} or ${field_names:default_value}
- public static final String REPLACE_FIELD_NAMES_KEY = "field_names";
- public static final String NAME_DELIMITER = ".";
- public static final String FIELD_DELIMITER = ",";
+import static
org.apache.seatunnel.common.utils.PlaceholderUtils.replacePlaceholders;
- private static String replacePlaceholders(String input, String
placeholderName, String value) {
- return replacePlaceholders(input, placeholderName, value, null);
- }
+public class TablePlaceholderProcessor {
- private static String replacePlaceholders(
- String input, String placeholderName, String value, String
defaultValue) {
- String placeholderRegex = "\\$\\{" + Pattern.quote(placeholderName) +
"(:[^}]*)?\\}";
- Pattern pattern = Pattern.compile(placeholderRegex);
- Matcher matcher = pattern.matcher(input);
-
- StringBuffer result = new StringBuffer();
- while (matcher.find()) {
- String replacement =
- value != null && !value.isEmpty()
- ? value
- : (matcher.group(1) != null
- ? matcher.group(1).substring(1).trim()
- : defaultValue);
- if (replacement == null) {
- continue;
- }
- matcher.appendReplacement(result,
Matcher.quoteReplacement(replacement));
- }
- matcher.appendTail(result);
- return result.toString();
- }
+ public static final String NAME_DELIMITER = ".";
+
+ public static final String FIELD_DELIMITER = ",";
private static String replaceTableIdentifier(
String placeholder, TableIdentifier identifier, String
defaultValue) {
placeholder =
replacePlaceholders(
placeholder,
- REPLACE_DATABASE_NAME_KEY,
+
TablePlaceholder.REPLACE_DATABASE_NAME_KEY.getPlaceholder(),
identifier.getDatabaseName(),
defaultValue);
placeholder =
replacePlaceholders(
placeholder,
- REPLACE_SCHEMA_NAME_KEY,
+
TablePlaceholder.REPLACE_SCHEMA_NAME_KEY.getPlaceholder(),
identifier.getSchemaName(),
defaultValue);
placeholder =
replacePlaceholders(
placeholder,
- REPLACE_TABLE_NAME_KEY,
+
TablePlaceholder.REPLACE_TABLE_NAME_KEY.getPlaceholder(),
identifier.getTableName(),
defaultValue);
@@ -116,7 +75,7 @@ public class TablePlaceholder {
placeholder =
replacePlaceholders(
placeholder,
- REPLACE_SCHEMA_FULL_NAME_KEY,
+
TablePlaceholder.REPLACE_SCHEMA_FULL_NAME_KEY.getPlaceholder(),
String.join(NAME_DELIMITER, fullPath),
defaultValue);
}
@@ -128,7 +87,7 @@ public class TablePlaceholder {
placeholder =
replacePlaceholders(
placeholder,
- REPLACE_TABLE_FULL_NAME_KEY,
+
TablePlaceholder.REPLACE_TABLE_FULL_NAME_KEY.getPlaceholder(),
String.join(NAME_DELIMITER, fullPath),
defaultValue);
}
@@ -142,7 +101,10 @@ public class TablePlaceholder {
public static String replaceTablePrimaryKey(String placeholder, PrimaryKey
primaryKey) {
if (primaryKey != null && !primaryKey.getColumnNames().isEmpty()) {
String pkFieldsString = String.join(FIELD_DELIMITER,
primaryKey.getColumnNames());
- return replacePlaceholders(placeholder, REPLACE_PRIMARY_KEY,
pkFieldsString);
+ return replacePlaceholders(
+ placeholder,
+ TablePlaceholder.REPLACE_PRIMARY_KEY.getPlaceholder(),
+ pkFieldsString);
}
return placeholder;
}
@@ -162,7 +124,10 @@ public class TablePlaceholder {
.map(f -> f.getColumnName())
.collect(Collectors.joining(FIELD_DELIMITER)));
if (ukFieldsString.isPresent()) {
- return replacePlaceholders(placeholder, REPLACE_UNIQUE_KEY,
ukFieldsString.get());
+ return replacePlaceholders(
+ placeholder,
+ TablePlaceholder.REPLACE_UNIQUE_KEY.getPlaceholder(),
+ ukFieldsString.get());
}
return placeholder;
}
@@ -170,7 +135,7 @@ public class TablePlaceholder {
public static String replaceTableFieldNames(String placeholder,
TableSchema schema) {
return replacePlaceholders(
placeholder,
- REPLACE_FIELD_NAMES_KEY,
+ TablePlaceholder.REPLACE_FIELD_NAMES_KEY.getPlaceholder(),
String.join(FIELD_DELIMITER, schema.getFieldNames()));
}
@@ -203,17 +168,26 @@ public class TablePlaceholder {
List listValue = (List) value;
if (listValue.size() == 1 && listValue.get(0) instanceof
String) {
String strValue = (String) listValue.get(0);
- if (strValue.equals("${" + REPLACE_PRIMARY_KEY + "}"))
{
+ if (strValue.equals(
+ "${"
+ +
TablePlaceholder.REPLACE_PRIMARY_KEY.getPlaceholder()
+ + "}")) {
strValue =
replaceTablePrimaryKey(
strValue,
table.getTableSchema().getPrimaryKey());
listValue =
Arrays.asList(strValue.split(FIELD_DELIMITER));
- } else if (strValue.equals("${" + REPLACE_UNIQUE_KEY +
"}")) {
+ } else if (strValue.equals(
+ "${"
+ +
TablePlaceholder.REPLACE_UNIQUE_KEY.getPlaceholder()
+ + "}")) {
strValue =
replaceTableUniqueKey(
strValue,
table.getTableSchema().getConstraintKeys());
listValue =
Arrays.asList(strValue.split(FIELD_DELIMITER));
- } else if (strValue.equals("${" +
REPLACE_FIELD_NAMES_KEY + "}")) {
+ } else if (strValue.equals(
+ "${"
+ +
TablePlaceholder.REPLACE_FIELD_NAMES_KEY.getPlaceholder()
+ + "}")) {
strValue = replaceTableFieldNames(strValue,
table.getTableSchema());
listValue =
Arrays.asList(strValue.split(FIELD_DELIMITER));
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactoryContext.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactoryContext.java
index 3e0eb24cd5..b83c1087e2 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactoryContext.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactoryContext.java
@@ -18,7 +18,7 @@
package org.apache.seatunnel.api.table.factory;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.sink.TablePlaceholder;
+import org.apache.seatunnel.api.sink.TablePlaceholderProcessor;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import com.google.common.annotations.VisibleForTesting;
@@ -48,7 +48,7 @@ public class TableSinkFactoryContext extends
TableFactoryContext {
ClassLoader classLoader,
Collection<String> excludeTablePlaceholderReplaceKeys) {
ReadonlyConfig rewriteConfig =
- TablePlaceholder.replaceTablePlaceholder(
+ TablePlaceholderProcessor.replaceTablePlaceholder(
options, catalogTable,
excludeTablePlaceholderReplaceKeys);
return new TableSinkFactoryContext(catalogTable, rewriteConfig,
classLoader);
}
diff --git
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/TablePlaceholderTest.java
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/TablePlaceholderProcessorTest.java
similarity index 96%
rename from
seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/TablePlaceholderTest.java
rename to
seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/TablePlaceholderProcessorTest.java
index 16a69d5db3..71f23822da 100644
---
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/TablePlaceholderTest.java
+++
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/TablePlaceholderProcessorTest.java
@@ -37,7 +37,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class TablePlaceholderTest {
+public class TablePlaceholderProcessorTest {
private static final Option<String> DATABASE =
Options.key("database").stringType().noDefaultValue();
private static final Option<String> SCHEMA =
@@ -60,7 +60,7 @@ public class TablePlaceholderTest {
public void testSinkOptions() {
ReadonlyConfig config = createConfig();
CatalogTable table = createTestTable();
- ReadonlyConfig newConfig =
TablePlaceholder.replaceTablePlaceholder(config, table);
+ ReadonlyConfig newConfig =
TablePlaceholderProcessor.replaceTablePlaceholder(config, table);
Assertions.assertEquals("xyz_my-database_test",
newConfig.get(DATABASE));
Assertions.assertEquals("xyz_my-schema_test", newConfig.get(SCHEMA));
@@ -78,7 +78,7 @@ public class TablePlaceholderTest {
public void testSinkOptionsWithNoTablePath() {
ReadonlyConfig config = createConfig();
CatalogTable table = createTestTableWithNoDatabaseAndSchemaName();
- ReadonlyConfig newConfig =
TablePlaceholder.replaceTablePlaceholder(config, table);
+ ReadonlyConfig newConfig =
TablePlaceholderProcessor.replaceTablePlaceholder(config, table);
Assertions.assertEquals("xyz_default_db_test",
newConfig.get(DATABASE));
Assertions.assertEquals("xyz_default_schema_test",
newConfig.get(SCHEMA));
@@ -97,7 +97,7 @@ public class TablePlaceholderTest {
ReadonlyConfig config = createConfig();
CatalogTable table = createTestTableWithNoDatabaseAndSchemaName();
ReadonlyConfig newConfig =
- TablePlaceholder.replaceTablePlaceholder(
+ TablePlaceholderProcessor.replaceTablePlaceholder(
config, table, Arrays.asList(DATABASE.key()));
Assertions.assertEquals("xyz_${database_name: default_db}_test",
newConfig.get(DATABASE));
@@ -118,9 +118,9 @@ public class TablePlaceholderTest {
CatalogTable table1 = createTestTable();
CatalogTable table2 = createTestTableWithNoDatabaseAndSchemaName();
ReadonlyConfig newConfig1 =
- TablePlaceholder.replaceTablePlaceholder(config, table1,
Arrays.asList());
+ TablePlaceholderProcessor.replaceTablePlaceholder(config,
table1, Arrays.asList());
ReadonlyConfig newConfig2 =
- TablePlaceholder.replaceTablePlaceholder(config, table2,
Arrays.asList());
+ TablePlaceholderProcessor.replaceTablePlaceholder(config,
table2, Arrays.asList());
Assertions.assertEquals("xyz_my-database_test",
newConfig1.get(DATABASE));
Assertions.assertEquals("xyz_my-schema_test", newConfig1.get(SCHEMA));
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/PlaceholderUtils.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/PlaceholderUtils.java
new file mode 100644
index 0000000000..ab697e7357
--- /dev/null
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/PlaceholderUtils.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.common.utils;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class PlaceholderUtils {
+
+ public static String replacePlaceholders(String input, String
placeholderName, String value) {
+ return replacePlaceholders(input, placeholderName, value, null);
+ }
+
+ public static String replacePlaceholders(
+ String input, String placeholderName, String value, String
defaultValue) {
+ String placeholderRegex = "\\$\\{" + Pattern.quote(placeholderName) +
"(:[^}]*)?\\}";
+ Pattern pattern = Pattern.compile(placeholderRegex);
+ Matcher matcher = pattern.matcher(input);
+
+ StringBuffer result = new StringBuffer();
+ while (matcher.find()) {
+ String replacement =
+ value != null && !value.isEmpty()
+ ? value
+ : (matcher.group(1) != null
+ ? matcher.group(1).substring(1).trim()
+ : defaultValue);
+ if (replacement == null) {
+ continue;
+ }
+ matcher.appendReplacement(result,
Matcher.quoteReplacement(replacement));
+ }
+ matcher.appendTail(result);
+ return result.toString();
+ }
+}
diff --git a/seatunnel-core/seatunnel-core-starter/pom.xml
b/seatunnel-core/seatunnel-core-starter/pom.xml
index 3f2da12954..332a806bc8 100644
--- a/seatunnel-core/seatunnel-core-starter/pom.xml
+++ b/seatunnel-core/seatunnel-core-starter/pom.xml
@@ -60,5 +60,12 @@
<artifactId>jcommander</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.junit-pioneer</groupId>
+ <artifactId>junit-pioneer</artifactId>
+ <version>1.5.0</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project>
diff --git
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java
index 57392f6a3f..40dea79166 100644
---
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java
+++
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java
@@ -26,22 +26,28 @@ import
org.apache.seatunnel.shade.com.typesafe.config.ConfigSyntax;
import org.apache.seatunnel.shade.com.typesafe.config.impl.Parseable;
import org.apache.seatunnel.api.configuration.ConfigAdapter;
+import org.apache.seatunnel.api.sink.TablePlaceholder;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.common.utils.ParserException;
+import org.apache.seatunnel.core.starter.exception.ConfigCheckException;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import static
org.apache.seatunnel.common.utils.PlaceholderUtils.replacePlaceholders;
import static
org.apache.seatunnel.core.starter.utils.ConfigShadeUtils.DEFAULT_SENSITIVE_KEYWORDS;
/** Used to build the {@link Config} from config file. */
@@ -51,6 +57,8 @@ public class ConfigBuilder {
public static final ConfigRenderOptions CONFIG_RENDER_OPTIONS =
ConfigRenderOptions.concise().setFormatted(true);
+ private static final String PLACEHOLDER_REGEX =
"\\$\\{([^:{}]+)(?::[^}]*)?\\}";
+
private ConfigBuilder() {
// utility class and cannot be instantiated
}
@@ -172,6 +180,14 @@ public class ConfigBuilder {
.filter(Objects::nonNull)
.map(variable -> variable.split("=", 2))
.filter(pair -> pair.length == 2)
+ .peek(
+ pair -> {
+ if
(TablePlaceholder.isSystemPlaceholder(pair[0])) {
+ throw new ConfigCheckException(
+ "System placeholders cannot be
used. Incorrect config parameter: "
+ + pair[0]);
+ }
+ })
.forEach(pair -> System.setProperty(pair[0], pair[1]));
Config systemConfig =
Parseable.newProperties(
@@ -180,12 +196,98 @@ public class ConfigBuilder {
.setOriginDescription("system
properties"))
.parse()
.toConfig();
- return config.resolveWith(
- systemConfig,
ConfigResolveOptions.defaults().setAllowUnresolved(true));
+
+ Config resolvedConfig =
+ config.resolveWith(
+ systemConfig,
ConfigResolveOptions.defaults().setAllowUnresolved(true));
+
+ Map<String, Object> configMap = resolvedConfig.root().unwrapped();
+
+ configMap.forEach(
+ (key, value) -> {
+ if (value instanceof Map) {
+ processVariablesMap((Map<String, Object>) value);
+ } else if (value instanceof List) {
+ ((List<Map<String, Object>>) value)
+ .forEach(map -> processVariablesMap(map));
+ }
+ });
+
+ return ConfigFactory.parseString(
+ JsonUtils.toJsonString(configMap),
+
ConfigParseOptions.defaults().setSyntax(ConfigSyntax.JSON))
+
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true));
}
return config;
}
+ private static void processVariablesMap(Map<String, Object> mapValue) {
+ mapValue.forEach(
+ (innerKey, innerValue) -> {
+ if (innerValue instanceof Map) {
+ processVariablesMap((Map<String, Object>) innerValue);
+ } else if (innerValue instanceof List) {
+ mapValue.put(innerKey, processVariablesList((List<?>)
innerValue));
+ } else {
+ processVariable(innerKey, innerValue, mapValue);
+ }
+ });
+ }
+
+ private static List<?> processVariablesList(List<?> list) {
+ return list.stream()
+ .map(
+ variable -> {
+ if (variable instanceof String) {
+ String variableString = (String) variable;
+ return
extractPlaceholder(variableString).stream()
+ .reduce(
+ variableString,
+ (result, placeholder) -> {
+ return replacePlaceholders(
+ result,
+ placeholder,
+
System.getProperty(placeholder),
+ null);
+ });
+ }
+ return variable;
+ })
+ .collect(Collectors.toList());
+ }
+
+ private static void processVariable(
+ String variableKey, Object variableValue, Map<String, Object>
parentMap) {
+ if (Objects.isNull(variableValue)) {
+ return;
+ }
+ String variableString = variableValue.toString();
+ List<String> placeholders = extractPlaceholder(variableString);
+
+ for (String placeholder : placeholders) {
+ String replacedValue =
+ replacePlaceholders(
+ variableString, placeholder,
System.getProperty(placeholder), null);
+ variableString = replacedValue;
+ }
+
+ if (!placeholders.isEmpty()) {
+ parentMap.put(variableKey, variableString);
+ }
+ }
+
+ public static List<String> extractPlaceholder(String input) {
+ Pattern pattern = Pattern.compile(PLACEHOLDER_REGEX);
+ Matcher matcher = pattern.matcher(input);
+ List<String> placeholders = new ArrayList<>();
+
+ while (matcher.find()) {
+ placeholders.add(matcher.group(1));
+ }
+
+ return placeholders;
+ }
+
public static String mapToString(Map<String, Object> configMap) {
ConfigParseOptions configParseOptions =
ConfigParseOptions.defaults().setSyntax(ConfigSyntax.JSON);
diff --git
a/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/ConfigShadeTest.java
b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/ConfigShadeTest.java
index db5df31866..b91b144d45 100644
---
a/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/ConfigShadeTest.java
+++
b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/ConfigShadeTest.java
@@ -25,9 +25,11 @@ import
org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
import org.apache.seatunnel.api.configuration.ConfigShade;
import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.core.starter.exception.ConfigCheckException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.junitpioneer.jupiter.SetEnvironmentVariable;
import com.beust.jcommander.internal.Lists;
import lombok.extern.slf4j.Slf4j;
@@ -181,6 +183,72 @@ public class ConfigShadeTest {
}
}
+ // Set the system environment variables through SetEnvironmentVariable to
verify whether the
+ // parameters set by the system environment variables are effective
+ @SetEnvironmentVariable(key = "jobName", value = "seatunnel variable test
job")
+ @Test
+ public void testVariableReplacementWithDefaultValue() throws
URISyntaxException {
+ String jobName = "seatunnel variable test job";
+ Assertions.assertEquals(System.getenv("jobName"), jobName);
+ String ageType = "int";
+ String sourceTableName = "sql";
+ String containSpaceString = "f h";
+ List<String> variables = new ArrayList<>();
+ variables.add("strTemplate=[abc,de~," + containSpaceString + "]");
+ variables.add("ageType=" + ageType);
+ // Set the environment variable value nameVal to `f h` to verify
whether setting the space
+ // through the environment variable is effective
+ System.setProperty("nameValForEnv", containSpaceString);
+ variables.add("sourceTableName=" + sourceTableName);
+ URL resource =
+
ConfigShadeTest.class.getResource("/config_variables_with_default_value.conf");
+ Assertions.assertNotNull(resource);
+ Config config = ConfigBuilder.of(Paths.get(resource.toURI()),
variables);
+ Config envConfig = config.getConfig("env");
+ Assertions.assertEquals(envConfig.getString("job.name"), jobName);
+ List<? extends ConfigObject> sourceConfigs =
config.getObjectList("source");
+ for (ConfigObject configObject : sourceConfigs) {
+ Config sourceConfig = configObject.toConfig();
+ List<String> list1 = sourceConfig.getStringList("string.template");
+ Assertions.assertEquals(list1.get(0), "abc");
+ Assertions.assertEquals(list1.get(1), "de~");
+ Assertions.assertEquals(list1.get(2), containSpaceString);
+ Assertions.assertEquals(sourceConfig.getInt("row.num"), 50);
+
Assertions.assertEquals(sourceConfig.getString("result_table_name"),
"fake_test_table");
+ }
+ List<? extends ConfigObject> transformConfigs =
config.getObjectList("transform");
+ for (ConfigObject configObject : transformConfigs) {
+ Config transformConfig = configObject.toConfig();
+ Assertions.assertEquals(
+ transformConfig.getString("query"),
+ "select * from fake_test_table where name = 'f h' ");
+ }
+ List<? extends ConfigObject> sinkConfigs =
config.getObjectList("sink");
+ for (ConfigObject sinkObject : sinkConfigs) {
+ Config sinkConfig = sinkObject.toConfig();
+ Assertions.assertEquals(sinkConfig.getString("source_table_name"),
sourceTableName);
+ }
+ }
+
+ @Test
+ public void testVariableReplacementWithReservedPlaceholder() {
+ List<String> variables = new ArrayList<>();
+ variables.add("strTemplate=[abc,de~,f h]");
+ // Set up a reserved placeholder
+ variables.add("table_name=sql");
+ URL resource =
+ ConfigShadeTest.class.getResource(
+ "/config_variables_with_reserved_placeholder.conf");
+ Assertions.assertNotNull(resource);
+ ConfigCheckException configCheckException =
+ Assertions.assertThrows(
+ ConfigCheckException.class,
+ () -> ConfigBuilder.of(Paths.get(resource.toURI()),
variables));
+ Assertions.assertEquals(
+ "System placeholders cannot be used. Incorrect config
parameter: table_name",
+ configCheckException.getMessage());
+ }
+
@Test
public void testDecryptAndEncrypt() {
String encryptUsername = ConfigShadeUtils.encryptOption("base64",
USERNAME);
diff --git
a/seatunnel-core/seatunnel-core-starter/src/test/resources/config_variables_with_default_value.conf
b/seatunnel-core/seatunnel-core-starter/src/test/resources/config_variables_with_default_value.conf
new file mode 100644
index 0000000000..1bdde33ac0
--- /dev/null
+++
b/seatunnel-core/seatunnel-core-starter/src/test/resources/config_variables_with_default_value.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ job.mode = "BATCH"
+ job.name = ${jobName}
+ parallelism = 2
+}
+
+source {
+ FakeSource {
+ result_table_name = "${resName:fake_test}_table"
+ row.num = "${rowNum:50}"
+ string.template = ${strTemplate}
+ int.template = [20, 21]
+ schema = {
+ fields {
+ name = "${nameType:string}"
+ age = ${ageType}
+ }
+ }
+ }
+}
+
+transform {
+ sql {
+ source_table_name = "${resName:fake_test}_table"
+ result_table_name = "sql"
+ query = "select * from ${resName:fake_test}_table where name =
'${nameValForEnv}' "
+ }
+
+}
+
+sink {
+ Console {
+ source_table_name = ${sourceTableName}
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-core/seatunnel-core-starter/src/test/resources/config_variables_with_reserved_placeholder.conf
b/seatunnel-core/seatunnel-core-starter/src/test/resources/config_variables_with_reserved_placeholder.conf
new file mode 100644
index 0000000000..1909884d2c
--- /dev/null
+++
b/seatunnel-core/seatunnel-core-starter/src/test/resources/config_variables_with_reserved_placeholder.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ job.mode = "BATCH"
+ job.name = "seatunnel variable test job"
+ parallelism = 2
+}
+
+source {
+ FakeSource {
+ result_table_name = "${resName:fake_test}_table"
+ row.num = "${rowNum:50}"
+ string.template = ${strTemplate}
+ int.template = [20, 21]
+ schema = {
+ fields {
+ name = "${nameType:string}"
+ age = int
+ }
+ }
+ }
+}
+
+transform {
+ sql {
+ source_table_name = "${resName:fake_test}_table"
+ result_table_name = "sql"
+ query = "select * from ${resName:fake_test}_table where name = 'abc' "
+ }
+
+}
+
+sink {
+ Console {
+ source_table_name = ${table_name}
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/UserVariableIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/UserVariableIT.java
index 87e05821b1..0b92a89961 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/UserVariableIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/UserVariableIT.java
@@ -44,4 +44,19 @@ public class UserVariableIT extends TestSuiteBase {
container.executeJob("/fake_to_console.variables.conf",
variables);
Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
}
+
+ @TestTemplate
+ public void userVariableWithDefaultValueTest(TestContainer container)
+ throws IOException, InterruptedException {
+ List<String> variables = new ArrayList<>();
+ String list = "[abc,def]";
+ variables.add("strTemplate=" + list);
+ variables.add("ageType=int");
+ variables.add("nameVal=abc");
+ variables.add("sourceTableName=sql");
+ Container.ExecResult execResult =
+ container.executeJob(
+ "/fake_to_console_with_default_value.variables.conf",
variables);
+ Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/fake_to_console_with_default_value.variables.conf
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/fake_to_console_with_default_value.variables.conf
new file mode 100644
index 0000000000..f6b367185e
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/fake_to_console_with_default_value.variables.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ job.mode = "BATCH"
+ job.name = "${jobName:fake_to_console_with_default_value}"
+ parallelism = 2
+}
+
+source {
+ FakeSource {
+ result_table_name = "${resName:fake_test}_table"
+ row.num = "${rowNum:50}"
+ string.template = ${strTemplate}
+ int.template = [20, 21]
+ schema = {
+ fields {
+ name = "${nameType:string}"
+ age = ${ageType}
+ }
+ }
+ }
+}
+
+transform {
+ sql {
+ source_table_name = "${resName:fake_test}_table"
+ result_table_name = "sql"
+ query = "select * from ${resName:fake_test}_table where name =
'${nameVal}' "
+ }
+
+}
+
+sink {
+ Console {
+ source_table_name = ${sourceTableName}
+ }
+}
\ No newline at end of file