This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 33978689f5 [Improve][Connector-V2] Time supports default value (#7639)
33978689f5 is described below
commit 33978689f5205e6583086ed0f28590adf184a199
Author: corgy-w <[email protected]>
AuthorDate: Fri Sep 13 18:52:58 2024 +0800
[Improve][Connector-V2] Time supports default value (#7639)
---
docs/en/connector-v2/source/FakeSource.md | 73 +++++++++++-
.../seatunnel/fake/config/FakeConfig.java | 2 +
.../seatunnel/fake/source/FakeDataGenerator.java | 129 ++++++++++++++++++++-
.../fake/source/FakeDataGeneratorTest.java | 39 ++++++-
.../src/test/resources/fake-data.column.conf | 36 ++++++
.../test/resources/fake-data.schema.default.conf | 55 +++++++++
6 files changed, 326 insertions(+), 8 deletions(-)
diff --git a/docs/en/connector-v2/source/FakeSource.md
b/docs/en/connector-v2/source/FakeSource.md
index 53d13366d4..6f6b259736 100644
--- a/docs/en/connector-v2/source/FakeSource.md
+++ b/docs/en/connector-v2/source/FakeSource.md
@@ -401,6 +401,75 @@ source {
}
```
+### Options `defaultValue` Case
+
+Custom data can be generated with `rows` or `columns`. For time types, the current time can be
+obtained with the `CURRENT_TIMESTAMP`, `CURRENT_TIME` and `CURRENT_DATE` placeholders (for
+`timestamp`, `time` and `date` fields respectively).
+
+```hocon
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ time1 = timestamp
+ time2 = time
+ time3 = date
+ }
+ }
+ # use rows
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", 100, CURRENT_TIMESTAMP, CURRENT_TIME,
CURRENT_DATE]
+ }
+ ]
+```
+
+```hocon
+ schema = {
+ # use columns
+ columns = [
+ {
+ name = book_publication_time
+ type = timestamp
+ defaultValue = "2024-09-12 15:45:30"
+ comment = "book publication time"
+ },
+ {
+ name = book_publication_time2
+ type = timestamp
+ defaultValue = CURRENT_TIMESTAMP
+ comment = "book publication time2"
+ },
+ {
+ name = book_publication_time3
+ type = time
+ defaultValue = "15:45:30"
+ comment = "book publication time3"
+ },
+ {
+ name = book_publication_time4
+ type = time
+ defaultValue = CURRENT_TIME
+ comment = "book publication time4"
+ },
+ {
+ name = book_publication_time5
+ type = date
+ defaultValue = "2024-09-12"
+ comment = "book publication time5"
+ },
+ {
+ name = book_publication_time6
+ type = date
+ defaultValue = CURRENT_DATE
+ comment = "book publication time6"
+ }
+ ]
+ }
+```
+
### Use Vector Example
```hocon
@@ -408,8 +477,10 @@ source {
source {
FakeSource {
row.num = 10
+ # Low priority
vector.dimension= 4
binary.vector.dimension = 8
+ # Low priority
schema = {
table = "simple_example"
columns = [
@@ -452,8 +523,6 @@ source {
```
-ps: columnScale needs to be improved in schema-feature , used to specify the
dimension of vectors and precision of float. For details, see
[here](../../concept/schema-feature.md#Columns)
-
## Changelog
### 2.2.0-beta 2022-09-26
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
index 2b96c47442..96cd3fc464 100644
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
@@ -27,6 +27,7 @@ import
org.apache.seatunnel.connectors.seatunnel.fake.exception.FakeConnectorExc
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
+import lombok.Setter;
import java.io.Serializable;
import java.util.ArrayList;
@@ -451,6 +452,7 @@ public class FakeConfig implements Serializable {
}
@Getter
+ @Setter
@AllArgsConstructor
public static class RowData implements Serializable {
static final String KEY_KIND = "kind";
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
index 524d231063..017b2d2946 100644
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
@@ -17,6 +17,12 @@
package org.apache.seatunnel.connectors.seatunnel.fake.source;
+import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ArrayNode;
+import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.type.ArrayType;
@@ -25,8 +31,11 @@ import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.common.utils.DateTimeUtils;
+import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
import
org.apache.seatunnel.connectors.seatunnel.fake.exception.FakeConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.fake.utils.FakeDataRandomUtils;
@@ -35,12 +44,24 @@ import
org.apache.seatunnel.format.json.JsonDeserializationSchema;
import java.io.IOException;
import java.lang.reflect.Array;
import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.function.Function;
+import static org.apache.seatunnel.api.table.type.SqlType.TIME;
+
public class FakeDataGenerator {
+    /** Placeholder resolved to {@link LocalDate#now()} for DATE fields. */
+    private static final String CURRENT_DATE = "CURRENT_DATE";
+    /** Placeholder resolved to {@link LocalTime#now()} for TIME fields. */
+    private static final String CURRENT_TIME = "CURRENT_TIME";
+    /** Placeholder resolved to {@link LocalDateTime#now()} for TIMESTAMP fields. */
+    private static final String CURRENT_TIMESTAMP = "CURRENT_TIMESTAMP";
+
+    // ObjectMapper is thread-safe once configured and relatively costly to build, so a single
+    // shared instance is used instead of one per FakeDataGenerator.
+    private static final ObjectMapper OBJECTMAPPER = new ObjectMapper();
+
private final CatalogTable catalogTable;
private final FakeConfig fakeConfig;
private final JsonDeserializationSchema jsonDeserializationSchema;
@@ -92,7 +113,10 @@ public class FakeDataGenerator {
// Use manual configuration data preferentially
List<SeaTunnelRow> seaTunnelRows = new ArrayList<>();
if (fakeConfig.getFakeRows() != null) {
+ SeaTunnelDataType<?>[] fieldTypes =
catalogTable.getSeaTunnelRowType().getFieldTypes();
+ String[] fieldNames =
catalogTable.getSeaTunnelRowType().getFieldNames();
for (FakeConfig.RowData rowData : fakeConfig.getFakeRows()) {
+ customField(rowData, fieldTypes, fieldNames);
seaTunnelRows.add(convertRow(rowData));
}
} else {
@@ -103,6 +127,69 @@ public class FakeDataGenerator {
return seaTunnelRows;
}
+    /**
+     * Replaces the {@code CURRENT_TIMESTAMP}, {@code CURRENT_TIME} and {@code CURRENT_DATE}
+     * placeholders in a manually configured row with the current date/time.
+     *
+     * <p>The row's fields JSON is matched against the schema by position when it is an array, and
+     * by field name otherwise. Which placeholder applies to which field type is decided by {@code
+     * getNewValueForField}.
+     *
+     * <p>The row's JSON is only rewritten when at least one placeholder was actually replaced.
+     * Rows without placeholders keep their original text. Re-serializing a tree parsed by a
+     * default ObjectMapper reads floating-point literals as doubles and can lose decimal precision.
+     *
+     * @param rowData the configured row; its fields JSON is updated in place
+     * @param fieldTypes the schema field types, in schema order
+     * @param fieldNames the schema field names, in schema order
+     * @throws FakeConnectorException if the row's fields JSON cannot be parsed
+     */
+    private void customField(
+            FakeConfig.RowData rowData, SeaTunnelDataType<?>[] fieldTypes, String[] fieldNames) {
+        if (rowData.getFieldsJson() == null) {
+            return;
+        }
+
+        try {
+            JsonNode jsonNode = OBJECTMAPPER.readTree(rowData.getFieldsJson());
+            boolean replaced = false;
+
+            for (int i = 0; i < fieldTypes.length; i++) {
+                JsonNode field = jsonNode.isArray() ? jsonNode.get(i) : jsonNode.get(fieldNames[i]);
+                if (field == null) {
+                    // Field not present in this row (short array or missing key): nothing to do.
+                    continue;
+                }
+
+                String newValue = getNewValueForField(fieldTypes[i].getSqlType(), field.asText());
+                if (newValue != null) {
+                    jsonNode = replaceFieldValue(jsonNode, i, fieldNames[i], newValue);
+                    replaced = true;
+                }
+            }
+
+            // Write back only when a placeholder was substituted, to avoid a lossy round-trip.
+            if (replaced) {
+                rowData.setFieldsJson(jsonNode.toString());
+            }
+        } catch (JsonProcessingException e) {
+            // NOTE(review): this is a JSON parse failure; UNSUPPORTED_DATA_TYPE may not be the
+            // most fitting error code — confirm.
+            throw new FakeConnectorException(
+                    CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
+                    "The data type of the fake data is not supported",
+                    e);
+        }
+    }
+
+    /**
+     * Resolves a time placeholder to the current value, rendered via the {@code toString()} of
+     * the matching {@code java.time} type.
+     *
+     * <p>Placeholders are matched case-insensitively for all three types. This is consistent with
+     * how column default values are resolved in {@code randomColumnValue}.
+     *
+     * @param sqlType the SQL type of the field
+     * @param fieldValue the configured field value as text
+     * @return the replacement text, or {@code null} if {@code fieldValue} is not the placeholder
+     *     belonging to {@code sqlType} (or the type is not DATE/TIME/TIMESTAMP)
+     */
+    private String getNewValueForField(SqlType sqlType, String fieldValue) {
+        switch (sqlType) {
+            case TIME:
+                return fieldValue.equalsIgnoreCase(CURRENT_TIME)
+                        ? LocalTime.now().toString()
+                        : null;
+            case DATE:
+                return fieldValue.equalsIgnoreCase(CURRENT_DATE)
+                        ? LocalDate.now().toString()
+                        : null;
+            case TIMESTAMP:
+                return fieldValue.equalsIgnoreCase(CURRENT_TIMESTAMP)
+                        ? LocalDateTime.now().toString()
+                        : null;
+            default:
+                return null;
+        }
+    }
+
+    /**
+     * Stores {@code newValue} into the row node, mutating it in place.
+     *
+     * @param jsonNode the row node; an array is updated at {@code index}, anything else is treated
+     *     as an object and updated under {@code fieldName}
+     * @param index position of the field in the schema
+     * @param fieldName name of the field in the schema
+     * @param newValue replacement text, converted to a JSON node via the shared mapper
+     * @return the same {@code jsonNode} instance that was passed in
+     */
+    private JsonNode replaceFieldValue(
+            JsonNode jsonNode, int index, String fieldName, String newValue) {
+        JsonNode replacement = OBJECTMAPPER.convertValue(newValue, JsonNode.class);
+        if (!jsonNode.isArray()) {
+            ((ObjectNode) jsonNode).set(fieldName, replacement);
+            return jsonNode;
+        }
+        ((ArrayNode) jsonNode).set(index, replacement);
+        return jsonNode;
+    }
+
@SuppressWarnings("magicnumber")
private Object randomColumnValue(Column column) {
SeaTunnelDataType<?> fieldType = column.getDataType();
@@ -152,11 +239,47 @@ public class FakeDataGenerator {
case BYTES:
return value(column, String::getBytes,
fakeDataRandomUtils::randomBytes);
case DATE:
- return value(column, String::toString,
fakeDataRandomUtils::randomLocalDate);
+ return value(
+ column,
+ defaultValue -> {
+ if (defaultValue.equalsIgnoreCase(CURRENT_DATE)) {
+ return LocalDate.now();
+ }
+ DateTimeFormatter dateTimeFormatter =
+ DateUtils.matchDateFormatter(defaultValue);
+ return LocalDate.parse(
+ defaultValue,
+ dateTimeFormatter == null
+ ? DateTimeFormatter.ISO_LOCAL_DATE
+ : dateTimeFormatter);
+ },
+ fakeDataRandomUtils::randomLocalDate);
case TIME:
- return value(column, String::toString,
fakeDataRandomUtils::randomLocalTime);
+ return value(
+ column,
+ defaultValue -> {
+ if (defaultValue.equalsIgnoreCase(CURRENT_TIME)) {
+ return LocalTime.now();
+ }
+ return LocalTime.parse(defaultValue,
DateTimeFormatter.ISO_LOCAL_TIME);
+ },
+ fakeDataRandomUtils::randomLocalTime);
case TIMESTAMP:
- return value(column, String::toString,
fakeDataRandomUtils::randomLocalDateTime);
+ return value(
+ column,
+ defaultValue -> {
+ if
(defaultValue.equalsIgnoreCase(CURRENT_TIMESTAMP)) {
+ return LocalDateTime.now();
+ }
+ DateTimeFormatter dateTimeFormatter =
+
DateTimeUtils.matchDateTimeFormatter(defaultValue);
+ return LocalDateTime.parse(
+ defaultValue,
+ dateTimeFormatter == null
+ ?
DateTimeFormatter.ISO_LOCAL_DATE_TIME
+ : dateTimeFormatter);
+ },
+ fakeDataRandomUtils::randomLocalDateTime);
case ROW:
SeaTunnelDataType<?>[] fieldTypes = ((SeaTunnelRowType)
fieldType).getFieldTypes();
Object[] objects = new Object[fieldTypes.length];
diff --git
a/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java
b/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java
index e33883f554..81aae384bb 100644
---
a/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java
+++
b/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -41,6 +42,9 @@ import java.net.URISyntaxException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.file.Paths;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -173,13 +177,17 @@ public class FakeDataGeneratorTest {
8, ((ByteBuffer)
seaTunnelRow.getField(8)).capacity() / 2);
// VectorType.VECTOR_SPARSE_FLOAT_TYPE
Assertions.assertEquals(8, ((Map)
seaTunnelRow.getField(9)).size());
+
Assertions.assertNotNull(seaTunnelRow.getField(10).toString());
+
Assertions.assertNotNull(seaTunnelRow.getField(11).toString());
Assertions.assertEquals(
- 268,
+ 436,
seaTunnelRow.getBytesSize(
new SeaTunnelRowType(
new String[] {
"field1", "field2", "field3",
"field4", "field5",
- "field6", "field7", "field8",
"field9", "field10"
+ "field6", "field7", "field8",
"field9", "field10",
+ "field11", "field12",
"field13", "field14",
+ "field15", "field16"
},
new SeaTunnelDataType<?>[] {
BasicType.STRING_TYPE,
@@ -191,11 +199,36 @@ public class FakeDataGeneratorTest {
VectorType.VECTOR_BINARY_TYPE,
VectorType.VECTOR_FLOAT16_TYPE,
VectorType.VECTOR_BFLOAT16_TYPE,
-
VectorType.VECTOR_SPARSE_FLOAT_TYPE
+
VectorType.VECTOR_SPARSE_FLOAT_TYPE,
+
LocalTimeType.LOCAL_DATE_TIME_TYPE,
+
LocalTimeType.LOCAL_DATE_TIME_TYPE,
+ LocalTimeType.LOCAL_TIME_TYPE,
+ LocalTimeType.LOCAL_TIME_TYPE,
+ LocalTimeType.LOCAL_DATE_TYPE,
+ LocalTimeType.LOCAL_DATE_TYPE
})));
});
}
+    /**
+     * Verifies that rows using the CURRENT_TIMESTAMP / CURRENT_TIME / CURRENT_DATE placeholders
+     * are parsed into the expected Java types.
+     */
+    @ParameterizedTest
+    @ValueSource(strings = {"fake-data.schema.default.conf"})
+    public void testDataParse(String conf) throws FileNotFoundException, URISyntaxException {
+        ReadonlyConfig testConfig = getTestConfigFile(conf);
+        FakeConfig fakeConfig = FakeConfig.buildWithConfig(testConfig);
+        FakeDataGenerator fakeDataGenerator = new FakeDataGenerator(fakeConfig);
+        List<SeaTunnelRow> seaTunnelRows =
+                fakeDataGenerator.generateFakedRows(fakeConfig.getRowNum());
+        // The config declares six rows; without this check an empty result would let the
+        // per-row assertions below pass vacuously.
+        Assertions.assertEquals(6, seaTunnelRows.size());
+        seaTunnelRows.forEach(
+                seaTunnelRow -> {
+                    Assertions.assertInstanceOf(Long.class, seaTunnelRow.getField(0));
+                    Assertions.assertInstanceOf(String.class, seaTunnelRow.getField(1));
+                    Assertions.assertInstanceOf(Integer.class, seaTunnelRow.getField(2));
+                    Assertions.assertInstanceOf(LocalDateTime.class, seaTunnelRow.getField(3));
+                    Assertions.assertInstanceOf(LocalTime.class, seaTunnelRow.getField(4));
+                    Assertions.assertInstanceOf(LocalDate.class, seaTunnelRow.getField(5));
+                });
+    }
+
private ReadonlyConfig getTestConfigFile(String configFile)
throws FileNotFoundException, URISyntaxException {
if (!configFile.startsWith("/")) {
diff --git
a/seatunnel-connectors-v2/connector-fake/src/test/resources/fake-data.column.conf
b/seatunnel-connectors-v2/connector-fake/src/test/resources/fake-data.column.conf
index 9a1515264e..9486fc6423 100644
---
a/seatunnel-connectors-v2/connector-fake/src/test/resources/fake-data.column.conf
+++
b/seatunnel-connectors-v2/connector-fake/src/test/resources/fake-data.column.conf
@@ -91,6 +91,42 @@
type = sparse_float_vector
columnScale =8
comment = "vector"
+ },
+ {
+ name = book_publication_time
+ type = timestamp
+ defaultValue = "2024-09-12 15:45:30"
+ comment = "book publication time"
+ },
+ {
+ name = book_publication_time2
+ type = timestamp
+ defaultValue = CURRENT_TIMESTAMP
+ comment = "book publication time2"
+ },
+ {
+ name = book_publication_time3
+ type = time
+ defaultValue = "15:45:30"
+ comment = "book publication time3"
+ },
+ {
+ name = book_publication_time4
+ type = time
+ defaultValue = CURRENT_TIME
+ comment = "book publication time4"
+ },
+ {
+ name = book_publication_time5
+ type = date
+ defaultValue = "2024-09-12"
+ comment = "book publication time5"
+ },
+ {
+ name = book_publication_time6
+ type = date
+ defaultValue = CURRENT_DATE
+ comment = "book publication time6"
}
]
}
diff --git
a/seatunnel-connectors-v2/connector-fake/src/test/resources/fake-data.schema.default.conf
b/seatunnel-connectors-v2/connector-fake/src/test/resources/fake-data.schema.default.conf
new file mode 100644
index 0000000000..911a997283
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-fake/src/test/resources/fake-data.schema.default.conf
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ time1 = timestamp
+ time2 = time
+ time3 = date
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", 100, CURRENT_TIMESTAMP, CURRENT_TIME,
CURRENT_DATE]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", 100, CURRENT_TIMESTAMP, CURRENT_TIME,
CURRENT_DATE]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100, CURRENT_TIMESTAMP, CURRENT_TIME,
CURRENT_DATE]
+ },
+ {
+ kind = UPDATE_BEFORE
+ fields = [1, "A", 100, CURRENT_TIMESTAMP, CURRENT_TIME,
CURRENT_DATE]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [1, "A_1", 100, CURRENT_TIMESTAMP, CURRENT_TIME,
CURRENT_DATE]
+ },
+ {
+ kind = DELETE
+ fields = [2, "B", 100, CURRENT_TIMESTAMP, CURRENT_TIME,
CURRENT_DATE]
+ }
+ ]
+}
\ No newline at end of file