This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new dd64ed52d4 [Feature][Connector-V2][Assert] Support checking the precision
and scale of Decimal type. (#6110)
dd64ed52d4 is described below
commit dd64ed52d438ad3362ffd9788b99df9be0b365f1
Author: Chengyu Yan <[email protected]>
AuthorDate: Tue Jan 2 10:34:16 2024 +0800
[Feature][Connector-V2][Assert] Support checking the precision and scale of
Decimal type. (#6110)
---
release-note.md | 1 +
.../assertion/excecutor/AssertExecutor.java | 17 ++++++++
.../seatunnel/assertion/rule/AssertRuleParser.java | 18 +++++---
.../flink/assertion/AssertExecutorTest.java | 51 ++++++++++++++++++++++
.../flink/assertion/rule/AssertRuleParserTest.java | 47 +++++++++++++++++++-
.../connectors/seatunnel/jdbc/JdbcHiveIT.java | 2 +-
.../resources/jdbc_hive_source_and_assert.conf | 2 +-
.../test/resources/mongodb_source_to_assert.conf | 9 ++++
8 files changed, 137 insertions(+), 10 deletions(-)
diff --git a/release-note.md b/release-note.md
index 07bb3d11d5..0ded29987d 100644
--- a/release-note.md
+++ b/release-note.md
@@ -185,6 +185,7 @@
- [Transform-V2] Add Catalog support for FilterRowKindTransform (#4420)
- [Transform-V2] Add support CatalogTable for FilterFieldTransform (#4422)
- [Transform-V2] Add catalog support for SQL Transform plugin (#4819)
+- [Connector-V2] [Assert] Support checking the precision and scale of Decimal
type (#6110)
### Zeta(ST-Engine)
diff --git
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java
index 5868fba912..21142e8158 100644
---
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java
+++
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java
@@ -17,9 +17,11 @@
package org.apache.seatunnel.connectors.seatunnel.assertion.excecutor;
+import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
import
org.apache.seatunnel.connectors.seatunnel.assertion.exception.AssertConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.assertion.exception.AssertConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertFieldRule;
@@ -163,6 +165,21 @@ public class AssertExecutor {
}
private Boolean checkType(Object value, SeaTunnelDataType<?> fieldType) {
+ if (fieldType.getSqlType() == SqlType.DECIMAL) {
+ return checkDecimalType(value, fieldType);
+ }
return value.getClass().equals(fieldType.getTypeClass());
}
+
+ private static Boolean checkDecimalType(Object value, SeaTunnelDataType<?>
fieldType) {
+ if (!value.getClass().equals(fieldType.getTypeClass())) {
+ return false;
+ }
+ DecimalType fieldDecimalType = (DecimalType) fieldType;
+ BigDecimal valueObj = (BigDecimal) value;
+ if (valueObj.scale() != fieldDecimalType.getScale()) {
+ return false;
+ }
+ return valueObj.precision() <= fieldDecimalType.getPrecision();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertRuleParser.java
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertRuleParser.java
index 7681f67cde..842dcabd90 100644
---
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertRuleParser.java
+++
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertRuleParser.java
@@ -28,6 +28,8 @@ import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.EQUALS_TO;
@@ -39,6 +41,9 @@ import static
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertCon
public class AssertRuleParser {
+ private static final Pattern DECIMAL_TYPE_PATTERN =
+ Pattern.compile("^decimal\\(\\s*(\\d+)\\s*,\\s*(\\d+)\\s*\\)$");
+
public List<AssertFieldRule.AssertRule> parseRowRules(List<? extends
Config> rowRuleList) {
return assembleFieldValueRules(rowRuleList);
@@ -91,13 +96,14 @@ public class AssertRuleParser {
}
private SeaTunnelDataType<?> getFieldType(String fieldTypeStr) {
- if (fieldTypeStr.toLowerCase().startsWith("decimal(")) {
- String lengthAndScale =
- fieldTypeStr.toLowerCase().replace("decimal(",
"").replace(")", "");
- String[] split = lengthAndScale.split(",");
- return new DecimalType(Integer.valueOf(split[0]),
Integer.valueOf(split[1]));
+ final String normalTypeStr = fieldTypeStr.trim().toLowerCase();
+ Matcher matcher = DECIMAL_TYPE_PATTERN.matcher(normalTypeStr);
+ if (matcher.find()) {
+ int precision = Integer.parseInt(matcher.group(1));
+ int scale = Integer.parseInt(matcher.group(2));
+ return new DecimalType(precision, scale);
}
- return TYPES.get(fieldTypeStr.toLowerCase());
+ return TYPES.get(normalTypeStr);
}
private static final Map<String, SeaTunnelDataType<?>> TYPES =
Maps.newHashMap();
diff --git
a/seatunnel-connectors-v2/connector-assert/src/test/java/org/apache/seatunnel/flink/assertion/AssertExecutorTest.java
b/seatunnel-connectors-v2/connector-assert/src/test/java/org/apache/seatunnel/flink/assertion/AssertExecutorTest.java
index 4b9d941535..48b31989c6 100644
---
a/seatunnel-connectors-v2/connector-assert/src/test/java/org/apache/seatunnel/flink/assertion/AssertExecutorTest.java
+++
b/seatunnel-connectors-v2/connector-assert/src/test/java/org/apache/seatunnel/flink/assertion/AssertExecutorTest.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.flink.assertion;
import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -28,8 +29,11 @@ import org.junit.jupiter.api.Test;
import com.google.common.collect.Lists;
+import java.math.BigDecimal;
+import java.util.Collections;
import java.util.List;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -111,4 +115,51 @@ public class AssertExecutorTest {
rule.setFieldRules(valueRules);
return rule;
}
+
+ @Test
+ public void testDecimalTypeCheck() {
+ List<AssertFieldRule> rules = Lists.newArrayList();
+ AssertFieldRule rule = new AssertFieldRule();
+ rule.setFieldName("c_mock");
+ DecimalType assertFieldType = new DecimalType(10, 2);
+ rule.setFieldType(assertFieldType);
+
+ AssertFieldRule.AssertRule valueRule = new
AssertFieldRule.AssertRule();
+ valueRule.setEqualTo("99999999.90");
+
+ rule.setFieldRules(Collections.singletonList(valueRule));
+ rules.add(rule);
+
+ SeaTunnelRow mockRow = new SeaTunnelRow(new Object[] {new
BigDecimal("99999999.90")});
+ SeaTunnelRowType mockType =
+ new SeaTunnelRowType(
+ new String[] {"c_mock"}, new SeaTunnelDataType[] {new
DecimalType(10, 2)});
+
+ AssertFieldRule failRule = assertExecutor.fail(mockRow, mockType,
rules).orElse(null);
+ assertNull(failRule);
+ }
+
+ @Test
+ public void testDecimalTypeCheckError() {
+ List<AssertFieldRule> rules = Lists.newArrayList();
+ AssertFieldRule rule = new AssertFieldRule();
+ rule.setFieldName("c_mock");
+ DecimalType assertFieldType = new DecimalType(1, 0);
+ rule.setFieldType(assertFieldType);
+
+ AssertFieldRule.AssertRule valueRule = new
AssertFieldRule.AssertRule();
+ valueRule.setRuleType(AssertFieldRule.AssertRuleType.NOT_NULL);
+ rule.setFieldRules(Collections.singletonList(valueRule));
+ rules.add(rule);
+
+ SeaTunnelRow mockRow = new SeaTunnelRow(new Object[]
{BigDecimal.valueOf(99999999.99)});
+ SeaTunnelRowType mockType =
+ new SeaTunnelRowType(
+ new String[] {"c_mock"}, new SeaTunnelDataType[] {new
DecimalType(10, 2)});
+
+ AssertFieldRule failRule = assertExecutor.fail(mockRow, mockType,
rules).orElse(null);
+ assertNotNull(failRule);
+ assertEquals(assertFieldType, failRule.getFieldType());
+ assertEquals("c_mock", failRule.getFieldName());
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-assert/src/test/java/org/apache/seatunnel/flink/assertion/rule/AssertRuleParserTest.java
b/seatunnel-connectors-v2/connector-assert/src/test/java/org/apache/seatunnel/flink/assertion/rule/AssertRuleParserTest.java
index 29eabbda08..494c3c4496 100644
---
a/seatunnel-connectors-v2/connector-assert/src/test/java/org/apache/seatunnel/flink/assertion/rule/AssertRuleParserTest.java
+++
b/seatunnel-connectors-v2/connector-assert/src/test/java/org/apache/seatunnel/flink/assertion/rule/AssertRuleParserTest.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
import
org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertFieldRule;
import
org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertRuleParser;
@@ -37,8 +38,39 @@ public class AssertRuleParserTest {
public void testParseRules() {
List<? extends Config> ruleConfigList = assembleConfig();
List<AssertFieldRule> assertFieldRules =
parser.parseRules(ruleConfigList);
- assertEquals(assertFieldRules.size(), 2);
- assertEquals(assertFieldRules.get(0).getFieldType(),
BasicType.STRING_TYPE);
+ assertEquals(3, assertFieldRules.size());
+
+ AssertFieldRule nameRule = assertFieldRules.get(0);
+ List<AssertFieldRule.AssertRule> nameValueRules =
nameRule.getFieldRules();
+ assertEquals(BasicType.STRING_TYPE, nameRule.getFieldType());
+ assertEquals("name", nameRule.getFieldName());
+ assertEquals(3, nameValueRules.size());
+ assertEquals(AssertFieldRule.AssertRuleType.NOT_NULL,
nameValueRules.get(0).getRuleType());
+ assertEquals(
+ AssertFieldRule.AssertRuleType.MIN_LENGTH,
nameValueRules.get(1).getRuleType());
+ assertEquals(3.0, nameValueRules.get(1).getRuleValue());
+ assertEquals(
+ AssertFieldRule.AssertRuleType.MAX_LENGTH,
nameValueRules.get(2).getRuleType());
+ assertEquals(5.0, nameValueRules.get(2).getRuleValue());
+
+ AssertFieldRule ageRule = assertFieldRules.get(1);
+ List<AssertFieldRule.AssertRule> ageValueRules =
ageRule.getFieldRules();
+ assertEquals("age", ageRule.getFieldName());
+ assertEquals(3, ageValueRules.size());
+ assertEquals(AssertFieldRule.AssertRuleType.NOT_NULL,
ageValueRules.get(0).getRuleType());
+ assertEquals(AssertFieldRule.AssertRuleType.MIN,
ageValueRules.get(1).getRuleType());
+ assertEquals(10.0, ageValueRules.get(1).getRuleValue());
+ assertEquals(AssertFieldRule.AssertRuleType.MAX,
ageValueRules.get(2).getRuleType());
+ assertEquals(20.0, ageValueRules.get(2).getRuleValue());
+
+ AssertFieldRule decimalRule = assertFieldRules.get(2);
+ List<AssertFieldRule.AssertRule> decimalValueRules =
decimalRule.getFieldRules();
+ assertEquals("c_decimal", decimalRule.getFieldName());
+ assertEquals(new DecimalType(10, 2), decimalRule.getFieldType());
+ assertEquals(2, decimalValueRules.size());
+ assertEquals(
+ AssertFieldRule.AssertRuleType.NOT_NULL,
decimalValueRules.get(0).getRuleType());
+ assertEquals("12.12", decimalValueRules.get(1).getEqualTo());
}
private List<? extends Config> assembleConfig() {
@@ -76,6 +108,17 @@ public class AssertRuleParserTest {
+ " rule_value = 20\n"
+ " }\n"
+ " ]\n"
+ + " },{\n"
+ + " field_name = c_decimal\n"
+ + " field_type= \" decimal( 10 , 2 ) \"\n"
+ + " field_value = [\n"
+ + " {\n"
+ + " rule_type = NOT_NULL\n"
+ + " },\n"
+ + " {\n"
+ + " equals_to = \"12.12\"\n"
+ + " }\n"
+ + " ]\n"
+ " }\n"
+ " ]\n"
+ " \n"
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHiveIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHiveIT.java
index 936ea73bbe..f183c0c193 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHiveIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHiveIT.java
@@ -131,7 +131,7 @@ public class JdbcHiveIT extends AbstractJdbcIT {
+ " TRUE,"
+ " '2023-09-04',"
+ " '2023-09-04 10:30:00',"
- + " 42.12,"
+ + " 42.10,"
+ " 42.12)");
}
} catch (Exception exception) {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf
index 21c4059578..0f2b909c12 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf
@@ -126,7 +126,7 @@ sink{
{
field_name = hive_e2e_source_table.decimal_column
field_type = "decimal(10,2)"
- field_value = [{equals_to = 42.12}]
+ field_value = [{equals_to = "42.10"}]
},
{
field_name = hive_e2e_source_table.numeric_column
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_to_assert.conf
index f2e586ebf1..4bde48369d 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_to_assert.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_to_assert.conf
@@ -105,6 +105,15 @@ sink {
rule_type = NOT_NULL
}
]
+ },
+ {
+ field_name = c_decimal
+ field_type = "decimal(33, 18)"
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
}
]
}