This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new dd64ed52d4 [Feature][Connector-V2][Assert] Support check the precision 
and scale of Decimal type. (#6110)
dd64ed52d4 is described below

commit dd64ed52d438ad3362ffd9788b99df9be0b365f1
Author: Chengyu Yan <[email protected]>
AuthorDate: Tue Jan 2 10:34:16 2024 +0800

    [Feature][Connector-V2][Assert] Support check the precision and scale of 
Decimal type. (#6110)
---
 release-note.md                                    |  1 +
 .../assertion/excecutor/AssertExecutor.java        | 17 ++++++++
 .../seatunnel/assertion/rule/AssertRuleParser.java | 18 +++++---
 .../flink/assertion/AssertExecutorTest.java        | 51 ++++++++++++++++++++++
 .../flink/assertion/rule/AssertRuleParserTest.java | 47 +++++++++++++++++++-
 .../connectors/seatunnel/jdbc/JdbcHiveIT.java      |  2 +-
 .../resources/jdbc_hive_source_and_assert.conf     |  2 +-
 .../test/resources/mongodb_source_to_assert.conf   |  9 ++++
 8 files changed, 137 insertions(+), 10 deletions(-)

diff --git a/release-note.md b/release-note.md
index 07bb3d11d5..0ded29987d 100644
--- a/release-note.md
+++ b/release-note.md
@@ -185,6 +185,7 @@
 - [Transform-V2] Add Catalog support for FilterRowKindTransform (#4420)
 - [Transform-V2] Add support CatalogTable for FilterFieldTransform (#4422)
 - [Transform-V2] Add catalog support for SQL Transform plugin (#4819)
+- [Connector-V2] [Assert] Support check the precision and scale of Decimal 
type (#6110)
 
 ### Zeta(ST-Engine)
 
diff --git 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java
 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java
index 5868fba912..21142e8158 100644
--- 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java
+++ 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java
@@ -17,9 +17,11 @@
 
 package org.apache.seatunnel.connectors.seatunnel.assertion.excecutor;
 
+import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
 import 
org.apache.seatunnel.connectors.seatunnel.assertion.exception.AssertConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.assertion.exception.AssertConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertFieldRule;
@@ -163,6 +165,21 @@ public class AssertExecutor {
     }
 
     private Boolean checkType(Object value, SeaTunnelDataType<?> fieldType) {
+        if (fieldType.getSqlType() == SqlType.DECIMAL) {
+            return checkDecimalType(value, fieldType);
+        }
         return value.getClass().equals(fieldType.getTypeClass());
     }
+
+    private static Boolean checkDecimalType(Object value, SeaTunnelDataType<?> 
fieldType) {
+        if (!value.getClass().equals(fieldType.getTypeClass())) {
+            return false;
+        }
+        DecimalType fieldDecimalType = (DecimalType) fieldType;
+        BigDecimal valueObj = (BigDecimal) value;
+        if (valueObj.scale() != fieldDecimalType.getScale()) {
+            return false;
+        }
+        return valueObj.precision() <= fieldDecimalType.getPrecision();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertRuleParser.java
 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertRuleParser.java
index 7681f67cde..842dcabd90 100644
--- 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertRuleParser.java
+++ 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertRuleParser.java
@@ -28,6 +28,8 @@ import com.google.common.collect.Maps;
 
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.EQUALS_TO;
@@ -39,6 +41,9 @@ import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertCon
 
 public class AssertRuleParser {
 
+    private static final Pattern DECIMAL_TYPE_PATTERN =
+            Pattern.compile("^decimal\\(\\s*(\\d+)\\s*,\\s*(\\d+)\\s*\\)$");
+
     public List<AssertFieldRule.AssertRule> parseRowRules(List<? extends 
Config> rowRuleList) {
 
         return assembleFieldValueRules(rowRuleList);
@@ -91,13 +96,14 @@ public class AssertRuleParser {
     }
 
     private SeaTunnelDataType<?> getFieldType(String fieldTypeStr) {
-        if (fieldTypeStr.toLowerCase().startsWith("decimal(")) {
-            String lengthAndScale =
-                    fieldTypeStr.toLowerCase().replace("decimal(", 
"").replace(")", "");
-            String[] split = lengthAndScale.split(",");
-            return new DecimalType(Integer.valueOf(split[0]), 
Integer.valueOf(split[1]));
+        final String normalTypeStr = fieldTypeStr.trim().toLowerCase();
+        Matcher matcher = DECIMAL_TYPE_PATTERN.matcher(normalTypeStr);
+        if (matcher.find()) {
+            int precision = Integer.parseInt(matcher.group(1));
+            int scale = Integer.parseInt(matcher.group(2));
+            return new DecimalType(precision, scale);
         }
-        return TYPES.get(fieldTypeStr.toLowerCase());
+        return TYPES.get(normalTypeStr);
     }
 
     private static final Map<String, SeaTunnelDataType<?>> TYPES = 
Maps.newHashMap();
diff --git 
a/seatunnel-connectors-v2/connector-assert/src/test/java/org/apache/seatunnel/flink/assertion/AssertExecutorTest.java
 
b/seatunnel-connectors-v2/connector-assert/src/test/java/org/apache/seatunnel/flink/assertion/AssertExecutorTest.java
index 4b9d941535..48b31989c6 100644
--- 
a/seatunnel-connectors-v2/connector-assert/src/test/java/org/apache/seatunnel/flink/assertion/AssertExecutorTest.java
+++ 
b/seatunnel-connectors-v2/connector-assert/src/test/java/org/apache/seatunnel/flink/assertion/AssertExecutorTest.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.flink.assertion;
 
 import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -28,8 +29,11 @@ import org.junit.jupiter.api.Test;
 
 import com.google.common.collect.Lists;
 
+import java.math.BigDecimal;
+import java.util.Collections;
 import java.util.List;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
@@ -111,4 +115,51 @@ public class AssertExecutorTest {
         rule.setFieldRules(valueRules);
         return rule;
     }
+
+    @Test
+    public void testDecimalTypeCheck() {
+        List<AssertFieldRule> rules = Lists.newArrayList();
+        AssertFieldRule rule = new AssertFieldRule();
+        rule.setFieldName("c_mock");
+        DecimalType assertFieldType = new DecimalType(10, 2);
+        rule.setFieldType(assertFieldType);
+
+        AssertFieldRule.AssertRule valueRule = new 
AssertFieldRule.AssertRule();
+        valueRule.setEqualTo("99999999.90");
+
+        rule.setFieldRules(Collections.singletonList(valueRule));
+        rules.add(rule);
+
+        SeaTunnelRow mockRow = new SeaTunnelRow(new Object[] {new 
BigDecimal("99999999.90")});
+        SeaTunnelRowType mockType =
+                new SeaTunnelRowType(
+                        new String[] {"c_mock"}, new SeaTunnelDataType[] {new 
DecimalType(10, 2)});
+
+        AssertFieldRule failRule = assertExecutor.fail(mockRow, mockType, 
rules).orElse(null);
+        assertNull(failRule);
+    }
+
+    @Test
+    public void testDecimalTypeCheckError() {
+        List<AssertFieldRule> rules = Lists.newArrayList();
+        AssertFieldRule rule = new AssertFieldRule();
+        rule.setFieldName("c_mock");
+        DecimalType assertFieldType = new DecimalType(1, 0);
+        rule.setFieldType(assertFieldType);
+
+        AssertFieldRule.AssertRule valueRule = new 
AssertFieldRule.AssertRule();
+        valueRule.setRuleType(AssertFieldRule.AssertRuleType.NOT_NULL);
+        rule.setFieldRules(Collections.singletonList(valueRule));
+        rules.add(rule);
+
+        SeaTunnelRow mockRow = new SeaTunnelRow(new Object[] 
{BigDecimal.valueOf(99999999.99)});
+        SeaTunnelRowType mockType =
+                new SeaTunnelRowType(
+                        new String[] {"c_mock"}, new SeaTunnelDataType[] {new 
DecimalType(10, 2)});
+
+        AssertFieldRule failRule = assertExecutor.fail(mockRow, mockType, 
rules).orElse(null);
+        assertNotNull(failRule);
+        assertEquals(assertFieldType, failRule.getFieldType());
+        assertEquals("c_mock", failRule.getFieldName());
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-assert/src/test/java/org/apache/seatunnel/flink/assertion/rule/AssertRuleParserTest.java
 
b/seatunnel-connectors-v2/connector-assert/src/test/java/org/apache/seatunnel/flink/assertion/rule/AssertRuleParserTest.java
index 29eabbda08..494c3c4496 100644
--- 
a/seatunnel-connectors-v2/connector-assert/src/test/java/org/apache/seatunnel/flink/assertion/rule/AssertRuleParserTest.java
+++ 
b/seatunnel-connectors-v2/connector-assert/src/test/java/org/apache/seatunnel/flink/assertion/rule/AssertRuleParserTest.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
 import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
 import 
org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertFieldRule;
 import 
org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertRuleParser;
 
@@ -37,8 +38,39 @@ public class AssertRuleParserTest {
     public void testParseRules() {
         List<? extends Config> ruleConfigList = assembleConfig();
         List<AssertFieldRule> assertFieldRules = 
parser.parseRules(ruleConfigList);
-        assertEquals(assertFieldRules.size(), 2);
-        assertEquals(assertFieldRules.get(0).getFieldType(), 
BasicType.STRING_TYPE);
+        assertEquals(3, assertFieldRules.size());
+
+        AssertFieldRule nameRule = assertFieldRules.get(0);
+        List<AssertFieldRule.AssertRule> nameValueRules = 
nameRule.getFieldRules();
+        assertEquals(BasicType.STRING_TYPE, nameRule.getFieldType());
+        assertEquals("name", nameRule.getFieldName());
+        assertEquals(3, nameValueRules.size());
+        assertEquals(AssertFieldRule.AssertRuleType.NOT_NULL, 
nameValueRules.get(0).getRuleType());
+        assertEquals(
+                AssertFieldRule.AssertRuleType.MIN_LENGTH, 
nameValueRules.get(1).getRuleType());
+        assertEquals(3.0, nameValueRules.get(1).getRuleValue());
+        assertEquals(
+                AssertFieldRule.AssertRuleType.MAX_LENGTH, 
nameValueRules.get(2).getRuleType());
+        assertEquals(5.0, nameValueRules.get(2).getRuleValue());
+
+        AssertFieldRule ageRule = assertFieldRules.get(1);
+        List<AssertFieldRule.AssertRule> ageValueRules = 
ageRule.getFieldRules();
+        assertEquals("age", ageRule.getFieldName());
+        assertEquals(3, ageValueRules.size());
+        assertEquals(AssertFieldRule.AssertRuleType.NOT_NULL, 
ageValueRules.get(0).getRuleType());
+        assertEquals(AssertFieldRule.AssertRuleType.MIN, 
ageValueRules.get(1).getRuleType());
+        assertEquals(10.0, ageValueRules.get(1).getRuleValue());
+        assertEquals(AssertFieldRule.AssertRuleType.MAX, 
ageValueRules.get(2).getRuleType());
+        assertEquals(20.0, ageValueRules.get(2).getRuleValue());
+
+        AssertFieldRule decimalRule = assertFieldRules.get(2);
+        List<AssertFieldRule.AssertRule> decimalValueRules = 
decimalRule.getFieldRules();
+        assertEquals("c_decimal", decimalRule.getFieldName());
+        assertEquals(new DecimalType(10, 2), decimalRule.getFieldType());
+        assertEquals(2, decimalValueRules.size());
+        assertEquals(
+                AssertFieldRule.AssertRuleType.NOT_NULL, 
decimalValueRules.get(0).getRuleType());
+        assertEquals("12.12", decimalValueRules.get(1).getEqualTo());
     }
 
     private List<? extends Config> assembleConfig() {
@@ -76,6 +108,17 @@ public class AssertRuleParserTest {
                         + "                     rule_value = 20\n"
                         + "                }\n"
                         + "            ]\n"
+                        + "        },{\n"
+                        + "            field_name = c_decimal\n"
+                        + "            field_type= \" decimal( 10 , 2 ) \"\n"
+                        + "            field_value = [\n"
+                        + "                {\n"
+                        + "                    rule_type = NOT_NULL\n"
+                        + "                },\n"
+                        + "                {\n"
+                        + "                    equals_to = \"12.12\"\n"
+                        + "                }\n"
+                        + "            ]\n"
                         + "        }\n"
                         + "        ]\n"
                         + "    \n"
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHiveIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHiveIT.java
index 936ea73bbe..f183c0c193 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHiveIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHiveIT.java
@@ -131,7 +131,7 @@ public class JdbcHiveIT extends AbstractJdbcIT {
                                 + "        TRUE,"
                                 + "        '2023-09-04',"
                                 + "        '2023-09-04 10:30:00',"
-                                + "        42.12,"
+                                + "        42.10,"
                                 + "        42.12)");
             }
         } catch (Exception exception) {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf
index 21c4059578..0f2b909c12 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf
@@ -126,7 +126,7 @@ sink{
         {
           field_name = hive_e2e_source_table.decimal_column
           field_type = "decimal(10,2)"
-          field_value = [{equals_to = 42.12}]
+          field_value = [{equals_to = "42.10"}]
           },
         {
          field_name = hive_e2e_source_table.numeric_column
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_to_assert.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_to_assert.conf
index f2e586ebf1..4bde48369d 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_to_assert.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_to_assert.conf
@@ -105,6 +105,15 @@ sink {
               rule_type = NOT_NULL
             }
           ]
+        },
+        {
+          field_name = c_decimal
+          field_type = "decimal(33, 18)"
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
         }
       ]
     }

Reply via email to