This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 33978689f5 [Improve][Connector-V2] Time supports default value (#7639)
33978689f5 is described below

commit 33978689f5205e6583086ed0f28590adf184a199
Author: corgy-w <[email protected]>
AuthorDate: Fri Sep 13 18:52:58 2024 +0800

    [Improve][Connector-V2] Time supports default value (#7639)
---
 docs/en/connector-v2/source/FakeSource.md          |  73 +++++++++++-
 .../seatunnel/fake/config/FakeConfig.java          |   2 +
 .../seatunnel/fake/source/FakeDataGenerator.java   | 129 ++++++++++++++++++++-
 .../fake/source/FakeDataGeneratorTest.java         |  39 ++++++-
 .../src/test/resources/fake-data.column.conf       |  36 ++++++
 .../test/resources/fake-data.schema.default.conf   |  55 +++++++++
 6 files changed, 326 insertions(+), 8 deletions(-)

diff --git a/docs/en/connector-v2/source/FakeSource.md 
b/docs/en/connector-v2/source/FakeSource.md
index 53d13366d4..6f6b259736 100644
--- a/docs/en/connector-v2/source/FakeSource.md
+++ b/docs/en/connector-v2/source/FakeSource.md
@@ -401,6 +401,75 @@ source {
 }
 ```
 
+### Options `defaultValue` Case
+
+Custom data can be generated by `rows` and `columns`. For time types, the current time can be
+obtained with `CURRENT_TIMESTAMP`, `CURRENT_TIME`, or `CURRENT_DATE`.
+
+```hocon
+    schema = {
+        fields {
+            pk_id = bigint
+            name = string
+            score = int
+            time1 = timestamp
+            time2 = time
+            time3 = date
+        }
+    }
+    # use rows
+    rows = [
+        {
+            kind = INSERT
+            fields = [1, "A", 100, CURRENT_TIMESTAMP, CURRENT_TIME, 
CURRENT_DATE]
+        }
+    ]
+```
+
+```hocon
+      schema = {
+          # use columns
+           columns = [
+           {
+              name = book_publication_time
+              type = timestamp
+              defaultValue = "2024-09-12 15:45:30"
+              comment = "book publication time"
+           },
+           {
+              name = book_publication_time2
+              type = timestamp
+              defaultValue = CURRENT_TIMESTAMP
+              comment = "book publication time2"
+           },
+           {
+              name = book_publication_time3
+              type = time
+              defaultValue = "15:45:30"
+              comment = "book publication time3"
+           },
+           {
+              name = book_publication_time4
+              type = time
+              defaultValue = CURRENT_TIME
+              comment = "book publication time4"
+           },
+           {
+              name = book_publication_time5
+              type = date
+              defaultValue = "2024-09-12"
+              comment = "book publication time5"
+           },
+           {
+              name = book_publication_time6
+              type = date
+              defaultValue = CURRENT_DATE
+              comment = "book publication time6"
+           }
+       ]
+      }
+```
+
 ### Use Vector Example
 
 ```hocon
@@ -408,8 +477,10 @@ source {
 source {
   FakeSource {
       row.num = 10
+      # Low priority 
       vector.dimension= 4
       binary.vector.dimension = 8
+      # Low priority 
       schema = {
            table = "simple_example"
            columns = [
@@ -452,8 +523,6 @@ source {
 
 ```
 
-ps: columnScale needs to be improved in schema-feature , used to specify the 
dimension of vectors and precision of float. For details, see 
[here](../../concept/schema-feature.md#Columns)
-
 ## Changelog
 
 ### 2.2.0-beta 2022-09-26
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
index 2b96c47442..96cd3fc464 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
@@ -27,6 +27,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.fake.exception.FakeConnectorExc
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Getter;
+import lombok.Setter;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -451,6 +452,7 @@ public class FakeConfig implements Serializable {
     }
 
     @Getter
+    @Setter
     @AllArgsConstructor
     public static class RowData implements Serializable {
         static final String KEY_KIND = "kind";
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
index 524d231063..017b2d2946 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
@@ -17,6 +17,12 @@
 
 package org.apache.seatunnel.connectors.seatunnel.fake.source;
 
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ArrayNode;
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.type.ArrayType;
@@ -25,8 +31,11 @@ import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
 import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.common.utils.DateTimeUtils;
+import org.apache.seatunnel.common.utils.DateUtils;
 import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.fake.exception.FakeConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.fake.utils.FakeDataRandomUtils;
@@ -35,12 +44,24 @@ import 
org.apache.seatunnel.format.json.JsonDeserializationSchema;
 import java.io.IOException;
 import java.lang.reflect.Array;
 import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.function.Function;
 
+import static org.apache.seatunnel.api.table.type.SqlType.TIME;
+
 public class FakeDataGenerator {
+    private static final String CURRENT_DATE = "CURRENT_DATE";
+    private static final String CURRENT_TIME = "CURRENT_TIME";
+    private static final String CURRENT_TIMESTAMP = "CURRENT_TIMESTAMP";
+
+    private final ObjectMapper OBJECTMAPPER = new ObjectMapper();
+
     private final CatalogTable catalogTable;
     private final FakeConfig fakeConfig;
     private final JsonDeserializationSchema jsonDeserializationSchema;
@@ -92,7 +113,10 @@ public class FakeDataGenerator {
         // Use manual configuration data preferentially
         List<SeaTunnelRow> seaTunnelRows = new ArrayList<>();
         if (fakeConfig.getFakeRows() != null) {
+            SeaTunnelDataType<?>[] fieldTypes = 
catalogTable.getSeaTunnelRowType().getFieldTypes();
+            String[] fieldNames = 
catalogTable.getSeaTunnelRowType().getFieldNames();
             for (FakeConfig.RowData rowData : fakeConfig.getFakeRows()) {
+                customField(rowData, fieldTypes, fieldNames);
                 seaTunnelRows.add(convertRow(rowData));
             }
         } else {
@@ -103,6 +127,69 @@ public class FakeDataGenerator {
         return seaTunnelRows;
     }
 
+    /**
+     * Rewrites the fields JSON of a manually configured row, substituting the value returned by
+     * {@code getNewValueForField} for every field that has one.
+     *
+     * <p>Rows without a fields JSON are left untouched. Otherwise the (possibly modified) JSON
+     * tree is serialized again and stored back on {@code rowData}.
+     *
+     * @param rowData row whose fields JSON is rewritten in place
+     * @param fieldTypes field types of the row; one lookup is made per entry
+     * @param fieldNames field names, indexed in parallel with {@code fieldTypes}
+     * @throws FakeConnectorException if the fields JSON cannot be parsed
+     */
+    private void customField(
+            FakeConfig.RowData rowData, SeaTunnelDataType<?>[] fieldTypes, String[] fieldNames) {
+        String rawFields = rowData.getFieldsJson();
+        if (rawFields == null) {
+            return;
+        }
+
+        try {
+            JsonNode root = OBJECTMAPPER.readTree(rawFields);
+
+            for (int idx = 0; idx < fieldTypes.length; idx++) {
+                // Array payloads are addressed by position, anything else by field name.
+                JsonNode current = root.isArray() ? root.get(idx) : root.get(fieldNames[idx]);
+                if (current != null) {
+                    String replacement =
+                            getNewValueForField(fieldTypes[idx].getSqlType(), current.asText());
+                    if (replacement != null) {
+                        root = replaceFieldValue(root, idx, fieldNames[idx], replacement);
+                    }
+                }
+            }
+
+            rowData.setFieldsJson(root.toString());
+        } catch (JsonProcessingException e) {
+            throw new FakeConnectorException(
+                    CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
+                    "The data type of the fake data is not supported",
+                    e);
+        }
+    }
+
+    /**
+     * Resolves a time keyword used as a row field value into the text of the current time.
+     *
+     * <p>The keyword is matched case-insensitively for all three time types.
+     *
+     * @param sqlType SQL type of the field being inspected
+     * @param fieldValue textual value of the field as read from the row JSON
+     * @return the {@code toString()} form of the current {@link LocalTime}, {@link LocalDate} or
+     *     {@link LocalDateTime} when {@code fieldValue} is the keyword for {@code sqlType};
+     *     {@code null} when the value must be kept as is
+     */
+    private String getNewValueForField(SqlType sqlType, String fieldValue) {
+        switch (sqlType) {
+            case TIME:
+                // Ignore case here too, so CURRENT_TIME behaves like the other two keywords.
+                return fieldValue.equalsIgnoreCase(CURRENT_TIME)
+                        ? LocalTime.now().toString()
+                        : null;
+            case DATE:
+                return fieldValue.equalsIgnoreCase(CURRENT_DATE)
+                        ? LocalDate.now().toString()
+                        : null;
+            case TIMESTAMP:
+                return fieldValue.equalsIgnoreCase(CURRENT_TIMESTAMP)
+                        ? LocalDateTime.now().toString()
+                        : null;
+            default:
+                // Non-time types never carry a keyword to substitute.
+                return null;
+        }
+    }
+
+    /**
+     * Stores {@code newValue} into {@code jsonNode}, by position when the node is an array and by
+     * field name otherwise.
+     *
+     * @param jsonNode node to modify in place; cast to {@code ObjectNode} when it is not an array
+     * @param index position to overwrite when {@code jsonNode} is an array
+     * @param fieldName key to overwrite when {@code jsonNode} is not an array
+     * @param newValue replacement value, converted to a JSON node before being stored
+     * @return the same {@code jsonNode} instance that was passed in
+     */
+    private JsonNode replaceFieldValue(
+            JsonNode jsonNode, int index, String fieldName, String newValue) {
+        JsonNode replacement = OBJECTMAPPER.convertValue(newValue, JsonNode.class);
+
+        if (!jsonNode.isArray()) {
+            ObjectNode objectNode = (ObjectNode) jsonNode;
+            objectNode.set(fieldName, replacement);
+            return jsonNode;
+        }
+
+        ArrayNode arrayNode = (ArrayNode) jsonNode;
+        arrayNode.set(index, replacement);
+        return jsonNode;
+    }
+
     @SuppressWarnings("magicnumber")
     private Object randomColumnValue(Column column) {
         SeaTunnelDataType<?> fieldType = column.getDataType();
@@ -152,11 +239,47 @@ public class FakeDataGenerator {
             case BYTES:
                 return value(column, String::getBytes, 
fakeDataRandomUtils::randomBytes);
             case DATE:
-                return value(column, String::toString, 
fakeDataRandomUtils::randomLocalDate);
+                return value(
+                        column,
+                        defaultValue -> {
+                            if (defaultValue.equalsIgnoreCase(CURRENT_DATE)) {
+                                return LocalDate.now();
+                            }
+                            DateTimeFormatter dateTimeFormatter =
+                                    DateUtils.matchDateFormatter(defaultValue);
+                            return LocalDate.parse(
+                                    defaultValue,
+                                    dateTimeFormatter == null
+                                            ? DateTimeFormatter.ISO_LOCAL_DATE
+                                            : dateTimeFormatter);
+                        },
+                        fakeDataRandomUtils::randomLocalDate);
             case TIME:
-                return value(column, String::toString, 
fakeDataRandomUtils::randomLocalTime);
+                return value(
+                        column,
+                        defaultValue -> {
+                            if (defaultValue.equalsIgnoreCase(CURRENT_TIME)) {
+                                return LocalTime.now();
+                            }
+                            return LocalTime.parse(defaultValue, 
DateTimeFormatter.ISO_LOCAL_TIME);
+                        },
+                        fakeDataRandomUtils::randomLocalTime);
             case TIMESTAMP:
-                return value(column, String::toString, 
fakeDataRandomUtils::randomLocalDateTime);
+                return value(
+                        column,
+                        defaultValue -> {
+                            if 
(defaultValue.equalsIgnoreCase(CURRENT_TIMESTAMP)) {
+                                return LocalDateTime.now();
+                            }
+                            DateTimeFormatter dateTimeFormatter =
+                                    
DateTimeUtils.matchDateTimeFormatter(defaultValue);
+                            return LocalDateTime.parse(
+                                    defaultValue,
+                                    dateTimeFormatter == null
+                                            ? 
DateTimeFormatter.ISO_LOCAL_DATE_TIME
+                                            : dateTimeFormatter);
+                        },
+                        fakeDataRandomUtils::randomLocalDateTime);
             case ROW:
                 SeaTunnelDataType<?>[] fieldTypes = ((SeaTunnelRowType) 
fieldType).getFieldTypes();
                 Object[] objects = new Object[fieldTypes.length];
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java
 
b/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java
index e33883f554..81aae384bb 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
 import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -41,6 +42,9 @@ import java.net.URISyntaxException;
 import java.net.URL;
 import java.nio.ByteBuffer;
 import java.nio.file.Paths;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -173,13 +177,17 @@ public class FakeDataGeneratorTest {
                             8, ((ByteBuffer) 
seaTunnelRow.getField(8)).capacity() / 2);
                     // VectorType.VECTOR_SPARSE_FLOAT_TYPE
                     Assertions.assertEquals(8, ((Map) 
seaTunnelRow.getField(9)).size());
+                    
Assertions.assertNotNull(seaTunnelRow.getField(10).toString());
+                    
Assertions.assertNotNull(seaTunnelRow.getField(11).toString());
                     Assertions.assertEquals(
-                            268,
+                            436,
                             seaTunnelRow.getBytesSize(
                                     new SeaTunnelRowType(
                                             new String[] {
                                                 "field1", "field2", "field3", 
"field4", "field5",
-                                                "field6", "field7", "field8", 
"field9", "field10"
+                                                "field6", "field7", "field8", 
"field9", "field10",
+                                                "field11", "field12", 
"field13", "field14",
+                                                "field15", "field16"
                                             },
                                             new SeaTunnelDataType<?>[] {
                                                 BasicType.STRING_TYPE,
@@ -191,11 +199,36 @@ public class FakeDataGeneratorTest {
                                                 VectorType.VECTOR_BINARY_TYPE,
                                                 VectorType.VECTOR_FLOAT16_TYPE,
                                                 
VectorType.VECTOR_BFLOAT16_TYPE,
-                                                
VectorType.VECTOR_SPARSE_FLOAT_TYPE
+                                                
VectorType.VECTOR_SPARSE_FLOAT_TYPE,
+                                                
LocalTimeType.LOCAL_DATE_TIME_TYPE,
+                                                
LocalTimeType.LOCAL_DATE_TIME_TYPE,
+                                                LocalTimeType.LOCAL_TIME_TYPE,
+                                                LocalTimeType.LOCAL_TIME_TYPE,
+                                                LocalTimeType.LOCAL_DATE_TYPE,
+                                                LocalTimeType.LOCAL_DATE_TYPE
                                             })));
                 });
     }
 
+    /**
+     * Generates rows from the given config and checks the Java type of the first six fields of
+     * every generated row.
+     */
+    @ParameterizedTest
+    @ValueSource(strings = {"fake-data.schema.default.conf"})
+    public void testDataParse(String conf) throws FileNotFoundException, URISyntaxException {
+        FakeConfig fakeConfig = FakeConfig.buildWithConfig(getTestConfigFile(conf));
+        FakeDataGenerator generator = new FakeDataGenerator(fakeConfig);
+
+        // Expected runtime type of each field, in field order.
+        Class<?>[] expectedTypes = {
+            Long.class,
+            String.class,
+            Integer.class,
+            LocalDateTime.class,
+            LocalTime.class,
+            LocalDate.class
+        };
+
+        for (SeaTunnelRow row : generator.generateFakedRows(fakeConfig.getRowNum())) {
+            for (int pos = 0; pos < expectedTypes.length; pos++) {
+                Assertions.assertInstanceOf(expectedTypes[pos], row.getField(pos));
+            }
+        }
+    }
+
     private ReadonlyConfig getTestConfigFile(String configFile)
             throws FileNotFoundException, URISyntaxException {
         if (!configFile.startsWith("/")) {
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/test/resources/fake-data.column.conf
 
b/seatunnel-connectors-v2/connector-fake/src/test/resources/fake-data.column.conf
index 9a1515264e..9486fc6423 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/test/resources/fake-data.column.conf
+++ 
b/seatunnel-connectors-v2/connector-fake/src/test/resources/fake-data.column.conf
@@ -91,6 +91,42 @@
               type = sparse_float_vector
               columnScale =8
               comment = "vector"
+           },
+           {
+              name = book_publication_time
+              type = timestamp
+              defaultValue = "2024-09-12 15:45:30"
+              comment = "book publication time"
+           },
+           {
+              name = book_publication_time2
+              type = timestamp
+              defaultValue = CURRENT_TIMESTAMP
+              comment = "book publication time2"
+           },
+           {
+              name = book_publication_time3
+              type = time
+              defaultValue = "15:45:30"
+              comment = "book publication time3"
+           },
+           {
+              name = book_publication_time4
+              type = time
+              defaultValue = CURRENT_TIME
+              comment = "book publication time4"
+           },
+           {
+              name = book_publication_time5
+              type = date
+              defaultValue = "2024-09-12"
+              comment = "book publication time5"
+           },
+           {
+              name = book_publication_time6
+              type = date
+              defaultValue = CURRENT_DATE
+              comment = "book publication time6"
            }
        ]
       }
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/test/resources/fake-data.schema.default.conf
 
b/seatunnel-connectors-v2/connector-fake/src/test/resources/fake-data.schema.default.conf
new file mode 100644
index 0000000000..911a997283
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-fake/src/test/resources/fake-data.schema.default.conf
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FakeSource {
+    schema = {
+        fields {
+            pk_id = bigint
+            name = string
+            score = int
+            time1 = timestamp
+            time2 = time
+            time3 = date
+        }
+    }
+    rows = [
+        {
+            kind = INSERT
+            fields = [1, "A", 100, CURRENT_TIMESTAMP, CURRENT_TIME, 
CURRENT_DATE]
+        },
+        {
+            kind = INSERT
+            fields = [2, "B", 100, CURRENT_TIMESTAMP, CURRENT_TIME, 
CURRENT_DATE]
+        },
+        {
+            kind = INSERT
+            fields = [3, "C", 100, CURRENT_TIMESTAMP, CURRENT_TIME, 
CURRENT_DATE]
+        },
+        {
+            kind = UPDATE_BEFORE
+            fields = [1, "A", 100, CURRENT_TIMESTAMP, CURRENT_TIME, 
CURRENT_DATE]
+        },
+        {
+            kind = UPDATE_AFTER
+            fields = [1, "A_1", 100, CURRENT_TIMESTAMP, CURRENT_TIME, 
CURRENT_DATE]
+        },
+        {
+            kind = DELETE
+            fields = [2, "B", 100, CURRENT_TIMESTAMP, CURRENT_TIME, 
CURRENT_DATE]
+        }
+    ]
+}
\ No newline at end of file

Reply via email to