This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 044f62ef3 [Improve][Connector-V2] Improve fake source connector (#2944)
044f62ef3 is described below
commit 044f62ef32cd0d195bc4bb6c8492d48d15379ff4
Author: TyrantLucifer <[email protected]>
AuthorDate: Fri Sep 30 18:23:26 2022 +0800
[Improve][Connector-V2] Improve fake source connector (#2944)
* [Improve][Connector-V2] Improve fake source connector
* [Improve][Connector-V2] Update doc
* [Improve][Connector-V2] Fix e2e test cases
* [Improve][Connector-V2] Fix integration error.
* Revert "[Improve][Connector-V2] Fix integration error."
This reverts commit c840c696ceef27bfa2360e08efc626a77a26316a.
* [Improve][Connector-V2] Fix flink e2e test cases
* [Improve][Connector-V2] Optimize class name
---
docs/en/connector-v2/source/FakeSource.md | 111 +++++++++++-----
.../seatunnel/fake/source/FakeConfig.java | 70 ++++++++++
.../seatunnel/fake/source/FakeDataGenerator.java | 147 +++++++++++++++++++++
.../seatunnel/fake/source/FakeOptions.java | 40 ------
.../seatunnel/fake/source/FakeRandomData.java | 133 -------------------
.../seatunnel/fake/source/FakeSource.java | 10 +-
.../seatunnel/fake/source/FakeSourceReader.java | 14 +-
...domDataTest.java => FakeDataGeneratorTest.java} | 62 ++++-----
.../src/test/resources/complex.schema.conf | 63 ++++++---
.../src/test/resources/simple.schema.conf | 43 +++---
.../resources/assertion/fakesource_to_assert.conf | 4 +-
.../resources/assertion/fakesource_to_assert.conf | 2 +-
12 files changed, 412 insertions(+), 287 deletions(-)
diff --git a/docs/en/connector-v2/source/FakeSource.md
b/docs/en/connector-v2/source/FakeSource.md
index c4bc5057c..ce8aef406 100644
--- a/docs/en/connector-v2/source/FakeSource.md
+++ b/docs/en/connector-v2/source/FakeSource.md
@@ -5,7 +5,7 @@
## Description
The FakeSource is a virtual data source, which randomly generates the number
of rows according to the data structure of the user-defined schema,
-just for testing, such as type conversion and feature testing
+intended only for test cases such as type conversion or testing new connector features
## Key features
@@ -18,30 +18,43 @@ just for testing, such as type conversion and feature
testing
## Options
-| name | type | required | default value |
-|-------------------|--------|----------|---------------|
-| result_table_name | string | yes | - |
-| schema | config | yes | - |
-| row.num | long | no | 10 |
+| name | type | required | default value |
+|---------------|--------|----------|---------------|
+| schema | config | yes | - |
+| row.num | int | no | 5 |
+| map.size | int | no | 5 |
+| array.size | int | no | 5 |
+| bytes.length | int | no | 5 |
+| string.length | int | no | 5 |
-### result_table_name [string]
-The table name.
+### schema [config]
-### type [string]
+The schema of fake data that you want to generate
-Table structure description ,you should assign schema option to tell connector
how to parse data to the row you want.
-**Tips**: Most of Unstructured-Datasource contain this param, such as
LocalFile,HdfsFile.
-**Example**:
-
-### row.num
-Number of additional rows of generated data
+For example:
```hocon
-schema = {
- fields {
- c_map = "map<string, string>"
- c_array = "array<tinyint>"
+ schema = {
+ fields {
+ c_map = "map<string, array<int>>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_null = "null"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, map<string, string>>"
+ c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
@@ -54,23 +67,61 @@ schema = {
c_null = "null"
c_bytes = bytes
c_date = date
- c_time = time
c_timestamp = timestamp
}
}
+ }
```
-## Example
+### row.num
+
+The total number of rows that the connector generates
+
+### map.size
+
+The number of entries in each `map` value that the connector generates
-Simple source for FakeSource which contains enough datatype
+### array.size
+
+The number of elements in each `array` value that the connector generates
+
+### bytes.length
+
+The length of each `bytes` value that the connector generates
+
+### string.length
+
+The length of each `string` value that the connector generates
+
+## Example
```hocon
-source {
- FakeSource {
- schema = {
- fields {
- c_map = "map<string, string>"
- c_array = "array<tinyint>"
+FakeSource {
+ row.num = 10
+ map.size = 10
+ array.size = 10
+ bytes.length = 10
+ string.length = 10
+ schema = {
+ fields {
+ c_map = "map<string, array<int>>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_null = "null"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, map<string, string>>"
+ c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
@@ -83,11 +134,9 @@ source {
c_null = "null"
c_bytes = bytes
c_date = date
- c_time = time
c_timestamp = timestamp
}
}
- result_table_name = "fake"
}
}
-```
\ No newline at end of file
+```
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeConfig.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeConfig.java
new file mode 100644
index 000000000..8d4fe1d77
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeConfig.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fake.source;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.Builder;
+import lombok.Getter;
+
+import java.io.Serializable;
+
+/**
+ * Sizing options for the fake source connector: how many rows to generate and how large the
+ * generated map, array, bytes and string values are.
+ *
+ * <p>Every option falls back to its {@code DEFAULT_*} constant when it is not set, either
+ * through the builder or in the plugin configuration.
+ */
+@Builder
+@Getter
+public class FakeConfig implements Serializable {
+    // Configuration keys read from the plugin config.
+    private static final String ROW_NUM = "row.num";
+    private static final String MAP_SIZE = "map.size";
+    private static final String ARRAY_SIZE = "array.size";
+    private static final String BYTES_LENGTH = "bytes.length";
+    private static final String STRING_LENGTH = "string.length";
+
+    // Values used when the corresponding key is absent.
+    private static final int DEFAULT_ROW_NUM = 5;
+    private static final int DEFAULT_MAP_SIZE = 5;
+    private static final int DEFAULT_ARRAY_SIZE = 5;
+    private static final int DEFAULT_BYTES_LENGTH = 5;
+    private static final int DEFAULT_STRING_LENGTH = 5;
+
+    @Builder.Default
+    private int rowNum = DEFAULT_ROW_NUM;
+    @Builder.Default
+    private int mapSize = DEFAULT_MAP_SIZE;
+    @Builder.Default
+    private int arraySize = DEFAULT_ARRAY_SIZE;
+    @Builder.Default
+    private int bytesLength = DEFAULT_BYTES_LENGTH;
+    @Builder.Default
+    private int stringLength = DEFAULT_STRING_LENGTH;
+
+    /**
+     * Creates a {@code FakeConfig} from the plugin configuration.
+     *
+     * @param config plugin configuration that may contain any of the sizing keys
+     * @return a config holding the configured value for every key that is present and the
+     *     default value for every key that is absent
+     */
+    public static FakeConfig buildWithConfig(Config config) {
+        return FakeConfig.builder()
+            .rowNum(intOrDefault(config, ROW_NUM, DEFAULT_ROW_NUM))
+            .mapSize(intOrDefault(config, MAP_SIZE, DEFAULT_MAP_SIZE))
+            .arraySize(intOrDefault(config, ARRAY_SIZE, DEFAULT_ARRAY_SIZE))
+            .bytesLength(intOrDefault(config, BYTES_LENGTH, DEFAULT_BYTES_LENGTH))
+            .stringLength(intOrDefault(config, STRING_LENGTH, DEFAULT_STRING_LENGTH))
+            .build();
+    }
+
+    /** Returns the int stored at {@code path}, or {@code fallback} when the path is not set. */
+    private static int intOrDefault(Config config, String path, int fallback) {
+        return config.hasPath(path) ? config.getInt(path) : fallback;
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
new file mode 100644
index 000000000..3adfb885a
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fake.source;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.RandomUtils;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Generates random {@link SeaTunnelRow}s whose fields follow the row type of a
+ * {@link SeaTunnelSchema}. The number of rows and the sizes of generated map, array, bytes and
+ * string values are taken from a {@link FakeConfig}.
+ */
+public class FakeDataGenerator {
+    public static final String SCHEMA = "schema";
+    private final SeaTunnelSchema schema;
+    private final FakeConfig fakeConfig;
+
+    /**
+     * @param schema schema whose row type determines the type of every generated field
+     * @param fakeConfig sizing options (row count, map/array size, bytes/string length)
+     */
+    public FakeDataGenerator(SeaTunnelSchema schema, FakeConfig fakeConfig) {
+        this.schema = schema;
+        this.fakeConfig = fakeConfig;
+    }
+
+    /** Builds one row holding a random value for each field type of the schema, in order. */
+    private SeaTunnelRow randomRow() {
+        SeaTunnelDataType<?>[] fieldTypes = schema.getSeaTunnelRowType().getFieldTypes();
+        Object[] fields = new Object[fieldTypes.length];
+        for (int i = 0; i < fieldTypes.length; i++) {
+            fields[i] = randomColumnValue(fieldTypes[i]);
+        }
+        return new SeaTunnelRow(fields);
+    }
+
+    /**
+     * Generates {@code fakeConfig.getRowNum()} random rows.
+     *
+     * @return a new list of rows; empty when the configured row count is not positive
+     */
+    public List<SeaTunnelRow> generateFakedRows() {
+        int rowNum = fakeConfig.getRowNum();
+        // Math.max guards the presize hint: a negative capacity would throw.
+        List<SeaTunnelRow> seaTunnelRows = new ArrayList<>(Math.max(rowNum, 0));
+        for (int i = 0; i < rowNum; i++) {
+            seaTunnelRows.add(randomRow());
+        }
+        return seaTunnelRows;
+    }
+
+    /**
+     * Produces a random value for the given type, recursing into array, map and row types.
+     *
+     * @param fieldType type to generate a value for
+     * @return the random value, or {@code null} for the NULL type
+     * @throws UnsupportedOperationException if the SQL type has no case below
+     */
+    @SuppressWarnings("magicnumber")
+    private Object randomColumnValue(SeaTunnelDataType<?> fieldType) {
+        switch (fieldType.getSqlType()) {
+            case ARRAY:
+                ArrayType<?, ?> arrayType = (ArrayType<?, ?>) fieldType;
+                BasicType<?> elementType = arrayType.getElementType();
+                int length = fakeConfig.getArraySize();
+                // Reflection creates an array of the element's runtime class.
+                Object array = Array.newInstance(elementType.getTypeClass(), length);
+                for (int i = 0; i < length; i++) {
+                    Object value = randomColumnValue(elementType);
+                    Array.set(array, i, value);
+                }
+                return array;
+            case MAP:
+                MapType<?, ?> mapType = (MapType<?, ?>) fieldType;
+                SeaTunnelDataType<?> keyType = mapType.getKeyType();
+                SeaTunnelDataType<?> valueType = mapType.getValueType();
+                HashMap<Object, Object> objectMap = new HashMap<>();
+                int mapSize = fakeConfig.getMapSize();
+                // Equal random keys overwrite each other, so the map can end up with fewer
+                // than mapSize entries.
+                for (int i = 0; i < mapSize; i++) {
+                    Object key = randomColumnValue(keyType);
+                    Object value = randomColumnValue(valueType);
+                    objectMap.put(key, value);
+                }
+                return objectMap;
+            case STRING:
+                return RandomStringUtils.randomAlphabetic(fakeConfig.getStringLength());
+            case BOOLEAN:
+                return RandomUtils.nextInt(0, 2) == 1;
+            case TINYINT:
+                // The cast wraps values above Byte.MAX_VALUE into the negative byte range.
+                return (byte) RandomUtils.nextInt(0, 255);
+            case SMALLINT:
+                return (short) RandomUtils.nextInt(Byte.MAX_VALUE, Short.MAX_VALUE);
+            case INT:
+                return RandomUtils.nextInt(Short.MAX_VALUE, Integer.MAX_VALUE);
+            case BIGINT:
+                return RandomUtils.nextLong(Integer.MAX_VALUE, Long.MAX_VALUE);
+            case FLOAT:
+                return RandomUtils.nextFloat(Float.MIN_VALUE, Float.MAX_VALUE);
+            case DOUBLE:
+                return RandomUtils.nextDouble(Float.MAX_VALUE, Double.MAX_VALUE);
+            case DECIMAL:
+                DecimalType decimalType = (DecimalType) fieldType;
+                // (precision - scale) integer digits, then scale fraction digits.
+                String integerDigits =
+                    RandomStringUtils.randomNumeric(decimalType.getPrecision() - decimalType.getScale());
+                String fractionDigits = RandomStringUtils.randomNumeric(decimalType.getScale());
+                return new BigDecimal(integerDigits + "." + fractionDigits);
+            case NULL:
+                return null;
+            case BYTES:
+                return RandomUtils.nextBytes(fakeConfig.getBytesLength());
+            case DATE:
+                return randomLocalDateTime().toLocalDate();
+            case TIME:
+                return randomLocalDateTime().toLocalTime();
+            case TIMESTAMP:
+                return randomLocalDateTime();
+            case ROW:
+                SeaTunnelDataType<?>[] fieldTypes = ((SeaTunnelRowType) fieldType).getFieldTypes();
+                Object[] objects = new Object[fieldTypes.length];
+                for (int i = 0; i < fieldTypes.length; i++) {
+                    Object object = randomColumnValue(fieldTypes[i]);
+                    objects[i] = object;
+                }
+                return new SeaTunnelRow(objects);
+            default:
+                // should never be reached
+                throw new UnsupportedOperationException("SeaTunnel Fake source connector not support this data type");
+        }
+    }
+
+    /**
+     * Returns a random date-time in the current year with zero seconds.
+     *
+     * <p>The upper bound of {@code RandomUtils.nextInt} is exclusive, so each bound is one past
+     * the largest wanted value. The day stops at 28 so the date is valid in every month.
+     */
+    @SuppressWarnings("magicnumber")
+    private LocalDateTime randomLocalDateTime() {
+        return LocalDateTime.of(
+            LocalDateTime.now().getYear(),
+            RandomUtils.nextInt(1, 13),
+            RandomUtils.nextInt(1, 29),
+            RandomUtils.nextInt(0, 24),
+            RandomUtils.nextInt(0, 60)
+        );
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeOptions.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeOptions.java
deleted file mode 100644
index 96dc5e8ac..000000000
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeOptions.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.fake.source;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import lombok.Getter;
-import lombok.Setter;
-
-import java.io.Serializable;
-
-public class FakeOptions implements Serializable {
-
- private static final String ROW_NUM = "row.num";
- private static final Long DEFAULT_ROW_NUM = 10L;
- @Getter
- @Setter
- private Long rowNum;
-
- public static FakeOptions parse(Config config) {
- FakeOptions fakeOptions = new FakeOptions();
- fakeOptions.setRowNum(config.hasPath(ROW_NUM) ?
config.getLong(ROW_NUM) : DEFAULT_ROW_NUM);
- return fakeOptions;
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeRandomData.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeRandomData.java
deleted file mode 100644
index 3ea562e4c..000000000
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeRandomData.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.fake.source;
-
-import static org.apache.seatunnel.api.table.type.BasicType.BOOLEAN_TYPE;
-import static org.apache.seatunnel.api.table.type.BasicType.BYTE_TYPE;
-import static org.apache.seatunnel.api.table.type.BasicType.DOUBLE_TYPE;
-import static org.apache.seatunnel.api.table.type.BasicType.FLOAT_TYPE;
-import static org.apache.seatunnel.api.table.type.BasicType.INT_TYPE;
-import static org.apache.seatunnel.api.table.type.BasicType.LONG_TYPE;
-import static org.apache.seatunnel.api.table.type.BasicType.SHORT_TYPE;
-import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
-import static org.apache.seatunnel.api.table.type.BasicType.VOID_TYPE;
-
-import org.apache.seatunnel.api.table.type.ArrayType;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DecimalType;
-import org.apache.seatunnel.api.table.type.LocalTimeType;
-import org.apache.seatunnel.api.table.type.MapType;
-import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
-
-import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.commons.lang3.RandomUtils;
-
-import java.lang.reflect.Array;
-import java.math.BigDecimal;
-import java.time.LocalDateTime;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-public class FakeRandomData {
- public static final String SCHEMA = "schema";
- private final SeaTunnelSchema schema;
-
- public FakeRandomData(SeaTunnelSchema schema) {
- this.schema = schema;
- }
-
- public SeaTunnelRow randomRow() {
- SeaTunnelRowType seaTunnelRowType = schema.getSeaTunnelRowType();
- String[] fieldNames = seaTunnelRowType.getFieldNames();
- SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
- List<Object> randomRow = new ArrayList<>(fieldNames.length);
- for (SeaTunnelDataType<?> fieldType : fieldTypes) {
- randomRow.add(randomColumnValue(fieldType));
- }
- return new SeaTunnelRow(randomRow.toArray());
- }
-
- @SuppressWarnings("magicnumber")
- private Object randomColumnValue(SeaTunnelDataType<?> fieldType) {
- if (BOOLEAN_TYPE.equals(fieldType)) {
- return RandomUtils.nextInt(0, 2) == 1;
- } else if (BYTE_TYPE.equals(fieldType)) {
- return (byte) RandomUtils.nextInt(0, 255);
- } else if (SHORT_TYPE.equals(fieldType)) {
- return (short) RandomUtils.nextInt(Byte.MAX_VALUE,
Short.MAX_VALUE);
- } else if (INT_TYPE.equals(fieldType)) {
- return RandomUtils.nextInt(Short.MAX_VALUE, Integer.MAX_VALUE);
- } else if (LONG_TYPE.equals(fieldType)) {
- return RandomUtils.nextLong(Integer.MAX_VALUE, Long.MAX_VALUE);
- } else if (FLOAT_TYPE.equals(fieldType)) {
- return RandomUtils.nextFloat(Float.MIN_VALUE, Float.MAX_VALUE);
- } else if (DOUBLE_TYPE.equals(fieldType)) {
- return RandomUtils.nextDouble(Float.MAX_VALUE, Double.MAX_VALUE);
- } else if (STRING_TYPE.equals(fieldType)) {
- return RandomStringUtils.randomAlphabetic(10);
- } else if (LocalTimeType.LOCAL_DATE_TYPE.equals(fieldType)) {
- return randomLocalDateTime().toLocalDate();
- } else if (LocalTimeType.LOCAL_TIME_TYPE.equals(fieldType)) {
- return randomLocalDateTime().toLocalTime();
- } else if (LocalTimeType.LOCAL_DATE_TIME_TYPE.equals(fieldType)) {
- return randomLocalDateTime();
- } else if (fieldType instanceof DecimalType) {
- DecimalType decimalType = (DecimalType) fieldType;
- return new
BigDecimal(RandomStringUtils.randomNumeric(decimalType.getPrecision() -
decimalType.getScale()) + "." +
- RandomStringUtils.randomNumeric(decimalType.getScale()));
- } else if (fieldType instanceof ArrayType) {
- ArrayType<?, ?> arrayType = (ArrayType<?, ?>) fieldType;
- BasicType<?> elementType = arrayType.getElementType();
- Object value = randomColumnValue(elementType);
- Object arr = Array.newInstance(elementType.getTypeClass(), 1);
- Array.set(arr, 0, value);
- return arr;
- } else if (fieldType instanceof MapType) {
- MapType<?, ?> mapType = (MapType<?, ?>) fieldType;
- SeaTunnelDataType<?> keyType = mapType.getKeyType();
- Object key = randomColumnValue(keyType);
- SeaTunnelDataType<?> valueType = mapType.getValueType();
- Object value = randomColumnValue(valueType);
- HashMap<Object, Object> objectObjectHashMap = new HashMap<>();
- objectObjectHashMap.put(key, value);
- return objectObjectHashMap;
- } else if (fieldType instanceof PrimitiveByteArrayType) {
- return RandomUtils.nextBytes(3);
- } else if (VOID_TYPE.equals(fieldType) || fieldType == null) {
- return null;
- } else {
- throw new UnsupportedOperationException("Unexpected value: " +
fieldType);
- }
- }
-
- @SuppressWarnings("magicnumber")
- private LocalDateTime randomLocalDateTime() {
- return LocalDateTime.of(
- LocalDateTime.now().getYear(),
- RandomUtils.nextInt(1, 12),
- RandomUtils.nextInt(1, 28),
- RandomUtils.nextInt(0, 24),
- RandomUtils.nextInt(0, 59)
- );
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index eeede4439..5e22ce5fc 100644
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -38,7 +38,7 @@ public class FakeSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
private Config pluginConfig;
private JobContext jobContext;
private SeaTunnelSchema schema;
- private FakeOptions fakeOptions;
+ private FakeConfig fakeConfig;
@Override
public Boundedness getBoundedness() {
@@ -52,7 +52,7 @@ public class FakeSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
@Override
public AbstractSingleSplitReader<SeaTunnelRow>
createReader(SingleSplitReaderContext readerContext) throws Exception {
- return new FakeSourceReader(readerContext, new FakeRandomData(schema),
fakeOptions);
+ return new FakeSourceReader(readerContext, new
FakeDataGenerator(schema, fakeConfig));
}
@Override
@@ -63,9 +63,9 @@ public class FakeSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
@Override
public void prepare(Config pluginConfig) {
this.pluginConfig = pluginConfig;
- assert pluginConfig.hasPath(FakeRandomData.SCHEMA);
- this.schema =
SeaTunnelSchema.buildWithConfig(pluginConfig.getConfig(FakeRandomData.SCHEMA));
- this.fakeOptions = FakeOptions.parse(pluginConfig);
+ assert pluginConfig.hasPath(FakeDataGenerator.SCHEMA);
+ this.schema =
SeaTunnelSchema.buildWithConfig(pluginConfig.getConfig(FakeDataGenerator.SCHEMA));
+ this.fakeConfig = FakeConfig.buildWithConfig(pluginConfig);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
index 6301a284f..b847838c0 100644
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -26,19 +26,19 @@ import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReader
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
+
public class FakeSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
private static final Logger LOGGER =
LoggerFactory.getLogger(FakeSourceReader.class);
private final SingleSplitReaderContext context;
- private final FakeRandomData fakeRandomData;
- private final FakeOptions options;
+ private final FakeDataGenerator fakeDataGenerator;
- public FakeSourceReader(SingleSplitReaderContext context, FakeRandomData
randomData, FakeOptions options) {
+ public FakeSourceReader(SingleSplitReaderContext context,
FakeDataGenerator randomData) {
this.context = context;
- this.fakeRandomData = randomData;
- this.options = options;
+ this.fakeDataGenerator = randomData;
}
@Override
@@ -55,8 +55,8 @@ public class FakeSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow> {
@SuppressWarnings("magicnumber")
public void pollNext(Collector<SeaTunnelRow> output) throws
InterruptedException {
// Generate a random number of rows to emit.
- for (int i = 0; i < options.getRowNum(); i++) {
- SeaTunnelRow seaTunnelRow = fakeRandomData.randomRow();
+ List<SeaTunnelRow> seaTunnelRows =
fakeDataGenerator.generateFakedRows();
+ for (SeaTunnelRow seaTunnelRow : seaTunnelRows) {
output.collect(seaTunnelRow);
}
if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
diff --git
a/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeRandomDataTest.java
b/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java
similarity index 55%
rename from
seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeRandomDataTest.java
rename to
seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java
index 520718b1b..bc4d991b6 100644
---
a/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeRandomDataTest.java
+++
b/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java
@@ -17,10 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.fake.source;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -32,40 +30,44 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.io.FileNotFoundException;
-import java.lang.reflect.Array;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
+import java.util.List;
import java.util.Map;
-public class FakeRandomDataTest {
+public class FakeDataGeneratorTest {
@ParameterizedTest
@ValueSource(strings = {"complex.schema.conf", "simple.schema.conf"})
public void testComplexSchemaParse(String conf) throws
FileNotFoundException, URISyntaxException {
- Config testConfigFile = getTestConfigFile(conf);
- SeaTunnelSchema seatunnelSchema =
SeaTunnelSchema.buildWithConfig(testConfigFile);
- FakeRandomData fakeRandomData = new FakeRandomData(seatunnelSchema);
- SeaTunnelRow seaTunnelRow = fakeRandomData.randomRow();
- Assertions.assertNotNull(seaTunnelRow);
- Object[] fields = seaTunnelRow.getFields();
- Assertions.assertNotNull(fields);
- SeaTunnelRowType seaTunnelRowType =
seatunnelSchema.getSeaTunnelRowType();
- SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
- for (int i = 0; i < fieldTypes.length; i++) {
- if (fieldTypes[i].getSqlType() != SqlType.NULL) {
- Assertions.assertNotNull(fields[i]);
- } else {
- Assertions.assertSame(fields[i], null);
- }
- if (fieldTypes[i].getSqlType() == SqlType.MAP) {
- Assertions.assertTrue(fields[i] instanceof Map);
- Map<?, ?> field = (Map) fields[i];
- field.forEach((k, v) -> Assertions.assertTrue(k != null && v
!= null));
- }
- if (fieldTypes[i].getSqlType() == SqlType.ARRAY) {
- Assertions.assertTrue(fields[i].getClass().isArray());
- Assertions.assertNotNull(Array.get(fields[i], 0));
+ Config testConfig = getTestConfigFile(conf);
+ SeaTunnelSchema seaTunnelSchema =
SeaTunnelSchema.buildWithConfig(testConfig.getConfig(SeaTunnelSchema.SCHEMA));
+ SeaTunnelRowType seaTunnelRowType =
seaTunnelSchema.getSeaTunnelRowType();
+ FakeConfig fakeConfig = FakeConfig.buildWithConfig(testConfig);
+ FakeDataGenerator fakeDataGenerator = new
FakeDataGenerator(seaTunnelSchema, fakeConfig);
+ List<SeaTunnelRow> seaTunnelRows =
fakeDataGenerator.generateFakedRows();
+ Assertions.assertNotNull(seaTunnelRows);
+ Assertions.assertEquals(seaTunnelRows.size(), 10);
+ for (SeaTunnelRow seaTunnelRow : seaTunnelRows) {
+ for (int i = 0; i < seaTunnelRowType.getFieldTypes().length; i++) {
+ switch (seaTunnelRowType.getFieldType(i).getSqlType()) {
+ case STRING:
+ Assertions.assertEquals(((String)
seaTunnelRow.getField(i)).length(), 10);
+ break;
+ case BYTES:
+ Assertions.assertEquals(((byte[])
seaTunnelRow.getField(i)).length, 10);
+ break;
+ case ARRAY:
+ Assertions.assertEquals(((Object[])
seaTunnelRow.getField(i)).length, 10);
+ break;
+ case MAP:
+ Assertions.assertEquals(((Map<?, ?>)
seaTunnelRow.getField(i)).size(), 10);
+ break;
+ default:
+ // do nothing
+ break;
+ }
}
}
}
@@ -74,14 +76,14 @@ public class FakeRandomDataTest {
if (!configFile.startsWith("/")) {
configFile = "/" + configFile;
}
- URL resource = FakeRandomDataTest.class.getResource(configFile);
+ URL resource = FakeDataGeneratorTest.class.getResource(configFile);
if (resource == null) {
throw new FileNotFoundException("Can't find config file: " +
configFile);
}
String path = Paths.get(resource.toURI()).toString();
Config config = ConfigFactory.parseFile(new File(path));
- assert config.hasPath("schema");
- return config.getConfig("schema");
+ assert config.hasPath("FakeSource");
+ return config.getConfig("FakeSource");
}
}
diff --git
a/seatunnel-connectors-v2/connector-fake/src/test/resources/complex.schema.conf
b/seatunnel-connectors-v2/connector-fake/src/test/resources/complex.schema.conf
index 6a06dbf06..96e82ee41 100644
---
a/seatunnel-connectors-v2/connector-fake/src/test/resources/complex.schema.conf
+++
b/seatunnel-connectors-v2/connector-fake/src/test/resources/complex.schema.conf
@@ -15,24 +15,47 @@
# limitations under the License.
#
-schema {
- fields {
- map = "map<string, map<string, string>>"
- map_array = "map<string, map<string, array<int>>>"
- array = "array<tinyint>"
- string = string
- boolean = boolean
- tinyint = tinyint
- smallint = smallint
- int = int
- bigint = bigint
- float = float
- double = double
- decimal = "decimal(30, 8)"
- null = "null"
- bytes = bytes
- date = date
- time = time
- timestamp = timestamp
+FakeSource {
+ row.num = 10
+ map.size = 10
+ array.size = 10
+ bytes.length = 10
+ string.length = 10
+ schema = {
+ fields {
+ c_map = "map<string, array<int>>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_null = "null"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, map<string, string>>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_null = "null"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
}
-}
\ No newline at end of file
+ result_table_name = "fake"
+}
diff --git
a/seatunnel-connectors-v2/connector-fake/src/test/resources/simple.schema.conf
b/seatunnel-connectors-v2/connector-fake/src/test/resources/simple.schema.conf
index 6716f00cd..58848a91c 100644
---
a/seatunnel-connectors-v2/connector-fake/src/test/resources/simple.schema.conf
+++
b/seatunnel-connectors-v2/connector-fake/src/test/resources/simple.schema.conf
@@ -15,23 +15,30 @@
# limitations under the License.
#
-schema {
- fields {
- map = "map<string, string>"
- array = "array<tinyint>"
- string = string
- boolean = boolean
- tinyint = tinyint
- smallint = smallint
- int = int
- bigint = bigint
- float = float
- double = double
- decimal = "decimal(30, 8)"
- null = "null"
- bytes = bytes
- date = date
- time = time
- timestamp = timestamp
+FakeSource {
+ row.num = 10
+ map.size = 10
+ array.size = 10
+ bytes.length = 10
+ string.length = 10
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_null = "null"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
}
+ result_table_name = "fake"
}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf
index 33ed73bc5..3134cfb40 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf
@@ -50,11 +50,11 @@ sink {
},
{
rule_type = MIN_LENGTH
- rule_value = 10
+ rule_value = 5
},
{
rule_type = MAX_LENGTH
- rule_value = 10
+ rule_value = 65535
}
]
},{
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-assert-flink-e2e/src/test/resources/assertion/fakesource_to_assert.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-assert-flink-e2e/src/test/resources/assertion/fakesource_to_assert.conf
index b2fda476b..e2a1cb128 100644
---
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-assert-flink-e2e/src/test/resources/assertion/fakesource_to_assert.conf
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-assert-flink-e2e/src/test/resources/assertion/fakesource_to_assert.conf
@@ -61,7 +61,7 @@ sink {
},
{
rule_type = MIN_LENGTH
- rule_value = 10
+ rule_value = 5
},
{
rule_type = MAX_LENGTH